Skip to main content

uni_query/query/executor/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7    eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14/// Convert a `Value` to `chrono::DateTime<Utc>`, handling both `Value::Temporal` and `Value::String`.
15fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16    match val {
17        Value::Temporal(tv) => {
18            use uni_common::TemporalValue;
19            match tv {
20                TemporalValue::DateTime {
21                    nanos_since_epoch, ..
22                }
23                | TemporalValue::LocalDateTime {
24                    nanos_since_epoch, ..
25                } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26                TemporalValue::Date { days_since_epoch } => {
27                    chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28                }
29                _ => None,
30            }
31        }
32        Value::String(s) => parse_datetime_utc(s).ok(),
33        _ => None,
34    }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46    BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47    ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54
55// DataFusion engine imports
56use crate::query::df_graph::L0Context;
57use crate::query::df_planner::HybridPhysicalPlanner;
58use datafusion::physical_plan::ExecutionPlanProperties;
59use datafusion::prelude::SessionContext;
60use parking_lot::RwLock as SyncRwLock;
61
62use arrow_array::{Array, RecordBatch};
63use csv;
64use parquet;
65
66use super::core::*;
67
/// Number of system fields carried by an edge map; derived from the field names
/// themselves so the count and the list cannot drift apart.
const EDGE_SYSTEM_FIELD_COUNT: usize = ["_eid", "_src", "_dst", "_type", "_type_name"].len();
/// Number of system fields carried by a vertex map; derived from the field names
/// themselves so the count and the list cannot drift apart.
const VERTEX_SYSTEM_FIELD_COUNT: usize = ["_vid", "_label", "_uid"].len();
72
73/// Collect VIDs from all L0 buffers visible to a query context.
74///
75/// Applies `extractor` to each L0 buffer (main, transaction, pending flush) and
76/// collects the results. Returns an empty vec when no query context is present.
77fn collect_l0_vids(
78    ctx: Option<&QueryContext>,
79    extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
80) -> Vec<Vid> {
81    let mut vids = Vec::new();
82    if let Some(ctx) = ctx {
83        vids.extend(extractor(&ctx.l0.read()));
84        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
85            vids.extend(extractor(&tx_l0_arc.read()));
86        }
87        for pending_l0_arc in &ctx.pending_flush_l0s {
88            vids.extend(extractor(&pending_l0_arc.read()));
89        }
90    }
91    vids
92}
93
94/// Hydrate an entity map (vertex or edge) with properties if not already loaded.
95///
96/// This is the fallback for pushdown hydration - if the entity only has system fields
97/// (indicating pushdown didn't load properties), we load all properties here.
98///
99/// System field counts:
100/// - Edge: 5 fields (_eid, _src, _dst, _type, _type_name)
101/// - Vertex: 3 fields (_vid, _label, _uid)
102async fn hydrate_entity_if_needed(
103    map: &mut HashMap<String, Value>,
104    prop_manager: &PropertyManager,
105    ctx: Option<&QueryContext>,
106) {
107    // Check for edge entity
108    if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
109        if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
110            tracing::debug!(
111                "Pushdown fallback: hydrating edge {} at execution time",
112                eid_u64
113            );
114            if let Ok(Some(props)) = prop_manager
115                .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
116                .await
117            {
118                for (key, value) in props {
119                    map.entry(key).or_insert(value);
120                }
121            }
122        } else {
123            tracing::trace!(
124                "Pushdown success: edge {} already has {} properties",
125                eid_u64,
126                map.len() - EDGE_SYSTEM_FIELD_COUNT
127            );
128        }
129        return;
130    }
131
132    // Check for vertex entity
133    if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
134        if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
135            tracing::debug!(
136                "Pushdown fallback: hydrating vertex {} at execution time",
137                vid_u64
138            );
139            if let Ok(Some(props)) = prop_manager
140                .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
141                .await
142            {
143                for (key, value) in props {
144                    map.entry(key).or_insert(value);
145                }
146            }
147        } else {
148            tracing::trace!(
149                "Pushdown success: vertex {} already has {} properties",
150                vid_u64,
151                map.len() - VERTEX_SYSTEM_FIELD_COUNT
152            );
153        }
154    }
155}
156
157/// Promote a VID-string placeholder variable to a `Map`, draining its
158/// dotted system/property columns (`var.<prop>`) into the new map.
159///
160/// Used by `record_batches_to_rows` for search-procedure outputs, where the
161/// bare variable arrives as a VID string rather than a materialized Map.
162fn promote_vid_placeholder(row: &mut HashMap<String, Value>, var: &str) {
163    let prefix = format!("{}.", var);
164    let mut map = HashMap::new();
165
166    let dotted_keys: Vec<String> = row
167        .keys()
168        .filter(|k| k.starts_with(&prefix))
169        .cloned()
170        .collect();
171
172    for key in &dotted_keys {
173        let prop_name = &key[prefix.len()..];
174        if let Some(val) = row.remove(key) {
175            map.insert(prop_name.to_string(), val);
176        }
177    }
178
179    // Replace the VID-string placeholder with the constructed Map
180    row.insert(var.to_string(), Value::Map(map));
181}
182
183/// Merge the helper system columns (`var._vid`, `var._labels`, `var._eid`,
184/// `var._type`) and remaining USER property columns (`var.<prop>`) into the
185/// bare `Map` variable, removing the dotted helper columns from `row`.
186///
187/// Merging user properties keeps `RETURN var.prop` consistent after a
188/// unit-subquery SET refreshes the dotted columns: without it the (now stale)
189/// bare entity Struct from the outer scan would override the post-SET values.
190/// Internal helpers (`_all_props`, `_src_vid`, …) and `overflow_json` are
191/// dropped silently.
192fn merge_dotted_columns(row: &mut HashMap<String, Value>, var: &str) {
193    // Merge node system fields (_vid, _labels)
194    let vid_key = format!("{}._vid", var);
195    let labels_key = format!("{}._labels", var);
196
197    let vid_val = row.remove(&vid_key);
198    let labels_val = row.remove(&labels_key);
199
200    if let Some(Value::Map(map)) = row.get_mut(var) {
201        if let Some(v) = vid_val {
202            map.insert("_vid".to_string(), v);
203        }
204        if let Some(v) = labels_val {
205            map.insert("_labels".to_string(), v);
206        }
207    }
208
209    // Merge edge system fields (_eid, _type, _src_vid, _dst_vid).
210    // These are emitted as helper columns by the traverse exec.
211    // The structural projection already includes them in the struct,
212    // but we still need to remove the dotted helper columns.
213    let eid_key = format!("{}._eid", var);
214    let type_key = format!("{}._type", var);
215
216    let eid_val = row.remove(&eid_key);
217    let type_val = row.remove(&type_key);
218
219    if (eid_val.is_some() || type_val.is_some())
220        && let Some(Value::Map(map)) = row.get_mut(var)
221    {
222        if let Some(v) = eid_val {
223            map.entry("_eid".to_string()).or_insert(v);
224        }
225        if let Some(v) = type_val {
226            map.entry("_type".to_string()).or_insert(v);
227        }
228    }
229
230    // Drain remaining dotted columns, merging surviving USER properties.
231    let prefix = format!("{}.", var);
232    let dotted_keys: Vec<String> = row
233        .keys()
234        .filter(|k| k.starts_with(&prefix))
235        .cloned()
236        .collect();
237    for key in dotted_keys {
238        let prop_name = key[prefix.len()..].to_string();
239        let val = row.remove(&key);
240        if prop_name.starts_with('_') || prop_name == "overflow_json" {
241            continue;
242        }
243        if let (Some(val), Some(Value::Map(map))) = (val, row.get_mut(var)) {
244            map.insert(prop_name, val);
245        }
246    }
247}
248
249impl Executor {
250    /// Helper to verify and filter candidates against an optional predicate.
251    ///
252    /// Deduplicates candidates, loads properties, and evaluates the filter expression.
253    /// Returns only VIDs that pass the filter (or are not deleted).
254    async fn verify_and_filter_candidates(
255        &self,
256        mut candidates: Vec<Vid>,
257        variable: &str,
258        filter: Option<&Expr>,
259        ctx: Option<&QueryContext>,
260        prop_manager: &PropertyManager,
261        params: &HashMap<String, Value>,
262    ) -> Result<Vec<Vid>> {
263        candidates.sort_unstable();
264        candidates.dedup();
265
266        let mut verified_vids = Vec::new();
267        for vid in candidates {
268            let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
269                continue; // Deleted
270            };
271
272            if let Some(expr) = filter {
273                let mut props_map: HashMap<String, Value> = props;
274                props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
275
276                let mut row = HashMap::new();
277                row.insert(variable.to_string(), Value::Map(props_map));
278
279                let res = self
280                    .evaluate_expr(expr, &row, prop_manager, params, ctx)
281                    .await?;
282                if res.as_bool().unwrap_or(false) {
283                    verified_vids.push(vid);
284                }
285            } else {
286                verified_vids.push(vid);
287            }
288        }
289
290        Ok(verified_vids)
291    }
292
293    pub(crate) async fn scan_storage_candidates(
294        &self,
295        label_id: u16,
296        variable: &str,
297        filter: Option<&Expr>,
298    ) -> Result<Vec<Vid>> {
299        let schema = self.storage.schema_manager().schema();
300        let label_name = schema
301            .label_name_by_id(label_id)
302            .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
303
304        // Generate filter SQL from expression
305        let empty_props = std::collections::HashMap::new();
306        let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
307        let filter_sql = filter.and_then(|expr| {
308            LanceFilterGenerator::generate(std::slice::from_ref(expr), variable, Some(label_props))
309        });
310
311        self.storage
312            .scan_vertex_candidates(label_name, filter_sql.as_deref())
313            .await
314    }
315
316    pub(crate) async fn scan_label_with_filter(
317        &self,
318        label_id: u16,
319        variable: &str,
320        filter: Option<&Expr>,
321        ctx: Option<&QueryContext>,
322        prop_manager: &PropertyManager,
323        params: &HashMap<String, Value>,
324    ) -> Result<Vec<Vid>> {
325        let mut candidates = self
326            .scan_storage_candidates(label_id, variable, filter)
327            .await?;
328
329        // Convert label_id to label_name for L0 lookup
330        let schema = self.storage.schema_manager().schema();
331        if let Some(label_name) = schema.label_name_by_id(label_id) {
332            candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
333        }
334
335        self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
336            .await
337    }
338
339    pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
340        // Handle Value::Node directly (has vid field)
341        if let Value::Node(node) = val {
342            return Ok(node.vid);
343        }
344        // Handle Object (node) containing _vid field
345        if let Value::Map(map) = val
346            && let Some(vid_val) = map.get("_vid")
347            && let Some(v) = vid_val.as_u64()
348        {
349            return Ok(Vid::from(v));
350        }
351        // Handle string format
352        if let Some(s) = val.as_str()
353            && let Ok(id) = s.parse::<u64>()
354        {
355            return Ok(Vid::new(id));
356        }
357        // Handle raw u64
358        if let Some(v) = val.as_u64() {
359            return Ok(Vid::from(v));
360        }
361        Err(anyhow!("Invalid Vid format: {:?}", val))
362    }
363
364    /// Find a node value in the row by VID.
365    ///
366    /// Scans all values in the row, looking for a node (Map or Node) whose VID
367    /// matches the target. Returns the full node value if found, or a minimal
368    /// Map with just `_vid` as fallback.
369    fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
370        for val in row.values() {
371            if let Ok(vid) = Self::vid_from_value(val)
372                && vid == target_vid
373            {
374                return val.clone();
375            }
376        }
377        // Fallback: return minimal node map
378        Value::Map(HashMap::from([(
379            "_vid".to_string(),
380            Value::Int(target_vid.as_u64() as i64),
381        )]))
382    }
383
384    /// Create L0 context, session, and planner for DataFusion execution.
385    ///
386    /// This is the shared setup for `execute_datafusion` and `execute_merge_read_plan`.
387    /// Returns `(session_ctx, planner, prop_manager_arc)`.
388    pub async fn create_datafusion_planner(
389        &self,
390        prop_manager: &PropertyManager,
391        params: &HashMap<String, Value>,
392    ) -> Result<(
393        Arc<SyncRwLock<SessionContext>>,
394        HybridPhysicalPlanner,
395        Arc<PropertyManager>,
396    )> {
397        let query_ctx = self.get_context().await;
398        let l0_context = match query_ctx {
399            Some(ref ctx) => L0Context::from_query_context(ctx),
400            None => L0Context::empty(),
401        };
402
403        // C2: scans route through the version-pinned storage manager so
404        // post-snapshot L1 ROWS are invisible (the row-existence pin). The
405        // PropertyManager, by contrast, stays on LIVE storage: property
406        // point-reads must honor read-your-writes (a transaction's own
407        // uncommitted edge/vertex properties live in tx_l0, not L1, and a
408        // version filter would hide them — breaking e.g. MERGE's
409        // edge-property match). Cross-transaction property skew on an
410        // already-visible row is caught by OCC at commit, not the pin.
411        let effective_storage = self.effective_storage();
412
413        // Prefer the shared `Arc<PropertyManager>` installed via
414        // `Executor::set_prop_manager` — skips the LRU/Mutex allocation cost
415        // (~80 µs/query) of constructing a fresh, immediately-abandoned
416        // PropertyManager. Falls back to the legacy fresh-construction path
417        // for callers that have not installed one.
418        let prop_manager_arc = if let Some(shared) = self.prop_manager_arc.as_ref() {
419            shared.clone()
420        } else {
421            Arc::new(PropertyManager::new(
422                self.storage.clone(),
423                self.storage.schema_manager_arc(),
424                prop_manager.cache_size(),
425            ))
426        };
427
428        // Two-mode session construction (hot/cold template path, main)
429        // fused with plugin-framework scalar/optimizer registration
430        // (plugin-fw).
431        //
432        // Hot path: clone the pre-built `df_session_template` — an O(1)
433        // Arc bump on the inner `SessionState`. Skips the ~140 µs cost of
434        // `SessionContext::new()` + `register_cypher_udfs()`. The template
435        // only has the built-in Cypher UDFs baked in (see
436        // `Uni::build` → `register_cypher_udfs`), so it is ONLY safe to
437        // reuse when this query needs no per-query dynamic registration:
438        //   - no legacy `CustomFunctionRegistry` scalars,
439        //   - no plugin-registered optimizer rules,
440        //   - no host-level plugin scalars (`Uni::load_*_plugin`),
441        //   - no session-scoped plugin scalars.
442        //
443        // Cold path: construct a fresh, isolated `SessionContext` (folding
444        // in any plugin optimizer rules) and register the dynamic scalar
445        // sets so they do not leak into the shared template.
446        let has_custom_udfs = self
447            .custom_function_registry
448            .as_ref()
449            .is_some_and(|r| !r.is_empty());
450        let host_plugin_registry = self
451            .procedure_registry
452            .as_ref()
453            .and_then(|pr| pr.plugin_registry());
454        // Optimizer rules come from the host plugin registry (the legacy
455        // `CustomFunctionRegistry` only ever carried scalar fns). When no
456        // host registry is attached, there are no plugin optimizer rules.
457        let optimizer_providers = host_plugin_registry
458            .as_ref()
459            .map(|pr| pr.optimizer_rules())
460            .unwrap_or_default();
461        let session_local_registry =
462            crate::query::df_udfs_plugin::current_session_plugin_registry();
463        let needs_dynamic_registration = has_custom_udfs
464            || !optimizer_providers.is_empty()
465            || host_plugin_registry.is_some()
466            || session_local_registry.is_some();
467
468        let session = if let (Some(tmpl), false) = (
469            self.df_session_template.as_ref(),
470            needs_dynamic_registration,
471        ) {
472            // Hot path: clone the template (Cypher UDFs already registered).
473            (**tmpl).clone()
474        } else {
475            // Cold path: build a fresh session, optionally with any
476            // plugin-registered optimizer rules folded in.
477            //
478            // M5h: build a `SessionContext` that includes any
479            // plugin-registered optimizer rules. The rule chain is
480            // snapshotted at session-construction time; reload discipline
481            // is M10's problem.
482            let session = if optimizer_providers.is_empty() {
483                SessionContext::new()
484            } else {
485                use datafusion::execution::session_state::SessionStateBuilder;
486                use uni_plugin::traits::operator::OptimizerPhase;
487                let mut builder = SessionStateBuilder::new().with_default_features();
488                for provider in optimizer_providers.iter() {
489                    match provider.phase() {
490                        OptimizerPhase::Logical => {
491                            builder = builder.with_optimizer_rule(provider.rule());
492                        }
493                        OptimizerPhase::Physical => {
494                            if let Some(rule) = provider.physical_rule() {
495                                builder = builder.with_physical_optimizer_rule(rule);
496                            } else {
497                                tracing::debug!(
498                                    target: "uni.plugin.registry",
499                                    "physical-phase provider returned no physical_rule(); skipping"
500                                );
501                            }
502                        }
503                        OptimizerPhase::Both => {
504                            builder = builder.with_optimizer_rule(provider.rule());
505                            if let Some(rule) = provider.physical_rule() {
506                                builder = builder.with_physical_optimizer_rule(rule);
507                            }
508                        }
509                        _ => {
510                            tracing::debug!(
511                                target: "uni.plugin.registry",
512                                "skipping optimizer rule for unknown phase"
513                            );
514                        }
515                    }
516                }
517                let state = builder.build();
518                SessionContext::new_with_state(state)
519            };
520            crate::query::df_udfs::register_cypher_udfs(&session)?;
521            // Instance-scope, legacy `CustomFunctionRegistry` shadow path
522            // (db.register_function() entries + apoc-core mirrors). Routed
523            // through the plugin-registry adapter (plugin-fw).
524            if let Some(ref registry) = self.custom_function_registry {
525                crate::query::df_udfs_plugin::register_custom_functions_as_plugin_scalars(
526                    &session, registry,
527                )?;
528            }
529            // Instance-scope, **host** plugin registry (M8.6 follow-up):
530            // `Uni::load_python_plugin` / `Uni::load_rhai_plugin` /
531            // `Uni::load_wasm_*` and other host-level plugin-load paths
532            // register into `UniInner.plugin_registry`. The executor
533            // reaches that registry via the procedure-registry's
534            // attached handle (set by `Uni::build` at construction time).
535            // Without this branch, scalars added through
536            // `Uni::load_*_plugin` were invisible to Cypher's UDF
537            // resolution despite being directly invokable via
538            // `db.plugin_registry().scalar_fn(...)`.
539            if let Some(ref host_pr) = host_plugin_registry {
540                crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, host_pr)?;
541            }
542            // Session-scope (M8.6): when the call is inside a
543            // `scoped_with_session_plugin_registry` scope (set by a host
544            // crate's per-query execution path), register the session's
545            // local plugin scalars *last* so they shadow instance entries
546            // by name (DataFusion's `register_udf` is last-write-wins).
547            if let Some(ref session_local) = session_local_registry {
548                crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, session_local)?;
549            }
550            session
551        };
552        let session_ctx = Arc::new(SyncRwLock::new(session));
553
554        let mut planner = HybridPhysicalPlanner::with_l0_context(
555            session_ctx.clone(),
556            effective_storage.clone(),
557            l0_context,
558            prop_manager_arc.clone(),
559            effective_storage.schema_manager().schema(),
560            params.clone(),
561            HashMap::new(),
562        );
563
564        planner = planner.with_algo_registry(self.algo_registry.clone());
565        if let Some(ref registry) = self.procedure_registry {
566            planner = planner.with_procedure_registry(registry.clone());
567        }
568        // M5b follow-up #4: thread the host's plugin registry through to
569        // the physical planner so `plan_vector_knn` can consult
570        // `register_index_handle` for plugin-supplied vector index
571        // handles. The procedure registry's attached plugin registry is
572        // the host's actual instance (set by `Uni::build`, and what
573        // `db.plugin_registry()` returns to user code). The legacy
574        // `CustomFunctionRegistry` only ever carried scalar fns (no index
575        // handles / optimizer rules), so it is not a source here.
576        let host_plugin_registry = self
577            .procedure_registry
578            .as_ref()
579            .and_then(|r| r.plugin_registry());
580        if let Some(registry) = host_plugin_registry {
581            planner = planner.with_plugin_registry(registry);
582        }
583        if let Some(ref xervo_runtime) = self.xervo_runtime {
584            planner = planner.with_xervo_runtime(xervo_runtime.clone());
585        }
586        // FU-1 / M11 #6: thread the outer transaction's writer handle
587        // through so declared `WRITE`-mode procedures invoked from
588        // this query can run their Cypher bodies via the write-enabled
589        // inner-query host.
590        if let Some(writer) = &self.writer {
591            planner = planner.with_writer(Arc::clone(writer));
592        }
593
594        Ok((session_ctx, planner, prop_manager_arc))
595    }
596
597    /// Execute a DataFusion physical plan and collect all result batches.
598    pub fn collect_batches(
599        session_ctx: &Arc<SyncRwLock<SessionContext>>,
600        execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
601    ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
602        Box::pin(async move {
603            use futures::TryStreamExt;
604
605            let task_ctx = session_ctx.read().task_ctx();
606            let partition_count = execution_plan.output_partitioning().partition_count();
607            let mut all_batches = Vec::new();
608            for partition in 0..partition_count {
609                let stream = execution_plan.execute(partition, task_ctx.clone())?;
610                let batches: Vec<RecordBatch> = stream.try_collect().await?;
611                all_batches.extend(batches);
612            }
613            Ok(all_batches)
614        })
615    }
616
617    /// Executes a query using the DataFusion-based engine.
618    ///
619    /// Uses `HybridPhysicalPlanner` which produces DataFusion `ExecutionPlan`
620    /// trees with custom graph operators for graph-specific operations.
621    pub async fn execute_datafusion(
622        &self,
623        plan: LogicalPlan,
624        prop_manager: &PropertyManager,
625        params: &HashMap<String, Value>,
626    ) -> Result<Vec<RecordBatch>> {
627        let (batches, _plan) = self
628            .execute_datafusion_with_plan(plan, prop_manager, params)
629            .await?;
630        Ok(batches)
631    }
632
633    /// Executes a query using the DataFusion-based engine, returning both
634    /// result batches and the physical execution plan.
635    ///
636    /// The returned `Arc<dyn ExecutionPlan>` can be walked to extract per-operator
637    /// metrics (e.g., `output_rows`, `elapsed_compute`) that DataFusion's
638    /// `BaselineMetrics` recorded during execution.
639    pub async fn execute_datafusion_with_plan(
640        &self,
641        plan: LogicalPlan,
642        prop_manager: &PropertyManager,
643        params: &HashMap<String, Value>,
644    ) -> Result<(
645        Vec<RecordBatch>,
646        Arc<dyn datafusion::physical_plan::ExecutionPlan>,
647    )> {
648        let (session_ctx, mut planner, prop_manager_arc) =
649            self.create_datafusion_planner(prop_manager, params).await?;
650
651        // Build MutationContext when the plan contains write operations
652        if Self::contains_write_operations(&plan) {
653            let writer = self
654                .writer
655                .as_ref()
656                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
657                .clone();
658            let query_ctx = self.get_context().await;
659
660            debug_assert!(
661                query_ctx.is_some(),
662                "BUG: query_ctx is None for write operation"
663            );
664
665            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
666                executor: self.clone(),
667                writer,
668                prop_manager: prop_manager_arc,
669                params: params.clone(),
670                query_ctx,
671                tx_l0_override: self.transaction_l0_override.clone(),
672            });
673            planner = planner.with_mutation_context(mutation_ctx);
674            tracing::debug!(
675                plan_type = Self::get_plan_type(&plan),
676                "Mutation routed to DataFusion engine"
677            );
678        }
679
680        let execution_plan = planner.plan(&plan)?;
681        let plan_clone = Arc::clone(&execution_plan);
682        let result = Self::collect_batches(&session_ctx, execution_plan).await;
683
684        // Harvest warnings from the graph execution context after query completion.
685        let graph_warnings = planner.graph_ctx().take_warnings();
686        if !graph_warnings.is_empty()
687            && let Ok(mut w) = self.warnings.lock()
688        {
689            w.extend(graph_warnings);
690        }
691
692        result.map(|batches| (batches, plan_clone))
693    }
694
695    /// Execute a MERGE read sub-plan through the DataFusion engine.
696    ///
697    /// Plans and executes the MERGE pattern match using flat columnar output
698    /// (no structural projections), then groups dotted columns (`a._vid`,
699    /// `a._labels`, etc.) into per-variable Maps for downstream MERGE logic.
700    pub(crate) async fn execute_merge_read_plan(
701        &self,
702        plan: LogicalPlan,
703        prop_manager: &PropertyManager,
704        params: &HashMap<String, Value>,
705        merge_variables: Vec<String>,
706    ) -> Result<Vec<HashMap<String, Value>>> {
707        let (session_ctx, planner, _prop_manager_arc) =
708            self.create_datafusion_planner(prop_manager, params).await?;
709
710        // Plan with full property access ("*") for all merge variables so that
711        // ON MATCH SET / ON CREATE SET have complete entity Maps to work with.
712        let extra: HashMap<String, HashSet<String>> = merge_variables
713            .iter()
714            .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
715            .collect();
716        let execution_plan = planner.plan_with_properties(&plan, extra)?;
717        let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
718
719        // Convert to flat rows (dotted column names like "a._vid", "b._labels")
720        let flat_rows = self.record_batches_to_rows(all_batches)?;
721
722        // Group dotted columns into per-variable Maps for MERGE's match logic.
723        // E.g., {"a._vid": 0, "a._labels": ["A"]} → {"a": Map({"_vid": 0, "_labels": ["A"]})}
724        let rows = flat_rows
725            .into_iter()
726            .map(|mut row| {
727                for var in &merge_variables {
728                    // Skip if already materialized (e.g., by record_batches_to_rows)
729                    if row.contains_key(var) {
730                        continue;
731                    }
732                    let prefix = format!("{}.", var);
733                    let dotted_keys: Vec<String> = row
734                        .keys()
735                        .filter(|k| k.starts_with(&prefix))
736                        .cloned()
737                        .collect();
738                    if !dotted_keys.is_empty() {
739                        let mut map = HashMap::new();
740                        for key in dotted_keys {
741                            let prop_name = key[prefix.len()..].to_string();
742                            if let Some(val) = row.remove(&key) {
743                                map.insert(prop_name, val);
744                            }
745                        }
746                        row.insert(var.clone(), Value::Map(map));
747                    }
748                }
749                row
750            })
751            .collect();
752
753        Ok(rows)
754    }
755
756    /// Converts DataFusion RecordBatches to row-based HashMap format.
757    ///
758    /// Handles special metadata on fields:
759    /// - `cv_encoded=true`: Parse the string value as JSON to restore original type
760    ///
761    /// Also normalizes path structures to user-facing format (converts _vid to _id).
762    pub(crate) fn record_batches_to_rows(
763        &self,
764        batches: Vec<RecordBatch>,
765    ) -> Result<Vec<HashMap<String, Value>>> {
766        let mut rows = Vec::new();
767
768        for batch in batches {
769            let num_rows = batch.num_rows();
770            let schema = batch.schema();
771
772            for row_idx in 0..num_rows {
773                let mut row = HashMap::new();
774
775                for (col_idx, field) in schema.fields().iter().enumerate() {
776                    let column = batch.column(col_idx);
777                    // Recover the Uni DataType so LargeBinary/struct columns decode
778                    // correctly. Raw `Bytes` columns are tagged with `uni_raw_bytes` at
779                    // scan time (scan.rs `property_field`) because `Bytes`, `CypherValue`,
780                    // and `Duration` all map to Arrow `LargeBinary` and are otherwise
781                    // indistinguishable here; without this hint a raw `Bytes` value would
782                    // be run through the CypherValue codec and corrupted (issue #93).
783                    let data_type = if field
784                        .metadata()
785                        .get("uni_raw_bytes")
786                        .is_some_and(|v| v == "true")
787                    {
788                        Some(&uni_common::DataType::Bytes)
789                    } else if uni_common::core::schema::is_datetime_struct(field.data_type()) {
790                        Some(&uni_common::DataType::DateTime)
791                    } else if uni_common::core::schema::is_time_struct(field.data_type()) {
792                        Some(&uni_common::DataType::Time)
793                    } else {
794                        None
795                    };
796                    let mut value =
797                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);
798
799                    // Check if this field contains JSON-encoded values (e.g., from UNWIND)
800                    // Parse JSON string to restore the original type
801                    if field
802                        .metadata()
803                        .get("cv_encoded")
804                        .is_some_and(|v| v == "true")
805                        && let Value::String(s) = &value
806                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
807                    {
808                        value = Value::from(parsed);
809                    }
810
811                    // Normalize path structures to user-facing format
812                    value = Self::normalize_path_if_needed(value);
813
814                    row.insert(field.name().clone(), value);
815                }
816
817                // Merge system fields into bare variable maps.
818                // The projection step emits helper columns like "n._vid" and "n._labels"
819                // alongside the materialized "n" column (a Map of user properties).
820                // Here we merge those system fields into the map and remove the helpers.
821                //
822                // For search procedures (vector/FTS), the bare variable may be a VID
823                // string placeholder rather than a Map. In that case, promote it to a
824                // Map so we can merge system fields and properties into it.
825                let bare_vars: Vec<String> = row
826                    .keys()
827                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
828                    .cloned()
829                    .collect();
830
831                // Detect VID-placeholder variables that have system columns
832                // (e.g., "node" is String("1") but "node._vid" exists).
833                // Promote these to Maps so system fields can be merged in.
834                let vid_placeholder_vars: Vec<String> = row
835                    .keys()
836                    .filter(|k| {
837                        !k.contains('.')
838                            && matches!(row.get(*k), Some(Value::String(_)))
839                            && row.contains_key(&format!("{}._vid", k))
840                    })
841                    .cloned()
842                    .collect();
843
844                for var in &vid_placeholder_vars {
845                    promote_vid_placeholder(&mut row, var);
846                }
847
848                for var in &bare_vars {
849                    merge_dotted_columns(&mut row, var);
850                }
851
852                rows.push(row);
853            }
854        }
855
856        Ok(rows)
857    }
858
859    /// Normalize a value if it's a path structure, converting internal format to user-facing format.
860    ///
861    /// This only normalizes path structures (objects with "nodes" and "relationships" arrays).
862    /// Other values are returned unchanged to avoid interfering with query execution.
863    fn normalize_path_if_needed(value: Value) -> Value {
864        match value {
865            Value::Map(map)
866                if map.contains_key("nodes")
867                    && (map.contains_key("relationships") || map.contains_key("edges")) =>
868            {
869                Self::normalize_path_map(map)
870            }
871            other => other,
872        }
873    }
874
875    /// Normalize a path map object.
876    fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
877        // Normalize nodes array
878        if let Some(Value::List(nodes)) = map.remove("nodes") {
879            let normalized_nodes: Vec<Value> = nodes
880                .into_iter()
881                .map(|n| {
882                    if let Value::Map(node_map) = n {
883                        Self::normalize_path_node_map(node_map)
884                    } else {
885                        n
886                    }
887                })
888                .collect();
889            map.insert("nodes".to_string(), Value::List(normalized_nodes));
890        }
891
892        // Normalize relationships array (may be called "relationships" or "edges")
893        let rels_key = if map.contains_key("relationships") {
894            "relationships"
895        } else {
896            "edges"
897        };
898        if let Some(Value::List(rels)) = map.remove(rels_key) {
899            let normalized_rels: Vec<Value> = rels
900                .into_iter()
901                .map(|r| {
902                    if let Value::Map(rel_map) = r {
903                        Self::normalize_path_edge_map(rel_map)
904                    } else {
905                        r
906                    }
907                })
908                .collect();
909            map.insert("relationships".to_string(), Value::List(normalized_rels));
910        }
911
912        Value::Map(map)
913    }
914
915    /// Convert a Value to its string representation for path normalization.
916    fn value_to_id_string(val: Value) -> String {
917        match val {
918            Value::Int(n) => n.to_string(),
919            Value::Float(n) => n.to_string(),
920            Value::String(s) => s,
921            other => other.to_string(),
922        }
923    }
924
925    /// Move a map entry from `src_key` to `dst_key`, converting the value to a string.
926    /// When `src_key == dst_key`, this simply stringifies the value in place.
927    fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
928        if let Some(val) = map.remove(src_key) {
929            map.insert(
930                dst_key.to_string(),
931                Value::String(Self::value_to_id_string(val)),
932            );
933        }
934    }
935
936    /// Ensure the "properties" field is a non-null map.
937    fn ensure_properties_map(map: &mut HashMap<String, Value>) {
938        match map.get("properties") {
939            Some(props) if !props.is_null() => {}
940            _ => {
941                map.insert("properties".to_string(), Value::Map(HashMap::new()));
942            }
943        }
944    }
945
946    /// Normalize a node within a path to user-facing format.
947    fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
948        Self::stringify_map_field(&mut map, "_vid", "_id");
949        Self::ensure_properties_map(&mut map);
950        Value::Map(map)
951    }
952
953    /// Normalize an edge within a path to user-facing format.
954    fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
955        Self::stringify_map_field(&mut map, "_eid", "_id");
956        Self::stringify_map_field(&mut map, "_src", "_src");
957        Self::stringify_map_field(&mut map, "_dst", "_dst");
958
959        if let Some(type_name) = map.remove("_type_name") {
960            map.insert("_type".to_string(), type_name);
961        }
962
963        Self::ensure_properties_map(&mut map);
964        Value::Map(map)
965    }
966
967    #[instrument(
968        skip(self, prop_manager, params),
969        fields(rows_returned, duration_ms),
970        level = "info"
971    )]
972    pub fn execute<'a>(
973        &'a self,
974        plan: LogicalPlan,
975        prop_manager: &'a PropertyManager,
976        params: &'a HashMap<String, Value>,
977    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
978        Box::pin(async move {
979            let query_type = Self::get_plan_type(&plan);
980            let ctx = self.get_context().await;
981            let start = Instant::now();
982
983            // Route DDL/Admin queries to the fallback executor.
984            // All other queries (including similar_to) flow through DataFusion.
985            let res = if Self::is_ddl_or_admin(&plan) {
986                self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
987                    .await
988            } else {
989                let batches = self.execute_datafusion(plan, prop_manager, params).await?;
990                self.record_batches_to_rows(batches)
991            };
992
993            let duration = start.elapsed();
994            metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
995                .record(duration.as_secs_f64());
996
997            tracing::Span::current().record("duration_ms", duration.as_millis());
998            match &res {
999                Ok(rows) => {
1000                    tracing::Span::current().record("rows_returned", rows.len());
1001                    metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
1002                        .increment(rows.len() as u64);
1003                }
1004                Err(e) => {
1005                    let error_type = if e.to_string().contains("timed out") {
1006                        "timeout"
1007                    } else if e.to_string().contains("syntax") {
1008                        "syntax"
1009                    } else {
1010                        "execution"
1011                    };
1012                    metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
1013                }
1014            }
1015
1016            res
1017        })
1018    }
1019
1020    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
1021        match plan {
1022            LogicalPlan::Scan { .. } => "read_scan",
1023            LogicalPlan::FusedIndexScan { .. } => "read_fused_index_scan",
1024            LogicalPlan::FusedIndexScanWrapped { .. } => "read_fused_index_scan_wrapped",
1025            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
1026            LogicalPlan::Traverse { .. } => "read_traverse",
1027            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
1028            LogicalPlan::ScanAll { .. } => "read_scan_all",
1029            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
1030            LogicalPlan::VectorKnn { .. } => "read_vector",
1031            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
1032            LogicalPlan::Merge { .. } => "write_merge",
1033            LogicalPlan::Delete { .. } => "write_delete",
1034            LogicalPlan::Set { .. } => "write_set",
1035            LogicalPlan::Remove { .. } => "write_remove",
1036            LogicalPlan::ProcedureCall { .. } => "call",
1037            LogicalPlan::Copy { .. } => "copy",
1038            LogicalPlan::Backup { .. } => "backup",
1039            _ => "other",
1040        }
1041    }
1042
    /// Return all direct child plan references from a `LogicalPlan`.
    ///
    /// This centralizes the variant→children mapping so that recursive walkers
    /// (e.g., `contains_write_operations`, `is_ddl_or_admin`) can delegate the
    /// "recurse into children" logic instead of duplicating the match arms.
    ///
    /// Children are returned in a fixed order: `input` before `subquery` or
    /// `pattern_plan`, `left` before `right`, and `initial` before `recursive`.
    ///
    /// Note: `Foreach` returns only its `input`; the `body: Vec<LogicalPlan>`
    /// is not included because it requires special iteration. Callers that
    /// need to inspect the body should handle `Foreach` before falling through.
    ///
    /// NOTE(review): any variant not listed here is treated as a leaf and yields
    /// an empty `Vec`. A new variant with child plans must be added here, or the
    /// recursive walkers will silently skip its subtree.
    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
        match plan {
            // Single-input wrappers
            // (`Explain` exposes its wrapped plan, bound from the `plan` field.)
            LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Distinct { input }
            | LogicalPlan::Aggregate { input, .. }
            | LogicalPlan::Window { input, .. }
            | LogicalPlan::Unwind { input, .. }
            | LogicalPlan::Filter { input, .. }
            | LogicalPlan::Create { input, .. }
            | LogicalPlan::CreateBatch { input, .. }
            | LogicalPlan::Set { input, .. }
            | LogicalPlan::Remove { input, .. }
            | LogicalPlan::Delete { input, .. }
            | LogicalPlan::Merge { input, .. }
            | LogicalPlan::Foreach { input, .. }
            | LogicalPlan::Traverse { input, .. }
            | LogicalPlan::TraverseMainByType { input, .. }
            | LogicalPlan::BindZeroLengthPath { input, .. }
            | LogicalPlan::BindPath { input, .. }
            | LogicalPlan::ShortestPath { input, .. }
            | LogicalPlan::AllShortestPaths { input, .. }
            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],

            // Two-input wrappers
            LogicalPlan::Apply {
                input, subquery, ..
            }
            | LogicalPlan::SubqueryCall { input, subquery } => {
                vec![input.as_ref(), subquery.as_ref()]
            }
            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
                vec![left.as_ref(), right.as_ref()]
            }
            LogicalPlan::RecursiveCTE {
                initial, recursive, ..
            } => vec![initial.as_ref(), recursive.as_ref()],
            LogicalPlan::QuantifiedPattern {
                input,
                pattern_plan,
                ..
            } => vec![input.as_ref(), pattern_plan.as_ref()],

            // Leaf nodes (scans, DDL, admin, procedure calls, etc.)
            _ => vec![],
        }
    }
1101
1102    /// Check if a plan is a DDL or admin operation that should skip DataFusion.
1103    ///
1104    /// These operations don't produce data streams and aren't supported by the
1105    /// DataFusion planner. Recurses through wrapper nodes (`Project`, `Sort`,
1106    /// `Limit`, etc.) to detect DDL/admin operations nested inside read
1107    /// wrappers (e.g. `CALL procedure(...) YIELD x RETURN x`).
1108    pub(crate) fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
1109        match plan {
1110            // DDL / schema operations
1111            LogicalPlan::CreateLabel(_)
1112            | LogicalPlan::CreateEdgeType(_)
1113            | LogicalPlan::AlterLabel(_)
1114            | LogicalPlan::AlterEdgeType(_)
1115            | LogicalPlan::DropLabel(_)
1116            | LogicalPlan::DropEdgeType(_)
1117            | LogicalPlan::CreateConstraint(_)
1118            | LogicalPlan::DropConstraint(_)
1119            | LogicalPlan::ShowConstraints(_) => true,
1120
1121            // Index operations
1122            LogicalPlan::CreateVectorIndex { .. }
1123            | LogicalPlan::CreateFullTextIndex { .. }
1124            | LogicalPlan::CreateScalarIndex { .. }
1125            | LogicalPlan::CreateJsonFtsIndex { .. }
1126            | LogicalPlan::DropIndex { .. }
1127            | LogicalPlan::ShowIndexes { .. } => true,
1128
1129            // Admin / utility operations
1130            LogicalPlan::ShowDatabase
1131            | LogicalPlan::ShowConfig
1132            | LogicalPlan::ShowStatistics
1133            | LogicalPlan::Vacuum
1134            | LogicalPlan::Checkpoint
1135            | LogicalPlan::Copy { .. }
1136            | LogicalPlan::CopyTo { .. }
1137            | LogicalPlan::CopyFrom { .. }
1138            | LogicalPlan::Backup { .. }
1139            | LogicalPlan::Explain { .. } => true,
1140
1141            // Procedure calls: DF-eligible procedures go through DataFusion,
1142            // everything else (DDL, admin, unknown) stays on fallback.
1143            LogicalPlan::ProcedureCall { procedure_name, .. } => {
1144                !Self::is_df_eligible_procedure(procedure_name)
1145            }
1146
1147            // Recurse through children using plan_children
1148            _ => Self::plan_children(plan)
1149                .iter()
1150                .any(|child| Self::is_ddl_or_admin(child)),
1151        }
1152    }
1153
1154    /// Returns `true` if the procedure is a read-only, data-producing procedure
1155    /// that can be executed through the DataFusion engine.
1156    ///
1157    /// This is a **positive allowlist** — unknown procedures default to the
1158    /// fallback executor (safe for TCK test procedures, future DDL, and admin).
1159    fn is_df_eligible_procedure(name: &str) -> bool {
1160        matches!(
1161            name,
1162            "uni.schema.labels"
1163                | "uni.schema.edgeTypes"
1164                | "uni.schema.relationshipTypes"
1165                | "uni.schema.indexes"
1166                | "uni.schema.constraints"
1167                | "uni.schema.labelInfo"
1168                | "uni.vector.query"
1169                | "uni.fts.query"
1170                | "uni.search"
1171                // M5g — `uni.create.vNode` produces a typed Node yield
1172                // via the planner-level node-shape expansion in
1173                // `GraphProcedureCallExec`; only the DataFusion path
1174                // honours that, so route it there explicitly. (vEdge
1175                // emits a self-contained Struct column and works on
1176                // either path — list both for symmetry.)
1177                | "uni.create.vNode"
1178                | "uni.create.vEdge"
1179        ) || name.starts_with("uni.algo.")
1180    }
1181
1182    /// Check if a plan contains write/mutation operations anywhere in the tree.
1183    ///
1184    /// Write operations (`CREATE`, `MERGE`, `DELETE`, `SET`, `REMOVE`, `FOREACH`)
1185    /// are used to determine when a MutationContext needs to be built for DataFusion.
1186    /// This recurses through read-only wrapper nodes to detect writes nested inside
1187    /// projections (e.g. `CREATE (n:Person) RETURN n` produces `Project { Create { ... } }`).
1188    fn contains_write_operations(plan: &LogicalPlan) -> bool {
1189        match plan {
1190            LogicalPlan::Create { .. }
1191            | LogicalPlan::CreateBatch { .. }
1192            | LogicalPlan::Merge { .. }
1193            | LogicalPlan::Delete { .. }
1194            | LogicalPlan::Set { .. }
1195            | LogicalPlan::Remove { .. }
1196            | LogicalPlan::Foreach { .. } => true,
1197            _ => Self::plan_children(plan)
1198                .iter()
1199                .any(|child| Self::contains_write_operations(child)),
1200        }
1201    }
1202
1203    /// Executes a query as a stream of result batches.
1204    ///
1205    /// Routes DDL/Admin through the fallback executor and everything else
1206    /// through DataFusion.
1207    pub fn execute_stream(
1208        self,
1209        plan: LogicalPlan,
1210        prop_manager: Arc<PropertyManager>,
1211        params: HashMap<String, Value>,
1212    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1213        let this = self;
1214        let this_for_ctx = this.clone();
1215
1216        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1217
1218        ctx_stream
1219            .flat_map(move |ctx| {
1220                let plan = plan.clone();
1221                let this = this.clone();
1222                let prop_manager = prop_manager.clone();
1223                let params = params.clone();
1224
1225                let fut = async move {
1226                    if Self::is_ddl_or_admin(&plan) {
1227                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
1228                            .await
1229                    } else {
1230                        let batches = this
1231                            .execute_datafusion(plan, &prop_manager, &params)
1232                            .await?;
1233                        this.record_batches_to_rows(batches)
1234                    }
1235                };
1236                stream::once(fut).boxed()
1237            })
1238            .boxed()
1239    }
1240
1241    /// Converts an Arrow array element at a given row index to a Value.
1242    /// Delegates to the shared implementation in arrow_convert module.
1243    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
1244        arrow_convert::arrow_to_value(col, row, None)
1245    }
1246
1247    pub(crate) fn evaluate_expr<'a>(
1248        &'a self,
1249        expr: &'a Expr,
1250        row: &'a HashMap<String, Value>,
1251        prop_manager: &'a PropertyManager,
1252        params: &'a HashMap<String, Value>,
1253        ctx: Option<&'a QueryContext>,
1254    ) -> BoxFuture<'a, Result<Value>> {
1255        let this = self;
1256        Box::pin(async move {
1257            // First check if the expression itself is already pre-computed in the row
1258            let repr = expr.to_string_repr();
1259            if let Some(val) = row.get(&repr) {
1260                return Ok(val.clone());
1261            }
1262
1263            match expr {
1264                Expr::PatternComprehension { .. } => {
1265                    // Handled by DataFusion path via PatternComprehensionExecExpr
1266                    Err(anyhow::anyhow!(
1267                        "Pattern comprehensions are handled by DataFusion executor"
1268                    ))
1269                }
1270                Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1271                    "COLLECT subqueries not yet supported in executor"
1272                )),
1273                Expr::Variable(name) => {
1274                    if let Some(val) = row.get(name) {
1275                        Ok(val.clone())
1276                    } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1277                        // Fallback: scan results may have system columns like "d._vid"
1278                        // without a materialized "d" Map. Return the VID so Property
1279                        // evaluation can fetch properties from storage.
1280                        Ok(vid_val.clone())
1281                    } else {
1282                        Ok(params.get(name).cloned().unwrap_or(Value::Null))
1283                    }
1284                }
1285                Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1286                Expr::Property(var_expr, prop_name) => {
1287                    // Fast path: if the base is a Variable, try flat-key lookup first.
1288                    // DataFusion scan results use flat keys like "d.embedding" rather than
1289                    // nested maps, so "d.embedding" won't be found via Variable("d") -> Property.
1290                    if let Expr::Variable(var_name) = var_expr.as_ref() {
1291                        let flat_key = format!("{}.{}", var_name, prop_name);
1292                        if let Some(val) = row.get(flat_key.as_str()) {
1293                            return Ok(val.clone());
1294                        }
1295                    }
1296
1297                    let base_val = this
1298                        .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1299                        .await?;
1300
1301                    // Handle system properties _vid and _id directly
1302                    if (prop_name == "_vid" || prop_name == "_id")
1303                        && let Ok(vid) = Self::vid_from_value(&base_val)
1304                    {
1305                        return Ok(Value::Int(vid.as_u64() as i64));
1306                    }
1307
1308                    // Handle Value::Node - access properties directly or via prop manager
1309                    if let Value::Node(node) = &base_val {
1310                        // Handle system properties
1311                        if prop_name == "_vid" || prop_name == "_id" {
1312                            return Ok(Value::Int(node.vid.as_u64() as i64));
1313                        }
1314                        if prop_name == "_labels" {
1315                            return Ok(Value::List(
1316                                node.labels
1317                                    .iter()
1318                                    .map(|l| Value::String(l.clone()))
1319                                    .collect(),
1320                            ));
1321                        }
1322                        // Check in-memory properties first
1323                        if let Some(val) = node.properties.get(prop_name.as_str()) {
1324                            return Ok(val.clone());
1325                        }
1326                        // Fallback to storage lookup
1327                        if let Ok(val) = prop_manager
1328                            .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1329                            .await
1330                        {
1331                            return Ok(val);
1332                        }
1333                        return Ok(Value::Null);
1334                    }
1335
1336                    // Handle Value::Edge - access properties directly or via prop manager
1337                    if let Value::Edge(edge) = &base_val {
1338                        // Handle system properties
1339                        if prop_name == "_eid" || prop_name == "_id" {
1340                            return Ok(Value::Int(edge.eid.as_u64() as i64));
1341                        }
1342                        if prop_name == "_type" {
1343                            return Ok(Value::String(edge.edge_type.clone()));
1344                        }
1345                        if prop_name == "_src" {
1346                            return Ok(Value::Int(edge.src.as_u64() as i64));
1347                        }
1348                        if prop_name == "_dst" {
1349                            return Ok(Value::Int(edge.dst.as_u64() as i64));
1350                        }
1351                        // Check in-memory properties first
1352                        if let Some(val) = edge.properties.get(prop_name.as_str()) {
1353                            return Ok(val.clone());
1354                        }
1355                        // Fallback to storage lookup
1356                        if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1357                        {
1358                            return Ok(val);
1359                        }
1360                        return Ok(Value::Null);
1361                    }
1362
1363                    // If base_val is an object (node/edge), check its properties first
1364                    // This handles properties from CREATE/MERGE that may not be persisted yet
1365                    if let Value::Map(map) = &base_val {
1366                        // First check top-level (for system properties like _id, _label, etc.)
1367                        if let Some(val) = map.get(prop_name.as_str()) {
1368                            return Ok(val.clone());
1369                        }
1370                        // Then check inside "properties" object (for user properties)
1371                        if let Some(Value::Map(props)) = map.get("properties")
1372                            && let Some(val) = props.get(prop_name.as_str())
1373                        {
1374                            return Ok(val.clone());
1375                        }
1376                        // Fallback to storage lookup using _vid or _id
1377                        let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1378                            map.get("_id")
1379                                .and_then(|v| v.as_str())
1380                                .and_then(|s| s.parse::<u64>().ok())
1381                        });
1382                        if let Some(id) = vid_opt {
1383                            let vid = Vid::from(id);
1384                            if let Ok(val) = prop_manager
1385                                .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1386                                .await
1387                            {
1388                                return Ok(val);
1389                            }
1390                        } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1391                            let eid = uni_common::core::id::Eid::from(id);
1392                            if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1393                                return Ok(val);
1394                            }
1395                        }
1396                        return Ok(Value::Null);
1397                    }
1398
1399                    // If base_val is just a VID, fetch from property manager
1400                    if let Ok(vid) = Self::vid_from_value(&base_val) {
1401                        return prop_manager
1402                            .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1403                            .await;
1404                    }
1405
1406                    if base_val.is_null() {
1407                        return Ok(Value::Null);
1408                    }
1409
1410                    // Check if base_val is a temporal value and prop_name is a temporal accessor
1411                    {
1412                        use crate::query::datetime::{
1413                            eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1414                            is_duration_string, is_temporal_accessor, is_temporal_string,
1415                        };
1416
1417                        // Handle Value::Temporal directly (no string parsing needed)
1418                        if let Value::Temporal(tv) = &base_val {
1419                            if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1420                                if is_duration_accessor(prop_name) {
1421                                    // Convert to string for the existing accessor logic
1422                                    return eval_duration_accessor(
1423                                        &base_val.to_string(),
1424                                        prop_name,
1425                                    );
1426                                }
1427                            } else if is_temporal_accessor(prop_name) {
1428                                return eval_temporal_accessor(&base_val.to_string(), prop_name);
1429                            }
1430                        }
1431
1432                        // Handle Value::String temporal (backward compat)
1433                        if let Value::String(s) = &base_val {
1434                            if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1435                                return eval_temporal_accessor(s, prop_name);
1436                            }
1437                            if is_duration_string(s) && is_duration_accessor(prop_name) {
1438                                return eval_duration_accessor(s, prop_name);
1439                            }
1440                        }
1441                    }
1442
1443                    Err(anyhow!(
1444                        "Cannot access property '{}' on {:?}",
1445                        prop_name,
1446                        base_val
1447                    ))
1448                }
1449                Expr::ArrayIndex {
1450                    array: arr_expr,
1451                    index: idx_expr,
1452                } => {
1453                    let arr_val = this
1454                        .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1455                        .await?;
1456                    let idx_val = this
1457                        .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1458                        .await?;
1459
1460                    if let Value::List(arr) = &arr_val {
1461                        // Handle signed indices (allow negative)
1462                        if let Some(i) = idx_val.as_i64() {
1463                            let idx = if i < 0 {
1464                                // Negative index: -1 = last element, -2 = second to last, etc.
1465                                let positive_idx = arr.len() as i64 + i;
1466                                if positive_idx < 0 {
1467                                    return Ok(Value::Null); // Out of bounds
1468                                }
1469                                positive_idx as usize
1470                            } else {
1471                                i as usize
1472                            };
1473                            if idx < arr.len() {
1474                                return Ok(arr[idx].clone());
1475                            }
1476                            return Ok(Value::Null);
1477                        } else if idx_val.is_null() {
1478                            return Ok(Value::Null);
1479                        } else {
1480                            return Err(anyhow::anyhow!(
1481                                "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1482                                idx_val
1483                            ));
1484                        }
1485                    }
1486                    if let Value::Map(map) = &arr_val {
1487                        if let Some(key) = idx_val.as_str() {
1488                            return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1489                        } else if !idx_val.is_null() {
1490                            return Err(anyhow::anyhow!(
1491                                "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1492                                idx_val
1493                            ));
1494                        }
1495                    }
1496                    // Handle bracket access on Node: n['name'] returns property
1497                    if let Value::Node(node) = &arr_val {
1498                        if let Some(key) = idx_val.as_str() {
1499                            // Check in-memory properties first
1500                            if let Some(val) = node.properties.get(key) {
1501                                return Ok(val.clone());
1502                            }
1503                            // Fallback to property manager
1504                            if let Ok(val) = prop_manager
1505                                .get_vertex_prop_with_ctx(node.vid, key, ctx)
1506                                .await
1507                            {
1508                                return Ok(val);
1509                            }
1510                            return Ok(Value::Null);
1511                        } else if !idx_val.is_null() {
1512                            return Err(anyhow::anyhow!(
1513                                "TypeError: Node index must be a string, got: {:?}",
1514                                idx_val
1515                            ));
1516                        }
1517                    }
1518                    // Handle bracket access on Edge: e['property'] returns property
1519                    if let Value::Edge(edge) = &arr_val {
1520                        if let Some(key) = idx_val.as_str() {
1521                            // Check in-memory properties first
1522                            if let Some(val) = edge.properties.get(key) {
1523                                return Ok(val.clone());
1524                            }
1525                            // Fallback to property manager
1526                            if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1527                                return Ok(val);
1528                            }
1529                            return Ok(Value::Null);
1530                        } else if !idx_val.is_null() {
1531                            return Err(anyhow::anyhow!(
1532                                "TypeError: Edge index must be a string, got: {:?}",
1533                                idx_val
1534                            ));
1535                        }
1536                    }
1537                    // Handle bracket access on VID (integer): n['name'] where n is a VID
1538                    if let Ok(vid) = Self::vid_from_value(&arr_val)
1539                        && let Some(key) = idx_val.as_str()
1540                    {
1541                        if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1542                        {
1543                            return Ok(val);
1544                        }
1545                        return Ok(Value::Null);
1546                    }
1547                    if arr_val.is_null() {
1548                        return Ok(Value::Null);
1549                    }
1550                    Err(anyhow!(
1551                        "TypeError: InvalidArgumentType - cannot index into {:?}",
1552                        arr_val
1553                    ))
1554                }
1555                Expr::ArraySlice { array, start, end } => {
1556                    let arr_val = this
1557                        .evaluate_expr(array, row, prop_manager, params, ctx)
1558                        .await?;
1559
1560                    if let Value::List(arr) = &arr_val {
1561                        let len = arr.len();
1562
1563                        // Evaluate start index (default to 0), null → null result
1564                        let start_idx = if let Some(s) = start {
1565                            let v = this
1566                                .evaluate_expr(s, row, prop_manager, params, ctx)
1567                                .await?;
1568                            if v.is_null() {
1569                                return Ok(Value::Null);
1570                            }
1571                            let raw = v.as_i64().unwrap_or(0);
1572                            if raw < 0 {
1573                                (len as i64 + raw).max(0) as usize
1574                            } else {
1575                                (raw as usize).min(len)
1576                            }
1577                        } else {
1578                            0
1579                        };
1580
1581                        // Evaluate end index (default to length), null → null result
1582                        let end_idx = if let Some(e) = end {
1583                            let v = this
1584                                .evaluate_expr(e, row, prop_manager, params, ctx)
1585                                .await?;
1586                            if v.is_null() {
1587                                return Ok(Value::Null);
1588                            }
1589                            let raw = v.as_i64().unwrap_or(len as i64);
1590                            if raw < 0 {
1591                                (len as i64 + raw).max(0) as usize
1592                            } else {
1593                                (raw as usize).min(len)
1594                            }
1595                        } else {
1596                            len
1597                        };
1598
1599                        // Return sliced array
1600                        if start_idx >= end_idx {
1601                            return Ok(Value::List(vec![]));
1602                        }
1603                        let end_idx = end_idx.min(len);
1604                        return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1605                    }
1606
1607                    if arr_val.is_null() {
1608                        return Ok(Value::Null);
1609                    }
1610                    Err(anyhow!("Cannot slice {:?}", arr_val))
1611                }
1612                Expr::Literal(lit) => Ok(lit.to_value()),
1613                Expr::List(items) => {
1614                    let mut vals = Vec::new();
1615                    for item in items {
1616                        vals.push(
1617                            this.evaluate_expr(item, row, prop_manager, params, ctx)
1618                                .await?,
1619                        );
1620                    }
1621                    Ok(Value::List(vals))
1622                }
1623                Expr::Map(items) => {
1624                    let mut map = HashMap::new();
1625                    for (key, value_expr) in items {
1626                        let val = this
1627                            .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1628                            .await?;
1629                        map.insert(key.clone(), val);
1630                    }
1631                    Ok(Value::Map(map))
1632                }
1633                Expr::Exists { query, .. } => {
1634                    // Plan and execute subquery; failures return false (pattern doesn't match)
1635                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1636                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1637
1638                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1639                        Ok(plan) => {
1640                            let mut sub_params = params.clone();
1641                            sub_params.extend(row.clone());
1642
1643                            match this.execute(plan, prop_manager, &sub_params).await {
1644                                Ok(results) => Ok(Value::Bool(!results.is_empty())),
1645                                Err(e) => {
1646                                    log::debug!("EXISTS subquery execution failed: {}", e);
1647                                    Ok(Value::Bool(false))
1648                                }
1649                            }
1650                        }
1651                        Err(e) => {
1652                            log::debug!("EXISTS subquery planning failed: {}", e);
1653                            Ok(Value::Bool(false))
1654                        }
1655                    }
1656                }
1657                Expr::CountSubquery(query) => {
1658                    // Similar to Exists but returns count
1659                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1660
1661                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1662
1663                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1664                        Ok(plan) => {
1665                            let mut sub_params = params.clone();
1666                            sub_params.extend(row.clone());
1667
1668                            match this.execute(plan, prop_manager, &sub_params).await {
1669                                Ok(results) => Ok(Value::from(results.len() as i64)),
1670                                Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1671                            }
1672                        }
1673                        Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1674                    }
1675                }
1676                Expr::Quantifier {
1677                    quantifier,
1678                    variable,
1679                    list,
1680                    predicate,
1681                } => {
1682                    // Quantifier expression evaluation (ALL/ANY/SINGLE/NONE)
1683                    //
1684                    // This is the primary execution path for quantifiers because DataFusion
1685                    // does not support lambda functions yet. Queries with quantifiers attempt
1686                    // DataFusion translation first, fail (see df_expr.rs:289), then fall back
1687                    // to this fallback executor path.
1688                    //
1689                    // This is intentional design - we get correct semantics with row-by-row
1690                    // evaluation until DataFusion adds lambda support.
1691                    //
1692                    // See: https://github.com/apache/datafusion/issues/14205
1693
1694                    // Evaluate the list expression
1695                    let list_val = this
1696                        .evaluate_expr(list, row, prop_manager, params, ctx)
1697                        .await?;
1698
1699                    // Handle null propagation
1700                    if list_val.is_null() {
1701                        return Ok(Value::Null);
1702                    }
1703
1704                    // Convert to array
1705                    let items = match list_val {
1706                        Value::List(arr) => arr,
1707                        _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1708                    };
1709
1710                    // Evaluate predicate for each item
1711                    let mut satisfied_count = 0;
1712                    for item in &items {
1713                        // Create new row with bound variable
1714                        let mut item_row = row.clone();
1715                        item_row.insert(variable.clone(), item.clone());
1716
1717                        // Evaluate predicate with bound variable
1718                        let pred_result = this
1719                            .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1720                            .await?;
1721
1722                        // Check if predicate is satisfied
1723                        if let Value::Bool(true) = pred_result {
1724                            satisfied_count += 1;
1725                        }
1726                    }
1727
1728                    // Return based on quantifier type
1729                    let result = match quantifier {
1730                        Quantifier::All => satisfied_count == items.len(),
1731                        Quantifier::Any => satisfied_count > 0,
1732                        Quantifier::Single => satisfied_count == 1,
1733                        Quantifier::None => satisfied_count == 0,
1734                    };
1735
1736                    Ok(Value::Bool(result))
1737                }
1738                Expr::ListComprehension {
1739                    variable,
1740                    list,
1741                    where_clause,
1742                    map_expr,
1743                } => {
1744                    // List comprehension evaluation: [x IN list WHERE pred | expr]
1745                    //
1746                    // Similar to quantifiers, this requires lambda-like evaluation
1747                    // which DataFusion doesn't support yet. This is the primary execution path.
1748
1749                    // Evaluate the list expression
1750                    let list_val = this
1751                        .evaluate_expr(list, row, prop_manager, params, ctx)
1752                        .await?;
1753
1754                    // Handle null propagation
1755                    if list_val.is_null() {
1756                        return Ok(Value::Null);
1757                    }
1758
1759                    // Convert to array
1760                    let items = match list_val {
1761                        Value::List(arr) => arr,
1762                        _ => {
1763                            return Err(anyhow!(
1764                                "List comprehension expects a list, got: {:?}",
1765                                list_val
1766                            ));
1767                        }
1768                    };
1769
1770                    // Collect mapped values
1771                    let mut results = Vec::new();
1772                    for item in &items {
1773                        // Create new row with bound variable
1774                        let mut item_row = row.clone();
1775                        item_row.insert(variable.clone(), item.clone());
1776
1777                        // Apply WHERE filter if present
1778                        if let Some(predicate) = where_clause {
1779                            let pred_result = this
1780                                .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1781                                .await?;
1782
1783                            // Skip items that don't match the filter
1784                            if !matches!(pred_result, Value::Bool(true)) {
1785                                continue;
1786                            }
1787                        }
1788
1789                        // Apply map expression
1790                        let mapped_val = this
1791                            .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1792                            .await?;
1793                        results.push(mapped_val);
1794                    }
1795
1796                    Ok(Value::List(results))
1797                }
1798                Expr::BinaryOp { left, op, right } => {
1799                    // Short-circuit evaluation for AND/OR
1800                    match op {
1801                        BinaryOp::And => {
1802                            let l_val = this
1803                                .evaluate_expr(left, row, prop_manager, params, ctx)
1804                                .await?;
1805                            // Short-circuit: if left is false, don't evaluate right
1806                            if let Some(false) = l_val.as_bool() {
1807                                return Ok(Value::Bool(false));
1808                            }
1809                            let r_val = this
1810                                .evaluate_expr(right, row, prop_manager, params, ctx)
1811                                .await?;
1812                            eval_binary_op(&l_val, op, &r_val)
1813                        }
1814                        BinaryOp::Or => {
1815                            let l_val = this
1816                                .evaluate_expr(left, row, prop_manager, params, ctx)
1817                                .await?;
1818                            // Short-circuit: if left is true, don't evaluate right
1819                            if let Some(true) = l_val.as_bool() {
1820                                return Ok(Value::Bool(true));
1821                            }
1822                            let r_val = this
1823                                .evaluate_expr(right, row, prop_manager, params, ctx)
1824                                .await?;
1825                            eval_binary_op(&l_val, op, &r_val)
1826                        }
1827                        _ => {
1828                            // For all other operators, evaluate both sides
1829                            let l_val = this
1830                                .evaluate_expr(left, row, prop_manager, params, ctx)
1831                                .await?;
1832                            let r_val = this
1833                                .evaluate_expr(right, row, prop_manager, params, ctx)
1834                                .await?;
1835                            eval_binary_op(&l_val, op, &r_val)
1836                        }
1837                    }
1838                }
1839                Expr::In { expr, list } => {
1840                    let l_val = this
1841                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1842                        .await?;
1843                    let r_val = this
1844                        .evaluate_expr(list, row, prop_manager, params, ctx)
1845                        .await?;
1846                    eval_in_op(&l_val, &r_val)
1847                }
1848                Expr::UnaryOp { op, expr } => {
1849                    let val = this
1850                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1851                        .await?;
1852                    match op {
1853                        UnaryOp::Not => {
1854                            // Three-valued logic: NOT null = null
1855                            match val.as_bool() {
1856                                Some(b) => Ok(Value::Bool(!b)),
1857                                None if val.is_null() => Ok(Value::Null),
1858                                None => Err(anyhow!(
1859                                    "InvalidArgumentType: NOT requires a boolean argument"
1860                                )),
1861                            }
1862                        }
1863                        UnaryOp::Neg => {
1864                            if let Some(i) = val.as_i64() {
1865                                Ok(Value::Int(-i))
1866                            } else if let Some(f) = val.as_f64() {
1867                                Ok(Value::Float(-f))
1868                            } else {
1869                                Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1870                            }
1871                        }
1872                    }
1873                }
1874                Expr::IsNull(expr) => {
1875                    let val = this
1876                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1877                        .await?;
1878                    Ok(Value::Bool(val.is_null()))
1879                }
1880                Expr::IsNotNull(expr) => {
1881                    let val = this
1882                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1883                        .await?;
1884                    Ok(Value::Bool(!val.is_null()))
1885                }
1886                Expr::IsUnique(_) => {
1887                    // IS UNIQUE is only valid in constraint definitions, not in query expressions
1888                    Err(anyhow!(
1889                        "IS UNIQUE can only be used in constraint definitions"
1890                    ))
1891                }
1892                Expr::Case {
1893                    expr,
1894                    when_then,
1895                    else_expr,
1896                } => {
1897                    if let Some(base_expr) = expr {
1898                        let base_val = this
1899                            .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1900                            .await?;
1901                        for (w, t) in when_then {
1902                            let w_val = this
1903                                .evaluate_expr(w, row, prop_manager, params, ctx)
1904                                .await?;
1905                            if base_val == w_val {
1906                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1907                            }
1908                        }
1909                    } else {
1910                        for (w, t) in when_then {
1911                            let w_val = this
1912                                .evaluate_expr(w, row, prop_manager, params, ctx)
1913                                .await?;
1914                            if w_val.as_bool() == Some(true) {
1915                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1916                            }
1917                        }
1918                    }
1919                    if let Some(e) = else_expr {
1920                        return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1921                    }
1922                    Ok(Value::Null)
1923                }
1924                Expr::Wildcard => Ok(Value::Null),
1925                Expr::FunctionCall { name, args, .. } => {
1926                    // Special case: id() returns VID for nodes and EID for relationships
1927                    if name.eq_ignore_ascii_case("ID") {
1928                        if args.len() != 1 {
1929                            return Err(anyhow!("id() requires exactly 1 argument"));
1930                        }
1931                        let val = this
1932                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1933                            .await?;
1934                        if let Value::Map(map) = &val {
1935                            // Check for _vid (vertex) first
1936                            if let Some(vid_val) = map.get("_vid") {
1937                                return Ok(vid_val.clone());
1938                            }
1939                            // Check for _eid (edge/relationship)
1940                            if let Some(eid_val) = map.get("_eid") {
1941                                return Ok(eid_val.clone());
1942                            }
1943                            // Check for _id (fallback)
1944                            if let Some(id_val) = map.get("_id") {
1945                                return Ok(id_val.clone());
1946                            }
1947                        }
1948                        return Ok(Value::Null);
1949                    }
1950
1951                    // Special case: elementId() returns string format "label_id:local_offset"
1952                    if name.eq_ignore_ascii_case("ELEMENTID") {
1953                        if args.len() != 1 {
1954                            return Err(anyhow!("elementId() requires exactly 1 argument"));
1955                        }
1956                        let val = this
1957                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1958                            .await?;
1959                        if let Value::Map(map) = &val {
1960                            // Check for _vid (vertex) first
1961                            // In new storage model, VIDs are pure auto-increment - return as simple ID string
1962                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1963                                return Ok(Value::String(vid_val.to_string()));
1964                            }
1965                            // Check for _eid (edge/relationship)
1966                            // In new storage model, EIDs are pure auto-increment - return as simple ID string
1967                            if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1968                                return Ok(Value::String(eid_val.to_string()));
1969                            }
1970                        }
1971                        return Ok(Value::Null);
1972                    }
1973
1974                    // Special case: type() returns the relationship type name
1975                    if name.eq_ignore_ascii_case("TYPE") {
1976                        if args.len() != 1 {
1977                            return Err(anyhow!("type() requires exactly 1 argument"));
1978                        }
1979                        let val = this
1980                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1981                            .await?;
1982                        if let Value::Map(map) = &val
1983                            && let Some(type_val) = map.get("_type")
1984                        {
1985                            // Numeric _type is an edge type ID; string _type is already a name
1986                            if let Some(type_id) =
1987                                type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1988                            {
1989                                if let Some(name) = this
1990                                    .storage
1991                                    .schema_manager()
1992                                    .edge_type_name_by_id_unified(type_id)
1993                                {
1994                                    return Ok(Value::String(name));
1995                                }
1996                            } else if let Some(name) = type_val.as_str() {
1997                                return Ok(Value::String(name.to_string()));
1998                            }
1999                        }
2000                        return Ok(Value::Null);
2001                    }
2002
2003                    // Special case: labels() returns the labels of a node
2004                    if name.eq_ignore_ascii_case("LABELS") {
2005                        if args.len() != 1 {
2006                            return Err(anyhow!("labels() requires exactly 1 argument"));
2007                        }
2008                        let val = this
2009                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2010                            .await?;
2011                        if let Value::Map(map) = &val
2012                            && let Some(labels_val) = map.get("_labels")
2013                        {
2014                            return Ok(labels_val.clone());
2015                        }
2016                        return Ok(Value::Null);
2017                    }
2018
2019                    // Special case: properties() returns the properties map of a node/edge
2020                    if name.eq_ignore_ascii_case("PROPERTIES") {
2021                        if args.len() != 1 {
2022                            return Err(anyhow!("properties() requires exactly 1 argument"));
2023                        }
2024                        let val = this
2025                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2026                            .await?;
2027                        if let Value::Map(map) = &val {
2028                            // Filter out internal properties (those starting with _)
2029                            let mut props = HashMap::new();
2030                            for (k, v) in map.iter() {
2031                                if !k.starts_with('_') {
2032                                    props.insert(k.clone(), v.clone());
2033                                }
2034                            }
2035                            return Ok(Value::Map(props));
2036                        }
2037                        return Ok(Value::Null);
2038                    }
2039
2040                    // Special case: startNode() returns the start node of a relationship
2041                    if name.eq_ignore_ascii_case("STARTNODE") {
2042                        if args.len() != 1 {
2043                            return Err(anyhow!("startNode() requires exactly 1 argument"));
2044                        }
2045                        let val = this
2046                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2047                            .await?;
2048                        if let Value::Edge(edge) = &val {
2049                            return Ok(Self::find_node_by_vid(row, edge.src));
2050                        }
2051                        if let Value::Map(map) = &val {
2052                            if let Some(start_node) = map.get("_startNode") {
2053                                return Ok(start_node.clone());
2054                            }
2055                            if let Some(src_vid) = map.get("_src_vid") {
2056                                return Ok(Value::Map(HashMap::from([(
2057                                    "_vid".to_string(),
2058                                    src_vid.clone(),
2059                                )])));
2060                            }
2061                            // Resolve _src VID by looking up node in row
2062                            if let Some(src_id) = map.get("_src")
2063                                && let Some(u) = src_id.as_u64()
2064                            {
2065                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2066                            }
2067                        }
2068                        return Ok(Value::Null);
2069                    }
2070
2071                    // Special case: endNode() returns the end node of a relationship
2072                    if name.eq_ignore_ascii_case("ENDNODE") {
2073                        if args.len() != 1 {
2074                            return Err(anyhow!("endNode() requires exactly 1 argument"));
2075                        }
2076                        let val = this
2077                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2078                            .await?;
2079                        if let Value::Edge(edge) = &val {
2080                            return Ok(Self::find_node_by_vid(row, edge.dst));
2081                        }
2082                        if let Value::Map(map) = &val {
2083                            if let Some(end_node) = map.get("_endNode") {
2084                                return Ok(end_node.clone());
2085                            }
2086                            if let Some(dst_vid) = map.get("_dst_vid") {
2087                                return Ok(Value::Map(HashMap::from([(
2088                                    "_vid".to_string(),
2089                                    dst_vid.clone(),
2090                                )])));
2091                            }
2092                            // Resolve _dst VID by looking up node in row
2093                            if let Some(dst_id) = map.get("_dst")
2094                                && let Some(u) = dst_id.as_u64()
2095                            {
2096                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2097                            }
2098                        }
2099                        return Ok(Value::Null);
2100                    }
2101
2102                    // Special case: hasLabel() checks if a node has a specific label
2103                    // Used for WHERE n:Label predicates
2104                    if name.eq_ignore_ascii_case("HASLABEL") {
2105                        if args.len() != 2 {
2106                            return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
2107                        }
2108                        let node_val = this
2109                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2110                            .await?;
2111                        let label_val = this
2112                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2113                            .await?;
2114
2115                        let label_to_check = label_val.as_str().ok_or_else(|| {
2116                            anyhow!("Second argument to hasLabel must be a string")
2117                        })?;
2118
2119                        let has_label = match &node_val {
2120                            // Handle proper Value::Node type (from result normalization)
2121                            Value::Map(map) if map.contains_key("_vid") => {
2122                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
2123                                    labels_arr
2124                                        .iter()
2125                                        .any(|l| l.as_str() == Some(label_to_check))
2126                                } else {
2127                                    false
2128                                }
2129                            }
2130                            // Also handle legacy Object format
2131                            Value::Map(map) => {
2132                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
2133                                    labels_arr
2134                                        .iter()
2135                                        .any(|l| l.as_str() == Some(label_to_check))
2136                                } else {
2137                                    false
2138                                }
2139                            }
2140                            _ => false,
2141                        };
2142                        return Ok(Value::Bool(has_label));
2143                    }
2144
2145                    // Quantifier functions (ANY/ALL/NONE/SINGLE) as function calls are not supported.
2146                    // These should be parsed as Expr::Quantifier instead.
2147                    if matches!(
2148                        name.to_uppercase().as_str(),
2149                        "ANY" | "ALL" | "NONE" | "SINGLE"
2150                    ) {
2151                        return Err(anyhow!(
2152                            "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
2153                            name.to_lowercase()
2154                        ));
2155                    }
2156
2157                    // Special case: COALESCE needs short-circuit evaluation
2158                    if name.eq_ignore_ascii_case("COALESCE") {
2159                        for arg in args {
2160                            let val = this
2161                                .evaluate_expr(arg, row, prop_manager, params, ctx)
2162                                .await?;
2163                            if !val.is_null() {
2164                                return Ok(val);
2165                            }
2166                        }
2167                        return Ok(Value::Null);
2168                    }
2169
2170                    // Special case: vector_similarity has dedicated implementation
2171                    if name.eq_ignore_ascii_case("vector_similarity") {
2172                        if args.len() != 2 {
2173                            return Err(anyhow!("vector_similarity takes 2 arguments"));
2174                        }
2175                        let v1 = this
2176                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2177                            .await?;
2178                        let v2 = this
2179                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2180                            .await?;
2181                        return eval_vector_similarity(&v1, &v2);
2182                    }
2183
2184                    // Special case: uni.validAt handles node fetching
2185                    if name.eq_ignore_ascii_case("uni.temporal.validAt")
2186                        || name.eq_ignore_ascii_case("uni.validAt")
2187                        || name.eq_ignore_ascii_case("validAt")
2188                    {
2189                        if args.len() != 4 {
2190                            return Err(anyhow!("validAt requires 4 arguments"));
2191                        }
2192                        let node_val = this
2193                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2194                            .await?;
2195                        let start_prop = this
2196                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2197                            .await?
2198                            .as_str()
2199                            .ok_or(anyhow!("start_prop must be string"))?
2200                            .to_string();
2201                        let end_prop = this
2202                            .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2203                            .await?
2204                            .as_str()
2205                            .ok_or(anyhow!("end_prop must be string"))?
2206                            .to_string();
2207                        let time_val = this
2208                            .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2209                            .await?;
2210
2211                        let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2212                            anyhow!("time argument must be a datetime value or string")
2213                        })?;
2214
2215                        // Fetch temporal property values - supports both vertices and edges
2216                        let valid_from_val: Option<Value> = if let Ok(vid) =
2217                            Self::vid_from_value(&node_val)
2218                        {
2219                            // Vertex case - VID string format
2220                            prop_manager
2221                                .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2222                                .await
2223                                .ok()
2224                        } else if let Value::Map(map) = &node_val {
2225                            // Check for embedded _vid or _eid in object
2226                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2227                                let vid = Vid::from(vid_val);
2228                                prop_manager
2229                                    .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2230                                    .await
2231                                    .ok()
2232                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2233                                // Edge case
2234                                let eid = uni_common::core::id::Eid::from(eid_val);
2235                                prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2236                            } else {
2237                                // Inline object - property embedded directly
2238                                map.get(&start_prop).cloned()
2239                            }
2240                        } else {
2241                            return Ok(Value::Bool(false));
2242                        };
2243
2244                        let valid_from = match valid_from_val {
2245                            Some(ref v) => match value_to_datetime_utc(v) {
2246                                Some(dt) => dt,
2247                                None if v.is_null() => return Ok(Value::Bool(false)),
2248                                None => {
2249                                    return Err(anyhow!(
2250                                        "Property {} must be a datetime value or string",
2251                                        start_prop
2252                                    ));
2253                                }
2254                            },
2255                            None => return Ok(Value::Bool(false)),
2256                        };
2257
2258                        let valid_to_val: Option<Value> = if let Ok(vid) =
2259                            Self::vid_from_value(&node_val)
2260                        {
2261                            // Vertex case - VID string format
2262                            prop_manager
2263                                .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2264                                .await
2265                                .ok()
2266                        } else if let Value::Map(map) = &node_val {
2267                            // Check for embedded _vid or _eid in object
2268                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2269                                let vid = Vid::from(vid_val);
2270                                prop_manager
2271                                    .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2272                                    .await
2273                                    .ok()
2274                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2275                                // Edge case
2276                                let eid = uni_common::core::id::Eid::from(eid_val);
2277                                prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2278                            } else {
2279                                // Inline object - property embedded directly
2280                                map.get(&end_prop).cloned()
2281                            }
2282                        } else {
2283                            return Ok(Value::Bool(false));
2284                        };
2285
2286                        let valid_to = match valid_to_val {
2287                            Some(ref v) => match value_to_datetime_utc(v) {
2288                                Some(dt) => Some(dt),
2289                                None if v.is_null() => None,
2290                                None => {
2291                                    return Err(anyhow!(
2292                                        "Property {} must be a datetime value or null",
2293                                        end_prop
2294                                    ));
2295                                }
2296                            },
2297                            None => None,
2298                        };
2299
2300                        let is_valid = valid_from <= query_time
2301                            && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2302                        return Ok(Value::Bool(is_valid));
2303                    }
2304
2305                    // For all other functions, evaluate arguments then call helper
2306                    let mut evaluated_args = Vec::with_capacity(args.len());
2307                    for arg in args {
2308                        let mut val = this
2309                            .evaluate_expr(arg, row, prop_manager, params, ctx)
2310                            .await?;
2311
2312                        // Eagerly hydrate edge/vertex maps if pushdown hydration didn't load properties.
2313                        // Functions like validAt() need access to properties like valid_from/valid_to.
2314                        if let Value::Map(ref mut map) = val {
2315                            hydrate_entity_if_needed(map, prop_manager, ctx).await;
2316                        }
2317
2318                        evaluated_args.push(val);
2319                    }
2320                    eval_scalar_function(
2321                        name,
2322                        &evaluated_args,
2323                        self.custom_function_registry.as_deref(),
2324                    )
2325                }
2326                Expr::Reduce {
2327                    accumulator,
2328                    init,
2329                    variable,
2330                    list,
2331                    expr,
2332                } => {
2333                    let mut acc = self
2334                        .evaluate_expr(init, row, prop_manager, params, ctx)
2335                        .await?;
2336                    let list_val = self
2337                        .evaluate_expr(list, row, prop_manager, params, ctx)
2338                        .await?;
2339
2340                    if let Value::List(items) = list_val {
2341                        for item in items {
2342                            // Create a temporary scope/row with accumulator and variable
2343                            // For simplicity in fallback executor, we can construct a new row map
2344                            // merging current row + new variables.
2345                            let mut scope = row.clone();
2346                            scope.insert(accumulator.clone(), acc.clone());
2347                            scope.insert(variable.clone(), item);
2348
2349                            acc = self
2350                                .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2351                                .await?;
2352                        }
2353                    } else {
2354                        return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2355                    }
2356                    Ok(acc)
2357                }
2358                Expr::ValidAt { .. } => {
2359                    // VALID_AT should have been transformed to a function call in the planner
2360                    Err(anyhow!(
2361                        "VALID_AT expression should have been transformed to function call in planner"
2362                    ))
2363                }
2364
2365                Expr::LabelCheck { expr, labels } => {
2366                    let val = this
2367                        .evaluate_expr(expr, row, prop_manager, params, ctx)
2368                        .await?;
2369                    match &val {
2370                        Value::Null => Ok(Value::Null),
2371                        Value::Map(map) => {
2372                            // Check if this is an edge (has _eid) or node (has _vid)
2373                            let is_edge = map.contains_key("_eid")
2374                                || map.contains_key("_type_name")
2375                                || (map.contains_key("_type") && !map.contains_key("_vid"));
2376
2377                            if is_edge {
2378                                // Edges have a single type
2379                                if labels.len() > 1 {
2380                                    return Ok(Value::Bool(false));
2381                                }
2382                                let label_to_check = &labels[0];
2383                                let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2384                                {
2385                                    t == label_to_check
2386                                } else if let Some(Value::String(t)) = map.get("_type") {
2387                                    t == label_to_check
2388                                } else {
2389                                    false
2390                                };
2391                                Ok(Value::Bool(has_type))
2392                            } else {
2393                                // Node: check all labels
2394                                let has_all = labels.iter().all(|label_to_check| {
2395                                    if let Some(Value::List(labels_arr)) = map.get("_labels") {
2396                                        labels_arr
2397                                            .iter()
2398                                            .any(|l| l.as_str() == Some(label_to_check.as_str()))
2399                                    } else {
2400                                        false
2401                                    }
2402                                });
2403                                Ok(Value::Bool(has_all))
2404                            }
2405                        }
2406                        _ => Ok(Value::Bool(false)),
2407                    }
2408                }
2409
2410                Expr::MapProjection { base, items } => {
2411                    let base_value = this
2412                        .evaluate_expr(base, row, prop_manager, params, ctx)
2413                        .await?;
2414
2415                    // Extract properties from the base object
2416                    let properties = match &base_value {
2417                        Value::Map(map) => map,
2418                        _ => {
2419                            return Err(anyhow!(
2420                                "Map projection requires object, got {:?}",
2421                                base_value
2422                            ));
2423                        }
2424                    };
2425
2426                    let mut result_map = HashMap::new();
2427
2428                    for item in items {
2429                        match item {
2430                            MapProjectionItem::Property(prop) => {
2431                                if let Some(value) = properties.get(prop.as_str()) {
2432                                    result_map.insert(prop.clone(), value.clone());
2433                                }
2434                            }
2435                            MapProjectionItem::AllProperties => {
2436                                // Include all properties except internal fields (those starting with _)
2437                                for (key, value) in properties.iter() {
2438                                    if !key.starts_with('_') {
2439                                        result_map.insert(key.clone(), value.clone());
2440                                    }
2441                                }
2442                            }
2443                            MapProjectionItem::LiteralEntry(key, expr) => {
2444                                let value = this
2445                                    .evaluate_expr(expr, row, prop_manager, params, ctx)
2446                                    .await?;
2447                                result_map.insert(key.clone(), value);
2448                            }
2449                            MapProjectionItem::Variable(var_name) => {
2450                                // Variable selector: include the value of the variable in the result
2451                                // e.g., person{.name, friend} includes the value of 'friend' variable
2452                                if let Some(value) = row.get(var_name.as_str()) {
2453                                    result_map.insert(var_name.clone(), value.clone());
2454                                }
2455                            }
2456                        }
2457                    }
2458
2459                    Ok(Value::Map(result_map))
2460                }
2461            }
2462        })
2463    }
2464
2465    pub(crate) fn execute_subplan<'a>(
2466        &'a self,
2467        plan: LogicalPlan,
2468        prop_manager: &'a PropertyManager,
2469        params: &'a HashMap<String, Value>,
2470        ctx: Option<&'a QueryContext>,
2471    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2472        Box::pin(async move {
2473            if let Some(ctx) = ctx {
2474                ctx.check_timeout()?;
2475            }
2476            match plan {
2477                LogicalPlan::Union { left, right, all } => {
2478                    self.execute_union(left, right, all, prop_manager, params, ctx)
2479                        .await
2480                }
2481                LogicalPlan::CreateVectorIndex {
2482                    config,
2483                    if_not_exists,
2484                } => {
2485                    if if_not_exists && self.index_exists_by_name(&config.name) {
2486                        return Ok(vec![]);
2487                    }
2488                    let idx_mgr = self.storage.index_manager();
2489                    idx_mgr.create_vector_index(config).await?;
2490                    Ok(vec![])
2491                }
2492                LogicalPlan::CreateFullTextIndex {
2493                    config,
2494                    if_not_exists,
2495                } => {
2496                    if if_not_exists && self.index_exists_by_name(&config.name) {
2497                        return Ok(vec![]);
2498                    }
2499                    let idx_mgr = self.storage.index_manager();
2500                    idx_mgr.create_fts_index(config).await?;
2501                    Ok(vec![])
2502                }
2503                LogicalPlan::CreateScalarIndex {
2504                    mut config,
2505                    if_not_exists,
2506                } => {
2507                    if if_not_exists && self.index_exists_by_name(&config.name) {
2508                        return Ok(vec![]);
2509                    }
2510
2511                    // Check for expression indexes - create generated columns
2512                    let mut modified_properties = Vec::new();
2513
2514                    for prop in &config.properties {
2515                        // Heuristic: if contains '(' and ')', it's an expression
2516                        if prop.contains('(') && prop.contains(')') {
2517                            let gen_col = SchemaManager::generated_column_name(prop);
2518
2519                            // Add generated property to schema
2520                            let sm = self.storage.schema_manager_arc();
2521                            if let Err(e) = sm.add_generated_property(
2522                                &config.label,
2523                                &gen_col,
2524                                DataType::String, // Default type for expressions
2525                                prop.clone(),
2526                            ) {
2527                                log::warn!("Failed to add generated property (might exist): {}", e);
2528                            }
2529
2530                            modified_properties.push(gen_col);
2531                        } else {
2532                            // Simple property - use as-is
2533                            modified_properties.push(prop.clone());
2534                        }
2535                    }
2536
2537                    config.properties = modified_properties;
2538
2539                    let idx_mgr = self.storage.index_manager();
2540                    idx_mgr.create_scalar_index(config).await?;
2541                    Ok(vec![])
2542                }
2543                LogicalPlan::CreateJsonFtsIndex {
2544                    config,
2545                    if_not_exists,
2546                } => {
2547                    if if_not_exists && self.index_exists_by_name(&config.name) {
2548                        return Ok(vec![]);
2549                    }
2550                    let idx_mgr = self.storage.index_manager();
2551                    idx_mgr.create_json_fts_index(config).await?;
2552                    Ok(vec![])
2553                }
2554                LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2555                LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2556                LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2557                LogicalPlan::Vacuum => {
2558                    self.execute_vacuum().await?;
2559                    Ok(vec![])
2560                }
2561                LogicalPlan::Checkpoint => {
2562                    self.execute_checkpoint().await?;
2563                    Ok(vec![])
2564                }
2565                LogicalPlan::CopyTo {
2566                    label,
2567                    path,
2568                    format,
2569                    options,
2570                } => {
2571                    let count = self
2572                        .execute_copy_to(&label, &path, &format, &options)
2573                        .await?;
2574                    let mut result = HashMap::new();
2575                    result.insert("count".to_string(), Value::Int(count as i64));
2576                    Ok(vec![result])
2577                }
2578                LogicalPlan::CopyFrom {
2579                    label,
2580                    path,
2581                    format,
2582                    options,
2583                } => {
2584                    let count = self
2585                        .execute_copy_from(&label, &path, &format, &options)
2586                        .await?;
2587                    let mut result = HashMap::new();
2588                    result.insert("count".to_string(), Value::Int(count as i64));
2589                    Ok(vec![result])
2590                }
2591                LogicalPlan::CreateLabel(clause) => {
2592                    self.execute_create_label(clause).await?;
2593                    Ok(vec![])
2594                }
2595                LogicalPlan::CreateEdgeType(clause) => {
2596                    self.execute_create_edge_type(clause).await?;
2597                    Ok(vec![])
2598                }
2599                LogicalPlan::AlterLabel(clause) => {
2600                    self.execute_alter_label(clause).await?;
2601                    Ok(vec![])
2602                }
2603                LogicalPlan::AlterEdgeType(clause) => {
2604                    self.execute_alter_edge_type(clause).await?;
2605                    Ok(vec![])
2606                }
2607                LogicalPlan::DropLabel(clause) => {
2608                    self.execute_drop_label(clause).await?;
2609                    Ok(vec![])
2610                }
2611                LogicalPlan::DropEdgeType(clause) => {
2612                    self.execute_drop_edge_type(clause).await?;
2613                    Ok(vec![])
2614                }
2615                LogicalPlan::CreateConstraint(clause) => {
2616                    self.execute_create_constraint(clause).await?;
2617                    Ok(vec![])
2618                }
2619                LogicalPlan::DropConstraint(clause) => {
2620                    self.execute_drop_constraint(clause).await?;
2621                    Ok(vec![])
2622                }
2623                LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2624                LogicalPlan::DropIndex { name, if_exists } => {
2625                    let idx_mgr = self.storage.index_manager();
2626                    match idx_mgr.drop_index(&name).await {
2627                        Ok(_) => Ok(vec![]),
2628                        Err(e) => {
2629                            if if_exists && e.to_string().contains("not found") {
2630                                Ok(vec![])
2631                            } else {
2632                                Err(e)
2633                            }
2634                        }
2635                    }
2636                }
2637                LogicalPlan::ShowIndexes { filter } => {
2638                    Ok(self.execute_show_indexes(filter.as_deref()))
2639                }
2640                // Scan/traverse nodes: delegate to DataFusion for data access,
2641                // then convert results to HashMaps for the fallback executor.
2642                LogicalPlan::Scan { .. }
2643                | LogicalPlan::FusedIndexScan { .. }
2644                | LogicalPlan::FusedIndexScanWrapped { .. }
2645                | LogicalPlan::ExtIdLookup { .. }
2646                | LogicalPlan::ScanAll { .. }
2647                | LogicalPlan::ScanMainByLabels { .. }
2648                | LogicalPlan::Traverse { .. }
2649                | LogicalPlan::TraverseMainByType { .. } => {
2650                    let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2651                    self.record_batches_to_rows(batches)
2652                }
2653                LogicalPlan::Filter {
2654                    input,
2655                    predicate,
2656                    optional_variables,
2657                } => {
2658                    let input_matches = self
2659                        .execute_subplan(*input, prop_manager, params, ctx)
2660                        .await?;
2661
2662                    tracing::debug!(
2663                        "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2664                        predicate,
2665                        input_matches.len(),
2666                        optional_variables
2667                    );
2668
2669                    // For OPTIONAL MATCH with WHERE: we need LEFT OUTER JOIN semantics.
2670                    // Group rows by non-optional variables, apply filter, and ensure
2671                    // at least one row per group (with NULLs if filter removes all).
2672                    if !optional_variables.is_empty() {
2673                        // Helper to check if a key belongs to an optional variable.
2674                        // Keys can be "var" or "var.field" (e.g., "m" or "m._vid").
2675                        let is_optional_key = |k: &str| -> bool {
2676                            optional_variables.contains(k)
2677                                || optional_variables
2678                                    .iter()
2679                                    .any(|var| k.starts_with(&format!("{}.", var)))
2680                        };
2681
2682                        // Helper to check if a key is internal (should not affect grouping)
2683                        let is_internal_key =
2684                            |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2685
2686                        // Compute the key (non-optional, non-internal variables) for grouping
2687                        let non_optional_vars: Vec<String> = input_matches
2688                            .first()
2689                            .map(|row| {
2690                                row.keys()
2691                                    .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2692                                    .cloned()
2693                                    .collect()
2694                            })
2695                            .unwrap_or_default();
2696
2697                        // Group rows by their non-optional variable values
2698                        let mut groups: std::collections::HashMap<
2699                            Vec<u8>,
2700                            Vec<HashMap<String, Value>>,
2701                        > = std::collections::HashMap::new();
2702
2703                        for row in &input_matches {
2704                            // Create a key from non-optional variable values
2705                            let key: Vec<u8> = non_optional_vars
2706                                .iter()
2707                                .map(|var| {
2708                                    row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
2709                                })
2710                                .collect::<Vec<_>>()
2711                                .join("|")
2712                                .into_bytes();
2713
2714                            groups.entry(key).or_default().push(row.clone());
2715                        }
2716
2717                        let mut filtered = Vec::new();
2718                        for (_key, group_rows) in groups {
2719                            let mut group_passed = Vec::new();
2720
2721                            for row in &group_rows {
2722                                // If optional variables are already NULL, preserve the row
2723                                let has_null_optional = optional_variables.iter().any(|var| {
2724                                    // Check both "var" and "var._vid" style keys
2725                                    let direct_null =
2726                                        matches!(row.get(var), Some(Value::Null) | None);
2727                                    let prefixed_null = row
2728                                        .keys()
2729                                        .filter(|k| k.starts_with(&format!("{}.", var)))
2730                                        .any(|k| matches!(row.get(k), Some(Value::Null)));
2731                                    direct_null || prefixed_null
2732                                });
2733
2734                                if has_null_optional {
2735                                    group_passed.push(row.clone());
2736                                    continue;
2737                                }
2738
2739                                let res = self
2740                                    .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2741                                    .await?;
2742
2743                                if res.as_bool().unwrap_or(false) {
2744                                    group_passed.push(row.clone());
2745                                }
2746                            }
2747
2748                            if group_passed.is_empty() {
2749                                // No rows passed - emit one row with NULLs for optional variables
2750                                // Use the first row's non-optional values as a template
2751                                if let Some(template) = group_rows.first() {
2752                                    let mut null_row = HashMap::new();
2753                                    for (k, v) in template {
2754                                        if is_optional_key(k) {
2755                                            null_row.insert(k.clone(), Value::Null);
2756                                        } else {
2757                                            null_row.insert(k.clone(), v.clone());
2758                                        }
2759                                    }
2760                                    filtered.push(null_row);
2761                                }
2762                            } else {
2763                                filtered.extend(group_passed);
2764                            }
2765                        }
2766
2767                        tracing::debug!(
2768                            "Filter (OPTIONAL): {} input rows -> {} output rows",
2769                            input_matches.len(),
2770                            filtered.len()
2771                        );
2772
2773                        return Ok(filtered);
2774                    }
2775
2776                    // Standard filter for non-OPTIONAL MATCH
2777                    let mut filtered = Vec::new();
2778                    for row in input_matches.iter() {
2779                        let res = self
2780                            .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2781                            .await?;
2782
2783                        let passes = res.as_bool().unwrap_or(false);
2784
2785                        if passes {
2786                            filtered.push(row.clone());
2787                        }
2788                    }
2789
2790                    tracing::debug!(
2791                        "Filter: {} input rows -> {} output rows",
2792                        input_matches.len(),
2793                        filtered.len()
2794                    );
2795
2796                    Ok(filtered)
2797                }
2798                LogicalPlan::ProcedureCall {
2799                    procedure_name,
2800                    arguments,
2801                    yield_items,
2802                } => {
2803                    let yield_names: Vec<String> =
2804                        yield_items.iter().map(|(n, _)| n.clone()).collect();
2805                    let results = self
2806                        .execute_procedure(
2807                            &procedure_name,
2808                            &arguments,
2809                            &yield_names,
2810                            prop_manager,
2811                            params,
2812                            ctx,
2813                        )
2814                        .await?;
2815
2816                    // Handle aliasing: collect all original values first, then
2817                    // build the aliased row in one pass. This avoids issues when
2818                    // an alias matches another yield item's original name (e.g.,
2819                    // YIELD a AS b, b AS d — renaming "a" to "b" must not
2820                    // clobber the original "b" before it is renamed to "d").
2821                    let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2822                    if !has_aliases {
2823                        // No aliases (includes YIELD * which produces empty yield_items) —
2824                        // pass through the procedure output rows unchanged.
2825                        Ok(results)
2826                    } else {
2827                        let mut aliased_results = Vec::with_capacity(results.len());
2828                        for row in results {
2829                            let mut new_row = HashMap::new();
2830                            for (name, alias) in &yield_items {
2831                                let col_name = alias.as_ref().unwrap_or(name);
2832                                let val = row.get(name).cloned().unwrap_or(Value::Null);
2833                                new_row.insert(col_name.clone(), val);
2834                            }
2835                            aliased_results.push(new_row);
2836                        }
2837                        Ok(aliased_results)
2838                    }
2839                }
2840                LogicalPlan::VectorKnn { .. } => {
2841                    unreachable!("VectorKnn is handled by DataFusion engine")
2842                }
2843                LogicalPlan::InvertedIndexLookup { .. } => {
2844                    unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2845                }
2846                LogicalPlan::Sort { input, order_by } => {
2847                    let rows = self
2848                        .execute_subplan(*input, prop_manager, params, ctx)
2849                        .await?;
2850                    self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2851                        .await
2852                }
2853                LogicalPlan::Limit { input, skip, fetch } => {
2854                    let rows = self
2855                        .execute_subplan(*input, prop_manager, params, ctx)
2856                        .await?;
2857                    let skip = skip.unwrap_or(0);
2858                    let take = fetch.unwrap_or(usize::MAX);
2859                    Ok(rows.into_iter().skip(skip).take(take).collect())
2860                }
2861                LogicalPlan::Aggregate {
2862                    input,
2863                    group_by,
2864                    aggregates,
2865                } => {
2866                    let rows = self
2867                        .execute_subplan(*input, prop_manager, params, ctx)
2868                        .await?;
2869                    self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2870                        .await
2871                }
2872                LogicalPlan::Window {
2873                    input,
2874                    window_exprs,
2875                } => {
2876                    let rows = self
2877                        .execute_subplan(*input, prop_manager, params, ctx)
2878                        .await?;
2879                    self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2880                        .await
2881                }
2882                LogicalPlan::Project { input, projections } => {
2883                    let matches = self
2884                        .execute_subplan(*input, prop_manager, params, ctx)
2885                        .await?;
2886                    self.execute_project(matches, &projections, prop_manager, params, ctx)
2887                        .await
2888                }
2889                LogicalPlan::Distinct { input } => {
2890                    let rows = self
2891                        .execute_subplan(*input, prop_manager, params, ctx)
2892                        .await?;
2893                    let mut seen = std::collections::HashSet::new();
2894                    let mut result = Vec::new();
2895                    for row in rows {
2896                        let key = Self::canonical_row_key(&row);
2897                        if seen.insert(key) {
2898                            result.push(row);
2899                        }
2900                    }
2901                    Ok(result)
2902                }
2903                LogicalPlan::Unwind {
2904                    input,
2905                    expr,
2906                    variable,
2907                } => {
2908                    let input_rows = self
2909                        .execute_subplan(*input, prop_manager, params, ctx)
2910                        .await?;
2911                    self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2912                        .await
2913                }
2914                LogicalPlan::Apply {
2915                    input,
2916                    subquery,
2917                    input_filter,
2918                } => {
2919                    let input_rows = self
2920                        .execute_subplan(*input, prop_manager, params, ctx)
2921                        .await?;
2922                    self.execute_apply(
2923                        input_rows,
2924                        &subquery,
2925                        input_filter.as_ref(),
2926                        prop_manager,
2927                        params,
2928                        ctx,
2929                    )
2930                    .await
2931                }
2932                LogicalPlan::SubqueryCall { input, subquery } => {
2933                    let input_rows = self
2934                        .execute_subplan(*input, prop_manager, params, ctx)
2935                        .await?;
2936                    // Execute subquery for each input row (correlated)
2937                    // No input_filter for CALL { }
2938                    self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2939                        .await
2940                }
2941                LogicalPlan::RecursiveCTE {
2942                    cte_name,
2943                    initial,
2944                    recursive,
2945                } => {
2946                    self.execute_recursive_cte(
2947                        &cte_name,
2948                        *initial,
2949                        *recursive,
2950                        prop_manager,
2951                        params,
2952                        ctx,
2953                    )
2954                    .await
2955                }
2956                LogicalPlan::CrossJoin { left, right } => {
2957                    self.execute_cross_join(left, right, prop_manager, params, ctx)
2958                        .await
2959                }
2960                LogicalPlan::Set { .. }
2961                | LogicalPlan::Remove { .. }
2962                | LogicalPlan::Merge { .. }
2963                | LogicalPlan::Create { .. }
2964                | LogicalPlan::CreateBatch { .. } => {
2965                    unreachable!("mutations are handled by DataFusion engine")
2966                }
2967                LogicalPlan::Delete { .. } => {
2968                    unreachable!("mutations are handled by DataFusion engine")
2969                }
2970                LogicalPlan::Copy {
2971                    target,
2972                    source,
2973                    is_export,
2974                    options,
2975                } => {
2976                    if is_export {
2977                        self.execute_export(&target, &source, &options, prop_manager, ctx)
2978                            .await
2979                    } else {
2980                        self.execute_copy(&target, &source, &options, prop_manager)
2981                            .await
2982                    }
2983                }
2984                LogicalPlan::Backup {
2985                    destination,
2986                    options,
2987                } => self.execute_backup(&destination, &options).await,
2988                LogicalPlan::Explain { plan } => {
2989                    let plan_str = format!("{:#?}", plan);
2990                    let mut row = HashMap::new();
2991                    row.insert("plan".to_string(), Value::String(plan_str));
2992                    Ok(vec![row])
2993                }
2994                LogicalPlan::ShortestPath { .. } => {
2995                    unreachable!("ShortestPath is handled by DataFusion engine")
2996                }
2997                LogicalPlan::AllShortestPaths { .. } => {
2998                    unreachable!("AllShortestPaths is handled by DataFusion engine")
2999                }
3000                LogicalPlan::Foreach { .. } => {
3001                    unreachable!("mutations are handled by DataFusion engine")
3002                }
3003                LogicalPlan::Empty => Ok(vec![HashMap::new()]),
3004                LogicalPlan::BindZeroLengthPath { .. } => {
3005                    unreachable!("BindZeroLengthPath is handled by DataFusion engine")
3006                }
3007                LogicalPlan::BindPath { .. } => {
3008                    unreachable!("BindPath is handled by DataFusion engine")
3009                }
3010                LogicalPlan::QuantifiedPattern { .. } => {
3011                    unreachable!("QuantifiedPattern is handled by DataFusion engine")
3012                }
3013                LogicalPlan::LocyProgram { .. }
3014                | LogicalPlan::LocyFold { .. }
3015                | LogicalPlan::LocyBestBy { .. }
3016                | LogicalPlan::LocyPriority { .. }
3017                | LogicalPlan::LocyDerivedScan { .. }
3018                | LogicalPlan::LocyProject { .. }
3019                | LogicalPlan::LocyModelInvoke { .. } => {
3020                    unreachable!("Locy operators are handled by DataFusion engine")
3021                }
3022            }
3023        })
3024    }
3025
3026    /// Execute a single plan from a FOREACH body with the given scope.
3027    ///
3028    /// Used by the DataFusion ForeachExec operator to delegate body clause
3029    /// execution back to the executor.
3030    #[expect(clippy::too_many_arguments)]
3031    pub(crate) async fn execute_foreach_body_plan(
3032        &self,
3033        plan: LogicalPlan,
3034        scope: &mut HashMap<String, Value>,
3035        writer: &uni_store::runtime::writer::Writer,
3036        prop_manager: &PropertyManager,
3037        params: &HashMap<String, Value>,
3038        ctx: Option<&QueryContext>,
3039        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3040    ) -> Result<()> {
3041        match plan {
3042            LogicalPlan::Set { items, .. } => {
3043                self.execute_set_items_locked(
3044                    &items,
3045                    scope,
3046                    writer,
3047                    prop_manager,
3048                    params,
3049                    ctx,
3050                    tx_l0,
3051                    &crate::query::df_graph::mutation_common::Prefetch::default(),
3052                )
3053                .await?;
3054            }
3055            LogicalPlan::Remove { items, .. } => {
3056                self.execute_remove_items_locked(
3057                    &items,
3058                    scope,
3059                    writer,
3060                    prop_manager,
3061                    ctx,
3062                    tx_l0,
3063                    &crate::query::df_graph::mutation_common::Prefetch::default(),
3064                )
3065                .await?;
3066            }
3067            LogicalPlan::Delete { items, detach, .. } => {
3068                for expr in &items {
3069                    let val = self
3070                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
3071                        .await?;
3072                    self.execute_delete_item_locked(&val, detach, writer, tx_l0)
3073                        .await?;
3074                }
3075            }
3076            LogicalPlan::Create { pattern, .. } => {
3077                self.execute_create_pattern(
3078                    &pattern,
3079                    scope,
3080                    writer,
3081                    prop_manager,
3082                    params,
3083                    ctx,
3084                    tx_l0,
3085                    None,
3086                )
3087                .await?;
3088            }
3089            LogicalPlan::CreateBatch { patterns, .. } => {
3090                for pattern in &patterns {
3091                    self.execute_create_pattern(
3092                        pattern,
3093                        scope,
3094                        writer,
3095                        prop_manager,
3096                        params,
3097                        ctx,
3098                        tx_l0,
3099                        None,
3100                    )
3101                    .await?;
3102                }
3103            }
3104            LogicalPlan::Merge {
3105                pattern,
3106                on_match: _,
3107                on_create,
3108                ..
3109            } => {
3110                // Fold ON CREATE SET so a NOT-NULL property set only by
3111                // ON CREATE SET passes create-time validation (RC4); the
3112                // post-create SET below settles the final values.
3113                let seed_props = self
3114                    .on_create_seed_props(on_create.as_ref(), scope, prop_manager, params, ctx)
3115                    .await?;
3116                self.execute_create_pattern(
3117                    &pattern,
3118                    scope,
3119                    writer,
3120                    prop_manager,
3121                    params,
3122                    ctx,
3123                    tx_l0,
3124                    Some(&seed_props),
3125                )
3126                .await?;
3127                if let Some(on_create_clause) = on_create {
3128                    self.execute_set_items_locked(
3129                        &on_create_clause.items,
3130                        scope,
3131                        writer,
3132                        prop_manager,
3133                        params,
3134                        ctx,
3135                        tx_l0,
3136                        &crate::query::df_graph::mutation_common::Prefetch::default(),
3137                    )
3138                    .await?;
3139                }
3140            }
3141            LogicalPlan::Foreach {
3142                variable,
3143                list,
3144                body,
3145                ..
3146            } => {
3147                let list_val = self
3148                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
3149                    .await?;
3150                let items = match list_val {
3151                    Value::List(arr) => arr,
3152                    Value::Null => return Ok(()),
3153                    _ => return Err(anyhow!("FOREACH requires a list")),
3154                };
3155                for item in items {
3156                    let mut nested_scope = scope.clone();
3157                    nested_scope.insert(variable.clone(), item);
3158                    for nested_plan in &body {
3159                        Box::pin(self.execute_foreach_body_plan(
3160                            nested_plan.clone(),
3161                            &mut nested_scope,
3162                            writer,
3163                            prop_manager,
3164                            params,
3165                            ctx,
3166                            tx_l0,
3167                        ))
3168                        .await?;
3169                    }
3170                }
3171            }
3172            _ => {
3173                return Err(anyhow!(
3174                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
3175                ));
3176            }
3177        }
3178        Ok(())
3179    }
3180
3181    fn canonical_row_key(row: &HashMap<String, Value>) -> String {
3182        let mut pairs: Vec<_> = row.iter().collect();
3183        pairs.sort_by_key(|(k, _)| *k);
3184
3185        pairs
3186            .into_iter()
3187            .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
3188            .collect::<Vec<_>>()
3189            .join("|")
3190    }
3191
3192    fn canonical_value_key(v: &Value) -> String {
3193        match v {
3194            Value::Null => "null".to_string(),
3195            Value::Bool(b) => format!("b:{b}"),
3196            Value::Int(i) => format!("n:{i}"),
3197            Value::Float(f) => {
3198                if f.is_nan() {
3199                    "nan".to_string()
3200                } else if f.is_infinite() {
3201                    if f.is_sign_positive() {
3202                        "inf:+".to_string()
3203                    } else {
3204                        "inf:-".to_string()
3205                    }
3206                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
3207                    format!("n:{}", *f as i64)
3208                } else {
3209                    format!("f:{f}")
3210                }
3211            }
3212            Value::String(s) => {
3213                if let Some(k) = Self::temporal_string_key(s) {
3214                    format!("temporal:{k}")
3215                } else {
3216                    format!("s:{s}")
3217                }
3218            }
3219            Value::Bytes(b) => format!("bytes:{:?}", b),
3220            Value::List(items) => format!(
3221                "list:[{}]",
3222                items
3223                    .iter()
3224                    .map(Self::canonical_value_key)
3225                    .collect::<Vec<_>>()
3226                    .join(",")
3227            ),
3228            Value::Map(map) => {
3229                let mut pairs: Vec<_> = map.iter().collect();
3230                pairs.sort_by_key(|(k, _)| *k);
3231                format!(
3232                    "map:{{{}}}",
3233                    pairs
3234                        .into_iter()
3235                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
3236                        .collect::<Vec<_>>()
3237                        .join(",")
3238                )
3239            }
3240            Value::Node(n) => {
3241                let mut labels = n.labels.clone();
3242                labels.sort();
3243                format!(
3244                    "node:{}:{}:{}",
3245                    n.vid.as_u64(),
3246                    labels.join(":"),
3247                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
3248                )
3249            }
3250            Value::Edge(e) => format!(
3251                "edge:{}:{}:{}:{}:{}",
3252                e.eid.as_u64(),
3253                e.edge_type,
3254                e.src.as_u64(),
3255                e.dst.as_u64(),
3256                Self::canonical_value_key(&Value::Map(e.properties.clone()))
3257            ),
3258            Value::Path(p) => format!(
3259                "path:nodes=[{}];edges=[{}]",
3260                p.nodes
3261                    .iter()
3262                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
3263                    .collect::<Vec<_>>()
3264                    .join(","),
3265                p.edges
3266                    .iter()
3267                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
3268                    .collect::<Vec<_>>()
3269                    .join(",")
3270            ),
3271            Value::Vector(vs) => format!("vec:{:?}", vs),
3272            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
3273            _ => format!("{v:?}"),
3274        }
3275    }
3276
3277    fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3278        match t {
3279            uni_common::TemporalValue::Date { days_since_epoch } => {
3280                format!("date:{days_since_epoch}")
3281            }
3282            uni_common::TemporalValue::LocalTime {
3283                nanos_since_midnight,
3284            } => format!("localtime:{nanos_since_midnight}"),
3285            uni_common::TemporalValue::Time {
3286                nanos_since_midnight,
3287                offset_seconds,
3288            } => {
3289                let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3290                format!("time:{utc_nanos}")
3291            }
3292            uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3293                format!("localdatetime:{nanos_since_epoch}")
3294            }
3295            uni_common::TemporalValue::DateTime {
3296                nanos_since_epoch, ..
3297            } => format!("datetime:{nanos_since_epoch}"),
3298            uni_common::TemporalValue::Duration {
3299                months,
3300                days,
3301                nanos,
3302            } => format!("duration:{months}:{days}:{nanos}"),
3303            uni_common::TemporalValue::Btic { lo, hi, meta } => {
3304                format!("btic:{lo}:{hi}:{meta}")
3305            }
3306        }
3307    }
3308
3309    fn temporal_string_key(s: &str) -> Option<String> {
3310        let fn_name = match classify_temporal(s)? {
3311            uni_common::TemporalType::Date => "DATE",
3312            uni_common::TemporalType::LocalTime => "LOCALTIME",
3313            uni_common::TemporalType::Time => "TIME",
3314            uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3315            uni_common::TemporalType::DateTime => "DATETIME",
3316            uni_common::TemporalType::Duration => "DURATION",
3317            uni_common::TemporalType::Btic => return None, // BTIC uses dedicated parsing
3318        };
3319        match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3320            Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3321            _ => None,
3322        }
3323    }
3324
3325    /// Execute aggregate operation: GROUP BY + aggregate functions.
3326    /// Interval for timeout checks in aggregate loops.
3327    pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3328
3329    pub(crate) async fn execute_aggregate(
3330        &self,
3331        rows: Vec<HashMap<String, Value>>,
3332        group_by: &[Expr],
3333        aggregates: &[Expr],
3334        prop_manager: &PropertyManager,
3335        params: &HashMap<String, Value>,
3336        ctx: Option<&QueryContext>,
3337    ) -> Result<Vec<HashMap<String, Value>>> {
3338        // CWE-400: Check timeout before aggregation
3339        if let Some(ctx) = ctx {
3340            ctx.check_timeout()?;
3341        }
3342
3343        let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3344
3345        // Cypher semantics: aggregation without grouping keys returns one row even
3346        // on empty input (e.g. `RETURN count(*)`, `RETURN avg(x)`).
3347        if rows.is_empty() {
3348            if group_by.is_empty() {
3349                let accs = Self::create_accumulators(aggregates);
3350                let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3351                return Ok(vec![row]);
3352            }
3353            return Ok(vec![]);
3354        }
3355
3356        for (idx, row) in rows.into_iter().enumerate() {
3357            // Periodic timeout check during aggregation
3358            if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3359                && let Some(ctx) = ctx
3360            {
3361                ctx.check_timeout()?;
3362            }
3363
3364            let key_vals = self
3365                .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3366                .await?;
3367            // Build a canonical key so grouping follows Cypher value semantics
3368            // (e.g. temporal equality by instant, numeric normalization where applicable).
3369            let key_str = format!(
3370                "[{}]",
3371                key_vals
3372                    .iter()
3373                    .map(Self::canonical_value_key)
3374                    .collect::<Vec<_>>()
3375                    .join(",")
3376            );
3377
3378            let entry = groups
3379                .entry(key_str)
3380                .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3381
3382            self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3383                .await?;
3384        }
3385
3386        let results = groups
3387            .values()
3388            .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3389            .collect();
3390
3391        Ok(results)
3392    }
3393
3394    pub(crate) async fn execute_window(
3395        &self,
3396        mut rows: Vec<HashMap<String, Value>>,
3397        window_exprs: &[Expr],
3398        _prop_manager: &PropertyManager,
3399        _params: &HashMap<String, Value>,
3400        ctx: Option<&QueryContext>,
3401    ) -> Result<Vec<HashMap<String, Value>>> {
3402        // CWE-400: Check timeout before window computation
3403        if let Some(ctx) = ctx {
3404            ctx.check_timeout()?;
3405        }
3406
3407        // If no rows or no window expressions, return as-is
3408        if rows.is_empty() || window_exprs.is_empty() {
3409            return Ok(rows);
3410        }
3411
3412        // Process each window function expression
3413        for window_expr in window_exprs {
3414            // Extract window function details
3415            let Expr::FunctionCall {
3416                name,
3417                args,
3418                window_spec: Some(window_spec),
3419                ..
3420            } = window_expr
3421            else {
3422                return Err(anyhow!(
3423                    "Window expression must be a FunctionCall with OVER clause: {:?}",
3424                    window_expr
3425                ));
3426            };
3427
3428            let name_upper = name.to_uppercase();
3429
3430            // Validate it's a supported window function
3431            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3432                return Err(anyhow!(
3433                    "Unsupported window function: {}. Supported functions: {}",
3434                    name,
3435                    WINDOW_FUNCTIONS.join(", ")
3436                ));
3437            }
3438
3439            // Build partition groups based on PARTITION BY clause
3440            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3441
3442            for (row_idx, row) in rows.iter().enumerate() {
3443                // Evaluate partition key
3444                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3445                    // No partitioning - all rows in one partition
3446                    vec![]
3447                } else {
3448                    window_spec
3449                        .partition_by
3450                        .iter()
3451                        .map(|expr| self.evaluate_simple_expr(expr, row))
3452                        .collect::<Result<Vec<_>>>()?
3453                };
3454
3455                partition_map
3456                    .entry(partition_key)
3457                    .or_default()
3458                    .push(row_idx);
3459            }
3460
3461            // Process each partition
3462            for (_partition_key, row_indices) in partition_map.iter_mut() {
3463                // Sort rows within partition by ORDER BY clause
3464                if !window_spec.order_by.is_empty() {
3465                    row_indices.sort_by(|&a, &b| {
3466                        for sort_item in &window_spec.order_by {
3467                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3468                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3469
3470                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3471                                let cmp = Executor::compare_values(&va, &vb);
3472                                let cmp = if sort_item.ascending {
3473                                    cmp
3474                                } else {
3475                                    cmp.reverse()
3476                                };
3477                                if cmp != std::cmp::Ordering::Equal {
3478                                    return cmp;
3479                                }
3480                            }
3481                        }
3482                        std::cmp::Ordering::Equal
3483                    });
3484                }
3485
3486                // Compute window function values for this partition
3487                for (position, &row_idx) in row_indices.iter().enumerate() {
3488                    let window_value = match name_upper.as_str() {
3489                        "ROW_NUMBER" => Value::from((position + 1) as i64),
3490                        "RANK" => {
3491                            // RANK: position (1-indexed) of first row in group of tied rows
3492                            let rank = if position == 0 {
3493                                1i64
3494                            } else {
3495                                let prev_row_idx = row_indices[position - 1];
3496                                let same_as_prev = self.rows_have_same_sort_keys(
3497                                    &window_spec.order_by,
3498                                    &rows,
3499                                    row_idx,
3500                                    prev_row_idx,
3501                                );
3502
3503                                if same_as_prev {
3504                                    // Walk backwards to find where this group started
3505                                    let mut group_start = position - 1;
3506                                    while group_start > 0 {
3507                                        let curr_idx = row_indices[group_start];
3508                                        let prev_idx = row_indices[group_start - 1];
3509                                        if !self.rows_have_same_sort_keys(
3510                                            &window_spec.order_by,
3511                                            &rows,
3512                                            curr_idx,
3513                                            prev_idx,
3514                                        ) {
3515                                            break;
3516                                        }
3517                                        group_start -= 1;
3518                                    }
3519                                    (group_start + 1) as i64
3520                                } else {
3521                                    (position + 1) as i64
3522                                }
3523                            };
3524                            Value::from(rank)
3525                        }
3526                        "DENSE_RANK" => {
3527                            // Dense rank: continuous ranking without gaps
3528                            let mut dense_rank = 1i64;
3529                            for i in 0..position {
3530                                let curr_idx = row_indices[i + 1];
3531                                let prev_idx = row_indices[i];
3532                                if !self.rows_have_same_sort_keys(
3533                                    &window_spec.order_by,
3534                                    &rows,
3535                                    curr_idx,
3536                                    prev_idx,
3537                                ) {
3538                                    dense_rank += 1;
3539                                }
3540                            }
3541                            Value::from(dense_rank)
3542                        }
3543                        "LAG" => {
3544                            let (value_expr, offset, default_value) =
3545                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3546
3547                            if position >= offset {
3548                                let target_idx = row_indices[position - offset];
3549                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3550                            } else {
3551                                default_value
3552                            }
3553                        }
3554                        "LEAD" => {
3555                            let (value_expr, offset, default_value) =
3556                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3557
3558                            if position + offset < row_indices.len() {
3559                                let target_idx = row_indices[position + offset];
3560                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3561                            } else {
3562                                default_value
3563                            }
3564                        }
3565                        "NTILE" => {
3566                            // Extract num_buckets argument: NTILE(num_buckets)
3567                            let num_buckets_expr = args.first().ok_or_else(|| {
3568                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3569                            })?;
3570                            let num_buckets_val =
3571                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3572                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3573                                anyhow!(
3574                                    "NTILE argument must be an integer, got: {:?}",
3575                                    num_buckets_val
3576                                )
3577                            })?;
3578
3579                            if num_buckets <= 0 {
3580                                return Err(anyhow!(
3581                                    "NTILE bucket count must be positive, got: {}",
3582                                    num_buckets
3583                                ));
3584                            }
3585
3586                            let num_buckets = num_buckets as usize;
3587                            let partition_size = row_indices.len();
3588
3589                            // Calculate bucket assignment using standard algorithm
3590                            // For N rows and B buckets:
3591                            // - Base size: N / B
3592                            // - Extra rows: N % B (go to first buckets)
3593                            let base_size = partition_size / num_buckets;
3594                            let extra_rows = partition_size % num_buckets;
3595
3596                            // Determine bucket for current row
3597                            let bucket = if position < extra_rows * (base_size + 1) {
3598                                // Row is in one of the larger buckets (first 'extra_rows' buckets)
3599                                position / (base_size + 1) + 1
3600                            } else {
3601                                // Row is in one of the normal-sized buckets
3602                                let adjusted_position = position - extra_rows * (base_size + 1);
3603                                extra_rows + (adjusted_position / base_size) + 1
3604                            };
3605
3606                            Value::from(bucket as i64)
3607                        }
3608                        "FIRST_VALUE" => {
3609                            // FIRST_VALUE returns the value of the expression from the first row in the window frame
3610                            let value_expr = args.first().ok_or_else(|| {
3611                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3612                            })?;
3613
3614                            // Get the first row in the partition (after ordering)
3615                            if row_indices.is_empty() {
3616                                Value::Null
3617                            } else {
3618                                let first_idx = row_indices[0];
3619                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3620                            }
3621                        }
3622                        "LAST_VALUE" => {
3623                            // LAST_VALUE returns the value of the expression from the last row in the window frame
3624                            let value_expr = args.first().ok_or_else(|| {
3625                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3626                            })?;
3627
3628                            // Get the last row in the partition (after ordering)
3629                            if row_indices.is_empty() {
3630                                Value::Null
3631                            } else {
3632                                let last_idx = row_indices[row_indices.len() - 1];
3633                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3634                            }
3635                        }
3636                        "NTH_VALUE" => {
3637                            // NTH_VALUE returns the value of the expression from the nth row in the window frame
3638                            if args.len() != 2 {
3639                                return Err(anyhow!(
3640                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3641                                ));
3642                            }
3643
3644                            let value_expr = &args[0];
3645                            let n_expr = &args[1];
3646
3647                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3648                            let n = n_val.as_i64().ok_or_else(|| {
3649                                anyhow!(
3650                                    "NTH_VALUE second argument must be an integer, got: {:?}",
3651                                    n_val
3652                                )
3653                            })?;
3654
3655                            if n <= 0 {
3656                                return Err(anyhow!(
3657                                    "NTH_VALUE position must be positive, got: {}",
3658                                    n
3659                                ));
3660                            }
3661
3662                            let nth_index = (n - 1) as usize; // Convert 1-based to 0-based
3663                            if nth_index < row_indices.len() {
3664                                let nth_idx = row_indices[nth_index];
3665                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3666                            } else {
3667                                Value::Null
3668                            }
3669                        }
3670                        _ => unreachable!("Window function {} already validated", name),
3671                    };
3672
3673                    // Add window function result to row
3674                    // Use the window expression's string representation as the column name
3675                    let col_name = window_expr.to_string_repr();
3676                    rows[row_idx].insert(col_name, window_value);
3677                }
3678            }
3679        }
3680
3681        Ok(rows)
3682    }
3683
3684    /// Helper to evaluate simple expressions for window function sorting/partitioning.
3685    ///
3686    /// Uses `&self` for consistency with other evaluation methods, though it only
3687    /// recurses for property access.
3688    fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3689        match expr {
3690            Expr::Variable(name) => row
3691                .get(name)
3692                .cloned()
3693                .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3694            Expr::Property(base, prop) => {
3695                let base_val = self.evaluate_simple_expr(base, row)?;
3696                if let Value::Map(map) = base_val {
3697                    map.get(prop)
3698                        .cloned()
3699                        .ok_or_else(|| anyhow!("Property not found: {}", prop))
3700                } else {
3701                    Err(anyhow!("Cannot access property on non-object"))
3702                }
3703            }
3704            Expr::Literal(lit) => Ok(lit.to_value()),
3705            _ => Err(anyhow!(
3706                "Unsupported expression in window function: {:?}",
3707                expr
3708            )),
3709        }
3710    }
3711
3712    /// Check if two rows have matching sort keys for ranking functions.
3713    fn rows_have_same_sort_keys(
3714        &self,
3715        order_by: &[uni_cypher::ast::SortItem],
3716        rows: &[HashMap<String, Value>],
3717        idx_a: usize,
3718        idx_b: usize,
3719    ) -> bool {
3720        order_by.iter().all(|sort_item| {
3721            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3722            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3723            matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3724        })
3725    }
3726
3727    /// Extract offset and default value for LAG/LEAD window functions.
3728    fn extract_lag_lead_params<'a>(
3729        &self,
3730        func_name: &str,
3731        args: &'a [Expr],
3732        row: &HashMap<String, Value>,
3733    ) -> Result<(&'a Expr, usize, Value)> {
3734        let value_expr = args.first().ok_or_else(|| {
3735            anyhow!(
3736                "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3737                func_name,
3738                func_name
3739            )
3740        })?;
3741
3742        let offset = if let Some(offset_expr) = args.get(1) {
3743            let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3744            offset_val.as_i64().ok_or_else(|| {
3745                anyhow!(
3746                    "{} offset must be an integer, got: {:?}",
3747                    func_name,
3748                    offset_val
3749                )
3750            })? as usize
3751        } else {
3752            1
3753        };
3754
3755        let default_value = if let Some(default_expr) = args.get(2) {
3756            self.evaluate_simple_expr(default_expr, row)?
3757        } else {
3758            Value::Null
3759        };
3760
3761        Ok((value_expr, offset, default_value))
3762    }
3763
3764    /// Evaluate group-by key expressions for a row.
3765    pub(crate) async fn evaluate_group_keys(
3766        &self,
3767        group_by: &[Expr],
3768        row: &HashMap<String, Value>,
3769        prop_manager: &PropertyManager,
3770        params: &HashMap<String, Value>,
3771        ctx: Option<&QueryContext>,
3772    ) -> Result<Vec<Value>> {
3773        let mut key_vals = Vec::new();
3774        for expr in group_by {
3775            key_vals.push(
3776                self.evaluate_expr(expr, row, prop_manager, params, ctx)
3777                    .await?,
3778            );
3779        }
3780        Ok(key_vals)
3781    }
3782
3783    /// Update accumulators with values from the current row.
3784    pub(crate) async fn update_accumulators(
3785        &self,
3786        accs: &mut [Accumulator],
3787        aggregates: &[Expr],
3788        row: &HashMap<String, Value>,
3789        prop_manager: &PropertyManager,
3790        params: &HashMap<String, Value>,
3791        ctx: Option<&QueryContext>,
3792    ) -> Result<()> {
3793        for (i, agg_expr) in aggregates.iter().enumerate() {
3794            if let Expr::FunctionCall { args, .. } = agg_expr {
3795                let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3796                let val = if is_wildcard {
3797                    Value::Null
3798                } else {
3799                    self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3800                        .await?
3801                };
3802                accs[i].update(&val, is_wildcard);
3803            }
3804        }
3805        Ok(())
3806    }
3807
3808    /// Execute sort operation with ORDER BY clauses.
3809    pub(crate) async fn execute_recursive_cte(
3810        &self,
3811        cte_name: &str,
3812        initial: LogicalPlan,
3813        recursive: LogicalPlan,
3814        prop_manager: &PropertyManager,
3815        params: &HashMap<String, Value>,
3816        ctx: Option<&QueryContext>,
3817    ) -> Result<Vec<HashMap<String, Value>>> {
3818        use std::collections::HashSet;
3819
3820        // Helper to create a stable key for cycle detection.
3821        // Uses sorted keys to ensure consistent ordering.
3822        pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3823            let mut pairs: Vec<_> = row.iter().collect();
3824            pairs.sort_by(|a, b| a.0.cmp(b.0));
3825            format!("{pairs:?}")
3826        }
3827
3828        // 1. Execute Anchor
3829        let mut working_table = self
3830            .execute_subplan(initial, prop_manager, params, ctx)
3831            .await?;
3832        let mut result_table = working_table.clone();
3833
3834        // Track seen rows for cycle detection
3835        let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3836
3837        // 2. Loop
3838        // Safety: Max iterations to prevent infinite loop
3839        let max_iterations = self.config.max_recursive_cte_iterations;
3840        for _iteration in 0..max_iterations {
3841            // CWE-400: Check timeout at each iteration to prevent resource exhaustion
3842            if let Some(ctx) = ctx {
3843                ctx.check_timeout()?;
3844            }
3845
3846            if working_table.is_empty() {
3847                break;
3848            }
3849
3850            // Bind working table to CTE name in params
3851            let working_val = Value::List(
3852                working_table
3853                    .iter()
3854                    .map(|row| {
3855                        if row.len() == 1 {
3856                            row.values().next().unwrap().clone()
3857                        } else {
3858                            Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3859                        }
3860                    })
3861                    .collect(),
3862            );
3863
3864            let mut next_params = params.clone();
3865            next_params.insert(cte_name.to_string(), working_val);
3866
3867            // Execute recursive part
3868            let next_result = self
3869                .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3870                .await?;
3871
3872            if next_result.is_empty() {
3873                break;
3874            }
3875
3876            // Filter out already-seen rows (cycle detection)
3877            let new_rows: Vec<_> = next_result
3878                .into_iter()
3879                .filter(|row| {
3880                    let key = row_key(row);
3881                    seen.insert(key) // Returns false if already present
3882                })
3883                .collect();
3884
3885            if new_rows.is_empty() {
3886                // All results were cycles - terminate
3887                break;
3888            }
3889
3890            result_table.extend(new_rows.clone());
3891            working_table = new_rows;
3892        }
3893
3894        // Output accumulated results as a variable
3895        let final_list = Value::List(
3896            result_table
3897                .into_iter()
3898                .map(|row| {
3899                    // If the CTE returns a single column and we want to treat it as a list of values?
3900                    // E.g. WITH RECURSIVE r AS (RETURN 1 UNION RETURN 2) -> [1, 2] or [{expr:1}, {expr:2}]?
3901                    // Cypher LISTs usually contain values.
3902                    // If the row has 1 column, maybe unwrap?
3903                    // But SQL CTEs are tables.
3904                    // Let's stick to List<Map> for consistency with how we pass it in.
3905                    // UNLESS the user extracts it.
3906                    // My parser test `MATCH (n) WHERE n IN hierarchy` implies `hierarchy` contains Nodes.
3907                    // If `row` contains `root` (Node), then `hierarchy` should be `[Node, Node]`.
3908                    // If row has multiple cols, `[ {a:1, b:2}, ... ]`.
3909                    // If row has 1 col, users expect `[val, val]`.
3910                    if row.len() == 1 {
3911                        row.values().next().unwrap().clone()
3912                    } else {
3913                        Value::Map(row.into_iter().collect())
3914                    }
3915                })
3916                .collect(),
3917        );
3918
3919        let mut final_row = HashMap::new();
3920        final_row.insert(cte_name.to_string(), final_list);
3921        Ok(vec![final_row])
3922    }
3923
3924    /// Interval for timeout checks in sort loops.
3925    const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3926
3927    pub(crate) async fn execute_sort(
3928        &self,
3929        rows: Vec<HashMap<String, Value>>,
3930        order_by: &[uni_cypher::ast::SortItem],
3931        prop_manager: &PropertyManager,
3932        params: &HashMap<String, Value>,
3933        ctx: Option<&QueryContext>,
3934    ) -> Result<Vec<HashMap<String, Value>>> {
3935        // CWE-400: Check timeout before potentially expensive sort
3936        if let Some(ctx) = ctx {
3937            ctx.check_timeout()?;
3938        }
3939
3940        let mut rows_with_keys = Vec::with_capacity(rows.len());
3941        for (idx, row) in rows.into_iter().enumerate() {
3942            // Periodic timeout check during key extraction
3943            if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3944                && let Some(ctx) = ctx
3945            {
3946                ctx.check_timeout()?;
3947            }
3948
3949            let mut keys = Vec::new();
3950            for item in order_by {
3951                let val = row
3952                    .get(&item.expr.to_string_repr())
3953                    .cloned()
3954                    .unwrap_or(Value::Null);
3955                let val = if val.is_null() {
3956                    self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3957                        .await
3958                        .unwrap_or(Value::Null)
3959                } else {
3960                    val
3961                };
3962                keys.push(val);
3963            }
3964            rows_with_keys.push((row, keys));
3965        }
3966
3967        // Check timeout again before synchronous sort (can't be interrupted)
3968        if let Some(ctx) = ctx {
3969            ctx.check_timeout()?;
3970        }
3971
3972        rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3973
3974        Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3975    }
3976
3977    /// Create accumulators for aggregate expressions.
3978    pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3979        aggregates
3980            .iter()
3981            .map(|expr| {
3982                if let Expr::FunctionCall { name, distinct, .. } = expr {
3983                    Accumulator::new(name, *distinct)
3984                } else {
3985                    Accumulator::new("COUNT", false)
3986                }
3987            })
3988            .collect()
3989    }
3990
3991    /// Build result row from group-by keys and accumulators.
3992    pub(crate) fn build_aggregate_result(
3993        group_by: &[Expr],
3994        aggregates: &[Expr],
3995        key_vals: &[Value],
3996        accs: &[Accumulator],
3997    ) -> HashMap<String, Value> {
3998        let mut res_row = HashMap::new();
3999        for (i, expr) in group_by.iter().enumerate() {
4000            res_row.insert(expr.to_string_repr(), key_vals[i].clone());
4001        }
4002        for (i, expr) in aggregates.iter().enumerate() {
4003            // Use aggregate_column_name to ensure consistency with planner
4004            let col_name = crate::query::planner::aggregate_column_name(expr);
4005            res_row.insert(col_name, accs[i].finish());
4006        }
4007        res_row
4008    }
4009
4010    /// Compare and return ordering for sort operation.
4011    pub(crate) fn compare_sort_keys(
4012        a_keys: &[Value],
4013        b_keys: &[Value],
4014        order_by: &[uni_cypher::ast::SortItem],
4015    ) -> std::cmp::Ordering {
4016        for (i, item) in order_by.iter().enumerate() {
4017            let order = Self::compare_values(&a_keys[i], &b_keys[i]);
4018            if order != std::cmp::Ordering::Equal {
4019                return if item.ascending {
4020                    order
4021                } else {
4022                    order.reverse()
4023                };
4024            }
4025        }
4026        std::cmp::Ordering::Equal
4027    }
4028
4029    /// Executes BACKUP command to local or cloud storage.
4030    ///
4031    /// Supports both local filesystem paths and cloud URLs (s3://, gs://, az://).
4032    pub(crate) async fn execute_backup(
4033        &self,
4034        destination: &str,
4035        _options: &HashMap<String, Value>,
4036    ) -> Result<Vec<HashMap<String, Value>>> {
4037        // 1. Flush L0
4038        if let Some(writer_arc) = &self.writer {
4039            let writer: &uni_store::Writer = writer_arc.as_ref();
4040            writer.flush_to_l1(None).await?;
4041        }
4042
4043        // 2. Snapshot
4044        let snapshot_manager = self.storage.snapshot_manager();
4045        let snapshot = snapshot_manager
4046            .load_latest_snapshot()
4047            .await?
4048            .ok_or_else(|| anyhow!("No snapshot found"))?;
4049
4050        // 3. Copy files - cloud or local path
4051        if is_cloud_url(destination) {
4052            self.backup_to_cloud(destination, &snapshot.snapshot_id)
4053                .await?;
4054        } else {
4055            // Validate local destination path against sandbox
4056            let validated_dest = self.validate_path(destination)?;
4057            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
4058                .await?;
4059        }
4060
4061        let mut res = HashMap::new();
4062        res.insert(
4063            "status".to_string(),
4064            Value::String("Backup completed".to_string()),
4065        );
4066        res.insert(
4067            "snapshot_id".to_string(),
4068            Value::String(snapshot.snapshot_id),
4069        );
4070        Ok(vec![res])
4071    }
4072
4073    /// Backs up database to a local filesystem destination.
4074    async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
4075        let source_path = std::path::Path::new(self.storage.base_path());
4076
4077        if !dest_path.exists() {
4078            std::fs::create_dir_all(dest_path)?;
4079        }
4080
4081        // Recursive copy (local to local)
4082        if source_path.exists() {
4083            Self::copy_dir_all(source_path, dest_path)?;
4084        }
4085
4086        // Copy schema to destination/catalog/schema.json
4087        let schema_manager = self.storage.schema_manager();
4088        let dest_catalog = dest_path.join("catalog");
4089        if !dest_catalog.exists() {
4090            std::fs::create_dir_all(&dest_catalog)?;
4091        }
4092
4093        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4094        std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
4095
4096        Ok(())
4097    }
4098
4099    /// Backs up database to a cloud storage destination.
4100    ///
4101    /// Streams data from source to destination, supporting cross-cloud backups.
4102    async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
4103        use object_store::ObjectStore;
4104        use object_store::ObjectStoreExt;
4105        use object_store::local::LocalFileSystem;
4106        use object_store::path::Path as ObjPath;
4107
4108        let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
4109        let source_path = std::path::Path::new(self.storage.base_path());
4110
4111        // Create local store for source, coerced to dyn ObjectStore
4112        let src_store: Arc<dyn ObjectStore> =
4113            Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
4114
4115        // Copy catalog/ directory
4116        let catalog_src = ObjPath::from("catalog");
4117        let catalog_dst = if dest_prefix.as_ref().is_empty() {
4118            ObjPath::from("catalog")
4119        } else {
4120            ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
4121        };
4122        copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
4123
4124        // Copy storage/ directory
4125        let storage_src = ObjPath::from("storage");
4126        let storage_dst = if dest_prefix.as_ref().is_empty() {
4127            ObjPath::from("storage")
4128        } else {
4129            ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
4130        };
4131        copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
4132
4133        // Ensure schema is present at canonical catalog location.
4134        let schema_manager = self.storage.schema_manager();
4135        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4136        let schema_path = if dest_prefix.as_ref().is_empty() {
4137            ObjPath::from("catalog/schema.json")
4138        } else {
4139            ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
4140        };
4141        dest_store
4142            .put(&schema_path, bytes::Bytes::from(schema_content).into())
4143            .await?;
4144
4145        Ok(())
4146    }
4147
4148    /// Maximum directory depth for backup operations.
4149    ///
4150    /// **CWE-674 (Uncontrolled Recursion)**: Prevents stack overflow from
4151    /// excessively deep directory structures.
4152    const MAX_BACKUP_DEPTH: usize = 100;
4153
4154    /// Maximum file count for backup operations.
4155    ///
4156    /// **CWE-400 (Resource Consumption)**: Prevents disk exhaustion and
4157    /// long-running operations from malicious or unexpectedly large directories.
4158    const MAX_BACKUP_FILES: usize = 100_000;
4159
4160    /// Recursively copies a directory with security limits.
4161    ///
4162    /// # Security
4163    ///
4164    /// - **CWE-674**: Depth limit prevents stack overflow
4165    /// - **CWE-400**: File count limit prevents resource exhaustion
4166    /// - **Symlink handling**: Symlinks are skipped to prevent loop attacks
4167    pub(crate) fn copy_dir_all(
4168        src: &std::path::Path,
4169        dst: &std::path::Path,
4170    ) -> std::io::Result<()> {
4171        let mut file_count = 0usize;
4172        Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
4173    }
4174
4175    /// Internal implementation with depth and file count tracking.
4176    pub(crate) fn copy_dir_all_impl(
4177        src: &std::path::Path,
4178        dst: &std::path::Path,
4179        depth: usize,
4180        file_count: &mut usize,
4181    ) -> std::io::Result<()> {
4182        if depth >= Self::MAX_BACKUP_DEPTH {
4183            return Err(std::io::Error::new(
4184                std::io::ErrorKind::InvalidInput,
4185                format!(
4186                    "Maximum backup depth {} exceeded at {:?}",
4187                    Self::MAX_BACKUP_DEPTH,
4188                    src
4189                ),
4190            ));
4191        }
4192
4193        std::fs::create_dir_all(dst)?;
4194
4195        for entry in std::fs::read_dir(src)? {
4196            if *file_count >= Self::MAX_BACKUP_FILES {
4197                return Err(std::io::Error::new(
4198                    std::io::ErrorKind::InvalidInput,
4199                    format!(
4200                        "Maximum backup file count {} exceeded",
4201                        Self::MAX_BACKUP_FILES
4202                    ),
4203                ));
4204            }
4205            *file_count += 1;
4206
4207            let entry = entry?;
4208            let metadata = entry.metadata()?;
4209
4210            // Skip symlinks to prevent loops and traversal attacks
4211            if metadata.file_type().is_symlink() {
4212                // Silently skip - logging would require tracing dependency
4213                continue;
4214            }
4215
4216            let dst_path = dst.join(entry.file_name());
4217            if metadata.is_dir() {
4218                Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4219            } else {
4220                std::fs::copy(entry.path(), dst_path)?;
4221            }
4222        }
4223        Ok(())
4224    }
4225
4226    pub(crate) async fn execute_copy(
4227        &self,
4228        target: &str,
4229        source: &str,
4230        options: &HashMap<String, Value>,
4231        prop_manager: &PropertyManager,
4232    ) -> Result<Vec<HashMap<String, Value>>> {
4233        let format = options
4234            .get("format")
4235            .and_then(|v| v.as_str())
4236            .unwrap_or_else(|| {
4237                if source.ends_with(".parquet") {
4238                    "parquet"
4239                } else {
4240                    "csv"
4241                }
4242            });
4243
4244        match format.to_lowercase().as_str() {
4245            "csv" => self.execute_csv_import(target, source, options).await,
4246            "parquet" => {
4247                self.execute_parquet_import(target, source, options, prop_manager)
4248                    .await
4249            }
4250            _ => Err(anyhow!("Unsupported format: {}", format)),
4251        }
4252    }
4253
4254    pub(crate) async fn execute_csv_import(
4255        &self,
4256        target: &str,
4257        source: &str,
4258        options: &HashMap<String, Value>,
4259    ) -> Result<Vec<HashMap<String, Value>>> {
4260        // Validate source path against sandbox
4261        let validated_source = self.validate_path(source)?;
4262
4263        let writer_lock = self
4264            .writer
4265            .as_ref()
4266            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4267
4268        let schema = self.storage.schema_manager().schema();
4269
4270        // 1. Determine if target is Label or EdgeType
4271        let label_meta = schema.labels.get(target);
4272        let edge_meta = schema.edge_types.get(target);
4273
4274        if label_meta.is_none() && edge_meta.is_none() {
4275            return Err(anyhow!("Target '{}' not found in schema", target));
4276        }
4277
4278        // 2. Open CSV
4279        let delimiter_str = options
4280            .get("delimiter")
4281            .and_then(|v| v.as_str())
4282            .unwrap_or(",");
4283        let delimiter = if delimiter_str.is_empty() {
4284            b','
4285        } else {
4286            delimiter_str.as_bytes()[0]
4287        };
4288        let has_header = options
4289            .get("header")
4290            .and_then(|v| v.as_bool())
4291            .unwrap_or(true);
4292
4293        let mut rdr = csv::ReaderBuilder::new()
4294            .delimiter(delimiter)
4295            .has_headers(has_header)
4296            .from_path(&validated_source)?;
4297
4298        let headers = rdr.headers()?.clone();
4299        let mut count = 0;
4300
4301        let writer: &uni_store::Writer = writer_lock.as_ref();
4302
4303        // Chunked VID/EID refill for streaming CSV: amortizes the IdAllocator
4304        // mutex over `CSV_ID_CHUNK` rows. End-of-iteration may waste up to
4305        // CSV_ID_CHUNK-1 IDs — harmless in the u64 space.
4306        const CSV_ID_CHUNK: usize = 256;
4307
4308        if label_meta.is_some() {
4309            let target_props = schema
4310                .properties
4311                .get(target)
4312                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4313
4314            let mut vid_chunk: std::collections::VecDeque<Vid> =
4315                std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4316            for result in rdr.records() {
4317                let record = result?;
4318                let mut props = HashMap::new();
4319
4320                for (i, header) in headers.iter().enumerate() {
4321                    if let Some(val_str) = record.get(i)
4322                        && let Some(prop_meta) = target_props.get(header)
4323                    {
4324                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4325                        props.insert(header.to_string(), val);
4326                    }
4327                }
4328
4329                if vid_chunk.is_empty() {
4330                    vid_chunk.extend(writer.allocate_vids(CSV_ID_CHUNK).await?);
4331                }
4332                let vid = vid_chunk.pop_front().unwrap();
4333                writer
4334                    .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4335                    .await?;
4336                count += 1;
4337            }
4338        } else if let Some(meta) = edge_meta {
4339            let type_id = meta.id;
4340            let target_props = schema
4341                .properties
4342                .get(target)
4343                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4344
4345            // For edges, we need src and dst VIDs.
4346            // Expecting columns '_src' and '_dst' or as specified in options.
4347            let src_col = options
4348                .get("src_col")
4349                .and_then(|v| v.as_str())
4350                .unwrap_or("_src");
4351            let dst_col = options
4352                .get("dst_col")
4353                .and_then(|v| v.as_str())
4354                .unwrap_or("_dst");
4355
4356            let mut eid_chunk: std::collections::VecDeque<Eid> =
4357                std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4358            for result in rdr.records() {
4359                let record = result?;
4360                let mut props = HashMap::new();
4361                let mut src_vid = None;
4362                let mut dst_vid = None;
4363
4364                for (i, header) in headers.iter().enumerate() {
4365                    if let Some(val_str) = record.get(i) {
4366                        if header == src_col {
4367                            src_vid =
4368                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4369                        } else if header == dst_col {
4370                            dst_vid =
4371                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4372                        } else if let Some(prop_meta) = target_props.get(header) {
4373                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4374                            props.insert(header.to_string(), val);
4375                        }
4376                    }
4377                }
4378
4379                let src =
4380                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4381                let dst = dst_vid
4382                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;
4383
4384                if eid_chunk.is_empty() {
4385                    eid_chunk.extend(writer.allocate_eids(CSV_ID_CHUNK).await?);
4386                }
4387                let eid = eid_chunk.pop_front().unwrap();
4388                writer
4389                    .insert_edge(
4390                        src,
4391                        dst,
4392                        type_id,
4393                        eid,
4394                        props,
4395                        Some(target.to_string()),
4396                        None,
4397                    )
4398                    .await?;
4399                count += 1;
4400            }
4401        }
4402
4403        let mut res = HashMap::new();
4404        res.insert("count".to_string(), Value::Int(count as i64));
4405        Ok(vec![res])
4406    }
4407
4408    /// Imports data from Parquet file to a label or edge type.
4409    ///
4410    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
4411    pub(crate) async fn execute_parquet_import(
4412        &self,
4413        target: &str,
4414        source: &str,
4415        options: &HashMap<String, Value>,
4416        _prop_manager: &PropertyManager,
4417    ) -> Result<Vec<HashMap<String, Value>>> {
4418        let writer_lock = self
4419            .writer
4420            .as_ref()
4421            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4422
4423        let schema = self.storage.schema_manager().schema();
4424
4425        // 1. Determine if target is Label or EdgeType
4426        let label_meta = schema.labels.get(target);
4427        let edge_meta = schema.edge_types.get(target);
4428
4429        if label_meta.is_none() && edge_meta.is_none() {
4430            return Err(anyhow!("Target '{}' not found in schema", target));
4431        }
4432
4433        // 2. Open Parquet - support both local and cloud URLs
4434        let reader = if is_cloud_url(source) {
4435            self.open_parquet_from_cloud(source).await?
4436        } else {
4437            // Validate local source path against sandbox
4438            let validated_source = self.validate_path(source)?;
4439            let file = std::fs::File::open(&validated_source)?;
4440            let builder =
4441                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
4442            builder.build()?
4443        };
4444        let mut reader = reader;
4445
4446        let mut count = 0;
4447        let writer: &uni_store::Writer = writer_lock.as_ref();
4448
4449        if label_meta.is_some() {
4450            let target_props = schema
4451                .properties
4452                .get(target)
4453                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4454
4455            for batch in reader.by_ref() {
4456                let batch = batch?;
4457                let num_rows = batch.num_rows();
4458                // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
4459                let vids = writer.allocate_vids(num_rows).await?;
4460                for (row, &vid) in vids.iter().enumerate().take(num_rows) {
4461                    let mut props = HashMap::new();
4462                    for field in batch.schema().fields() {
4463                        let name = field.name();
4464                        if target_props.contains_key(name) {
4465                            let col = batch.column_by_name(name).unwrap();
4466                            if !col.is_null(row) {
4467                                // Look up Uni DataType from schema for proper DateTime/Time decoding
4468                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
4469                                let val =
4470                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
4471                                props.insert(name.clone(), val);
4472                            }
4473                        }
4474                    }
4475                    writer
4476                        .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4477                        .await?;
4478                    count += 1;
4479                }
4480            }
4481        } else if let Some(meta) = edge_meta {
4482            let type_id = meta.id;
4483            let target_props = schema
4484                .properties
4485                .get(target)
4486                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4487
4488            let src_col = options
4489                .get("src_col")
4490                .and_then(|v| v.as_str())
4491                .unwrap_or("_src");
4492            let dst_col = options
4493                .get("dst_col")
4494                .and_then(|v| v.as_str())
4495                .unwrap_or("_dst");
4496
4497            for batch in reader {
4498                let batch = batch?;
4499                let num_rows = batch.num_rows();
4500                // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
4501                let eids = writer.allocate_eids(num_rows).await?;
4502                for (row, &eid) in eids.iter().enumerate().take(num_rows) {
4503                    let mut props = HashMap::new();
4504                    let mut src_vid = None;
4505                    let mut dst_vid = None;
4506
4507                    for field in batch.schema().fields() {
4508                        let name = field.name();
4509                        let col = batch.column_by_name(name).unwrap();
4510                        if col.is_null(row) {
4511                            continue;
4512                        }
4513
4514                        if name == src_col {
4515                            let val = Self::arrow_to_value(col.as_ref(), row);
4516                            src_vid = Some(Self::vid_from_value(&val)?);
4517                        } else if name == dst_col {
4518                            let val = Self::arrow_to_value(col.as_ref(), row);
4519                            dst_vid = Some(Self::vid_from_value(&val)?);
4520                        } else if let Some(pm) = target_props.get(name) {
4521                            // Look up Uni DataType from schema for proper DateTime/Time decoding
4522                            let val =
4523                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
4524                            props.insert(name.clone(), val);
4525                        }
4526                    }
4527
4528                    let src = src_vid
4529                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4530                    let dst = dst_vid.ok_or_else(|| {
4531                        anyhow!("Missing destination VID in column '{}'", dst_col)
4532                    })?;
4533
4534                    writer
4535                        .insert_edge(
4536                            src,
4537                            dst,
4538                            type_id,
4539                            eid,
4540                            props,
4541                            Some(target.to_string()),
4542                            None,
4543                        )
4544                        .await?;
4545                    count += 1;
4546                }
4547            }
4548        }
4549
4550        let mut res = HashMap::new();
4551        res.insert("count".to_string(), Value::Int(count as i64));
4552        Ok(vec![res])
4553    }
4554
4555    /// Opens a Parquet file from a cloud URL.
4556    ///
4557    /// Downloads the file to memory and creates a Parquet reader.
4558    async fn open_parquet_from_cloud(
4559        &self,
4560        source_url: &str,
4561    ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4562        use object_store::ObjectStoreExt;
4563
4564        let (store, path) = build_store_from_url(source_url)?;
4565
4566        // Download file contents
4567        let bytes = store.get(&path).await?.bytes().await?;
4568
4569        // Create a Parquet reader from the bytes
4570        let reader = bytes::Bytes::from(bytes.to_vec());
4571        let builder =
4572            parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4573        Ok(builder.build()?)
4574    }
4575
4576    pub(crate) async fn scan_edge_type(
4577        &self,
4578        edge_type: &str,
4579        ctx: Option<&QueryContext>,
4580    ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4581        let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4582
4583        // 1. Scan L2 (Base)
4584        self.scan_edge_type_l2(edge_type, &mut edges).await?;
4585
4586        // 2. Scan L1 (Delta)
4587        self.scan_edge_type_l1(edge_type, &mut edges).await?;
4588
4589        // 3. Scan L0 (Memory) and filter tombstoned vertices
4590        if let Some(ctx) = ctx {
4591            self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4592            self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4593        }
4594
4595        Ok(edges
4596            .into_iter()
4597            .map(|(eid, (src, dst))| (eid, src, dst))
4598            .collect())
4599    }
4600
4601    /// Scan L2 (base) storage for edges of a given type.
4602    ///
4603    /// Note: Edges are now stored exclusively in delta datasets (L1) via LanceDB.
4604    /// This L2 scan will typically find no data.
4605    pub(crate) async fn scan_edge_type_l2(
4606        &self,
4607        _edge_type: &str,
4608        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4609    ) -> Result<()> {
4610        // Edges are now stored in delta datasets (L1) via LanceDB.
4611        // Legacy L2 base edge storage is no longer used.
4612        Ok(())
4613    }
4614
4615    /// Scan L1 (delta) storage for edges of a given type.
4616    pub(crate) async fn scan_edge_type_l1(
4617        &self,
4618        edge_type: &str,
4619        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4620    ) -> Result<()> {
4621        if let Ok(Some(batch)) = self
4622            .storage
4623            .scan_delta_table(
4624                edge_type,
4625                "fwd",
4626                &["eid", "src_vid", "dst_vid", "op", "_version"],
4627                None,
4628            )
4629            .await
4630        {
4631            // Collect ops with versions: eid -> (version, op, src, dst)
4632            let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4633                HashMap::new();
4634
4635            self.process_delta_batch(&batch, &mut versioned_ops)?;
4636
4637            // Apply the winning ops
4638            for (eid, (_, op, src, dst)) in versioned_ops {
4639                if op == 0 {
4640                    edges.insert(eid, (src, dst));
4641                } else if op == 1 {
4642                    edges.remove(&eid);
4643                }
4644            }
4645        }
4646        Ok(())
4647    }
4648
4649    /// Process a delta batch, tracking versioned operations.
4650    pub(crate) fn process_delta_batch(
4651        &self,
4652        batch: &arrow_array::RecordBatch,
4653        versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4654    ) -> Result<()> {
4655        use arrow_array::UInt64Array;
4656        let eid_col = batch
4657            .column_by_name("eid")
4658            .ok_or(anyhow!("Missing eid"))?
4659            .as_any()
4660            .downcast_ref::<UInt64Array>()
4661            .ok_or(anyhow!("Invalid eid"))?;
4662        let src_col = batch
4663            .column_by_name("src_vid")
4664            .ok_or(anyhow!("Missing src_vid"))?
4665            .as_any()
4666            .downcast_ref::<UInt64Array>()
4667            .ok_or(anyhow!("Invalid src_vid"))?;
4668        let dst_col = batch
4669            .column_by_name("dst_vid")
4670            .ok_or(anyhow!("Missing dst_vid"))?
4671            .as_any()
4672            .downcast_ref::<UInt64Array>()
4673            .ok_or(anyhow!("Invalid dst_vid"))?;
4674        let op_col = batch
4675            .column_by_name("op")
4676            .ok_or(anyhow!("Missing op"))?
4677            .as_any()
4678            .downcast_ref::<arrow_array::UInt8Array>()
4679            .ok_or(anyhow!("Invalid op"))?;
4680        let version_col = batch
4681            .column_by_name("_version")
4682            .ok_or(anyhow!("Missing _version"))?
4683            .as_any()
4684            .downcast_ref::<UInt64Array>()
4685            .ok_or(anyhow!("Invalid _version"))?;
4686
4687        for i in 0..batch.num_rows() {
4688            let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4689            let version = version_col.value(i);
4690            let op = op_col.value(i);
4691            let src = Vid::from(src_col.value(i));
4692            let dst = Vid::from(dst_col.value(i));
4693
4694            match versioned_ops.entry(eid) {
4695                std::collections::hash_map::Entry::Vacant(e) => {
4696                    e.insert((version, op, src, dst));
4697                }
4698                std::collections::hash_map::Entry::Occupied(mut e) => {
4699                    if version > e.get().0 {
4700                        e.insert((version, op, src, dst));
4701                    }
4702                }
4703            }
4704        }
4705        Ok(())
4706    }
4707
4708    /// Scan L0 (memory) buffers for edges of a given type.
4709    pub(crate) fn scan_edge_type_l0(
4710        &self,
4711        edge_type: &str,
4712        ctx: &QueryContext,
4713        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4714    ) {
4715        let schema = self.storage.schema_manager().schema();
4716        let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4717
4718        if let Some(type_id) = type_id {
4719            // Main L0
4720            self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4721
4722            // Transaction L0
4723            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4724                self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4725            }
4726
4727            // Pending flush L0s
4728            for pending_l0_arc in &ctx.pending_flush_l0s {
4729                self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4730            }
4731        }
4732    }
4733
4734    /// Scan a single L0 buffer for edges and apply tombstones.
4735    pub(crate) fn scan_single_l0(
4736        &self,
4737        l0: &uni_store::runtime::L0Buffer,
4738        type_id: u32,
4739        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4740    ) {
4741        for edge_entry in l0.graph.edges() {
4742            if edge_entry.edge_type == type_id {
4743                edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4744            }
4745        }
4746        // Process Tombstones
4747        let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4748        for eid in eids_to_check {
4749            if l0.is_tombstoned(eid) {
4750                edges.remove(&eid);
4751            }
4752        }
4753    }
4754
4755    /// Filter out edges connected to tombstoned vertices.
4756    pub(crate) fn filter_tombstoned_vertex_edges(
4757        &self,
4758        ctx: &QueryContext,
4759        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4760    ) {
4761        let l0 = ctx.l0.read();
4762        let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4763
4764        // Include tx_l0 vertex tombstones if present
4765        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4766            let tx_l0 = tx_l0_arc.read();
4767            all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4768        }
4769
4770        // Include pending flush L0 vertex tombstones
4771        for pending_l0_arc in &ctx.pending_flush_l0s {
4772            let pending_l0 = pending_l0_arc.read();
4773            all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4774        }
4775
4776        edges.retain(|_, (src, dst)| {
4777            !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4778        });
4779    }
4780
4781    /// Execute a projection operation.
4782    pub(crate) async fn execute_project(
4783        &self,
4784        input_rows: Vec<HashMap<String, Value>>,
4785        projections: &[(Expr, Option<String>)],
4786        prop_manager: &PropertyManager,
4787        params: &HashMap<String, Value>,
4788        ctx: Option<&QueryContext>,
4789    ) -> Result<Vec<HashMap<String, Value>>> {
4790        let mut results = Vec::new();
4791        for m in input_rows {
4792            let mut row = HashMap::new();
4793            for (expr, alias) in projections {
4794                let val = self
4795                    .evaluate_expr(expr, &m, prop_manager, params, ctx)
4796                    .await?;
4797                let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4798                row.insert(name, val);
4799            }
4800            results.push(row);
4801        }
4802        Ok(results)
4803    }
4804
4805    /// Execute an UNWIND operation.
4806    pub(crate) async fn execute_unwind(
4807        &self,
4808        input_rows: Vec<HashMap<String, Value>>,
4809        expr: &Expr,
4810        variable: &str,
4811        prop_manager: &PropertyManager,
4812        params: &HashMap<String, Value>,
4813        ctx: Option<&QueryContext>,
4814    ) -> Result<Vec<HashMap<String, Value>>> {
4815        let mut results = Vec::new();
4816        for row in input_rows {
4817            let val = self
4818                .evaluate_expr(expr, &row, prop_manager, params, ctx)
4819                .await?;
4820            if let Value::List(items) = val {
4821                for item in items {
4822                    let mut new_row = row.clone();
4823                    new_row.insert(variable.to_string(), item);
4824                    results.push(new_row);
4825                }
4826            }
4827        }
4828        Ok(results)
4829    }
4830
4831    /// Execute an APPLY (correlated subquery) operation.
4832    pub(crate) async fn execute_apply(
4833        &self,
4834        input_rows: Vec<HashMap<String, Value>>,
4835        subquery: &LogicalPlan,
4836        input_filter: Option<&Expr>,
4837        prop_manager: &PropertyManager,
4838        params: &HashMap<String, Value>,
4839        ctx: Option<&QueryContext>,
4840    ) -> Result<Vec<HashMap<String, Value>>> {
4841        let mut filtered_rows = input_rows;
4842
4843        if let Some(filter) = input_filter {
4844            let mut filtered = Vec::new();
4845            for row in filtered_rows {
4846                let res = self
4847                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
4848                    .await?;
4849                if res.as_bool().unwrap_or(false) {
4850                    filtered.push(row);
4851                }
4852            }
4853            filtered_rows = filtered;
4854        }
4855
4856        // Handle empty input: execute subquery once with empty context
4857        // This is critical for standalone CALL statements at the beginning of a query
4858        if filtered_rows.is_empty() {
4859            let sub_rows = self
4860                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4861                .await?;
4862            return Ok(sub_rows);
4863        }
4864
4865        let mut results = Vec::new();
4866        for row in filtered_rows {
4867            let mut sub_params = params.clone();
4868            sub_params.extend(row.clone());
4869
4870            let sub_rows = self
4871                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4872                .await?;
4873
4874            for sub_row in sub_rows {
4875                let mut new_row = row.clone();
4876                new_row.extend(sub_row);
4877                results.push(new_row);
4878            }
4879        }
4880        Ok(results)
4881    }
4882
4883    /// Execute SHOW INDEXES command.
4884    pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4885        let schema = self.storage.schema_manager().schema();
4886        let mut rows = Vec::new();
4887        for idx in &schema.indexes {
4888            let (name, type_str, details) = match idx {
4889                uni_common::core::schema::IndexDefinition::Vector(c) => (
4890                    c.name.clone(),
4891                    "VECTOR",
4892                    format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4893                ),
4894                uni_common::core::schema::IndexDefinition::FullText(c) => (
4895                    c.name.clone(),
4896                    "FULLTEXT",
4897                    format!("on {}:{:?}", c.label, c.properties),
4898                ),
4899                uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4900                    cfg.name.clone(),
4901                    "SCALAR",
4902                    format!(":{}({:?})", cfg.label, cfg.properties),
4903                ),
4904                _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4905            };
4906
4907            if let Some(f) = filter
4908                && f != type_str
4909            {
4910                continue;
4911            }
4912
4913            let mut row = HashMap::new();
4914            row.insert("name".to_string(), Value::String(name));
4915            row.insert("type".to_string(), Value::String(type_str.to_string()));
4916            row.insert("details".to_string(), Value::String(details));
4917            rows.push(row);
4918        }
4919        rows
4920    }
4921
4922    pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4923        let mut row = HashMap::new();
4924        row.insert("name".to_string(), Value::String("uni".to_string()));
4925        // Could add storage path, etc.
4926        vec![row]
4927    }
4928
4929    pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4930        // Placeholder as we don't easy access to config struct from here
4931        vec![]
4932    }
4933
4934    pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4935        let snapshot = self
4936            .storage
4937            .snapshot_manager()
4938            .load_latest_snapshot()
4939            .await?;
4940        let mut results = Vec::new();
4941
4942        if let Some(snap) = snapshot {
4943            for (label, s) in &snap.vertices {
4944                let mut row = HashMap::new();
4945                row.insert("type".to_string(), Value::String("Label".to_string()));
4946                row.insert("name".to_string(), Value::String(label.clone()));
4947                row.insert("count".to_string(), Value::Int(s.count as i64));
4948                results.push(row);
4949            }
4950            for (edge, s) in &snap.edges {
4951                let mut row = HashMap::new();
4952                row.insert("type".to_string(), Value::String("Edge".to_string()));
4953                row.insert("name".to_string(), Value::String(edge.clone()));
4954                row.insert("count".to_string(), Value::Int(s.count as i64));
4955                results.push(row);
4956            }
4957        }
4958
4959        Ok(results)
4960    }
4961
4962    pub(crate) fn execute_show_constraints(
4963        &self,
4964        clause: ShowConstraints,
4965    ) -> Vec<HashMap<String, Value>> {
4966        let schema = self.storage.schema_manager().schema();
4967        let mut rows = Vec::new();
4968        for c in &schema.constraints {
4969            if let Some(target) = &clause.target {
4970                match (target, &c.target) {
4971                    (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4972                    (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4973                        if e1 == e2 => {}
4974                    _ => continue,
4975                }
4976            }
4977
4978            let mut row = HashMap::new();
4979            row.insert("name".to_string(), Value::String(c.name.clone()));
4980            let type_str = match c.constraint_type {
4981                ConstraintType::Unique { .. } => "UNIQUE",
4982                ConstraintType::Exists { .. } => "EXISTS",
4983                ConstraintType::Check { .. } => "CHECK",
4984                _ => "UNKNOWN",
4985            };
4986            row.insert("type".to_string(), Value::String(type_str.to_string()));
4987
4988            let target_str = match &c.target {
4989                ConstraintTarget::Label(l) => format!("(:{})", l),
4990                ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4991                _ => "UNKNOWN".to_string(),
4992            };
4993            row.insert("target".to_string(), Value::String(target_str));
4994
4995            rows.push(row);
4996        }
4997        rows
4998    }
4999
5000    /// Execute a MERGE operation.
5001    pub(crate) async fn execute_cross_join(
5002        &self,
5003        left: Box<LogicalPlan>,
5004        right: Box<LogicalPlan>,
5005        prop_manager: &PropertyManager,
5006        params: &HashMap<String, Value>,
5007        ctx: Option<&QueryContext>,
5008    ) -> Result<Vec<HashMap<String, Value>>> {
5009        let left_rows = self
5010            .execute_subplan(*left, prop_manager, params, ctx)
5011            .await?;
5012        let right_rows = self
5013            .execute_subplan(*right, prop_manager, params, ctx)
5014            .await?;
5015
5016        let mut results = Vec::new();
5017        for l in &left_rows {
5018            for r in &right_rows {
5019                let mut combined = l.clone();
5020                combined.extend(r.clone());
5021                results.push(combined);
5022            }
5023        }
5024        Ok(results)
5025    }
5026
5027    /// Execute a UNION operation with optional deduplication.
5028    pub(crate) async fn execute_union(
5029        &self,
5030        left: Box<LogicalPlan>,
5031        right: Box<LogicalPlan>,
5032        all: bool,
5033        prop_manager: &PropertyManager,
5034        params: &HashMap<String, Value>,
5035        ctx: Option<&QueryContext>,
5036    ) -> Result<Vec<HashMap<String, Value>>> {
5037        let mut left_rows = self
5038            .execute_subplan(*left, prop_manager, params, ctx)
5039            .await?;
5040        let mut right_rows = self
5041            .execute_subplan(*right, prop_manager, params, ctx)
5042            .await?;
5043
5044        left_rows.append(&mut right_rows);
5045
5046        if !all {
5047            let mut seen = HashSet::new();
5048            left_rows.retain(|row| {
5049                let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
5050                let key = format!("{sorted_row:?}");
5051                seen.insert(key)
5052            });
5053        }
5054        Ok(left_rows)
5055    }
5056
5057    /// Check if an index with the given name exists.
5058    pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
5059        let schema = self.storage.schema_manager().schema();
5060        schema.indexes.iter().any(|idx| match idx {
5061            uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
5062            uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
5063            uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
5064            _ => false,
5065        })
5066    }
5067
5068    pub(crate) async fn execute_export(
5069        &self,
5070        target: &str,
5071        source: &str,
5072        options: &HashMap<String, Value>,
5073        prop_manager: &PropertyManager,
5074        ctx: Option<&QueryContext>,
5075    ) -> Result<Vec<HashMap<String, Value>>> {
5076        let format = options
5077            .get("format")
5078            .and_then(|v| v.as_str())
5079            .unwrap_or("csv")
5080            .to_lowercase();
5081
5082        match format.as_str() {
5083            "csv" => {
5084                self.execute_csv_export(target, source, options, prop_manager, ctx)
5085                    .await
5086            }
5087            "parquet" => {
5088                self.execute_parquet_export(target, source, options, prop_manager, ctx)
5089                    .await
5090            }
5091            _ => Err(anyhow!("Unsupported export format: {}", format)),
5092        }
5093    }
5094
5095    pub(crate) async fn execute_csv_export(
5096        &self,
5097        target: &str,
5098        source: &str,
5099        options: &HashMap<String, Value>,
5100        prop_manager: &PropertyManager,
5101        ctx: Option<&QueryContext>,
5102    ) -> Result<Vec<HashMap<String, Value>>> {
5103        // Validate destination path against sandbox
5104        let validated_dest = self.validate_path(source)?;
5105
5106        let schema = self.storage.schema_manager().schema();
5107        let label_meta = schema.labels.get(target);
5108        let edge_meta = schema.edge_types.get(target);
5109
5110        if label_meta.is_none() && edge_meta.is_none() {
5111            return Err(anyhow!("Target '{}' not found in schema", target));
5112        }
5113
5114        let delimiter_str = options
5115            .get("delimiter")
5116            .and_then(|v| v.as_str())
5117            .unwrap_or(",");
5118        let delimiter = if delimiter_str.is_empty() {
5119            b','
5120        } else {
5121            delimiter_str.as_bytes()[0]
5122        };
5123        let has_header = options
5124            .get("header")
5125            .and_then(|v| v.as_bool())
5126            .unwrap_or(true);
5127
5128        let mut wtr = csv::WriterBuilder::new()
5129            .delimiter(delimiter)
5130            .from_path(&validated_dest)?;
5131
5132        let mut count = 0;
5133        // Empty properties map for labels/edge types without registered properties
5134        let empty_props = HashMap::new();
5135
5136        if let Some(meta) = label_meta {
5137            let label_id = meta.id;
5138            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5139            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5140            prop_names.sort();
5141
5142            let mut headers = vec!["_vid".to_string()];
5143            headers.extend(prop_names.clone());
5144
5145            if has_header {
5146                wtr.write_record(&headers)?;
5147            }
5148
5149            let vids = self
5150                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5151                .await?;
5152
5153            for vid in vids {
5154                let props = prop_manager
5155                    .get_all_vertex_props_with_ctx(vid, ctx)
5156                    .await?
5157                    .unwrap_or_default();
5158
5159                let mut row = Vec::with_capacity(headers.len());
5160                row.push(vid.to_string());
5161                for p_name in &prop_names {
5162                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5163                    row.push(self.format_csv_value(val));
5164                }
5165                wtr.write_record(&row)?;
5166                count += 1;
5167            }
5168        } else if let Some(meta) = edge_meta {
5169            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5170            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5171            prop_names.sort();
5172
5173            // Headers for Edge: _eid, _src, _dst, _type, ...props
5174            let mut headers = vec![
5175                "_eid".to_string(),
5176                "_src".to_string(),
5177                "_dst".to_string(),
5178                "_type".to_string(),
5179            ];
5180            headers.extend(prop_names.clone());
5181
5182            if has_header {
5183                wtr.write_record(&headers)?;
5184            }
5185
5186            let edges = self.scan_edge_type(target, ctx).await?;
5187
5188            for (eid, src, dst) in edges {
5189                let props = prop_manager
5190                    .get_all_edge_props_with_ctx(eid, ctx)
5191                    .await?
5192                    .unwrap_or_default();
5193
5194                let mut row = Vec::with_capacity(headers.len());
5195                row.push(eid.to_string());
5196                row.push(src.to_string());
5197                row.push(dst.to_string());
5198                row.push(meta.id.to_string());
5199
5200                for p_name in &prop_names {
5201                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5202                    row.push(self.format_csv_value(val));
5203                }
5204                wtr.write_record(&row)?;
5205                count += 1;
5206            }
5207        }
5208
5209        wtr.flush()?;
5210        let mut res = HashMap::new();
5211        res.insert("count".to_string(), Value::Int(count as i64));
5212        Ok(vec![res])
5213    }
5214
5215    /// Exports data to Parquet format.
5216    ///
5217    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
5218    pub(crate) async fn execute_parquet_export(
5219        &self,
5220        target: &str,
5221        destination: &str,
5222        _options: &HashMap<String, Value>,
5223        prop_manager: &PropertyManager,
5224        ctx: Option<&QueryContext>,
5225    ) -> Result<Vec<HashMap<String, Value>>> {
5226        let schema_manager = self.storage.schema_manager();
5227        let schema = schema_manager.schema();
5228        let label_meta = schema.labels.get(target);
5229        let edge_meta = schema.edge_types.get(target);
5230
5231        if label_meta.is_none() && edge_meta.is_none() {
5232            return Err(anyhow!("Target '{}' not found in schema", target));
5233        }
5234
5235        let arrow_schema = if label_meta.is_some() {
5236            let dataset = self.storage.vertex_dataset(target)?;
5237            dataset.get_arrow_schema(&schema)?
5238        } else {
5239            // Edge Schema
5240            let dataset = self.storage.edge_dataset(target, "", "")?;
5241            dataset.get_arrow_schema(&schema)?
5242        };
5243
5244        let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();
5245
5246        if let Some(meta) = label_meta {
5247            let label_id = meta.id;
5248            let vids = self
5249                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5250                .await?;
5251
5252            for vid in vids {
5253                let mut props = prop_manager
5254                    .get_all_vertex_props_with_ctx(vid, ctx)
5255                    .await?
5256                    .unwrap_or_default();
5257
5258                props.insert(
5259                    "_vid".to_string(),
5260                    uni_common::Value::Int(vid.as_u64() as i64),
5261                );
5262                if !props.contains_key("_uid") {
5263                    props.insert(
5264                        "_uid".to_string(),
5265                        uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
5266                    );
5267                }
5268                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5269                props.insert("_version".to_string(), uni_common::Value::Int(1));
5270                rows.push(props);
5271            }
5272        } else if edge_meta.is_some() {
5273            let edges = self.scan_edge_type(target, ctx).await?;
5274            for (eid, src, dst) in edges {
5275                let mut props = prop_manager
5276                    .get_all_edge_props_with_ctx(eid, ctx)
5277                    .await?
5278                    .unwrap_or_default();
5279
5280                props.insert(
5281                    "eid".to_string(),
5282                    uni_common::Value::Int(eid.as_u64() as i64),
5283                );
5284                props.insert(
5285                    "src_vid".to_string(),
5286                    uni_common::Value::Int(src.as_u64() as i64),
5287                );
5288                props.insert(
5289                    "dst_vid".to_string(),
5290                    uni_common::Value::Int(dst.as_u64() as i64),
5291                );
5292                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5293                props.insert("_version".to_string(), uni_common::Value::Int(1));
5294                rows.push(props);
5295            }
5296        }
5297
5298        // Write to cloud or local file
5299        if is_cloud_url(destination) {
5300            self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
5301                .await?;
5302        } else {
5303            // Validate local destination path against sandbox
5304            let validated_dest = self.validate_path(destination)?;
5305            let file = std::fs::File::create(&validated_dest)?;
5306            let mut writer =
5307                parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;
5308
5309            // Write all in one batch for now (simplification)
5310            if !rows.is_empty() {
5311                let batch = self.rows_to_batch(&rows, &arrow_schema)?;
5312                writer.write(&batch)?;
5313            }
5314
5315            writer.close()?;
5316        }
5317
5318        let mut res = HashMap::new();
5319        res.insert("count".to_string(), Value::Int(rows.len() as i64));
5320        Ok(vec![res])
5321    }
5322
5323    /// Writes Parquet data to a cloud storage destination.
5324    async fn write_parquet_to_cloud(
5325        &self,
5326        dest_url: &str,
5327        rows: &[HashMap<String, uni_common::Value>],
5328        arrow_schema: &arrow_schema::Schema,
5329    ) -> Result<()> {
5330        use object_store::ObjectStoreExt;
5331
5332        let (store, path) = build_store_from_url(dest_url)?;
5333
5334        // Write to an in-memory buffer
5335        let mut buffer = Vec::new();
5336        {
5337            let mut writer = parquet::arrow::ArrowWriter::try_new(
5338                &mut buffer,
5339                Arc::new(arrow_schema.clone()),
5340                None,
5341            )?;
5342
5343            if !rows.is_empty() {
5344                let batch = self.rows_to_batch(rows, arrow_schema)?;
5345                writer.write(&batch)?;
5346            }
5347
5348            writer.close()?;
5349        }
5350
5351        // Upload to cloud storage
5352        store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5353
5354        Ok(())
5355    }
5356
5357    pub(crate) fn rows_to_batch(
5358        &self,
5359        rows: &[HashMap<String, uni_common::Value>],
5360        schema: &arrow_schema::Schema,
5361    ) -> Result<RecordBatch> {
5362        let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5363
5364        for field in schema.fields() {
5365            let name = field.name();
5366            let dt = field.data_type();
5367
5368            let values: Vec<uni_common::Value> = rows
5369                .iter()
5370                .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5371                .collect();
5372            let array = self.values_to_array(&values, dt)?;
5373            columns.push(array);
5374        }
5375
5376        Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5377    }
5378
5379    /// Convert a slice of Values to an Arrow array.
5380    /// Delegates to the shared implementation in arrow_convert module.
5381    pub(crate) fn values_to_array(
5382        &self,
5383        values: &[uni_common::Value],
5384        dt: &arrow_schema::DataType,
5385    ) -> Result<Arc<dyn Array>> {
5386        arrow_convert::values_to_array(values, dt)
5387    }
5388
5389    pub(crate) fn format_csv_value(&self, val: Value) -> String {
5390        match val {
5391            Value::Null => "".to_string(),
5392            Value::String(s) => s,
5393            Value::Int(i) => i.to_string(),
5394            Value::Float(f) => f.to_string(),
5395            Value::Bool(b) => b.to_string(),
5396            _ => format!("{val}"),
5397        }
5398    }
5399
5400    pub(crate) fn parse_csv_value(
5401        &self,
5402        s: &str,
5403        data_type: &uni_common::core::schema::DataType,
5404        prop_name: &str,
5405    ) -> Result<Value> {
5406        if s.is_empty() || s.to_lowercase() == "null" {
5407            return Ok(Value::Null);
5408        }
5409
5410        use uni_common::core::schema::DataType;
5411        match data_type {
5412            DataType::String => Ok(Value::String(s.to_string())),
5413            DataType::Int32 | DataType::Int64 => {
5414                let i = s.parse::<i64>().map_err(|_| {
5415                    anyhow!(
5416                        "Failed to parse integer for property '{}': {}",
5417                        prop_name,
5418                        s
5419                    )
5420                })?;
5421                Ok(Value::Int(i))
5422            }
5423            DataType::Float32 | DataType::Float64 => {
5424                let f = s.parse::<f64>().map_err(|_| {
5425                    anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5426                })?;
5427                Ok(Value::Float(f))
5428            }
5429            DataType::Bool => {
5430                let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5431                    anyhow!(
5432                        "Failed to parse boolean for property '{}': {}",
5433                        prop_name,
5434                        s
5435                    )
5436                })?;
5437                Ok(Value::Bool(b))
5438            }
5439            DataType::CypherValue => {
5440                let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5441                    anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5442                })?;
5443                Ok(Value::from(json_val))
5444            }
5445            DataType::Vector { .. } => {
5446                let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5447                    anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5448                })?;
5449                Ok(Value::Vector(v))
5450            }
5451            _ => Ok(Value::String(s.to_string())),
5452        }
5453    }
5454
5455    pub(crate) async fn detach_delete_vertex(
5456        &self,
5457        vid: Vid,
5458        writer: &Writer,
5459        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5460    ) -> Result<()> {
5461        let schema = self.storage.schema_manager().schema();
5462        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5463
5464        // 1. Find and delete all outgoing edges
5465        let out_graph = self
5466            .storage
5467            .load_subgraph_cached(
5468                &[vid],
5469                &edge_type_ids,
5470                1,
5471                uni_store::runtime::Direction::Outgoing,
5472                Some(writer.l0_manager.get_current()),
5473            )
5474            .await?;
5475
5476        for edge in out_graph.edges() {
5477            writer
5478                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5479                .await?;
5480        }
5481
5482        // 2. Find and delete all incoming edges
5483        let in_graph = self
5484            .storage
5485            .load_subgraph_cached(
5486                &[vid],
5487                &edge_type_ids,
5488                1,
5489                uni_store::runtime::Direction::Incoming,
5490                Some(writer.l0_manager.get_current()),
5491            )
5492            .await?;
5493
5494        for edge in in_graph.edges() {
5495            writer
5496                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5497                .await?;
5498        }
5499
5500        Ok(())
5501    }
5502
5503    /// Load outgoing + incoming subgraphs (1-hop) for a batch of vertices
5504    /// in two scans — one per direction. Shared infrastructure for
5505    /// `batch_detach_delete_vertices` (which deletes the loaded edges) and
5506    /// `batch_check_vertices_have_no_edges` (which errors if any
5507    /// non-tombstoned edges exist).
5508    ///
5509    /// Returns raw `WorkingGraph`s with **no tombstone filtering**;
5510    /// callers that care about same-batch edge deletes (e.g., the
5511    /// non-detach check that runs after edges have been individually
5512    /// DELETEd) layer the filter on top.
5513    async fn batch_load_incident_edges(
5514        &self,
5515        vids: &[Vid],
5516        writer: &Writer,
5517    ) -> Result<(
5518        uni_store::runtime::working_graph::WorkingGraph,
5519        uni_store::runtime::working_graph::WorkingGraph,
5520    )> {
5521        let schema = self.storage.schema_manager().schema();
5522        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5523        let l0 = Some(writer.l0_manager.get_current());
5524
5525        let out_graph = self
5526            .storage
5527            .load_subgraph_cached(
5528                vids,
5529                &edge_type_ids,
5530                1,
5531                uni_store::runtime::Direction::Outgoing,
5532                l0.clone(),
5533            )
5534            .await?;
5535        let in_graph = self
5536            .storage
5537            .load_subgraph_cached(
5538                vids,
5539                &edge_type_ids,
5540                1,
5541                uni_store::runtime::Direction::Incoming,
5542                l0,
5543            )
5544            .await?;
5545        Ok((out_graph, in_graph))
5546    }
5547
5548    /// Batch detach-delete: load subgraphs for all VIDs at once, then delete edges and vertices.
5549    pub(crate) async fn batch_detach_delete_vertices(
5550        &self,
5551        vids: &[Vid],
5552        labels_per_vid: Vec<Option<Vec<String>>>,
5553        writer: &Writer,
5554        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5555    ) -> Result<()> {
5556        let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5557
5558        for edge in out_graph.edges() {
5559            writer
5560                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5561                .await?;
5562        }
5563        for edge in in_graph.edges() {
5564            writer
5565                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5566                .await?;
5567        }
5568
5569        for (vid, labels) in vids.iter().zip(labels_per_vid) {
5570            writer.delete_vertex(*vid, labels, tx_l0).await?;
5571        }
5572
5573        Ok(())
5574    }
5575
5576    /// Batch non-detach DELETE check: verify NO incident edges exist for
5577    /// any VID in `vids`, modulo tombstones from the writer's L0 and the
5578    /// tx-local L0 (same set the per-VID `check_vertex_has_no_edges`
5579    /// consults at write.rs:2650-2673).
5580    ///
5581    /// Why this exists: the per-VID `check_vertex_has_no_edges` does two
5582    /// `load_subgraph_cached(&[vid], ...)` calls per vertex (outgoing +
5583    /// incoming). At N vertices that's 2N subgraph loads. The batched
5584    /// path collapses to 2 loads regardless of N.
5585    ///
5586    /// Tombstone visibility: detach (which deletes everything) doesn't
5587    /// need the tombstone filter; non-detach does, because the caller
5588    /// may have already DELETEd some incident edges in the same
5589    /// statement (e.g. `MATCH (a)-[r]->(b) DELETE r, a` — edges are
5590    /// deleted before nodes per mutation_common.rs:720-724, so by the
5591    /// time this check runs, `r`'s eid is in `tx_l0.tombstones` and
5592    /// must be excluded from the "still has edges" set).
5593    pub(crate) async fn batch_check_vertices_have_no_edges(
5594        &self,
5595        vids: &[Vid],
5596        writer: &Writer,
5597        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5598    ) -> Result<()> {
5599        if vids.is_empty() {
5600            return Ok(());
5601        }
5602
5603        let mut tombstoned_eids: std::collections::HashSet<uni_common::core::id::Eid> =
5604            std::collections::HashSet::new();
5605        {
5606            let writer_l0 = writer.l0_manager.get_current();
5607            let guard = writer_l0.read();
5608            for &eid in guard.tombstones.keys() {
5609                tombstoned_eids.insert(eid);
5610            }
5611        }
5612        if let Some(tx) = tx_l0 {
5613            let guard = tx.read();
5614            for &eid in guard.tombstones.keys() {
5615                tombstoned_eids.insert(eid);
5616            }
5617        }
5618
5619        let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5620
5621        for edge in out_graph.edges().chain(in_graph.edges()) {
5622            if !tombstoned_eids.contains(&edge.eid) {
5623                return Err(anyhow!(
5624                    "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
5625                    edge.src_vid
5626                ));
5627            }
5628        }
5629        Ok(())
5630    }
5631}