Skip to main content

uni_query/query/executor/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7    eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14/// Convert a `Value` to `chrono::DateTime<Utc>`, handling both `Value::Temporal` and `Value::String`.
15fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16    match val {
17        Value::Temporal(tv) => {
18            use uni_common::TemporalValue;
19            match tv {
20                TemporalValue::DateTime {
21                    nanos_since_epoch, ..
22                }
23                | TemporalValue::LocalDateTime {
24                    nanos_since_epoch, ..
25                } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26                TemporalValue::Date { days_since_epoch } => {
27                    chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28                }
29                _ => None,
30            }
31        }
32        Value::String(s) => parse_datetime_utc(s).ok(),
33        _ => None,
34    }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46    BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47    ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54
55// DataFusion engine imports
56use crate::query::df_graph::L0Context;
57use crate::query::df_planner::HybridPhysicalPlanner;
58use datafusion::physical_plan::ExecutionPlanProperties;
59use datafusion::prelude::SessionContext;
60use parking_lot::RwLock as SyncRwLock;
61
62use arrow_array::{Array, RecordBatch};
63use csv;
64use parquet;
65
66use super::core::*;
67
/// How many system fields an edge map carries:
/// `_eid`, `_src`, `_dst`, `_type`, `_type_name`.
const EDGE_SYSTEM_FIELD_COUNT: usize = ["_eid", "_src", "_dst", "_type", "_type_name"].len();
/// How many system fields a vertex map carries: `_vid`, `_label`, `_uid`.
const VERTEX_SYSTEM_FIELD_COUNT: usize = ["_vid", "_label", "_uid"].len();
72
73/// Collect VIDs from all L0 buffers visible to a query context.
74///
75/// Applies `extractor` to each L0 buffer (main, transaction, pending flush) and
76/// collects the results. Returns an empty vec when no query context is present.
77fn collect_l0_vids(
78    ctx: Option<&QueryContext>,
79    extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
80) -> Vec<Vid> {
81    let mut vids = Vec::new();
82    if let Some(ctx) = ctx {
83        vids.extend(extractor(&ctx.l0.read()));
84        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
85            vids.extend(extractor(&tx_l0_arc.read()));
86        }
87        for pending_l0_arc in &ctx.pending_flush_l0s {
88            vids.extend(extractor(&pending_l0_arc.read()));
89        }
90    }
91    vids
92}
93
94/// Hydrate an entity map (vertex or edge) with properties if not already loaded.
95///
96/// This is the fallback for pushdown hydration - if the entity only has system fields
97/// (indicating pushdown didn't load properties), we load all properties here.
98///
99/// System field counts:
100/// - Edge: 5 fields (_eid, _src, _dst, _type, _type_name)
101/// - Vertex: 3 fields (_vid, _label, _uid)
102async fn hydrate_entity_if_needed(
103    map: &mut HashMap<String, Value>,
104    prop_manager: &PropertyManager,
105    ctx: Option<&QueryContext>,
106) {
107    // Check for edge entity
108    if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
109        if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
110            tracing::debug!(
111                "Pushdown fallback: hydrating edge {} at execution time",
112                eid_u64
113            );
114            if let Ok(Some(props)) = prop_manager
115                .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
116                .await
117            {
118                for (key, value) in props {
119                    map.entry(key).or_insert(value);
120                }
121            }
122        } else {
123            tracing::trace!(
124                "Pushdown success: edge {} already has {} properties",
125                eid_u64,
126                map.len() - EDGE_SYSTEM_FIELD_COUNT
127            );
128        }
129        return;
130    }
131
132    // Check for vertex entity
133    if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
134        if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
135            tracing::debug!(
136                "Pushdown fallback: hydrating vertex {} at execution time",
137                vid_u64
138            );
139            if let Ok(Some(props)) = prop_manager
140                .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
141                .await
142            {
143                for (key, value) in props {
144                    map.entry(key).or_insert(value);
145                }
146            }
147        } else {
148            tracing::trace!(
149                "Pushdown success: vertex {} already has {} properties",
150                vid_u64,
151                map.len() - VERTEX_SYSTEM_FIELD_COUNT
152            );
153        }
154    }
155}
156
157/// Promote a VID-string placeholder variable to a `Map`, draining its
158/// dotted system/property columns (`var.<prop>`) into the new map.
159///
160/// Used by `record_batches_to_rows` for search-procedure outputs, where the
161/// bare variable arrives as a VID string rather than a materialized Map.
162fn promote_vid_placeholder(row: &mut HashMap<String, Value>, var: &str) {
163    let prefix = format!("{}.", var);
164    let mut map = HashMap::new();
165
166    let dotted_keys: Vec<String> = row
167        .keys()
168        .filter(|k| k.starts_with(&prefix))
169        .cloned()
170        .collect();
171
172    for key in &dotted_keys {
173        let prop_name = &key[prefix.len()..];
174        if let Some(val) = row.remove(key) {
175            map.insert(prop_name.to_string(), val);
176        }
177    }
178
179    // Replace the VID-string placeholder with the constructed Map
180    row.insert(var.to_string(), Value::Map(map));
181}
182
183/// Merge the helper system columns (`var._vid`, `var._labels`, `var._eid`,
184/// `var._type`) and remaining USER property columns (`var.<prop>`) into the
185/// bare `Map` variable, removing the dotted helper columns from `row`.
186///
187/// Merging user properties keeps `RETURN var.prop` consistent after a
188/// unit-subquery SET refreshes the dotted columns: without it the (now stale)
189/// bare entity Struct from the outer scan would override the post-SET values.
190/// Internal helpers (`_all_props`, `_src_vid`, …) and `overflow_json` are
191/// dropped silently.
192fn merge_dotted_columns(row: &mut HashMap<String, Value>, var: &str) {
193    // Merge node system fields (_vid, _labels)
194    let vid_key = format!("{}._vid", var);
195    let labels_key = format!("{}._labels", var);
196
197    let vid_val = row.remove(&vid_key);
198    let labels_val = row.remove(&labels_key);
199
200    if let Some(Value::Map(map)) = row.get_mut(var) {
201        if let Some(v) = vid_val {
202            map.insert("_vid".to_string(), v);
203        }
204        if let Some(v) = labels_val {
205            map.insert("_labels".to_string(), v);
206        }
207    }
208
209    // Merge edge system fields (_eid, _type, _src_vid, _dst_vid).
210    // These are emitted as helper columns by the traverse exec.
211    // The structural projection already includes them in the struct,
212    // but we still need to remove the dotted helper columns.
213    let eid_key = format!("{}._eid", var);
214    let type_key = format!("{}._type", var);
215
216    let eid_val = row.remove(&eid_key);
217    let type_val = row.remove(&type_key);
218
219    if (eid_val.is_some() || type_val.is_some())
220        && let Some(Value::Map(map)) = row.get_mut(var)
221    {
222        if let Some(v) = eid_val {
223            map.entry("_eid".to_string()).or_insert(v);
224        }
225        if let Some(v) = type_val {
226            map.entry("_type".to_string()).or_insert(v);
227        }
228    }
229
230    // Drain remaining dotted columns, merging surviving USER properties.
231    let prefix = format!("{}.", var);
232    let dotted_keys: Vec<String> = row
233        .keys()
234        .filter(|k| k.starts_with(&prefix))
235        .cloned()
236        .collect();
237    for key in dotted_keys {
238        let prop_name = key[prefix.len()..].to_string();
239        let val = row.remove(&key);
240        if prop_name.starts_with('_') || prop_name == "overflow_json" {
241            continue;
242        }
243        if let (Some(val), Some(Value::Map(map))) = (val, row.get_mut(var)) {
244            map.insert(prop_name, val);
245        }
246    }
247}
248
249impl Executor {
250    /// Helper to verify and filter candidates against an optional predicate.
251    ///
252    /// Deduplicates candidates, loads properties, and evaluates the filter expression.
253    /// Returns only VIDs that pass the filter (or are not deleted).
254    async fn verify_and_filter_candidates(
255        &self,
256        mut candidates: Vec<Vid>,
257        variable: &str,
258        filter: Option<&Expr>,
259        ctx: Option<&QueryContext>,
260        prop_manager: &PropertyManager,
261        params: &HashMap<String, Value>,
262    ) -> Result<Vec<Vid>> {
263        candidates.sort_unstable();
264        candidates.dedup();
265
266        let mut verified_vids = Vec::new();
267        for vid in candidates {
268            let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
269                continue; // Deleted
270            };
271
272            if let Some(expr) = filter {
273                let mut props_map: HashMap<String, Value> = props;
274                props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
275
276                let mut row = HashMap::new();
277                row.insert(variable.to_string(), Value::Map(props_map));
278
279                let res = self
280                    .evaluate_expr(expr, &row, prop_manager, params, ctx)
281                    .await?;
282                if res.as_bool().unwrap_or(false) {
283                    verified_vids.push(vid);
284                }
285            } else {
286                verified_vids.push(vid);
287            }
288        }
289
290        Ok(verified_vids)
291    }
292
293    pub(crate) async fn scan_storage_candidates(
294        &self,
295        label_id: u16,
296        variable: &str,
297        filter: Option<&Expr>,
298    ) -> Result<Vec<Vid>> {
299        let schema = self.storage.schema_manager().schema();
300        let label_name = schema
301            .label_name_by_id(label_id)
302            .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
303
304        // Generate filter SQL from expression
305        let empty_props = std::collections::HashMap::new();
306        let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
307        let filter_sql = filter.and_then(|expr| {
308            LanceFilterGenerator::generate(std::slice::from_ref(expr), variable, Some(label_props))
309        });
310
311        self.storage
312            .scan_vertex_candidates(label_name, filter_sql.as_deref())
313            .await
314    }
315
316    pub(crate) async fn scan_label_with_filter(
317        &self,
318        label_id: u16,
319        variable: &str,
320        filter: Option<&Expr>,
321        ctx: Option<&QueryContext>,
322        prop_manager: &PropertyManager,
323        params: &HashMap<String, Value>,
324    ) -> Result<Vec<Vid>> {
325        let mut candidates = self
326            .scan_storage_candidates(label_id, variable, filter)
327            .await?;
328
329        // Convert label_id to label_name for L0 lookup
330        let schema = self.storage.schema_manager().schema();
331        if let Some(label_name) = schema.label_name_by_id(label_id) {
332            candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
333        }
334
335        self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
336            .await
337    }
338
339    pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
340        // Handle Value::Node directly (has vid field)
341        if let Value::Node(node) = val {
342            return Ok(node.vid);
343        }
344        // Handle Object (node) containing _vid field
345        if let Value::Map(map) = val
346            && let Some(vid_val) = map.get("_vid")
347            && let Some(v) = vid_val.as_u64()
348        {
349            return Ok(Vid::from(v));
350        }
351        // Handle string format
352        if let Some(s) = val.as_str()
353            && let Ok(id) = s.parse::<u64>()
354        {
355            return Ok(Vid::new(id));
356        }
357        // Handle raw u64
358        if let Some(v) = val.as_u64() {
359            return Ok(Vid::from(v));
360        }
361        Err(anyhow!("Invalid Vid format: {:?}", val))
362    }
363
364    /// Find a node value in the row by VID.
365    ///
366    /// Scans all values in the row, looking for a node (Map or Node) whose VID
367    /// matches the target. Returns the full node value if found, or a minimal
368    /// Map with just `_vid` as fallback.
369    fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
370        for val in row.values() {
371            if let Ok(vid) = Self::vid_from_value(val)
372                && vid == target_vid
373            {
374                return val.clone();
375            }
376        }
377        // Fallback: return minimal node map
378        Value::Map(HashMap::from([(
379            "_vid".to_string(),
380            Value::Int(target_vid.as_u64() as i64),
381        )]))
382    }
383
384    /// Create L0 context, session, and planner for DataFusion execution.
385    ///
386    /// This is the shared setup for `execute_datafusion` and `execute_merge_read_plan`.
387    /// Returns `(session_ctx, planner, prop_manager_arc)`.
388    pub async fn create_datafusion_planner(
389        &self,
390        prop_manager: &PropertyManager,
391        params: &HashMap<String, Value>,
392    ) -> Result<(
393        Arc<SyncRwLock<SessionContext>>,
394        HybridPhysicalPlanner,
395        Arc<PropertyManager>,
396    )> {
397        let query_ctx = self.get_context().await;
398        let l0_context = match query_ctx {
399            Some(ref ctx) => L0Context::from_query_context(ctx),
400            None => L0Context::empty(),
401        };
402
403        // C2: scans route through the version-pinned storage manager so
404        // post-snapshot L1 ROWS are invisible (the row-existence pin). The
405        // PropertyManager, by contrast, stays on LIVE storage: property
406        // point-reads must honor read-your-writes (a transaction's own
407        // uncommitted edge/vertex properties live in tx_l0, not L1, and a
408        // version filter would hide them — breaking e.g. MERGE's
409        // edge-property match). Cross-transaction property skew on an
410        // already-visible row is caught by OCC at commit, not the pin.
411        let effective_storage = self.effective_storage();
412
413        // Prefer the shared `Arc<PropertyManager>` installed via
414        // `Executor::set_prop_manager` — skips the LRU/Mutex allocation cost
415        // (~80 µs/query) of constructing a fresh, immediately-abandoned
416        // PropertyManager. Falls back to the legacy fresh-construction path
417        // for callers that have not installed one.
418        let prop_manager_arc = if let Some(shared) = self.prop_manager_arc.as_ref() {
419            shared.clone()
420        } else {
421            Arc::new(PropertyManager::new(
422                self.storage.clone(),
423                self.storage.schema_manager_arc(),
424                prop_manager.cache_size(),
425            ))
426        };
427
428        // Two-mode session construction (hot/cold template path, main)
429        // fused with plugin-framework scalar/optimizer registration
430        // (plugin-fw).
431        //
432        // Hot path: clone the pre-built `df_session_template` — an O(1)
433        // Arc bump on the inner `SessionState`. Skips the ~140 µs cost of
434        // `SessionContext::new()` + `register_cypher_udfs()`. The template
435        // only has the built-in Cypher UDFs baked in (see
436        // `Uni::build` → `register_cypher_udfs`), so it is ONLY safe to
437        // reuse when this query needs no per-query dynamic registration:
438        //   - no legacy `CustomFunctionRegistry` scalars,
439        //   - no plugin-registered optimizer rules,
440        //   - no host-level plugin scalars (`Uni::load_*_plugin`),
441        //   - no session-scoped plugin scalars.
442        //
443        // Cold path: construct a fresh, isolated `SessionContext` (folding
444        // in any plugin optimizer rules) and register the dynamic scalar
445        // sets so they do not leak into the shared template.
446        let has_custom_udfs = self
447            .custom_function_registry
448            .as_ref()
449            .is_some_and(|r| !r.is_empty());
450        let host_plugin_registry = self
451            .procedure_registry
452            .as_ref()
453            .and_then(|pr| pr.plugin_registry());
454        // Optimizer rules come from the host plugin registry (the legacy
455        // `CustomFunctionRegistry` only ever carried scalar fns). When no
456        // host registry is attached, there are no plugin optimizer rules.
457        let optimizer_providers = host_plugin_registry
458            .as_ref()
459            .map(|pr| pr.optimizer_rules())
460            .unwrap_or_default();
461        let session_local_registry =
462            crate::query::df_udfs_plugin::current_session_plugin_registry();
463        let needs_dynamic_registration = has_custom_udfs
464            || !optimizer_providers.is_empty()
465            || host_plugin_registry.is_some()
466            || session_local_registry.is_some();
467
468        let session = if let (Some(tmpl), false) = (
469            self.df_session_template.as_ref(),
470            needs_dynamic_registration,
471        ) {
472            // Hot path: clone the template (Cypher UDFs already registered).
473            (**tmpl).clone()
474        } else {
475            // Cold path: build a fresh session, optionally with any
476            // plugin-registered optimizer rules folded in.
477            //
478            // M5h: build a `SessionContext` that includes any
479            // plugin-registered optimizer rules. The rule chain is
480            // snapshotted at session-construction time; reload discipline
481            // is M10's problem.
482            let session = if optimizer_providers.is_empty() {
483                SessionContext::new()
484            } else {
485                use datafusion::execution::session_state::SessionStateBuilder;
486                use uni_plugin::traits::operator::OptimizerPhase;
487                let mut builder = SessionStateBuilder::new().with_default_features();
488                for provider in optimizer_providers.iter() {
489                    match provider.phase() {
490                        OptimizerPhase::Logical => {
491                            builder = builder.with_optimizer_rule(provider.rule());
492                        }
493                        OptimizerPhase::Physical => {
494                            if let Some(rule) = provider.physical_rule() {
495                                builder = builder.with_physical_optimizer_rule(rule);
496                            } else {
497                                tracing::debug!(
498                                    target: "uni.plugin.registry",
499                                    "physical-phase provider returned no physical_rule(); skipping"
500                                );
501                            }
502                        }
503                        OptimizerPhase::Both => {
504                            builder = builder.with_optimizer_rule(provider.rule());
505                            if let Some(rule) = provider.physical_rule() {
506                                builder = builder.with_physical_optimizer_rule(rule);
507                            }
508                        }
509                        _ => {
510                            tracing::debug!(
511                                target: "uni.plugin.registry",
512                                "skipping optimizer rule for unknown phase"
513                            );
514                        }
515                    }
516                }
517                let state = builder.build();
518                SessionContext::new_with_state(state)
519            };
520            crate::query::df_udfs::register_cypher_udfs(&session)?;
521            // Instance-scope, legacy `CustomFunctionRegistry` shadow path
522            // (db.register_function() entries + apoc-core mirrors). Routed
523            // through the plugin-registry adapter (plugin-fw).
524            if let Some(ref registry) = self.custom_function_registry {
525                crate::query::df_udfs_plugin::register_custom_functions_as_plugin_scalars(
526                    &session, registry,
527                )?;
528            }
529            // Instance-scope, **host** plugin registry (M8.6 follow-up):
530            // `Uni::load_python_plugin` / `Uni::load_rhai_plugin` /
531            // `Uni::load_wasm_*` and other host-level plugin-load paths
532            // register into `UniInner.plugin_registry`. The executor
533            // reaches that registry via the procedure-registry's
534            // attached handle (set by `Uni::build` at construction time).
535            // Without this branch, scalars added through
536            // `Uni::load_*_plugin` were invisible to Cypher's UDF
537            // resolution despite being directly invokable via
538            // `db.plugin_registry().scalar_fn(...)`.
539            if let Some(ref host_pr) = host_plugin_registry {
540                crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, host_pr)?;
541            }
542            // Session-scope (M8.6): when the call is inside a
543            // `scoped_with_session_plugin_registry` scope (set by a host
544            // crate's per-query execution path), register the session's
545            // local plugin scalars *last* so they shadow instance entries
546            // by name (DataFusion's `register_udf` is last-write-wins).
547            if let Some(ref session_local) = session_local_registry {
548                crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, session_local)?;
549            }
550            session
551        };
552        let session_ctx = Arc::new(SyncRwLock::new(session));
553
554        let mut planner = HybridPhysicalPlanner::with_l0_context(
555            session_ctx.clone(),
556            effective_storage.clone(),
557            l0_context,
558            prop_manager_arc.clone(),
559            effective_storage.schema_manager().schema(),
560            params.clone(),
561            HashMap::new(),
562        );
563
564        planner = planner.with_algo_registry(self.algo_registry.clone());
565        if let Some(ref registry) = self.procedure_registry {
566            planner = planner.with_procedure_registry(registry.clone());
567        }
568        // M5b follow-up #4: thread the host's plugin registry through to
569        // the physical planner so `plan_vector_knn` can consult
570        // `register_index_handle` for plugin-supplied vector index
571        // handles. The procedure registry's attached plugin registry is
572        // the host's actual instance (set by `Uni::build`, and what
573        // `db.plugin_registry()` returns to user code). The legacy
574        // `CustomFunctionRegistry` only ever carried scalar fns (no index
575        // handles / optimizer rules), so it is not a source here.
576        let host_plugin_registry = self
577            .procedure_registry
578            .as_ref()
579            .and_then(|r| r.plugin_registry());
580        if let Some(registry) = host_plugin_registry {
581            planner = planner.with_plugin_registry(registry);
582        }
583        if let Some(ref xervo_runtime) = self.xervo_runtime {
584            planner = planner.with_xervo_runtime(xervo_runtime.clone());
585        }
586        // FU-1 / M11 #6: thread the outer transaction's writer handle
587        // through so declared `WRITE`-mode procedures invoked from
588        // this query can run their Cypher bodies via the write-enabled
589        // inner-query host.
590        if let Some(writer) = &self.writer {
591            planner = planner.with_writer(Arc::clone(writer));
592        }
593
594        Ok((session_ctx, planner, prop_manager_arc))
595    }
596
597    /// Execute a DataFusion physical plan and collect all result batches.
598    pub fn collect_batches(
599        session_ctx: &Arc<SyncRwLock<SessionContext>>,
600        execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
601    ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
602        Box::pin(async move {
603            use futures::TryStreamExt;
604
605            let task_ctx = session_ctx.read().task_ctx();
606            let partition_count = execution_plan.output_partitioning().partition_count();
607            let mut all_batches = Vec::new();
608            for partition in 0..partition_count {
609                let stream = execution_plan.execute(partition, task_ctx.clone())?;
610                let batches: Vec<RecordBatch> = stream.try_collect().await?;
611                all_batches.extend(batches);
612            }
613            Ok(all_batches)
614        })
615    }
616
617    /// Executes a query using the DataFusion-based engine.
618    ///
619    /// Uses `HybridPhysicalPlanner` which produces DataFusion `ExecutionPlan`
620    /// trees with custom graph operators for graph-specific operations.
621    pub async fn execute_datafusion(
622        &self,
623        plan: LogicalPlan,
624        prop_manager: &PropertyManager,
625        params: &HashMap<String, Value>,
626    ) -> Result<Vec<RecordBatch>> {
627        let (batches, _plan) = self
628            .execute_datafusion_with_plan(plan, prop_manager, params)
629            .await?;
630        Ok(batches)
631    }
632
633    /// Executes a query using the DataFusion-based engine, returning both
634    /// result batches and the physical execution plan.
635    ///
636    /// The returned `Arc<dyn ExecutionPlan>` can be walked to extract per-operator
637    /// metrics (e.g., `output_rows`, `elapsed_compute`) that DataFusion's
638    /// `BaselineMetrics` recorded during execution.
639    pub async fn execute_datafusion_with_plan(
640        &self,
641        plan: LogicalPlan,
642        prop_manager: &PropertyManager,
643        params: &HashMap<String, Value>,
644    ) -> Result<(
645        Vec<RecordBatch>,
646        Arc<dyn datafusion::physical_plan::ExecutionPlan>,
647    )> {
648        let (session_ctx, mut planner, prop_manager_arc) =
649            self.create_datafusion_planner(prop_manager, params).await?;
650
651        // Build MutationContext when the plan contains write operations
652        if Self::contains_write_operations(&plan) {
653            let writer = self
654                .writer
655                .as_ref()
656                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
657                .clone();
658            let query_ctx = self.get_context().await;
659
660            debug_assert!(
661                query_ctx.is_some(),
662                "BUG: query_ctx is None for write operation"
663            );
664
665            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
666                executor: self.clone(),
667                writer,
668                prop_manager: prop_manager_arc,
669                params: params.clone(),
670                query_ctx,
671                tx_l0_override: self.transaction_l0_override.clone(),
672            });
673            planner = planner.with_mutation_context(mutation_ctx);
674            tracing::debug!(
675                plan_type = Self::get_plan_type(&plan),
676                "Mutation routed to DataFusion engine"
677            );
678        }
679
680        let execution_plan = planner.plan(&plan)?;
681        let plan_clone = Arc::clone(&execution_plan);
682        let result = Self::collect_batches(&session_ctx, execution_plan).await;
683
684        // Harvest warnings from the graph execution context after query completion.
685        let graph_warnings = planner.graph_ctx().take_warnings();
686        if !graph_warnings.is_empty()
687            && let Ok(mut w) = self.warnings.lock()
688        {
689            w.extend(graph_warnings);
690        }
691
692        result.map(|batches| (batches, plan_clone))
693    }
694
695    /// Execute a MERGE read sub-plan through the DataFusion engine.
696    ///
697    /// Plans and executes the MERGE pattern match using flat columnar output
698    /// (no structural projections), then groups dotted columns (`a._vid`,
699    /// `a._labels`, etc.) into per-variable Maps for downstream MERGE logic.
700    pub(crate) async fn execute_merge_read_plan(
701        &self,
702        plan: LogicalPlan,
703        prop_manager: &PropertyManager,
704        params: &HashMap<String, Value>,
705        merge_variables: Vec<String>,
706    ) -> Result<Vec<HashMap<String, Value>>> {
707        let (session_ctx, planner, _prop_manager_arc) =
708            self.create_datafusion_planner(prop_manager, params).await?;
709
710        // Plan with full property access ("*") for all merge variables so that
711        // ON MATCH SET / ON CREATE SET have complete entity Maps to work with.
712        let extra: HashMap<String, HashSet<String>> = merge_variables
713            .iter()
714            .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
715            .collect();
716        let execution_plan = planner.plan_with_properties(&plan, extra)?;
717        let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
718
719        // Convert to flat rows (dotted column names like "a._vid", "b._labels")
720        let flat_rows = self.record_batches_to_rows(all_batches)?;
721
722        // Group dotted columns into per-variable Maps for MERGE's match logic.
723        // E.g., {"a._vid": 0, "a._labels": ["A"]} → {"a": Map({"_vid": 0, "_labels": ["A"]})}
724        let rows = flat_rows
725            .into_iter()
726            .map(|mut row| {
727                for var in &merge_variables {
728                    // Skip if already materialized (e.g., by record_batches_to_rows)
729                    if row.contains_key(var) {
730                        continue;
731                    }
732                    let prefix = format!("{}.", var);
733                    let dotted_keys: Vec<String> = row
734                        .keys()
735                        .filter(|k| k.starts_with(&prefix))
736                        .cloned()
737                        .collect();
738                    if !dotted_keys.is_empty() {
739                        let mut map = HashMap::new();
740                        for key in dotted_keys {
741                            let prop_name = key[prefix.len()..].to_string();
742                            if let Some(val) = row.remove(&key) {
743                                map.insert(prop_name, val);
744                            }
745                        }
746                        row.insert(var.clone(), Value::Map(map));
747                    }
748                }
749                row
750            })
751            .collect();
752
753        Ok(rows)
754    }
755
756    /// Converts DataFusion RecordBatches to row-based HashMap format.
757    ///
758    /// Handles special metadata on fields:
759    /// - `cv_encoded=true`: Parse the string value as JSON to restore original type
760    ///
761    /// Also normalizes path structures to user-facing format (converts _vid to _id).
762    pub(crate) fn record_batches_to_rows(
763        &self,
764        batches: Vec<RecordBatch>,
765    ) -> Result<Vec<HashMap<String, Value>>> {
766        let mut rows = Vec::new();
767
768        for batch in batches {
769            let num_rows = batch.num_rows();
770            let schema = batch.schema();
771
772            for row_idx in 0..num_rows {
773                let mut row = HashMap::new();
774
775                for (col_idx, field) in schema.fields().iter().enumerate() {
776                    let column = batch.column(col_idx);
777                    // Infer Uni DataType from Arrow type for DateTime/Time struct decoding
778                    let data_type =
779                        if uni_common::core::schema::is_datetime_struct(field.data_type()) {
780                            Some(&uni_common::DataType::DateTime)
781                        } else if uni_common::core::schema::is_time_struct(field.data_type()) {
782                            Some(&uni_common::DataType::Time)
783                        } else {
784                            None
785                        };
786                    let mut value =
787                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);
788
789                    // Check if this field contains JSON-encoded values (e.g., from UNWIND)
790                    // Parse JSON string to restore the original type
791                    if field
792                        .metadata()
793                        .get("cv_encoded")
794                        .is_some_and(|v| v == "true")
795                        && let Value::String(s) = &value
796                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
797                    {
798                        value = Value::from(parsed);
799                    }
800
801                    // Normalize path structures to user-facing format
802                    value = Self::normalize_path_if_needed(value);
803
804                    row.insert(field.name().clone(), value);
805                }
806
807                // Merge system fields into bare variable maps.
808                // The projection step emits helper columns like "n._vid" and "n._labels"
809                // alongside the materialized "n" column (a Map of user properties).
810                // Here we merge those system fields into the map and remove the helpers.
811                //
812                // For search procedures (vector/FTS), the bare variable may be a VID
813                // string placeholder rather than a Map. In that case, promote it to a
814                // Map so we can merge system fields and properties into it.
815                let bare_vars: Vec<String> = row
816                    .keys()
817                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
818                    .cloned()
819                    .collect();
820
821                // Detect VID-placeholder variables that have system columns
822                // (e.g., "node" is String("1") but "node._vid" exists).
823                // Promote these to Maps so system fields can be merged in.
824                let vid_placeholder_vars: Vec<String> = row
825                    .keys()
826                    .filter(|k| {
827                        !k.contains('.')
828                            && matches!(row.get(*k), Some(Value::String(_)))
829                            && row.contains_key(&format!("{}._vid", k))
830                    })
831                    .cloned()
832                    .collect();
833
834                for var in &vid_placeholder_vars {
835                    promote_vid_placeholder(&mut row, var);
836                }
837
838                for var in &bare_vars {
839                    merge_dotted_columns(&mut row, var);
840                }
841
842                rows.push(row);
843            }
844        }
845
846        Ok(rows)
847    }
848
849    /// Normalize a value if it's a path structure, converting internal format to user-facing format.
850    ///
851    /// This only normalizes path structures (objects with "nodes" and "relationships" arrays).
852    /// Other values are returned unchanged to avoid interfering with query execution.
853    fn normalize_path_if_needed(value: Value) -> Value {
854        match value {
855            Value::Map(map)
856                if map.contains_key("nodes")
857                    && (map.contains_key("relationships") || map.contains_key("edges")) =>
858            {
859                Self::normalize_path_map(map)
860            }
861            other => other,
862        }
863    }
864
865    /// Normalize a path map object.
866    fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
867        // Normalize nodes array
868        if let Some(Value::List(nodes)) = map.remove("nodes") {
869            let normalized_nodes: Vec<Value> = nodes
870                .into_iter()
871                .map(|n| {
872                    if let Value::Map(node_map) = n {
873                        Self::normalize_path_node_map(node_map)
874                    } else {
875                        n
876                    }
877                })
878                .collect();
879            map.insert("nodes".to_string(), Value::List(normalized_nodes));
880        }
881
882        // Normalize relationships array (may be called "relationships" or "edges")
883        let rels_key = if map.contains_key("relationships") {
884            "relationships"
885        } else {
886            "edges"
887        };
888        if let Some(Value::List(rels)) = map.remove(rels_key) {
889            let normalized_rels: Vec<Value> = rels
890                .into_iter()
891                .map(|r| {
892                    if let Value::Map(rel_map) = r {
893                        Self::normalize_path_edge_map(rel_map)
894                    } else {
895                        r
896                    }
897                })
898                .collect();
899            map.insert("relationships".to_string(), Value::List(normalized_rels));
900        }
901
902        Value::Map(map)
903    }
904
905    /// Convert a Value to its string representation for path normalization.
906    fn value_to_id_string(val: Value) -> String {
907        match val {
908            Value::Int(n) => n.to_string(),
909            Value::Float(n) => n.to_string(),
910            Value::String(s) => s,
911            other => other.to_string(),
912        }
913    }
914
915    /// Move a map entry from `src_key` to `dst_key`, converting the value to a string.
916    /// When `src_key == dst_key`, this simply stringifies the value in place.
917    fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
918        if let Some(val) = map.remove(src_key) {
919            map.insert(
920                dst_key.to_string(),
921                Value::String(Self::value_to_id_string(val)),
922            );
923        }
924    }
925
926    /// Ensure the "properties" field is a non-null map.
927    fn ensure_properties_map(map: &mut HashMap<String, Value>) {
928        match map.get("properties") {
929            Some(props) if !props.is_null() => {}
930            _ => {
931                map.insert("properties".to_string(), Value::Map(HashMap::new()));
932            }
933        }
934    }
935
936    /// Normalize a node within a path to user-facing format.
937    fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
938        Self::stringify_map_field(&mut map, "_vid", "_id");
939        Self::ensure_properties_map(&mut map);
940        Value::Map(map)
941    }
942
943    /// Normalize an edge within a path to user-facing format.
944    fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
945        Self::stringify_map_field(&mut map, "_eid", "_id");
946        Self::stringify_map_field(&mut map, "_src", "_src");
947        Self::stringify_map_field(&mut map, "_dst", "_dst");
948
949        if let Some(type_name) = map.remove("_type_name") {
950            map.insert("_type".to_string(), type_name);
951        }
952
953        Self::ensure_properties_map(&mut map);
954        Value::Map(map)
955    }
956
957    #[instrument(
958        skip(self, prop_manager, params),
959        fields(rows_returned, duration_ms),
960        level = "info"
961    )]
962    pub fn execute<'a>(
963        &'a self,
964        plan: LogicalPlan,
965        prop_manager: &'a PropertyManager,
966        params: &'a HashMap<String, Value>,
967    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
968        Box::pin(async move {
969            let query_type = Self::get_plan_type(&plan);
970            let ctx = self.get_context().await;
971            let start = Instant::now();
972
973            // Route DDL/Admin queries to the fallback executor.
974            // All other queries (including similar_to) flow through DataFusion.
975            let res = if Self::is_ddl_or_admin(&plan) {
976                self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
977                    .await
978            } else {
979                let batches = self.execute_datafusion(plan, prop_manager, params).await?;
980                self.record_batches_to_rows(batches)
981            };
982
983            let duration = start.elapsed();
984            metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
985                .record(duration.as_secs_f64());
986
987            tracing::Span::current().record("duration_ms", duration.as_millis());
988            match &res {
989                Ok(rows) => {
990                    tracing::Span::current().record("rows_returned", rows.len());
991                    metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
992                        .increment(rows.len() as u64);
993                }
994                Err(e) => {
995                    let error_type = if e.to_string().contains("timed out") {
996                        "timeout"
997                    } else if e.to_string().contains("syntax") {
998                        "syntax"
999                    } else {
1000                        "execution"
1001                    };
1002                    metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
1003                }
1004            }
1005
1006            res
1007        })
1008    }
1009
1010    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
1011        match plan {
1012            LogicalPlan::Scan { .. } => "read_scan",
1013            LogicalPlan::FusedIndexScan { .. } => "read_fused_index_scan",
1014            LogicalPlan::FusedIndexScanWrapped { .. } => "read_fused_index_scan_wrapped",
1015            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
1016            LogicalPlan::Traverse { .. } => "read_traverse",
1017            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
1018            LogicalPlan::ScanAll { .. } => "read_scan_all",
1019            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
1020            LogicalPlan::VectorKnn { .. } => "read_vector",
1021            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
1022            LogicalPlan::Merge { .. } => "write_merge",
1023            LogicalPlan::Delete { .. } => "write_delete",
1024            LogicalPlan::Set { .. } => "write_set",
1025            LogicalPlan::Remove { .. } => "write_remove",
1026            LogicalPlan::ProcedureCall { .. } => "call",
1027            LogicalPlan::Copy { .. } => "copy",
1028            LogicalPlan::Backup { .. } => "backup",
1029            _ => "other",
1030        }
1031    }
1032
1033    /// Return all direct child plan references from a `LogicalPlan`.
1034    ///
1035    /// This centralizes the variant→children mapping so that recursive walkers
1036    /// (e.g., `contains_write_operations`) can delegate the
1037    /// "recurse into children" logic instead of duplicating the match arms.
1038    ///
1039    /// Note: `Foreach` returns only its `input`; the `body: Vec<LogicalPlan>`
1040    /// is not included because it requires special iteration. Callers that
1041    /// need to inspect the body should handle `Foreach` before falling through.
1042    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
1043        match plan {
1044            // Single-input wrappers
1045            LogicalPlan::Project { input, .. }
1046            | LogicalPlan::Sort { input, .. }
1047            | LogicalPlan::Limit { input, .. }
1048            | LogicalPlan::Distinct { input }
1049            | LogicalPlan::Aggregate { input, .. }
1050            | LogicalPlan::Window { input, .. }
1051            | LogicalPlan::Unwind { input, .. }
1052            | LogicalPlan::Filter { input, .. }
1053            | LogicalPlan::Create { input, .. }
1054            | LogicalPlan::CreateBatch { input, .. }
1055            | LogicalPlan::Set { input, .. }
1056            | LogicalPlan::Remove { input, .. }
1057            | LogicalPlan::Delete { input, .. }
1058            | LogicalPlan::Merge { input, .. }
1059            | LogicalPlan::Foreach { input, .. }
1060            | LogicalPlan::Traverse { input, .. }
1061            | LogicalPlan::TraverseMainByType { input, .. }
1062            | LogicalPlan::BindZeroLengthPath { input, .. }
1063            | LogicalPlan::BindPath { input, .. }
1064            | LogicalPlan::ShortestPath { input, .. }
1065            | LogicalPlan::AllShortestPaths { input, .. }
1066            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],
1067
1068            // Two-input wrappers
1069            LogicalPlan::Apply {
1070                input, subquery, ..
1071            }
1072            | LogicalPlan::SubqueryCall { input, subquery } => {
1073                vec![input.as_ref(), subquery.as_ref()]
1074            }
1075            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
1076                vec![left.as_ref(), right.as_ref()]
1077            }
1078            LogicalPlan::RecursiveCTE {
1079                initial, recursive, ..
1080            } => vec![initial.as_ref(), recursive.as_ref()],
1081            LogicalPlan::QuantifiedPattern {
1082                input,
1083                pattern_plan,
1084                ..
1085            } => vec![input.as_ref(), pattern_plan.as_ref()],
1086
1087            // Leaf nodes (scans, DDL, admin, etc.)
1088            _ => vec![],
1089        }
1090    }
1091
1092    /// Check if a plan is a DDL or admin operation that should skip DataFusion.
1093    ///
1094    /// These operations don't produce data streams and aren't supported by the
1095    /// DataFusion planner. Recurses through wrapper nodes (`Project`, `Sort`,
1096    /// `Limit`, etc.) to detect DDL/admin operations nested inside read
1097    /// wrappers (e.g. `CALL procedure(...) YIELD x RETURN x`).
1098    pub(crate) fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
1099        match plan {
1100            // DDL / schema operations
1101            LogicalPlan::CreateLabel(_)
1102            | LogicalPlan::CreateEdgeType(_)
1103            | LogicalPlan::AlterLabel(_)
1104            | LogicalPlan::AlterEdgeType(_)
1105            | LogicalPlan::DropLabel(_)
1106            | LogicalPlan::DropEdgeType(_)
1107            | LogicalPlan::CreateConstraint(_)
1108            | LogicalPlan::DropConstraint(_)
1109            | LogicalPlan::ShowConstraints(_) => true,
1110
1111            // Index operations
1112            LogicalPlan::CreateVectorIndex { .. }
1113            | LogicalPlan::CreateFullTextIndex { .. }
1114            | LogicalPlan::CreateScalarIndex { .. }
1115            | LogicalPlan::CreateJsonFtsIndex { .. }
1116            | LogicalPlan::DropIndex { .. }
1117            | LogicalPlan::ShowIndexes { .. } => true,
1118
1119            // Admin / utility operations
1120            LogicalPlan::ShowDatabase
1121            | LogicalPlan::ShowConfig
1122            | LogicalPlan::ShowStatistics
1123            | LogicalPlan::Vacuum
1124            | LogicalPlan::Checkpoint
1125            | LogicalPlan::Copy { .. }
1126            | LogicalPlan::CopyTo { .. }
1127            | LogicalPlan::CopyFrom { .. }
1128            | LogicalPlan::Backup { .. }
1129            | LogicalPlan::Explain { .. } => true,
1130
1131            // Procedure calls: DF-eligible procedures go through DataFusion,
1132            // everything else (DDL, admin, unknown) stays on fallback.
1133            LogicalPlan::ProcedureCall { procedure_name, .. } => {
1134                !Self::is_df_eligible_procedure(procedure_name)
1135            }
1136
1137            // Recurse through children using plan_children
1138            _ => Self::plan_children(plan)
1139                .iter()
1140                .any(|child| Self::is_ddl_or_admin(child)),
1141        }
1142    }
1143
1144    /// Returns `true` if the procedure is a read-only, data-producing procedure
1145    /// that can be executed through the DataFusion engine.
1146    ///
1147    /// This is a **positive allowlist** — unknown procedures default to the
1148    /// fallback executor (safe for TCK test procedures, future DDL, and admin).
1149    fn is_df_eligible_procedure(name: &str) -> bool {
1150        matches!(
1151            name,
1152            "uni.schema.labels"
1153                | "uni.schema.edgeTypes"
1154                | "uni.schema.relationshipTypes"
1155                | "uni.schema.indexes"
1156                | "uni.schema.constraints"
1157                | "uni.schema.labelInfo"
1158                | "uni.vector.query"
1159                | "uni.fts.query"
1160                | "uni.search"
1161                // M5g — `uni.create.vNode` produces a typed Node yield
1162                // via the planner-level node-shape expansion in
1163                // `GraphProcedureCallExec`; only the DataFusion path
1164                // honours that, so route it there explicitly. (vEdge
1165                // emits a self-contained Struct column and works on
1166                // either path — list both for symmetry.)
1167                | "uni.create.vNode"
1168                | "uni.create.vEdge"
1169        ) || name.starts_with("uni.algo.")
1170    }
1171
1172    /// Check if a plan contains write/mutation operations anywhere in the tree.
1173    ///
1174    /// Write operations (`CREATE`, `MERGE`, `DELETE`, `SET`, `REMOVE`, `FOREACH`)
1175    /// are used to determine when a MutationContext needs to be built for DataFusion.
1176    /// This recurses through read-only wrapper nodes to detect writes nested inside
1177    /// projections (e.g. `CREATE (n:Person) RETURN n` produces `Project { Create { ... } }`).
1178    fn contains_write_operations(plan: &LogicalPlan) -> bool {
1179        match plan {
1180            LogicalPlan::Create { .. }
1181            | LogicalPlan::CreateBatch { .. }
1182            | LogicalPlan::Merge { .. }
1183            | LogicalPlan::Delete { .. }
1184            | LogicalPlan::Set { .. }
1185            | LogicalPlan::Remove { .. }
1186            | LogicalPlan::Foreach { .. } => true,
1187            _ => Self::plan_children(plan)
1188                .iter()
1189                .any(|child| Self::contains_write_operations(child)),
1190        }
1191    }
1192
1193    /// Executes a query as a stream of result batches.
1194    ///
1195    /// Routes DDL/Admin through the fallback executor and everything else
1196    /// through DataFusion.
1197    pub fn execute_stream(
1198        self,
1199        plan: LogicalPlan,
1200        prop_manager: Arc<PropertyManager>,
1201        params: HashMap<String, Value>,
1202    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1203        let this = self;
1204        let this_for_ctx = this.clone();
1205
1206        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1207
1208        ctx_stream
1209            .flat_map(move |ctx| {
1210                let plan = plan.clone();
1211                let this = this.clone();
1212                let prop_manager = prop_manager.clone();
1213                let params = params.clone();
1214
1215                let fut = async move {
1216                    if Self::is_ddl_or_admin(&plan) {
1217                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
1218                            .await
1219                    } else {
1220                        let batches = this
1221                            .execute_datafusion(plan, &prop_manager, &params)
1222                            .await?;
1223                        this.record_batches_to_rows(batches)
1224                    }
1225                };
1226                stream::once(fut).boxed()
1227            })
1228            .boxed()
1229    }
1230
1231    /// Converts an Arrow array element at a given row index to a Value.
1232    /// Delegates to the shared implementation in arrow_convert module.
1233    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
1234        arrow_convert::arrow_to_value(col, row, None)
1235    }
1236
1237    pub(crate) fn evaluate_expr<'a>(
1238        &'a self,
1239        expr: &'a Expr,
1240        row: &'a HashMap<String, Value>,
1241        prop_manager: &'a PropertyManager,
1242        params: &'a HashMap<String, Value>,
1243        ctx: Option<&'a QueryContext>,
1244    ) -> BoxFuture<'a, Result<Value>> {
1245        let this = self;
1246        Box::pin(async move {
1247            // First check if the expression itself is already pre-computed in the row
1248            let repr = expr.to_string_repr();
1249            if let Some(val) = row.get(&repr) {
1250                return Ok(val.clone());
1251            }
1252
1253            match expr {
1254                Expr::PatternComprehension { .. } => {
1255                    // Handled by DataFusion path via PatternComprehensionExecExpr
1256                    Err(anyhow::anyhow!(
1257                        "Pattern comprehensions are handled by DataFusion executor"
1258                    ))
1259                }
1260                Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1261                    "COLLECT subqueries not yet supported in executor"
1262                )),
1263                Expr::Variable(name) => {
1264                    if let Some(val) = row.get(name) {
1265                        Ok(val.clone())
1266                    } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1267                        // Fallback: scan results may have system columns like "d._vid"
1268                        // without a materialized "d" Map. Return the VID so Property
1269                        // evaluation can fetch properties from storage.
1270                        Ok(vid_val.clone())
1271                    } else {
1272                        Ok(params.get(name).cloned().unwrap_or(Value::Null))
1273                    }
1274                }
1275                Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1276                Expr::Property(var_expr, prop_name) => {
1277                    // Fast path: if the base is a Variable, try flat-key lookup first.
1278                    // DataFusion scan results use flat keys like "d.embedding" rather than
1279                    // nested maps, so "d.embedding" won't be found via Variable("d") -> Property.
1280                    if let Expr::Variable(var_name) = var_expr.as_ref() {
1281                        let flat_key = format!("{}.{}", var_name, prop_name);
1282                        if let Some(val) = row.get(flat_key.as_str()) {
1283                            return Ok(val.clone());
1284                        }
1285                    }
1286
1287                    let base_val = this
1288                        .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1289                        .await?;
1290
1291                    // Handle system properties _vid and _id directly
1292                    if (prop_name == "_vid" || prop_name == "_id")
1293                        && let Ok(vid) = Self::vid_from_value(&base_val)
1294                    {
1295                        return Ok(Value::Int(vid.as_u64() as i64));
1296                    }
1297
1298                    // Handle Value::Node - access properties directly or via prop manager
1299                    if let Value::Node(node) = &base_val {
1300                        // Handle system properties
1301                        if prop_name == "_vid" || prop_name == "_id" {
1302                            return Ok(Value::Int(node.vid.as_u64() as i64));
1303                        }
1304                        if prop_name == "_labels" {
1305                            return Ok(Value::List(
1306                                node.labels
1307                                    .iter()
1308                                    .map(|l| Value::String(l.clone()))
1309                                    .collect(),
1310                            ));
1311                        }
1312                        // Check in-memory properties first
1313                        if let Some(val) = node.properties.get(prop_name.as_str()) {
1314                            return Ok(val.clone());
1315                        }
1316                        // Fallback to storage lookup
1317                        if let Ok(val) = prop_manager
1318                            .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1319                            .await
1320                        {
1321                            return Ok(val);
1322                        }
1323                        return Ok(Value::Null);
1324                    }
1325
1326                    // Handle Value::Edge - access properties directly or via prop manager
1327                    if let Value::Edge(edge) = &base_val {
1328                        // Handle system properties
1329                        if prop_name == "_eid" || prop_name == "_id" {
1330                            return Ok(Value::Int(edge.eid.as_u64() as i64));
1331                        }
1332                        if prop_name == "_type" {
1333                            return Ok(Value::String(edge.edge_type.clone()));
1334                        }
1335                        if prop_name == "_src" {
1336                            return Ok(Value::Int(edge.src.as_u64() as i64));
1337                        }
1338                        if prop_name == "_dst" {
1339                            return Ok(Value::Int(edge.dst.as_u64() as i64));
1340                        }
1341                        // Check in-memory properties first
1342                        if let Some(val) = edge.properties.get(prop_name.as_str()) {
1343                            return Ok(val.clone());
1344                        }
1345                        // Fallback to storage lookup
1346                        if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1347                        {
1348                            return Ok(val);
1349                        }
1350                        return Ok(Value::Null);
1351                    }
1352
1353                    // If base_val is an object (node/edge), check its properties first
1354                    // This handles properties from CREATE/MERGE that may not be persisted yet
1355                    if let Value::Map(map) = &base_val {
1356                        // First check top-level (for system properties like _id, _label, etc.)
1357                        if let Some(val) = map.get(prop_name.as_str()) {
1358                            return Ok(val.clone());
1359                        }
1360                        // Then check inside "properties" object (for user properties)
1361                        if let Some(Value::Map(props)) = map.get("properties")
1362                            && let Some(val) = props.get(prop_name.as_str())
1363                        {
1364                            return Ok(val.clone());
1365                        }
1366                        // Fallback to storage lookup using _vid or _id
1367                        let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1368                            map.get("_id")
1369                                .and_then(|v| v.as_str())
1370                                .and_then(|s| s.parse::<u64>().ok())
1371                        });
1372                        if let Some(id) = vid_opt {
1373                            let vid = Vid::from(id);
1374                            if let Ok(val) = prop_manager
1375                                .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1376                                .await
1377                            {
1378                                return Ok(val);
1379                            }
1380                        } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1381                            let eid = uni_common::core::id::Eid::from(id);
1382                            if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1383                                return Ok(val);
1384                            }
1385                        }
1386                        return Ok(Value::Null);
1387                    }
1388
1389                    // If base_val is just a VID, fetch from property manager
1390                    if let Ok(vid) = Self::vid_from_value(&base_val) {
1391                        return prop_manager
1392                            .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1393                            .await;
1394                    }
1395
1396                    if base_val.is_null() {
1397                        return Ok(Value::Null);
1398                    }
1399
1400                    // Check if base_val is a temporal value and prop_name is a temporal accessor
1401                    {
1402                        use crate::query::datetime::{
1403                            eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1404                            is_duration_string, is_temporal_accessor, is_temporal_string,
1405                        };
1406
1407                        // Handle Value::Temporal directly (no string parsing needed)
1408                        if let Value::Temporal(tv) = &base_val {
1409                            if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1410                                if is_duration_accessor(prop_name) {
1411                                    // Convert to string for the existing accessor logic
1412                                    return eval_duration_accessor(
1413                                        &base_val.to_string(),
1414                                        prop_name,
1415                                    );
1416                                }
1417                            } else if is_temporal_accessor(prop_name) {
1418                                return eval_temporal_accessor(&base_val.to_string(), prop_name);
1419                            }
1420                        }
1421
1422                        // Handle Value::String temporal (backward compat)
1423                        if let Value::String(s) = &base_val {
1424                            if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1425                                return eval_temporal_accessor(s, prop_name);
1426                            }
1427                            if is_duration_string(s) && is_duration_accessor(prop_name) {
1428                                return eval_duration_accessor(s, prop_name);
1429                            }
1430                        }
1431                    }
1432
1433                    Err(anyhow!(
1434                        "Cannot access property '{}' on {:?}",
1435                        prop_name,
1436                        base_val
1437                    ))
1438                }
1439                Expr::ArrayIndex {
1440                    array: arr_expr,
1441                    index: idx_expr,
1442                } => {
1443                    let arr_val = this
1444                        .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1445                        .await?;
1446                    let idx_val = this
1447                        .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1448                        .await?;
1449
1450                    if let Value::List(arr) = &arr_val {
1451                        // Handle signed indices (allow negative)
1452                        if let Some(i) = idx_val.as_i64() {
1453                            let idx = if i < 0 {
1454                                // Negative index: -1 = last element, -2 = second to last, etc.
1455                                let positive_idx = arr.len() as i64 + i;
1456                                if positive_idx < 0 {
1457                                    return Ok(Value::Null); // Out of bounds
1458                                }
1459                                positive_idx as usize
1460                            } else {
1461                                i as usize
1462                            };
1463                            if idx < arr.len() {
1464                                return Ok(arr[idx].clone());
1465                            }
1466                            return Ok(Value::Null);
1467                        } else if idx_val.is_null() {
1468                            return Ok(Value::Null);
1469                        } else {
1470                            return Err(anyhow::anyhow!(
1471                                "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1472                                idx_val
1473                            ));
1474                        }
1475                    }
1476                    if let Value::Map(map) = &arr_val {
1477                        if let Some(key) = idx_val.as_str() {
1478                            return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1479                        } else if !idx_val.is_null() {
1480                            return Err(anyhow::anyhow!(
1481                                "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1482                                idx_val
1483                            ));
1484                        }
1485                    }
1486                    // Handle bracket access on Node: n['name'] returns property
1487                    if let Value::Node(node) = &arr_val {
1488                        if let Some(key) = idx_val.as_str() {
1489                            // Check in-memory properties first
1490                            if let Some(val) = node.properties.get(key) {
1491                                return Ok(val.clone());
1492                            }
1493                            // Fallback to property manager
1494                            if let Ok(val) = prop_manager
1495                                .get_vertex_prop_with_ctx(node.vid, key, ctx)
1496                                .await
1497                            {
1498                                return Ok(val);
1499                            }
1500                            return Ok(Value::Null);
1501                        } else if !idx_val.is_null() {
1502                            return Err(anyhow::anyhow!(
1503                                "TypeError: Node index must be a string, got: {:?}",
1504                                idx_val
1505                            ));
1506                        }
1507                    }
1508                    // Handle bracket access on Edge: e['property'] returns property
1509                    if let Value::Edge(edge) = &arr_val {
1510                        if let Some(key) = idx_val.as_str() {
1511                            // Check in-memory properties first
1512                            if let Some(val) = edge.properties.get(key) {
1513                                return Ok(val.clone());
1514                            }
1515                            // Fallback to property manager
1516                            if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1517                                return Ok(val);
1518                            }
1519                            return Ok(Value::Null);
1520                        } else if !idx_val.is_null() {
1521                            return Err(anyhow::anyhow!(
1522                                "TypeError: Edge index must be a string, got: {:?}",
1523                                idx_val
1524                            ));
1525                        }
1526                    }
1527                    // Handle bracket access on VID (integer): n['name'] where n is a VID
1528                    if let Ok(vid) = Self::vid_from_value(&arr_val)
1529                        && let Some(key) = idx_val.as_str()
1530                    {
1531                        if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1532                        {
1533                            return Ok(val);
1534                        }
1535                        return Ok(Value::Null);
1536                    }
1537                    if arr_val.is_null() {
1538                        return Ok(Value::Null);
1539                    }
1540                    Err(anyhow!(
1541                        "TypeError: InvalidArgumentType - cannot index into {:?}",
1542                        arr_val
1543                    ))
1544                }
1545                Expr::ArraySlice { array, start, end } => {
1546                    let arr_val = this
1547                        .evaluate_expr(array, row, prop_manager, params, ctx)
1548                        .await?;
1549
1550                    if let Value::List(arr) = &arr_val {
1551                        let len = arr.len();
1552
1553                        // Evaluate start index (default to 0), null → null result
1554                        let start_idx = if let Some(s) = start {
1555                            let v = this
1556                                .evaluate_expr(s, row, prop_manager, params, ctx)
1557                                .await?;
1558                            if v.is_null() {
1559                                return Ok(Value::Null);
1560                            }
1561                            let raw = v.as_i64().unwrap_or(0);
1562                            if raw < 0 {
1563                                (len as i64 + raw).max(0) as usize
1564                            } else {
1565                                (raw as usize).min(len)
1566                            }
1567                        } else {
1568                            0
1569                        };
1570
1571                        // Evaluate end index (default to length), null → null result
1572                        let end_idx = if let Some(e) = end {
1573                            let v = this
1574                                .evaluate_expr(e, row, prop_manager, params, ctx)
1575                                .await?;
1576                            if v.is_null() {
1577                                return Ok(Value::Null);
1578                            }
1579                            let raw = v.as_i64().unwrap_or(len as i64);
1580                            if raw < 0 {
1581                                (len as i64 + raw).max(0) as usize
1582                            } else {
1583                                (raw as usize).min(len)
1584                            }
1585                        } else {
1586                            len
1587                        };
1588
1589                        // Return sliced array
1590                        if start_idx >= end_idx {
1591                            return Ok(Value::List(vec![]));
1592                        }
1593                        let end_idx = end_idx.min(len);
1594                        return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1595                    }
1596
1597                    if arr_val.is_null() {
1598                        return Ok(Value::Null);
1599                    }
1600                    Err(anyhow!("Cannot slice {:?}", arr_val))
1601                }
1602                Expr::Literal(lit) => Ok(lit.to_value()),
1603                Expr::List(items) => {
1604                    let mut vals = Vec::new();
1605                    for item in items {
1606                        vals.push(
1607                            this.evaluate_expr(item, row, prop_manager, params, ctx)
1608                                .await?,
1609                        );
1610                    }
1611                    Ok(Value::List(vals))
1612                }
1613                Expr::Map(items) => {
1614                    let mut map = HashMap::new();
1615                    for (key, value_expr) in items {
1616                        let val = this
1617                            .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1618                            .await?;
1619                        map.insert(key.clone(), val);
1620                    }
1621                    Ok(Value::Map(map))
1622                }
1623                Expr::Exists { query, .. } => {
1624                    // Plan and execute subquery; failures return false (pattern doesn't match)
1625                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1626                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1627
1628                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1629                        Ok(plan) => {
1630                            let mut sub_params = params.clone();
1631                            sub_params.extend(row.clone());
1632
1633                            match this.execute(plan, prop_manager, &sub_params).await {
1634                                Ok(results) => Ok(Value::Bool(!results.is_empty())),
1635                                Err(e) => {
1636                                    log::debug!("EXISTS subquery execution failed: {}", e);
1637                                    Ok(Value::Bool(false))
1638                                }
1639                            }
1640                        }
1641                        Err(e) => {
1642                            log::debug!("EXISTS subquery planning failed: {}", e);
1643                            Ok(Value::Bool(false))
1644                        }
1645                    }
1646                }
1647                Expr::CountSubquery(query) => {
1648                    // Similar to Exists but returns count
1649                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1650
1651                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1652
1653                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1654                        Ok(plan) => {
1655                            let mut sub_params = params.clone();
1656                            sub_params.extend(row.clone());
1657
1658                            match this.execute(plan, prop_manager, &sub_params).await {
1659                                Ok(results) => Ok(Value::from(results.len() as i64)),
1660                                Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1661                            }
1662                        }
1663                        Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1664                    }
1665                }
1666                Expr::Quantifier {
1667                    quantifier,
1668                    variable,
1669                    list,
1670                    predicate,
1671                } => {
1672                    // Quantifier expression evaluation (ALL/ANY/SINGLE/NONE)
1673                    //
1674                    // This is the primary execution path for quantifiers because DataFusion
1675                    // does not support lambda functions yet. Queries with quantifiers attempt
1676                    // DataFusion translation first, fail (see df_expr.rs:289), then fall back
1677                    // to this fallback executor path.
1678                    //
1679                    // This is intentional design - we get correct semantics with row-by-row
1680                    // evaluation until DataFusion adds lambda support.
1681                    //
1682                    // See: https://github.com/apache/datafusion/issues/14205
1683
1684                    // Evaluate the list expression
1685                    let list_val = this
1686                        .evaluate_expr(list, row, prop_manager, params, ctx)
1687                        .await?;
1688
1689                    // Handle null propagation
1690                    if list_val.is_null() {
1691                        return Ok(Value::Null);
1692                    }
1693
1694                    // Convert to array
1695                    let items = match list_val {
1696                        Value::List(arr) => arr,
1697                        _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1698                    };
1699
1700                    // Evaluate predicate for each item
1701                    let mut satisfied_count = 0;
1702                    for item in &items {
1703                        // Create new row with bound variable
1704                        let mut item_row = row.clone();
1705                        item_row.insert(variable.clone(), item.clone());
1706
1707                        // Evaluate predicate with bound variable
1708                        let pred_result = this
1709                            .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1710                            .await?;
1711
1712                        // Check if predicate is satisfied
1713                        if let Value::Bool(true) = pred_result {
1714                            satisfied_count += 1;
1715                        }
1716                    }
1717
1718                    // Return based on quantifier type
1719                    let result = match quantifier {
1720                        Quantifier::All => satisfied_count == items.len(),
1721                        Quantifier::Any => satisfied_count > 0,
1722                        Quantifier::Single => satisfied_count == 1,
1723                        Quantifier::None => satisfied_count == 0,
1724                    };
1725
1726                    Ok(Value::Bool(result))
1727                }
1728                Expr::ListComprehension {
1729                    variable,
1730                    list,
1731                    where_clause,
1732                    map_expr,
1733                } => {
1734                    // List comprehension evaluation: [x IN list WHERE pred | expr]
1735                    //
1736                    // Similar to quantifiers, this requires lambda-like evaluation
1737                    // which DataFusion doesn't support yet. This is the primary execution path.
1738
1739                    // Evaluate the list expression
1740                    let list_val = this
1741                        .evaluate_expr(list, row, prop_manager, params, ctx)
1742                        .await?;
1743
1744                    // Handle null propagation
1745                    if list_val.is_null() {
1746                        return Ok(Value::Null);
1747                    }
1748
1749                    // Convert to array
1750                    let items = match list_val {
1751                        Value::List(arr) => arr,
1752                        _ => {
1753                            return Err(anyhow!(
1754                                "List comprehension expects a list, got: {:?}",
1755                                list_val
1756                            ));
1757                        }
1758                    };
1759
1760                    // Collect mapped values
1761                    let mut results = Vec::new();
1762                    for item in &items {
1763                        // Create new row with bound variable
1764                        let mut item_row = row.clone();
1765                        item_row.insert(variable.clone(), item.clone());
1766
1767                        // Apply WHERE filter if present
1768                        if let Some(predicate) = where_clause {
1769                            let pred_result = this
1770                                .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1771                                .await?;
1772
1773                            // Skip items that don't match the filter
1774                            if !matches!(pred_result, Value::Bool(true)) {
1775                                continue;
1776                            }
1777                        }
1778
1779                        // Apply map expression
1780                        let mapped_val = this
1781                            .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1782                            .await?;
1783                        results.push(mapped_val);
1784                    }
1785
1786                    Ok(Value::List(results))
1787                }
1788                Expr::BinaryOp { left, op, right } => {
1789                    // Short-circuit evaluation for AND/OR
1790                    match op {
1791                        BinaryOp::And => {
1792                            let l_val = this
1793                                .evaluate_expr(left, row, prop_manager, params, ctx)
1794                                .await?;
1795                            // Short-circuit: if left is false, don't evaluate right
1796                            if let Some(false) = l_val.as_bool() {
1797                                return Ok(Value::Bool(false));
1798                            }
1799                            let r_val = this
1800                                .evaluate_expr(right, row, prop_manager, params, ctx)
1801                                .await?;
1802                            eval_binary_op(&l_val, op, &r_val)
1803                        }
1804                        BinaryOp::Or => {
1805                            let l_val = this
1806                                .evaluate_expr(left, row, prop_manager, params, ctx)
1807                                .await?;
1808                            // Short-circuit: if left is true, don't evaluate right
1809                            if let Some(true) = l_val.as_bool() {
1810                                return Ok(Value::Bool(true));
1811                            }
1812                            let r_val = this
1813                                .evaluate_expr(right, row, prop_manager, params, ctx)
1814                                .await?;
1815                            eval_binary_op(&l_val, op, &r_val)
1816                        }
1817                        _ => {
1818                            // For all other operators, evaluate both sides
1819                            let l_val = this
1820                                .evaluate_expr(left, row, prop_manager, params, ctx)
1821                                .await?;
1822                            let r_val = this
1823                                .evaluate_expr(right, row, prop_manager, params, ctx)
1824                                .await?;
1825                            eval_binary_op(&l_val, op, &r_val)
1826                        }
1827                    }
1828                }
1829                Expr::In { expr, list } => {
1830                    let l_val = this
1831                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1832                        .await?;
1833                    let r_val = this
1834                        .evaluate_expr(list, row, prop_manager, params, ctx)
1835                        .await?;
1836                    eval_in_op(&l_val, &r_val)
1837                }
1838                Expr::UnaryOp { op, expr } => {
1839                    let val = this
1840                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1841                        .await?;
1842                    match op {
1843                        UnaryOp::Not => {
1844                            // Three-valued logic: NOT null = null
1845                            match val.as_bool() {
1846                                Some(b) => Ok(Value::Bool(!b)),
1847                                None if val.is_null() => Ok(Value::Null),
1848                                None => Err(anyhow!(
1849                                    "InvalidArgumentType: NOT requires a boolean argument"
1850                                )),
1851                            }
1852                        }
1853                        UnaryOp::Neg => {
1854                            if let Some(i) = val.as_i64() {
1855                                Ok(Value::Int(-i))
1856                            } else if let Some(f) = val.as_f64() {
1857                                Ok(Value::Float(-f))
1858                            } else {
1859                                Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1860                            }
1861                        }
1862                    }
1863                }
1864                Expr::IsNull(expr) => {
1865                    let val = this
1866                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1867                        .await?;
1868                    Ok(Value::Bool(val.is_null()))
1869                }
1870                Expr::IsNotNull(expr) => {
1871                    let val = this
1872                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1873                        .await?;
1874                    Ok(Value::Bool(!val.is_null()))
1875                }
1876                Expr::IsUnique(_) => {
1877                    // IS UNIQUE is only valid in constraint definitions, not in query expressions
1878                    Err(anyhow!(
1879                        "IS UNIQUE can only be used in constraint definitions"
1880                    ))
1881                }
1882                Expr::Case {
1883                    expr,
1884                    when_then,
1885                    else_expr,
1886                } => {
1887                    if let Some(base_expr) = expr {
1888                        let base_val = this
1889                            .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1890                            .await?;
1891                        for (w, t) in when_then {
1892                            let w_val = this
1893                                .evaluate_expr(w, row, prop_manager, params, ctx)
1894                                .await?;
1895                            if base_val == w_val {
1896                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1897                            }
1898                        }
1899                    } else {
1900                        for (w, t) in when_then {
1901                            let w_val = this
1902                                .evaluate_expr(w, row, prop_manager, params, ctx)
1903                                .await?;
1904                            if w_val.as_bool() == Some(true) {
1905                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1906                            }
1907                        }
1908                    }
1909                    if let Some(e) = else_expr {
1910                        return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1911                    }
1912                    Ok(Value::Null)
1913                }
1914                Expr::Wildcard => Ok(Value::Null),
1915                Expr::FunctionCall { name, args, .. } => {
1916                    // Special case: id() returns VID for nodes and EID for relationships
1917                    if name.eq_ignore_ascii_case("ID") {
1918                        if args.len() != 1 {
1919                            return Err(anyhow!("id() requires exactly 1 argument"));
1920                        }
1921                        let val = this
1922                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1923                            .await?;
1924                        if let Value::Map(map) = &val {
1925                            // Check for _vid (vertex) first
1926                            if let Some(vid_val) = map.get("_vid") {
1927                                return Ok(vid_val.clone());
1928                            }
1929                            // Check for _eid (edge/relationship)
1930                            if let Some(eid_val) = map.get("_eid") {
1931                                return Ok(eid_val.clone());
1932                            }
1933                            // Check for _id (fallback)
1934                            if let Some(id_val) = map.get("_id") {
1935                                return Ok(id_val.clone());
1936                            }
1937                        }
1938                        return Ok(Value::Null);
1939                    }
1940
1941                    // Special case: elementId() returns string format "label_id:local_offset"
1942                    if name.eq_ignore_ascii_case("ELEMENTID") {
1943                        if args.len() != 1 {
1944                            return Err(anyhow!("elementId() requires exactly 1 argument"));
1945                        }
1946                        let val = this
1947                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1948                            .await?;
1949                        if let Value::Map(map) = &val {
1950                            // Check for _vid (vertex) first
1951                            // In new storage model, VIDs are pure auto-increment - return as simple ID string
1952                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1953                                return Ok(Value::String(vid_val.to_string()));
1954                            }
1955                            // Check for _eid (edge/relationship)
1956                            // In new storage model, EIDs are pure auto-increment - return as simple ID string
1957                            if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1958                                return Ok(Value::String(eid_val.to_string()));
1959                            }
1960                        }
1961                        return Ok(Value::Null);
1962                    }
1963
1964                    // Special case: type() returns the relationship type name
1965                    if name.eq_ignore_ascii_case("TYPE") {
1966                        if args.len() != 1 {
1967                            return Err(anyhow!("type() requires exactly 1 argument"));
1968                        }
1969                        let val = this
1970                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1971                            .await?;
1972                        if let Value::Map(map) = &val
1973                            && let Some(type_val) = map.get("_type")
1974                        {
1975                            // Numeric _type is an edge type ID; string _type is already a name
1976                            if let Some(type_id) =
1977                                type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1978                            {
1979                                if let Some(name) = this
1980                                    .storage
1981                                    .schema_manager()
1982                                    .edge_type_name_by_id_unified(type_id)
1983                                {
1984                                    return Ok(Value::String(name));
1985                                }
1986                            } else if let Some(name) = type_val.as_str() {
1987                                return Ok(Value::String(name.to_string()));
1988                            }
1989                        }
1990                        return Ok(Value::Null);
1991                    }
1992
1993                    // Special case: labels() returns the labels of a node
1994                    if name.eq_ignore_ascii_case("LABELS") {
1995                        if args.len() != 1 {
1996                            return Err(anyhow!("labels() requires exactly 1 argument"));
1997                        }
1998                        let val = this
1999                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2000                            .await?;
2001                        if let Value::Map(map) = &val
2002                            && let Some(labels_val) = map.get("_labels")
2003                        {
2004                            return Ok(labels_val.clone());
2005                        }
2006                        return Ok(Value::Null);
2007                    }
2008
2009                    // Special case: properties() returns the properties map of a node/edge
2010                    if name.eq_ignore_ascii_case("PROPERTIES") {
2011                        if args.len() != 1 {
2012                            return Err(anyhow!("properties() requires exactly 1 argument"));
2013                        }
2014                        let val = this
2015                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2016                            .await?;
2017                        if let Value::Map(map) = &val {
2018                            // Filter out internal properties (those starting with _)
2019                            let mut props = HashMap::new();
2020                            for (k, v) in map.iter() {
2021                                if !k.starts_with('_') {
2022                                    props.insert(k.clone(), v.clone());
2023                                }
2024                            }
2025                            return Ok(Value::Map(props));
2026                        }
2027                        return Ok(Value::Null);
2028                    }
2029
2030                    // Special case: startNode() returns the start node of a relationship
2031                    if name.eq_ignore_ascii_case("STARTNODE") {
2032                        if args.len() != 1 {
2033                            return Err(anyhow!("startNode() requires exactly 1 argument"));
2034                        }
2035                        let val = this
2036                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2037                            .await?;
2038                        if let Value::Edge(edge) = &val {
2039                            return Ok(Self::find_node_by_vid(row, edge.src));
2040                        }
2041                        if let Value::Map(map) = &val {
2042                            if let Some(start_node) = map.get("_startNode") {
2043                                return Ok(start_node.clone());
2044                            }
2045                            if let Some(src_vid) = map.get("_src_vid") {
2046                                return Ok(Value::Map(HashMap::from([(
2047                                    "_vid".to_string(),
2048                                    src_vid.clone(),
2049                                )])));
2050                            }
2051                            // Resolve _src VID by looking up node in row
2052                            if let Some(src_id) = map.get("_src")
2053                                && let Some(u) = src_id.as_u64()
2054                            {
2055                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2056                            }
2057                        }
2058                        return Ok(Value::Null);
2059                    }
2060
2061                    // Special case: endNode() returns the end node of a relationship
2062                    if name.eq_ignore_ascii_case("ENDNODE") {
2063                        if args.len() != 1 {
2064                            return Err(anyhow!("endNode() requires exactly 1 argument"));
2065                        }
2066                        let val = this
2067                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2068                            .await?;
2069                        if let Value::Edge(edge) = &val {
2070                            return Ok(Self::find_node_by_vid(row, edge.dst));
2071                        }
2072                        if let Value::Map(map) = &val {
2073                            if let Some(end_node) = map.get("_endNode") {
2074                                return Ok(end_node.clone());
2075                            }
2076                            if let Some(dst_vid) = map.get("_dst_vid") {
2077                                return Ok(Value::Map(HashMap::from([(
2078                                    "_vid".to_string(),
2079                                    dst_vid.clone(),
2080                                )])));
2081                            }
2082                            // Resolve _dst VID by looking up node in row
2083                            if let Some(dst_id) = map.get("_dst")
2084                                && let Some(u) = dst_id.as_u64()
2085                            {
2086                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2087                            }
2088                        }
2089                        return Ok(Value::Null);
2090                    }
2091
2092                    // Special case: hasLabel() checks if a node has a specific label
2093                    // Used for WHERE n:Label predicates
2094                    if name.eq_ignore_ascii_case("HASLABEL") {
2095                        if args.len() != 2 {
2096                            return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
2097                        }
2098                        let node_val = this
2099                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2100                            .await?;
2101                        let label_val = this
2102                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2103                            .await?;
2104
2105                        let label_to_check = label_val.as_str().ok_or_else(|| {
2106                            anyhow!("Second argument to hasLabel must be a string")
2107                        })?;
2108
2109                        let has_label = match &node_val {
2110                            // Handle proper Value::Node type (from result normalization)
2111                            Value::Map(map) if map.contains_key("_vid") => {
2112                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
2113                                    labels_arr
2114                                        .iter()
2115                                        .any(|l| l.as_str() == Some(label_to_check))
2116                                } else {
2117                                    false
2118                                }
2119                            }
2120                            // Also handle legacy Object format
2121                            Value::Map(map) => {
2122                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
2123                                    labels_arr
2124                                        .iter()
2125                                        .any(|l| l.as_str() == Some(label_to_check))
2126                                } else {
2127                                    false
2128                                }
2129                            }
2130                            _ => false,
2131                        };
2132                        return Ok(Value::Bool(has_label));
2133                    }
2134
2135                    // Quantifier functions (ANY/ALL/NONE/SINGLE) as function calls are not supported.
2136                    // These should be parsed as Expr::Quantifier instead.
2137                    if matches!(
2138                        name.to_uppercase().as_str(),
2139                        "ANY" | "ALL" | "NONE" | "SINGLE"
2140                    ) {
2141                        return Err(anyhow!(
2142                            "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
2143                            name.to_lowercase()
2144                        ));
2145                    }
2146
2147                    // Special case: COALESCE needs short-circuit evaluation
2148                    if name.eq_ignore_ascii_case("COALESCE") {
2149                        for arg in args {
2150                            let val = this
2151                                .evaluate_expr(arg, row, prop_manager, params, ctx)
2152                                .await?;
2153                            if !val.is_null() {
2154                                return Ok(val);
2155                            }
2156                        }
2157                        return Ok(Value::Null);
2158                    }
2159
2160                    // Special case: vector_similarity has dedicated implementation
2161                    if name.eq_ignore_ascii_case("vector_similarity") {
2162                        if args.len() != 2 {
2163                            return Err(anyhow!("vector_similarity takes 2 arguments"));
2164                        }
2165                        let v1 = this
2166                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2167                            .await?;
2168                        let v2 = this
2169                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2170                            .await?;
2171                        return eval_vector_similarity(&v1, &v2);
2172                    }
2173
2174                    // Special case: uni.validAt handles node fetching
2175                    if name.eq_ignore_ascii_case("uni.temporal.validAt")
2176                        || name.eq_ignore_ascii_case("uni.validAt")
2177                        || name.eq_ignore_ascii_case("validAt")
2178                    {
2179                        if args.len() != 4 {
2180                            return Err(anyhow!("validAt requires 4 arguments"));
2181                        }
2182                        let node_val = this
2183                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2184                            .await?;
2185                        let start_prop = this
2186                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2187                            .await?
2188                            .as_str()
2189                            .ok_or(anyhow!("start_prop must be string"))?
2190                            .to_string();
2191                        let end_prop = this
2192                            .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2193                            .await?
2194                            .as_str()
2195                            .ok_or(anyhow!("end_prop must be string"))?
2196                            .to_string();
2197                        let time_val = this
2198                            .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2199                            .await?;
2200
2201                        let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2202                            anyhow!("time argument must be a datetime value or string")
2203                        })?;
2204
2205                        // Fetch temporal property values - supports both vertices and edges
2206                        let valid_from_val: Option<Value> = if let Ok(vid) =
2207                            Self::vid_from_value(&node_val)
2208                        {
2209                            // Vertex case - VID string format
2210                            prop_manager
2211                                .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2212                                .await
2213                                .ok()
2214                        } else if let Value::Map(map) = &node_val {
2215                            // Check for embedded _vid or _eid in object
2216                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2217                                let vid = Vid::from(vid_val);
2218                                prop_manager
2219                                    .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2220                                    .await
2221                                    .ok()
2222                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2223                                // Edge case
2224                                let eid = uni_common::core::id::Eid::from(eid_val);
2225                                prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2226                            } else {
2227                                // Inline object - property embedded directly
2228                                map.get(&start_prop).cloned()
2229                            }
2230                        } else {
2231                            return Ok(Value::Bool(false));
2232                        };
2233
2234                        let valid_from = match valid_from_val {
2235                            Some(ref v) => match value_to_datetime_utc(v) {
2236                                Some(dt) => dt,
2237                                None if v.is_null() => return Ok(Value::Bool(false)),
2238                                None => {
2239                                    return Err(anyhow!(
2240                                        "Property {} must be a datetime value or string",
2241                                        start_prop
2242                                    ));
2243                                }
2244                            },
2245                            None => return Ok(Value::Bool(false)),
2246                        };
2247
2248                        let valid_to_val: Option<Value> = if let Ok(vid) =
2249                            Self::vid_from_value(&node_val)
2250                        {
2251                            // Vertex case - VID string format
2252                            prop_manager
2253                                .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2254                                .await
2255                                .ok()
2256                        } else if let Value::Map(map) = &node_val {
2257                            // Check for embedded _vid or _eid in object
2258                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2259                                let vid = Vid::from(vid_val);
2260                                prop_manager
2261                                    .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2262                                    .await
2263                                    .ok()
2264                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2265                                // Edge case
2266                                let eid = uni_common::core::id::Eid::from(eid_val);
2267                                prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2268                            } else {
2269                                // Inline object - property embedded directly
2270                                map.get(&end_prop).cloned()
2271                            }
2272                        } else {
2273                            return Ok(Value::Bool(false));
2274                        };
2275
2276                        let valid_to = match valid_to_val {
2277                            Some(ref v) => match value_to_datetime_utc(v) {
2278                                Some(dt) => Some(dt),
2279                                None if v.is_null() => None,
2280                                None => {
2281                                    return Err(anyhow!(
2282                                        "Property {} must be a datetime value or null",
2283                                        end_prop
2284                                    ));
2285                                }
2286                            },
2287                            None => None,
2288                        };
2289
2290                        let is_valid = valid_from <= query_time
2291                            && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2292                        return Ok(Value::Bool(is_valid));
2293                    }
2294
2295                    // For all other functions, evaluate arguments then call helper
2296                    let mut evaluated_args = Vec::with_capacity(args.len());
2297                    for arg in args {
2298                        let mut val = this
2299                            .evaluate_expr(arg, row, prop_manager, params, ctx)
2300                            .await?;
2301
2302                        // Eagerly hydrate edge/vertex maps if pushdown hydration didn't load properties.
2303                        // Functions like validAt() need access to properties like valid_from/valid_to.
2304                        if let Value::Map(ref mut map) = val {
2305                            hydrate_entity_if_needed(map, prop_manager, ctx).await;
2306                        }
2307
2308                        evaluated_args.push(val);
2309                    }
2310                    eval_scalar_function(
2311                        name,
2312                        &evaluated_args,
2313                        self.custom_function_registry.as_deref(),
2314                    )
2315                }
2316                Expr::Reduce {
2317                    accumulator,
2318                    init,
2319                    variable,
2320                    list,
2321                    expr,
2322                } => {
2323                    let mut acc = self
2324                        .evaluate_expr(init, row, prop_manager, params, ctx)
2325                        .await?;
2326                    let list_val = self
2327                        .evaluate_expr(list, row, prop_manager, params, ctx)
2328                        .await?;
2329
2330                    if let Value::List(items) = list_val {
2331                        for item in items {
2332                            // Create a temporary scope/row with accumulator and variable
2333                            // For simplicity in fallback executor, we can construct a new row map
2334                            // merging current row + new variables.
2335                            let mut scope = row.clone();
2336                            scope.insert(accumulator.clone(), acc.clone());
2337                            scope.insert(variable.clone(), item);
2338
2339                            acc = self
2340                                .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2341                                .await?;
2342                        }
2343                    } else {
2344                        return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2345                    }
2346                    Ok(acc)
2347                }
2348                Expr::ValidAt { .. } => {
2349                    // VALID_AT should have been transformed to a function call in the planner
2350                    Err(anyhow!(
2351                        "VALID_AT expression should have been transformed to function call in planner"
2352                    ))
2353                }
2354
2355                Expr::LabelCheck { expr, labels } => {
2356                    let val = this
2357                        .evaluate_expr(expr, row, prop_manager, params, ctx)
2358                        .await?;
2359                    match &val {
2360                        Value::Null => Ok(Value::Null),
2361                        Value::Map(map) => {
2362                            // Check if this is an edge (has _eid) or node (has _vid)
2363                            let is_edge = map.contains_key("_eid")
2364                                || map.contains_key("_type_name")
2365                                || (map.contains_key("_type") && !map.contains_key("_vid"));
2366
2367                            if is_edge {
2368                                // Edges have a single type
2369                                if labels.len() > 1 {
2370                                    return Ok(Value::Bool(false));
2371                                }
2372                                let label_to_check = &labels[0];
2373                                let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2374                                {
2375                                    t == label_to_check
2376                                } else if let Some(Value::String(t)) = map.get("_type") {
2377                                    t == label_to_check
2378                                } else {
2379                                    false
2380                                };
2381                                Ok(Value::Bool(has_type))
2382                            } else {
2383                                // Node: check all labels
2384                                let has_all = labels.iter().all(|label_to_check| {
2385                                    if let Some(Value::List(labels_arr)) = map.get("_labels") {
2386                                        labels_arr
2387                                            .iter()
2388                                            .any(|l| l.as_str() == Some(label_to_check.as_str()))
2389                                    } else {
2390                                        false
2391                                    }
2392                                });
2393                                Ok(Value::Bool(has_all))
2394                            }
2395                        }
2396                        _ => Ok(Value::Bool(false)),
2397                    }
2398                }
2399
2400                Expr::MapProjection { base, items } => {
2401                    let base_value = this
2402                        .evaluate_expr(base, row, prop_manager, params, ctx)
2403                        .await?;
2404
2405                    // Extract properties from the base object
2406                    let properties = match &base_value {
2407                        Value::Map(map) => map,
2408                        _ => {
2409                            return Err(anyhow!(
2410                                "Map projection requires object, got {:?}",
2411                                base_value
2412                            ));
2413                        }
2414                    };
2415
2416                    let mut result_map = HashMap::new();
2417
2418                    for item in items {
2419                        match item {
2420                            MapProjectionItem::Property(prop) => {
2421                                if let Some(value) = properties.get(prop.as_str()) {
2422                                    result_map.insert(prop.clone(), value.clone());
2423                                }
2424                            }
2425                            MapProjectionItem::AllProperties => {
2426                                // Include all properties except internal fields (those starting with _)
2427                                for (key, value) in properties.iter() {
2428                                    if !key.starts_with('_') {
2429                                        result_map.insert(key.clone(), value.clone());
2430                                    }
2431                                }
2432                            }
2433                            MapProjectionItem::LiteralEntry(key, expr) => {
2434                                let value = this
2435                                    .evaluate_expr(expr, row, prop_manager, params, ctx)
2436                                    .await?;
2437                                result_map.insert(key.clone(), value);
2438                            }
2439                            MapProjectionItem::Variable(var_name) => {
2440                                // Variable selector: include the value of the variable in the result
2441                                // e.g., person{.name, friend} includes the value of 'friend' variable
2442                                if let Some(value) = row.get(var_name.as_str()) {
2443                                    result_map.insert(var_name.clone(), value.clone());
2444                                }
2445                            }
2446                        }
2447                    }
2448
2449                    Ok(Value::Map(result_map))
2450                }
2451            }
2452        })
2453    }
2454
2455    pub(crate) fn execute_subplan<'a>(
2456        &'a self,
2457        plan: LogicalPlan,
2458        prop_manager: &'a PropertyManager,
2459        params: &'a HashMap<String, Value>,
2460        ctx: Option<&'a QueryContext>,
2461    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2462        Box::pin(async move {
2463            if let Some(ctx) = ctx {
2464                ctx.check_timeout()?;
2465            }
2466            match plan {
2467                LogicalPlan::Union { left, right, all } => {
2468                    self.execute_union(left, right, all, prop_manager, params, ctx)
2469                        .await
2470                }
2471                LogicalPlan::CreateVectorIndex {
2472                    config,
2473                    if_not_exists,
2474                } => {
2475                    if if_not_exists && self.index_exists_by_name(&config.name) {
2476                        return Ok(vec![]);
2477                    }
2478                    let idx_mgr = self.storage.index_manager();
2479                    idx_mgr.create_vector_index(config).await?;
2480                    Ok(vec![])
2481                }
2482                LogicalPlan::CreateFullTextIndex {
2483                    config,
2484                    if_not_exists,
2485                } => {
2486                    if if_not_exists && self.index_exists_by_name(&config.name) {
2487                        return Ok(vec![]);
2488                    }
2489                    let idx_mgr = self.storage.index_manager();
2490                    idx_mgr.create_fts_index(config).await?;
2491                    Ok(vec![])
2492                }
2493                LogicalPlan::CreateScalarIndex {
2494                    mut config,
2495                    if_not_exists,
2496                } => {
2497                    if if_not_exists && self.index_exists_by_name(&config.name) {
2498                        return Ok(vec![]);
2499                    }
2500
2501                    // Check for expression indexes - create generated columns
2502                    let mut modified_properties = Vec::new();
2503
2504                    for prop in &config.properties {
2505                        // Heuristic: if contains '(' and ')', it's an expression
2506                        if prop.contains('(') && prop.contains(')') {
2507                            let gen_col = SchemaManager::generated_column_name(prop);
2508
2509                            // Add generated property to schema
2510                            let sm = self.storage.schema_manager_arc();
2511                            if let Err(e) = sm.add_generated_property(
2512                                &config.label,
2513                                &gen_col,
2514                                DataType::String, // Default type for expressions
2515                                prop.clone(),
2516                            ) {
2517                                log::warn!("Failed to add generated property (might exist): {}", e);
2518                            }
2519
2520                            modified_properties.push(gen_col);
2521                        } else {
2522                            // Simple property - use as-is
2523                            modified_properties.push(prop.clone());
2524                        }
2525                    }
2526
2527                    config.properties = modified_properties;
2528
2529                    let idx_mgr = self.storage.index_manager();
2530                    idx_mgr.create_scalar_index(config).await?;
2531                    Ok(vec![])
2532                }
2533                LogicalPlan::CreateJsonFtsIndex {
2534                    config,
2535                    if_not_exists,
2536                } => {
2537                    if if_not_exists && self.index_exists_by_name(&config.name) {
2538                        return Ok(vec![]);
2539                    }
2540                    let idx_mgr = self.storage.index_manager();
2541                    idx_mgr.create_json_fts_index(config).await?;
2542                    Ok(vec![])
2543                }
2544                LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2545                LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2546                LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2547                LogicalPlan::Vacuum => {
2548                    self.execute_vacuum().await?;
2549                    Ok(vec![])
2550                }
2551                LogicalPlan::Checkpoint => {
2552                    self.execute_checkpoint().await?;
2553                    Ok(vec![])
2554                }
2555                LogicalPlan::CopyTo {
2556                    label,
2557                    path,
2558                    format,
2559                    options,
2560                } => {
2561                    let count = self
2562                        .execute_copy_to(&label, &path, &format, &options)
2563                        .await?;
2564                    let mut result = HashMap::new();
2565                    result.insert("count".to_string(), Value::Int(count as i64));
2566                    Ok(vec![result])
2567                }
2568                LogicalPlan::CopyFrom {
2569                    label,
2570                    path,
2571                    format,
2572                    options,
2573                } => {
2574                    let count = self
2575                        .execute_copy_from(&label, &path, &format, &options)
2576                        .await?;
2577                    let mut result = HashMap::new();
2578                    result.insert("count".to_string(), Value::Int(count as i64));
2579                    Ok(vec![result])
2580                }
2581                LogicalPlan::CreateLabel(clause) => {
2582                    self.execute_create_label(clause).await?;
2583                    Ok(vec![])
2584                }
2585                LogicalPlan::CreateEdgeType(clause) => {
2586                    self.execute_create_edge_type(clause).await?;
2587                    Ok(vec![])
2588                }
2589                LogicalPlan::AlterLabel(clause) => {
2590                    self.execute_alter_label(clause).await?;
2591                    Ok(vec![])
2592                }
2593                LogicalPlan::AlterEdgeType(clause) => {
2594                    self.execute_alter_edge_type(clause).await?;
2595                    Ok(vec![])
2596                }
2597                LogicalPlan::DropLabel(clause) => {
2598                    self.execute_drop_label(clause).await?;
2599                    Ok(vec![])
2600                }
2601                LogicalPlan::DropEdgeType(clause) => {
2602                    self.execute_drop_edge_type(clause).await?;
2603                    Ok(vec![])
2604                }
2605                LogicalPlan::CreateConstraint(clause) => {
2606                    self.execute_create_constraint(clause).await?;
2607                    Ok(vec![])
2608                }
2609                LogicalPlan::DropConstraint(clause) => {
2610                    self.execute_drop_constraint(clause).await?;
2611                    Ok(vec![])
2612                }
2613                LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2614                LogicalPlan::DropIndex { name, if_exists } => {
2615                    let idx_mgr = self.storage.index_manager();
2616                    match idx_mgr.drop_index(&name).await {
2617                        Ok(_) => Ok(vec![]),
2618                        Err(e) => {
2619                            if if_exists && e.to_string().contains("not found") {
2620                                Ok(vec![])
2621                            } else {
2622                                Err(e)
2623                            }
2624                        }
2625                    }
2626                }
2627                LogicalPlan::ShowIndexes { filter } => {
2628                    Ok(self.execute_show_indexes(filter.as_deref()))
2629                }
2630                // Scan/traverse nodes: delegate to DataFusion for data access,
2631                // then convert results to HashMaps for the fallback executor.
2632                LogicalPlan::Scan { .. }
2633                | LogicalPlan::FusedIndexScan { .. }
2634                | LogicalPlan::FusedIndexScanWrapped { .. }
2635                | LogicalPlan::ExtIdLookup { .. }
2636                | LogicalPlan::ScanAll { .. }
2637                | LogicalPlan::ScanMainByLabels { .. }
2638                | LogicalPlan::Traverse { .. }
2639                | LogicalPlan::TraverseMainByType { .. } => {
2640                    let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2641                    self.record_batches_to_rows(batches)
2642                }
2643                LogicalPlan::Filter {
2644                    input,
2645                    predicate,
2646                    optional_variables,
2647                } => {
2648                    let input_matches = self
2649                        .execute_subplan(*input, prop_manager, params, ctx)
2650                        .await?;
2651
2652                    tracing::debug!(
2653                        "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2654                        predicate,
2655                        input_matches.len(),
2656                        optional_variables
2657                    );
2658
2659                    // For OPTIONAL MATCH with WHERE: we need LEFT OUTER JOIN semantics.
2660                    // Group rows by non-optional variables, apply filter, and ensure
2661                    // at least one row per group (with NULLs if filter removes all).
2662                    if !optional_variables.is_empty() {
2663                        // Helper to check if a key belongs to an optional variable.
2664                        // Keys can be "var" or "var.field" (e.g., "m" or "m._vid").
2665                        let is_optional_key = |k: &str| -> bool {
2666                            optional_variables.contains(k)
2667                                || optional_variables
2668                                    .iter()
2669                                    .any(|var| k.starts_with(&format!("{}.", var)))
2670                        };
2671
2672                        // Helper to check if a key is internal (should not affect grouping)
2673                        let is_internal_key =
2674                            |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2675
2676                        // Compute the key (non-optional, non-internal variables) for grouping
2677                        let non_optional_vars: Vec<String> = input_matches
2678                            .first()
2679                            .map(|row| {
2680                                row.keys()
2681                                    .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2682                                    .cloned()
2683                                    .collect()
2684                            })
2685                            .unwrap_or_default();
2686
2687                        // Group rows by their non-optional variable values
2688                        let mut groups: std::collections::HashMap<
2689                            Vec<u8>,
2690                            Vec<HashMap<String, Value>>,
2691                        > = std::collections::HashMap::new();
2692
2693                        for row in &input_matches {
2694                            // Create a key from non-optional variable values
2695                            let key: Vec<u8> = non_optional_vars
2696                                .iter()
2697                                .map(|var| {
2698                                    row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
2699                                })
2700                                .collect::<Vec<_>>()
2701                                .join("|")
2702                                .into_bytes();
2703
2704                            groups.entry(key).or_default().push(row.clone());
2705                        }
2706
2707                        let mut filtered = Vec::new();
2708                        for (_key, group_rows) in groups {
2709                            let mut group_passed = Vec::new();
2710
2711                            for row in &group_rows {
2712                                // If optional variables are already NULL, preserve the row
2713                                let has_null_optional = optional_variables.iter().any(|var| {
2714                                    // Check both "var" and "var._vid" style keys
2715                                    let direct_null =
2716                                        matches!(row.get(var), Some(Value::Null) | None);
2717                                    let prefixed_null = row
2718                                        .keys()
2719                                        .filter(|k| k.starts_with(&format!("{}.", var)))
2720                                        .any(|k| matches!(row.get(k), Some(Value::Null)));
2721                                    direct_null || prefixed_null
2722                                });
2723
2724                                if has_null_optional {
2725                                    group_passed.push(row.clone());
2726                                    continue;
2727                                }
2728
2729                                let res = self
2730                                    .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2731                                    .await?;
2732
2733                                if res.as_bool().unwrap_or(false) {
2734                                    group_passed.push(row.clone());
2735                                }
2736                            }
2737
2738                            if group_passed.is_empty() {
2739                                // No rows passed - emit one row with NULLs for optional variables
2740                                // Use the first row's non-optional values as a template
2741                                if let Some(template) = group_rows.first() {
2742                                    let mut null_row = HashMap::new();
2743                                    for (k, v) in template {
2744                                        if is_optional_key(k) {
2745                                            null_row.insert(k.clone(), Value::Null);
2746                                        } else {
2747                                            null_row.insert(k.clone(), v.clone());
2748                                        }
2749                                    }
2750                                    filtered.push(null_row);
2751                                }
2752                            } else {
2753                                filtered.extend(group_passed);
2754                            }
2755                        }
2756
2757                        tracing::debug!(
2758                            "Filter (OPTIONAL): {} input rows -> {} output rows",
2759                            input_matches.len(),
2760                            filtered.len()
2761                        );
2762
2763                        return Ok(filtered);
2764                    }
2765
2766                    // Standard filter for non-OPTIONAL MATCH
2767                    let mut filtered = Vec::new();
2768                    for row in input_matches.iter() {
2769                        let res = self
2770                            .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2771                            .await?;
2772
2773                        let passes = res.as_bool().unwrap_or(false);
2774
2775                        if passes {
2776                            filtered.push(row.clone());
2777                        }
2778                    }
2779
2780                    tracing::debug!(
2781                        "Filter: {} input rows -> {} output rows",
2782                        input_matches.len(),
2783                        filtered.len()
2784                    );
2785
2786                    Ok(filtered)
2787                }
2788                LogicalPlan::ProcedureCall {
2789                    procedure_name,
2790                    arguments,
2791                    yield_items,
2792                } => {
2793                    let yield_names: Vec<String> =
2794                        yield_items.iter().map(|(n, _)| n.clone()).collect();
2795                    let results = self
2796                        .execute_procedure(
2797                            &procedure_name,
2798                            &arguments,
2799                            &yield_names,
2800                            prop_manager,
2801                            params,
2802                            ctx,
2803                        )
2804                        .await?;
2805
2806                    // Handle aliasing: collect all original values first, then
2807                    // build the aliased row in one pass. This avoids issues when
2808                    // an alias matches another yield item's original name (e.g.,
2809                    // YIELD a AS b, b AS d — renaming "a" to "b" must not
2810                    // clobber the original "b" before it is renamed to "d").
2811                    let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2812                    if !has_aliases {
2813                        // No aliases (includes YIELD * which produces empty yield_items) —
2814                        // pass through the procedure output rows unchanged.
2815                        Ok(results)
2816                    } else {
2817                        let mut aliased_results = Vec::with_capacity(results.len());
2818                        for row in results {
2819                            let mut new_row = HashMap::new();
2820                            for (name, alias) in &yield_items {
2821                                let col_name = alias.as_ref().unwrap_or(name);
2822                                let val = row.get(name).cloned().unwrap_or(Value::Null);
2823                                new_row.insert(col_name.clone(), val);
2824                            }
2825                            aliased_results.push(new_row);
2826                        }
2827                        Ok(aliased_results)
2828                    }
2829                }
2830                LogicalPlan::VectorKnn { .. } => {
2831                    unreachable!("VectorKnn is handled by DataFusion engine")
2832                }
2833                LogicalPlan::InvertedIndexLookup { .. } => {
2834                    unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2835                }
2836                LogicalPlan::Sort { input, order_by } => {
2837                    let rows = self
2838                        .execute_subplan(*input, prop_manager, params, ctx)
2839                        .await?;
2840                    self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2841                        .await
2842                }
2843                LogicalPlan::Limit { input, skip, fetch } => {
2844                    let rows = self
2845                        .execute_subplan(*input, prop_manager, params, ctx)
2846                        .await?;
2847                    let skip = skip.unwrap_or(0);
2848                    let take = fetch.unwrap_or(usize::MAX);
2849                    Ok(rows.into_iter().skip(skip).take(take).collect())
2850                }
2851                LogicalPlan::Aggregate {
2852                    input,
2853                    group_by,
2854                    aggregates,
2855                } => {
2856                    let rows = self
2857                        .execute_subplan(*input, prop_manager, params, ctx)
2858                        .await?;
2859                    self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2860                        .await
2861                }
2862                LogicalPlan::Window {
2863                    input,
2864                    window_exprs,
2865                } => {
2866                    let rows = self
2867                        .execute_subplan(*input, prop_manager, params, ctx)
2868                        .await?;
2869                    self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2870                        .await
2871                }
2872                LogicalPlan::Project { input, projections } => {
2873                    let matches = self
2874                        .execute_subplan(*input, prop_manager, params, ctx)
2875                        .await?;
2876                    self.execute_project(matches, &projections, prop_manager, params, ctx)
2877                        .await
2878                }
2879                LogicalPlan::Distinct { input } => {
2880                    let rows = self
2881                        .execute_subplan(*input, prop_manager, params, ctx)
2882                        .await?;
2883                    let mut seen = std::collections::HashSet::new();
2884                    let mut result = Vec::new();
2885                    for row in rows {
2886                        let key = Self::canonical_row_key(&row);
2887                        if seen.insert(key) {
2888                            result.push(row);
2889                        }
2890                    }
2891                    Ok(result)
2892                }
2893                LogicalPlan::Unwind {
2894                    input,
2895                    expr,
2896                    variable,
2897                } => {
2898                    let input_rows = self
2899                        .execute_subplan(*input, prop_manager, params, ctx)
2900                        .await?;
2901                    self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2902                        .await
2903                }
2904                LogicalPlan::Apply {
2905                    input,
2906                    subquery,
2907                    input_filter,
2908                } => {
2909                    let input_rows = self
2910                        .execute_subplan(*input, prop_manager, params, ctx)
2911                        .await?;
2912                    self.execute_apply(
2913                        input_rows,
2914                        &subquery,
2915                        input_filter.as_ref(),
2916                        prop_manager,
2917                        params,
2918                        ctx,
2919                    )
2920                    .await
2921                }
2922                LogicalPlan::SubqueryCall { input, subquery } => {
2923                    let input_rows = self
2924                        .execute_subplan(*input, prop_manager, params, ctx)
2925                        .await?;
2926                    // Execute subquery for each input row (correlated)
2927                    // No input_filter for CALL { }
2928                    self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2929                        .await
2930                }
2931                LogicalPlan::RecursiveCTE {
2932                    cte_name,
2933                    initial,
2934                    recursive,
2935                } => {
2936                    self.execute_recursive_cte(
2937                        &cte_name,
2938                        *initial,
2939                        *recursive,
2940                        prop_manager,
2941                        params,
2942                        ctx,
2943                    )
2944                    .await
2945                }
2946                LogicalPlan::CrossJoin { left, right } => {
2947                    self.execute_cross_join(left, right, prop_manager, params, ctx)
2948                        .await
2949                }
2950                LogicalPlan::Set { .. }
2951                | LogicalPlan::Remove { .. }
2952                | LogicalPlan::Merge { .. }
2953                | LogicalPlan::Create { .. }
2954                | LogicalPlan::CreateBatch { .. } => {
2955                    unreachable!("mutations are handled by DataFusion engine")
2956                }
2957                LogicalPlan::Delete { .. } => {
2958                    unreachable!("mutations are handled by DataFusion engine")
2959                }
2960                LogicalPlan::Copy {
2961                    target,
2962                    source,
2963                    is_export,
2964                    options,
2965                } => {
2966                    if is_export {
2967                        self.execute_export(&target, &source, &options, prop_manager, ctx)
2968                            .await
2969                    } else {
2970                        self.execute_copy(&target, &source, &options, prop_manager)
2971                            .await
2972                    }
2973                }
2974                LogicalPlan::Backup {
2975                    destination,
2976                    options,
2977                } => self.execute_backup(&destination, &options).await,
2978                LogicalPlan::Explain { plan } => {
2979                    let plan_str = format!("{:#?}", plan);
2980                    let mut row = HashMap::new();
2981                    row.insert("plan".to_string(), Value::String(plan_str));
2982                    Ok(vec![row])
2983                }
2984                LogicalPlan::ShortestPath { .. } => {
2985                    unreachable!("ShortestPath is handled by DataFusion engine")
2986                }
2987                LogicalPlan::AllShortestPaths { .. } => {
2988                    unreachable!("AllShortestPaths is handled by DataFusion engine")
2989                }
2990                LogicalPlan::Foreach { .. } => {
2991                    unreachable!("mutations are handled by DataFusion engine")
2992                }
2993                LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2994                LogicalPlan::BindZeroLengthPath { .. } => {
2995                    unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2996                }
2997                LogicalPlan::BindPath { .. } => {
2998                    unreachable!("BindPath is handled by DataFusion engine")
2999                }
3000                LogicalPlan::QuantifiedPattern { .. } => {
3001                    unreachable!("QuantifiedPattern is handled by DataFusion engine")
3002                }
3003                LogicalPlan::LocyProgram { .. }
3004                | LogicalPlan::LocyFold { .. }
3005                | LogicalPlan::LocyBestBy { .. }
3006                | LogicalPlan::LocyPriority { .. }
3007                | LogicalPlan::LocyDerivedScan { .. }
3008                | LogicalPlan::LocyProject { .. }
3009                | LogicalPlan::LocyModelInvoke { .. } => {
3010                    unreachable!("Locy operators are handled by DataFusion engine")
3011                }
3012            }
3013        })
3014    }
3015
    /// Execute a single plan from a FOREACH body with the given scope.
    ///
    /// Used by the DataFusion ForeachExec operator to delegate body clause
    /// execution back to the executor.
    ///
    /// Supported body plans are `Set`, `Remove`, `Delete`, `Create`,
    /// `CreateBatch`, `Merge` and a nested `Foreach`. Any other plan is rejected
    /// with an error.
    ///
    /// # Arguments
    ///
    /// * `plan` - the body clause to run (consumed).
    /// * `scope` - variable bindings for the current FOREACH iteration. It is
    ///   passed mutably to the mutation helpers, presumably so they can bind or
    ///   update variables (verify against the helpers).
    /// * `writer`, `tx_l0` - forwarded unchanged to the `*_locked` mutation
    ///   helpers and to `execute_create_pattern`.
    /// * `prop_manager`, `params`, `ctx` - used to evaluate expressions and
    ///   forwarded to the helpers.
    ///
    /// # Errors
    ///
    /// Returns any error from expression evaluation or from the mutation
    /// helpers. Also fails when a nested FOREACH list evaluates to something
    /// other than a list or `null`, and when `plan` is not a supported variant.
    #[expect(clippy::too_many_arguments)]
    pub(crate) async fn execute_foreach_body_plan(
        &self,
        plan: LogicalPlan,
        scope: &mut HashMap<String, Value>,
        writer: &uni_store::runtime::writer::Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    ) -> Result<()> {
        match plan {
            LogicalPlan::Set { items, .. } => {
                // A default `Prefetch` is passed: presumably nothing is
                // pre-loaded for a single FOREACH row (confirm).
                self.execute_set_items_locked(
                    &items,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                    &crate::query::df_graph::mutation_common::Prefetch::default(),
                )
                .await?;
            }
            LogicalPlan::Remove { items, .. } => {
                self.execute_remove_items_locked(
                    &items,
                    scope,
                    writer,
                    prop_manager,
                    ctx,
                    tx_l0,
                    &crate::query::df_graph::mutation_common::Prefetch::default(),
                )
                .await?;
            }
            LogicalPlan::Delete { items, detach, .. } => {
                // Evaluate each DELETE target in the current scope and delete it.
                // The plan's `detach` flag is forwarded unchanged.
                for expr in &items {
                    let val = self
                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
                        .await?;
                    self.execute_delete_item_locked(&val, detach, writer, tx_l0)
                        .await?;
                }
            }
            LogicalPlan::Create { pattern, .. } => {
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                    None,
                )
                .await?;
            }
            LogicalPlan::CreateBatch { patterns, .. } => {
                // Create each pattern in order, all against the same scope.
                for pattern in &patterns {
                    self.execute_create_pattern(
                        pattern,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                        None,
                    )
                    .await?;
                }
            }
            LogicalPlan::Merge {
                pattern,
                on_match: _,
                on_create,
                ..
            } => {
                // NOTE(review): ON MATCH is discarded here, and no lookup for
                // an existing match is made before `execute_create_pattern`.
                // As written, MERGE inside FOREACH always takes the create path.
                // Confirm this is the intended semantics.
                //
                // Fold ON CREATE SET so a NOT-NULL property set only by
                // ON CREATE SET passes create-time validation (RC4); the
                // post-create SET below settles the final values.
                let seed_props = self
                    .on_create_seed_props(on_create.as_ref(), scope, prop_manager, params, ctx)
                    .await?;
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                    Some(&seed_props),
                )
                .await?;
                if let Some(on_create_clause) = on_create {
                    self.execute_set_items_locked(
                        &on_create_clause.items,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                        &crate::query::df_graph::mutation_common::Prefetch::default(),
                    )
                    .await?;
                }
            }
            LogicalPlan::Foreach {
                variable,
                list,
                body,
                ..
            } => {
                let list_val = self
                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
                    .await?;
                // A `null` list makes the nested FOREACH a no-op; any other
                // non-list value is an error.
                let items = match list_val {
                    Value::List(arr) => arr,
                    Value::Null => return Ok(()),
                    _ => return Err(anyhow!("FOREACH requires a list")),
                };
                for item in items {
                    // Each item gets its own copy of the outer scope plus the
                    // loop variable. All body clauses for this item share
                    // `nested_scope`, but nothing written to it is copied back
                    // into `scope`.
                    let mut nested_scope = scope.clone();
                    nested_scope.insert(variable.clone(), item);
                    for nested_plan in &body {
                        // A recursive `async fn` call must be boxed, otherwise
                        // its future type would be infinitely sized.
                        Box::pin(self.execute_foreach_body_plan(
                            nested_plan.clone(),
                            &mut nested_scope,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0,
                        ))
                        .await?;
                    }
                }
            }
            _ => {
                return Err(anyhow!(
                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
                ));
            }
        }
        Ok(())
    }
3170
3171    fn canonical_row_key(row: &HashMap<String, Value>) -> String {
3172        let mut pairs: Vec<_> = row.iter().collect();
3173        pairs.sort_by_key(|(k, _)| *k);
3174
3175        pairs
3176            .into_iter()
3177            .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
3178            .collect::<Vec<_>>()
3179            .join("|")
3180    }
3181
3182    fn canonical_value_key(v: &Value) -> String {
3183        match v {
3184            Value::Null => "null".to_string(),
3185            Value::Bool(b) => format!("b:{b}"),
3186            Value::Int(i) => format!("n:{i}"),
3187            Value::Float(f) => {
3188                if f.is_nan() {
3189                    "nan".to_string()
3190                } else if f.is_infinite() {
3191                    if f.is_sign_positive() {
3192                        "inf:+".to_string()
3193                    } else {
3194                        "inf:-".to_string()
3195                    }
3196                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
3197                    format!("n:{}", *f as i64)
3198                } else {
3199                    format!("f:{f}")
3200                }
3201            }
3202            Value::String(s) => {
3203                if let Some(k) = Self::temporal_string_key(s) {
3204                    format!("temporal:{k}")
3205                } else {
3206                    format!("s:{s}")
3207                }
3208            }
3209            Value::Bytes(b) => format!("bytes:{:?}", b),
3210            Value::List(items) => format!(
3211                "list:[{}]",
3212                items
3213                    .iter()
3214                    .map(Self::canonical_value_key)
3215                    .collect::<Vec<_>>()
3216                    .join(",")
3217            ),
3218            Value::Map(map) => {
3219                let mut pairs: Vec<_> = map.iter().collect();
3220                pairs.sort_by_key(|(k, _)| *k);
3221                format!(
3222                    "map:{{{}}}",
3223                    pairs
3224                        .into_iter()
3225                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
3226                        .collect::<Vec<_>>()
3227                        .join(",")
3228                )
3229            }
3230            Value::Node(n) => {
3231                let mut labels = n.labels.clone();
3232                labels.sort();
3233                format!(
3234                    "node:{}:{}:{}",
3235                    n.vid.as_u64(),
3236                    labels.join(":"),
3237                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
3238                )
3239            }
3240            Value::Edge(e) => format!(
3241                "edge:{}:{}:{}:{}:{}",
3242                e.eid.as_u64(),
3243                e.edge_type,
3244                e.src.as_u64(),
3245                e.dst.as_u64(),
3246                Self::canonical_value_key(&Value::Map(e.properties.clone()))
3247            ),
3248            Value::Path(p) => format!(
3249                "path:nodes=[{}];edges=[{}]",
3250                p.nodes
3251                    .iter()
3252                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
3253                    .collect::<Vec<_>>()
3254                    .join(","),
3255                p.edges
3256                    .iter()
3257                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
3258                    .collect::<Vec<_>>()
3259                    .join(",")
3260            ),
3261            Value::Vector(vs) => format!("vec:{:?}", vs),
3262            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
3263            _ => format!("{v:?}"),
3264        }
3265    }
3266
3267    fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3268        match t {
3269            uni_common::TemporalValue::Date { days_since_epoch } => {
3270                format!("date:{days_since_epoch}")
3271            }
3272            uni_common::TemporalValue::LocalTime {
3273                nanos_since_midnight,
3274            } => format!("localtime:{nanos_since_midnight}"),
3275            uni_common::TemporalValue::Time {
3276                nanos_since_midnight,
3277                offset_seconds,
3278            } => {
3279                let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3280                format!("time:{utc_nanos}")
3281            }
3282            uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3283                format!("localdatetime:{nanos_since_epoch}")
3284            }
3285            uni_common::TemporalValue::DateTime {
3286                nanos_since_epoch, ..
3287            } => format!("datetime:{nanos_since_epoch}"),
3288            uni_common::TemporalValue::Duration {
3289                months,
3290                days,
3291                nanos,
3292            } => format!("duration:{months}:{days}:{nanos}"),
3293            uni_common::TemporalValue::Btic { lo, hi, meta } => {
3294                format!("btic:{lo}:{hi}:{meta}")
3295            }
3296        }
3297    }
3298
3299    fn temporal_string_key(s: &str) -> Option<String> {
3300        let fn_name = match classify_temporal(s)? {
3301            uni_common::TemporalType::Date => "DATE",
3302            uni_common::TemporalType::LocalTime => "LOCALTIME",
3303            uni_common::TemporalType::Time => "TIME",
3304            uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3305            uni_common::TemporalType::DateTime => "DATETIME",
3306            uni_common::TemporalType::Duration => "DURATION",
3307            uni_common::TemporalType::Btic => return None, // BTIC uses dedicated parsing
3308        };
3309        match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3310            Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3311            _ => None,
3312        }
3313    }
3314
3315    /// Execute aggregate operation: GROUP BY + aggregate functions.
3316    /// Interval for timeout checks in aggregate loops.
3317    pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3318
3319    pub(crate) async fn execute_aggregate(
3320        &self,
3321        rows: Vec<HashMap<String, Value>>,
3322        group_by: &[Expr],
3323        aggregates: &[Expr],
3324        prop_manager: &PropertyManager,
3325        params: &HashMap<String, Value>,
3326        ctx: Option<&QueryContext>,
3327    ) -> Result<Vec<HashMap<String, Value>>> {
3328        // CWE-400: Check timeout before aggregation
3329        if let Some(ctx) = ctx {
3330            ctx.check_timeout()?;
3331        }
3332
3333        let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3334
3335        // Cypher semantics: aggregation without grouping keys returns one row even
3336        // on empty input (e.g. `RETURN count(*)`, `RETURN avg(x)`).
3337        if rows.is_empty() {
3338            if group_by.is_empty() {
3339                let accs = Self::create_accumulators(aggregates);
3340                let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3341                return Ok(vec![row]);
3342            }
3343            return Ok(vec![]);
3344        }
3345
3346        for (idx, row) in rows.into_iter().enumerate() {
3347            // Periodic timeout check during aggregation
3348            if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3349                && let Some(ctx) = ctx
3350            {
3351                ctx.check_timeout()?;
3352            }
3353
3354            let key_vals = self
3355                .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3356                .await?;
3357            // Build a canonical key so grouping follows Cypher value semantics
3358            // (e.g. temporal equality by instant, numeric normalization where applicable).
3359            let key_str = format!(
3360                "[{}]",
3361                key_vals
3362                    .iter()
3363                    .map(Self::canonical_value_key)
3364                    .collect::<Vec<_>>()
3365                    .join(",")
3366            );
3367
3368            let entry = groups
3369                .entry(key_str)
3370                .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3371
3372            self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3373                .await?;
3374        }
3375
3376        let results = groups
3377            .values()
3378            .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3379            .collect();
3380
3381        Ok(results)
3382    }
3383
3384    pub(crate) async fn execute_window(
3385        &self,
3386        mut rows: Vec<HashMap<String, Value>>,
3387        window_exprs: &[Expr],
3388        _prop_manager: &PropertyManager,
3389        _params: &HashMap<String, Value>,
3390        ctx: Option<&QueryContext>,
3391    ) -> Result<Vec<HashMap<String, Value>>> {
3392        // CWE-400: Check timeout before window computation
3393        if let Some(ctx) = ctx {
3394            ctx.check_timeout()?;
3395        }
3396
3397        // If no rows or no window expressions, return as-is
3398        if rows.is_empty() || window_exprs.is_empty() {
3399            return Ok(rows);
3400        }
3401
3402        // Process each window function expression
3403        for window_expr in window_exprs {
3404            // Extract window function details
3405            let Expr::FunctionCall {
3406                name,
3407                args,
3408                window_spec: Some(window_spec),
3409                ..
3410            } = window_expr
3411            else {
3412                return Err(anyhow!(
3413                    "Window expression must be a FunctionCall with OVER clause: {:?}",
3414                    window_expr
3415                ));
3416            };
3417
3418            let name_upper = name.to_uppercase();
3419
3420            // Validate it's a supported window function
3421            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3422                return Err(anyhow!(
3423                    "Unsupported window function: {}. Supported functions: {}",
3424                    name,
3425                    WINDOW_FUNCTIONS.join(", ")
3426                ));
3427            }
3428
3429            // Build partition groups based on PARTITION BY clause
3430            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3431
3432            for (row_idx, row) in rows.iter().enumerate() {
3433                // Evaluate partition key
3434                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3435                    // No partitioning - all rows in one partition
3436                    vec![]
3437                } else {
3438                    window_spec
3439                        .partition_by
3440                        .iter()
3441                        .map(|expr| self.evaluate_simple_expr(expr, row))
3442                        .collect::<Result<Vec<_>>>()?
3443                };
3444
3445                partition_map
3446                    .entry(partition_key)
3447                    .or_default()
3448                    .push(row_idx);
3449            }
3450
3451            // Process each partition
3452            for (_partition_key, row_indices) in partition_map.iter_mut() {
3453                // Sort rows within partition by ORDER BY clause
3454                if !window_spec.order_by.is_empty() {
3455                    row_indices.sort_by(|&a, &b| {
3456                        for sort_item in &window_spec.order_by {
3457                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3458                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3459
3460                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3461                                let cmp = Executor::compare_values(&va, &vb);
3462                                let cmp = if sort_item.ascending {
3463                                    cmp
3464                                } else {
3465                                    cmp.reverse()
3466                                };
3467                                if cmp != std::cmp::Ordering::Equal {
3468                                    return cmp;
3469                                }
3470                            }
3471                        }
3472                        std::cmp::Ordering::Equal
3473                    });
3474                }
3475
3476                // Compute window function values for this partition
3477                for (position, &row_idx) in row_indices.iter().enumerate() {
3478                    let window_value = match name_upper.as_str() {
3479                        "ROW_NUMBER" => Value::from((position + 1) as i64),
3480                        "RANK" => {
3481                            // RANK: position (1-indexed) of first row in group of tied rows
3482                            let rank = if position == 0 {
3483                                1i64
3484                            } else {
3485                                let prev_row_idx = row_indices[position - 1];
3486                                let same_as_prev = self.rows_have_same_sort_keys(
3487                                    &window_spec.order_by,
3488                                    &rows,
3489                                    row_idx,
3490                                    prev_row_idx,
3491                                );
3492
3493                                if same_as_prev {
3494                                    // Walk backwards to find where this group started
3495                                    let mut group_start = position - 1;
3496                                    while group_start > 0 {
3497                                        let curr_idx = row_indices[group_start];
3498                                        let prev_idx = row_indices[group_start - 1];
3499                                        if !self.rows_have_same_sort_keys(
3500                                            &window_spec.order_by,
3501                                            &rows,
3502                                            curr_idx,
3503                                            prev_idx,
3504                                        ) {
3505                                            break;
3506                                        }
3507                                        group_start -= 1;
3508                                    }
3509                                    (group_start + 1) as i64
3510                                } else {
3511                                    (position + 1) as i64
3512                                }
3513                            };
3514                            Value::from(rank)
3515                        }
3516                        "DENSE_RANK" => {
3517                            // Dense rank: continuous ranking without gaps
3518                            let mut dense_rank = 1i64;
3519                            for i in 0..position {
3520                                let curr_idx = row_indices[i + 1];
3521                                let prev_idx = row_indices[i];
3522                                if !self.rows_have_same_sort_keys(
3523                                    &window_spec.order_by,
3524                                    &rows,
3525                                    curr_idx,
3526                                    prev_idx,
3527                                ) {
3528                                    dense_rank += 1;
3529                                }
3530                            }
3531                            Value::from(dense_rank)
3532                        }
3533                        "LAG" => {
3534                            let (value_expr, offset, default_value) =
3535                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3536
3537                            if position >= offset {
3538                                let target_idx = row_indices[position - offset];
3539                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3540                            } else {
3541                                default_value
3542                            }
3543                        }
3544                        "LEAD" => {
3545                            let (value_expr, offset, default_value) =
3546                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3547
3548                            if position + offset < row_indices.len() {
3549                                let target_idx = row_indices[position + offset];
3550                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3551                            } else {
3552                                default_value
3553                            }
3554                        }
3555                        "NTILE" => {
3556                            // Extract num_buckets argument: NTILE(num_buckets)
3557                            let num_buckets_expr = args.first().ok_or_else(|| {
3558                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3559                            })?;
3560                            let num_buckets_val =
3561                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3562                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3563                                anyhow!(
3564                                    "NTILE argument must be an integer, got: {:?}",
3565                                    num_buckets_val
3566                                )
3567                            })?;
3568
3569                            if num_buckets <= 0 {
3570                                return Err(anyhow!(
3571                                    "NTILE bucket count must be positive, got: {}",
3572                                    num_buckets
3573                                ));
3574                            }
3575
3576                            let num_buckets = num_buckets as usize;
3577                            let partition_size = row_indices.len();
3578
3579                            // Calculate bucket assignment using standard algorithm
3580                            // For N rows and B buckets:
3581                            // - Base size: N / B
3582                            // - Extra rows: N % B (go to first buckets)
3583                            let base_size = partition_size / num_buckets;
3584                            let extra_rows = partition_size % num_buckets;
3585
3586                            // Determine bucket for current row
3587                            let bucket = if position < extra_rows * (base_size + 1) {
3588                                // Row is in one of the larger buckets (first 'extra_rows' buckets)
3589                                position / (base_size + 1) + 1
3590                            } else {
3591                                // Row is in one of the normal-sized buckets
3592                                let adjusted_position = position - extra_rows * (base_size + 1);
3593                                extra_rows + (adjusted_position / base_size) + 1
3594                            };
3595
3596                            Value::from(bucket as i64)
3597                        }
3598                        "FIRST_VALUE" => {
3599                            // FIRST_VALUE returns the value of the expression from the first row in the window frame
3600                            let value_expr = args.first().ok_or_else(|| {
3601                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3602                            })?;
3603
3604                            // Get the first row in the partition (after ordering)
3605                            if row_indices.is_empty() {
3606                                Value::Null
3607                            } else {
3608                                let first_idx = row_indices[0];
3609                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3610                            }
3611                        }
3612                        "LAST_VALUE" => {
3613                            // LAST_VALUE returns the value of the expression from the last row in the window frame
3614                            let value_expr = args.first().ok_or_else(|| {
3615                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3616                            })?;
3617
3618                            // Get the last row in the partition (after ordering)
3619                            if row_indices.is_empty() {
3620                                Value::Null
3621                            } else {
3622                                let last_idx = row_indices[row_indices.len() - 1];
3623                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3624                            }
3625                        }
3626                        "NTH_VALUE" => {
3627                            // NTH_VALUE returns the value of the expression from the nth row in the window frame
3628                            if args.len() != 2 {
3629                                return Err(anyhow!(
3630                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3631                                ));
3632                            }
3633
3634                            let value_expr = &args[0];
3635                            let n_expr = &args[1];
3636
3637                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3638                            let n = n_val.as_i64().ok_or_else(|| {
3639                                anyhow!(
3640                                    "NTH_VALUE second argument must be an integer, got: {:?}",
3641                                    n_val
3642                                )
3643                            })?;
3644
3645                            if n <= 0 {
3646                                return Err(anyhow!(
3647                                    "NTH_VALUE position must be positive, got: {}",
3648                                    n
3649                                ));
3650                            }
3651
3652                            let nth_index = (n - 1) as usize; // Convert 1-based to 0-based
3653                            if nth_index < row_indices.len() {
3654                                let nth_idx = row_indices[nth_index];
3655                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3656                            } else {
3657                                Value::Null
3658                            }
3659                        }
3660                        _ => unreachable!("Window function {} already validated", name),
3661                    };
3662
3663                    // Add window function result to row
3664                    // Use the window expression's string representation as the column name
3665                    let col_name = window_expr.to_string_repr();
3666                    rows[row_idx].insert(col_name, window_value);
3667                }
3668            }
3669        }
3670
3671        Ok(rows)
3672    }
3673
3674    /// Helper to evaluate simple expressions for window function sorting/partitioning.
3675    ///
3676    /// Uses `&self` for consistency with other evaluation methods, though it only
3677    /// recurses for property access.
3678    fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3679        match expr {
3680            Expr::Variable(name) => row
3681                .get(name)
3682                .cloned()
3683                .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3684            Expr::Property(base, prop) => {
3685                let base_val = self.evaluate_simple_expr(base, row)?;
3686                if let Value::Map(map) = base_val {
3687                    map.get(prop)
3688                        .cloned()
3689                        .ok_or_else(|| anyhow!("Property not found: {}", prop))
3690                } else {
3691                    Err(anyhow!("Cannot access property on non-object"))
3692                }
3693            }
3694            Expr::Literal(lit) => Ok(lit.to_value()),
3695            _ => Err(anyhow!(
3696                "Unsupported expression in window function: {:?}",
3697                expr
3698            )),
3699        }
3700    }
3701
3702    /// Check if two rows have matching sort keys for ranking functions.
3703    fn rows_have_same_sort_keys(
3704        &self,
3705        order_by: &[uni_cypher::ast::SortItem],
3706        rows: &[HashMap<String, Value>],
3707        idx_a: usize,
3708        idx_b: usize,
3709    ) -> bool {
3710        order_by.iter().all(|sort_item| {
3711            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3712            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3713            matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3714        })
3715    }
3716
3717    /// Extract offset and default value for LAG/LEAD window functions.
3718    fn extract_lag_lead_params<'a>(
3719        &self,
3720        func_name: &str,
3721        args: &'a [Expr],
3722        row: &HashMap<String, Value>,
3723    ) -> Result<(&'a Expr, usize, Value)> {
3724        let value_expr = args.first().ok_or_else(|| {
3725            anyhow!(
3726                "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3727                func_name,
3728                func_name
3729            )
3730        })?;
3731
3732        let offset = if let Some(offset_expr) = args.get(1) {
3733            let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3734            offset_val.as_i64().ok_or_else(|| {
3735                anyhow!(
3736                    "{} offset must be an integer, got: {:?}",
3737                    func_name,
3738                    offset_val
3739                )
3740            })? as usize
3741        } else {
3742            1
3743        };
3744
3745        let default_value = if let Some(default_expr) = args.get(2) {
3746            self.evaluate_simple_expr(default_expr, row)?
3747        } else {
3748            Value::Null
3749        };
3750
3751        Ok((value_expr, offset, default_value))
3752    }
3753
3754    /// Evaluate group-by key expressions for a row.
3755    pub(crate) async fn evaluate_group_keys(
3756        &self,
3757        group_by: &[Expr],
3758        row: &HashMap<String, Value>,
3759        prop_manager: &PropertyManager,
3760        params: &HashMap<String, Value>,
3761        ctx: Option<&QueryContext>,
3762    ) -> Result<Vec<Value>> {
3763        let mut key_vals = Vec::new();
3764        for expr in group_by {
3765            key_vals.push(
3766                self.evaluate_expr(expr, row, prop_manager, params, ctx)
3767                    .await?,
3768            );
3769        }
3770        Ok(key_vals)
3771    }
3772
3773    /// Update accumulators with values from the current row.
3774    pub(crate) async fn update_accumulators(
3775        &self,
3776        accs: &mut [Accumulator],
3777        aggregates: &[Expr],
3778        row: &HashMap<String, Value>,
3779        prop_manager: &PropertyManager,
3780        params: &HashMap<String, Value>,
3781        ctx: Option<&QueryContext>,
3782    ) -> Result<()> {
3783        for (i, agg_expr) in aggregates.iter().enumerate() {
3784            if let Expr::FunctionCall { args, .. } = agg_expr {
3785                let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3786                let val = if is_wildcard {
3787                    Value::Null
3788                } else {
3789                    self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3790                        .await?
3791                };
3792                accs[i].update(&val, is_wildcard);
3793            }
3794        }
3795        Ok(())
3796    }
3797
3798    /// Execute sort operation with ORDER BY clauses.
3799    pub(crate) async fn execute_recursive_cte(
3800        &self,
3801        cte_name: &str,
3802        initial: LogicalPlan,
3803        recursive: LogicalPlan,
3804        prop_manager: &PropertyManager,
3805        params: &HashMap<String, Value>,
3806        ctx: Option<&QueryContext>,
3807    ) -> Result<Vec<HashMap<String, Value>>> {
3808        use std::collections::HashSet;
3809
3810        // Helper to create a stable key for cycle detection.
3811        // Uses sorted keys to ensure consistent ordering.
3812        pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3813            let mut pairs: Vec<_> = row.iter().collect();
3814            pairs.sort_by(|a, b| a.0.cmp(b.0));
3815            format!("{pairs:?}")
3816        }
3817
3818        // 1. Execute Anchor
3819        let mut working_table = self
3820            .execute_subplan(initial, prop_manager, params, ctx)
3821            .await?;
3822        let mut result_table = working_table.clone();
3823
3824        // Track seen rows for cycle detection
3825        let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3826
3827        // 2. Loop
3828        // Safety: Max iterations to prevent infinite loop
3829        let max_iterations = self.config.max_recursive_cte_iterations;
3830        for _iteration in 0..max_iterations {
3831            // CWE-400: Check timeout at each iteration to prevent resource exhaustion
3832            if let Some(ctx) = ctx {
3833                ctx.check_timeout()?;
3834            }
3835
3836            if working_table.is_empty() {
3837                break;
3838            }
3839
3840            // Bind working table to CTE name in params
3841            let working_val = Value::List(
3842                working_table
3843                    .iter()
3844                    .map(|row| {
3845                        if row.len() == 1 {
3846                            row.values().next().unwrap().clone()
3847                        } else {
3848                            Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3849                        }
3850                    })
3851                    .collect(),
3852            );
3853
3854            let mut next_params = params.clone();
3855            next_params.insert(cte_name.to_string(), working_val);
3856
3857            // Execute recursive part
3858            let next_result = self
3859                .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3860                .await?;
3861
3862            if next_result.is_empty() {
3863                break;
3864            }
3865
3866            // Filter out already-seen rows (cycle detection)
3867            let new_rows: Vec<_> = next_result
3868                .into_iter()
3869                .filter(|row| {
3870                    let key = row_key(row);
3871                    seen.insert(key) // Returns false if already present
3872                })
3873                .collect();
3874
3875            if new_rows.is_empty() {
3876                // All results were cycles - terminate
3877                break;
3878            }
3879
3880            result_table.extend(new_rows.clone());
3881            working_table = new_rows;
3882        }
3883
3884        // Output accumulated results as a variable
3885        let final_list = Value::List(
3886            result_table
3887                .into_iter()
3888                .map(|row| {
3889                    // If the CTE returns a single column and we want to treat it as a list of values?
3890                    // E.g. WITH RECURSIVE r AS (RETURN 1 UNION RETURN 2) -> [1, 2] or [{expr:1}, {expr:2}]?
3891                    // Cypher LISTs usually contain values.
3892                    // If the row has 1 column, maybe unwrap?
3893                    // But SQL CTEs are tables.
3894                    // Let's stick to List<Map> for consistency with how we pass it in.
3895                    // UNLESS the user extracts it.
3896                    // My parser test `MATCH (n) WHERE n IN hierarchy` implies `hierarchy` contains Nodes.
3897                    // If `row` contains `root` (Node), then `hierarchy` should be `[Node, Node]`.
3898                    // If row has multiple cols, `[ {a:1, b:2}, ... ]`.
3899                    // If row has 1 col, users expect `[val, val]`.
3900                    if row.len() == 1 {
3901                        row.values().next().unwrap().clone()
3902                    } else {
3903                        Value::Map(row.into_iter().collect())
3904                    }
3905                })
3906                .collect(),
3907        );
3908
3909        let mut final_row = HashMap::new();
3910        final_row.insert(cte_name.to_string(), final_list);
3911        Ok(vec![final_row])
3912    }
3913
3914    /// Interval for timeout checks in sort loops.
3915    const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3916
3917    pub(crate) async fn execute_sort(
3918        &self,
3919        rows: Vec<HashMap<String, Value>>,
3920        order_by: &[uni_cypher::ast::SortItem],
3921        prop_manager: &PropertyManager,
3922        params: &HashMap<String, Value>,
3923        ctx: Option<&QueryContext>,
3924    ) -> Result<Vec<HashMap<String, Value>>> {
3925        // CWE-400: Check timeout before potentially expensive sort
3926        if let Some(ctx) = ctx {
3927            ctx.check_timeout()?;
3928        }
3929
3930        let mut rows_with_keys = Vec::with_capacity(rows.len());
3931        for (idx, row) in rows.into_iter().enumerate() {
3932            // Periodic timeout check during key extraction
3933            if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3934                && let Some(ctx) = ctx
3935            {
3936                ctx.check_timeout()?;
3937            }
3938
3939            let mut keys = Vec::new();
3940            for item in order_by {
3941                let val = row
3942                    .get(&item.expr.to_string_repr())
3943                    .cloned()
3944                    .unwrap_or(Value::Null);
3945                let val = if val.is_null() {
3946                    self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3947                        .await
3948                        .unwrap_or(Value::Null)
3949                } else {
3950                    val
3951                };
3952                keys.push(val);
3953            }
3954            rows_with_keys.push((row, keys));
3955        }
3956
3957        // Check timeout again before synchronous sort (can't be interrupted)
3958        if let Some(ctx) = ctx {
3959            ctx.check_timeout()?;
3960        }
3961
3962        rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3963
3964        Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3965    }
3966
3967    /// Create accumulators for aggregate expressions.
3968    pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3969        aggregates
3970            .iter()
3971            .map(|expr| {
3972                if let Expr::FunctionCall { name, distinct, .. } = expr {
3973                    Accumulator::new(name, *distinct)
3974                } else {
3975                    Accumulator::new("COUNT", false)
3976                }
3977            })
3978            .collect()
3979    }
3980
3981    /// Build result row from group-by keys and accumulators.
3982    pub(crate) fn build_aggregate_result(
3983        group_by: &[Expr],
3984        aggregates: &[Expr],
3985        key_vals: &[Value],
3986        accs: &[Accumulator],
3987    ) -> HashMap<String, Value> {
3988        let mut res_row = HashMap::new();
3989        for (i, expr) in group_by.iter().enumerate() {
3990            res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3991        }
3992        for (i, expr) in aggregates.iter().enumerate() {
3993            // Use aggregate_column_name to ensure consistency with planner
3994            let col_name = crate::query::planner::aggregate_column_name(expr);
3995            res_row.insert(col_name, accs[i].finish());
3996        }
3997        res_row
3998    }
3999
4000    /// Compare and return ordering for sort operation.
4001    pub(crate) fn compare_sort_keys(
4002        a_keys: &[Value],
4003        b_keys: &[Value],
4004        order_by: &[uni_cypher::ast::SortItem],
4005    ) -> std::cmp::Ordering {
4006        for (i, item) in order_by.iter().enumerate() {
4007            let order = Self::compare_values(&a_keys[i], &b_keys[i]);
4008            if order != std::cmp::Ordering::Equal {
4009                return if item.ascending {
4010                    order
4011                } else {
4012                    order.reverse()
4013                };
4014            }
4015        }
4016        std::cmp::Ordering::Equal
4017    }
4018
4019    /// Executes BACKUP command to local or cloud storage.
4020    ///
4021    /// Supports both local filesystem paths and cloud URLs (s3://, gs://, az://).
4022    pub(crate) async fn execute_backup(
4023        &self,
4024        destination: &str,
4025        _options: &HashMap<String, Value>,
4026    ) -> Result<Vec<HashMap<String, Value>>> {
4027        // 1. Flush L0
4028        if let Some(writer_arc) = &self.writer {
4029            let writer: &uni_store::Writer = writer_arc.as_ref();
4030            writer.flush_to_l1(None).await?;
4031        }
4032
4033        // 2. Snapshot
4034        let snapshot_manager = self.storage.snapshot_manager();
4035        let snapshot = snapshot_manager
4036            .load_latest_snapshot()
4037            .await?
4038            .ok_or_else(|| anyhow!("No snapshot found"))?;
4039
4040        // 3. Copy files - cloud or local path
4041        if is_cloud_url(destination) {
4042            self.backup_to_cloud(destination, &snapshot.snapshot_id)
4043                .await?;
4044        } else {
4045            // Validate local destination path against sandbox
4046            let validated_dest = self.validate_path(destination)?;
4047            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
4048                .await?;
4049        }
4050
4051        let mut res = HashMap::new();
4052        res.insert(
4053            "status".to_string(),
4054            Value::String("Backup completed".to_string()),
4055        );
4056        res.insert(
4057            "snapshot_id".to_string(),
4058            Value::String(snapshot.snapshot_id),
4059        );
4060        Ok(vec![res])
4061    }
4062
4063    /// Backs up database to a local filesystem destination.
4064    async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
4065        let source_path = std::path::Path::new(self.storage.base_path());
4066
4067        if !dest_path.exists() {
4068            std::fs::create_dir_all(dest_path)?;
4069        }
4070
4071        // Recursive copy (local to local)
4072        if source_path.exists() {
4073            Self::copy_dir_all(source_path, dest_path)?;
4074        }
4075
4076        // Copy schema to destination/catalog/schema.json
4077        let schema_manager = self.storage.schema_manager();
4078        let dest_catalog = dest_path.join("catalog");
4079        if !dest_catalog.exists() {
4080            std::fs::create_dir_all(&dest_catalog)?;
4081        }
4082
4083        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4084        std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
4085
4086        Ok(())
4087    }
4088
4089    /// Backs up database to a cloud storage destination.
4090    ///
4091    /// Streams data from source to destination, supporting cross-cloud backups.
4092    async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
4093        use object_store::ObjectStore;
4094        use object_store::ObjectStoreExt;
4095        use object_store::local::LocalFileSystem;
4096        use object_store::path::Path as ObjPath;
4097
4098        let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
4099        let source_path = std::path::Path::new(self.storage.base_path());
4100
4101        // Create local store for source, coerced to dyn ObjectStore
4102        let src_store: Arc<dyn ObjectStore> =
4103            Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
4104
4105        // Copy catalog/ directory
4106        let catalog_src = ObjPath::from("catalog");
4107        let catalog_dst = if dest_prefix.as_ref().is_empty() {
4108            ObjPath::from("catalog")
4109        } else {
4110            ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
4111        };
4112        copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
4113
4114        // Copy storage/ directory
4115        let storage_src = ObjPath::from("storage");
4116        let storage_dst = if dest_prefix.as_ref().is_empty() {
4117            ObjPath::from("storage")
4118        } else {
4119            ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
4120        };
4121        copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
4122
4123        // Ensure schema is present at canonical catalog location.
4124        let schema_manager = self.storage.schema_manager();
4125        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4126        let schema_path = if dest_prefix.as_ref().is_empty() {
4127            ObjPath::from("catalog/schema.json")
4128        } else {
4129            ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
4130        };
4131        dest_store
4132            .put(&schema_path, bytes::Bytes::from(schema_content).into())
4133            .await?;
4134
4135        Ok(())
4136    }
4137
4138    /// Maximum directory depth for backup operations.
4139    ///
4140    /// **CWE-674 (Uncontrolled Recursion)**: Prevents stack overflow from
4141    /// excessively deep directory structures.
4142    const MAX_BACKUP_DEPTH: usize = 100;
4143
4144    /// Maximum file count for backup operations.
4145    ///
4146    /// **CWE-400 (Resource Consumption)**: Prevents disk exhaustion and
4147    /// long-running operations from malicious or unexpectedly large directories.
4148    const MAX_BACKUP_FILES: usize = 100_000;
4149
4150    /// Recursively copies a directory with security limits.
4151    ///
4152    /// # Security
4153    ///
4154    /// - **CWE-674**: Depth limit prevents stack overflow
4155    /// - **CWE-400**: File count limit prevents resource exhaustion
4156    /// - **Symlink handling**: Symlinks are skipped to prevent loop attacks
4157    pub(crate) fn copy_dir_all(
4158        src: &std::path::Path,
4159        dst: &std::path::Path,
4160    ) -> std::io::Result<()> {
4161        let mut file_count = 0usize;
4162        Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
4163    }
4164
4165    /// Internal implementation with depth and file count tracking.
4166    pub(crate) fn copy_dir_all_impl(
4167        src: &std::path::Path,
4168        dst: &std::path::Path,
4169        depth: usize,
4170        file_count: &mut usize,
4171    ) -> std::io::Result<()> {
4172        if depth >= Self::MAX_BACKUP_DEPTH {
4173            return Err(std::io::Error::new(
4174                std::io::ErrorKind::InvalidInput,
4175                format!(
4176                    "Maximum backup depth {} exceeded at {:?}",
4177                    Self::MAX_BACKUP_DEPTH,
4178                    src
4179                ),
4180            ));
4181        }
4182
4183        std::fs::create_dir_all(dst)?;
4184
4185        for entry in std::fs::read_dir(src)? {
4186            if *file_count >= Self::MAX_BACKUP_FILES {
4187                return Err(std::io::Error::new(
4188                    std::io::ErrorKind::InvalidInput,
4189                    format!(
4190                        "Maximum backup file count {} exceeded",
4191                        Self::MAX_BACKUP_FILES
4192                    ),
4193                ));
4194            }
4195            *file_count += 1;
4196
4197            let entry = entry?;
4198            let metadata = entry.metadata()?;
4199
4200            // Skip symlinks to prevent loops and traversal attacks
4201            if metadata.file_type().is_symlink() {
4202                // Silently skip - logging would require tracing dependency
4203                continue;
4204            }
4205
4206            let dst_path = dst.join(entry.file_name());
4207            if metadata.is_dir() {
4208                Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4209            } else {
4210                std::fs::copy(entry.path(), dst_path)?;
4211            }
4212        }
4213        Ok(())
4214    }
4215
4216    pub(crate) async fn execute_copy(
4217        &self,
4218        target: &str,
4219        source: &str,
4220        options: &HashMap<String, Value>,
4221        prop_manager: &PropertyManager,
4222    ) -> Result<Vec<HashMap<String, Value>>> {
4223        let format = options
4224            .get("format")
4225            .and_then(|v| v.as_str())
4226            .unwrap_or_else(|| {
4227                if source.ends_with(".parquet") {
4228                    "parquet"
4229                } else {
4230                    "csv"
4231                }
4232            });
4233
4234        match format.to_lowercase().as_str() {
4235            "csv" => self.execute_csv_import(target, source, options).await,
4236            "parquet" => {
4237                self.execute_parquet_import(target, source, options, prop_manager)
4238                    .await
4239            }
4240            _ => Err(anyhow!("Unsupported format: {}", format)),
4241        }
4242    }
4243
4244    pub(crate) async fn execute_csv_import(
4245        &self,
4246        target: &str,
4247        source: &str,
4248        options: &HashMap<String, Value>,
4249    ) -> Result<Vec<HashMap<String, Value>>> {
4250        // Validate source path against sandbox
4251        let validated_source = self.validate_path(source)?;
4252
4253        let writer_lock = self
4254            .writer
4255            .as_ref()
4256            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4257
4258        let schema = self.storage.schema_manager().schema();
4259
4260        // 1. Determine if target is Label or EdgeType
4261        let label_meta = schema.labels.get(target);
4262        let edge_meta = schema.edge_types.get(target);
4263
4264        if label_meta.is_none() && edge_meta.is_none() {
4265            return Err(anyhow!("Target '{}' not found in schema", target));
4266        }
4267
4268        // 2. Open CSV
4269        let delimiter_str = options
4270            .get("delimiter")
4271            .and_then(|v| v.as_str())
4272            .unwrap_or(",");
4273        let delimiter = if delimiter_str.is_empty() {
4274            b','
4275        } else {
4276            delimiter_str.as_bytes()[0]
4277        };
4278        let has_header = options
4279            .get("header")
4280            .and_then(|v| v.as_bool())
4281            .unwrap_or(true);
4282
4283        let mut rdr = csv::ReaderBuilder::new()
4284            .delimiter(delimiter)
4285            .has_headers(has_header)
4286            .from_path(&validated_source)?;
4287
4288        let headers = rdr.headers()?.clone();
4289        let mut count = 0;
4290
4291        let writer: &uni_store::Writer = writer_lock.as_ref();
4292
4293        // Chunked VID/EID refill for streaming CSV: amortizes the IdAllocator
4294        // mutex over `CSV_ID_CHUNK` rows. End-of-iteration may waste up to
4295        // CSV_ID_CHUNK-1 IDs — harmless in the u64 space.
4296        const CSV_ID_CHUNK: usize = 256;
4297
4298        if label_meta.is_some() {
4299            let target_props = schema
4300                .properties
4301                .get(target)
4302                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4303
4304            let mut vid_chunk: std::collections::VecDeque<Vid> =
4305                std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4306            for result in rdr.records() {
4307                let record = result?;
4308                let mut props = HashMap::new();
4309
4310                for (i, header) in headers.iter().enumerate() {
4311                    if let Some(val_str) = record.get(i)
4312                        && let Some(prop_meta) = target_props.get(header)
4313                    {
4314                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4315                        props.insert(header.to_string(), val);
4316                    }
4317                }
4318
4319                if vid_chunk.is_empty() {
4320                    vid_chunk.extend(writer.allocate_vids(CSV_ID_CHUNK).await?);
4321                }
4322                let vid = vid_chunk.pop_front().unwrap();
4323                writer
4324                    .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4325                    .await?;
4326                count += 1;
4327            }
4328        } else if let Some(meta) = edge_meta {
4329            let type_id = meta.id;
4330            let target_props = schema
4331                .properties
4332                .get(target)
4333                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4334
4335            // For edges, we need src and dst VIDs.
4336            // Expecting columns '_src' and '_dst' or as specified in options.
4337            let src_col = options
4338                .get("src_col")
4339                .and_then(|v| v.as_str())
4340                .unwrap_or("_src");
4341            let dst_col = options
4342                .get("dst_col")
4343                .and_then(|v| v.as_str())
4344                .unwrap_or("_dst");
4345
4346            let mut eid_chunk: std::collections::VecDeque<Eid> =
4347                std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4348            for result in rdr.records() {
4349                let record = result?;
4350                let mut props = HashMap::new();
4351                let mut src_vid = None;
4352                let mut dst_vid = None;
4353
4354                for (i, header) in headers.iter().enumerate() {
4355                    if let Some(val_str) = record.get(i) {
4356                        if header == src_col {
4357                            src_vid =
4358                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4359                        } else if header == dst_col {
4360                            dst_vid =
4361                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4362                        } else if let Some(prop_meta) = target_props.get(header) {
4363                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4364                            props.insert(header.to_string(), val);
4365                        }
4366                    }
4367                }
4368
4369                let src =
4370                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4371                let dst = dst_vid
4372                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;
4373
4374                if eid_chunk.is_empty() {
4375                    eid_chunk.extend(writer.allocate_eids(CSV_ID_CHUNK).await?);
4376                }
4377                let eid = eid_chunk.pop_front().unwrap();
4378                writer
4379                    .insert_edge(
4380                        src,
4381                        dst,
4382                        type_id,
4383                        eid,
4384                        props,
4385                        Some(target.to_string()),
4386                        None,
4387                    )
4388                    .await?;
4389                count += 1;
4390            }
4391        }
4392
4393        let mut res = HashMap::new();
4394        res.insert("count".to_string(), Value::Int(count as i64));
4395        Ok(vec![res])
4396    }
4397
4398    /// Imports data from Parquet file to a label or edge type.
4399    ///
4400    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
4401    pub(crate) async fn execute_parquet_import(
4402        &self,
4403        target: &str,
4404        source: &str,
4405        options: &HashMap<String, Value>,
4406        _prop_manager: &PropertyManager,
4407    ) -> Result<Vec<HashMap<String, Value>>> {
4408        let writer_lock = self
4409            .writer
4410            .as_ref()
4411            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4412
4413        let schema = self.storage.schema_manager().schema();
4414
4415        // 1. Determine if target is Label or EdgeType
4416        let label_meta = schema.labels.get(target);
4417        let edge_meta = schema.edge_types.get(target);
4418
4419        if label_meta.is_none() && edge_meta.is_none() {
4420            return Err(anyhow!("Target '{}' not found in schema", target));
4421        }
4422
4423        // 2. Open Parquet - support both local and cloud URLs
4424        let reader = if is_cloud_url(source) {
4425            self.open_parquet_from_cloud(source).await?
4426        } else {
4427            // Validate local source path against sandbox
4428            let validated_source = self.validate_path(source)?;
4429            let file = std::fs::File::open(&validated_source)?;
4430            let builder =
4431                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
4432            builder.build()?
4433        };
4434        let mut reader = reader;
4435
4436        let mut count = 0;
4437        let writer: &uni_store::Writer = writer_lock.as_ref();
4438
4439        if label_meta.is_some() {
4440            let target_props = schema
4441                .properties
4442                .get(target)
4443                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4444
4445            for batch in reader.by_ref() {
4446                let batch = batch?;
4447                let num_rows = batch.num_rows();
4448                // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
4449                let vids = writer.allocate_vids(num_rows).await?;
4450                for (row, &vid) in vids.iter().enumerate().take(num_rows) {
4451                    let mut props = HashMap::new();
4452                    for field in batch.schema().fields() {
4453                        let name = field.name();
4454                        if target_props.contains_key(name) {
4455                            let col = batch.column_by_name(name).unwrap();
4456                            if !col.is_null(row) {
4457                                // Look up Uni DataType from schema for proper DateTime/Time decoding
4458                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
4459                                let val =
4460                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
4461                                props.insert(name.clone(), val);
4462                            }
4463                        }
4464                    }
4465                    writer
4466                        .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4467                        .await?;
4468                    count += 1;
4469                }
4470            }
4471        } else if let Some(meta) = edge_meta {
4472            let type_id = meta.id;
4473            let target_props = schema
4474                .properties
4475                .get(target)
4476                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4477
4478            let src_col = options
4479                .get("src_col")
4480                .and_then(|v| v.as_str())
4481                .unwrap_or("_src");
4482            let dst_col = options
4483                .get("dst_col")
4484                .and_then(|v| v.as_str())
4485                .unwrap_or("_dst");
4486
4487            for batch in reader {
4488                let batch = batch?;
4489                let num_rows = batch.num_rows();
4490                // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
4491                let eids = writer.allocate_eids(num_rows).await?;
4492                for (row, &eid) in eids.iter().enumerate().take(num_rows) {
4493                    let mut props = HashMap::new();
4494                    let mut src_vid = None;
4495                    let mut dst_vid = None;
4496
4497                    for field in batch.schema().fields() {
4498                        let name = field.name();
4499                        let col = batch.column_by_name(name).unwrap();
4500                        if col.is_null(row) {
4501                            continue;
4502                        }
4503
4504                        if name == src_col {
4505                            let val = Self::arrow_to_value(col.as_ref(), row);
4506                            src_vid = Some(Self::vid_from_value(&val)?);
4507                        } else if name == dst_col {
4508                            let val = Self::arrow_to_value(col.as_ref(), row);
4509                            dst_vid = Some(Self::vid_from_value(&val)?);
4510                        } else if let Some(pm) = target_props.get(name) {
4511                            // Look up Uni DataType from schema for proper DateTime/Time decoding
4512                            let val =
4513                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
4514                            props.insert(name.clone(), val);
4515                        }
4516                    }
4517
4518                    let src = src_vid
4519                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4520                    let dst = dst_vid.ok_or_else(|| {
4521                        anyhow!("Missing destination VID in column '{}'", dst_col)
4522                    })?;
4523
4524                    writer
4525                        .insert_edge(
4526                            src,
4527                            dst,
4528                            type_id,
4529                            eid,
4530                            props,
4531                            Some(target.to_string()),
4532                            None,
4533                        )
4534                        .await?;
4535                    count += 1;
4536                }
4537            }
4538        }
4539
4540        let mut res = HashMap::new();
4541        res.insert("count".to_string(), Value::Int(count as i64));
4542        Ok(vec![res])
4543    }
4544
4545    /// Opens a Parquet file from a cloud URL.
4546    ///
4547    /// Downloads the file to memory and creates a Parquet reader.
4548    async fn open_parquet_from_cloud(
4549        &self,
4550        source_url: &str,
4551    ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4552        use object_store::ObjectStoreExt;
4553
4554        let (store, path) = build_store_from_url(source_url)?;
4555
4556        // Download file contents
4557        let bytes = store.get(&path).await?.bytes().await?;
4558
4559        // Create a Parquet reader from the bytes
4560        let reader = bytes::Bytes::from(bytes.to_vec());
4561        let builder =
4562            parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4563        Ok(builder.build()?)
4564    }
4565
4566    pub(crate) async fn scan_edge_type(
4567        &self,
4568        edge_type: &str,
4569        ctx: Option<&QueryContext>,
4570    ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4571        let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4572
4573        // 1. Scan L2 (Base)
4574        self.scan_edge_type_l2(edge_type, &mut edges).await?;
4575
4576        // 2. Scan L1 (Delta)
4577        self.scan_edge_type_l1(edge_type, &mut edges).await?;
4578
4579        // 3. Scan L0 (Memory) and filter tombstoned vertices
4580        if let Some(ctx) = ctx {
4581            self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4582            self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4583        }
4584
4585        Ok(edges
4586            .into_iter()
4587            .map(|(eid, (src, dst))| (eid, src, dst))
4588            .collect())
4589    }
4590
    /// Scan L2 (base) storage for edges of a given type.
    ///
    /// This is currently a no-op: it never reads storage, never touches `_edges`,
    /// and always returns `Ok(())`. Per the notes in the body, edges are stored in
    /// delta datasets (L1) via LanceDB and legacy L2 base edge storage is unused.
    pub(crate) async fn scan_edge_type_l2(
        &self,
        _edge_type: &str,
        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
    ) -> Result<()> {
        // Edges are now stored in delta datasets (L1) via LanceDB.
        // Legacy L2 base edge storage is no longer used.
        Ok(())
    }
4604
4605    /// Scan L1 (delta) storage for edges of a given type.
4606    pub(crate) async fn scan_edge_type_l1(
4607        &self,
4608        edge_type: &str,
4609        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4610    ) -> Result<()> {
4611        if let Ok(Some(batch)) = self
4612            .storage
4613            .scan_delta_table(
4614                edge_type,
4615                "fwd",
4616                &["eid", "src_vid", "dst_vid", "op", "_version"],
4617                None,
4618            )
4619            .await
4620        {
4621            // Collect ops with versions: eid -> (version, op, src, dst)
4622            let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4623                HashMap::new();
4624
4625            self.process_delta_batch(&batch, &mut versioned_ops)?;
4626
4627            // Apply the winning ops
4628            for (eid, (_, op, src, dst)) in versioned_ops {
4629                if op == 0 {
4630                    edges.insert(eid, (src, dst));
4631                } else if op == 1 {
4632                    edges.remove(&eid);
4633                }
4634            }
4635        }
4636        Ok(())
4637    }
4638
4639    /// Process a delta batch, tracking versioned operations.
4640    pub(crate) fn process_delta_batch(
4641        &self,
4642        batch: &arrow_array::RecordBatch,
4643        versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4644    ) -> Result<()> {
4645        use arrow_array::UInt64Array;
4646        let eid_col = batch
4647            .column_by_name("eid")
4648            .ok_or(anyhow!("Missing eid"))?
4649            .as_any()
4650            .downcast_ref::<UInt64Array>()
4651            .ok_or(anyhow!("Invalid eid"))?;
4652        let src_col = batch
4653            .column_by_name("src_vid")
4654            .ok_or(anyhow!("Missing src_vid"))?
4655            .as_any()
4656            .downcast_ref::<UInt64Array>()
4657            .ok_or(anyhow!("Invalid src_vid"))?;
4658        let dst_col = batch
4659            .column_by_name("dst_vid")
4660            .ok_or(anyhow!("Missing dst_vid"))?
4661            .as_any()
4662            .downcast_ref::<UInt64Array>()
4663            .ok_or(anyhow!("Invalid dst_vid"))?;
4664        let op_col = batch
4665            .column_by_name("op")
4666            .ok_or(anyhow!("Missing op"))?
4667            .as_any()
4668            .downcast_ref::<arrow_array::UInt8Array>()
4669            .ok_or(anyhow!("Invalid op"))?;
4670        let version_col = batch
4671            .column_by_name("_version")
4672            .ok_or(anyhow!("Missing _version"))?
4673            .as_any()
4674            .downcast_ref::<UInt64Array>()
4675            .ok_or(anyhow!("Invalid _version"))?;
4676
4677        for i in 0..batch.num_rows() {
4678            let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4679            let version = version_col.value(i);
4680            let op = op_col.value(i);
4681            let src = Vid::from(src_col.value(i));
4682            let dst = Vid::from(dst_col.value(i));
4683
4684            match versioned_ops.entry(eid) {
4685                std::collections::hash_map::Entry::Vacant(e) => {
4686                    e.insert((version, op, src, dst));
4687                }
4688                std::collections::hash_map::Entry::Occupied(mut e) => {
4689                    if version > e.get().0 {
4690                        e.insert((version, op, src, dst));
4691                    }
4692                }
4693            }
4694        }
4695        Ok(())
4696    }
4697
4698    /// Scan L0 (memory) buffers for edges of a given type.
4699    pub(crate) fn scan_edge_type_l0(
4700        &self,
4701        edge_type: &str,
4702        ctx: &QueryContext,
4703        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4704    ) {
4705        let schema = self.storage.schema_manager().schema();
4706        let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4707
4708        if let Some(type_id) = type_id {
4709            // Main L0
4710            self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4711
4712            // Transaction L0
4713            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4714                self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4715            }
4716
4717            // Pending flush L0s
4718            for pending_l0_arc in &ctx.pending_flush_l0s {
4719                self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4720            }
4721        }
4722    }
4723
4724    /// Scan a single L0 buffer for edges and apply tombstones.
4725    pub(crate) fn scan_single_l0(
4726        &self,
4727        l0: &uni_store::runtime::L0Buffer,
4728        type_id: u32,
4729        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4730    ) {
4731        for edge_entry in l0.graph.edges() {
4732            if edge_entry.edge_type == type_id {
4733                edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4734            }
4735        }
4736        // Process Tombstones
4737        let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4738        for eid in eids_to_check {
4739            if l0.is_tombstoned(eid) {
4740                edges.remove(&eid);
4741            }
4742        }
4743    }
4744
4745    /// Filter out edges connected to tombstoned vertices.
4746    pub(crate) fn filter_tombstoned_vertex_edges(
4747        &self,
4748        ctx: &QueryContext,
4749        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4750    ) {
4751        let l0 = ctx.l0.read();
4752        let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4753
4754        // Include tx_l0 vertex tombstones if present
4755        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4756            let tx_l0 = tx_l0_arc.read();
4757            all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4758        }
4759
4760        // Include pending flush L0 vertex tombstones
4761        for pending_l0_arc in &ctx.pending_flush_l0s {
4762            let pending_l0 = pending_l0_arc.read();
4763            all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4764        }
4765
4766        edges.retain(|_, (src, dst)| {
4767            !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4768        });
4769    }
4770
4771    /// Execute a projection operation.
4772    pub(crate) async fn execute_project(
4773        &self,
4774        input_rows: Vec<HashMap<String, Value>>,
4775        projections: &[(Expr, Option<String>)],
4776        prop_manager: &PropertyManager,
4777        params: &HashMap<String, Value>,
4778        ctx: Option<&QueryContext>,
4779    ) -> Result<Vec<HashMap<String, Value>>> {
4780        let mut results = Vec::new();
4781        for m in input_rows {
4782            let mut row = HashMap::new();
4783            for (expr, alias) in projections {
4784                let val = self
4785                    .evaluate_expr(expr, &m, prop_manager, params, ctx)
4786                    .await?;
4787                let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4788                row.insert(name, val);
4789            }
4790            results.push(row);
4791        }
4792        Ok(results)
4793    }
4794
4795    /// Execute an UNWIND operation.
4796    pub(crate) async fn execute_unwind(
4797        &self,
4798        input_rows: Vec<HashMap<String, Value>>,
4799        expr: &Expr,
4800        variable: &str,
4801        prop_manager: &PropertyManager,
4802        params: &HashMap<String, Value>,
4803        ctx: Option<&QueryContext>,
4804    ) -> Result<Vec<HashMap<String, Value>>> {
4805        let mut results = Vec::new();
4806        for row in input_rows {
4807            let val = self
4808                .evaluate_expr(expr, &row, prop_manager, params, ctx)
4809                .await?;
4810            if let Value::List(items) = val {
4811                for item in items {
4812                    let mut new_row = row.clone();
4813                    new_row.insert(variable.to_string(), item);
4814                    results.push(new_row);
4815                }
4816            }
4817        }
4818        Ok(results)
4819    }
4820
4821    /// Execute an APPLY (correlated subquery) operation.
4822    pub(crate) async fn execute_apply(
4823        &self,
4824        input_rows: Vec<HashMap<String, Value>>,
4825        subquery: &LogicalPlan,
4826        input_filter: Option<&Expr>,
4827        prop_manager: &PropertyManager,
4828        params: &HashMap<String, Value>,
4829        ctx: Option<&QueryContext>,
4830    ) -> Result<Vec<HashMap<String, Value>>> {
4831        let mut filtered_rows = input_rows;
4832
4833        if let Some(filter) = input_filter {
4834            let mut filtered = Vec::new();
4835            for row in filtered_rows {
4836                let res = self
4837                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
4838                    .await?;
4839                if res.as_bool().unwrap_or(false) {
4840                    filtered.push(row);
4841                }
4842            }
4843            filtered_rows = filtered;
4844        }
4845
4846        // Handle empty input: execute subquery once with empty context
4847        // This is critical for standalone CALL statements at the beginning of a query
4848        if filtered_rows.is_empty() {
4849            let sub_rows = self
4850                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4851                .await?;
4852            return Ok(sub_rows);
4853        }
4854
4855        let mut results = Vec::new();
4856        for row in filtered_rows {
4857            let mut sub_params = params.clone();
4858            sub_params.extend(row.clone());
4859
4860            let sub_rows = self
4861                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4862                .await?;
4863
4864            for sub_row in sub_rows {
4865                let mut new_row = row.clone();
4866                new_row.extend(sub_row);
4867                results.push(new_row);
4868            }
4869        }
4870        Ok(results)
4871    }
4872
4873    /// Execute SHOW INDEXES command.
4874    pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4875        let schema = self.storage.schema_manager().schema();
4876        let mut rows = Vec::new();
4877        for idx in &schema.indexes {
4878            let (name, type_str, details) = match idx {
4879                uni_common::core::schema::IndexDefinition::Vector(c) => (
4880                    c.name.clone(),
4881                    "VECTOR",
4882                    format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4883                ),
4884                uni_common::core::schema::IndexDefinition::FullText(c) => (
4885                    c.name.clone(),
4886                    "FULLTEXT",
4887                    format!("on {}:{:?}", c.label, c.properties),
4888                ),
4889                uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4890                    cfg.name.clone(),
4891                    "SCALAR",
4892                    format!(":{}({:?})", cfg.label, cfg.properties),
4893                ),
4894                _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4895            };
4896
4897            if let Some(f) = filter
4898                && f != type_str
4899            {
4900                continue;
4901            }
4902
4903            let mut row = HashMap::new();
4904            row.insert("name".to_string(), Value::String(name));
4905            row.insert("type".to_string(), Value::String(type_str.to_string()));
4906            row.insert("details".to_string(), Value::String(details));
4907            rows.push(row);
4908        }
4909        rows
4910    }
4911
4912    pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4913        let mut row = HashMap::new();
4914        row.insert("name".to_string(), Value::String("uni".to_string()));
4915        // Could add storage path, etc.
4916        vec![row]
4917    }
4918
4919    pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4920        // Placeholder as we don't easy access to config struct from here
4921        vec![]
4922    }
4923
4924    pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4925        let snapshot = self
4926            .storage
4927            .snapshot_manager()
4928            .load_latest_snapshot()
4929            .await?;
4930        let mut results = Vec::new();
4931
4932        if let Some(snap) = snapshot {
4933            for (label, s) in &snap.vertices {
4934                let mut row = HashMap::new();
4935                row.insert("type".to_string(), Value::String("Label".to_string()));
4936                row.insert("name".to_string(), Value::String(label.clone()));
4937                row.insert("count".to_string(), Value::Int(s.count as i64));
4938                results.push(row);
4939            }
4940            for (edge, s) in &snap.edges {
4941                let mut row = HashMap::new();
4942                row.insert("type".to_string(), Value::String("Edge".to_string()));
4943                row.insert("name".to_string(), Value::String(edge.clone()));
4944                row.insert("count".to_string(), Value::Int(s.count as i64));
4945                results.push(row);
4946            }
4947        }
4948
4949        Ok(results)
4950    }
4951
4952    pub(crate) fn execute_show_constraints(
4953        &self,
4954        clause: ShowConstraints,
4955    ) -> Vec<HashMap<String, Value>> {
4956        let schema = self.storage.schema_manager().schema();
4957        let mut rows = Vec::new();
4958        for c in &schema.constraints {
4959            if let Some(target) = &clause.target {
4960                match (target, &c.target) {
4961                    (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4962                    (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4963                        if e1 == e2 => {}
4964                    _ => continue,
4965                }
4966            }
4967
4968            let mut row = HashMap::new();
4969            row.insert("name".to_string(), Value::String(c.name.clone()));
4970            let type_str = match c.constraint_type {
4971                ConstraintType::Unique { .. } => "UNIQUE",
4972                ConstraintType::Exists { .. } => "EXISTS",
4973                ConstraintType::Check { .. } => "CHECK",
4974                _ => "UNKNOWN",
4975            };
4976            row.insert("type".to_string(), Value::String(type_str.to_string()));
4977
4978            let target_str = match &c.target {
4979                ConstraintTarget::Label(l) => format!("(:{})", l),
4980                ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4981                _ => "UNKNOWN".to_string(),
4982            };
4983            row.insert("target".to_string(), Value::String(target_str));
4984
4985            rows.push(row);
4986        }
4987        rows
4988    }
4989
4990    /// Execute a MERGE operation.
4991    pub(crate) async fn execute_cross_join(
4992        &self,
4993        left: Box<LogicalPlan>,
4994        right: Box<LogicalPlan>,
4995        prop_manager: &PropertyManager,
4996        params: &HashMap<String, Value>,
4997        ctx: Option<&QueryContext>,
4998    ) -> Result<Vec<HashMap<String, Value>>> {
4999        let left_rows = self
5000            .execute_subplan(*left, prop_manager, params, ctx)
5001            .await?;
5002        let right_rows = self
5003            .execute_subplan(*right, prop_manager, params, ctx)
5004            .await?;
5005
5006        let mut results = Vec::new();
5007        for l in &left_rows {
5008            for r in &right_rows {
5009                let mut combined = l.clone();
5010                combined.extend(r.clone());
5011                results.push(combined);
5012            }
5013        }
5014        Ok(results)
5015    }
5016
5017    /// Execute a UNION operation with optional deduplication.
5018    pub(crate) async fn execute_union(
5019        &self,
5020        left: Box<LogicalPlan>,
5021        right: Box<LogicalPlan>,
5022        all: bool,
5023        prop_manager: &PropertyManager,
5024        params: &HashMap<String, Value>,
5025        ctx: Option<&QueryContext>,
5026    ) -> Result<Vec<HashMap<String, Value>>> {
5027        let mut left_rows = self
5028            .execute_subplan(*left, prop_manager, params, ctx)
5029            .await?;
5030        let mut right_rows = self
5031            .execute_subplan(*right, prop_manager, params, ctx)
5032            .await?;
5033
5034        left_rows.append(&mut right_rows);
5035
5036        if !all {
5037            let mut seen = HashSet::new();
5038            left_rows.retain(|row| {
5039                let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
5040                let key = format!("{sorted_row:?}");
5041                seen.insert(key)
5042            });
5043        }
5044        Ok(left_rows)
5045    }
5046
5047    /// Check if an index with the given name exists.
5048    pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
5049        let schema = self.storage.schema_manager().schema();
5050        schema.indexes.iter().any(|idx| match idx {
5051            uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
5052            uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
5053            uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
5054            _ => false,
5055        })
5056    }
5057
5058    pub(crate) async fn execute_export(
5059        &self,
5060        target: &str,
5061        source: &str,
5062        options: &HashMap<String, Value>,
5063        prop_manager: &PropertyManager,
5064        ctx: Option<&QueryContext>,
5065    ) -> Result<Vec<HashMap<String, Value>>> {
5066        let format = options
5067            .get("format")
5068            .and_then(|v| v.as_str())
5069            .unwrap_or("csv")
5070            .to_lowercase();
5071
5072        match format.as_str() {
5073            "csv" => {
5074                self.execute_csv_export(target, source, options, prop_manager, ctx)
5075                    .await
5076            }
5077            "parquet" => {
5078                self.execute_parquet_export(target, source, options, prop_manager, ctx)
5079                    .await
5080            }
5081            _ => Err(anyhow!("Unsupported export format: {}", format)),
5082        }
5083    }
5084
5085    pub(crate) async fn execute_csv_export(
5086        &self,
5087        target: &str,
5088        source: &str,
5089        options: &HashMap<String, Value>,
5090        prop_manager: &PropertyManager,
5091        ctx: Option<&QueryContext>,
5092    ) -> Result<Vec<HashMap<String, Value>>> {
5093        // Validate destination path against sandbox
5094        let validated_dest = self.validate_path(source)?;
5095
5096        let schema = self.storage.schema_manager().schema();
5097        let label_meta = schema.labels.get(target);
5098        let edge_meta = schema.edge_types.get(target);
5099
5100        if label_meta.is_none() && edge_meta.is_none() {
5101            return Err(anyhow!("Target '{}' not found in schema", target));
5102        }
5103
5104        let delimiter_str = options
5105            .get("delimiter")
5106            .and_then(|v| v.as_str())
5107            .unwrap_or(",");
5108        let delimiter = if delimiter_str.is_empty() {
5109            b','
5110        } else {
5111            delimiter_str.as_bytes()[0]
5112        };
5113        let has_header = options
5114            .get("header")
5115            .and_then(|v| v.as_bool())
5116            .unwrap_or(true);
5117
5118        let mut wtr = csv::WriterBuilder::new()
5119            .delimiter(delimiter)
5120            .from_path(&validated_dest)?;
5121
5122        let mut count = 0;
5123        // Empty properties map for labels/edge types without registered properties
5124        let empty_props = HashMap::new();
5125
5126        if let Some(meta) = label_meta {
5127            let label_id = meta.id;
5128            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5129            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5130            prop_names.sort();
5131
5132            let mut headers = vec!["_vid".to_string()];
5133            headers.extend(prop_names.clone());
5134
5135            if has_header {
5136                wtr.write_record(&headers)?;
5137            }
5138
5139            let vids = self
5140                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5141                .await?;
5142
5143            for vid in vids {
5144                let props = prop_manager
5145                    .get_all_vertex_props_with_ctx(vid, ctx)
5146                    .await?
5147                    .unwrap_or_default();
5148
5149                let mut row = Vec::with_capacity(headers.len());
5150                row.push(vid.to_string());
5151                for p_name in &prop_names {
5152                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5153                    row.push(self.format_csv_value(val));
5154                }
5155                wtr.write_record(&row)?;
5156                count += 1;
5157            }
5158        } else if let Some(meta) = edge_meta {
5159            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5160            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5161            prop_names.sort();
5162
5163            // Headers for Edge: _eid, _src, _dst, _type, ...props
5164            let mut headers = vec![
5165                "_eid".to_string(),
5166                "_src".to_string(),
5167                "_dst".to_string(),
5168                "_type".to_string(),
5169            ];
5170            headers.extend(prop_names.clone());
5171
5172            if has_header {
5173                wtr.write_record(&headers)?;
5174            }
5175
5176            let edges = self.scan_edge_type(target, ctx).await?;
5177
5178            for (eid, src, dst) in edges {
5179                let props = prop_manager
5180                    .get_all_edge_props_with_ctx(eid, ctx)
5181                    .await?
5182                    .unwrap_or_default();
5183
5184                let mut row = Vec::with_capacity(headers.len());
5185                row.push(eid.to_string());
5186                row.push(src.to_string());
5187                row.push(dst.to_string());
5188                row.push(meta.id.to_string());
5189
5190                for p_name in &prop_names {
5191                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5192                    row.push(self.format_csv_value(val));
5193                }
5194                wtr.write_record(&row)?;
5195                count += 1;
5196            }
5197        }
5198
5199        wtr.flush()?;
5200        let mut res = HashMap::new();
5201        res.insert("count".to_string(), Value::Int(count as i64));
5202        Ok(vec![res])
5203    }
5204
    /// Exports data to Parquet format.
    ///
    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
    ///
    /// Writes every vertex of label `target`, or every edge of edge type
    /// `target`, to `destination`. The Arrow schema comes from the storage
    /// dataset for that label / edge type. Each exported row is the entity's
    /// properties plus bookkeeping columns:
    /// - vertices: `_vid`, `_uid` (only when not already present),
    ///   `_deleted = false`, `_version = 1`;
    /// - edges: `eid`, `src_vid`, `dst_vid`, `_deleted = false`, `_version = 1`.
    ///
    /// Destinations for which `is_cloud_url` returns true are uploaded through
    /// `write_parquet_to_cloud`. Anything else is treated as a local path: it
    /// is checked with `validate_path` and written with an `ArrowWriter`. All
    /// rows are buffered in memory and written as a single record batch.
    /// `_options` is currently ignored.
    ///
    /// Returns one row whose `count` column is the number of exported rows.
    ///
    /// # Errors
    /// Fails if `target` is not in the schema, the dataset schema cannot be
    /// obtained, a scan or property lookup fails, the local path is rejected,
    /// or encoding/writing the Parquet data fails.
    pub(crate) async fn execute_parquet_export(
        &self,
        target: &str,
        destination: &str,
        _options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let schema_manager = self.storage.schema_manager();
        let schema = schema_manager.schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // The dataset's Arrow schema defines the exported columns.
        // rows_to_batch fills each field by name and writes Null for missing keys.
        let arrow_schema = if label_meta.is_some() {
            let dataset = self.storage.vertex_dataset(target)?;
            dataset.get_arrow_schema(&schema)?
        } else {
            // Edge Schema
            // NOTE(review): src/dst labels are passed as "" — presumably only
            // the schema is needed here; confirm with edge_dataset.
            let dataset = self.storage.edge_dataset(target, "", "")?;
            dataset.get_arrow_schema(&schema)?
        };

        let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let mut props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                props.insert(
                    "_vid".to_string(),
                    uni_common::Value::Int(vid.as_u64() as i64),
                );
                // NOTE(review): 32 zero ints looks like a placeholder UID —
                // confirm the expected width/encoding of `_uid`.
                if !props.contains_key("_uid") {
                    props.insert(
                        "_uid".to_string(),
                        uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
                    );
                }
                // Every exported row is written as live, at version 1.
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        } else if edge_meta.is_some() {
            let edges = self.scan_edge_type(target, ctx).await?;
            for (eid, src, dst) in edges {
                let mut props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                props.insert(
                    "eid".to_string(),
                    uni_common::Value::Int(eid.as_u64() as i64),
                );
                props.insert(
                    "src_vid".to_string(),
                    uni_common::Value::Int(src.as_u64() as i64),
                );
                props.insert(
                    "dst_vid".to_string(),
                    uni_common::Value::Int(dst.as_u64() as i64),
                );
                // Every exported row is written as live, at version 1.
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        }

        // Write to cloud or local file
        // NOTE(review): cloud destinations are not passed through validate_path.
        if is_cloud_url(destination) {
            self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
                .await?;
        } else {
            // Validate local destination path against sandbox
            let validated_dest = self.validate_path(destination)?;
            let file = std::fs::File::create(&validated_dest)?;
            let mut writer =
                parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;

            // Write all in one batch for now (simplification)
            if !rows.is_empty() {
                let batch = self.rows_to_batch(&rows, &arrow_schema)?;
                writer.write(&batch)?;
            }

            // close() finalizes the Parquet footer; the file is incomplete without it.
            writer.close()?;
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(rows.len() as i64));
        Ok(vec![res])
    }
5312
5313    /// Writes Parquet data to a cloud storage destination.
5314    async fn write_parquet_to_cloud(
5315        &self,
5316        dest_url: &str,
5317        rows: &[HashMap<String, uni_common::Value>],
5318        arrow_schema: &arrow_schema::Schema,
5319    ) -> Result<()> {
5320        use object_store::ObjectStoreExt;
5321
5322        let (store, path) = build_store_from_url(dest_url)?;
5323
5324        // Write to an in-memory buffer
5325        let mut buffer = Vec::new();
5326        {
5327            let mut writer = parquet::arrow::ArrowWriter::try_new(
5328                &mut buffer,
5329                Arc::new(arrow_schema.clone()),
5330                None,
5331            )?;
5332
5333            if !rows.is_empty() {
5334                let batch = self.rows_to_batch(rows, arrow_schema)?;
5335                writer.write(&batch)?;
5336            }
5337
5338            writer.close()?;
5339        }
5340
5341        // Upload to cloud storage
5342        store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5343
5344        Ok(())
5345    }
5346
5347    pub(crate) fn rows_to_batch(
5348        &self,
5349        rows: &[HashMap<String, uni_common::Value>],
5350        schema: &arrow_schema::Schema,
5351    ) -> Result<RecordBatch> {
5352        let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5353
5354        for field in schema.fields() {
5355            let name = field.name();
5356            let dt = field.data_type();
5357
5358            let values: Vec<uni_common::Value> = rows
5359                .iter()
5360                .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5361                .collect();
5362            let array = self.values_to_array(&values, dt)?;
5363            columns.push(array);
5364        }
5365
5366        Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5367    }
5368
5369    /// Convert a slice of Values to an Arrow array.
5370    /// Delegates to the shared implementation in arrow_convert module.
5371    pub(crate) fn values_to_array(
5372        &self,
5373        values: &[uni_common::Value],
5374        dt: &arrow_schema::DataType,
5375    ) -> Result<Arc<dyn Array>> {
5376        arrow_convert::values_to_array(values, dt)
5377    }
5378
5379    pub(crate) fn format_csv_value(&self, val: Value) -> String {
5380        match val {
5381            Value::Null => "".to_string(),
5382            Value::String(s) => s,
5383            Value::Int(i) => i.to_string(),
5384            Value::Float(f) => f.to_string(),
5385            Value::Bool(b) => b.to_string(),
5386            _ => format!("{val}"),
5387        }
5388    }
5389
5390    pub(crate) fn parse_csv_value(
5391        &self,
5392        s: &str,
5393        data_type: &uni_common::core::schema::DataType,
5394        prop_name: &str,
5395    ) -> Result<Value> {
5396        if s.is_empty() || s.to_lowercase() == "null" {
5397            return Ok(Value::Null);
5398        }
5399
5400        use uni_common::core::schema::DataType;
5401        match data_type {
5402            DataType::String => Ok(Value::String(s.to_string())),
5403            DataType::Int32 | DataType::Int64 => {
5404                let i = s.parse::<i64>().map_err(|_| {
5405                    anyhow!(
5406                        "Failed to parse integer for property '{}': {}",
5407                        prop_name,
5408                        s
5409                    )
5410                })?;
5411                Ok(Value::Int(i))
5412            }
5413            DataType::Float32 | DataType::Float64 => {
5414                let f = s.parse::<f64>().map_err(|_| {
5415                    anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5416                })?;
5417                Ok(Value::Float(f))
5418            }
5419            DataType::Bool => {
5420                let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5421                    anyhow!(
5422                        "Failed to parse boolean for property '{}': {}",
5423                        prop_name,
5424                        s
5425                    )
5426                })?;
5427                Ok(Value::Bool(b))
5428            }
5429            DataType::CypherValue => {
5430                let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5431                    anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5432                })?;
5433                Ok(Value::from(json_val))
5434            }
5435            DataType::Vector { .. } => {
5436                let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5437                    anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5438                })?;
5439                Ok(Value::Vector(v))
5440            }
5441            _ => Ok(Value::String(s.to_string())),
5442        }
5443    }
5444
5445    pub(crate) async fn detach_delete_vertex(
5446        &self,
5447        vid: Vid,
5448        writer: &Writer,
5449        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5450    ) -> Result<()> {
5451        let schema = self.storage.schema_manager().schema();
5452        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5453
5454        // 1. Find and delete all outgoing edges
5455        let out_graph = self
5456            .storage
5457            .load_subgraph_cached(
5458                &[vid],
5459                &edge_type_ids,
5460                1,
5461                uni_store::runtime::Direction::Outgoing,
5462                Some(writer.l0_manager.get_current()),
5463            )
5464            .await?;
5465
5466        for edge in out_graph.edges() {
5467            writer
5468                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5469                .await?;
5470        }
5471
5472        // 2. Find and delete all incoming edges
5473        let in_graph = self
5474            .storage
5475            .load_subgraph_cached(
5476                &[vid],
5477                &edge_type_ids,
5478                1,
5479                uni_store::runtime::Direction::Incoming,
5480                Some(writer.l0_manager.get_current()),
5481            )
5482            .await?;
5483
5484        for edge in in_graph.edges() {
5485            writer
5486                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5487                .await?;
5488        }
5489
5490        Ok(())
5491    }
5492
5493    /// Load outgoing + incoming subgraphs (1-hop) for a batch of vertices
5494    /// in two scans — one per direction. Shared infrastructure for
5495    /// `batch_detach_delete_vertices` (which deletes the loaded edges) and
5496    /// `batch_check_vertices_have_no_edges` (which errors if any
5497    /// non-tombstoned edges exist).
5498    ///
5499    /// Returns raw `WorkingGraph`s with **no tombstone filtering**;
5500    /// callers that care about same-batch edge deletes (e.g., the
5501    /// non-detach check that runs after edges have been individually
5502    /// DELETEd) layer the filter on top.
5503    async fn batch_load_incident_edges(
5504        &self,
5505        vids: &[Vid],
5506        writer: &Writer,
5507    ) -> Result<(
5508        uni_store::runtime::working_graph::WorkingGraph,
5509        uni_store::runtime::working_graph::WorkingGraph,
5510    )> {
5511        let schema = self.storage.schema_manager().schema();
5512        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5513        let l0 = Some(writer.l0_manager.get_current());
5514
5515        let out_graph = self
5516            .storage
5517            .load_subgraph_cached(
5518                vids,
5519                &edge_type_ids,
5520                1,
5521                uni_store::runtime::Direction::Outgoing,
5522                l0.clone(),
5523            )
5524            .await?;
5525        let in_graph = self
5526            .storage
5527            .load_subgraph_cached(
5528                vids,
5529                &edge_type_ids,
5530                1,
5531                uni_store::runtime::Direction::Incoming,
5532                l0,
5533            )
5534            .await?;
5535        Ok((out_graph, in_graph))
5536    }
5537
5538    /// Batch detach-delete: load subgraphs for all VIDs at once, then delete edges and vertices.
5539    pub(crate) async fn batch_detach_delete_vertices(
5540        &self,
5541        vids: &[Vid],
5542        labels_per_vid: Vec<Option<Vec<String>>>,
5543        writer: &Writer,
5544        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5545    ) -> Result<()> {
5546        let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5547
5548        for edge in out_graph.edges() {
5549            writer
5550                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5551                .await?;
5552        }
5553        for edge in in_graph.edges() {
5554            writer
5555                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5556                .await?;
5557        }
5558
5559        for (vid, labels) in vids.iter().zip(labels_per_vid) {
5560            writer.delete_vertex(*vid, labels, tx_l0).await?;
5561        }
5562
5563        Ok(())
5564    }
5565
5566    /// Batch non-detach DELETE check: verify NO incident edges exist for
5567    /// any VID in `vids`, modulo tombstones from the writer's L0 and the
5568    /// tx-local L0 (same set the per-VID `check_vertex_has_no_edges`
5569    /// consults at write.rs:2650-2673).
5570    ///
5571    /// Why this exists: the per-VID `check_vertex_has_no_edges` does two
5572    /// `load_subgraph_cached(&[vid], ...)` calls per vertex (outgoing +
5573    /// incoming). At N vertices that's 2N subgraph loads. The batched
5574    /// path collapses to 2 loads regardless of N.
5575    ///
5576    /// Tombstone visibility: detach (which deletes everything) doesn't
5577    /// need the tombstone filter; non-detach does, because the caller
5578    /// may have already DELETEd some incident edges in the same
5579    /// statement (e.g. `MATCH (a)-[r]->(b) DELETE r, a` — edges are
5580    /// deleted before nodes per mutation_common.rs:720-724, so by the
5581    /// time this check runs, `r`'s eid is in `tx_l0.tombstones` and
5582    /// must be excluded from the "still has edges" set).
5583    pub(crate) async fn batch_check_vertices_have_no_edges(
5584        &self,
5585        vids: &[Vid],
5586        writer: &Writer,
5587        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5588    ) -> Result<()> {
5589        if vids.is_empty() {
5590            return Ok(());
5591        }
5592
5593        let mut tombstoned_eids: std::collections::HashSet<uni_common::core::id::Eid> =
5594            std::collections::HashSet::new();
5595        {
5596            let writer_l0 = writer.l0_manager.get_current();
5597            let guard = writer_l0.read();
5598            for &eid in guard.tombstones.keys() {
5599                tombstoned_eids.insert(eid);
5600            }
5601        }
5602        if let Some(tx) = tx_l0 {
5603            let guard = tx.read();
5604            for &eid in guard.tombstones.keys() {
5605                tombstoned_eids.insert(eid);
5606            }
5607        }
5608
5609        let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5610
5611        for edge in out_graph.edges().chain(in_graph.edges()) {
5612            if !tombstoned_eids.contains(&edge.eid) {
5613                return Err(anyhow!(
5614                    "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
5615                    edge.src_vid
5616                ));
5617            }
5618        }
5619        Ok(())
5620    }
5621}