Skip to main content

uni_query/query/
df_planner.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Hybrid physical planner for DataFusion integration.
5//!
6//! This module provides [`HybridPhysicalPlanner`], which converts Cypher's
7//! [`LogicalPlan`] into a DataFusion [`ExecutionPlan`] tree. The "hybrid" nature
8//! refers to the mix of:
9//!
10//! - **Custom graph operators**: `GraphScanExec`, `GraphTraverseExec`, `GraphShortestPathExec`
11//! - **Native DataFusion operators**: `FilterExec`, `AggregateExec`, `SortExec`, etc.
12//!
13//! # Architecture
14//!
15//! ```text
16//! LogicalPlan (Cypher)
17//!        │
18//!        ▼
19//! ┌────────────────────┐
20//! │HybridPhysicalPlanner│
21//! │                    │
22//! │ Graph ops → Custom │
23//! │ Rel ops → DataFusion│
24//! └────────────────────┘
25//!        │
26//!        ▼
27//! ExecutionPlan (DataFusion)
28//! ```
29//!
30//! # Expression Translation
31//!
32//! Cypher expressions are translated to DataFusion expressions using
33//! [`cypher_expr_to_df`] from the `df_expr` module.
34
35use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
36use crate::query::df_graph::bind_fixed_path::BindFixedPathExec;
37use crate::query::df_graph::bind_zero_length_path::BindZeroLengthPathExec;
38use crate::query::df_graph::mutation_common::{MutationKind, extended_schema_for_new_vars};
39use crate::query::df_graph::mutation_create::new_create_exec;
40use crate::query::df_graph::mutation_delete::new_delete_exec;
41use crate::query::df_graph::mutation_merge::new_merge_exec;
42use crate::query::df_graph::mutation_remove::new_remove_exec;
43use crate::query::df_graph::mutation_set::new_set_exec;
44use crate::query::df_graph::recursive_cte::RecursiveCTEExec;
45use crate::query::df_graph::traverse::{
46    GraphVariableLengthTraverseExec, GraphVariableLengthTraverseMainExec,
47};
48use crate::query::df_graph::{
49    GraphApplyExec, GraphExecutionContext, GraphExtIdLookupExec, GraphProcedureCallExec,
50    GraphScanExec, GraphShortestPathExec, GraphTraverseExec, GraphTraverseMainExec,
51    GraphUnwindExec, GraphVectorKnnExec, L0Context, MutationContext, MutationExec,
52    OptionalFilterExec,
53};
54use crate::query::planner::{LogicalPlan, aggregate_column_name, collect_properties_from_plan};
55use anyhow::{Result, anyhow};
56use arrow_schema::{DataType, Schema, SchemaRef};
57use datafusion::common::JoinType;
58use datafusion::execution::SessionState;
59use datafusion::logical_expr::{Expr as DfExpr, ExprSchemable, SortExpr as DfSortExpr};
60use datafusion::physical_expr::{create_physical_expr, create_physical_sort_exprs};
61use datafusion::physical_plan::ExecutionPlan;
62use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
63use datafusion::physical_plan::filter::FilterExec;
64use datafusion::physical_plan::joins::NestedLoopJoinExec;
65use datafusion::physical_plan::limit::LocalLimitExec;
66use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
67use datafusion::physical_plan::projection::ProjectionExec;
68use datafusion::physical_plan::sorts::sort::SortExec;
69use datafusion::physical_plan::udaf::AggregateFunctionExpr;
70use datafusion::physical_plan::union::UnionExec;
71use datafusion::prelude::SessionContext;
72use parking_lot::RwLock;
73use std::collections::{HashMap, HashSet};
74use std::sync::Arc;
75use std::sync::atomic::{AtomicU64, Ordering};
76use uni_algo::algo::AlgorithmRegistry;
77use uni_common::core::schema::{PropertyMeta, Schema as UniSchema};
78use uni_cypher::ast::{
79    CypherLiteral, Direction as AstDirection, Expr, Pattern, PatternElement, SortItem,
80};
81use uni_store::runtime::l0::L0Buffer;
82use uni_store::runtime::property_manager::PropertyManager;
83use uni_store::storage::direction::Direction;
84use uni_store::storage::manager::StorageManager;
85use uni_xervo::runtime::ModelRuntime;
86
/// An aggregate function expression paired with its optional filter.
///
/// The first tuple element is the shared [`AggregateFunctionExpr`]; the second
/// is the filter as a physical expression, or `None` when the aggregate has no
/// filter attached.
type PhysicalAggregate = (
    Arc<AggregateFunctionExpr>,
    Option<Arc<dyn datafusion::physical_expr::PhysicalExpr>>,
);
92
/// Hybrid physical planner that produces DataFusion ExecutionPlan trees.
///
/// Routes graph operations to custom `ExecutionPlan` implementations
/// and relational operations to native DataFusion operators.
///
/// # Example
///
/// ```ignore
/// let planner = HybridPhysicalPlanner::new(
///     session_ctx,
///     storage,
///     l0,
///     property_manager,
///     schema,
///     params,
/// );
///
/// let execution_plan = planner.plan(&logical_plan)?;
/// ```
pub struct HybridPhysicalPlanner {
    /// DataFusion session context.
    session_ctx: Arc<RwLock<SessionContext>>,

    /// Storage manager for dataset access.
    storage: Arc<StorageManager>,

    /// Graph execution context for custom operators.
    ///
    /// Rebuilt (not mutated in place) by the `with_*_registry` /
    /// `with_xervo_runtime` builder methods.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Schema for label/edge type lookups.
    schema: Arc<UniSchema>,

    /// Last flush version for staleness detection.
    ///
    /// Both constructors initialise this to `0`.
    last_flush_version: AtomicU64,

    /// Query parameters for expression translation.
    params: HashMap<String, uni_common::Value>,

    /// Correlated outer values from Apply input rows (for subquery correlation).
    /// These take precedence over parameters during variable resolution to prevent
    /// YIELD columns from shadowing user query parameters.
    ///
    /// Empty when the planner is built with `new`; supplied by the caller of
    /// `with_l0_context`.
    outer_values: HashMap<String, uni_common::Value>,

    /// Mutation context for write operations (CREATE, SET, REMOVE, DELETE).
    /// Present only when the query contains write clauses.
    ///
    /// Starts as `None` and is installed through `with_mutation_context`.
    mutation_ctx: Option<Arc<MutationContext>>,
}
140
141impl std::fmt::Debug for HybridPhysicalPlanner {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        f.debug_struct("HybridPhysicalPlanner")
144            .field(
145                "last_flush_version",
146                &self.last_flush_version.load(Ordering::Relaxed),
147            )
148            .finish_non_exhaustive()
149    }
150}
151
152impl HybridPhysicalPlanner {
153    /// Create a new hybrid physical planner.
154    ///
155    /// # Arguments
156    ///
157    /// * `session_ctx` - DataFusion session context
158    /// * `storage` - Storage manager for dataset access
159    /// * `l0` - Current L0 buffer for MVCC
160    /// * `property_manager` - Property manager for lazy loading
161    /// * `schema` - Uni schema for lookups
162    pub fn new(
163        session_ctx: Arc<RwLock<SessionContext>>,
164        storage: Arc<StorageManager>,
165        l0: Arc<RwLock<L0Buffer>>,
166        property_manager: Arc<PropertyManager>,
167        schema: Arc<UniSchema>,
168        params: HashMap<String, uni_common::Value>,
169    ) -> Self {
170        let graph_ctx = Arc::new(GraphExecutionContext::new(
171            storage.clone(),
172            l0,
173            property_manager,
174        ));
175
176        Self {
177            session_ctx,
178            storage,
179            graph_ctx,
180            schema,
181            last_flush_version: AtomicU64::new(0),
182            params,
183            outer_values: HashMap::new(),
184            mutation_ctx: None,
185        }
186    }
187
188    /// Resolve the set of property names for `variable` from the collected plan properties.
189    ///
190    /// If the property set contains `"*"`, expands to all schema-defined properties
191    /// for `schema_name` (a label or edge type name). Otherwise filters out the
192    /// wildcard sentinel and returns the explicit property names.
193    fn resolve_properties(
194        &self,
195        variable: &str,
196        schema_name: &str,
197        all_properties: &HashMap<String, HashSet<String>>,
198    ) -> Vec<String> {
199        // System columns managed by the engine — never treat as user properties.
200        const SYSTEM_COLUMNS: &[&str] =
201            &["_vid", "_labels", "_eid", "_src_vid", "_dst_vid", "_type"];
202
203        all_properties
204            .get(variable)
205            .map(|props| {
206                if props.contains("*") {
207                    let schema_props: Vec<String> = self
208                        .schema
209                        .properties
210                        .get(schema_name)
211                        .map(|p| p.keys().cloned().collect())
212                        .unwrap_or_default();
213
214                    // Collect explicit property names (non-wildcard, non-internal)
215                    let explicit: Vec<String> = props
216                        .iter()
217                        .filter(|p| *p != "*" && !p.starts_with('_'))
218                        .cloned()
219                        .collect();
220
221                    if schema_props.is_empty() && explicit.is_empty() {
222                        // Structural-only access, no specific properties needed
223                        return vec!["*".to_string()];
224                    }
225
226                    // Merge schema props + explicit props, dedup
227                    let mut combined: Vec<String> = schema_props;
228                    for p in explicit {
229                        if !combined.contains(&p) {
230                            combined.push(p);
231                        }
232                    }
233                    combined.retain(|p| !SYSTEM_COLUMNS.contains(&p.as_str()));
234                    combined.sort();
235                    combined
236                } else {
237                    let mut explicit_props: Vec<String> = props
238                        .iter()
239                        .filter(|p| *p != "*" && !SYSTEM_COLUMNS.contains(&p.as_str()))
240                        .cloned()
241                        .collect();
242                    explicit_props.sort();
243                    explicit_props
244                }
245            })
246            .unwrap_or_default()
247    }
248
249    /// Create planner with full L0 context.
250    pub fn with_l0_context(
251        session_ctx: Arc<RwLock<SessionContext>>,
252        storage: Arc<StorageManager>,
253        l0_context: L0Context,
254        property_manager: Arc<PropertyManager>,
255        schema: Arc<UniSchema>,
256        params: HashMap<String, uni_common::Value>,
257        outer_values: HashMap<String, uni_common::Value>,
258    ) -> Self {
259        let graph_ctx = Arc::new(GraphExecutionContext::with_l0_context(
260            storage.clone(),
261            l0_context,
262            property_manager,
263        ));
264
265        Self {
266            session_ctx,
267            storage,
268            graph_ctx,
269            schema,
270            last_flush_version: AtomicU64::new(0),
271            params,
272            outer_values,
273            mutation_ctx: None,
274        }
275    }
276
    /// Unwrap the inner `GraphExecutionContext` from its `Arc`, preserving all
    /// existing registries. If other Arc references exist, rebuilds a base
    /// context from the shared storage, L0 context and property manager, and
    /// re-attaches the saved registries.
    ///
    /// While the context is taken, `self.graph_ctx` holds a freshly built
    /// placeholder without registries; callers are expected to store a new
    /// `Arc` back into it (as the `with_*` builders in this impl do).
    fn take_graph_ctx(&mut self) -> GraphExecutionContext {
        // Snapshot the optional registries/runtime up front so they can be
        // re-attached regardless of which path produces `ctx` below.
        let algo_registry = self.graph_ctx.algo_registry().cloned();
        let procedure_registry = self.graph_ctx.procedure_registry().cloned();
        let xervo_runtime = self.graph_ctx.xervo_runtime().cloned();

        // Builds a registry-less context that shares the storage, L0 context
        // and property manager of `ctx`.
        let new_base = |ctx: &Arc<GraphExecutionContext>| {
            GraphExecutionContext::with_l0_context(
                ctx.storage().clone(),
                ctx.l0_context().clone(),
                ctx.property_manager().clone(),
            )
        };
        // Swap a placeholder into `self` so the original `Arc` can be moved out.
        let placeholder = Arc::new(new_base(&self.graph_ctx));
        let arc = std::mem::replace(&mut self.graph_ctx, placeholder);
        // Sole owner: take the context by value. Otherwise fall back to a
        // rebuilt base context.
        let mut ctx = Arc::try_unwrap(arc).unwrap_or_else(|arc| new_base(&arc));

        // Re-attach whatever was registered before; on the sole-owner path
        // this re-applies the same values.
        if let Some(registry) = algo_registry {
            ctx = ctx.with_algo_registry(registry);
        }
        if let Some(registry) = procedure_registry {
            ctx = ctx.with_procedure_registry(registry);
        }
        if let Some(runtime) = xervo_runtime {
            ctx = ctx.with_xervo_runtime(runtime);
        }
        ctx
    }
307
308    /// Set the algorithm registry for `uni.algo.*` procedure dispatch.
309    ///
310    /// Rebuilds the inner `GraphExecutionContext` with the registry attached.
311    pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
312        let ctx = self.take_graph_ctx().with_algo_registry(registry);
313        self.graph_ctx = Arc::new(ctx);
314        self
315    }
316
317    /// Set the external procedure registry for test/user-defined procedures.
318    ///
319    /// Rebuilds the inner `GraphExecutionContext` with the registry attached.
320    pub fn with_procedure_registry(
321        mut self,
322        registry: Arc<crate::query::executor::procedure::ProcedureRegistry>,
323    ) -> Self {
324        let ctx = self.take_graph_ctx().with_procedure_registry(registry);
325        self.graph_ctx = Arc::new(ctx);
326        self
327    }
328
329    /// Set Uni-Xervo runtime used by query-time vector auto-embedding.
330    pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
331        let ctx = self.take_graph_ctx().with_xervo_runtime(runtime);
332        self.graph_ctx = Arc::new(ctx);
333        self
334    }
335
    /// Set the mutation context for write operations.
    ///
    /// Consumes and returns the planner so calls can be chained; any
    /// previously stored context is replaced.
    pub fn with_mutation_context(mut self, ctx: Arc<MutationContext>) -> Self {
        self.mutation_ctx = Some(ctx);
        self
    }
341
    /// Return the graph execution context (for columnar subplan execution).
    ///
    /// Borrows the planner's `Arc`; clone it if shared ownership is needed.
    pub fn graph_ctx(&self) -> &Arc<GraphExecutionContext> {
        &self.graph_ctx
    }
346
    /// Return the DataFusion session context (for columnar subplan execution).
    ///
    /// Borrows the planner's `Arc`; clone it if shared ownership is needed.
    pub fn session_ctx(&self) -> &Arc<RwLock<SessionContext>> {
        &self.session_ctx
    }
351
    /// Return the storage manager (for columnar subplan execution).
    ///
    /// Borrows the planner's `Arc`; clone it if shared ownership is needed.
    pub fn storage(&self) -> &Arc<StorageManager> {
        &self.storage
    }
356
    /// Return the schema (for columnar subplan execution).
    ///
    /// Borrows the planner's `Arc`; clone it if shared ownership is needed.
    pub fn schema_info(&self) -> &Arc<UniSchema> {
        &self.schema
    }
361
362    /// Get the mutation context, returning an error if not set.
363    fn require_mutation_ctx(&self) -> Result<Arc<MutationContext>> {
364        self.mutation_ctx.clone().ok_or_else(|| {
365            tracing::error!(
366                "Mutation context not set — this indicates a routing bug where a write \
367                 operation was sent to the DataFusion engine without a MutationContext"
368            );
369            anyhow!("Mutation context not set — write operations require a MutationContext")
370        })
371    }
372
373    /// Build a `TranslationContext` with variable kinds collected from a LogicalPlan.
374    ///
375    /// This is used for expression translation in filters, projections, etc.
376    /// where bare variable references need to resolve to identity columns.
377    fn translation_context_for_plan(&self, plan: &LogicalPlan) -> TranslationContext {
378        let mut variable_kinds = HashMap::new();
379        let mut variable_labels = HashMap::new();
380        let mut node_variable_hints = Vec::new();
381        let mut mutation_edge_hints = Vec::new();
382        collect_variable_kinds(plan, &mut variable_kinds);
383        collect_mutation_node_hints(plan, &mut node_variable_hints);
384        collect_mutation_edge_hints(plan, &mut mutation_edge_hints);
385        self.collect_variable_labels(plan, &mut variable_labels);
386        TranslationContext {
387            parameters: self.params.clone(),
388            outer_values: self.outer_values.clone(),
389            variable_labels,
390            variable_kinds,
391            node_variable_hints,
392            mutation_edge_hints,
393            ..Default::default()
394        }
395    }
396
397    /// Recursively collect variable-to-label/type mappings from a `LogicalPlan`.
398    ///
399    /// For node variables, maps to the first label name. For edge variables, maps
400    /// to the edge type name (when a single type is known). This is used by
401    /// `type(r)` to resolve the edge type as a string literal.
402    fn collect_variable_labels(&self, plan: &LogicalPlan, labels: &mut HashMap<String, String>) {
403        match plan {
404            LogicalPlan::Scan {
405                variable,
406                labels: scan_labels,
407                ..
408            }
409            | LogicalPlan::ScanMainByLabels {
410                variable,
411                labels: scan_labels,
412                ..
413            } => {
414                if let Some(first) = scan_labels.first() {
415                    labels.insert(variable.clone(), first.clone());
416                }
417            }
418            LogicalPlan::Traverse {
419                input,
420                step_variable,
421                edge_type_ids,
422                target_variable,
423                target_label_id,
424                ..
425            } => {
426                self.collect_variable_labels(input, labels);
427                if let Some(sv) = step_variable
428                    && edge_type_ids.len() == 1
429                    && let Some(name) = self.schema.edge_type_name_by_id(edge_type_ids[0])
430                {
431                    labels.insert(sv.clone(), name.to_string());
432                }
433                if *target_label_id != 0
434                    && let Some(name) = self.schema.label_name_by_id(*target_label_id)
435                {
436                    labels.insert(target_variable.clone(), name.to_string());
437                }
438            }
439            LogicalPlan::TraverseMainByType {
440                input,
441                step_variable,
442                type_names,
443                ..
444            } => {
445                self.collect_variable_labels(input, labels);
446                if let Some(sv) = step_variable
447                    && type_names.len() == 1
448                {
449                    labels.insert(sv.clone(), type_names[0].clone());
450                }
451            }
452            // Wrapper nodes: recurse into input(s)
453            LogicalPlan::Filter { input, .. }
454            | LogicalPlan::Project { input, .. }
455            | LogicalPlan::Sort { input, .. }
456            | LogicalPlan::Limit { input, .. }
457            | LogicalPlan::Aggregate { input, .. }
458            | LogicalPlan::Distinct { input, .. }
459            | LogicalPlan::Window { input, .. }
460            | LogicalPlan::Unwind { input, .. }
461            | LogicalPlan::Create { input, .. }
462            | LogicalPlan::CreateBatch { input, .. }
463            | LogicalPlan::Merge { input, .. }
464            | LogicalPlan::Set { input, .. }
465            | LogicalPlan::Remove { input, .. }
466            | LogicalPlan::Delete { input, .. }
467            | LogicalPlan::Foreach { input, .. }
468            | LogicalPlan::SubqueryCall { input, .. } => {
469                self.collect_variable_labels(input, labels);
470            }
471            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
472                self.collect_variable_labels(left, labels);
473                self.collect_variable_labels(right, labels);
474            }
475            LogicalPlan::Apply {
476                input, subquery, ..
477            } => {
478                self.collect_variable_labels(input, labels);
479                self.collect_variable_labels(subquery, labels);
480            }
481            LogicalPlan::Explain { plan } => {
482                self.collect_variable_labels(plan, labels);
483            }
484            _ => {}
485        }
486    }
487
    /// Return the property metadata for the given edge types, keyed by
    /// property name.
    ///
    /// Thin wrapper that forwards the planner's schema and `edge_type_ids` to
    /// `df_graph::common::merged_edge_schema_props`; the merge rules live in
    /// that helper.
    fn merged_edge_type_properties(&self, edge_type_ids: &[u32]) -> HashMap<String, PropertyMeta> {
        crate::query::df_graph::common::merged_edge_schema_props(&self.schema, edge_type_ids)
    }
491
492    /// Plan a logical plan into an execution plan.
493    ///
494    /// # Arguments
495    ///
496    /// * `logical` - The logical plan to convert
497    ///
498    /// # Returns
499    ///
500    /// DataFusion ExecutionPlan ready for execution.
501    ///
502    /// # Errors
503    ///
504    /// Returns an error if planning fails (unsupported operation, schema mismatch, etc.)
505    pub fn plan(&self, logical: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
506        // Collect all properties needed anywhere in the plan tree
507        let all_properties = collect_properties_from_plan(logical);
508
509        // Delegate to internal planning with properties context
510        self.plan_internal(logical, &all_properties)
511    }
512
513    /// Plan a LogicalPlan with additional property requirements.
514    ///
515    /// Merges `extra_properties` into the auto-collected properties from the plan tree.
516    /// Used by MERGE execution to ensure structural projections are applied for
517    /// variables that need full node/edge Maps in the output.
518    pub fn plan_with_properties(
519        &self,
520        logical: &LogicalPlan,
521        extra_properties: HashMap<String, HashSet<String>>,
522    ) -> Result<Arc<dyn ExecutionPlan>> {
523        let mut all_properties = collect_properties_from_plan(logical);
524        for (var, props) in extra_properties {
525            all_properties.entry(var).or_default().extend(props);
526        }
527        self.plan_internal(logical, &all_properties)
528    }
529
530    /// Wrap a plan with optional semantics.
531    ///
532    /// If optional is true, performs a Left Outer Join with a single-row source (PlaceholderRow)
533    /// to ensure at least one row (of NULLs) is returned if the input is empty.
534    ///
535    /// Conceptually: SELECT * FROM (SELECT 1) LEFT JOIN Plan ON true
536    fn wrap_optional(
537        &self,
538        plan: Arc<dyn ExecutionPlan>,
539        optional: bool,
540    ) -> Result<Arc<dyn ExecutionPlan>> {
541        if !optional {
542            return Ok(plan);
543        }
544
545        // Create a single-row source
546        let empty_schema = Arc::new(Schema::empty());
547        let placeholder = Arc::new(PlaceholderRowExec::new(empty_schema));
548
549        // Use NestedLoopJoin with Left Outer Join type
550        // This ensures if 'plan' is empty, we get 1 row with all NULLs
551        Ok(Arc::new(NestedLoopJoinExec::try_new(
552            placeholder,
553            plan,
554            None, // No filter
555            &JoinType::Left,
556            None, // No projection
557        )?))
558    }
559
560    fn plan_internal(
561        &self,
562        logical: &LogicalPlan,
563        all_properties: &HashMap<String, HashSet<String>>,
564    ) -> Result<Arc<dyn ExecutionPlan>> {
565        match logical {
566            // === Graph Operations ===
567            LogicalPlan::Scan {
568                label_id,
569                labels,
570                variable,
571                filter,
572                optional,
573            } => {
574                if labels.len() > 1 {
575                    // Multi-label: use main table with intersection semantics
576                    self.plan_multi_label_scan(
577                        labels,
578                        variable,
579                        filter.as_ref(),
580                        *optional,
581                        all_properties,
582                    )
583                } else {
584                    // Single-label: use per-label table
585                    self.plan_scan(
586                        *label_id,
587                        variable,
588                        filter.as_ref(),
589                        *optional,
590                        all_properties,
591                    )
592                }
593            }
594
595            // ScanMainByLabels is now supported via schemaless scan
596            LogicalPlan::ScanMainByLabels {
597                labels,
598                variable,
599                filter,
600                optional,
601            } => {
602                if labels.len() > 1 {
603                    // Multi-label schemaless scan
604                    self.plan_multi_label_scan(
605                        labels,
606                        variable,
607                        filter.as_ref(),
608                        *optional,
609                        all_properties,
610                    )
611                } else if let Some(label_name) = labels.first() {
612                    // Single label schemaless scan
613                    self.plan_schemaless_scan(
614                        label_name,
615                        variable,
616                        filter.as_ref(),
617                        *optional,
618                        all_properties,
619                    )
620                } else {
621                    // Empty labels - should not happen, fallback to scan all
622                    self.plan_scan_all(variable, filter.as_ref(), *optional, all_properties)
623                }
624            }
625
626            // ScanAll is now supported via schemaless scan with empty label
627            LogicalPlan::ScanAll {
628                variable,
629                filter,
630                optional,
631            } => self.plan_scan_all(variable, filter.as_ref(), *optional, all_properties),
632
633            // TraverseMainByType is now supported via schemaless traversal
634            LogicalPlan::TraverseMainByType {
635                type_names,
636                input,
637                direction,
638                source_variable,
639                target_variable,
640                step_variable,
641                min_hops,
642                max_hops,
643                optional,
644                target_filter,
645                path_variable,
646                is_variable_length,
647                scope_match_variables,
648                optional_pattern_vars,
649                edge_filter_expr,
650                path_mode,
651                ..
652            } => {
653                if *is_variable_length {
654                    let vlp_plan = self.plan_traverse_main_by_type_vlp(
655                        input,
656                        type_names,
657                        direction.clone(),
658                        source_variable,
659                        target_variable,
660                        step_variable.as_deref(),
661                        *min_hops,
662                        *max_hops,
663                        path_variable.as_deref(),
664                        *optional,
665                        all_properties,
666                        edge_filter_expr.as_ref(),
667                        path_mode,
668                        scope_match_variables,
669                    )?;
670                    self.apply_schemaless_traverse_filter(
671                        vlp_plan,
672                        target_filter.as_ref(),
673                        source_variable,
674                        target_variable,
675                        step_variable.as_deref(),
676                        path_variable.as_deref(),
677                        true, // is_variable_length
678                        *optional,
679                        optional_pattern_vars,
680                    )
681                } else {
682                    let base_plan = self.plan_traverse_main_by_type(
683                        input,
684                        type_names,
685                        direction.clone(),
686                        source_variable,
687                        target_variable,
688                        step_variable.as_deref(),
689                        *optional,
690                        optional_pattern_vars,
691                        all_properties,
692                        scope_match_variables,
693                    )?;
694                    // Apply edge property filter first, then target node filter.
695                    // Without the target_filter, MATCH (a)-[r]->(b {prop: val}) SET r.x
696                    // would apply SET to ALL edges from a, ignoring b's properties.
697                    let edge_filtered = self.apply_schemaless_traverse_filter(
698                        base_plan,
699                        edge_filter_expr.as_ref(),
700                        source_variable,
701                        target_variable,
702                        step_variable.as_deref(),
703                        path_variable.as_deref(),
704                        false,
705                        *optional,
706                        optional_pattern_vars,
707                    )?;
708                    self.apply_schemaless_traverse_filter(
709                        edge_filtered,
710                        target_filter.as_ref(),
711                        source_variable,
712                        target_variable,
713                        step_variable.as_deref(),
714                        path_variable.as_deref(),
715                        false,
716                        *optional,
717                        optional_pattern_vars,
718                    )
719                }
720            }
721
722            LogicalPlan::Traverse {
723                input,
724                edge_type_ids,
725                direction,
726                source_variable,
727                target_variable,
728                target_label_id,
729                step_variable,
730                min_hops,
731                max_hops,
732                optional,
733                target_filter,
734                path_variable,
735                is_variable_length,
736                optional_pattern_vars,
737                scope_match_variables,
738                edge_filter_expr,
739                path_mode,
740                qpp_steps,
741                ..
742            } => self.plan_traverse(
743                input,
744                edge_type_ids,
745                direction.clone(),
746                source_variable,
747                target_variable,
748                *target_label_id,
749                step_variable.as_deref(),
750                *min_hops,
751                *max_hops,
752                path_variable.as_deref(),
753                *optional,
754                target_filter.as_ref(),
755                *is_variable_length,
756                optional_pattern_vars,
757                all_properties,
758                scope_match_variables,
759                edge_filter_expr.as_ref(),
760                path_mode,
761                qpp_steps.as_deref(),
762            ),
763
764            LogicalPlan::ShortestPath {
765                input,
766                edge_type_ids,
767                direction,
768                source_variable,
769                target_variable,
770                target_label_id: _,
771                path_variable,
772                min_hops: _,
773                max_hops: _,
774            } => self.plan_shortest_path(
775                input,
776                edge_type_ids,
777                direction.clone(),
778                source_variable,
779                target_variable,
780                path_variable,
781                false,
782                all_properties,
783            ),
784
785            // === Relational Operations ===
786            LogicalPlan::Filter {
787                input,
788                predicate,
789                optional_variables,
790            } => self.plan_filter(input, predicate, optional_variables, all_properties),
791
792            LogicalPlan::Project { input, projections } => {
793                // Build alias map for ORDER BY alias resolution
794                // When plan is Project(Limit(Sort(...))), Sort needs to know aliases
795                let alias_map: HashMap<String, Expr> = projections
796                    .iter()
797                    .filter_map(|(expr, alias)| alias.as_ref().map(|a| (a.clone(), expr.clone())))
798                    .collect();
799
800                // Check if the input chain contains a Sort and pass alias map
801                self.plan_project_with_aliases(input, projections, all_properties, &alias_map)
802            }
803
804            LogicalPlan::Aggregate {
805                input,
806                group_by,
807                aggregates,
808            } => self.plan_aggregate(input, group_by, aggregates, all_properties),
809
810            LogicalPlan::Distinct { input } => {
811                let input_plan = self.plan_internal(input, all_properties)?;
812                let schema = input_plan.schema();
813                // Group by all columns with no aggregates = deduplication
814                let group_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
815                    schema
816                        .fields()
817                        .iter()
818                        .enumerate()
819                        .map(|(i, f)| {
820                            (
821                                Arc::new(datafusion::physical_expr::expressions::Column::new(
822                                    f.name(),
823                                    i,
824                                ))
825                                    as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
826                                f.name().clone(),
827                            )
828                        })
829                        .collect();
830                let group_by = PhysicalGroupBy::new_single(group_exprs);
831                Ok(Arc::new(AggregateExec::try_new(
832                    AggregateMode::Single,
833                    group_by,
834                    vec![],
835                    vec![],
836                    input_plan.clone(),
837                    input_plan.schema(),
838                )?))
839            }
840
841            LogicalPlan::Sort { input, order_by } => {
842                self.plan_sort(input, order_by, all_properties, &HashMap::new())
843            }
844
845            LogicalPlan::Limit { input, skip, fetch } => {
846                self.plan_limit(input, *skip, *fetch, all_properties)
847            }
848
849            LogicalPlan::Union { left, right, all } => {
850                self.plan_union(left, right, *all, all_properties)
851            }
852
853            LogicalPlan::Empty => self.plan_empty(),
854
855            LogicalPlan::BindZeroLengthPath {
856                input,
857                node_variable,
858                path_variable,
859            } => {
860                self.plan_bind_zero_length_path(input, node_variable, path_variable, all_properties)
861            }
862
863            LogicalPlan::BindPath {
864                input,
865                node_variables,
866                edge_variables,
867                path_variable,
868            } => self.plan_bind_path(
869                input,
870                node_variables,
871                edge_variables,
872                path_variable,
873                all_properties,
874            ),
875
876            // === Mutation operators ===
877            LogicalPlan::Create { input, pattern } => {
878                tracing::debug!("Planning MutationCreateExec");
879                let child = self.plan_internal(input, all_properties)?;
880                let mutation_ctx = self.require_mutation_ctx()?;
881                Ok(Arc::new(new_create_exec(
882                    child,
883                    pattern.clone(),
884                    mutation_ctx,
885                )))
886            }
887            LogicalPlan::CreateBatch { input, patterns } => {
888                tracing::debug!(
889                    patterns = patterns.len(),
890                    "Planning MutationCreateExec (batch)"
891                );
892                let child = self.plan_internal(input, all_properties)?;
893                let mutation_ctx = self.require_mutation_ctx()?;
894                // Use a single MutationExec with CreateBatch to avoid N nested
895                // operators (which cause stack overflow for large N).
896                let output_schema = extended_schema_for_new_vars(&child.schema(), patterns);
897                Ok(Arc::new(MutationExec::new_with_schema(
898                    child,
899                    MutationKind::CreateBatch {
900                        patterns: patterns.clone(),
901                    },
902                    "MutationCreateExec",
903                    mutation_ctx,
904                    output_schema,
905                )))
906            }
907            LogicalPlan::Set { input, items } => {
908                tracing::debug!(items = items.len(), "Planning MutationSetExec");
909                let child = self.plan_internal(input, all_properties)?;
910                let mutation_ctx = self.require_mutation_ctx()?;
911                Ok(Arc::new(new_set_exec(child, items.clone(), mutation_ctx)))
912            }
913            LogicalPlan::Remove { input, items } => {
914                tracing::debug!(items = items.len(), "Planning MutationRemoveExec");
915                let child = self.plan_internal(input, all_properties)?;
916                let mutation_ctx = self.require_mutation_ctx()?;
917                Ok(Arc::new(new_remove_exec(
918                    child,
919                    items.clone(),
920                    mutation_ctx,
921                )))
922            }
923            LogicalPlan::Delete {
924                input,
925                items,
926                detach,
927            } => {
928                tracing::debug!(
929                    items = items.len(),
930                    detach = detach,
931                    "Planning MutationDeleteExec"
932                );
933                let child = self.plan_internal(input, all_properties)?;
934                let mutation_ctx = self.require_mutation_ctx()?;
935                Ok(Arc::new(new_delete_exec(
936                    child,
937                    items.clone(),
938                    *detach,
939                    mutation_ctx,
940                )))
941            }
942            LogicalPlan::Merge {
943                input,
944                pattern,
945                on_match,
946                on_create,
947            } => {
948                tracing::debug!("Planning MutationMergeExec");
949                let child = self.plan_internal(input, all_properties)?;
950                let mutation_ctx = self.require_mutation_ctx()?;
951                Ok(Arc::new(new_merge_exec(
952                    child,
953                    pattern.clone(),
954                    on_match.clone(),
955                    on_create.clone(),
956                    mutation_ctx,
957                )))
958            }
959
960            LogicalPlan::Window {
961                input,
962                window_exprs,
963            } => {
964                let input_plan = self.plan_internal(input, all_properties)?;
965                if !window_exprs.is_empty() {
966                    self.plan_window_functions(input_plan, window_exprs, Some(input.as_ref()))
967                } else {
968                    Ok(input_plan)
969                }
970            }
971
972            LogicalPlan::CrossJoin { left, right } => {
973                let left_plan = self.plan_internal(left, all_properties)?;
974                let right_plan = self.plan_internal(right, all_properties)?;
975
976                // For Locy IS-ref joins (graph scan × derived scan), strip structural
977                // projection columns (Struct-typed bare variable columns like "a", "b")
978                // from the graph scan output that conflict with derived scan column names.
979                // Non-conflicting struct columns (e.g., edge "e") are preserved for
980                // typed property access.
981                let left_plan = if matches!(right.as_ref(), LogicalPlan::LocyDerivedScan { .. }) {
982                    let derived_schema = right_plan.schema();
983                    let derived_names: HashSet<&str> = derived_schema
984                        .fields()
985                        .iter()
986                        .map(|f| f.name().as_str())
987                        .collect();
988                    strip_conflicting_structural_columns(left_plan, &derived_names)?
989                } else {
990                    left_plan
991                };
992
993                Ok(Arc::new(
994                    datafusion::physical_plan::joins::CrossJoinExec::new(left_plan, right_plan),
995                ))
996            }
997
998            LogicalPlan::Apply {
999                input,
1000                subquery,
1001                input_filter,
1002            } => self.plan_apply(input, subquery, input_filter.as_ref(), all_properties),
1003
1004            LogicalPlan::Unwind {
1005                input,
1006                expr,
1007                variable,
1008            } => self.plan_unwind(
1009                input.as_ref().clone(),
1010                expr.clone(),
1011                variable.clone(),
1012                all_properties,
1013            ),
1014
1015            LogicalPlan::VectorKnn {
1016                label_id,
1017                variable,
1018                property,
1019                query,
1020                k,
1021                threshold,
1022            } => self.plan_vector_knn(
1023                *label_id,
1024                variable,
1025                property,
1026                query.clone(),
1027                *k,
1028                *threshold,
1029                all_properties,
1030            ),
1031
1032            LogicalPlan::InvertedIndexLookup { .. } => Err(anyhow!(
1033                "Full-text search not yet supported in DataFusion engine"
1034            )),
1035
1036            LogicalPlan::AllShortestPaths {
1037                input,
1038                edge_type_ids,
1039                direction,
1040                source_variable,
1041                target_variable,
1042                target_label_id: _,
1043                path_variable,
1044                min_hops: _,
1045                max_hops: _,
1046            } => self.plan_shortest_path(
1047                input,
1048                edge_type_ids,
1049                direction.clone(),
1050                source_variable,
1051                target_variable,
1052                path_variable,
1053                true,
1054                all_properties,
1055            ),
1056
1057            LogicalPlan::QuantifiedPattern { .. } => Err(anyhow!(
1058                "Quantified patterns not yet supported in DataFusion engine"
1059            )),
1060
1061            LogicalPlan::RecursiveCTE {
1062                cte_name,
1063                initial,
1064                recursive,
1065            } => self.plan_recursive_cte(cte_name, initial, recursive, all_properties),
1066
1067            LogicalPlan::ProcedureCall {
1068                procedure_name,
1069                arguments,
1070                yield_items,
1071            } => self.plan_procedure_call(procedure_name, arguments, yield_items, all_properties),
1072
1073            LogicalPlan::SubqueryCall { input, subquery } => {
1074                self.plan_apply(input, subquery, None, all_properties)
1075            }
1076
1077            LogicalPlan::ExtIdLookup {
1078                variable,
1079                ext_id,
1080                filter,
1081                optional,
1082            } => self.plan_ext_id_lookup(variable, ext_id, filter.as_ref(), *optional),
1083
1084            LogicalPlan::Foreach {
1085                input,
1086                variable,
1087                list,
1088                body,
1089            } => {
1090                tracing::debug!(variable = variable.as_str(), "Planning ForeachExec");
1091                let child = self.plan_internal(input, all_properties)?;
1092                let mutation_ctx = self.require_mutation_ctx()?;
1093                Ok(Arc::new(
1094                    super::df_graph::mutation_foreach::ForeachExec::new(
1095                        child,
1096                        variable.clone(),
1097                        list.clone(),
1098                        body.clone(),
1099                        mutation_ctx,
1100                    ),
1101                ))
1102            }
1103
1104            // Locy standalone operators
1105            LogicalPlan::LocyPriority { input, key_columns } => {
1106                let child = self.plan_internal(input, all_properties)?;
1107                let key_indices = resolve_column_indices(&child.schema(), key_columns)?;
1108                let priority_col_index = child.schema().index_of("__priority").map_err(|_| {
1109                    anyhow::anyhow!("LocyPriority input must contain __priority column")
1110                })?;
1111                Ok(Arc::new(super::df_graph::locy_priority::PriorityExec::new(
1112                    child,
1113                    key_indices,
1114                    priority_col_index,
1115                )))
1116            }
1117
1118            LogicalPlan::LocyBestBy {
1119                input,
1120                key_columns,
1121                criteria,
1122            } => {
1123                let child = self.plan_internal(input, all_properties)?;
1124                let key_indices = resolve_column_indices(&child.schema(), key_columns)?;
1125                let sort_criteria = resolve_best_by_criteria(&child.schema(), criteria)?;
1126                Ok(Arc::new(super::df_graph::locy_best_by::BestByExec::new(
1127                    child,
1128                    key_indices,
1129                    sort_criteria,
1130                    true, // LocyBestBy logical plan always uses deterministic ordering
1131                )))
1132            }
1133
1134            LogicalPlan::LocyFold {
1135                input,
1136                key_columns,
1137                fold_bindings,
1138                strict_probability_domain,
1139                probability_epsilon,
1140            } => {
1141                let child = self.plan_internal(input, all_properties)?;
1142                let key_indices = resolve_column_indices(&child.schema(), key_columns)?;
1143                let bindings = resolve_fold_bindings(&child.schema(), fold_bindings)?;
1144                Ok(Arc::new(super::df_graph::locy_fold::FoldExec::new(
1145                    child,
1146                    key_indices,
1147                    bindings,
1148                    *strict_probability_domain,
1149                    *probability_epsilon,
1150                )))
1151            }
1152
1153            LogicalPlan::LocyDerivedScan {
1154                scan_index: _,
1155                data,
1156                schema,
1157            } => Ok(Arc::new(
1158                super::df_graph::locy_fixpoint::DerivedScanExec::new(
1159                    Arc::clone(data),
1160                    Arc::clone(schema),
1161                ),
1162            )),
1163
1164            LogicalPlan::LocyProject {
1165                input,
1166                projections,
1167                target_types,
1168            } => self.plan_locy_project(input, projections, target_types, all_properties),
1169
1170            LogicalPlan::LocyProgram {
1171                strata,
1172                commands,
1173                derived_scan_registry,
1174                max_iterations,
1175                timeout,
1176                max_derived_bytes,
1177                deterministic_best_by,
1178                strict_probability_domain,
1179                probability_epsilon,
1180                exact_probability,
1181                max_bdd_variables,
1182                top_k_proofs,
1183            } => {
1184                let output_schema = super::df_graph::locy_program::stats_schema();
1185
1186                Ok(Arc::new(
1187                    super::df_graph::locy_program::LocyProgramExec::new(
1188                        strata.clone(),
1189                        commands.clone(),
1190                        Arc::clone(derived_scan_registry),
1191                        Arc::clone(&self.graph_ctx),
1192                        Arc::clone(&self.session_ctx),
1193                        Arc::clone(&self.storage),
1194                        Arc::clone(&self.schema),
1195                        self.params.clone(),
1196                        output_schema,
1197                        *max_iterations,
1198                        *timeout,
1199                        *max_derived_bytes,
1200                        *deterministic_best_by,
1201                        *strict_probability_domain,
1202                        *probability_epsilon,
1203                        *exact_probability,
1204                        *max_bdd_variables,
1205                        *top_k_proofs,
1206                    ),
1207                ))
1208            }
1209
1210            // DDL operations should be handled separately
1211            LogicalPlan::CreateVectorIndex { .. }
1212            | LogicalPlan::CreateFullTextIndex { .. }
1213            | LogicalPlan::CreateScalarIndex { .. }
1214            | LogicalPlan::CreateJsonFtsIndex { .. }
1215            | LogicalPlan::DropIndex { .. }
1216            | LogicalPlan::ShowIndexes { .. }
1217            | LogicalPlan::Copy { .. }
1218            | LogicalPlan::Backup { .. }
1219            | LogicalPlan::ShowDatabase
1220            | LogicalPlan::ShowConfig
1221            | LogicalPlan::ShowStatistics
1222            | LogicalPlan::Vacuum
1223            | LogicalPlan::Checkpoint
1224            | LogicalPlan::CopyTo { .. }
1225            | LogicalPlan::CopyFrom { .. }
1226            | LogicalPlan::CreateLabel(_)
1227            | LogicalPlan::CreateEdgeType(_)
1228            | LogicalPlan::AlterLabel(_)
1229            | LogicalPlan::AlterEdgeType(_)
1230            | LogicalPlan::DropLabel(_)
1231            | LogicalPlan::DropEdgeType(_)
1232            | LogicalPlan::CreateConstraint(_)
1233            | LogicalPlan::DropConstraint(_)
1234            | LogicalPlan::ShowConstraints(_)
1235            | LogicalPlan::Explain { .. } => {
1236                Err(anyhow!("DDL/Admin operations should be handled separately"))
1237            }
1238        }
1239    }
1240
1241    /// Like `plan_internal`, but propagates alias mappings to Sort nodes.
1242    /// This is used when a Project wraps a Sort (possibly through Limit)
1243    /// so that ORDER BY can reference projection aliases.
1244    fn plan_internal_with_aliases(
1245        &self,
1246        logical: &LogicalPlan,
1247        all_properties: &HashMap<String, HashSet<String>>,
1248        alias_map: &HashMap<String, Expr>,
1249    ) -> Result<Arc<dyn ExecutionPlan>> {
1250        match logical {
1251            LogicalPlan::Sort { input, order_by } => {
1252                self.plan_sort(input, order_by, all_properties, alias_map)
1253            }
1254            LogicalPlan::Limit { input, skip, fetch } => {
1255                // Propagate aliases through Limit to reach Sort
1256                let input_plan =
1257                    self.plan_internal_with_aliases(input, all_properties, alias_map)?;
1258                if let Some(offset) = skip.filter(|&s| s > 0) {
1259                    use datafusion::physical_plan::limit::GlobalLimitExec;
1260                    Ok(Arc::new(GlobalLimitExec::new(input_plan, offset, *fetch)))
1261                } else {
1262                    Ok(Arc::new(LocalLimitExec::new(
1263                        input_plan,
1264                        fetch.unwrap_or(usize::MAX),
1265                    )))
1266                }
1267            }
1268            // For all other nodes, fall through to normal planning
1269            _ => self.plan_internal(logical, all_properties),
1270        }
1271    }
1272
1273    /// Apply a node-level filter to a scan or lookup plan.
1274    ///
1275    /// Wraps the input plan with a `FilterExec` if `filter` is `Some`.
1276    /// Builds a `TranslationContext` marking `variable` as `VariableKind::Node`
1277    /// for correct expression translation.
1278    fn apply_scan_filter(
1279        &self,
1280        plan: Arc<dyn ExecutionPlan>,
1281        variable: &str,
1282        filter: Option<&Expr>,
1283        label_name: Option<&str>,
1284    ) -> Result<Arc<dyn ExecutionPlan>> {
1285        let Some(filter_expr) = filter else {
1286            return Ok(plan);
1287        };
1288
1289        let mut variable_kinds = HashMap::new();
1290        variable_kinds.insert(variable.to_string(), VariableKind::Node);
1291        let mut variable_labels = HashMap::new();
1292        if let Some(label) = label_name {
1293            variable_labels.insert(variable.to_string(), label.to_string());
1294        }
1295        let ctx = TranslationContext {
1296            parameters: self.params.clone(),
1297            variable_labels,
1298            variable_kinds,
1299            ..Default::default()
1300        };
1301        let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
1302
1303        let schema = plan.schema();
1304
1305        let session = self.session_ctx.read();
1306        let physical_filter = self.create_physical_filter_expr(&df_filter, &schema, &session)?;
1307
1308        Ok(Arc::new(FilterExec::try_new(physical_filter, plan)?))
1309    }
1310
1311    /// Apply a filter to a schemaless traverse plan (TraverseMainByType).
1312    ///
1313    /// Builds a `TranslationContext` with the appropriate variable kinds for
1314    /// source, target, edge, and path variables, then creates and applies the
1315    /// filter. Used by both VLP (target_filter) and fixed-length (edge_filter)
1316    /// branches of TraverseMainByType planning.
1317    #[expect(clippy::too_many_arguments)]
1318    fn apply_schemaless_traverse_filter(
1319        &self,
1320        plan: Arc<dyn ExecutionPlan>,
1321        filter_expr: Option<&Expr>,
1322        source_variable: &str,
1323        target_variable: &str,
1324        step_variable: Option<&str>,
1325        path_variable: Option<&str>,
1326        is_variable_length: bool,
1327        optional: bool,
1328        optional_pattern_vars: &HashSet<String>,
1329    ) -> Result<Arc<dyn ExecutionPlan>> {
1330        let Some(filter_expr) = filter_expr else {
1331            return Ok(plan);
1332        };
1333
1334        let mut variable_kinds = HashMap::new();
1335        variable_kinds.insert(source_variable.to_string(), VariableKind::Node);
1336        variable_kinds.insert(target_variable.to_string(), VariableKind::Node);
1337        if let Some(sv) = step_variable {
1338            variable_kinds.insert(sv.to_string(), VariableKind::edge_for(is_variable_length));
1339        }
1340        if let Some(pv) = path_variable {
1341            variable_kinds.insert(pv.to_string(), VariableKind::Path);
1342        }
1343        let ctx = TranslationContext {
1344            parameters: self.params.clone(),
1345            variable_kinds,
1346            ..Default::default()
1347        };
1348        let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
1349        let schema = plan.schema();
1350        let session = self.session_ctx.read();
1351        let physical_filter = self.create_physical_filter_expr(&df_filter, &schema, &session)?;
1352
1353        if optional {
1354            Ok(Arc::new(OptionalFilterExec::new(
1355                plan,
1356                physical_filter,
1357                optional_pattern_vars.clone(),
1358            )))
1359        } else {
1360            Ok(Arc::new(FilterExec::try_new(physical_filter, plan)?))
1361        }
1362    }
1363
1364    /// Plan an external ID lookup.
1365    fn plan_ext_id_lookup(
1366        &self,
1367        variable: &str,
1368        ext_id: &str,
1369        filter: Option<&Expr>,
1370        optional: bool,
1371    ) -> Result<Arc<dyn ExecutionPlan>> {
1372        // Collect properties needed from the filter
1373        let properties = if let Some(filter_expr) = filter {
1374            crate::query::df_expr::collect_properties(filter_expr)
1375                .into_iter()
1376                .filter(|(var, _)| var == variable)
1377                .map(|(_, prop)| prop)
1378                .collect()
1379        } else {
1380            vec![]
1381        };
1382
1383        let lookup_plan: Arc<dyn ExecutionPlan> = Arc::new(GraphExtIdLookupExec::new(
1384            self.graph_ctx.clone(),
1385            variable.to_string(),
1386            ext_id.to_string(),
1387            properties,
1388            optional,
1389        ));
1390
1391        self.apply_scan_filter(lookup_plan, variable, filter, None)
1392    }
1393
1394    /// Plan an UNWIND operation.
1395    ///
1396    /// UNWIND expands a list expression into multiple rows.
1397    fn plan_unwind(
1398        &self,
1399        input: LogicalPlan,
1400        expr: Expr,
1401        variable: String,
1402        all_properties: &HashMap<String, HashSet<String>>,
1403    ) -> Result<Arc<dyn ExecutionPlan>> {
1404        // Recursively plan the input
1405        let input_plan = self.plan_internal(&input, all_properties)?;
1406
1407        let unwind = GraphUnwindExec::new(input_plan, expr, variable, self.params.clone());
1408
1409        Ok(Arc::new(unwind))
1410    }
1411
1412    /// Plan a recursive CTE (`WITH RECURSIVE`).
1413    ///
1414    /// Creates a [`RecursiveCTEExec`] that stores the logical plans and
1415    /// re-plans/executes them iteratively at execution time.
1416    fn plan_recursive_cte(
1417        &self,
1418        cte_name: &str,
1419        initial: &LogicalPlan,
1420        recursive: &LogicalPlan,
1421        _all_properties: &HashMap<String, HashSet<String>>,
1422    ) -> Result<Arc<dyn ExecutionPlan>> {
1423        Ok(Arc::new(RecursiveCTEExec::new(
1424            cte_name.to_string(),
1425            initial.clone(),
1426            recursive.clone(),
1427            self.graph_ctx.clone(),
1428            self.session_ctx.clone(),
1429            self.storage.clone(),
1430            self.schema.clone(),
1431            self.params.clone(),
1432        )))
1433    }
1434
1435    /// Plan an Apply (correlated subquery) or SubqueryCall.
1436    fn plan_apply(
1437        &self,
1438        input: &LogicalPlan,
1439        subquery: &LogicalPlan,
1440        input_filter: Option<&Expr>,
1441        all_properties: &HashMap<String, HashSet<String>>,
1442    ) -> Result<Arc<dyn ExecutionPlan>> {
1443        use crate::query::df_graph::common::infer_logical_plan_schema;
1444
1445        // 1. Plan input physically
1446        let input_exec = self.plan_internal(input, all_properties)?;
1447        let input_schema = input_exec.schema();
1448
1449        // 2. Infer subquery output schema from logical plan + UniSchema metadata
1450        let sub_schema = infer_logical_plan_schema(subquery, &self.schema);
1451
1452        // 3. Merge schemas: input fields + subquery fields (skip duplicates by name)
1453        let mut fields: Vec<Arc<arrow_schema::Field>> = input_schema.fields().to_vec();
1454        let input_field_names: HashSet<&str> = input_schema
1455            .fields()
1456            .iter()
1457            .map(|f| f.name().as_str())
1458            .collect();
1459        for field in sub_schema.fields() {
1460            if !input_field_names.contains(field.name().as_str()) {
1461                fields.push(field.clone());
1462            }
1463        }
1464        let output_schema: SchemaRef = Arc::new(Schema::new(fields));
1465
1466        Ok(Arc::new(GraphApplyExec::new(
1467            input_exec,
1468            subquery.clone(),
1469            input_filter.cloned(),
1470            self.graph_ctx.clone(),
1471            self.session_ctx.clone(),
1472            self.storage.clone(),
1473            self.schema.clone(),
1474            self.params.clone(),
1475            output_schema,
1476        )))
1477    }
1478
1479    /// Plan a vector KNN search.
1480    #[expect(clippy::too_many_arguments)]
1481    fn plan_vector_knn(
1482        &self,
1483        label_id: u16,
1484        variable: &str,
1485        property: &str,
1486        query_expr: Expr,
1487        k: usize,
1488        threshold: Option<f32>,
1489        all_properties: &HashMap<String, HashSet<String>>,
1490    ) -> Result<Arc<dyn ExecutionPlan>> {
1491        let label_name = self
1492            .schema
1493            .label_name_by_id(label_id)
1494            .ok_or_else(|| anyhow!("Unknown label ID: {}", label_id))?;
1495
1496        let target_properties = self.resolve_properties(variable, label_name, all_properties);
1497
1498        let knn = GraphVectorKnnExec::new(
1499            self.graph_ctx.clone(),
1500            label_id,
1501            label_name,
1502            variable.to_string(),
1503            property.to_string(),
1504            query_expr,
1505            k,
1506            threshold,
1507            self.params.clone(),
1508            target_properties,
1509        );
1510
1511        Ok(Arc::new(knn))
1512    }
1513
1514    /// Plan a procedure call.
1515    fn plan_procedure_call(
1516        &self,
1517        procedure_name: &str,
1518        arguments: &[Expr],
1519        yield_items: &[(String, Option<String>)],
1520        all_properties: &HashMap<String, HashSet<String>>,
1521    ) -> Result<Arc<dyn ExecutionPlan>> {
1522        use crate::query::df_graph::procedure_call::map_yield_to_canonical;
1523
1524        // Build target_properties map for node-like yields in search procedures
1525        let mut target_properties: HashMap<String, Vec<String>> = HashMap::new();
1526
1527        if matches!(
1528            procedure_name,
1529            "uni.vector.query" | "uni.fts.query" | "uni.search"
1530        ) {
1531            for (name, alias) in yield_items {
1532                let output_name = alias.as_ref().unwrap_or(name);
1533                let canonical = map_yield_to_canonical(name);
1534                if canonical == "node" {
1535                    // Collect properties requested for this node variable
1536                    if let Some(props) = all_properties.get(output_name.as_str()) {
1537                        let prop_list: Vec<String> = props
1538                            .iter()
1539                            .filter(|p| *p != "*" && !p.starts_with('_'))
1540                            .cloned()
1541                            .collect();
1542                        target_properties.insert(output_name.clone(), prop_list);
1543                    }
1544                }
1545            }
1546        }
1547
1548        let exec = GraphProcedureCallExec::new(
1549            self.graph_ctx.clone(),
1550            procedure_name.to_string(),
1551            arguments.to_vec(),
1552            yield_items.to_vec(),
1553            self.params.clone(),
1554            self.outer_values.clone(),
1555            target_properties,
1556        );
1557
1558        Ok(Arc::new(exec))
1559    }
1560
1561    /// Plan a vertex scan.
1562    fn plan_scan(
1563        &self,
1564        label_id: u16,
1565        variable: &str,
1566        filter: Option<&Expr>,
1567        optional: bool,
1568        all_properties: &HashMap<String, HashSet<String>>,
1569    ) -> Result<Arc<dyn ExecutionPlan>> {
1570        let label_name = self
1571            .schema
1572            .label_name_by_id(label_id)
1573            .ok_or_else(|| anyhow!("Unknown label ID: {}", label_id))?;
1574
1575        // Resolve properties collected from the entire plan tree, expanding "*" wildcards
1576        let mut properties = self.resolve_properties(variable, label_name, all_properties);
1577
1578        // Check if any projected property is NOT in the schema (needs overflow_json)
1579        let label_props = self.schema.properties.get(label_name);
1580        let has_projection_overflow = properties.iter().any(|p| {
1581            p != "overflow_json"
1582                && !p.starts_with('_')
1583                && !label_props.is_some_and(|lp| lp.contains_key(p.as_str()))
1584        });
1585        if has_projection_overflow && !properties.iter().any(|p| p == "overflow_json") {
1586            properties.push("overflow_json".to_string());
1587        }
1588
1589        // If the filter references overflow properties (not in schema), ensure
1590        // `overflow_json` is projected so the DataFusion FilterExec can read it.
1591        if let Some(filter_expr) = filter {
1592            let filter_props = crate::query::df_expr::collect_properties(filter_expr);
1593            let has_overflow = filter_props.iter().any(|(var, prop)| {
1594                var == variable
1595                    && !prop.starts_with('_')
1596                    && label_props.is_none_or(|props| !props.contains_key(prop.as_str()))
1597            });
1598            if has_overflow && !properties.iter().any(|p| p == "overflow_json") {
1599                properties.push("overflow_json".to_string());
1600            }
1601        }
1602
1603        // If we need the full object (structural access), ensure _all_props and
1604        // overflow_json are projected BEFORE creating the scan.
1605        let var_props = all_properties.get(variable);
1606        let need_full = var_props.is_some_and(|p| p.contains("*"));
1607        if need_full {
1608            if !properties.contains(&"_all_props".to_string()) {
1609                properties.push("_all_props".to_string());
1610            }
1611            if !properties.contains(&"overflow_json".to_string()) {
1612                properties.push("overflow_json".to_string());
1613            }
1614        }
1615
1616        let mut scan_plan: Arc<dyn ExecutionPlan> = Arc::new(GraphScanExec::new_vertex_scan(
1617            self.graph_ctx.clone(),
1618            label_name.to_string(),
1619            variable.to_string(),
1620            properties.clone(),
1621            None, // Filter will be applied as FilterExec on top
1622        ));
1623
1624        // Apply filter BEFORE structural projection so that the schema is
1625        // unambiguous (no duplicate `variable._vid` from both flat column and
1626        // struct field). This prevents "Ambiguous reference" errors when
1627        // comparing `_vid` (UInt64) against Int64 literals in type coercion.
1628        scan_plan = self.apply_scan_filter(scan_plan, variable, filter, Some(label_name))?;
1629
1630        if need_full {
1631            // Filter "*" (wildcard marker) and overflow_json from the structural
1632            // projection. Keep _all_props so properties()/keys() UDFs can use it.
1633            let struct_props: Vec<String> = properties
1634                .iter()
1635                .filter(|p| *p != "overflow_json" && *p != "*")
1636                .cloned()
1637                .collect();
1638            scan_plan = self.add_structural_projection(scan_plan, variable, &struct_props)?;
1639        }
1640
1641        self.wrap_optional(scan_plan, optional)
1642    }
1643
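    // NOTE: The paragraph below describes `plan_schemaless_scan` (defined further
    // down in this impl), not the function that follows. It is kept as a plain
    // comment so that it does not end up in the wrong item's rustdoc:
    //
    // Plan a schemaless vertex scan using the main vertices table. Used for labels
    // that aren't in the schema - queries the main table with
    // `array_contains(labels, 'X')` filter and extracts properties from `props_json`.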
1648    /// Add a structural projection for a variable if wildcard access ("*") is needed.
1649    ///
1650    /// Derives the property list from the plan's output schema (columns with the
1651    /// variable prefix) and wraps them into a Struct column via `add_structural_projection`.
1652    fn add_wildcard_structural_projection(
1653        &self,
1654        plan: Arc<dyn ExecutionPlan>,
1655        variable: &str,
1656        all_properties: &HashMap<String, HashSet<String>>,
1657    ) -> Result<Arc<dyn ExecutionPlan>> {
1658        if !all_properties
1659            .get(variable)
1660            .is_some_and(|p| p.contains("*"))
1661        {
1662            return Ok(plan);
1663        }
1664        let prefix = format!("{}.", variable);
1665        let struct_props: Vec<String> = plan
1666            .schema()
1667            .fields()
1668            .iter()
1669            .filter_map(|f| {
1670                f.name()
1671                    .strip_prefix(&prefix)
1672                    .filter(|prop| !prop.starts_with('_') || *prop == "_all_props")
1673                    .map(|prop| prop.to_string())
1674            })
1675            .collect();
1676        self.add_structural_projection(plan, variable, &struct_props)
1677    }
1678
1679    /// Detect whether a target variable is already bound in the input plan's schema.
1680    ///
1681    /// Returns `Some("{target_variable}._vid")` when the column is present.
1682    fn detect_bound_target(input_schema: &SchemaRef, target_variable: &str) -> Option<String> {
1683        // Standard: {var}._vid from ScanNodes output
1684        let col = format!("{}._vid", target_variable);
1685        if input_schema.column_with_name(&col).is_some() {
1686            return Some(col);
1687        }
1688        // Fallback: bare variable name if it's a numeric (VID) column.
1689        // This handles EXISTS subquery contexts where imported variables are
1690        // projected as Parameter("{var}") → bare VID column.
1691        // VIDs are UInt64 in Arrow, but may become Int64 after parameter
1692        // round-tripping through Value::Integer → ScalarValue::Int64.
1693        if let Ok(field) = input_schema.field_with_name(target_variable)
1694            && matches!(
1695                field.data_type(),
1696                datafusion::arrow::datatypes::DataType::UInt64
1697                    | datafusion::arrow::datatypes::DataType::Int64
1698            )
1699        {
1700            return Some(target_variable.to_string());
1701        }
1702        None
1703    }
1704
1705    /// Resolve the property list and wildcard flag for a schemaless vertex scan.
1706    ///
1707    /// Filters out the `"*"` marker, ensures `_all_props` is present, and returns
1708    /// `(properties, need_full)` where `need_full` indicates structural access.
1709    fn resolve_schemaless_properties(
1710        variable: &str,
1711        all_properties: &HashMap<String, HashSet<String>>,
1712    ) -> (Vec<String>, bool) {
1713        let mut properties: Vec<String> = all_properties
1714            .get(variable)
1715            .map(|s| s.iter().filter(|p| *p != "*").cloned().collect())
1716            .unwrap_or_default();
1717        let need_full = all_properties
1718            .get(variable)
1719            .is_some_and(|p| p.contains("*"));
1720        if !properties.iter().any(|p| p == "_all_props") {
1721            properties.push("_all_props".to_string());
1722        }
1723        (properties, need_full)
1724    }
1725
1726    /// Collect edge columns (`._eid` and `__eid_to_*`) from a schema, filtered to the
1727    /// current MATCH scope. Optionally excludes a specific column (for rebound edge patterns).
1728    fn collect_used_edge_columns(
1729        schema: &SchemaRef,
1730        scope_match_variables: &HashSet<String>,
1731        exclude_col: Option<&str>,
1732    ) -> Vec<String> {
1733        schema
1734            .fields()
1735            .iter()
1736            .filter_map(|f| {
1737                let name = f.name();
1738                if exclude_col.is_some_and(|exc| name == exc) {
1739                    None
1740                } else if name.ends_with("._eid") {
1741                    let var_name = name.trim_end_matches("._eid");
1742                    scope_match_variables
1743                        .contains(var_name)
1744                        .then(|| name.clone())
1745                } else if name.starts_with("__eid_to_") {
1746                    let var_name = name.trim_start_matches("__eid_to_");
1747                    scope_match_variables
1748                        .contains(var_name)
1749                        .then(|| name.clone())
1750                } else {
1751                    None
1752                }
1753            })
1754            .collect()
1755    }
1756
1757    /// Conditionally add edge structural projection when the edge variable has wildcard access.
1758    /// Skips if `skip_if_vlp` is true (VLP step variables are already `List<Edge>`).
1759    fn maybe_add_edge_structural_projection(
1760        &self,
1761        plan: Arc<dyn ExecutionPlan>,
1762        step_variable: Option<&str>,
1763        source_variable: &str,
1764        target_variable: &str,
1765        all_properties: &HashMap<String, HashSet<String>>,
1766        skip_if_vlp: bool,
1767    ) -> Result<Arc<dyn ExecutionPlan>> {
1768        if skip_if_vlp {
1769            return Ok(plan);
1770        }
1771        let Some(edge_var) = step_variable else {
1772            return Ok(plan);
1773        };
1774        if !all_properties
1775            .get(edge_var)
1776            .is_some_and(|p| p.contains("*"))
1777        {
1778            return Ok(plan);
1779        }
1780        // Derive edge properties from the plan's output schema
1781        let prefix = format!("{}.", edge_var);
1782        let edge_props: Vec<String> = plan
1783            .schema()
1784            .fields()
1785            .iter()
1786            .filter_map(|f| {
1787                f.name()
1788                    .strip_prefix(&prefix)
1789                    .filter(|prop| !prop.starts_with('_') && *prop != "overflow_json")
1790                    .map(|prop| prop.to_string())
1791            })
1792            .collect();
1793        self.add_edge_structural_projection(
1794            plan,
1795            edge_var,
1796            &edge_props,
1797            source_variable,
1798            target_variable,
1799        )
1800    }
1801
1802    /// Apply filter, optional structural projection, and optional wrapping to a schemaless scan.
1803    fn finalize_schemaless_scan(
1804        &self,
1805        scan_plan: Arc<dyn ExecutionPlan>,
1806        variable: &str,
1807        filter: Option<&Expr>,
1808        optional: bool,
1809        properties: &[String],
1810        need_full: bool,
1811    ) -> Result<Arc<dyn ExecutionPlan>> {
1812        // Apply filter BEFORE structural projection to avoid ambiguous column
1813        // references (flat `var._vid` vs struct `var._vid` field).
1814        let mut plan = self.apply_scan_filter(scan_plan, variable, filter, None)?;
1815
1816        // If we need the full object (structural access), build a struct with _labels + properties.
1817        // This enables labels(n)/keys(n) UDFs which expect a Struct column with a _labels field.
1818        if need_full {
1819            // Filter out "*" (wildcard marker) from struct_props.
1820            // Keep "_all_props" so that keys()/properties() UDFs can extract
1821            // property names at runtime from the CypherValue blob.
1822            let struct_props: Vec<String> =
1823                properties.iter().filter(|p| *p != "*").cloned().collect();
1824            plan = self.add_structural_projection(plan, variable, &struct_props)?;
1825        }
1826
1827        self.wrap_optional(plan, optional)
1828    }
1829
1830    fn plan_schemaless_scan(
1831        &self,
1832        label_name: &str,
1833        variable: &str,
1834        filter: Option<&Expr>,
1835        optional: bool,
1836        all_properties: &HashMap<String, HashSet<String>>,
1837    ) -> Result<Arc<dyn ExecutionPlan>> {
1838        let (properties, need_full) = Self::resolve_schemaless_properties(variable, all_properties);
1839        let scan_plan: Arc<dyn ExecutionPlan> =
1840            Arc::new(GraphScanExec::new_schemaless_vertex_scan(
1841                self.graph_ctx.clone(),
1842                label_name.to_string(),
1843                variable.to_string(),
1844                properties.clone(),
1845                None,
1846            ));
1847        self.finalize_schemaless_scan(
1848            scan_plan,
1849            variable,
1850            filter,
1851            optional,
1852            &properties,
1853            need_full,
1854        )
1855    }
1856
1857    /// Plan a multi-label vertex scan using the main vertices table.
1858    ///
1859    /// For patterns like `(n:A:B)`, scans vertices with ALL labels (intersection).
1860    fn plan_multi_label_scan(
1861        &self,
1862        labels: &[String],
1863        variable: &str,
1864        filter: Option<&Expr>,
1865        optional: bool,
1866        all_properties: &HashMap<String, HashSet<String>>,
1867    ) -> Result<Arc<dyn ExecutionPlan>> {
1868        let (properties, need_full) = Self::resolve_schemaless_properties(variable, all_properties);
1869        let scan_plan: Arc<dyn ExecutionPlan> =
1870            Arc::new(GraphScanExec::new_multi_label_vertex_scan(
1871                self.graph_ctx.clone(),
1872                labels.to_vec(),
1873                variable.to_string(),
1874                properties.clone(),
1875                None,
1876            ));
1877        self.finalize_schemaless_scan(
1878            scan_plan,
1879            variable,
1880            filter,
1881            optional,
1882            &properties,
1883            need_full,
1884        )
1885    }
1886
1887    /// Plan a scan of all vertices regardless of label.
1888    ///
1889    /// This is used for `MATCH (n)` without a label filter.
1890    fn plan_scan_all(
1891        &self,
1892        variable: &str,
1893        filter: Option<&Expr>,
1894        optional: bool,
1895        all_properties: &HashMap<String, HashSet<String>>,
1896    ) -> Result<Arc<dyn ExecutionPlan>> {
1897        let (properties, need_full) = Self::resolve_schemaless_properties(variable, all_properties);
1898        let scan_plan: Arc<dyn ExecutionPlan> = Arc::new(GraphScanExec::new_schemaless_all_scan(
1899            self.graph_ctx.clone(),
1900            variable.to_string(),
1901            properties.clone(),
1902            None,
1903        ));
1904        self.finalize_schemaless_scan(
1905            scan_plan,
1906            variable,
1907            filter,
1908            optional,
1909            &properties,
1910            need_full,
1911        )
1912    }
1913
1914    /// Plan a graph traversal.
1915    #[expect(
1916        clippy::too_many_arguments,
1917        reason = "Graph traversal requires many parameters"
1918    )]
1919    fn plan_traverse(
1920        &self,
1921        input: &LogicalPlan,
1922        edge_type_ids: &[u32],
1923        direction: AstDirection,
1924        source_variable: &str,
1925        target_variable: &str,
1926        target_label_id: u16,
1927        step_variable: Option<&str>,
1928        min_hops: usize,
1929        max_hops: usize,
1930        path_variable: Option<&str>,
1931        optional: bool,
1932        target_filter: Option<&Expr>,
1933        is_variable_length: bool,
1934        optional_pattern_vars: &HashSet<String>,
1935        all_properties: &HashMap<String, HashSet<String>>,
1936        scope_match_variables: &HashSet<String>,
1937        edge_filter_expr: Option<&Expr>,
1938        path_mode: &crate::query::df_graph::nfa::PathMode,
1939        qpp_steps: Option<&[crate::query::planner::QppStepInfo]>,
1940    ) -> Result<Arc<dyn ExecutionPlan>> {
1941        let input_plan = self.plan_internal(input, all_properties)?;
1942
1943        let adj_direction = convert_direction(direction);
1944        let (input_plan, source_col) = Self::resolve_source_vid_col(input_plan, source_variable)?;
1945
1946        let traverse_plan: Arc<dyn ExecutionPlan> = if !is_variable_length {
1947            // Extract edge properties for pushdown hydration, expanding "*" wildcards
1948            let mut edge_properties: Vec<String> = if let Some(edge_var) = step_variable {
1949                let has_wildcard = all_properties
1950                    .get(edge_var)
1951                    .is_some_and(|props| props.contains("*"));
1952                if has_wildcard {
1953                    // Expand to all schema-defined properties across all matching edge types
1954                    let mut schema_props: Vec<String> = edge_type_ids
1955                        .iter()
1956                        .filter_map(|eid| self.schema.edge_type_name_by_id(*eid))
1957                        .flat_map(|name| {
1958                            self.schema
1959                                .properties
1960                                .get(name)
1961                                .map(|p| p.keys().cloned().collect::<Vec<_>>())
1962                                .unwrap_or_default()
1963                        })
1964                        .collect();
1965
1966                    // Also include explicitly referenced properties (non-wildcard, non-internal)
1967                    // that may be overflow properties not in the schema
1968                    if let Some(props) = all_properties.get(edge_var) {
1969                        for p in props {
1970                            if p != "*" && !p.starts_with('_') && !schema_props.contains(p) {
1971                                schema_props.push(p.clone());
1972                            }
1973                        }
1974                    }
1975                    schema_props
1976                } else {
1977                    all_properties
1978                        .get(edge_var)
1979                        .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
1980                        .unwrap_or_default()
1981                }
1982            } else {
1983                Vec::new()
1984            };
1985
1986            // Check if any edge property is NOT in the schema (needs overflow_json)
1987            if let Some(edge_var) = step_variable {
1988                let has_wildcard = all_properties
1989                    .get(edge_var)
1990                    .is_some_and(|props| props.contains("*"));
1991                let edge_type_props = self.merged_edge_type_properties(edge_type_ids);
1992                let has_overflow_edge_props = edge_properties.iter().any(|p| {
1993                    p != "overflow_json"
1994                        && !p.starts_with('_')
1995                        && !edge_type_props.contains_key(p.as_str())
1996                });
1997                // Add overflow_json if:
1998                // 1. Wildcard was used AND edge_properties is empty (no schema props for this edge type)
1999                // 2. OR there are overflow properties explicitly referenced
2000                let needs_overflow =
2001                    (has_wildcard && edge_properties.is_empty()) || has_overflow_edge_props;
2002                if needs_overflow && !edge_properties.contains(&"overflow_json".to_string()) {
2003                    edge_properties.push("overflow_json".to_string());
2004                }
2005
2006                // Add _all_props for L0 edge property visibility: schemaless edges
2007                // store properties by name in L0, not as overflow_json blobs, so we
2008                // need _all_props to surface them through the DataFusion path.
2009                if has_wildcard && !edge_properties.contains(&"_all_props".to_string()) {
2010                    edge_properties.push("_all_props".to_string());
2011                }
2012            }
2013
2014            // Extract target vertex properties, expanding "*" wildcards
2015            let target_label_name_str = self.schema.label_name_by_id(target_label_id).unwrap_or("");
2016            let mut target_properties =
2017                self.resolve_properties(target_variable, target_label_name_str, all_properties);
2018
2019            // Filter out "*" from target_properties — it is used for structural
2020            // projection (bare variable access like `RETURN t`) but must not be
2021            // passed to GraphTraverseExec as an actual property column name.
2022            target_properties.retain(|p| p != "*");
2023
2024            // When wildcard access was requested but no specific properties resolved,
2025            // add _all_props to ensure properties are loaded (mirrors plan_scan_all behavior).
2026            let target_has_wildcard = all_properties
2027                .get(target_variable)
2028                .is_some_and(|p| p.contains("*"));
2029            if target_has_wildcard && target_properties.is_empty() {
2030                target_properties.push("_all_props".to_string());
2031            }
2032
2033            // Check for non-schema properties that need CypherValue extraction.
2034            // For the traverse path, always use _all_props (not overflow_json) as
2035            // the CypherValue source since get_property_value handles _all_props directly.
2036            let target_label_props = if !target_label_name_str.is_empty() {
2037                self.schema.properties.get(target_label_name_str)
2038            } else {
2039                None
2040            };
2041            let has_non_schema_props = target_properties.iter().any(|p| {
2042                p != "overflow_json"
2043                    && p != "_all_props"
2044                    && !p.starts_with('_')
2045                    && !target_label_props.is_some_and(|lp| lp.contains_key(p.as_str()))
2046            });
2047            if has_non_schema_props && !target_properties.iter().any(|p| p == "_all_props") {
2048                target_properties.push("_all_props".to_string());
2049            }
2050            // Also check the filter for non-schema property references
2051            if let Some(filter_expr) = target_filter {
2052                let filter_props = crate::query::df_expr::collect_properties(filter_expr);
2053                let has_overflow_filter = filter_props.iter().any(|(var, prop)| {
2054                    var == target_variable
2055                        && !prop.starts_with('_')
2056                        && !target_label_props
2057                            .is_some_and(|props| props.contains_key(prop.as_str()))
2058                });
2059                if has_overflow_filter && !target_properties.iter().any(|p| p == "_all_props") {
2060                    target_properties.push("_all_props".to_string());
2061                }
2062            }
2063            // For schema-defined labels that also have overflow properties, add overflow_json
2064            // for the scan path compatibility (Lance storage has overflow_json column).
2065            if !target_label_name_str.is_empty()
2066                && has_non_schema_props
2067                && !target_properties.iter().any(|p| p == "overflow_json")
2068            {
2069                target_properties.push("overflow_json".to_string());
2070            }
2071
2072            // Resolve target label name for property type lookups
2073            let target_label_name = if target_label_name_str.is_empty() {
2074                None
2075            } else {
2076                Some(target_label_name_str.to_string())
2077            };
2078
2079            // Single-hop traversal
2080            // Note: target_label_id is not passed here because VIDs no longer embed label info.
2081            // Label filtering for traversals is handled via the fallback executor when DataFusion
2082            // cannot handle the query, or via explicit filter predicates.
2083
2084            // Check if target variable is already bound (for cycle patterns like n-->k<--n)
2085            let bound_target_column =
2086                Self::detect_bound_target(&input_plan.schema(), target_variable);
2087
2088            // Collect edge ID columns from previous hops for relationship uniqueness.
2089            // Look for both explicit edge variables (ending in "._eid") and
2090            // internal tracking columns (starting with "__eid_to_").
2091            //
2092            // Rebound edge patterns (e.g. OPTIONAL MATCH ()-[r]->() where `r` is already bound)
2093            // use a temporary edge variable `__rebound_{r}` for traversal and then filter on eid.
2094            // Do not treat the already-bound `{r}._eid` as "used" here, otherwise the only
2095            // candidate edge is filtered out before rebound matching.
2096            // Handle rebound struct variables from WITH + aggregation.
2097            // When edge or target variables have passed through aggregation, they become
2098            // struct columns. Extract ALL fields as flat columns so that:
2099            // 1. {edge}._eid is available for uniqueness checking
2100            // 2. {edge}.{property} is available for downstream RETURN/WHERE
2101            // 3. {target}._vid is available for the bound target filter
2102            // 4. {target}.{property} is available for downstream RETURN/WHERE
2103            let mut input_plan = input_plan;
2104            for rebound_var in [
2105                step_variable.and_then(|sv| sv.strip_prefix("__rebound_")),
2106                target_variable.strip_prefix("__rebound_"),
2107            ]
2108            .into_iter()
2109            .flatten()
2110            {
2111                if input_plan
2112                    .schema()
2113                    .field_with_name(rebound_var)
2114                    .ok()
2115                    .is_some_and(|f| {
2116                        matches!(
2117                            f.data_type(),
2118                            datafusion::arrow::datatypes::DataType::Struct(_)
2119                        )
2120                    })
2121                {
2122                    input_plan = Self::extract_all_struct_fields(input_plan, rebound_var)?;
2123                }
2124            }
2125
2126            let rebound_bound_edge_col = step_variable
2127                .and_then(|sv| sv.strip_prefix("__rebound_"))
2128                .map(|bound| format!("{}._eid", bound));
2129
2130            let used_edge_columns = Self::collect_used_edge_columns(
2131                &input_plan.schema(),
2132                scope_match_variables,
2133                rebound_bound_edge_col.as_deref(),
2134            );
2135
2136            Arc::new(GraphTraverseExec::new(
2137                input_plan,
2138                source_col,
2139                edge_type_ids.to_vec(),
2140                adj_direction,
2141                target_variable.to_string(),
2142                step_variable.map(|s| s.to_string()),
2143                edge_properties,
2144                target_properties,
2145                target_label_name,
2146                None, // VIDs don't embed label - use VidLabelsIndex instead
2147                self.graph_ctx.clone(),
2148                optional,
2149                optional_pattern_vars.clone(),
2150                bound_target_column,
2151                used_edge_columns,
2152            ))
2153        } else {
2154            // Variable-length traversal
2155            if edge_type_ids.is_empty() {
2156                // No edge types - for min_hops=0, we can still emit zero-length paths
2157                // Use BindZeroLengthPath to create path with just the source node
2158                if let (0, Some(path_var)) = (min_hops, path_variable) {
2159                    return Ok(Arc::new(BindZeroLengthPathExec::new(
2160                        input_plan,
2161                        source_variable.to_string(),
2162                        path_var.to_string(),
2163                        self.graph_ctx.clone(),
2164                    )));
2165                } else if min_hops == 0 && step_variable.is_none() {
2166                    // min_hops=0 but no path variable - just return input as-is
2167                    // (the target is the same as source for zero-length)
2168                    return Ok(input_plan);
2169                }
2170            }
2171            {
2172                // Resolve target properties for VLP (same logic as single-hop above)
2173                let vlp_target_label_name_str =
2174                    self.schema.label_name_by_id(target_label_id).unwrap_or("");
2175                let vlp_target_properties_raw = self.resolve_properties(
2176                    target_variable,
2177                    vlp_target_label_name_str,
2178                    all_properties,
2179                );
2180                let target_has_wildcard = all_properties
2181                    .get(target_variable)
2182                    .is_some_and(|p| p.contains("*"));
2183                let vlp_target_label_props: Option<HashSet<String>> =
2184                    if vlp_target_label_name_str.is_empty() {
2185                        None
2186                    } else {
2187                        self.schema
2188                            .properties
2189                            .get(vlp_target_label_name_str)
2190                            .map(|props| props.keys().cloned().collect())
2191                    };
2192                let mut vlp_target_properties = sanitize_vlp_target_properties(
2193                    vlp_target_properties_raw,
2194                    target_has_wildcard,
2195                    vlp_target_label_props.as_ref(),
2196                );
2197                let vlp_target_label_name = if vlp_target_label_name_str.is_empty() {
2198                    None
2199                } else {
2200                    Some(vlp_target_label_name_str.to_string())
2201                };
2202
2203                // Check if target variable is already bound (for patterns where target is in scope)
2204                let bound_target_column =
2205                    Self::detect_bound_target(&input_plan.schema(), target_variable);
2206                if bound_target_column.is_some() {
2207                    // For correlated patterns with bound target, traversal only needs reachability.
2208                    // Reuse existing bound target columns from input and avoid re-hydrating props.
2209                    vlp_target_properties.clear();
2210                }
2211
2212                // VLP: compile edge predicates to Lance SQL for bitmap preselection
2213                let edge_lance_filter: Option<String> = edge_filter_expr.and_then(|expr| {
2214                    let edge_var_name = step_variable.unwrap_or("__anon_edge");
2215                    crate::query::pushdown::LanceFilterGenerator::generate(
2216                        std::slice::from_ref(expr),
2217                        edge_var_name,
2218                        None,
2219                    )
2220                });
2221
2222                // VLP: extract simple property equality conditions for L0 checking
2223                let edge_property_conditions = edge_filter_expr
2224                    .map(Self::extract_edge_property_conditions)
2225                    .unwrap_or_default();
2226
2227                // VLP: collect used edge columns for cross-pattern relationship uniqueness
2228                let used_edge_columns = Self::collect_used_edge_columns(
2229                    &input_plan.schema(),
2230                    scope_match_variables,
2231                    None,
2232                );
2233
2234                // VLP: determine output mode based on bound variables
2235                let output_mode = if step_variable.is_some() {
2236                    crate::query::df_graph::nfa::VlpOutputMode::StepVariable
2237                } else if path_variable.is_some() {
2238                    crate::query::df_graph::nfa::VlpOutputMode::FullPath
2239                } else {
2240                    crate::query::df_graph::nfa::VlpOutputMode::EndpointsOnly
2241                };
2242
2243                // Compile QPP NFA if multi-step pattern, otherwise let exec compile VLP NFA
2244                let qpp_nfa = qpp_steps.map(|steps| {
2245                    use crate::query::df_graph::nfa::{QppStep, VertexConstraint};
2246                    let hops_per_iter = steps.len();
2247                    let min_iter = min_hops / hops_per_iter;
2248                    let max_iter = max_hops / hops_per_iter;
2249                    let nfa_steps: Vec<QppStep> = steps
2250                        .iter()
2251                        .map(|s| QppStep {
2252                            edge_type_ids: s.edge_type_ids.clone(),
2253                            direction: convert_direction(s.direction.clone()),
2254                            target_constraint: s
2255                                .target_label
2256                                .as_ref()
2257                                .map(|l| VertexConstraint::Label(l.clone())),
2258                        })
2259                        .collect();
2260                    crate::query::df_graph::nfa::PathNfa::from_qpp(nfa_steps, min_iter, max_iter)
2261                });
2262
2263                Arc::new(GraphVariableLengthTraverseExec::new(
2264                    input_plan,
2265                    source_col,
2266                    edge_type_ids.to_vec(),
2267                    adj_direction,
2268                    min_hops,
2269                    max_hops,
2270                    target_variable.to_string(),
2271                    step_variable.map(|s| s.to_string()),
2272                    path_variable.map(|s| s.to_string()),
2273                    vlp_target_properties,
2274                    vlp_target_label_name,
2275                    self.graph_ctx.clone(),
2276                    optional,
2277                    bound_target_column,
2278                    edge_lance_filter,
2279                    edge_property_conditions,
2280                    used_edge_columns,
2281                    path_mode.clone(),
2282                    output_mode,
2283                    qpp_nfa,
2284                ))
2285            }
2286        };
2287
2288        // Add structural projections for bare variable access (RETURN t, labels(t), etc.)
2289        let mut traverse_plan = traverse_plan;
2290
2291        // Structural projection for target variable
2292        traverse_plan = self.add_wildcard_structural_projection(
2293            traverse_plan,
2294            target_variable,
2295            all_properties,
2296        )?;
2297
2298        // Structural projection for edge variable
2299        // Only for single-hop traversals; VLP step variables are already List<Edge>
2300        traverse_plan = self.maybe_add_edge_structural_projection(
2301            traverse_plan,
2302            step_variable,
2303            source_variable,
2304            target_variable,
2305            all_properties,
2306            is_variable_length,
2307        )?;
2308
2309        // Apply target filter if present
2310        if let Some(filter_expr) = target_filter {
2311            // Build context with variable kinds for this traverse
2312            let mut variable_kinds = HashMap::new();
2313            variable_kinds.insert(source_variable.to_string(), VariableKind::Node);
2314            variable_kinds.insert(target_variable.to_string(), VariableKind::Node);
2315            if let Some(sv) = step_variable {
2316                variable_kinds.insert(sv.to_string(), VariableKind::edge_for(is_variable_length));
2317            }
2318            if let Some(pv) = path_variable {
2319                variable_kinds.insert(pv.to_string(), VariableKind::Path);
2320            }
2321            let mut variable_labels = HashMap::new();
2322            if let Some(sv) = step_variable
2323                && edge_type_ids.len() == 1
2324                && let Some(name) = self.schema.edge_type_name_by_id(edge_type_ids[0])
2325            {
2326                variable_labels.insert(sv.to_string(), name.to_string());
2327            }
2328            let target_label_name_str = self.schema.label_name_by_id(target_label_id).unwrap_or("");
2329            if !target_label_name_str.is_empty() {
2330                variable_labels.insert(
2331                    target_variable.to_string(),
2332                    target_label_name_str.to_string(),
2333                );
2334            }
2335            let ctx = TranslationContext {
2336                parameters: self.params.clone(),
2337                variable_labels,
2338                variable_kinds,
2339                ..Default::default()
2340            };
2341            let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
2342            let schema = traverse_plan.schema();
2343            let session = self.session_ctx.read();
2344            let physical_filter =
2345                self.create_physical_filter_expr(&df_filter, &schema, &session)?;
2346
2347            if optional {
2348                Ok(Arc::new(OptionalFilterExec::new(
2349                    traverse_plan,
2350                    physical_filter,
2351                    optional_pattern_vars.clone(),
2352                )))
2353            } else {
2354                Ok(Arc::new(FilterExec::try_new(
2355                    physical_filter,
2356                    traverse_plan,
2357                )?))
2358            }
2359        } else {
2360            Ok(traverse_plan)
2361        }
2362    }
2363
2364    /// Plan a schemaless edge traversal (TraverseMainByType).
2365    ///
2366    /// This is used for edges without a schema-defined type that must query the main edges table.
2367    /// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
2368    #[expect(clippy::too_many_arguments)]
2369    fn plan_traverse_main_by_type(
2370        &self,
2371        input: &LogicalPlan,
2372        type_names: &[String],
2373        direction: AstDirection,
2374        source_variable: &str,
2375        target_variable: &str,
2376        step_variable: Option<&str>,
2377        optional: bool,
2378        optional_pattern_vars: &HashSet<String>,
2379        all_properties: &HashMap<String, HashSet<String>>,
2380        scope_match_variables: &HashSet<String>,
2381    ) -> Result<Arc<dyn ExecutionPlan>> {
2382        let input_plan = self.plan_internal(input, all_properties)?;
2383
2384        let adj_direction = convert_direction(direction);
2385        let (input_plan, source_col) = Self::resolve_source_vid_col(input_plan, source_variable)?;
2386
2387        // Check if target variable is already bound (for patterns where target is in scope)
2388        let bound_target_column = Self::detect_bound_target(&input_plan.schema(), target_variable);
2389
2390        // Extract edge properties for schemaless edges (all treated as Utf8/JSON)
2391        let mut edge_properties: Vec<String> = if let Some(edge_var) = step_variable {
2392            all_properties
2393                .get(edge_var)
2394                .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
2395                .unwrap_or_default()
2396        } else {
2397            Vec::new()
2398        };
2399
2400        // If edge has wildcard, include _all_props for keys()/properties() support
2401        if let Some(edge_var) = step_variable
2402            && all_properties
2403                .get(edge_var)
2404                .is_some_and(|props| props.contains("*"))
2405            && !edge_properties.iter().any(|p| p == "_all_props")
2406        {
2407            edge_properties.push("_all_props".to_string());
2408        }
2409
2410        // Extract target vertex properties
2411        let mut target_properties: Vec<String> = all_properties
2412            .get(target_variable)
2413            .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
2414            .unwrap_or_default();
2415
2416        // Always include _all_props so post-traverse filters can rewrite
2417        // property accesses to json_get_* calls against the CypherValue blob.
2418        // Also include it when wildcard access was requested (RETURN n) even if empty.
2419        let target_has_wildcard = all_properties
2420            .get(target_variable)
2421            .is_some_and(|p| p.contains("*"));
2422        if (target_has_wildcard || !target_properties.is_empty())
2423            && !target_properties.iter().any(|p| p == "_all_props")
2424        {
2425            target_properties.push("_all_props".to_string());
2426        }
2427        if bound_target_column.is_some() {
2428            // Target already comes from outer scope; avoid redundant property materialization.
2429            target_properties.clear();
2430        }
2431
2432        // Compute used_edge_columns for relationship uniqueness (same logic as Traverse).
2433        // Exclude the rebound edge's own column so the BFS can match the bound edge.
2434        let rebound_bound_edge_col = step_variable
2435            .and_then(|sv| sv.strip_prefix("__rebound_"))
2436            .map(|bound| format!("{}._eid", bound));
2437        let used_edge_columns = Self::collect_used_edge_columns(
2438            &input_plan.schema(),
2439            scope_match_variables,
2440            rebound_bound_edge_col.as_deref(),
2441        );
2442
2443        // Create the schemaless traversal execution plan
2444        let traverse_plan: Arc<dyn ExecutionPlan> = Arc::new(GraphTraverseMainExec::new(
2445            input_plan,
2446            source_col,
2447            type_names.to_vec(),
2448            adj_direction,
2449            target_variable.to_string(),
2450            step_variable.map(|s| s.to_string()),
2451            edge_properties.clone(),
2452            target_properties,
2453            self.graph_ctx.clone(),
2454            optional,
2455            optional_pattern_vars.clone(),
2456            bound_target_column,
2457            used_edge_columns,
2458        ));
2459
2460        let mut result_plan = traverse_plan;
2461
2462        // Structural projection for target variable (RETURN t, labels(t), etc.)
2463        result_plan =
2464            self.add_wildcard_structural_projection(result_plan, target_variable, all_properties)?;
2465
2466        // Structural projection for edge variable (type(r), RETURN r, etc.)
2467        result_plan = self.maybe_add_edge_structural_projection(
2468            result_plan,
2469            step_variable,
2470            source_variable,
2471            target_variable,
2472            all_properties,
2473            false, // not variable-length
2474        )?;
2475
2476        Ok(result_plan)
2477    }
2478
2479    /// Plan a schemaless edge traversal with variable-length paths (TraverseMainByType VLP).
2480    ///
2481    /// This is used for VLP patterns on edges without a schema-defined type that must query the main edges table.
2482    /// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
2483    #[expect(clippy::too_many_arguments)]
2484    fn plan_traverse_main_by_type_vlp(
2485        &self,
2486        input: &LogicalPlan,
2487        type_names: &[String],
2488        direction: AstDirection,
2489        source_variable: &str,
2490        target_variable: &str,
2491        step_variable: Option<&str>,
2492        min_hops: usize,
2493        max_hops: usize,
2494        path_variable: Option<&str>,
2495        optional: bool,
2496        all_properties: &HashMap<String, HashSet<String>>,
2497        edge_filter_expr: Option<&Expr>,
2498        path_mode: &crate::query::df_graph::nfa::PathMode,
2499        scope_match_variables: &HashSet<String>,
2500    ) -> Result<Arc<dyn ExecutionPlan>> {
2501        let input_plan = self.plan_internal(input, all_properties)?;
2502
2503        let adj_direction = convert_direction(direction);
2504        let (input_plan, source_col) = Self::resolve_source_vid_col(input_plan, source_variable)?;
2505
2506        // Check if target variable is already bound (for patterns where target is in scope)
2507        let bound_target_column = Self::detect_bound_target(&input_plan.schema(), target_variable);
2508
2509        // Extract target vertex properties
2510        let mut target_properties: Vec<String> = all_properties
2511            .get(target_variable)
2512            .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
2513            .unwrap_or_default();
2514
2515        // Always include _all_props so post-traverse filters can rewrite
2516        // property accesses to json_get_* calls against the CypherValue blob.
2517        // Also include it when wildcard access was requested (RETURN n) even if empty.
2518        let target_has_wildcard = all_properties
2519            .get(target_variable)
2520            .is_some_and(|p| p.contains("*"));
2521        if (target_has_wildcard || !target_properties.is_empty())
2522            && !target_properties.iter().any(|p| p == "_all_props")
2523        {
2524            target_properties.push("_all_props".to_string());
2525        }
2526        if bound_target_column.is_some() {
2527            // Correlated EXISTS only requires reachability; keep bound target columns from input.
2528            target_properties.clear();
2529        }
2530
2531        // VLP: compile edge predicates to Lance SQL for bitmap preselection
2532        let edge_lance_filter: Option<String> = edge_filter_expr.and_then(|expr| {
2533            let edge_var_name = step_variable.unwrap_or("__anon_edge");
2534            crate::query::pushdown::LanceFilterGenerator::generate(
2535                std::slice::from_ref(expr),
2536                edge_var_name,
2537                None,
2538            )
2539        });
2540
2541        // VLP: extract edge property conditions for BFS-level filtering
2542        let edge_property_conditions = edge_filter_expr
2543            .map(Self::extract_edge_property_conditions)
2544            .unwrap_or_default();
2545
2546        // VLP: collect used edge columns for cross-pattern relationship uniqueness
2547        let used_edge_columns =
2548            Self::collect_used_edge_columns(&input_plan.schema(), scope_match_variables, None);
2549
2550        // VLP: determine output mode based on bound variables
2551        let output_mode = if step_variable.is_some() {
2552            crate::query::df_graph::nfa::VlpOutputMode::StepVariable
2553        } else if path_variable.is_some() {
2554            crate::query::df_graph::nfa::VlpOutputMode::FullPath
2555        } else {
2556            crate::query::df_graph::nfa::VlpOutputMode::EndpointsOnly
2557        };
2558
2559        let traverse_plan = Arc::new(GraphVariableLengthTraverseMainExec::new(
2560            input_plan,
2561            source_col,
2562            type_names.to_vec(),
2563            adj_direction,
2564            min_hops,
2565            max_hops,
2566            target_variable.to_string(),
2567            step_variable.map(|s| s.to_string()),
2568            path_variable.map(|s| s.to_string()),
2569            target_properties,
2570            self.graph_ctx.clone(),
2571            optional,
2572            bound_target_column,
2573            edge_lance_filter,
2574            edge_property_conditions,
2575            used_edge_columns,
2576            path_mode.clone(),
2577            output_mode,
2578        ));
2579
2580        Ok(traverse_plan)
2581    }
2582
2583    /// Plan a shortest path computation.
2584    #[expect(clippy::too_many_arguments)]
2585    fn plan_shortest_path(
2586        &self,
2587        input: &LogicalPlan,
2588        edge_type_ids: &[u32],
2589        direction: AstDirection,
2590        source_variable: &str,
2591        target_variable: &str,
2592        path_variable: &str,
2593        all_shortest: bool,
2594        all_properties: &HashMap<String, HashSet<String>>,
2595    ) -> Result<Arc<dyn ExecutionPlan>> {
2596        let input_plan = self.plan_internal(input, all_properties)?;
2597
2598        let adj_direction = convert_direction(direction);
2599        let source_col = format!("{}._vid", source_variable);
2600        let target_col = format!("{}._vid", target_variable);
2601
2602        Ok(Arc::new(GraphShortestPathExec::new(
2603            input_plan,
2604            source_col,
2605            target_col,
2606            edge_type_ids.to_vec(),
2607            adj_direction,
2608            path_variable.to_string(),
2609            self.graph_ctx.clone(),
2610            all_shortest,
2611        )))
2612    }
2613
2614    /// Plan a filter operation.
2615    ///
2616    /// When `optional_variables` is non-empty, applies OPTIONAL MATCH WHERE semantics:
2617    /// rows where all optional variables are NULL are preserved regardless of the predicate.
2618    fn plan_filter(
2619        &self,
2620        input: &LogicalPlan,
2621        predicate: &Expr,
2622        optional_variables: &HashSet<String>,
2623        all_properties: &HashMap<String, HashSet<String>>,
2624    ) -> Result<Arc<dyn ExecutionPlan>> {
2625        let input_plan = self.plan_internal(input, all_properties)?;
2626        let schema = input_plan.schema();
2627
2628        // Use CypherPhysicalExprCompiler for all filters (handles both schema-typed
2629        // and schemaless LargeBinary/CypherValue columns without coercion failures).
2630        let ctx = self.translation_context_for_plan(input);
2631        let session = self.session_ctx.read();
2632        let state = session.state();
2633        let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
2634            &state,
2635            Some(&ctx),
2636        )
2637        .with_subquery_ctx(
2638            self.graph_ctx.clone(),
2639            self.schema.clone(),
2640            self.session_ctx.clone(),
2641            self.storage.clone(),
2642            self.params.clone(),
2643        );
2644        let physical_predicate = compiler.compile(predicate, &schema)?;
2645
2646        // For OPTIONAL MATCH: use OptionalFilterExec for proper NULL row preservation.
2647        if !optional_variables.is_empty() {
2648            return Ok(Arc::new(OptionalFilterExec::new(
2649                input_plan,
2650                physical_predicate,
2651                optional_variables.clone(),
2652            )));
2653        }
2654
2655        Ok(Arc::new(FilterExec::try_new(
2656            physical_predicate,
2657            input_plan,
2658        )?))
2659    }
2660
2661    /// Plan a projection, passing alias map through to Sort nodes in the input chain.
2662    fn plan_project_with_aliases(
2663        &self,
2664        input: &LogicalPlan,
2665        projections: &[(Expr, Option<String>)],
2666        all_properties: &HashMap<String, HashSet<String>>,
2667        alias_map: &HashMap<String, Expr>,
2668    ) -> Result<Arc<dyn ExecutionPlan>> {
2669        // Route through plan_internal_with_aliases to propagate aliases to Sort
2670        let input_plan = self.plan_internal_with_aliases(input, all_properties, alias_map)?;
2671        self.plan_project_from_input(input_plan, projections, Some(input))
2672    }
2673
2674    /// Build projection expressions from an already-planned input.
2675    fn plan_project_from_input(
2676        &self,
2677        input_plan: Arc<dyn ExecutionPlan>,
2678        projections: &[(Expr, Option<String>)],
2679        context_plan: Option<&LogicalPlan>,
2680    ) -> Result<Arc<dyn ExecutionPlan>> {
2681        let schema = input_plan.schema();
2682
2683        let session = self.session_ctx.read();
2684        let state = session.state();
2685
2686        // Build translation context with variable kinds if we have a logical plan
2687        let ctx = context_plan.map(|p| self.translation_context_for_plan(p));
2688
2689        let mut exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> = Vec::new();
2690
2691        for (expr, alias) in projections {
2692            // Handle whole-node/relationship projection: RETURN n
2693            // The scan layer materializes the variable as either:
2694            //   - A Struct column (registered labels via add_structural_projection)
2695            //   - A LargeBinary/CypherValue column aliased as the variable (schemaless via add_alias_projection)
2696            // Project that column directly, plus _vid/_labels helpers for post-processing.
2697            if let Expr::Variable(var_name) = expr {
2698                if schema.column_with_name(var_name).is_some() {
2699                    let (col_idx, _) = schema.column_with_name(var_name).unwrap();
2700                    let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
2701                        datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
2702                    );
2703                    let name = alias.clone().unwrap_or_else(|| var_name.clone());
2704                    exprs.push((col_expr, name));
2705
2706                    // Include _vid and _labels as helper columns for post-processing
2707                    let vid_col = format!("{}._vid", var_name);
2708                    let labels_col = format!("{}._labels", var_name);
2709                    if let Some((vi, _)) = schema.column_with_name(&vid_col) {
2710                        let ve: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
2711                            datafusion::physical_expr::expressions::Column::new(&vid_col, vi),
2712                        );
2713                        exprs.push((ve, vid_col.clone()));
2714                    }
2715                    if let Some((li, _)) = schema.column_with_name(&labels_col) {
2716                        let le: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
2717                            datafusion::physical_expr::expressions::Column::new(&labels_col, li),
2718                        );
2719                        exprs.push((le, labels_col.clone()));
2720                    }
2721
2722                    // Carry through all {var}.{prop} columns so downstream
2723                    // operators (e.g. RETURN n.name after WITH n) can find them.
2724                    let prefix = format!("{}.", var_name);
2725                    for (idx, field) in schema.fields().iter().enumerate() {
2726                        let fname = field.name();
2727                        if fname.starts_with(&prefix)
2728                            && fname != &vid_col
2729                            && fname != &labels_col
2730                            && !exprs.iter().any(|(_, n)| n == fname)
2731                        {
2732                            let prop_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
2733                                Arc::new(datafusion::physical_expr::expressions::Column::new(
2734                                    fname, idx,
2735                                ));
2736                            exprs.push((prop_expr, fname.clone()));
2737                        }
2738                    }
2739                    continue;
2740                }
2741
2742                // No materialized column — build a struct from expanded dot-columns
2743                // This handles traversal targets that have b._vid, b.name, etc. but no b column
2744                let prefix = format!("{}.", var_name);
2745                let expanded_fields: Vec<(usize, String)> = schema
2746                    .fields()
2747                    .iter()
2748                    .enumerate()
2749                    .filter(|(_, f)| f.name().starts_with(&prefix))
2750                    .map(|(i, f)| (i, f.name().clone()))
2751                    .collect();
2752
2753                if !expanded_fields.is_empty() {
2754                    use datafusion::functions::expr_fn::named_struct;
2755                    use datafusion::logical_expr::lit;
2756
2757                    // Build named_struct args: pairs of (field_name_literal, column_ref)
2758                    let mut struct_args = Vec::new();
2759                    for (_, field_name) in &expanded_fields {
2760                        let prop_name = &field_name[prefix.len()..];
2761                        struct_args.push(lit(prop_name.to_string()));
2762                        // Use Column::from_name to avoid dot-parsing (b._vid != table b, col _vid)
2763                        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
2764                            field_name.as_str(),
2765                        )));
2766                    }
2767
2768                    let struct_expr = named_struct(struct_args);
2769                    let df_schema =
2770                        datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
2771                    let session = self.session_ctx.read();
2772                    let state_ref = session.state();
2773                    let resolved_expr = Self::resolve_udfs(&struct_expr, &state_ref)?;
2774
2775                    use datafusion::physical_planner::PhysicalPlanner;
2776                    let phys_planner =
2777                        datafusion::physical_planner::DefaultPhysicalPlanner::default();
2778                    let physical_struct_expr = phys_planner.create_physical_expr(
2779                        &resolved_expr,
2780                        &df_schema,
2781                        &state_ref,
2782                    )?;
2783
2784                    let name = alias.clone().unwrap_or_else(|| var_name.clone());
2785                    exprs.push((physical_struct_expr, name));
2786
2787                    // Also include _vid and _labels helpers
2788                    let vid_col = format!("{}._vid", var_name);
2789                    let labels_col = format!("{}._labels", var_name);
2790                    if let Some((vi, _)) = schema.column_with_name(&vid_col) {
2791                        let ve: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
2792                            datafusion::physical_expr::expressions::Column::new(&vid_col, vi),
2793                        );
2794                        exprs.push((ve, vid_col.clone()));
2795                    }
2796                    if let Some((li, _)) = schema.column_with_name(&labels_col) {
2797                        let le: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
2798                            datafusion::physical_expr::expressions::Column::new(&labels_col, li),
2799                        );
2800                        exprs.push((le, labels_col.clone()));
2801                    }
2802
2803                    // Carry through remaining {var}.{prop} columns not already
2804                    // included by the struct projection above.
2805                    for (idx, field) in schema.fields().iter().enumerate() {
2806                        let fname = field.name();
2807                        if fname.starts_with(&prefix)
2808                            && fname != &vid_col
2809                            && fname != &labels_col
2810                            && !exprs.iter().any(|(_, n)| n == fname)
2811                        {
2812                            let prop_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
2813                                Arc::new(datafusion::physical_expr::expressions::Column::new(
2814                                    fname, idx,
2815                                ));
2816                            exprs.push((prop_expr, fname.clone()));
2817                        }
2818                    }
2819                    continue;
2820                }
2821                // Fall through to normal expression compilation if no matching columns at all
2822            }
2823
2824            // Handle RETURN * (wildcard) — expand to all input columns
2825            if matches!(expr, Expr::Wildcard) {
2826                for (col_idx, field) in schema.fields().iter().enumerate() {
2827                    let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
2828                        datafusion::physical_expr::expressions::Column::new(field.name(), col_idx),
2829                    );
2830                    exprs.push((col_expr, field.name().clone()));
2831                }
2832                continue;
2833            }
2834
2835            let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
2836                &state,
2837                ctx.as_ref(),
2838            )
2839            .with_subquery_ctx(
2840                self.graph_ctx.clone(),
2841                self.schema.clone(),
2842                self.session_ctx.clone(),
2843                self.storage.clone(),
2844                self.params.clone(),
2845            );
2846            let physical_expr = compiler.compile(expr, &schema)?;
2847
2848            let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
2849            exprs.push((physical_expr, name));
2850        }
2851
2852        Ok(Arc::new(ProjectionExec::try_new(exprs, input_plan)?))
2853    }
2854
2855    /// Plan a compact Locy YIELD projection — emits ONLY the listed expressions,
2856    /// without carrying through helper/property columns.
2857    ///
2858    /// Node variables are projected as their `._vid` column (UInt64).
2859    /// Other expressions are compiled normally, then CAST to target type if needed.
2860    fn plan_locy_project(
2861        &self,
2862        input: &LogicalPlan,
2863        projections: &[(Expr, Option<String>)],
2864        target_types: &[DataType],
2865        all_properties: &HashMap<String, HashSet<String>>,
2866    ) -> Result<Arc<dyn ExecutionPlan>> {
2867        use datafusion::physical_expr::expressions::Column;
2868
2869        let input_plan = self.plan_internal(input, all_properties)?;
2870        let schema = input_plan.schema();
2871
2872        let session = self.session_ctx.read();
2873        let state = session.state();
2874
2875        let ctx = self.translation_context_for_plan(input);
2876
2877        let mut exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> = Vec::new();
2878
2879        for (i, (expr, alias)) in projections.iter().enumerate() {
2880            let target_type = target_types.get(i);
2881
2882            // Handle node/relationship variables: extract ._vid column
2883            if let Expr::Variable(var_name) = expr {
2884                // Check if this is a graph-expanded node variable ({var}._vid exists)
2885                let vid_col_name = format!("{}._vid", var_name);
2886                let vid_col_match = schema
2887                    .fields()
2888                    .iter()
2889                    .enumerate()
2890                    .find(|(_, f)| f.name() == &vid_col_name);
2891
2892                if let Some((vid_idx, _)) = vid_col_match {
2893                    // Node variable → extract VID (UInt64)
2894                    let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
2895                        Arc::new(Column::new(&vid_col_name, vid_idx));
2896                    let name = alias.clone().unwrap_or_else(|| var_name.clone());
2897                    exprs.push((col_expr, name));
2898                    continue;
2899                }
2900
2901                // Direct column (e.g. from derived scan)
2902                if let Some((col_idx, _)) = schema.column_with_name(var_name) {
2903                    let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
2904                        Arc::new(Column::new(var_name, col_idx));
2905                    let name = alias.clone().unwrap_or_else(|| var_name.clone());
2906                    exprs.push((col_expr, name));
2907                    continue;
2908                }
2909                // Fall through to generic expression compilation
2910            }
2911
2912            // Generic expression compilation (property access, literals, etc.)
2913            let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
2914                &state,
2915                Some(&ctx),
2916            )
2917            .with_subquery_ctx(
2918                self.graph_ctx.clone(),
2919                self.schema.clone(),
2920                self.session_ctx.clone(),
2921                self.storage.clone(),
2922                self.params.clone(),
2923            );
2924            let physical_expr = compiler.compile(expr, &schema)?;
2925
2926            // CAST if the compiled expression's output type doesn't match target.
2927            // Skip coercion when actual is a string type but target is numeric
2928            // (or vice versa) — this means `infer_expr_type` guessed wrong
2929            // (e.g. defaulting Property to Float64 for a string column).
2930            let physical_expr = if let Some(target_dt) = target_type {
2931                let actual_dt = physical_expr
2932                    .data_type(schema.as_ref())
2933                    .unwrap_or(DataType::LargeUtf8);
2934                let is_string = |dt: &DataType| matches!(dt, DataType::Utf8 | DataType::LargeUtf8);
2935                let is_numeric = |dt: &DataType| {
2936                    matches!(dt, DataType::Int64 | DataType::Float64 | DataType::UInt64)
2937                };
2938                let cross_domain = (is_string(&actual_dt) && is_numeric(target_dt))
2939                    || (is_numeric(&actual_dt) && is_string(target_dt));
2940                if actual_dt != *target_dt && !cross_domain {
2941                    coerce_physical_expr(physical_expr, &actual_dt, target_dt, schema.as_ref())
2942                } else {
2943                    physical_expr
2944                }
2945            } else {
2946                physical_expr
2947            };
2948
2949            let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
2950            exprs.push((physical_expr, name));
2951        }
2952
2953        Ok(Arc::new(ProjectionExec::try_new(exprs, input_plan)?))
2954    }
2955
2956    /// Plan an aggregation.
2957    fn plan_aggregate(
2958        &self,
2959        input: &LogicalPlan,
2960        group_by: &[Expr],
2961        aggregates: &[Expr],
2962        all_properties: &HashMap<String, HashSet<String>>,
2963    ) -> Result<Arc<dyn ExecutionPlan>> {
2964        let input_plan = self.plan_internal(input, all_properties)?;
2965        let schema = input_plan.schema();
2966
2967        let session = self.session_ctx.read();
2968        let state = session.state();
2969
2970        // Build translation context with variable kinds from the input plan
2971        let ctx = self.translation_context_for_plan(input);
2972
2973        // Translate group by expressions
2974        use crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler;
2975        let mut group_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
2976            Vec::new();
2977        for expr in group_by {
2978            let name = expr.to_string_repr();
2979
2980            // Entity variables (Node/Edge) from traversals may not have a direct
2981            // column — only expanded property columns like "other._vid",
2982            // "other.name", etc. Skip them here; the property expansion loop
2983            // below adds those columns to the group-by instead.
2984            if let Expr::Variable(var_name) = expr
2985                && schema.column_with_name(var_name).is_none()
2986            {
2987                let prefix = format!("{}.", var_name);
2988                let has_expanded = schema
2989                    .fields()
2990                    .iter()
2991                    .any(|f| f.name().starts_with(&prefix));
2992                if has_expanded {
2993                    continue;
2994                }
2995            }
2996
2997            let physical_expr = if CypherPhysicalExprCompiler::contains_custom_expr(expr) {
2998                // Custom expressions (quantifiers, list comprehensions, reduce, etc.)
2999                // cannot be translated via cypher_expr_to_df; compile them directly.
3000                let compiler = CypherPhysicalExprCompiler::new(&state, Some(&ctx))
3001                    .with_subquery_ctx(
3002                        self.graph_ctx.clone(),
3003                        self.schema.clone(),
3004                        self.session_ctx.clone(),
3005                        self.storage.clone(),
3006                        self.params.clone(),
3007                    );
3008                compiler.compile(expr, &schema)?
3009            } else {
3010                // DateTime/Time struct grouping: group by UTC-normalized values
3011                // Two DateTimes with same UTC instant but different offsets should group together
3012                let df_schema_ref =
3013                    datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
3014                let df_expr = cypher_expr_to_df(expr, Some(&ctx))?;
3015                let df_expr = Self::resolve_udfs(&df_expr, &state)?;
3016                let df_expr = crate::query::df_expr::apply_type_coercion(&df_expr, &df_schema_ref)?;
3017                let mut df_expr = Self::resolve_udfs(&df_expr, &state)?;
3018                if let Ok(expr_type) = df_expr.get_type(&df_schema_ref) {
3019                    if uni_common::core::schema::is_datetime_struct(&expr_type) {
3020                        // Group by UTC instant (nanos_since_epoch)
3021                        df_expr = crate::query::df_expr::extract_datetime_nanos(df_expr);
3022                    } else if uni_common::core::schema::is_time_struct(&expr_type) {
3023                        // Group by UTC-normalized time
3024                        // extract_time_nanos does: nanos_since_midnight - (offset_seconds * 1e9)
3025                        df_expr = crate::query::df_expr::extract_time_nanos(df_expr);
3026                    }
3027                }
3028
3029                // Convert logical expression to physical
3030                create_physical_expr(&df_expr, &df_schema_ref, state.execution_props())?
3031            };
3032            group_exprs.push((physical_expr, name));
3033        }
3034
3035        // For entity variables (Node/Edge) in group_by, also include their
3036        // property columns. Properties are functionally dependent on the entity,
3037        // so grouping by them is semantically correct and ensures they survive
3038        // the aggregation for downstream property access (e.g. RETURN a.name
3039        // after WITH a, min(...) AS m).
3040        for expr in group_by {
3041            if let Expr::Variable(var_name) = expr
3042                && matches!(
3043                    ctx.variable_kinds.get(var_name),
3044                    Some(VariableKind::Node) | Some(VariableKind::Edge)
3045                )
3046            {
3047                let prefix = format!("{}.", var_name);
3048                for (idx, field) in schema.fields().iter().enumerate() {
3049                    if field.name().starts_with(&prefix) {
3050                        let prop_col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
3051                            datafusion::physical_expr::expressions::Column::new(field.name(), idx),
3052                        );
3053                        group_exprs.push((prop_col, field.name().clone()));
3054                    }
3055                }
3056            }
3057        }
3058
3059        let physical_group_by = PhysicalGroupBy::new_single(group_exprs);
3060
3061        // Pre-compute pattern comprehensions in aggregate arguments
3062        let (input_plan, schema, rewritten_aggregates) =
3063            self.precompute_custom_aggregate_args(input_plan, &schema, aggregates, &state, &ctx)?;
3064
3065        // Translate aggregates and their associated filter expressions
3066        // (e.g. collect() uses a filter to exclude null values per Cypher spec)
3067        let (aggr_exprs, filter_exprs): (Vec<_>, Vec<_>) = self
3068            .translate_aggregates(&rewritten_aggregates, &schema, &state, &ctx)?
3069            .into_iter()
3070            .unzip();
3071        let num_aggregates = aggr_exprs.len();
3072
3073        let agg_exec = Arc::new(AggregateExec::try_new(
3074            AggregateMode::Single,
3075            physical_group_by,
3076            aggr_exprs,
3077            filter_exprs,
3078            input_plan,
3079            schema,
3080        )?);
3081
3082        // DataFusion's AggregateExec auto-generates column names from physical
3083        // expressions (e.g. `count(Int32(1))`), but the logical plan's projection
3084        // expects names like `COUNT(n)`. Add a renaming projection to bridge this.
3085        let agg_schema = agg_exec.schema();
3086        // Use actual expanded group-by count (includes entity property columns)
3087        // rather than logical group_by.len() which doesn't account for expansion.
3088        let num_group_by = agg_schema.fields().len() - num_aggregates;
3089        let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
3090            Vec::new();
3091
3092        for (i, field) in agg_schema.fields().iter().enumerate() {
3093            let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
3094                datafusion::physical_expr::expressions::Column::new(field.name(), i),
3095            );
3096            let name = if i >= num_group_by {
3097                // Rename aggregate column to expected Cypher name
3098                aggregate_column_name(&aggregates[i - num_group_by])
3099            } else {
3100                field.name().clone()
3101            };
3102            proj_exprs.push((col_expr, name));
3103        }
3104
3105        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, agg_exec)?))
3106    }
3107
3108    /// Wrap a temporal aggregate argument with `get_field(arg, "nanos_since_epoch")` or
3109    /// `get_field(arg, "nanos_since_midnight")` when the argument is a DateTime/Time struct.
3110    ///
3111    /// Returns the argument unchanged for non-temporal types.
3112    fn wrap_temporal_sort_key(
3113        arg: datafusion::logical_expr::Expr,
3114        schema: &SchemaRef,
3115    ) -> Result<datafusion::logical_expr::Expr> {
3116        use datafusion::logical_expr::ScalarUDF;
3117        if let Ok(arg_type) = arg.get_type(&datafusion::common::DFSchema::try_from(
3118            schema.as_ref().clone(),
3119        )?) {
3120            if uni_common::core::schema::is_datetime_struct(&arg_type) {
3121                return Ok(datafusion::logical_expr::Expr::ScalarFunction(
3122                    datafusion::logical_expr::expr::ScalarFunction::new_udf(
3123                        Arc::new(ScalarUDF::from(
3124                            datafusion::functions::core::getfield::GetFieldFunc::new(),
3125                        )),
3126                        vec![arg, datafusion::logical_expr::lit("nanos_since_epoch")],
3127                    ),
3128                ));
3129            } else if uni_common::core::schema::is_time_struct(&arg_type) {
3130                return Ok(datafusion::logical_expr::Expr::ScalarFunction(
3131                    datafusion::logical_expr::expr::ScalarFunction::new_udf(
3132                        Arc::new(ScalarUDF::from(
3133                            datafusion::functions::core::getfield::GetFieldFunc::new(),
3134                        )),
3135                        vec![arg, datafusion::logical_expr::lit("nanos_since_midnight")],
3136                    ),
3137                ));
3138            }
3139        }
3140        Ok(arg)
3141    }
3142
3143    /// Translate Cypher aggregate expressions to DataFusion.
3144    fn translate_aggregates(
3145        &self,
3146        aggregates: &[Expr],
3147        schema: &SchemaRef,
3148        state: &SessionState,
3149        ctx: &TranslationContext,
3150    ) -> Result<Vec<PhysicalAggregate>> {
3151        use datafusion::functions_aggregate::expr_fn::{avg, count, max, min, sum};
3152
3153        let mut result: Vec<PhysicalAggregate> = Vec::new();
3154
3155        for agg_expr in aggregates {
3156            let Expr::FunctionCall {
3157                name,
3158                args,
3159                distinct,
3160                ..
3161            } = agg_expr
3162            else {
3163                return Err(anyhow!("Expected aggregate function, got: {:?}", agg_expr));
3164            };
3165
3166            let name_lower = name.to_lowercase();
3167
3168            // Helper to get required first argument
3169            let get_arg = || -> Result<DfExpr> {
3170                if args.is_empty() {
3171                    return Err(anyhow!("{}() requires an argument", name_lower));
3172                }
3173                cypher_expr_to_df(&args[0], Some(ctx))
3174            };
3175
3176            let df_agg = match name_lower.as_str() {
3177                "count" if args.is_empty() => count(datafusion::logical_expr::lit(1)),
3178                "count" => {
3179                    // For count(*) or count(variable) where variable is a node/edge
3180                    // (not a property), translate to count(lit(1)) since the variable
3181                    // itself has no column in the scan schema.
3182                    // Exception: COUNT(DISTINCT variable) needs the actual column
3183                    // reference so that null rows (from OPTIONAL MATCH) are excluded.
3184                    if matches!(args.first(), Some(uni_cypher::ast::Expr::Wildcard)) {
3185                        count(datafusion::logical_expr::lit(1))
3186                    } else if matches!(args.first(), Some(uni_cypher::ast::Expr::Variable(_))) {
3187                        if *distinct {
3188                            count(get_arg()?)
3189                        } else {
3190                            count(datafusion::logical_expr::lit(1))
3191                        }
3192                    } else {
3193                        count(get_arg()?)
3194                    }
3195                }
3196                "sum" => {
3197                    let arg = get_arg()?;
3198                    if self.is_large_binary_col(&arg, schema) {
3199                        let udaf = Arc::new(crate::query::df_udfs::create_cypher_sum_udaf());
3200                        udaf.call(vec![arg])
3201                    } else {
3202                        // Widen small integers to Int64 (DataFusion doesn't support Int32 sum).
3203                        // Float columns pass through unchanged so SUM preserves float type.
3204                        use datafusion::logical_expr::Cast;
3205                        let is_float = if let DfExpr::Column(col) = &arg
3206                            && let Ok(field) = schema.field_with_name(&col.name)
3207                        {
3208                            matches!(
3209                                field.data_type(),
3210                                datafusion::arrow::datatypes::DataType::Float32
3211                                    | datafusion::arrow::datatypes::DataType::Float64
3212                            )
3213                        } else {
3214                            false
3215                        };
3216                        if is_float {
3217                            sum(DfExpr::Cast(Cast::new(
3218                                Box::new(arg),
3219                                datafusion::arrow::datatypes::DataType::Float64,
3220                            )))
3221                        } else {
3222                            sum(DfExpr::Cast(Cast::new(
3223                                Box::new(arg),
3224                                datafusion::arrow::datatypes::DataType::Int64,
3225                            )))
3226                        }
3227                    }
3228                }
3229                "avg" => {
3230                    let arg = get_arg()?;
3231                    if self.is_large_binary_col(&arg, schema) {
3232                        let coerced = crate::query::df_udfs::cypher_to_float64_expr(arg);
3233                        avg(coerced)
3234                    } else {
3235                        use datafusion::logical_expr::Cast;
3236                        avg(DfExpr::Cast(Cast::new(
3237                            Box::new(arg),
3238                            datafusion::arrow::datatypes::DataType::Float64,
3239                        )))
3240                    }
3241                }
3242                "min" => {
3243                    // Use Cypher-aware min for LargeBinary columns (mixed types)
3244                    let arg = Self::wrap_temporal_sort_key(get_arg()?, schema)?;
3245
3246                    if self.is_large_binary_col(&arg, schema) {
3247                        let udaf = Arc::new(crate::query::df_udfs::create_cypher_min_udaf());
3248                        udaf.call(vec![arg])
3249                    } else {
3250                        min(arg)
3251                    }
3252                }
3253                "max" => {
3254                    // Use Cypher-aware max for LargeBinary columns (mixed types)
3255                    let arg = Self::wrap_temporal_sort_key(get_arg()?, schema)?;
3256
3257                    if self.is_large_binary_col(&arg, schema) {
3258                        let udaf = Arc::new(crate::query::df_udfs::create_cypher_max_udaf());
3259                        udaf.call(vec![arg])
3260                    } else {
3261                        max(arg)
3262                    }
3263                }
3264                "percentiledisc" => {
3265                    if args.len() != 2 {
3266                        return Err(anyhow!("percentileDisc() requires exactly 2 arguments"));
3267                    }
3268                    let expr_arg = cypher_expr_to_df(&args[0], Some(ctx))?;
3269                    let pct_arg = cypher_expr_to_df(&args[1], Some(ctx))?;
3270                    let coerced = crate::query::df_udfs::cypher_to_float64_expr(expr_arg);
3271                    let udaf =
3272                        Arc::new(crate::query::df_udfs::create_cypher_percentile_disc_udaf());
3273                    udaf.call(vec![coerced, pct_arg])
3274                }
3275                "percentilecont" => {
3276                    if args.len() != 2 {
3277                        return Err(anyhow!("percentileCont() requires exactly 2 arguments"));
3278                    }
3279                    let expr_arg = cypher_expr_to_df(&args[0], Some(ctx))?;
3280                    let pct_arg = cypher_expr_to_df(&args[1], Some(ctx))?;
3281                    let coerced = crate::query::df_udfs::cypher_to_float64_expr(expr_arg);
3282                    let udaf =
3283                        Arc::new(crate::query::df_udfs::create_cypher_percentile_cont_udaf());
3284                    udaf.call(vec![coerced, pct_arg])
3285                }
3286                "collect" => {
3287                    // Use custom Cypher collect UDAF that filters nulls and returns
3288                    // empty list (not null) when all inputs are null.
3289                    let arg = get_arg()?;
3290                    crate::query::df_udfs::create_cypher_collect_expr(arg, *distinct)
3291                }
3292                "btic_min" => {
3293                    let arg = get_arg()?;
3294                    let udaf = Arc::new(crate::query::df_udfs::create_btic_min_udaf());
3295                    udaf.call(vec![arg])
3296                }
3297                "btic_max" => {
3298                    let arg = get_arg()?;
3299                    let udaf = Arc::new(crate::query::df_udfs::create_btic_max_udaf());
3300                    udaf.call(vec![arg])
3301                }
3302                "btic_span_agg" => {
3303                    let arg = get_arg()?;
3304                    let udaf = Arc::new(crate::query::df_udfs::create_btic_span_agg_udaf());
3305                    udaf.call(vec![arg])
3306                }
3307                "btic_count_at" => {
3308                    if args.len() != 2 {
3309                        return Err(anyhow!("btic_count_at() requires exactly 2 arguments"));
3310                    }
3311                    let btic_arg = cypher_expr_to_df(&args[0], Some(ctx))?;
3312                    let point_arg = cypher_expr_to_df(&args[1], Some(ctx))?;
3313                    let udaf = Arc::new(crate::query::df_udfs::create_btic_count_at_udaf());
3314                    udaf.call(vec![btic_arg, point_arg])
3315                }
3316                _ => return Err(anyhow!("Unsupported aggregate function: {}", name)),
3317            };
3318
3319            // Apply DISTINCT if needed (collect/percentile handle their own distinct)
3320            let df_agg = if *distinct
3321                && !matches!(
3322                    name_lower.as_str(),
3323                    "collect" | "percentiledisc" | "percentilecont"
3324                ) {
3325                use datafusion::prelude::ExprFunctionExt;
3326                df_agg.distinct().build().map_err(|e| anyhow!("{}", e))?
3327            } else {
3328                df_agg
3329            };
3330
3331            // Resolve UDFs and apply type coercion inside aggregate arguments
3332            let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
3333            let df_agg = Self::resolve_udfs(&df_agg, state)?;
3334            let df_agg = crate::query::df_expr::apply_type_coercion(&df_agg, &df_schema)?;
3335            let df_agg = Self::resolve_udfs(&df_agg, state)?;
3336
3337            // Convert to physical aggregate
3338            let agg_and_filter = self.create_physical_aggregate(&df_agg, schema, state)?;
3339            result.push(agg_and_filter);
3340        }
3341
3342        Ok(result)
3343    }
3344
3345    /// Pre-compute pattern comprehensions in aggregate arguments.
3346    ///
3347    /// Scans aggregate expressions for pattern comprehensions, compiles them as
3348    /// physical expressions, adds them as projected columns, and rewrites the
3349    /// aggregate expressions to reference the pre-computed columns.
3350    fn precompute_custom_aggregate_args(
3351        &self,
3352        input_plan: Arc<dyn ExecutionPlan>,
3353        schema: &SchemaRef,
3354        aggregates: &[Expr],
3355        state: &SessionState,
3356        ctx: &TranslationContext,
3357    ) -> Result<(Arc<dyn ExecutionPlan>, SchemaRef, Vec<Expr>)> {
3358        use crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler;
3359
3360        let mut needs_projection = false;
3361        let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
3362            Vec::new();
3363        let mut rewritten_aggregates = Vec::new();
3364        let mut col_counter = 0;
3365
3366        // First pass: copy all existing columns
3367        for (i, field) in schema.fields().iter().enumerate() {
3368            let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
3369                datafusion::physical_expr::expressions::Column::new(field.name(), i),
3370            );
3371            proj_exprs.push((col_expr, field.name().clone()));
3372        }
3373
3374        // Second pass: scan aggregates for custom expressions in arguments
3375        for agg_expr in aggregates {
3376            let Expr::FunctionCall {
3377                name,
3378                args,
3379                distinct,
3380                window_spec,
3381            } = agg_expr
3382            else {
3383                rewritten_aggregates.push(agg_expr.clone());
3384                continue;
3385            };
3386
3387            let mut rewritten_args = Vec::new();
3388            let mut agg_needs_rewrite = false;
3389
3390            for arg in args {
3391                if CypherPhysicalExprCompiler::contains_custom_expr(arg) {
3392                    // Compile the custom expression
3393                    let compiler = CypherPhysicalExprCompiler::new(state, Some(ctx))
3394                        .with_subquery_ctx(
3395                            self.graph_ctx.clone(),
3396                            self.schema.clone(),
3397                            self.session_ctx.clone(),
3398                            self.storage.clone(),
3399                            self.params.clone(),
3400                        );
3401                    let physical_expr = compiler.compile(arg, schema)?;
3402
3403                    // Add it as a projected column
3404                    let col_name = format!("__pc_{}", col_counter);
3405                    col_counter += 1;
3406                    proj_exprs.push((physical_expr, col_name.clone()));
3407
3408                    // Rewrite aggregate to reference the column
3409                    rewritten_args.push(Expr::Variable(col_name));
3410                    agg_needs_rewrite = true;
3411                    needs_projection = true;
3412                } else {
3413                    rewritten_args.push(arg.clone());
3414                }
3415            }
3416
3417            if agg_needs_rewrite {
3418                rewritten_aggregates.push(Expr::FunctionCall {
3419                    name: name.clone(),
3420                    args: rewritten_args,
3421                    distinct: *distinct,
3422                    window_spec: window_spec.clone(),
3423                });
3424            } else {
3425                rewritten_aggregates.push(agg_expr.clone());
3426            }
3427        }
3428
3429        if needs_projection {
3430            let projection_exec = Arc::new(
3431                datafusion::physical_plan::projection::ProjectionExec::try_new(
3432                    proj_exprs, input_plan,
3433                )?,
3434            );
3435            let new_schema = projection_exec.schema();
3436            Ok((projection_exec, new_schema, rewritten_aggregates))
3437        } else {
3438            Ok((input_plan, schema.clone(), aggregates.to_vec()))
3439        }
3440    }
3441
3442    /// Plan a sort operation.
3443    ///
3444    /// The `alias_map` provides a mapping from alias names to underlying expressions.
3445    /// This is needed because ORDER BY expressions may reference aliases defined in
3446    /// a parent Project node (e.g., `ORDER BY friend_count` where `friend_count`
3447    /// is an alias for `COUNT(r)`).
3448    fn plan_sort(
3449        &self,
3450        input: &LogicalPlan,
3451        order_by: &[SortItem],
3452        all_properties: &HashMap<String, HashSet<String>>,
3453        alias_map: &HashMap<String, Expr>,
3454    ) -> Result<Arc<dyn ExecutionPlan>> {
3455        let input_plan = self.plan_internal(input, all_properties)?;
3456        let schema = input_plan.schema();
3457
3458        let session = self.session_ctx.read();
3459
3460        // Build translation context with variable kinds from the input plan
3461        let ctx = self.translation_context_for_plan(input);
3462
3463        // Build DFSchema once for type coercion and physical expression conversion
3464        let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
3465
3466        // Translate sort expressions to DataFusion's SortExpr (a.k.a. Sort struct)
3467        // SortItem has `ascending: bool`, so use it directly
3468        // Default nulls_first to false for ASC, true for DESC
3469        use crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler;
3470
3471        let mut df_sort_exprs = Vec::new();
3472        let mut custom_physical_overrides: Vec<(
3473            usize,
3474            Arc<dyn datafusion::physical_expr::PhysicalExpr>,
3475        )> = Vec::new();
3476        for item in order_by {
3477            let mut sort_expr = item.expr.clone();
3478
3479            // If the sort expression is a variable that matches an alias,
3480            // replace it with the underlying expression
3481            if let Expr::Variable(ref name) = sort_expr {
3482                // Check if this name exists in the input schema
3483                let col_name = name.as_str();
3484                let exists_in_schema = schema.fields().iter().any(|f| f.name() == col_name);
3485
3486                if !exists_in_schema && let Some(aliased_expr) = alias_map.get(col_name) {
3487                    sort_expr = aliased_expr.clone();
3488                }
3489            }
3490
3491            let asc = item.ascending;
3492            let nulls_first = !asc; // Standard SQL behavior: nulls last for ASC, first for DESC
3493
3494            // Custom expressions (similar_to, comprehensions, etc.) cannot be
3495            // translated via cypher_expr_to_df. Compile with the custom compiler
3496            // and save as an override for the physical sort expression.
3497            if CypherPhysicalExprCompiler::contains_custom_expr(&sort_expr) {
3498                let sort_state = session.state();
3499                let compiler = CypherPhysicalExprCompiler::new(&sort_state, Some(&ctx))
3500                    .with_subquery_ctx(
3501                        self.graph_ctx.clone(),
3502                        self.schema.clone(),
3503                        self.session_ctx.clone(),
3504                        self.storage.clone(),
3505                        self.params.clone(),
3506                    );
3507                let inner_physical = compiler.compile(&sort_expr, &schema)?;
3508
3509                // Use a dummy column reference for the logical sort expression
3510                // (we'll replace the physical expression below).
3511                let first_col = schema
3512                    .fields()
3513                    .first()
3514                    .map(|f| f.name().clone())
3515                    .unwrap_or_else(|| "_dummy_".to_string());
3516                let dummy_expr = DfExpr::Column(datafusion::common::Column::from_name(&first_col));
3517                let sort_key_udf = crate::query::df_udfs::create_cypher_sort_key_udf();
3518                let sort_key_expr = sort_key_udf.call(vec![dummy_expr]);
3519                custom_physical_overrides.push((df_sort_exprs.len(), inner_physical));
3520                df_sort_exprs.push(DfSortExpr::new(sort_key_expr, asc, nulls_first));
3521                continue;
3522            }
3523
3524            let df_expr = cypher_expr_to_df(&sort_expr, Some(&ctx))?;
3525            let df_expr = Self::resolve_udfs(&df_expr, &session.state())?;
3526            let df_expr = crate::query::df_expr::apply_type_coercion(&df_expr, &df_schema)?;
3527            // Resolve UDFs again: apply_type_coercion may create new dummy UDF
3528            // placeholders (e.g. _cv_to_bool, _cypher_add) that need resolution.
3529            let df_expr = Self::resolve_udfs(&df_expr, &session.state())?;
3530
3531            // Single order-preserving sort key: _cypher_sort_key(expr) -> LargeBinary
3532            // The UDF handles all Cypher ordering semantics (cross-type ranks,
3533            // within-type comparisons, temporal normalization, NaN/null placement)
3534            // so memcmp of the resulting bytes gives correct Cypher ORDER BY.
3535            let sort_key_udf = crate::query::df_udfs::create_cypher_sort_key_udf();
3536            let sort_key_expr = sort_key_udf.call(vec![df_expr]);
3537            df_sort_exprs.push(DfSortExpr::new(sort_key_expr, asc, nulls_first));
3538        }
3539
3540        let mut physical_sort_exprs = create_physical_sort_exprs(
3541            &df_sort_exprs,
3542            &df_schema,
3543            session.state().execution_props(),
3544        )?;
3545
3546        // Replace the inner expression for custom sort expressions.
3547        // The _cypher_sort_key UDF wrapper is already in place; we just need
3548        // to swap the dummy column reference with the actual custom physical expr.
3549        for (idx, custom_inner) in custom_physical_overrides {
3550            if idx < physical_sort_exprs.len() {
3551                let phys = &physical_sort_exprs[idx];
3552                // The physical sort expression wraps _cypher_sort_key(dummy_col).
3553                // We need to replace the inner arg with our custom expression.
3554                // ScalarFunctionExpr wraps the UDF; rebuild it with the correct child.
3555                let sort_key_udf = Arc::new(crate::query::df_udfs::create_cypher_sort_key_udf());
3556                let config_options = Arc::new(datafusion::config::ConfigOptions::default());
3557                let udf_name = sort_key_udf.name().to_string();
3558                let new_sort_key = datafusion::physical_expr::ScalarFunctionExpr::new(
3559                    &udf_name,
3560                    sort_key_udf,
3561                    vec![custom_inner],
3562                    Arc::new(arrow_schema::Field::new(
3563                        "_cypher_sort_key",
3564                        DataType::LargeBinary,
3565                        true,
3566                    )),
3567                    config_options,
3568                );
3569                physical_sort_exprs[idx] = datafusion::physical_expr::PhysicalSortExpr {
3570                    expr: Arc::new(new_sort_key),
3571                    options: phys.options,
3572                };
3573            }
3574        }
3575
3576        // Convert Vec<PhysicalSortExpr> to LexOrdering
3577        // LexOrdering::new returns None for empty vector, so handle that case
3578        let lex_ordering = datafusion::physical_expr::LexOrdering::new(physical_sort_exprs)
3579            .ok_or_else(|| anyhow!("ORDER BY must have at least one sort expression"))?;
3580
3581        Ok(Arc::new(SortExec::new(lex_ordering, input_plan)))
3582    }
3583
3584    /// Plan a limit operation.
3585    fn plan_limit(
3586        &self,
3587        input: &LogicalPlan,
3588        skip: Option<usize>,
3589        fetch: Option<usize>,
3590        all_properties: &HashMap<String, HashSet<String>>,
3591    ) -> Result<Arc<dyn ExecutionPlan>> {
3592        let input_plan = self.plan_internal(input, all_properties)?;
3593
3594        // Handle SKIP via GlobalLimitExec (LocalLimitExec doesn't support offset)
3595        if let Some(offset) = skip.filter(|&s| s > 0) {
3596            use datafusion::physical_plan::limit::GlobalLimitExec;
3597            return Ok(Arc::new(GlobalLimitExec::new(input_plan, offset, fetch)));
3598        }
3599
3600        if let Some(limit) = fetch {
3601            Ok(Arc::new(LocalLimitExec::new(input_plan, limit)))
3602        } else {
3603            // No limit, return input as-is
3604            Ok(input_plan)
3605        }
3606    }
3607
3608    /// Plan a union operation.
3609    fn plan_union(
3610        &self,
3611        left: &LogicalPlan,
3612        right: &LogicalPlan,
3613        all: bool,
3614        all_properties: &HashMap<String, HashSet<String>>,
3615    ) -> Result<Arc<dyn ExecutionPlan>> {
3616        let left_plan = self.plan_internal(left, all_properties)?;
3617        let right_plan = self.plan_internal(right, all_properties)?;
3618
3619        let union_plan = UnionExec::try_new(vec![left_plan, right_plan])?;
3620
3621        // UNION (without ALL) requires deduplication
3622        if !all {
3623            use datafusion::physical_plan::aggregates::{
3624                AggregateExec, AggregateMode, PhysicalGroupBy,
3625            };
3626            use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
3627
3628            // First, coalesce all partitions into one to ensure global deduplication
3629            let coalesced = Arc::new(CoalescePartitionsExec::new(union_plan));
3630
3631            // Create group by all columns to deduplicate
3632            let schema = coalesced.schema();
3633            let group_by_exprs: Vec<_> = (0..schema.fields().len())
3634                .map(|i| {
3635                    (
3636                        Arc::new(datafusion::physical_plan::expressions::Column::new(
3637                            schema.field(i).name(),
3638                            i,
3639                        ))
3640                            as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
3641                        schema.field(i).name().clone(),
3642                    )
3643                })
3644                .collect();
3645
3646            let group_by = PhysicalGroupBy::new_single(group_by_exprs);
3647
3648            Ok(Arc::new(AggregateExec::try_new(
3649                AggregateMode::Single,
3650                group_by,
3651                vec![], // No aggregate functions, just grouping for distinct
3652                vec![], // No filters
3653                coalesced,
3654                schema,
3655            )?))
3656        } else {
3657            // UNION ALL - just return the union
3658            Ok(union_plan)
3659        }
3660    }
3661
3662    /// Plan all window functions (aggregate and manual) using DataFusion's WindowAggExec.
3663    ///
3664    /// Translates Cypher window expressions to DataFusion's window function execution plan.
3665    /// Supports both aggregate window functions (SUM, AVG, etc.) via AggregateUDF and
3666    /// manual window functions (ROW_NUMBER, RANK, LAG, etc.) via WindowUDF.
3667    fn plan_window_functions(
3668        &self,
3669        input: Arc<dyn ExecutionPlan>,
3670        window_exprs: &[Expr],
3671        context_plan: Option<&LogicalPlan>,
3672    ) -> Result<Arc<dyn ExecutionPlan>> {
3673        use datafusion::functions_aggregate::average::avg_udaf;
3674        use datafusion::functions_aggregate::count::count_udaf;
3675        use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
3676        use datafusion::functions_aggregate::sum::sum_udaf;
3677        use datafusion::functions_window::lead_lag::{lag_udwf, lead_udwf};
3678        use datafusion::functions_window::nth_value::{
3679            first_value_udwf, last_value_udwf, nth_value_udwf,
3680        };
3681        use datafusion::functions_window::ntile::ntile_udwf;
3682        use datafusion::functions_window::rank::{dense_rank_udwf, rank_udwf};
3683        use datafusion::functions_window::row_number::row_number_udwf;
3684        use datafusion::logical_expr::{WindowFrame, WindowFunctionDefinition};
3685        use datafusion::physical_expr::LexOrdering;
3686        use datafusion::physical_plan::sorts::sort::SortExec;
3687        use datafusion::physical_plan::windows::{WindowAggExec, create_window_expr};
3688
3689        let input_schema = input.schema();
3690        let df_schema = datafusion::common::DFSchema::try_from(input_schema.as_ref().clone())?;
3691
3692        let session = self.session_ctx.read();
3693        let state = session.state();
3694
3695        // Build translation context with variable kinds if we have a logical plan
3696        let tx_ctx = context_plan.map(|p| self.translation_context_for_plan(p));
3697        let mut window_expr_list = Vec::new();
3698
3699        for expr in window_exprs {
3700            let Expr::FunctionCall {
3701                name,
3702                args,
3703                distinct,
3704                window_spec: Some(window_spec),
3705            } = expr
3706            else {
3707                return Err(anyhow!("Expected window function call with OVER clause"));
3708            };
3709
3710            let name_lower = name.to_lowercase();
3711
3712            // Resolve the window function definition: either AggregateUDF or WindowUDF
3713            let (window_fn_def, is_aggregate) = match name_lower.as_str() {
3714                // Aggregate window functions → AggregateUDF
3715                "count" => (WindowFunctionDefinition::AggregateUDF(count_udaf()), true),
3716                "sum" => (WindowFunctionDefinition::AggregateUDF(sum_udaf()), true),
3717                "avg" => (WindowFunctionDefinition::AggregateUDF(avg_udaf()), true),
3718                "min" => (WindowFunctionDefinition::AggregateUDF(min_udaf()), true),
3719                "max" => (WindowFunctionDefinition::AggregateUDF(max_udaf()), true),
3720                // Manual window functions → WindowUDF
3721                "row_number" => (
3722                    WindowFunctionDefinition::WindowUDF(row_number_udwf()),
3723                    false,
3724                ),
3725                "rank" => (WindowFunctionDefinition::WindowUDF(rank_udwf()), false),
3726                "dense_rank" => (
3727                    WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
3728                    false,
3729                ),
3730                "lag" => (WindowFunctionDefinition::WindowUDF(lag_udwf()), false),
3731                "lead" => (WindowFunctionDefinition::WindowUDF(lead_udwf()), false),
3732                "ntile" => {
3733                    // Validate NTILE bucket count: must be positive
3734                    if let Some(Expr::Literal(CypherLiteral::Integer(n))) = args.first()
3735                        && *n <= 0
3736                    {
3737                        return Err(anyhow!("NTILE bucket count must be positive, got: {}", n));
3738                    }
3739                    (WindowFunctionDefinition::WindowUDF(ntile_udwf()), false)
3740                }
3741                "first_value" => (
3742                    WindowFunctionDefinition::WindowUDF(first_value_udwf()),
3743                    false,
3744                ),
3745                "last_value" => (
3746                    WindowFunctionDefinition::WindowUDF(last_value_udwf()),
3747                    false,
3748                ),
3749                "nth_value" => (WindowFunctionDefinition::WindowUDF(nth_value_udwf()), false),
3750                other => return Err(anyhow!("Unsupported window function: {}", other)),
3751            };
3752
3753            // Translate argument expressions to physical expressions
3754            let physical_args: Vec<Arc<dyn datafusion::physical_expr::PhysicalExpr>> =
3755                if args.is_empty() || matches!(args.as_slice(), [Expr::Wildcard]) {
3756                    // COUNT(*) or zero-arg functions (row_number, rank, dense_rank)
3757                    if is_aggregate {
3758                        vec![create_physical_expr(
3759                            &datafusion::logical_expr::lit(1),
3760                            &df_schema,
3761                            state.execution_props(),
3762                        )?]
3763                    } else {
3764                        // Manual window functions with no args (row_number, rank, dense_rank)
3765                        vec![]
3766                    }
3767                } else {
3768                    args.iter()
3769                        .map(|arg| {
3770                            let mut df_expr = cypher_expr_to_df(arg, tx_ctx.as_ref())?;
3771
3772                            // Cast numeric types only for SUM/AVG aggregate functions:
3773                            // SUM needs Int64 to avoid overflow, AVG needs Float64
3774                            if is_aggregate {
3775                                let cast_type = match name_lower.as_str() {
3776                                    "sum" => Some(datafusion::arrow::datatypes::DataType::Int64),
3777                                    "avg" => Some(datafusion::arrow::datatypes::DataType::Float64),
3778                                    _ => None,
3779                                };
3780                                if let Some(target_type) = cast_type {
3781                                    df_expr = DfExpr::Cast(datafusion::logical_expr::Cast::new(
3782                                        Box::new(df_expr),
3783                                        target_type,
3784                                    ));
3785                                }
3786                            }
3787
3788                            create_physical_expr(&df_expr, &df_schema, state.execution_props())
3789                                .map_err(|e| anyhow!("Failed to create physical expr: {}", e))
3790                        })
3791                        .collect::<Result<Vec<_>>>()?
3792                };
3793
3794            // Translate PARTITION BY expressions to physical expressions
3795            let partition_by_physical: Vec<Arc<dyn datafusion::physical_expr::PhysicalExpr>> =
3796                window_spec
3797                    .partition_by
3798                    .iter()
3799                    .map(|e| {
3800                        let df_expr = cypher_expr_to_df(e, tx_ctx.as_ref())?;
3801                        create_physical_expr(&df_expr, &df_schema, state.execution_props())
3802                            .map_err(|e| anyhow!("Failed to create physical expr: {}", e))
3803                    })
3804                    .collect::<Result<Vec<_>>>()?;
3805
3806            // Translate ORDER BY expressions to physical sort expressions
3807            let mut order_by_physical: Vec<datafusion::physical_expr::PhysicalSortExpr> =
3808                window_spec
3809                    .order_by
3810                    .iter()
3811                    .map(|sort_item| {
3812                        let df_expr = cypher_expr_to_df(&sort_item.expr, tx_ctx.as_ref())?;
3813                        let physical_expr =
3814                            create_physical_expr(&df_expr, &df_schema, state.execution_props())
3815                                .map_err(|e| anyhow!("Failed to create physical expr: {}", e))?;
3816                        Ok(datafusion::physical_expr::PhysicalSortExpr {
3817                            expr: physical_expr,
3818                            options: datafusion::arrow::compute::SortOptions {
3819                                descending: !sort_item.ascending,
3820                                nulls_first: !sort_item.ascending, // SQL standard: nulls last for ASC
3821                            },
3822                        })
3823                    })
3824                    .collect::<Result<Vec<_>>>()?;
3825
3826            // DataFusion requires partition columns to have an ordering.
3827            // If ORDER BY is empty but PARTITION BY is not, add partition columns to ordering.
3828            if order_by_physical.is_empty() && !partition_by_physical.is_empty() {
3829                for partition_expr in &partition_by_physical {
3830                    order_by_physical.push(datafusion::physical_expr::PhysicalSortExpr {
3831                        expr: Arc::clone(partition_expr),
3832                        options: datafusion::arrow::compute::SortOptions {
3833                            descending: false,
3834                            nulls_first: false,
3835                        },
3836                    });
3837                }
3838            }
3839
3840            // Create window frame based on function type:
3841            // - Aggregate functions: cumulative when ORDER BY present, full partition when absent
3842            // - Manual window functions: always full partition (frame is irrelevant for ranking,
3843            //   and value functions like last_value/first_value expect full-partition semantics)
3844            let window_frame = if is_aggregate {
3845                if window_spec.order_by.is_empty() {
3846                    // No ORDER BY: aggregate over entire partition
3847                    use datafusion::logical_expr::{WindowFrameBound, WindowFrameUnits};
3848                    Arc::new(WindowFrame::new_bounds(
3849                        WindowFrameUnits::Rows,
3850                        WindowFrameBound::Preceding(datafusion::common::ScalarValue::UInt64(None)),
3851                        WindowFrameBound::Following(datafusion::common::ScalarValue::UInt64(None)),
3852                    ))
3853                } else {
3854                    // With ORDER BY: cumulative from partition start to current row
3855                    Arc::new(WindowFrame::new(Some(false)))
3856                }
3857            } else {
3858                // Manual window functions: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
3859                use datafusion::logical_expr::{WindowFrameBound, WindowFrameUnits};
3860                Arc::new(WindowFrame::new_bounds(
3861                    WindowFrameUnits::Rows,
3862                    WindowFrameBound::Preceding(datafusion::common::ScalarValue::UInt64(None)),
3863                    WindowFrameBound::Following(datafusion::common::ScalarValue::UInt64(None)),
3864                ))
3865            };
3866
3867            // Get the output name
3868            let alias = expr.to_string_repr();
3869
3870            // Create the window expression using DataFusion's create_window_expr
3871            let window_expr = create_window_expr(
3872                &window_fn_def,
3873                alias,
3874                &physical_args,
3875                &partition_by_physical,
3876                &order_by_physical,
3877                window_frame,
3878                input_schema.clone(),
3879                false, // ignore_nulls
3880                *distinct,
3881                None, // filter
3882            )?;
3883
3884            window_expr_list.push(window_expr);
3885        }
3886
3887        // WindowAggExec requires input to be sorted by partition columns + order by columns.
3888        // Create a SortExec to ensure proper ordering.
3889        let mut sort_exprs = Vec::new();
3890
3891        // Add partition columns to sort (must be sorted by partition first)
3892        for expr in window_exprs {
3893            if let Expr::FunctionCall {
3894                window_spec: Some(window_spec),
3895                ..
3896            } = expr
3897            {
3898                for partition_expr in &window_spec.partition_by {
3899                    let df_expr = cypher_expr_to_df(partition_expr, tx_ctx.as_ref())?;
3900                    let physical_expr =
3901                        create_physical_expr(&df_expr, &df_schema, state.execution_props())?;
3902
3903                    // Only add if not already in sort list
3904                    // Use display comparison as proxy for equality since PhysicalExpr doesn't implement Eq
3905                    if !sort_exprs
3906                        .iter()
3907                        .any(|s: &datafusion::physical_expr::PhysicalSortExpr| {
3908                            s.expr.to_string() == physical_expr.to_string()
3909                        })
3910                    {
3911                        sort_exprs.push(datafusion::physical_expr::PhysicalSortExpr {
3912                            expr: physical_expr,
3913                            options: datafusion::arrow::compute::SortOptions {
3914                                descending: false,
3915                                nulls_first: false,
3916                            },
3917                        });
3918                    }
3919                }
3920
3921                // Then add order by columns
3922                for sort_item in &window_spec.order_by {
3923                    let df_expr = cypher_expr_to_df(&sort_item.expr, tx_ctx.as_ref())?;
3924                    let physical_expr =
3925                        create_physical_expr(&df_expr, &df_schema, state.execution_props())?;
3926
3927                    sort_exprs.push(datafusion::physical_expr::PhysicalSortExpr {
3928                        expr: physical_expr,
3929                        options: datafusion::arrow::compute::SortOptions {
3930                            descending: !sort_item.ascending,
3931                            nulls_first: !sort_item.ascending,
3932                        },
3933                    });
3934                }
3935            }
3936        }
3937
3938        // Add SortExec before WindowAggExec if we have partition or order by columns
3939        let sorted_input = if !sort_exprs.is_empty() {
3940            let lex_ordering = LexOrdering::new(sort_exprs)
3941                .ok_or_else(|| anyhow!("Failed to create LexOrdering for window function"))?;
3942            Arc::new(SortExec::new(lex_ordering, input)) as Arc<dyn ExecutionPlan>
3943        } else {
3944            input
3945        };
3946
3947        // Create WindowAggExec
3948        let window_agg_exec = WindowAggExec::try_new(
3949            window_expr_list,
3950            sorted_input,
3951            false, // can_repartition - keep data on current partitions
3952        )?;
3953
3954        Ok(Arc::new(window_agg_exec))
3955    }
3956
3957    /// Plan an empty input that produces exactly one row.
3958    ///
3959    /// In Cypher, `RETURN 1` (without MATCH) expects a single row to project from.
3960    /// This matches the fallback executor behavior which returns `vec![HashMap::new()]`.
3961    fn plan_empty(&self) -> Result<Arc<dyn ExecutionPlan>> {
3962        let schema = Arc::new(Schema::empty());
3963        // Use PlaceholderRowExec to produce exactly one row (like SQL's "SELECT 1").
3964        // EmptyExec produces 0 rows, which breaks `RETURN 1 AS num`.
3965        Ok(Arc::new(PlaceholderRowExec::new(schema)))
3966    }
3967
3968    /// Plan a zero-length path binding.
3969    /// Converts a single node pattern `p = (a)` into a Path with one node and zero edges.
3970    fn plan_bind_zero_length_path(
3971        &self,
3972        input: &LogicalPlan,
3973        node_variable: &str,
3974        path_variable: &str,
3975        all_properties: &HashMap<String, HashSet<String>>,
3976    ) -> Result<Arc<dyn ExecutionPlan>> {
3977        let input_plan = self.plan_internal(input, all_properties)?;
3978        Ok(Arc::new(BindZeroLengthPathExec::new(
3979            input_plan,
3980            node_variable.to_string(),
3981            path_variable.to_string(),
3982            self.graph_ctx.clone(),
3983        )))
3984    }
3985
3986    /// Plan a fixed-length path binding.
3987    /// Synthesizes a path struct from existing node and edge columns.
3988    fn plan_bind_path(
3989        &self,
3990        input: &LogicalPlan,
3991        node_variables: &[String],
3992        edge_variables: &[String],
3993        path_variable: &str,
3994        all_properties: &HashMap<String, HashSet<String>>,
3995    ) -> Result<Arc<dyn ExecutionPlan>> {
3996        let input_plan = self.plan_internal(input, all_properties)?;
3997        Ok(Arc::new(BindFixedPathExec::new(
3998            input_plan,
3999            node_variables.to_vec(),
4000            edge_variables.to_vec(),
4001            path_variable.to_string(),
4002            self.graph_ctx.clone(),
4003        )))
4004    }
4005
4006    /// Extract simple property equality conditions from a Cypher expression tree.
4007    ///
4008    /// Handles patterns generated by `properties_to_expr`:
4009    /// - `variable.prop = literal` → `(prop, value)`
4010    /// - `cond1 AND cond2` → recursive extraction
4011    ///
4012    /// Returns `Vec<(property_name, expected_value)>` for use in L0 edge property
4013    /// checking during VLP BFS.
4014    fn extract_edge_property_conditions(expr: &Expr) -> Vec<(String, uni_common::Value)> {
4015        match expr {
4016            Expr::BinaryOp {
4017                left,
4018                op: uni_cypher::ast::BinaryOp::Eq,
4019                right,
4020            } => {
4021                // Pattern: variable.prop = literal
4022                if let Expr::Property(inner, prop_name) = left.as_ref()
4023                    && matches!(inner.as_ref(), Expr::Variable(_))
4024                    && let Expr::Literal(lit) = right.as_ref()
4025                {
4026                    return vec![(prop_name.clone(), lit.to_value())];
4027                }
4028                // Reverse: literal = variable.prop
4029                if let Expr::Literal(lit) = left.as_ref()
4030                    && let Expr::Property(inner, prop_name) = right.as_ref()
4031                    && matches!(inner.as_ref(), Expr::Variable(_))
4032                {
4033                    return vec![(prop_name.clone(), lit.to_value())];
4034                }
4035                vec![]
4036            }
4037            Expr::BinaryOp {
4038                left,
4039                op: uni_cypher::ast::BinaryOp::And,
4040                right,
4041            } => {
4042                let mut result = Self::extract_edge_property_conditions(left);
4043                result.extend(Self::extract_edge_property_conditions(right));
4044                result
4045            }
4046            _ => vec![],
4047        }
4048    }
4049
4050    /// Create a physical filter expression from a DataFusion logical expression.
4051    ///
4052    /// Applies type coercion to resolve mismatches like Int32 vs Int64
4053    /// before creating the physical expression.
4054    fn create_physical_filter_expr(
4055        &self,
4056        expr: &DfExpr,
4057        schema: &SchemaRef,
4058        session: &SessionContext,
4059    ) -> Result<Arc<dyn datafusion::physical_expr::PhysicalExpr>> {
4060        let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
4061        let state = session.state();
4062
4063        // Replace DummyUdf placeholders with registered UDFs
4064        let resolved_expr = Self::resolve_udfs(expr, &state)?;
4065
4066        // Apply type coercion to resolve Int32/Int64, Float32/Float64 mismatches
4067        let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;
4068
4069        // Re-resolve UDFs after coercion (coercion may introduce new dummy UDF calls)
4070        let coerced_expr = Self::resolve_udfs(&coerced_expr, &state)?;
4071
4072        // Use SessionState's create_physical_expr to properly resolve UDFs
4073        use datafusion::physical_planner::PhysicalPlanner;
4074        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
4075        let physical = planner.create_physical_expr(&coerced_expr, &df_schema, &state)?;
4076
4077        Ok(physical)
4078    }
4079
4080    /// Resolve DummyUdf placeholders to actual registered UDFs from SessionState.
4081    ///
4082    /// Uses DataFusion's TreeNode API to traverse the entire expression tree,
4083    /// replacing any ScalarFunction nodes whose UDF name matches a registered UDF.
4084    fn resolve_udfs(expr: &DfExpr, state: &datafusion::execution::SessionState) -> Result<DfExpr> {
4085        use datafusion::common::tree_node::{Transformed, TreeNode};
4086        use datafusion::logical_expr::Expr as DfExpr;
4087
4088        let result = expr
4089            .clone()
4090            .transform_up(|node| {
4091                if let DfExpr::ScalarFunction(ref func) = node {
4092                    let udf_name = func.func.name();
4093                    if let Some(registered_udf) = state.scalar_functions().get(udf_name) {
4094                        return Ok(Transformed::yes(DfExpr::ScalarFunction(
4095                            datafusion::logical_expr::expr::ScalarFunction {
4096                                func: registered_udf.clone(),
4097                                args: func.args.clone(),
4098                            },
4099                        )));
4100                    }
4101                }
4102                Ok(Transformed::no(node))
4103            })
4104            .map_err(|e| anyhow::anyhow!("Failed to resolve UDFs: {}", e))?;
4105
4106        Ok(result.data)
4107    }
4108
4109    /// Add a structural projection on top of an execution plan to create a Struct column
4110    /// for a Node or Edge variable.
4111    fn add_structural_projection(
4112        &self,
4113        input: Arc<dyn ExecutionPlan>,
4114        variable: &str,
4115        properties: &[String],
4116    ) -> Result<Arc<dyn ExecutionPlan>> {
4117        use datafusion::functions::expr_fn::named_struct;
4118        use datafusion::logical_expr::lit;
4119        use datafusion::physical_plan::projection::ProjectionExec;
4120
4121        let input_schema = input.schema();
4122        let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
4123            Vec::new();
4124
4125        // 1. Keep all existing columns
4126        for (i, field) in input_schema.fields().iter().enumerate() {
4127            let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
4128                field.name(),
4129                i,
4130            ));
4131            proj_exprs.push((col_expr, field.name().clone()));
4132        }
4133
4134        // 2. Add the named_struct AS variable
4135        let mut struct_args = Vec::with_capacity(properties.len() * 2 + 4);
4136
4137        // Add _vid field for identity access
4138        struct_args.push(lit("_vid"));
4139        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
4140            format!("{}._vid", variable),
4141        )));
4142
4143        // Add _labels field for labels() function support
4144        struct_args.push(lit("_labels"));
4145        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
4146            format!("{}._labels", variable),
4147        )));
4148
4149        for prop in properties {
4150            struct_args.push(lit(prop.clone()));
4151            struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
4152                format!("{}.{}", variable, prop),
4153            )));
4154        }
4155
4156        // If no properties, still create an empty struct to represent the entity
4157        let struct_expr = named_struct(struct_args);
4158
4159        let df_schema = datafusion::common::DFSchema::try_from(input_schema.as_ref().clone())?;
4160        let session = self.session_ctx.read();
4161        let state = session.state();
4162
4163        // Resolve DummyUdf placeholders
4164        let resolved_expr = Self::resolve_udfs(&struct_expr, &state)?;
4165
4166        use datafusion::physical_planner::PhysicalPlanner;
4167        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
4168        let physical_struct_expr =
4169            planner.create_physical_expr(&resolved_expr, &df_schema, &state)?;
4170
4171        proj_exprs.push((physical_struct_expr, variable.to_string()));
4172
4173        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
4174    }
4175
4176    /// Add a structural projection for an edge variable (builds a Struct with _eid, _type, _src, _dst + properties).
4177    fn add_edge_structural_projection(
4178        &self,
4179        input: Arc<dyn ExecutionPlan>,
4180        variable: &str,
4181        properties: &[String],
4182        source_variable: &str,
4183        target_variable: &str,
4184    ) -> Result<Arc<dyn ExecutionPlan>> {
4185        use datafusion::functions::expr_fn::named_struct;
4186        use datafusion::logical_expr::lit;
4187        use datafusion::physical_plan::projection::ProjectionExec;
4188
4189        let input_schema = input.schema();
4190        let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
4191            Vec::new();
4192
4193        // 1. Keep all existing columns
4194        for (i, field) in input_schema.fields().iter().enumerate() {
4195            let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
4196                field.name(),
4197                i,
4198            ));
4199            proj_exprs.push((col_expr, field.name().clone()));
4200        }
4201
4202        // 2. Build named_struct with system fields + properties
4203        let mut struct_args = Vec::with_capacity(properties.len() * 2 + 10);
4204
4205        // Add _eid field for identity access
4206        struct_args.push(lit("_eid"));
4207        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
4208            format!("{}._eid", variable),
4209        )));
4210
4211        struct_args.push(lit("_type"));
4212        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
4213            format!("{}._type", variable),
4214        )));
4215
4216        // Add _src and _dst from source/target variable VIDs so the result
4217        // normalizer can detect this as an edge.
4218        // Use {var}._vid when available, falling back to bare {var} column
4219        // (e.g., in EXISTS subqueries where the source is a parameter VID).
4220        let resolve_vid_col = |var: &str| -> String {
4221            let vid_col = format!("{}._vid", var);
4222            if input_schema.column_with_name(&vid_col).is_some() {
4223                vid_col
4224            } else {
4225                var.to_string()
4226            }
4227        };
4228        let src_col_name = resolve_vid_col(source_variable);
4229        let dst_col_name = resolve_vid_col(target_variable);
4230        struct_args.push(lit("_src"));
4231        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
4232            src_col_name,
4233        )));
4234
4235        struct_args.push(lit("_dst"));
4236        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
4237            dst_col_name,
4238        )));
4239
4240        // Include _all_props if present (for keys()/properties() on schemaless edges)
4241        let all_props_col = format!("{}._all_props", variable);
4242        if input_schema.column_with_name(&all_props_col).is_some() {
4243            struct_args.push(lit("_all_props"));
4244            struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
4245                all_props_col,
4246            )));
4247        }
4248
4249        for prop in properties {
4250            struct_args.push(lit(prop.clone()));
4251            struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
4252                format!("{}.{}", variable, prop),
4253            )));
4254        }
4255
4256        let struct_expr = named_struct(struct_args);
4257
4258        let df_schema = datafusion::common::DFSchema::try_from(input_schema.as_ref().clone())?;
4259        let session = self.session_ctx.read();
4260        let state = session.state();
4261
4262        let resolved_expr = Self::resolve_udfs(&struct_expr, &state)?;
4263
4264        use datafusion::physical_planner::PhysicalPlanner;
4265        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
4266        let physical_struct_expr =
4267            planner.create_physical_expr(&resolved_expr, &df_schema, &state)?;
4268
4269        proj_exprs.push((physical_struct_expr, variable.to_string()));
4270
4271        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
4272    }
4273
4274    /// Create a physical aggregate expression.
4275    fn create_physical_aggregate(
4276        &self,
4277        expr: &DfExpr,
4278        schema: &SchemaRef,
4279        state: &SessionState,
4280    ) -> Result<PhysicalAggregate> {
4281        use datafusion::physical_planner::create_aggregate_expr_and_maybe_filter;
4282
4283        // Build a DFSchema from the Arrow schema for the function call
4284        let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
4285
4286        // The function returns (AggregateFunctionExpr, Option<filter>, Vec<ordering>)
4287        let (agg_expr, filter, _ordering) = create_aggregate_expr_and_maybe_filter(
4288            expr,
4289            &df_schema,
4290            schema.as_ref(),
4291            state.execution_props(),
4292        )?;
4293        Ok((agg_expr, filter))
4294    }
4295
4296    /// Resolve the source VID column for traversal, adding a struct field extraction
4297    /// projection if the source variable is a struct column (e.g., after WITH aggregation).
4298    ///
4299    /// Returns the (possibly modified) input plan and the column name to use as the source VID.
4300    fn resolve_source_vid_col(
4301        input_plan: Arc<dyn ExecutionPlan>,
4302        source_variable: &str,
4303    ) -> Result<(Arc<dyn ExecutionPlan>, String)> {
4304        let source_vid_col = format!("{}._vid", source_variable);
4305        if input_plan
4306            .schema()
4307            .column_with_name(&source_vid_col)
4308            .is_some()
4309        {
4310            return Ok((input_plan, source_vid_col));
4311        }
4312        // Check if the variable is a struct column (entity after WITH aggregation).
4313        // If so, add a projection to extract _vid from the struct.
4314        if let Ok(field) = input_plan.schema().field_with_name(source_variable)
4315            && matches!(
4316                field.data_type(),
4317                datafusion::arrow::datatypes::DataType::Struct(_)
4318            )
4319        {
4320            let enriched = Self::extract_struct_identity_columns(input_plan, source_variable)?;
4321            return Ok((enriched, format!("{}._vid", source_variable)));
4322        }
4323        Ok((input_plan, source_variable.to_string()))
4324    }
4325
4326    /// Add a projection that extracts `{variable}._vid` and `{variable}._labels` from
4327    /// a struct column named `{variable}`. This is needed when an entity variable
4328    /// has been passed through a WITH + aggregation and exists as a struct rather
4329    /// than flat columns.
4330    fn extract_struct_identity_columns(
4331        input: Arc<dyn ExecutionPlan>,
4332        variable: &str,
4333    ) -> Result<Arc<dyn ExecutionPlan>> {
4334        use datafusion::common::ScalarValue;
4335        use datafusion::physical_plan::projection::ProjectionExec;
4336
4337        let schema = input.schema();
4338        let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
4339            Vec::new();
4340
4341        // Keep all existing columns
4342        for (i, field) in schema.fields().iter().enumerate() {
4343            let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
4344                field.name(),
4345                i,
4346            ));
4347            proj_exprs.push((col_expr, field.name().clone()));
4348        }
4349
4350        // Find the struct column and extract identity fields using get_field UDF
4351        if let Some((struct_idx, struct_field)) = schema
4352            .fields()
4353            .iter()
4354            .enumerate()
4355            .find(|(_, f)| f.name() == variable)
4356            && let datafusion::arrow::datatypes::DataType::Struct(fields) = struct_field.data_type()
4357        {
4358            let struct_col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4359                datafusion::physical_expr::expressions::Column::new(variable, struct_idx),
4360            );
4361            let get_field_udf: Arc<datafusion::logical_expr::ScalarUDF> =
4362                Arc::new(datafusion::logical_expr::ScalarUDF::from(
4363                    datafusion::functions::core::getfield::GetFieldFunc::new(),
4364                ));
4365
4366            // Extract _vid field
4367            if fields.iter().any(|f| f.name() == "_vid") {
4368                let field_name: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4369                    Arc::new(datafusion::physical_expr::expressions::Literal::new(
4370                        ScalarValue::Utf8(Some("_vid".to_string())),
4371                    ));
4372                let vid_expr = Arc::new(datafusion::physical_expr::ScalarFunctionExpr::try_new(
4373                    get_field_udf.clone(),
4374                    vec![struct_col.clone(), field_name],
4375                    schema.as_ref(),
4376                    Arc::new(datafusion::common::config::ConfigOptions::default()),
4377                )?);
4378                proj_exprs.push((vid_expr, format!("{}._vid", variable)));
4379            }
4380
4381            // Extract _labels field
4382            if fields.iter().any(|f| f.name() == "_labels") {
4383                let field_name: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4384                    Arc::new(datafusion::physical_expr::expressions::Literal::new(
4385                        ScalarValue::Utf8(Some("_labels".to_string())),
4386                    ));
4387                let labels_expr = Arc::new(datafusion::physical_expr::ScalarFunctionExpr::try_new(
4388                    get_field_udf,
4389                    vec![struct_col, field_name],
4390                    schema.as_ref(),
4391                    Arc::new(datafusion::common::config::ConfigOptions::default()),
4392                )?);
4393                proj_exprs.push((labels_expr, format!("{}._labels", variable)));
4394            }
4395        }
4396
4397        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
4398    }
4399
4400    /// Add a projection that extracts ALL fields from a struct column named `{variable}`
4401    /// as flat `{variable}.{field_name}` columns. Used when a variable that passed through
4402    /// WITH + aggregation (and became a struct) is referenced by property access downstream.
4403    fn extract_all_struct_fields(
4404        input: Arc<dyn ExecutionPlan>,
4405        variable: &str,
4406    ) -> Result<Arc<dyn ExecutionPlan>> {
4407        use datafusion::common::ScalarValue;
4408        use datafusion::physical_plan::projection::ProjectionExec;
4409
4410        let schema = input.schema();
4411        let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
4412            Vec::new();
4413
4414        // Keep all existing columns
4415        for (i, field) in schema.fields().iter().enumerate() {
4416            let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
4417                field.name(),
4418                i,
4419            ));
4420            proj_exprs.push((col_expr, field.name().clone()));
4421        }
4422
4423        // Find the struct column and extract ALL fields
4424        if let Some((struct_idx, struct_field)) = schema
4425            .fields()
4426            .iter()
4427            .enumerate()
4428            .find(|(_, f)| f.name() == variable)
4429            && let datafusion::arrow::datatypes::DataType::Struct(fields) = struct_field.data_type()
4430        {
4431            let struct_col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4432                datafusion::physical_expr::expressions::Column::new(variable, struct_idx),
4433            );
4434            let get_field_udf: Arc<datafusion::logical_expr::ScalarUDF> =
4435                Arc::new(datafusion::logical_expr::ScalarUDF::from(
4436                    datafusion::functions::core::getfield::GetFieldFunc::new(),
4437                ));
4438
4439            for field in fields.iter() {
4440                let flat_name = format!("{}.{}", variable, field.name());
4441                // Skip if already exists as a flat column
4442                if schema.column_with_name(&flat_name).is_some() {
4443                    continue;
4444                }
4445                let field_lit: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4446                    Arc::new(datafusion::physical_expr::expressions::Literal::new(
4447                        ScalarValue::Utf8(Some(field.name().to_string())),
4448                    ));
4449                let extract_expr =
4450                    Arc::new(datafusion::physical_expr::ScalarFunctionExpr::try_new(
4451                        get_field_udf.clone(),
4452                        vec![struct_col.clone(), field_lit],
4453                        schema.as_ref(),
4454                        Arc::new(datafusion::common::config::ConfigOptions::default()),
4455                    )?);
4456                proj_exprs.push((extract_expr, flat_name));
4457            }
4458        }
4459
4460        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
4461    }
4462
4463    /// Check if a DataFusion expression refers to a LargeBinary column in the schema.
4464    fn is_large_binary_col(&self, expr: &DfExpr, schema: &SchemaRef) -> bool {
4465        if let DfExpr::Column(col) = expr
4466            && let Ok(field) = schema.field_with_name(&col.name)
4467        {
4468            return matches!(
4469                field.data_type(),
4470                datafusion::arrow::datatypes::DataType::LargeBinary
4471            );
4472        }
4473        // For any other expression type, conservatively return true
4474        // since schemaless properties are stored as LargeBinary
4475        true
4476    }
4477}
4478
4479// ---------------------------------------------------------------------------
4480// Locy operator helpers
4481// ---------------------------------------------------------------------------
4482
4483/// Resolve column names to indices in a schema.
4484/// Strip structural projection columns from a physical plan.
4485///
4486/// Graph scans add `named_struct` columns for node/edge variables (e.g., column `a`
4487/// of type `Struct{_vid, _labels, _all_props}`). When CrossJoined with a derived scan
4488/// Coerce a physical expression from `actual_dt` to `target_dt`.
4489///
4490/// Arrow's CastExpr cannot handle LargeBinary→Float64 because LargeBinary holds
4491/// serialized CypherValue bytes. For these cases, use the `_cypher_to_float64` UDF
4492/// which deserializes properly. For standard numeric coercions (Int64→Float64 etc.)
4493/// we use Arrow's built-in CastExpr.
4494fn coerce_physical_expr(
4495    expr: Arc<dyn datafusion::physical_expr::PhysicalExpr>,
4496    actual_dt: &DataType,
4497    target_dt: &DataType,
4498    schema: &arrow_schema::Schema,
4499) -> Arc<dyn datafusion::physical_expr::PhysicalExpr> {
4500    use datafusion::physical_expr::expressions::CastExpr;
4501
4502    match (actual_dt, target_dt) {
4503        // LargeBinary → Float64: use Cypher value deserializer UDF
4504        (DataType::LargeBinary, DataType::Float64) => wrap_cypher_to_float64(expr, schema),
4505        // LargeBinary → Int64: cast through Float64 first (extract number, then truncate)
4506        (DataType::LargeBinary, DataType::Int64) => {
4507            let float_expr = wrap_cypher_to_float64(expr, schema);
4508            Arc::new(CastExpr::new(float_expr, DataType::Int64, None))
4509        }
4510        // Standard Arrow casts (Int64→Float64, Float64→Int64, etc.)
4511        _ => Arc::new(CastExpr::new(expr, target_dt.clone(), None)),
4512    }
4513}
4514
4515/// Wrap a physical expression with `_cypher_to_float64` UDF.
4516fn wrap_cypher_to_float64(
4517    expr: Arc<dyn datafusion::physical_expr::PhysicalExpr>,
4518    schema: &arrow_schema::Schema,
4519) -> Arc<dyn datafusion::physical_expr::PhysicalExpr> {
4520    let udf = Arc::new(super::df_udfs::cypher_to_float64_udf());
4521    let config = Arc::new(datafusion::common::config::ConfigOptions::default());
4522    Arc::new(
4523        datafusion::physical_expr::ScalarFunctionExpr::try_new(udf, vec![expr], schema, config)
4524            .expect("CypherToFloat64Udf accepts Any(1) signature"),
4525    )
4526}
4527
4528/// Strip structural projection columns from a physical plan that conflict with
4529/// derived scan column names.
4530///
4531/// Graph scans add `named_struct` columns for node/edge variables (e.g., column `a`
4532/// of type `Struct{_vid, _labels, _all_props}`). When CrossJoined with a derived scan
4533/// that also has a column `a` (UInt64 VID), the duplicate name causes ambiguous
4534/// column resolution. This function removes ONLY those Struct-typed columns whose
4535/// names collide with derived scan columns, preserving non-conflicting struct columns
4536/// (like edge structs) that are needed for typed property access.
4537fn strip_conflicting_structural_columns(
4538    input: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
4539    derived_col_names: &HashSet<&str>,
4540) -> anyhow::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
4541    use datafusion::physical_plan::projection::ProjectionExec;
4542
4543    let schema = input.schema();
4544    let proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> = schema
4545        .fields()
4546        .iter()
4547        .enumerate()
4548        .filter(|(_, f)| {
4549            // Remove Struct columns whose names conflict with derived scan columns.
4550            !(matches!(f.data_type(), arrow_schema::DataType::Struct(_))
4551                && derived_col_names.contains(f.name().as_str()))
4552        })
4553        .map(|(i, f)| {
4554            let col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4555                datafusion::physical_expr::expressions::Column::new(f.name(), i),
4556            );
4557            (col, f.name().clone())
4558        })
4559        .collect();
4560
4561    if proj_exprs.len() == schema.fields().len() {
4562        // No conflicting structural columns
4563        return Ok(input);
4564    }
4565
4566    Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
4567}
4568
4569fn resolve_column_indices(
4570    schema: &arrow_schema::SchemaRef,
4571    column_names: &[String],
4572) -> anyhow::Result<Vec<usize>> {
4573    column_names
4574        .iter()
4575        .map(|name| {
4576            schema
4577                .index_of(name)
4578                .map_err(|_| anyhow::anyhow!("Column '{}' not found in schema", name))
4579        })
4580        .collect()
4581}
4582
4583/// Resolve BEST BY criteria from `(Expr, ascending)` pairs to `SortCriterion` values.
4584fn resolve_best_by_criteria(
4585    schema: &arrow_schema::SchemaRef,
4586    criteria: &[(Expr, bool)],
4587) -> anyhow::Result<Vec<super::df_graph::locy_best_by::SortCriterion>> {
4588    criteria
4589        .iter()
4590        .map(|(expr, ascending)| {
4591            // Extract candidate column names — try property name first (short),
4592            // then full "var.prop" form, then variable name.
4593            let candidates: Vec<String> = match expr {
4594                Expr::Property(base, prop) => {
4595                    if let Expr::Variable(var) = base.as_ref() {
4596                        vec![prop.clone(), format!("{}.{}", var, prop)]
4597                    } else {
4598                        vec![prop.clone()]
4599                    }
4600                }
4601                Expr::Variable(name) => {
4602                    let short = name.rsplit('.').next().unwrap_or(name).to_string();
4603                    if short != *name {
4604                        vec![short, name.clone()]
4605                    } else {
4606                        vec![name.clone()]
4607                    }
4608                }
4609                _ => {
4610                    return Err(anyhow::anyhow!(
4611                        "BEST BY criteria must be variable or property access"
4612                    ));
4613                }
4614            };
4615            let col_index = candidates
4616                .iter()
4617                .find_map(|name| schema.index_of(name).ok())
4618                .ok_or_else(|| {
4619                    anyhow::anyhow!(
4620                        "BEST BY column '{}' not found",
4621                        candidates.first().unwrap_or(&String::new())
4622                    )
4623                })?;
4624            Ok(super::df_graph::locy_best_by::SortCriterion {
4625                col_index,
4626                ascending: *ascending,
4627                nulls_first: false, // NULLS LAST is Locy default
4628            })
4629        })
4630        .collect()
4631}
4632
4633/// Resolve fold bindings from `(output_name, aggregate_expr)` to `FoldBinding` values.
4634fn resolve_fold_bindings(
4635    schema: &arrow_schema::SchemaRef,
4636    fold_bindings: &[(String, Expr)],
4637) -> anyhow::Result<Vec<super::df_graph::locy_fold::FoldBinding>> {
4638    fold_bindings
4639        .iter()
4640        .map(|(output_name, expr)| {
4641            // Parse aggregate expression: FunctionCall { name, args }
4642            match expr {
4643                Expr::FunctionCall { name, args, .. } => {
4644                    let upper = name.to_uppercase();
4645                    let is_count = matches!(upper.as_str(), "COUNT" | "MCOUNT");
4646
4647                    // COUNT/MCOUNT with zero args → CountAll
4648                    if is_count && args.is_empty() {
4649                        return Ok(super::df_graph::locy_fold::FoldBinding {
4650                            output_name: output_name.clone(),
4651                            kind: super::df_graph::locy_fold::FoldAggKind::CountAll,
4652                            input_col_index: 0, // unused for CountAll
4653                            input_col_name: None,
4654                        });
4655                    }
4656
4657                    let kind = match upper.as_str() {
4658                        "SUM" | "MSUM" => super::df_graph::locy_fold::FoldAggKind::Sum,
4659                        "COUNT" | "MCOUNT" => super::df_graph::locy_fold::FoldAggKind::Count,
4660                        "MAX" | "MMAX" => super::df_graph::locy_fold::FoldAggKind::Max,
4661                        "MIN" | "MMIN" => super::df_graph::locy_fold::FoldAggKind::Min,
4662                        "AVG" => super::df_graph::locy_fold::FoldAggKind::Avg,
4663                        "COLLECT" => super::df_graph::locy_fold::FoldAggKind::Collect,
4664                        "MNOR" => super::df_graph::locy_fold::FoldAggKind::Nor,
4665                        "MPROD" => super::df_graph::locy_fold::FoldAggKind::Prod,
4666                        other => {
4667                            return Err(anyhow::anyhow!(
4668                                "Unsupported FOLD aggregate function: {}",
4669                                other
4670                            ));
4671                        }
4672                    };
4673                    // The LocyProject aliases the aggregate input expression to the
4674                    // fold output name, so look up the output name in the schema.
4675                    let input_col_index = schema
4676                        .index_of(output_name)
4677                        .or_else(|_| {
4678                            // Fallback: try the raw argument column name
4679                            let col_name = match args.first() {
4680                                Some(Expr::Variable(name)) => Some(name.clone()),
4681                                Some(Expr::Property(base, prop)) => {
4682                                    if let Expr::Variable(var) = base.as_ref() {
4683                                        Some(format!("{}.{}", var, prop))
4684                                    } else {
4685                                        None
4686                                    }
4687                                }
4688                                _ => None,
4689                            };
4690                            col_name
4691                                .and_then(|n| schema.index_of(&n).ok())
4692                                .ok_or_else(|| {
4693                                    arrow_schema::ArrowError::SchemaError(format!(
4694                                        "FOLD column '{}' not found",
4695                                        output_name
4696                                    ))
4697                                })
4698                        })
4699                        .map_err(|_| anyhow::anyhow!("FOLD column '{}' not found", output_name))?;
4700                    Ok(super::df_graph::locy_fold::FoldBinding {
4701                        output_name: output_name.clone(),
4702                        kind,
4703                        input_col_index,
4704                        input_col_name: Some(output_name.clone()),
4705                    })
4706                }
4707                _ => Err(anyhow::anyhow!(
4708                    "FOLD binding must be an aggregate function call"
4709                )),
4710            }
4711        })
4712        .collect()
4713}
4714
/// Recursively collect variable kinds (node, edge, path) from a LogicalPlan.
///
/// This information is used by the expression translator to resolve bare variable
/// references to their identity columns (e.g., `n` → `n._vid` for nodes).
///
/// Results are written into `kinds`, keyed by variable name. For operators that
/// have child plans, the children are visited first and the operator's own
/// variables are inserted afterwards, so when the same name is recorded twice
/// the entry written by the outer operator is the one that remains.
///
/// The `match` is exhaustive on purpose: adding a new `LogicalPlan` variant
/// forces a decision here about which variables it introduces.
fn collect_variable_kinds(plan: &LogicalPlan, kinds: &mut HashMap<String, VariableKind>) {
    match plan {
        // Leaf operators that bind exactly one node variable.
        LogicalPlan::Scan { variable, .. }
        | LogicalPlan::ExtIdLookup { variable, .. }
        | LogicalPlan::ScanAll { variable, .. }
        | LogicalPlan::ScanMainByLabels { variable, .. }
        | LogicalPlan::VectorKnn { variable, .. }
        | LogicalPlan::InvertedIndexLookup { variable, .. } => {
            kinds.insert(variable.clone(), VariableKind::Node);
        }
        // Traversals: both endpoints are nodes; the optional step variable is an
        // edge whose exact kind depends on whether the traversal is variable-length;
        // the optional path variable is a path.
        LogicalPlan::Traverse {
            input,
            source_variable,
            target_variable,
            step_variable,
            path_variable,
            is_variable_length,
            ..
        }
        | LogicalPlan::TraverseMainByType {
            input,
            source_variable,
            target_variable,
            step_variable,
            path_variable,
            is_variable_length,
            ..
        } => {
            collect_variable_kinds(input, kinds);
            kinds.insert(source_variable.clone(), VariableKind::Node);
            kinds.insert(target_variable.clone(), VariableKind::Node);
            if let Some(sv) = step_variable {
                kinds.insert(sv.clone(), VariableKind::edge_for(*is_variable_length));
            }
            if let Some(pv) = path_variable {
                kinds.insert(pv.clone(), VariableKind::Path);
            }
        }
        // Shortest-path operators always bind a path variable.
        LogicalPlan::ShortestPath {
            input,
            source_variable,
            target_variable,
            path_variable,
            ..
        }
        | LogicalPlan::AllShortestPaths {
            input,
            source_variable,
            target_variable,
            path_variable,
            ..
        } => {
            collect_variable_kinds(input, kinds);
            kinds.insert(source_variable.clone(), VariableKind::Node);
            kinds.insert(target_variable.clone(), VariableKind::Node);
            kinds.insert(path_variable.clone(), VariableKind::Path);
        }
        // Quantified patterns: variables from both the input and the inner
        // pattern plan are collected before the start/binding nodes are recorded.
        LogicalPlan::QuantifiedPattern {
            input,
            pattern_plan,
            path_variable,
            start_variable,
            binding_variable,
            ..
        } => {
            collect_variable_kinds(input, kinds);
            collect_variable_kinds(pattern_plan, kinds);
            kinds.insert(start_variable.clone(), VariableKind::Node);
            kinds.insert(binding_variable.clone(), VariableKind::Node);
            if let Some(pv) = path_variable {
                kinds.insert(pv.clone(), VariableKind::Path);
            }
        }
        LogicalPlan::BindZeroLengthPath {
            input,
            node_variable,
            path_variable,
        } => {
            collect_variable_kinds(input, kinds);
            kinds.insert(node_variable.clone(), VariableKind::Node);
            kinds.insert(path_variable.clone(), VariableKind::Path);
        }
        // Fixed path binding: edge variables are recorded as plain
        // `VariableKind::Edge` here (no variable-length distinction).
        LogicalPlan::BindPath {
            input,
            node_variables,
            edge_variables,
            path_variable,
        } => {
            collect_variable_kinds(input, kinds);
            for nv in node_variables {
                kinds.insert(nv.clone(), VariableKind::Node);
            }
            for ev in edge_variables {
                kinds.insert(ev.clone(), VariableKind::Edge);
            }
            kinds.insert(path_variable.clone(), VariableKind::Path);
        }
        // Wrapper nodes: recurse into input(s)
        LogicalPlan::Filter { input, .. }
        | LogicalPlan::Project { input, .. }
        | LogicalPlan::Sort { input, .. }
        | LogicalPlan::Limit { input, .. }
        | LogicalPlan::Aggregate { input, .. }
        | LogicalPlan::Distinct { input, .. }
        | LogicalPlan::Window { input, .. }
        | LogicalPlan::Unwind { input, .. }
        | LogicalPlan::Create { input, .. }
        | LogicalPlan::CreateBatch { input, .. }
        | LogicalPlan::Merge { input, .. }
        | LogicalPlan::Set { input, .. }
        | LogicalPlan::Remove { input, .. }
        | LogicalPlan::Delete { input, .. }
        | LogicalPlan::Foreach { input, .. }
        | LogicalPlan::SubqueryCall { input, .. } => {
            collect_variable_kinds(input, kinds);
        }
        // Binary operators: left side is visited before the right side.
        LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
            collect_variable_kinds(left, kinds);
            collect_variable_kinds(right, kinds);
        }
        LogicalPlan::Apply {
            input, subquery, ..
        } => {
            collect_variable_kinds(input, kinds);
            collect_variable_kinds(subquery, kinds);
        }
        LogicalPlan::RecursiveCTE {
            initial, recursive, ..
        } => {
            collect_variable_kinds(initial, kinds);
            collect_variable_kinds(recursive, kinds);
        }
        LogicalPlan::Explain { plan } => {
            collect_variable_kinds(plan, kinds);
        }
        // Procedure calls: only the search procedures listed below can yield a
        // node. A yield is recorded under its alias when one is given, otherwise
        // under its own name, and only if its canonical name is "node".
        LogicalPlan::ProcedureCall {
            procedure_name,
            yield_items,
            ..
        } => {
            use crate::query::df_graph::procedure_call::map_yield_to_canonical;
            for (name, alias) in yield_items {
                let var = alias.as_ref().unwrap_or(name);
                if matches!(
                    procedure_name.as_str(),
                    "uni.vector.query" | "uni.fts.query" | "uni.search"
                ) {
                    let canonical = map_yield_to_canonical(name);
                    if canonical == "node" {
                        kinds.insert(var.clone(), VariableKind::Node);
                    }
                    // Scalar yields (distance, score, vid) don't need VariableKind
                }
                // For schema procedures, yields are all scalars — no entry needed
            }
        }
        // Locy operators — no variable kinds to collect
        // NOTE(review): these arms do not recurse into any child plans the Locy
        // operators may carry — confirm that is intended.
        LogicalPlan::LocyProgram { .. }
        | LogicalPlan::LocyFold { .. }
        | LogicalPlan::LocyBestBy { .. }
        | LogicalPlan::LocyPriority { .. }
        | LogicalPlan::LocyDerivedScan { .. }
        | LogicalPlan::LocyProject { .. } => {}
        // Leaf nodes with no variables or not applicable
        LogicalPlan::Empty
        | LogicalPlan::CreateVectorIndex { .. }
        | LogicalPlan::CreateFullTextIndex { .. }
        | LogicalPlan::CreateScalarIndex { .. }
        | LogicalPlan::CreateJsonFtsIndex { .. }
        | LogicalPlan::DropIndex { .. }
        | LogicalPlan::ShowIndexes { .. }
        | LogicalPlan::Copy { .. }
        | LogicalPlan::Backup { .. }
        | LogicalPlan::ShowDatabase
        | LogicalPlan::ShowConfig
        | LogicalPlan::ShowStatistics
        | LogicalPlan::Vacuum
        | LogicalPlan::Checkpoint
        | LogicalPlan::CopyTo { .. }
        | LogicalPlan::CopyFrom { .. }
        | LogicalPlan::CreateLabel(_)
        | LogicalPlan::CreateEdgeType(_)
        | LogicalPlan::AlterLabel(_)
        | LogicalPlan::AlterEdgeType(_)
        | LogicalPlan::DropLabel(_)
        | LogicalPlan::DropEdgeType(_)
        | LogicalPlan::CreateConstraint(_)
        | LogicalPlan::DropConstraint(_)
        | LogicalPlan::ShowConstraints(_) => {}
    }
}
4910
4911/// Collect node variable names from CREATE/MERGE patterns for startNode/endNode UDFs.
4912///
4913/// These hints are used alongside `variable_kinds` to identify node variables
4914/// in mutation contexts for startNode/endNode resolution.
4915fn collect_mutation_node_hints(plan: &LogicalPlan, hints: &mut Vec<String>) {
4916    match plan {
4917        LogicalPlan::Create { input, pattern } => {
4918            collect_node_names_from_pattern(pattern, hints);
4919            collect_mutation_node_hints(input, hints);
4920        }
4921        LogicalPlan::CreateBatch { input, patterns } => {
4922            for pattern in patterns {
4923                collect_node_names_from_pattern(pattern, hints);
4924            }
4925            collect_mutation_node_hints(input, hints);
4926        }
4927        LogicalPlan::Merge { input, pattern, .. } => {
4928            collect_node_names_from_pattern(pattern, hints);
4929            collect_mutation_node_hints(input, hints);
4930        }
4931        // For all other nodes, recurse into inputs
4932        LogicalPlan::Traverse { input, .. }
4933        | LogicalPlan::TraverseMainByType { input, .. }
4934        | LogicalPlan::Filter { input, .. }
4935        | LogicalPlan::Project { input, .. }
4936        | LogicalPlan::Sort { input, .. }
4937        | LogicalPlan::Limit { input, .. }
4938        | LogicalPlan::Aggregate { input, .. }
4939        | LogicalPlan::Distinct { input, .. }
4940        | LogicalPlan::Window { input, .. }
4941        | LogicalPlan::Unwind { input, .. }
4942        | LogicalPlan::Set { input, .. }
4943        | LogicalPlan::Remove { input, .. }
4944        | LogicalPlan::Delete { input, .. }
4945        | LogicalPlan::Foreach { input, .. }
4946        | LogicalPlan::SubqueryCall { input, .. }
4947        | LogicalPlan::ShortestPath { input, .. }
4948        | LogicalPlan::AllShortestPaths { input, .. }
4949        | LogicalPlan::QuantifiedPattern { input, .. }
4950        | LogicalPlan::BindZeroLengthPath { input, .. }
4951        | LogicalPlan::BindPath { input, .. } => {
4952            collect_mutation_node_hints(input, hints);
4953        }
4954        LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
4955            collect_mutation_node_hints(left, hints);
4956            collect_mutation_node_hints(right, hints);
4957        }
4958        LogicalPlan::Apply {
4959            input, subquery, ..
4960        } => {
4961            collect_mutation_node_hints(input, hints);
4962            collect_mutation_node_hints(subquery, hints);
4963        }
4964        LogicalPlan::RecursiveCTE {
4965            initial, recursive, ..
4966        } => {
4967            collect_mutation_node_hints(initial, hints);
4968            collect_mutation_node_hints(recursive, hints);
4969        }
4970        LogicalPlan::Explain { plan } => {
4971            collect_mutation_node_hints(plan, hints);
4972        }
4973        // Leaf nodes — nothing to collect
4974        _ => {}
4975    }
4976}
4977
4978/// Extract node variable names from a single Cypher pattern.
4979fn collect_node_names_from_pattern(pattern: &Pattern, hints: &mut Vec<String>) {
4980    for path in &pattern.paths {
4981        for element in &path.elements {
4982            match element {
4983                PatternElement::Node(n) => {
4984                    if let Some(ref v) = n.variable
4985                        && !hints.contains(v)
4986                    {
4987                        hints.push(v.clone());
4988                    }
4989                }
4990                PatternElement::Parenthesized { pattern, .. } => {
4991                    let sub = Pattern {
4992                        paths: vec![pattern.as_ref().clone()],
4993                    };
4994                    collect_node_names_from_pattern(&sub, hints);
4995                }
4996                _ => {}
4997            }
4998        }
4999    }
5000}
5001
5002/// Collect edge (relationship) variable names from CREATE/MERGE patterns.
5003///
5004/// Used by `id()` to resolve edge identity as `_eid` instead of `_vid`.
5005fn collect_mutation_edge_hints(plan: &LogicalPlan, hints: &mut Vec<String>) {
5006    match plan {
5007        LogicalPlan::Create { input, pattern } | LogicalPlan::Merge { input, pattern, .. } => {
5008            collect_edge_names_from_pattern(pattern, hints);
5009            collect_mutation_edge_hints(input, hints);
5010        }
5011        LogicalPlan::CreateBatch { input, patterns } => {
5012            for pattern in patterns {
5013                collect_edge_names_from_pattern(pattern, hints);
5014            }
5015            collect_mutation_edge_hints(input, hints);
5016        }
5017        // For all other nodes, recurse into inputs
5018        LogicalPlan::Traverse { input, .. }
5019        | LogicalPlan::TraverseMainByType { input, .. }
5020        | LogicalPlan::Filter { input, .. }
5021        | LogicalPlan::Project { input, .. }
5022        | LogicalPlan::Sort { input, .. }
5023        | LogicalPlan::Limit { input, .. }
5024        | LogicalPlan::Aggregate { input, .. }
5025        | LogicalPlan::Distinct { input, .. }
5026        | LogicalPlan::Window { input, .. }
5027        | LogicalPlan::Unwind { input, .. }
5028        | LogicalPlan::Set { input, .. }
5029        | LogicalPlan::Remove { input, .. }
5030        | LogicalPlan::Delete { input, .. }
5031        | LogicalPlan::Foreach { input, .. }
5032        | LogicalPlan::SubqueryCall { input, .. }
5033        | LogicalPlan::ShortestPath { input, .. }
5034        | LogicalPlan::AllShortestPaths { input, .. }
5035        | LogicalPlan::QuantifiedPattern { input, .. }
5036        | LogicalPlan::BindZeroLengthPath { input, .. }
5037        | LogicalPlan::BindPath { input, .. } => {
5038            collect_mutation_edge_hints(input, hints);
5039        }
5040        LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
5041            collect_mutation_edge_hints(left, hints);
5042            collect_mutation_edge_hints(right, hints);
5043        }
5044        LogicalPlan::Apply {
5045            input, subquery, ..
5046        } => {
5047            collect_mutation_edge_hints(input, hints);
5048            collect_mutation_edge_hints(subquery, hints);
5049        }
5050        LogicalPlan::RecursiveCTE {
5051            initial, recursive, ..
5052        } => {
5053            collect_mutation_edge_hints(initial, hints);
5054            collect_mutation_edge_hints(recursive, hints);
5055        }
5056        LogicalPlan::Explain { plan } => {
5057            collect_mutation_edge_hints(plan, hints);
5058        }
5059        _ => {}
5060    }
5061}
5062
5063/// Extract edge (relationship) variable names from a single Cypher pattern.
5064fn collect_edge_names_from_pattern(pattern: &Pattern, hints: &mut Vec<String>) {
5065    for path in &pattern.paths {
5066        for element in &path.elements {
5067            match element {
5068                PatternElement::Relationship(r) => {
5069                    if let Some(ref v) = r.variable
5070                        && !hints.contains(v)
5071                    {
5072                        hints.push(v.clone());
5073                    }
5074                }
5075                PatternElement::Parenthesized { pattern, .. } => {
5076                    let sub = Pattern {
5077                        paths: vec![pattern.as_ref().clone()],
5078                    };
5079                    collect_edge_names_from_pattern(&sub, hints);
5080                }
5081                _ => {}
5082            }
5083        }
5084    }
5085}
5086
5087/// Convert AST Direction to adjacency cache Direction.
5088fn convert_direction(ast_dir: AstDirection) -> Direction {
5089    match ast_dir {
5090        AstDirection::Outgoing => Direction::Outgoing,
5091        AstDirection::Incoming => Direction::Incoming,
5092        AstDirection::Both => Direction::Both,
5093    }
5094}
5095
/// Normalise the VLP target property list produced by planner property collection.
///
/// Steps, in order:
/// 1. Drop the wildcard sentinel `"*"`, which is not a real property.
/// 2. If nothing is left, the result is `["_all_props"]` when
///    `target_has_wildcard` is set and an empty list otherwise.
/// 3. Otherwise append `"_all_props"` (unless already present) when at least
///    one remaining name is a non-schema property: it is not
///    `"overflow_json"`, does not start with `_`, and is not contained in
///    `target_label_props`. With `target_label_props == None` no name counts
///    as a schema property.
///
/// The relative order of the surviving input names is preserved.
fn sanitize_vlp_target_properties(
    mut properties: Vec<String>,
    target_has_wildcard: bool,
    target_label_props: Option<&HashSet<String>>,
) -> Vec<String> {
    const ALL_PROPS: &str = "_all_props";

    properties.retain(|name| name.as_str() != "*");

    // Nothing explicit was requested: only the wildcard can ask for anything.
    if properties.is_empty() {
        if target_has_wildcard {
            properties.push(ALL_PROPS.to_string());
        }
        return properties;
    }

    // Internal (`_`-prefixed) columns and `overflow_json` never count as
    // non-schema; this also excludes `_all_props` itself.
    let outside_schema = |name: &String| {
        let declared = target_label_props.is_some_and(|schema| schema.contains(name));
        !(declared || name.starts_with('_') || name.as_str() == "overflow_json")
    };

    let already_requested = properties.iter().any(|name| name.as_str() == ALL_PROPS);
    if !already_requested && properties.iter().any(outside_schema) {
        properties.push(ALL_PROPS.to_string());
    }

    properties
}
5123
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned property list from string literals.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().map(|&name| name.to_owned()).collect()
    }

    /// Each AST direction maps onto the variant of the same name.
    #[test]
    fn test_convert_direction() {
        let outgoing = convert_direction(AstDirection::Outgoing);
        assert!(matches!(outgoing, Direction::Outgoing));

        let incoming = convert_direction(AstDirection::Incoming);
        assert!(matches!(incoming, Direction::Incoming));

        let both = convert_direction(AstDirection::Both);
        assert!(matches!(both, Direction::Both));
    }

    /// The `"*"` sentinel is dropped; schema properties pass through untouched.
    #[test]
    fn test_sanitize_vlp_target_properties_removes_wildcard() {
        let schema = HashSet::from(["name".to_string()]);
        let input = owned(&["*", "name"]);

        let result = sanitize_vlp_target_properties(input, true, Some(&schema));

        assert_eq!(result, owned(&["name"]));
    }

    /// A wildcard-only request falls back to loading `_all_props`.
    #[test]
    fn test_sanitize_vlp_target_properties_adds_all_props_for_wildcard_empty() {
        let input = owned(&["*"]);

        let result = sanitize_vlp_target_properties(input, true, None);

        assert_eq!(result, owned(&["_all_props"]));
    }

    /// A property missing from the label schema forces `_all_props` to be appended.
    #[test]
    fn test_sanitize_vlp_target_properties_adds_all_props_for_non_schema() {
        let schema = HashSet::from(["name".to_string()]);
        let input = owned(&["custom_prop"]);

        let result = sanitize_vlp_target_properties(input, false, Some(&schema));

        assert_eq!(result, owned(&["custom_prop", "_all_props"]));
    }
}