Skip to main content

uni_query/query/executor/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7    eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14/// Convert a `Value` to `chrono::DateTime<Utc>`, handling both `Value::Temporal` and `Value::String`.
15fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16    match val {
17        Value::Temporal(tv) => {
18            use uni_common::TemporalValue;
19            match tv {
20                TemporalValue::DateTime {
21                    nanos_since_epoch, ..
22                }
23                | TemporalValue::LocalDateTime {
24                    nanos_since_epoch, ..
25                } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26                TemporalValue::Date { days_since_epoch } => {
27                    chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28                }
29                _ => None,
30            }
31        }
32        Value::String(s) => parse_datetime_utc(s).ok(),
33        _ => None,
34    }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46    BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47    ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54
55// DataFusion engine imports
56use crate::query::df_graph::L0Context;
57use crate::query::df_planner::HybridPhysicalPlanner;
58use datafusion::physical_plan::ExecutionPlanProperties;
59use datafusion::prelude::SessionContext;
60use parking_lot::RwLock as SyncRwLock;
61
62use arrow_array::{Array, RecordBatch};
63use csv;
64use parquet;
65
66use super::core::*;
67
/// Number of system fields on an edge map, derived from the field names themselves
/// (`_eid`, `_src`, `_dst`, `_type`, `_type_name`) so the count cannot drift from the list.
const EDGE_SYSTEM_FIELD_COUNT: usize = ["_eid", "_src", "_dst", "_type", "_type_name"].len();
/// Number of system fields on a vertex map (`_vid`, `_label`, `_uid`), derived the same way.
const VERTEX_SYSTEM_FIELD_COUNT: usize = ["_vid", "_label", "_uid"].len();
72
73/// Collect VIDs from all L0 buffers visible to a query context.
74///
75/// Applies `extractor` to each L0 buffer (main, transaction, pending flush) and
76/// collects the results. Returns an empty vec when no query context is present.
77fn collect_l0_vids(
78    ctx: Option<&QueryContext>,
79    extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
80) -> Vec<Vid> {
81    let mut vids = Vec::new();
82    if let Some(ctx) = ctx {
83        vids.extend(extractor(&ctx.l0.read()));
84        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
85            vids.extend(extractor(&tx_l0_arc.read()));
86        }
87        for pending_l0_arc in &ctx.pending_flush_l0s {
88            vids.extend(extractor(&pending_l0_arc.read()));
89        }
90    }
91    vids
92}
93
94/// Hydrate an entity map (vertex or edge) with properties if not already loaded.
95///
96/// This is the fallback for pushdown hydration - if the entity only has system fields
97/// (indicating pushdown didn't load properties), we load all properties here.
98///
99/// System field counts:
100/// - Edge: 5 fields (_eid, _src, _dst, _type, _type_name)
101/// - Vertex: 3 fields (_vid, _label, _uid)
102async fn hydrate_entity_if_needed(
103    map: &mut HashMap<String, Value>,
104    prop_manager: &PropertyManager,
105    ctx: Option<&QueryContext>,
106) {
107    // Check for edge entity
108    if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
109        if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
110            tracing::debug!(
111                "Pushdown fallback: hydrating edge {} at execution time",
112                eid_u64
113            );
114            if let Ok(Some(props)) = prop_manager
115                .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
116                .await
117            {
118                for (key, value) in props {
119                    map.entry(key).or_insert(value);
120                }
121            }
122        } else {
123            tracing::trace!(
124                "Pushdown success: edge {} already has {} properties",
125                eid_u64,
126                map.len() - EDGE_SYSTEM_FIELD_COUNT
127            );
128        }
129        return;
130    }
131
132    // Check for vertex entity
133    if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
134        if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
135            tracing::debug!(
136                "Pushdown fallback: hydrating vertex {} at execution time",
137                vid_u64
138            );
139            if let Ok(Some(props)) = prop_manager
140                .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
141                .await
142            {
143                for (key, value) in props {
144                    map.entry(key).or_insert(value);
145                }
146            }
147        } else {
148            tracing::trace!(
149                "Pushdown success: vertex {} already has {} properties",
150                vid_u64,
151                map.len() - VERTEX_SYSTEM_FIELD_COUNT
152            );
153        }
154    }
155}
156
157/// Promote a VID-string placeholder variable to a `Map`, draining its
158/// dotted system/property columns (`var.<prop>`) into the new map.
159///
160/// Used by `record_batches_to_rows` for search-procedure outputs, where the
161/// bare variable arrives as a VID string rather than a materialized Map.
162fn promote_vid_placeholder(row: &mut HashMap<String, Value>, var: &str) {
163    let prefix = format!("{}.", var);
164    let mut map = HashMap::new();
165
166    let dotted_keys: Vec<String> = row
167        .keys()
168        .filter(|k| k.starts_with(&prefix))
169        .cloned()
170        .collect();
171
172    for key in &dotted_keys {
173        let prop_name = &key[prefix.len()..];
174        if let Some(val) = row.remove(key) {
175            map.insert(prop_name.to_string(), val);
176        }
177    }
178
179    // Replace the VID-string placeholder with the constructed Map
180    row.insert(var.to_string(), Value::Map(map));
181}
182
183/// Merge the helper system columns (`var._vid`, `var._labels`, `var._eid`,
184/// `var._type`) and remaining USER property columns (`var.<prop>`) into the
185/// bare `Map` variable, removing the dotted helper columns from `row`.
186///
187/// Merging user properties keeps `RETURN var.prop` consistent after a
188/// unit-subquery SET refreshes the dotted columns: without it the (now stale)
189/// bare entity Struct from the outer scan would override the post-SET values.
190/// Internal helpers (`_all_props`, `_src_vid`, …) and `overflow_json` are
191/// dropped silently.
192fn merge_dotted_columns(row: &mut HashMap<String, Value>, var: &str) {
193    // Merge node system fields (_vid, _labels)
194    let vid_key = format!("{}._vid", var);
195    let labels_key = format!("{}._labels", var);
196
197    let vid_val = row.remove(&vid_key);
198    let labels_val = row.remove(&labels_key);
199
200    if let Some(Value::Map(map)) = row.get_mut(var) {
201        if let Some(v) = vid_val {
202            map.insert("_vid".to_string(), v);
203        }
204        if let Some(v) = labels_val {
205            map.insert("_labels".to_string(), v);
206        }
207    }
208
209    // Merge edge system fields (_eid, _type, _src_vid, _dst_vid).
210    // These are emitted as helper columns by the traverse exec.
211    // The structural projection already includes them in the struct,
212    // but we still need to remove the dotted helper columns.
213    let eid_key = format!("{}._eid", var);
214    let type_key = format!("{}._type", var);
215
216    let eid_val = row.remove(&eid_key);
217    let type_val = row.remove(&type_key);
218
219    if (eid_val.is_some() || type_val.is_some())
220        && let Some(Value::Map(map)) = row.get_mut(var)
221    {
222        if let Some(v) = eid_val {
223            map.entry("_eid".to_string()).or_insert(v);
224        }
225        if let Some(v) = type_val {
226            map.entry("_type".to_string()).or_insert(v);
227        }
228    }
229
230    // Drain remaining dotted columns, merging surviving USER properties.
231    let prefix = format!("{}.", var);
232    let dotted_keys: Vec<String> = row
233        .keys()
234        .filter(|k| k.starts_with(&prefix))
235        .cloned()
236        .collect();
237    for key in dotted_keys {
238        let prop_name = key[prefix.len()..].to_string();
239        let val = row.remove(&key);
240        if prop_name.starts_with('_') || prop_name == "overflow_json" {
241            continue;
242        }
243        if let (Some(val), Some(Value::Map(map))) = (val, row.get_mut(var)) {
244            map.insert(prop_name, val);
245        }
246    }
247}
248
249impl Executor {
250    /// Helper to verify and filter candidates against an optional predicate.
251    ///
252    /// Deduplicates candidates, loads properties, and evaluates the filter expression.
253    /// Returns only VIDs that pass the filter (or are not deleted).
254    async fn verify_and_filter_candidates(
255        &self,
256        mut candidates: Vec<Vid>,
257        variable: &str,
258        filter: Option<&Expr>,
259        ctx: Option<&QueryContext>,
260        prop_manager: &PropertyManager,
261        params: &HashMap<String, Value>,
262    ) -> Result<Vec<Vid>> {
263        candidates.sort_unstable();
264        candidates.dedup();
265
266        let mut verified_vids = Vec::new();
267        for vid in candidates {
268            let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
269                continue; // Deleted
270            };
271
272            if let Some(expr) = filter {
273                let mut props_map: HashMap<String, Value> = props;
274                props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
275
276                let mut row = HashMap::new();
277                row.insert(variable.to_string(), Value::Map(props_map));
278
279                let res = self
280                    .evaluate_expr(expr, &row, prop_manager, params, ctx)
281                    .await?;
282                if res.as_bool().unwrap_or(false) {
283                    verified_vids.push(vid);
284                }
285            } else {
286                verified_vids.push(vid);
287            }
288        }
289
290        Ok(verified_vids)
291    }
292
293    pub(crate) async fn scan_storage_candidates(
294        &self,
295        label_id: u16,
296        variable: &str,
297        filter: Option<&Expr>,
298    ) -> Result<Vec<Vid>> {
299        let schema = self.storage.schema_manager().schema();
300        let label_name = schema
301            .label_name_by_id(label_id)
302            .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
303
304        // Generate filter SQL from expression
305        let empty_props = std::collections::HashMap::new();
306        let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
307        let filter_sql = filter.and_then(|expr| {
308            LanceFilterGenerator::generate(std::slice::from_ref(expr), variable, Some(label_props))
309        });
310
311        self.storage
312            .scan_vertex_candidates(label_name, filter_sql.as_deref())
313            .await
314    }
315
316    pub(crate) async fn scan_label_with_filter(
317        &self,
318        label_id: u16,
319        variable: &str,
320        filter: Option<&Expr>,
321        ctx: Option<&QueryContext>,
322        prop_manager: &PropertyManager,
323        params: &HashMap<String, Value>,
324    ) -> Result<Vec<Vid>> {
325        let mut candidates = self
326            .scan_storage_candidates(label_id, variable, filter)
327            .await?;
328
329        // Convert label_id to label_name for L0 lookup
330        let schema = self.storage.schema_manager().schema();
331        if let Some(label_name) = schema.label_name_by_id(label_id) {
332            candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
333        }
334
335        self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
336            .await
337    }
338
339    pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
340        // Handle Value::Node directly (has vid field)
341        if let Value::Node(node) = val {
342            return Ok(node.vid);
343        }
344        // Handle Object (node) containing _vid field
345        if let Value::Map(map) = val
346            && let Some(vid_val) = map.get("_vid")
347            && let Some(v) = vid_val.as_u64()
348        {
349            return Ok(Vid::from(v));
350        }
351        // Handle string format
352        if let Some(s) = val.as_str()
353            && let Ok(id) = s.parse::<u64>()
354        {
355            return Ok(Vid::new(id));
356        }
357        // Handle raw u64
358        if let Some(v) = val.as_u64() {
359            return Ok(Vid::from(v));
360        }
361        Err(anyhow!("Invalid Vid format: {:?}", val))
362    }
363
364    /// Find a node value in the row by VID.
365    ///
366    /// Scans all values in the row, looking for a node (Map or Node) whose VID
367    /// matches the target. Returns the full node value if found, or a minimal
368    /// Map with just `_vid` as fallback.
369    fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
370        for val in row.values() {
371            if let Ok(vid) = Self::vid_from_value(val)
372                && vid == target_vid
373            {
374                return val.clone();
375            }
376        }
377        // Fallback: return minimal node map
378        Value::Map(HashMap::from([(
379            "_vid".to_string(),
380            Value::Int(target_vid.as_u64() as i64),
381        )]))
382    }
383
384    /// Create L0 context, session, and planner for DataFusion execution.
385    ///
386    /// This is the shared setup for `execute_datafusion` and `execute_merge_read_plan`.
387    /// Returns `(session_ctx, planner, prop_manager_arc)`.
388    pub async fn create_datafusion_planner(
389        &self,
390        prop_manager: &PropertyManager,
391        params: &HashMap<String, Value>,
392    ) -> Result<(
393        Arc<SyncRwLock<SessionContext>>,
394        HybridPhysicalPlanner,
395        Arc<PropertyManager>,
396    )> {
397        let query_ctx = self.get_context().await;
398        let l0_context = match query_ctx {
399            Some(ref ctx) => L0Context::from_query_context(ctx),
400            None => L0Context::empty(),
401        };
402
403        // Prefer the shared `Arc<PropertyManager>` installed via
404        // `Executor::set_prop_manager` — skips the LRU/Mutex allocation cost
405        // (~80 µs/query) of constructing a fresh, immediately-abandoned
406        // PropertyManager. Falls back to the legacy fresh-construction path
407        // for callers that have not installed one.
408        let prop_manager_arc = if let Some(shared) = self.prop_manager_arc.as_ref() {
409            shared.clone()
410        } else {
411            Arc::new(PropertyManager::new(
412                self.storage.clone(),
413                self.storage.schema_manager_arc(),
414                prop_manager.cache_size(),
415            ))
416        };
417
418        // Two-mode session construction (hot/cold template path, main)
419        // fused with plugin-framework scalar/optimizer registration
420        // (plugin-fw).
421        //
422        // Hot path: clone the pre-built `df_session_template` — an O(1)
423        // Arc bump on the inner `SessionState`. Skips the ~140 µs cost of
424        // `SessionContext::new()` + `register_cypher_udfs()`. The template
425        // only has the built-in Cypher UDFs baked in (see
426        // `Uni::build` → `register_cypher_udfs`), so it is ONLY safe to
427        // reuse when this query needs no per-query dynamic registration:
428        //   - no legacy `CustomFunctionRegistry` scalars,
429        //   - no plugin-registered optimizer rules,
430        //   - no host-level plugin scalars (`Uni::load_*_plugin`),
431        //   - no session-scoped plugin scalars.
432        //
433        // Cold path: construct a fresh, isolated `SessionContext` (folding
434        // in any plugin optimizer rules) and register the dynamic scalar
435        // sets so they do not leak into the shared template.
436        let has_custom_udfs = self
437            .custom_function_registry
438            .as_ref()
439            .is_some_and(|r| !r.is_empty());
440        let host_plugin_registry = self
441            .procedure_registry
442            .as_ref()
443            .and_then(|pr| pr.plugin_registry());
444        // Optimizer rules come from the host plugin registry (the legacy
445        // `CustomFunctionRegistry` only ever carried scalar fns). When no
446        // host registry is attached, there are no plugin optimizer rules.
447        let optimizer_providers = host_plugin_registry
448            .as_ref()
449            .map(|pr| pr.optimizer_rules())
450            .unwrap_or_default();
451        let session_local_registry =
452            crate::query::df_udfs_plugin::current_session_plugin_registry();
453        let needs_dynamic_registration = has_custom_udfs
454            || !optimizer_providers.is_empty()
455            || host_plugin_registry.is_some()
456            || session_local_registry.is_some();
457
458        let session = if let (Some(tmpl), false) = (
459            self.df_session_template.as_ref(),
460            needs_dynamic_registration,
461        ) {
462            // Hot path: clone the template (Cypher UDFs already registered).
463            (**tmpl).clone()
464        } else {
465            // Cold path: build a fresh session, optionally with any
466            // plugin-registered optimizer rules folded in.
467            //
468            // M5h: build a `SessionContext` that includes any
469            // plugin-registered optimizer rules. The rule chain is
470            // snapshotted at session-construction time; reload discipline
471            // is M10's problem.
472            let session = if optimizer_providers.is_empty() {
473                SessionContext::new()
474            } else {
475                use datafusion::execution::session_state::SessionStateBuilder;
476                use uni_plugin::traits::operator::OptimizerPhase;
477                let mut builder = SessionStateBuilder::new().with_default_features();
478                for provider in optimizer_providers.iter() {
479                    match provider.phase() {
480                        OptimizerPhase::Logical => {
481                            builder = builder.with_optimizer_rule(provider.rule());
482                        }
483                        OptimizerPhase::Physical => {
484                            if let Some(rule) = provider.physical_rule() {
485                                builder = builder.with_physical_optimizer_rule(rule);
486                            } else {
487                                tracing::debug!(
488                                    target: "uni.plugin.registry",
489                                    "physical-phase provider returned no physical_rule(); skipping"
490                                );
491                            }
492                        }
493                        OptimizerPhase::Both => {
494                            builder = builder.with_optimizer_rule(provider.rule());
495                            if let Some(rule) = provider.physical_rule() {
496                                builder = builder.with_physical_optimizer_rule(rule);
497                            }
498                        }
499                        _ => {
500                            tracing::debug!(
501                                target: "uni.plugin.registry",
502                                "skipping optimizer rule for unknown phase"
503                            );
504                        }
505                    }
506                }
507                let state = builder.build();
508                SessionContext::new_with_state(state)
509            };
510            crate::query::df_udfs::register_cypher_udfs(&session)?;
511            // Instance-scope, legacy `CustomFunctionRegistry` shadow path
512            // (db.register_function() entries + apoc-core mirrors). Routed
513            // through the plugin-registry adapter (plugin-fw).
514            if let Some(ref registry) = self.custom_function_registry {
515                crate::query::df_udfs_plugin::register_custom_functions_as_plugin_scalars(
516                    &session, registry,
517                )?;
518            }
519            // Instance-scope, **host** plugin registry (M8.6 follow-up):
520            // `Uni::load_python_plugin` / `Uni::load_rhai_plugin` /
521            // `Uni::load_wasm_*` and other host-level plugin-load paths
522            // register into `UniInner.plugin_registry`. The executor
523            // reaches that registry via the procedure-registry's
524            // attached handle (set by `Uni::build` at construction time).
525            // Without this branch, scalars added through
526            // `Uni::load_*_plugin` were invisible to Cypher's UDF
527            // resolution despite being directly invokable via
528            // `db.plugin_registry().scalar_fn(...)`.
529            if let Some(ref host_pr) = host_plugin_registry {
530                crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, host_pr)?;
531            }
532            // Session-scope (M8.6): when the call is inside a
533            // `scoped_with_session_plugin_registry` scope (set by a host
534            // crate's per-query execution path), register the session's
535            // local plugin scalars *last* so they shadow instance entries
536            // by name (DataFusion's `register_udf` is last-write-wins).
537            if let Some(ref session_local) = session_local_registry {
538                crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, session_local)?;
539            }
540            session
541        };
542        let session_ctx = Arc::new(SyncRwLock::new(session));
543
544        let mut planner = HybridPhysicalPlanner::with_l0_context(
545            session_ctx.clone(),
546            self.storage.clone(),
547            l0_context,
548            prop_manager_arc.clone(),
549            self.storage.schema_manager().schema(),
550            params.clone(),
551            HashMap::new(),
552        );
553
554        planner = planner.with_algo_registry(self.algo_registry.clone());
555        if let Some(ref registry) = self.procedure_registry {
556            planner = planner.with_procedure_registry(registry.clone());
557        }
558        // M5b follow-up #4: thread the host's plugin registry through to
559        // the physical planner so `plan_vector_knn` can consult
560        // `register_index_handle` for plugin-supplied vector index
561        // handles. The procedure registry's attached plugin registry is
562        // the host's actual instance (set by `Uni::build`, and what
563        // `db.plugin_registry()` returns to user code). The legacy
564        // `CustomFunctionRegistry` only ever carried scalar fns (no index
565        // handles / optimizer rules), so it is not a source here.
566        let host_plugin_registry = self
567            .procedure_registry
568            .as_ref()
569            .and_then(|r| r.plugin_registry());
570        if let Some(registry) = host_plugin_registry {
571            planner = planner.with_plugin_registry(registry);
572        }
573        if let Some(ref xervo_runtime) = self.xervo_runtime {
574            planner = planner.with_xervo_runtime(xervo_runtime.clone());
575        }
576        // FU-1 / M11 #6: thread the outer transaction's writer handle
577        // through so declared `WRITE`-mode procedures invoked from
578        // this query can run their Cypher bodies via the write-enabled
579        // inner-query host.
580        if let Some(writer) = &self.writer {
581            planner = planner.with_writer(Arc::clone(writer));
582        }
583
584        Ok((session_ctx, planner, prop_manager_arc))
585    }
586
587    /// Execute a DataFusion physical plan and collect all result batches.
588    pub fn collect_batches(
589        session_ctx: &Arc<SyncRwLock<SessionContext>>,
590        execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
591    ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
592        Box::pin(async move {
593            use futures::TryStreamExt;
594
595            let task_ctx = session_ctx.read().task_ctx();
596            let partition_count = execution_plan.output_partitioning().partition_count();
597            let mut all_batches = Vec::new();
598            for partition in 0..partition_count {
599                let stream = execution_plan.execute(partition, task_ctx.clone())?;
600                let batches: Vec<RecordBatch> = stream.try_collect().await?;
601                all_batches.extend(batches);
602            }
603            Ok(all_batches)
604        })
605    }
606
607    /// Executes a query using the DataFusion-based engine.
608    ///
609    /// Uses `HybridPhysicalPlanner` which produces DataFusion `ExecutionPlan`
610    /// trees with custom graph operators for graph-specific operations.
611    pub async fn execute_datafusion(
612        &self,
613        plan: LogicalPlan,
614        prop_manager: &PropertyManager,
615        params: &HashMap<String, Value>,
616    ) -> Result<Vec<RecordBatch>> {
617        let (batches, _plan) = self
618            .execute_datafusion_with_plan(plan, prop_manager, params)
619            .await?;
620        Ok(batches)
621    }
622
623    /// Executes a query using the DataFusion-based engine, returning both
624    /// result batches and the physical execution plan.
625    ///
626    /// The returned `Arc<dyn ExecutionPlan>` can be walked to extract per-operator
627    /// metrics (e.g., `output_rows`, `elapsed_compute`) that DataFusion's
628    /// `BaselineMetrics` recorded during execution.
629    pub async fn execute_datafusion_with_plan(
630        &self,
631        plan: LogicalPlan,
632        prop_manager: &PropertyManager,
633        params: &HashMap<String, Value>,
634    ) -> Result<(
635        Vec<RecordBatch>,
636        Arc<dyn datafusion::physical_plan::ExecutionPlan>,
637    )> {
638        let (session_ctx, mut planner, prop_manager_arc) =
639            self.create_datafusion_planner(prop_manager, params).await?;
640
641        // Build MutationContext when the plan contains write operations
642        if Self::contains_write_operations(&plan) {
643            let writer = self
644                .writer
645                .as_ref()
646                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
647                .clone();
648            let query_ctx = self.get_context().await;
649
650            debug_assert!(
651                query_ctx.is_some(),
652                "BUG: query_ctx is None for write operation"
653            );
654
655            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
656                executor: self.clone(),
657                writer,
658                prop_manager: prop_manager_arc,
659                params: params.clone(),
660                query_ctx,
661                tx_l0_override: self.transaction_l0_override.clone(),
662            });
663            planner = planner.with_mutation_context(mutation_ctx);
664            tracing::debug!(
665                plan_type = Self::get_plan_type(&plan),
666                "Mutation routed to DataFusion engine"
667            );
668        }
669
670        let execution_plan = planner.plan(&plan)?;
671        let plan_clone = Arc::clone(&execution_plan);
672        let result = Self::collect_batches(&session_ctx, execution_plan).await;
673
674        // Harvest warnings from the graph execution context after query completion.
675        let graph_warnings = planner.graph_ctx().take_warnings();
676        if !graph_warnings.is_empty()
677            && let Ok(mut w) = self.warnings.lock()
678        {
679            w.extend(graph_warnings);
680        }
681
682        result.map(|batches| (batches, plan_clone))
683    }
684
685    /// Execute a MERGE read sub-plan through the DataFusion engine.
686    ///
687    /// Plans and executes the MERGE pattern match using flat columnar output
688    /// (no structural projections), then groups dotted columns (`a._vid`,
689    /// `a._labels`, etc.) into per-variable Maps for downstream MERGE logic.
690    pub(crate) async fn execute_merge_read_plan(
691        &self,
692        plan: LogicalPlan,
693        prop_manager: &PropertyManager,
694        params: &HashMap<String, Value>,
695        merge_variables: Vec<String>,
696    ) -> Result<Vec<HashMap<String, Value>>> {
697        let (session_ctx, planner, _prop_manager_arc) =
698            self.create_datafusion_planner(prop_manager, params).await?;
699
700        // Plan with full property access ("*") for all merge variables so that
701        // ON MATCH SET / ON CREATE SET have complete entity Maps to work with.
702        let extra: HashMap<String, HashSet<String>> = merge_variables
703            .iter()
704            .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
705            .collect();
706        let execution_plan = planner.plan_with_properties(&plan, extra)?;
707        let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
708
709        // Convert to flat rows (dotted column names like "a._vid", "b._labels")
710        let flat_rows = self.record_batches_to_rows(all_batches)?;
711
712        // Group dotted columns into per-variable Maps for MERGE's match logic.
713        // E.g., {"a._vid": 0, "a._labels": ["A"]} → {"a": Map({"_vid": 0, "_labels": ["A"]})}
714        let rows = flat_rows
715            .into_iter()
716            .map(|mut row| {
717                for var in &merge_variables {
718                    // Skip if already materialized (e.g., by record_batches_to_rows)
719                    if row.contains_key(var) {
720                        continue;
721                    }
722                    let prefix = format!("{}.", var);
723                    let dotted_keys: Vec<String> = row
724                        .keys()
725                        .filter(|k| k.starts_with(&prefix))
726                        .cloned()
727                        .collect();
728                    if !dotted_keys.is_empty() {
729                        let mut map = HashMap::new();
730                        for key in dotted_keys {
731                            let prop_name = key[prefix.len()..].to_string();
732                            if let Some(val) = row.remove(&key) {
733                                map.insert(prop_name, val);
734                            }
735                        }
736                        row.insert(var.clone(), Value::Map(map));
737                    }
738                }
739                row
740            })
741            .collect();
742
743        Ok(rows)
744    }
745
746    /// Converts DataFusion RecordBatches to row-based HashMap format.
747    ///
748    /// Handles special metadata on fields:
749    /// - `cv_encoded=true`: Parse the string value as JSON to restore original type
750    ///
751    /// Also normalizes path structures to user-facing format (converts _vid to _id).
752    pub(crate) fn record_batches_to_rows(
753        &self,
754        batches: Vec<RecordBatch>,
755    ) -> Result<Vec<HashMap<String, Value>>> {
756        let mut rows = Vec::new();
757
758        for batch in batches {
759            let num_rows = batch.num_rows();
760            let schema = batch.schema();
761
762            for row_idx in 0..num_rows {
763                let mut row = HashMap::new();
764
765                for (col_idx, field) in schema.fields().iter().enumerate() {
766                    let column = batch.column(col_idx);
767                    // Infer Uni DataType from Arrow type for DateTime/Time struct decoding
768                    let data_type =
769                        if uni_common::core::schema::is_datetime_struct(field.data_type()) {
770                            Some(&uni_common::DataType::DateTime)
771                        } else if uni_common::core::schema::is_time_struct(field.data_type()) {
772                            Some(&uni_common::DataType::Time)
773                        } else {
774                            None
775                        };
776                    let mut value =
777                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);
778
779                    // Check if this field contains JSON-encoded values (e.g., from UNWIND)
780                    // Parse JSON string to restore the original type
781                    if field
782                        .metadata()
783                        .get("cv_encoded")
784                        .is_some_and(|v| v == "true")
785                        && let Value::String(s) = &value
786                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
787                    {
788                        value = Value::from(parsed);
789                    }
790
791                    // Normalize path structures to user-facing format
792                    value = Self::normalize_path_if_needed(value);
793
794                    row.insert(field.name().clone(), value);
795                }
796
797                // Merge system fields into bare variable maps.
798                // The projection step emits helper columns like "n._vid" and "n._labels"
799                // alongside the materialized "n" column (a Map of user properties).
800                // Here we merge those system fields into the map and remove the helpers.
801                //
802                // For search procedures (vector/FTS), the bare variable may be a VID
803                // string placeholder rather than a Map. In that case, promote it to a
804                // Map so we can merge system fields and properties into it.
805                let bare_vars: Vec<String> = row
806                    .keys()
807                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
808                    .cloned()
809                    .collect();
810
811                // Detect VID-placeholder variables that have system columns
812                // (e.g., "node" is String("1") but "node._vid" exists).
813                // Promote these to Maps so system fields can be merged in.
814                let vid_placeholder_vars: Vec<String> = row
815                    .keys()
816                    .filter(|k| {
817                        !k.contains('.')
818                            && matches!(row.get(*k), Some(Value::String(_)))
819                            && row.contains_key(&format!("{}._vid", k))
820                    })
821                    .cloned()
822                    .collect();
823
824                for var in &vid_placeholder_vars {
825                    promote_vid_placeholder(&mut row, var);
826                }
827
828                for var in &bare_vars {
829                    merge_dotted_columns(&mut row, var);
830                }
831
832                rows.push(row);
833            }
834        }
835
836        Ok(rows)
837    }
838
839    /// Normalize a value if it's a path structure, converting internal format to user-facing format.
840    ///
841    /// This only normalizes path structures (objects with "nodes" and "relationships" arrays).
842    /// Other values are returned unchanged to avoid interfering with query execution.
843    fn normalize_path_if_needed(value: Value) -> Value {
844        match value {
845            Value::Map(map)
846                if map.contains_key("nodes")
847                    && (map.contains_key("relationships") || map.contains_key("edges")) =>
848            {
849                Self::normalize_path_map(map)
850            }
851            other => other,
852        }
853    }
854
855    /// Normalize a path map object.
856    fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
857        // Normalize nodes array
858        if let Some(Value::List(nodes)) = map.remove("nodes") {
859            let normalized_nodes: Vec<Value> = nodes
860                .into_iter()
861                .map(|n| {
862                    if let Value::Map(node_map) = n {
863                        Self::normalize_path_node_map(node_map)
864                    } else {
865                        n
866                    }
867                })
868                .collect();
869            map.insert("nodes".to_string(), Value::List(normalized_nodes));
870        }
871
872        // Normalize relationships array (may be called "relationships" or "edges")
873        let rels_key = if map.contains_key("relationships") {
874            "relationships"
875        } else {
876            "edges"
877        };
878        if let Some(Value::List(rels)) = map.remove(rels_key) {
879            let normalized_rels: Vec<Value> = rels
880                .into_iter()
881                .map(|r| {
882                    if let Value::Map(rel_map) = r {
883                        Self::normalize_path_edge_map(rel_map)
884                    } else {
885                        r
886                    }
887                })
888                .collect();
889            map.insert("relationships".to_string(), Value::List(normalized_rels));
890        }
891
892        Value::Map(map)
893    }
894
895    /// Convert a Value to its string representation for path normalization.
896    fn value_to_id_string(val: Value) -> String {
897        match val {
898            Value::Int(n) => n.to_string(),
899            Value::Float(n) => n.to_string(),
900            Value::String(s) => s,
901            other => other.to_string(),
902        }
903    }
904
905    /// Move a map entry from `src_key` to `dst_key`, converting the value to a string.
906    /// When `src_key == dst_key`, this simply stringifies the value in place.
907    fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
908        if let Some(val) = map.remove(src_key) {
909            map.insert(
910                dst_key.to_string(),
911                Value::String(Self::value_to_id_string(val)),
912            );
913        }
914    }
915
916    /// Ensure the "properties" field is a non-null map.
917    fn ensure_properties_map(map: &mut HashMap<String, Value>) {
918        match map.get("properties") {
919            Some(props) if !props.is_null() => {}
920            _ => {
921                map.insert("properties".to_string(), Value::Map(HashMap::new()));
922            }
923        }
924    }
925
926    /// Normalize a node within a path to user-facing format.
927    fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
928        Self::stringify_map_field(&mut map, "_vid", "_id");
929        Self::ensure_properties_map(&mut map);
930        Value::Map(map)
931    }
932
933    /// Normalize an edge within a path to user-facing format.
934    fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
935        Self::stringify_map_field(&mut map, "_eid", "_id");
936        Self::stringify_map_field(&mut map, "_src", "_src");
937        Self::stringify_map_field(&mut map, "_dst", "_dst");
938
939        if let Some(type_name) = map.remove("_type_name") {
940            map.insert("_type".to_string(), type_name);
941        }
942
943        Self::ensure_properties_map(&mut map);
944        Value::Map(map)
945    }
946
947    #[instrument(
948        skip(self, prop_manager, params),
949        fields(rows_returned, duration_ms),
950        level = "info"
951    )]
952    pub fn execute<'a>(
953        &'a self,
954        plan: LogicalPlan,
955        prop_manager: &'a PropertyManager,
956        params: &'a HashMap<String, Value>,
957    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
958        Box::pin(async move {
959            let query_type = Self::get_plan_type(&plan);
960            let ctx = self.get_context().await;
961            let start = Instant::now();
962
963            // Route DDL/Admin queries to the fallback executor.
964            // All other queries (including similar_to) flow through DataFusion.
965            let res = if Self::is_ddl_or_admin(&plan) {
966                self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
967                    .await
968            } else {
969                let batches = self.execute_datafusion(plan, prop_manager, params).await?;
970                self.record_batches_to_rows(batches)
971            };
972
973            let duration = start.elapsed();
974            metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
975                .record(duration.as_secs_f64());
976
977            tracing::Span::current().record("duration_ms", duration.as_millis());
978            match &res {
979                Ok(rows) => {
980                    tracing::Span::current().record("rows_returned", rows.len());
981                    metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
982                        .increment(rows.len() as u64);
983                }
984                Err(e) => {
985                    let error_type = if e.to_string().contains("timed out") {
986                        "timeout"
987                    } else if e.to_string().contains("syntax") {
988                        "syntax"
989                    } else {
990                        "execution"
991                    };
992                    metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
993                }
994            }
995
996            res
997        })
998    }
999
1000    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
1001        match plan {
1002            LogicalPlan::Scan { .. } => "read_scan",
1003            LogicalPlan::FusedIndexScan { .. } => "read_fused_index_scan",
1004            LogicalPlan::FusedIndexScanWrapped { .. } => "read_fused_index_scan_wrapped",
1005            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
1006            LogicalPlan::Traverse { .. } => "read_traverse",
1007            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
1008            LogicalPlan::ScanAll { .. } => "read_scan_all",
1009            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
1010            LogicalPlan::VectorKnn { .. } => "read_vector",
1011            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
1012            LogicalPlan::Merge { .. } => "write_merge",
1013            LogicalPlan::Delete { .. } => "write_delete",
1014            LogicalPlan::Set { .. } => "write_set",
1015            LogicalPlan::Remove { .. } => "write_remove",
1016            LogicalPlan::ProcedureCall { .. } => "call",
1017            LogicalPlan::Copy { .. } => "copy",
1018            LogicalPlan::Backup { .. } => "backup",
1019            _ => "other",
1020        }
1021    }
1022
1023    /// Return all direct child plan references from a `LogicalPlan`.
1024    ///
1025    /// This centralizes the variant→children mapping so that recursive walkers
1026    /// (e.g., `contains_write_operations`) can delegate the
1027    /// "recurse into children" logic instead of duplicating the match arms.
1028    ///
1029    /// Note: `Foreach` returns only its `input`; the `body: Vec<LogicalPlan>`
1030    /// is not included because it requires special iteration. Callers that
1031    /// need to inspect the body should handle `Foreach` before falling through.
1032    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
1033        match plan {
1034            // Single-input wrappers
1035            LogicalPlan::Project { input, .. }
1036            | LogicalPlan::Sort { input, .. }
1037            | LogicalPlan::Limit { input, .. }
1038            | LogicalPlan::Distinct { input }
1039            | LogicalPlan::Aggregate { input, .. }
1040            | LogicalPlan::Window { input, .. }
1041            | LogicalPlan::Unwind { input, .. }
1042            | LogicalPlan::Filter { input, .. }
1043            | LogicalPlan::Create { input, .. }
1044            | LogicalPlan::CreateBatch { input, .. }
1045            | LogicalPlan::Set { input, .. }
1046            | LogicalPlan::Remove { input, .. }
1047            | LogicalPlan::Delete { input, .. }
1048            | LogicalPlan::Merge { input, .. }
1049            | LogicalPlan::Foreach { input, .. }
1050            | LogicalPlan::Traverse { input, .. }
1051            | LogicalPlan::TraverseMainByType { input, .. }
1052            | LogicalPlan::BindZeroLengthPath { input, .. }
1053            | LogicalPlan::BindPath { input, .. }
1054            | LogicalPlan::ShortestPath { input, .. }
1055            | LogicalPlan::AllShortestPaths { input, .. }
1056            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],
1057
1058            // Two-input wrappers
1059            LogicalPlan::Apply {
1060                input, subquery, ..
1061            }
1062            | LogicalPlan::SubqueryCall { input, subquery } => {
1063                vec![input.as_ref(), subquery.as_ref()]
1064            }
1065            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
1066                vec![left.as_ref(), right.as_ref()]
1067            }
1068            LogicalPlan::RecursiveCTE {
1069                initial, recursive, ..
1070            } => vec![initial.as_ref(), recursive.as_ref()],
1071            LogicalPlan::QuantifiedPattern {
1072                input,
1073                pattern_plan,
1074                ..
1075            } => vec![input.as_ref(), pattern_plan.as_ref()],
1076
1077            // Leaf nodes (scans, DDL, admin, etc.)
1078            _ => vec![],
1079        }
1080    }
1081
1082    /// Check if a plan is a DDL or admin operation that should skip DataFusion.
1083    ///
1084    /// These operations don't produce data streams and aren't supported by the
1085    /// DataFusion planner. Recurses through wrapper nodes (`Project`, `Sort`,
1086    /// `Limit`, etc.) to detect DDL/admin operations nested inside read
1087    /// wrappers (e.g. `CALL procedure(...) YIELD x RETURN x`).
1088    pub(crate) fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
1089        match plan {
1090            // DDL / schema operations
1091            LogicalPlan::CreateLabel(_)
1092            | LogicalPlan::CreateEdgeType(_)
1093            | LogicalPlan::AlterLabel(_)
1094            | LogicalPlan::AlterEdgeType(_)
1095            | LogicalPlan::DropLabel(_)
1096            | LogicalPlan::DropEdgeType(_)
1097            | LogicalPlan::CreateConstraint(_)
1098            | LogicalPlan::DropConstraint(_)
1099            | LogicalPlan::ShowConstraints(_) => true,
1100
1101            // Index operations
1102            LogicalPlan::CreateVectorIndex { .. }
1103            | LogicalPlan::CreateFullTextIndex { .. }
1104            | LogicalPlan::CreateScalarIndex { .. }
1105            | LogicalPlan::CreateJsonFtsIndex { .. }
1106            | LogicalPlan::DropIndex { .. }
1107            | LogicalPlan::ShowIndexes { .. } => true,
1108
1109            // Admin / utility operations
1110            LogicalPlan::ShowDatabase
1111            | LogicalPlan::ShowConfig
1112            | LogicalPlan::ShowStatistics
1113            | LogicalPlan::Vacuum
1114            | LogicalPlan::Checkpoint
1115            | LogicalPlan::Copy { .. }
1116            | LogicalPlan::CopyTo { .. }
1117            | LogicalPlan::CopyFrom { .. }
1118            | LogicalPlan::Backup { .. }
1119            | LogicalPlan::Explain { .. } => true,
1120
1121            // Procedure calls: DF-eligible procedures go through DataFusion,
1122            // everything else (DDL, admin, unknown) stays on fallback.
1123            LogicalPlan::ProcedureCall { procedure_name, .. } => {
1124                !Self::is_df_eligible_procedure(procedure_name)
1125            }
1126
1127            // Recurse through children using plan_children
1128            _ => Self::plan_children(plan)
1129                .iter()
1130                .any(|child| Self::is_ddl_or_admin(child)),
1131        }
1132    }
1133
1134    /// Returns `true` if the procedure is a read-only, data-producing procedure
1135    /// that can be executed through the DataFusion engine.
1136    ///
1137    /// This is a **positive allowlist** — unknown procedures default to the
1138    /// fallback executor (safe for TCK test procedures, future DDL, and admin).
1139    fn is_df_eligible_procedure(name: &str) -> bool {
1140        matches!(
1141            name,
1142            "uni.schema.labels"
1143                | "uni.schema.edgeTypes"
1144                | "uni.schema.relationshipTypes"
1145                | "uni.schema.indexes"
1146                | "uni.schema.constraints"
1147                | "uni.schema.labelInfo"
1148                | "uni.vector.query"
1149                | "uni.fts.query"
1150                | "uni.search"
1151                // M5g — `uni.create.vNode` produces a typed Node yield
1152                // via the planner-level node-shape expansion in
1153                // `GraphProcedureCallExec`; only the DataFusion path
1154                // honours that, so route it there explicitly. (vEdge
1155                // emits a self-contained Struct column and works on
1156                // either path — list both for symmetry.)
1157                | "uni.create.vNode"
1158                | "uni.create.vEdge"
1159        ) || name.starts_with("uni.algo.")
1160    }
1161
1162    /// Check if a plan contains write/mutation operations anywhere in the tree.
1163    ///
1164    /// Write operations (`CREATE`, `MERGE`, `DELETE`, `SET`, `REMOVE`, `FOREACH`)
1165    /// are used to determine when a MutationContext needs to be built for DataFusion.
1166    /// This recurses through read-only wrapper nodes to detect writes nested inside
1167    /// projections (e.g. `CREATE (n:Person) RETURN n` produces `Project { Create { ... } }`).
1168    fn contains_write_operations(plan: &LogicalPlan) -> bool {
1169        match plan {
1170            LogicalPlan::Create { .. }
1171            | LogicalPlan::CreateBatch { .. }
1172            | LogicalPlan::Merge { .. }
1173            | LogicalPlan::Delete { .. }
1174            | LogicalPlan::Set { .. }
1175            | LogicalPlan::Remove { .. }
1176            | LogicalPlan::Foreach { .. } => true,
1177            _ => Self::plan_children(plan)
1178                .iter()
1179                .any(|child| Self::contains_write_operations(child)),
1180        }
1181    }
1182
1183    /// Executes a query as a stream of result batches.
1184    ///
1185    /// Routes DDL/Admin through the fallback executor and everything else
1186    /// through DataFusion.
1187    pub fn execute_stream(
1188        self,
1189        plan: LogicalPlan,
1190        prop_manager: Arc<PropertyManager>,
1191        params: HashMap<String, Value>,
1192    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1193        let this = self;
1194        let this_for_ctx = this.clone();
1195
1196        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1197
1198        ctx_stream
1199            .flat_map(move |ctx| {
1200                let plan = plan.clone();
1201                let this = this.clone();
1202                let prop_manager = prop_manager.clone();
1203                let params = params.clone();
1204
1205                let fut = async move {
1206                    if Self::is_ddl_or_admin(&plan) {
1207                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
1208                            .await
1209                    } else {
1210                        let batches = this
1211                            .execute_datafusion(plan, &prop_manager, &params)
1212                            .await?;
1213                        this.record_batches_to_rows(batches)
1214                    }
1215                };
1216                stream::once(fut).boxed()
1217            })
1218            .boxed()
1219    }
1220
1221    /// Converts an Arrow array element at a given row index to a Value.
1222    /// Delegates to the shared implementation in arrow_convert module.
1223    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
1224        arrow_convert::arrow_to_value(col, row, None)
1225    }
1226
1227    pub(crate) fn evaluate_expr<'a>(
1228        &'a self,
1229        expr: &'a Expr,
1230        row: &'a HashMap<String, Value>,
1231        prop_manager: &'a PropertyManager,
1232        params: &'a HashMap<String, Value>,
1233        ctx: Option<&'a QueryContext>,
1234    ) -> BoxFuture<'a, Result<Value>> {
1235        let this = self;
1236        Box::pin(async move {
1237            // First check if the expression itself is already pre-computed in the row
1238            let repr = expr.to_string_repr();
1239            if let Some(val) = row.get(&repr) {
1240                return Ok(val.clone());
1241            }
1242
1243            match expr {
1244                Expr::PatternComprehension { .. } => {
1245                    // Handled by DataFusion path via PatternComprehensionExecExpr
1246                    Err(anyhow::anyhow!(
1247                        "Pattern comprehensions are handled by DataFusion executor"
1248                    ))
1249                }
1250                Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1251                    "COLLECT subqueries not yet supported in executor"
1252                )),
1253                Expr::Variable(name) => {
1254                    if let Some(val) = row.get(name) {
1255                        Ok(val.clone())
1256                    } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1257                        // Fallback: scan results may have system columns like "d._vid"
1258                        // without a materialized "d" Map. Return the VID so Property
1259                        // evaluation can fetch properties from storage.
1260                        Ok(vid_val.clone())
1261                    } else {
1262                        Ok(params.get(name).cloned().unwrap_or(Value::Null))
1263                    }
1264                }
1265                Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1266                Expr::Property(var_expr, prop_name) => {
1267                    // Fast path: if the base is a Variable, try flat-key lookup first.
1268                    // DataFusion scan results use flat keys like "d.embedding" rather than
1269                    // nested maps, so "d.embedding" won't be found via Variable("d") -> Property.
1270                    if let Expr::Variable(var_name) = var_expr.as_ref() {
1271                        let flat_key = format!("{}.{}", var_name, prop_name);
1272                        if let Some(val) = row.get(flat_key.as_str()) {
1273                            return Ok(val.clone());
1274                        }
1275                    }
1276
1277                    let base_val = this
1278                        .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1279                        .await?;
1280
1281                    // Handle system properties _vid and _id directly
1282                    if (prop_name == "_vid" || prop_name == "_id")
1283                        && let Ok(vid) = Self::vid_from_value(&base_val)
1284                    {
1285                        return Ok(Value::Int(vid.as_u64() as i64));
1286                    }
1287
1288                    // Handle Value::Node - access properties directly or via prop manager
1289                    if let Value::Node(node) = &base_val {
1290                        // Handle system properties
1291                        if prop_name == "_vid" || prop_name == "_id" {
1292                            return Ok(Value::Int(node.vid.as_u64() as i64));
1293                        }
1294                        if prop_name == "_labels" {
1295                            return Ok(Value::List(
1296                                node.labels
1297                                    .iter()
1298                                    .map(|l| Value::String(l.clone()))
1299                                    .collect(),
1300                            ));
1301                        }
1302                        // Check in-memory properties first
1303                        if let Some(val) = node.properties.get(prop_name.as_str()) {
1304                            return Ok(val.clone());
1305                        }
1306                        // Fallback to storage lookup
1307                        if let Ok(val) = prop_manager
1308                            .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1309                            .await
1310                        {
1311                            return Ok(val);
1312                        }
1313                        return Ok(Value::Null);
1314                    }
1315
1316                    // Handle Value::Edge - access properties directly or via prop manager
1317                    if let Value::Edge(edge) = &base_val {
1318                        // Handle system properties
1319                        if prop_name == "_eid" || prop_name == "_id" {
1320                            return Ok(Value::Int(edge.eid.as_u64() as i64));
1321                        }
1322                        if prop_name == "_type" {
1323                            return Ok(Value::String(edge.edge_type.clone()));
1324                        }
1325                        if prop_name == "_src" {
1326                            return Ok(Value::Int(edge.src.as_u64() as i64));
1327                        }
1328                        if prop_name == "_dst" {
1329                            return Ok(Value::Int(edge.dst.as_u64() as i64));
1330                        }
1331                        // Check in-memory properties first
1332                        if let Some(val) = edge.properties.get(prop_name.as_str()) {
1333                            return Ok(val.clone());
1334                        }
1335                        // Fallback to storage lookup
1336                        if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1337                        {
1338                            return Ok(val);
1339                        }
1340                        return Ok(Value::Null);
1341                    }
1342
1343                    // If base_val is an object (node/edge), check its properties first
1344                    // This handles properties from CREATE/MERGE that may not be persisted yet
1345                    if let Value::Map(map) = &base_val {
1346                        // First check top-level (for system properties like _id, _label, etc.)
1347                        if let Some(val) = map.get(prop_name.as_str()) {
1348                            return Ok(val.clone());
1349                        }
1350                        // Then check inside "properties" object (for user properties)
1351                        if let Some(Value::Map(props)) = map.get("properties")
1352                            && let Some(val) = props.get(prop_name.as_str())
1353                        {
1354                            return Ok(val.clone());
1355                        }
1356                        // Fallback to storage lookup using _vid or _id
1357                        let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1358                            map.get("_id")
1359                                .and_then(|v| v.as_str())
1360                                .and_then(|s| s.parse::<u64>().ok())
1361                        });
1362                        if let Some(id) = vid_opt {
1363                            let vid = Vid::from(id);
1364                            if let Ok(val) = prop_manager
1365                                .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1366                                .await
1367                            {
1368                                return Ok(val);
1369                            }
1370                        } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1371                            let eid = uni_common::core::id::Eid::from(id);
1372                            if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1373                                return Ok(val);
1374                            }
1375                        }
1376                        return Ok(Value::Null);
1377                    }
1378
1379                    // If base_val is just a VID, fetch from property manager
1380                    if let Ok(vid) = Self::vid_from_value(&base_val) {
1381                        return prop_manager
1382                            .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1383                            .await;
1384                    }
1385
1386                    if base_val.is_null() {
1387                        return Ok(Value::Null);
1388                    }
1389
1390                    // Check if base_val is a temporal value and prop_name is a temporal accessor
1391                    {
1392                        use crate::query::datetime::{
1393                            eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1394                            is_duration_string, is_temporal_accessor, is_temporal_string,
1395                        };
1396
1397                        // Handle Value::Temporal directly (no string parsing needed)
1398                        if let Value::Temporal(tv) = &base_val {
1399                            if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1400                                if is_duration_accessor(prop_name) {
1401                                    // Convert to string for the existing accessor logic
1402                                    return eval_duration_accessor(
1403                                        &base_val.to_string(),
1404                                        prop_name,
1405                                    );
1406                                }
1407                            } else if is_temporal_accessor(prop_name) {
1408                                return eval_temporal_accessor(&base_val.to_string(), prop_name);
1409                            }
1410                        }
1411
1412                        // Handle Value::String temporal (backward compat)
1413                        if let Value::String(s) = &base_val {
1414                            if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1415                                return eval_temporal_accessor(s, prop_name);
1416                            }
1417                            if is_duration_string(s) && is_duration_accessor(prop_name) {
1418                                return eval_duration_accessor(s, prop_name);
1419                            }
1420                        }
1421                    }
1422
1423                    Err(anyhow!(
1424                        "Cannot access property '{}' on {:?}",
1425                        prop_name,
1426                        base_val
1427                    ))
1428                }
1429                Expr::ArrayIndex {
1430                    array: arr_expr,
1431                    index: idx_expr,
1432                } => {
1433                    let arr_val = this
1434                        .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1435                        .await?;
1436                    let idx_val = this
1437                        .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1438                        .await?;
1439
1440                    if let Value::List(arr) = &arr_val {
1441                        // Handle signed indices (allow negative)
1442                        if let Some(i) = idx_val.as_i64() {
1443                            let idx = if i < 0 {
1444                                // Negative index: -1 = last element, -2 = second to last, etc.
1445                                let positive_idx = arr.len() as i64 + i;
1446                                if positive_idx < 0 {
1447                                    return Ok(Value::Null); // Out of bounds
1448                                }
1449                                positive_idx as usize
1450                            } else {
1451                                i as usize
1452                            };
1453                            if idx < arr.len() {
1454                                return Ok(arr[idx].clone());
1455                            }
1456                            return Ok(Value::Null);
1457                        } else if idx_val.is_null() {
1458                            return Ok(Value::Null);
1459                        } else {
1460                            return Err(anyhow::anyhow!(
1461                                "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1462                                idx_val
1463                            ));
1464                        }
1465                    }
1466                    if let Value::Map(map) = &arr_val {
1467                        if let Some(key) = idx_val.as_str() {
1468                            return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1469                        } else if !idx_val.is_null() {
1470                            return Err(anyhow::anyhow!(
1471                                "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1472                                idx_val
1473                            ));
1474                        }
1475                    }
1476                    // Handle bracket access on Node: n['name'] returns property
1477                    if let Value::Node(node) = &arr_val {
1478                        if let Some(key) = idx_val.as_str() {
1479                            // Check in-memory properties first
1480                            if let Some(val) = node.properties.get(key) {
1481                                return Ok(val.clone());
1482                            }
1483                            // Fallback to property manager
1484                            if let Ok(val) = prop_manager
1485                                .get_vertex_prop_with_ctx(node.vid, key, ctx)
1486                                .await
1487                            {
1488                                return Ok(val);
1489                            }
1490                            return Ok(Value::Null);
1491                        } else if !idx_val.is_null() {
1492                            return Err(anyhow::anyhow!(
1493                                "TypeError: Node index must be a string, got: {:?}",
1494                                idx_val
1495                            ));
1496                        }
1497                    }
1498                    // Handle bracket access on Edge: e['property'] returns property
1499                    if let Value::Edge(edge) = &arr_val {
1500                        if let Some(key) = idx_val.as_str() {
1501                            // Check in-memory properties first
1502                            if let Some(val) = edge.properties.get(key) {
1503                                return Ok(val.clone());
1504                            }
1505                            // Fallback to property manager
1506                            if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1507                                return Ok(val);
1508                            }
1509                            return Ok(Value::Null);
1510                        } else if !idx_val.is_null() {
1511                            return Err(anyhow::anyhow!(
1512                                "TypeError: Edge index must be a string, got: {:?}",
1513                                idx_val
1514                            ));
1515                        }
1516                    }
1517                    // Handle bracket access on VID (integer): n['name'] where n is a VID
1518                    if let Ok(vid) = Self::vid_from_value(&arr_val)
1519                        && let Some(key) = idx_val.as_str()
1520                    {
1521                        if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1522                        {
1523                            return Ok(val);
1524                        }
1525                        return Ok(Value::Null);
1526                    }
1527                    if arr_val.is_null() {
1528                        return Ok(Value::Null);
1529                    }
1530                    Err(anyhow!(
1531                        "TypeError: InvalidArgumentType - cannot index into {:?}",
1532                        arr_val
1533                    ))
1534                }
1535                Expr::ArraySlice { array, start, end } => {
1536                    let arr_val = this
1537                        .evaluate_expr(array, row, prop_manager, params, ctx)
1538                        .await?;
1539
1540                    if let Value::List(arr) = &arr_val {
1541                        let len = arr.len();
1542
1543                        // Evaluate start index (default to 0), null → null result
1544                        let start_idx = if let Some(s) = start {
1545                            let v = this
1546                                .evaluate_expr(s, row, prop_manager, params, ctx)
1547                                .await?;
1548                            if v.is_null() {
1549                                return Ok(Value::Null);
1550                            }
1551                            let raw = v.as_i64().unwrap_or(0);
1552                            if raw < 0 {
1553                                (len as i64 + raw).max(0) as usize
1554                            } else {
1555                                (raw as usize).min(len)
1556                            }
1557                        } else {
1558                            0
1559                        };
1560
1561                        // Evaluate end index (default to length), null → null result
1562                        let end_idx = if let Some(e) = end {
1563                            let v = this
1564                                .evaluate_expr(e, row, prop_manager, params, ctx)
1565                                .await?;
1566                            if v.is_null() {
1567                                return Ok(Value::Null);
1568                            }
1569                            let raw = v.as_i64().unwrap_or(len as i64);
1570                            if raw < 0 {
1571                                (len as i64 + raw).max(0) as usize
1572                            } else {
1573                                (raw as usize).min(len)
1574                            }
1575                        } else {
1576                            len
1577                        };
1578
1579                        // Return sliced array
1580                        if start_idx >= end_idx {
1581                            return Ok(Value::List(vec![]));
1582                        }
1583                        let end_idx = end_idx.min(len);
1584                        return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1585                    }
1586
1587                    if arr_val.is_null() {
1588                        return Ok(Value::Null);
1589                    }
1590                    Err(anyhow!("Cannot slice {:?}", arr_val))
1591                }
1592                Expr::Literal(lit) => Ok(lit.to_value()),
1593                Expr::List(items) => {
1594                    let mut vals = Vec::new();
1595                    for item in items {
1596                        vals.push(
1597                            this.evaluate_expr(item, row, prop_manager, params, ctx)
1598                                .await?,
1599                        );
1600                    }
1601                    Ok(Value::List(vals))
1602                }
1603                Expr::Map(items) => {
1604                    let mut map = HashMap::new();
1605                    for (key, value_expr) in items {
1606                        let val = this
1607                            .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1608                            .await?;
1609                        map.insert(key.clone(), val);
1610                    }
1611                    Ok(Value::Map(map))
1612                }
1613                Expr::Exists { query, .. } => {
1614                    // Plan and execute subquery; failures return false (pattern doesn't match)
1615                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1616                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1617
1618                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1619                        Ok(plan) => {
1620                            let mut sub_params = params.clone();
1621                            sub_params.extend(row.clone());
1622
1623                            match this.execute(plan, prop_manager, &sub_params).await {
1624                                Ok(results) => Ok(Value::Bool(!results.is_empty())),
1625                                Err(e) => {
1626                                    log::debug!("EXISTS subquery execution failed: {}", e);
1627                                    Ok(Value::Bool(false))
1628                                }
1629                            }
1630                        }
1631                        Err(e) => {
1632                            log::debug!("EXISTS subquery planning failed: {}", e);
1633                            Ok(Value::Bool(false))
1634                        }
1635                    }
1636                }
1637                Expr::CountSubquery(query) => {
1638                    // Similar to Exists but returns count
1639                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1640
1641                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1642
1643                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1644                        Ok(plan) => {
1645                            let mut sub_params = params.clone();
1646                            sub_params.extend(row.clone());
1647
1648                            match this.execute(plan, prop_manager, &sub_params).await {
1649                                Ok(results) => Ok(Value::from(results.len() as i64)),
1650                                Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1651                            }
1652                        }
1653                        Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1654                    }
1655                }
1656                Expr::Quantifier {
1657                    quantifier,
1658                    variable,
1659                    list,
1660                    predicate,
1661                } => {
1662                    // Quantifier expression evaluation (ALL/ANY/SINGLE/NONE)
1663                    //
1664                    // This is the primary execution path for quantifiers because DataFusion
1665                    // does not support lambda functions yet. Queries with quantifiers attempt
1666                    // DataFusion translation first, fail (see df_expr.rs:289), then fall back
1667                    // to this fallback executor path.
1668                    //
1669                    // This is intentional design - we get correct semantics with row-by-row
1670                    // evaluation until DataFusion adds lambda support.
1671                    //
1672                    // See: https://github.com/apache/datafusion/issues/14205
1673
1674                    // Evaluate the list expression
1675                    let list_val = this
1676                        .evaluate_expr(list, row, prop_manager, params, ctx)
1677                        .await?;
1678
1679                    // Handle null propagation
1680                    if list_val.is_null() {
1681                        return Ok(Value::Null);
1682                    }
1683
1684                    // Convert to array
1685                    let items = match list_val {
1686                        Value::List(arr) => arr,
1687                        _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1688                    };
1689
1690                    // Evaluate predicate for each item
1691                    let mut satisfied_count = 0;
1692                    for item in &items {
1693                        // Create new row with bound variable
1694                        let mut item_row = row.clone();
1695                        item_row.insert(variable.clone(), item.clone());
1696
1697                        // Evaluate predicate with bound variable
1698                        let pred_result = this
1699                            .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1700                            .await?;
1701
1702                        // Check if predicate is satisfied
1703                        if let Value::Bool(true) = pred_result {
1704                            satisfied_count += 1;
1705                        }
1706                    }
1707
1708                    // Return based on quantifier type
1709                    let result = match quantifier {
1710                        Quantifier::All => satisfied_count == items.len(),
1711                        Quantifier::Any => satisfied_count > 0,
1712                        Quantifier::Single => satisfied_count == 1,
1713                        Quantifier::None => satisfied_count == 0,
1714                    };
1715
1716                    Ok(Value::Bool(result))
1717                }
1718                Expr::ListComprehension {
1719                    variable,
1720                    list,
1721                    where_clause,
1722                    map_expr,
1723                } => {
1724                    // List comprehension evaluation: [x IN list WHERE pred | expr]
1725                    //
1726                    // Similar to quantifiers, this requires lambda-like evaluation
1727                    // which DataFusion doesn't support yet. This is the primary execution path.
1728
1729                    // Evaluate the list expression
1730                    let list_val = this
1731                        .evaluate_expr(list, row, prop_manager, params, ctx)
1732                        .await?;
1733
1734                    // Handle null propagation
1735                    if list_val.is_null() {
1736                        return Ok(Value::Null);
1737                    }
1738
1739                    // Convert to array
1740                    let items = match list_val {
1741                        Value::List(arr) => arr,
1742                        _ => {
1743                            return Err(anyhow!(
1744                                "List comprehension expects a list, got: {:?}",
1745                                list_val
1746                            ));
1747                        }
1748                    };
1749
1750                    // Collect mapped values
1751                    let mut results = Vec::new();
1752                    for item in &items {
1753                        // Create new row with bound variable
1754                        let mut item_row = row.clone();
1755                        item_row.insert(variable.clone(), item.clone());
1756
1757                        // Apply WHERE filter if present
1758                        if let Some(predicate) = where_clause {
1759                            let pred_result = this
1760                                .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1761                                .await?;
1762
1763                            // Skip items that don't match the filter
1764                            if !matches!(pred_result, Value::Bool(true)) {
1765                                continue;
1766                            }
1767                        }
1768
1769                        // Apply map expression
1770                        let mapped_val = this
1771                            .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1772                            .await?;
1773                        results.push(mapped_val);
1774                    }
1775
1776                    Ok(Value::List(results))
1777                }
1778                Expr::BinaryOp { left, op, right } => {
1779                    // Short-circuit evaluation for AND/OR
1780                    match op {
1781                        BinaryOp::And => {
1782                            let l_val = this
1783                                .evaluate_expr(left, row, prop_manager, params, ctx)
1784                                .await?;
1785                            // Short-circuit: if left is false, don't evaluate right
1786                            if let Some(false) = l_val.as_bool() {
1787                                return Ok(Value::Bool(false));
1788                            }
1789                            let r_val = this
1790                                .evaluate_expr(right, row, prop_manager, params, ctx)
1791                                .await?;
1792                            eval_binary_op(&l_val, op, &r_val)
1793                        }
1794                        BinaryOp::Or => {
1795                            let l_val = this
1796                                .evaluate_expr(left, row, prop_manager, params, ctx)
1797                                .await?;
1798                            // Short-circuit: if left is true, don't evaluate right
1799                            if let Some(true) = l_val.as_bool() {
1800                                return Ok(Value::Bool(true));
1801                            }
1802                            let r_val = this
1803                                .evaluate_expr(right, row, prop_manager, params, ctx)
1804                                .await?;
1805                            eval_binary_op(&l_val, op, &r_val)
1806                        }
1807                        _ => {
1808                            // For all other operators, evaluate both sides
1809                            let l_val = this
1810                                .evaluate_expr(left, row, prop_manager, params, ctx)
1811                                .await?;
1812                            let r_val = this
1813                                .evaluate_expr(right, row, prop_manager, params, ctx)
1814                                .await?;
1815                            eval_binary_op(&l_val, op, &r_val)
1816                        }
1817                    }
1818                }
1819                Expr::In { expr, list } => {
1820                    let l_val = this
1821                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1822                        .await?;
1823                    let r_val = this
1824                        .evaluate_expr(list, row, prop_manager, params, ctx)
1825                        .await?;
1826                    eval_in_op(&l_val, &r_val)
1827                }
1828                Expr::UnaryOp { op, expr } => {
1829                    let val = this
1830                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1831                        .await?;
1832                    match op {
1833                        UnaryOp::Not => {
1834                            // Three-valued logic: NOT null = null
1835                            match val.as_bool() {
1836                                Some(b) => Ok(Value::Bool(!b)),
1837                                None if val.is_null() => Ok(Value::Null),
1838                                None => Err(anyhow!(
1839                                    "InvalidArgumentType: NOT requires a boolean argument"
1840                                )),
1841                            }
1842                        }
1843                        UnaryOp::Neg => {
1844                            if let Some(i) = val.as_i64() {
1845                                Ok(Value::Int(-i))
1846                            } else if let Some(f) = val.as_f64() {
1847                                Ok(Value::Float(-f))
1848                            } else {
1849                                Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1850                            }
1851                        }
1852                    }
1853                }
1854                Expr::IsNull(expr) => {
1855                    let val = this
1856                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1857                        .await?;
1858                    Ok(Value::Bool(val.is_null()))
1859                }
1860                Expr::IsNotNull(expr) => {
1861                    let val = this
1862                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1863                        .await?;
1864                    Ok(Value::Bool(!val.is_null()))
1865                }
1866                Expr::IsUnique(_) => {
1867                    // IS UNIQUE is only valid in constraint definitions, not in query expressions
1868                    Err(anyhow!(
1869                        "IS UNIQUE can only be used in constraint definitions"
1870                    ))
1871                }
1872                Expr::Case {
1873                    expr,
1874                    when_then,
1875                    else_expr,
1876                } => {
1877                    if let Some(base_expr) = expr {
1878                        let base_val = this
1879                            .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1880                            .await?;
1881                        for (w, t) in when_then {
1882                            let w_val = this
1883                                .evaluate_expr(w, row, prop_manager, params, ctx)
1884                                .await?;
1885                            if base_val == w_val {
1886                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1887                            }
1888                        }
1889                    } else {
1890                        for (w, t) in when_then {
1891                            let w_val = this
1892                                .evaluate_expr(w, row, prop_manager, params, ctx)
1893                                .await?;
1894                            if w_val.as_bool() == Some(true) {
1895                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1896                            }
1897                        }
1898                    }
1899                    if let Some(e) = else_expr {
1900                        return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1901                    }
1902                    Ok(Value::Null)
1903                }
1904                Expr::Wildcard => Ok(Value::Null),
1905                Expr::FunctionCall { name, args, .. } => {
1906                    // Special case: id() returns VID for nodes and EID for relationships
1907                    if name.eq_ignore_ascii_case("ID") {
1908                        if args.len() != 1 {
1909                            return Err(anyhow!("id() requires exactly 1 argument"));
1910                        }
1911                        let val = this
1912                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1913                            .await?;
1914                        if let Value::Map(map) = &val {
1915                            // Check for _vid (vertex) first
1916                            if let Some(vid_val) = map.get("_vid") {
1917                                return Ok(vid_val.clone());
1918                            }
1919                            // Check for _eid (edge/relationship)
1920                            if let Some(eid_val) = map.get("_eid") {
1921                                return Ok(eid_val.clone());
1922                            }
1923                            // Check for _id (fallback)
1924                            if let Some(id_val) = map.get("_id") {
1925                                return Ok(id_val.clone());
1926                            }
1927                        }
1928                        return Ok(Value::Null);
1929                    }
1930
1931                    // Special case: elementId() returns string format "label_id:local_offset"
1932                    if name.eq_ignore_ascii_case("ELEMENTID") {
1933                        if args.len() != 1 {
1934                            return Err(anyhow!("elementId() requires exactly 1 argument"));
1935                        }
1936                        let val = this
1937                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1938                            .await?;
1939                        if let Value::Map(map) = &val {
1940                            // Check for _vid (vertex) first
1941                            // In new storage model, VIDs are pure auto-increment - return as simple ID string
1942                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1943                                return Ok(Value::String(vid_val.to_string()));
1944                            }
1945                            // Check for _eid (edge/relationship)
1946                            // In new storage model, EIDs are pure auto-increment - return as simple ID string
1947                            if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1948                                return Ok(Value::String(eid_val.to_string()));
1949                            }
1950                        }
1951                        return Ok(Value::Null);
1952                    }
1953
1954                    // Special case: type() returns the relationship type name
1955                    if name.eq_ignore_ascii_case("TYPE") {
1956                        if args.len() != 1 {
1957                            return Err(anyhow!("type() requires exactly 1 argument"));
1958                        }
1959                        let val = this
1960                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1961                            .await?;
1962                        if let Value::Map(map) = &val
1963                            && let Some(type_val) = map.get("_type")
1964                        {
1965                            // Numeric _type is an edge type ID; string _type is already a name
1966                            if let Some(type_id) =
1967                                type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1968                            {
1969                                if let Some(name) = this
1970                                    .storage
1971                                    .schema_manager()
1972                                    .edge_type_name_by_id_unified(type_id)
1973                                {
1974                                    return Ok(Value::String(name));
1975                                }
1976                            } else if let Some(name) = type_val.as_str() {
1977                                return Ok(Value::String(name.to_string()));
1978                            }
1979                        }
1980                        return Ok(Value::Null);
1981                    }
1982
1983                    // Special case: labels() returns the labels of a node
1984                    if name.eq_ignore_ascii_case("LABELS") {
1985                        if args.len() != 1 {
1986                            return Err(anyhow!("labels() requires exactly 1 argument"));
1987                        }
1988                        let val = this
1989                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1990                            .await?;
1991                        if let Value::Map(map) = &val
1992                            && let Some(labels_val) = map.get("_labels")
1993                        {
1994                            return Ok(labels_val.clone());
1995                        }
1996                        return Ok(Value::Null);
1997                    }
1998
1999                    // Special case: properties() returns the properties map of a node/edge
2000                    if name.eq_ignore_ascii_case("PROPERTIES") {
2001                        if args.len() != 1 {
2002                            return Err(anyhow!("properties() requires exactly 1 argument"));
2003                        }
2004                        let val = this
2005                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2006                            .await?;
2007                        if let Value::Map(map) = &val {
2008                            // Filter out internal properties (those starting with _)
2009                            let mut props = HashMap::new();
2010                            for (k, v) in map.iter() {
2011                                if !k.starts_with('_') {
2012                                    props.insert(k.clone(), v.clone());
2013                                }
2014                            }
2015                            return Ok(Value::Map(props));
2016                        }
2017                        return Ok(Value::Null);
2018                    }
2019
2020                    // Special case: startNode() returns the start node of a relationship
2021                    if name.eq_ignore_ascii_case("STARTNODE") {
2022                        if args.len() != 1 {
2023                            return Err(anyhow!("startNode() requires exactly 1 argument"));
2024                        }
2025                        let val = this
2026                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2027                            .await?;
2028                        if let Value::Edge(edge) = &val {
2029                            return Ok(Self::find_node_by_vid(row, edge.src));
2030                        }
2031                        if let Value::Map(map) = &val {
2032                            if let Some(start_node) = map.get("_startNode") {
2033                                return Ok(start_node.clone());
2034                            }
2035                            if let Some(src_vid) = map.get("_src_vid") {
2036                                return Ok(Value::Map(HashMap::from([(
2037                                    "_vid".to_string(),
2038                                    src_vid.clone(),
2039                                )])));
2040                            }
2041                            // Resolve _src VID by looking up node in row
2042                            if let Some(src_id) = map.get("_src")
2043                                && let Some(u) = src_id.as_u64()
2044                            {
2045                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2046                            }
2047                        }
2048                        return Ok(Value::Null);
2049                    }
2050
2051                    // Special case: endNode() returns the end node of a relationship
2052                    if name.eq_ignore_ascii_case("ENDNODE") {
2053                        if args.len() != 1 {
2054                            return Err(anyhow!("endNode() requires exactly 1 argument"));
2055                        }
2056                        let val = this
2057                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2058                            .await?;
2059                        if let Value::Edge(edge) = &val {
2060                            return Ok(Self::find_node_by_vid(row, edge.dst));
2061                        }
2062                        if let Value::Map(map) = &val {
2063                            if let Some(end_node) = map.get("_endNode") {
2064                                return Ok(end_node.clone());
2065                            }
2066                            if let Some(dst_vid) = map.get("_dst_vid") {
2067                                return Ok(Value::Map(HashMap::from([(
2068                                    "_vid".to_string(),
2069                                    dst_vid.clone(),
2070                                )])));
2071                            }
2072                            // Resolve _dst VID by looking up node in row
2073                            if let Some(dst_id) = map.get("_dst")
2074                                && let Some(u) = dst_id.as_u64()
2075                            {
2076                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2077                            }
2078                        }
2079                        return Ok(Value::Null);
2080                    }
2081
2082                    // Special case: hasLabel() checks if a node has a specific label
2083                    // Used for WHERE n:Label predicates
2084                    if name.eq_ignore_ascii_case("HASLABEL") {
2085                        if args.len() != 2 {
2086                            return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
2087                        }
2088                        let node_val = this
2089                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2090                            .await?;
2091                        let label_val = this
2092                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2093                            .await?;
2094
2095                        let label_to_check = label_val.as_str().ok_or_else(|| {
2096                            anyhow!("Second argument to hasLabel must be a string")
2097                        })?;
2098
2099                        let has_label = match &node_val {
2100                            // Handle proper Value::Node type (from result normalization)
2101                            Value::Map(map) if map.contains_key("_vid") => {
2102                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
2103                                    labels_arr
2104                                        .iter()
2105                                        .any(|l| l.as_str() == Some(label_to_check))
2106                                } else {
2107                                    false
2108                                }
2109                            }
2110                            // Also handle legacy Object format
2111                            Value::Map(map) => {
2112                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
2113                                    labels_arr
2114                                        .iter()
2115                                        .any(|l| l.as_str() == Some(label_to_check))
2116                                } else {
2117                                    false
2118                                }
2119                            }
2120                            _ => false,
2121                        };
2122                        return Ok(Value::Bool(has_label));
2123                    }
2124
2125                    // Quantifier functions (ANY/ALL/NONE/SINGLE) as function calls are not supported.
2126                    // These should be parsed as Expr::Quantifier instead.
2127                    if matches!(
2128                        name.to_uppercase().as_str(),
2129                        "ANY" | "ALL" | "NONE" | "SINGLE"
2130                    ) {
2131                        return Err(anyhow!(
2132                            "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
2133                            name.to_lowercase()
2134                        ));
2135                    }
2136
2137                    // Special case: COALESCE needs short-circuit evaluation
2138                    if name.eq_ignore_ascii_case("COALESCE") {
2139                        for arg in args {
2140                            let val = this
2141                                .evaluate_expr(arg, row, prop_manager, params, ctx)
2142                                .await?;
2143                            if !val.is_null() {
2144                                return Ok(val);
2145                            }
2146                        }
2147                        return Ok(Value::Null);
2148                    }
2149
2150                    // Special case: vector_similarity has dedicated implementation
2151                    if name.eq_ignore_ascii_case("vector_similarity") {
2152                        if args.len() != 2 {
2153                            return Err(anyhow!("vector_similarity takes 2 arguments"));
2154                        }
2155                        let v1 = this
2156                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2157                            .await?;
2158                        let v2 = this
2159                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2160                            .await?;
2161                        return eval_vector_similarity(&v1, &v2);
2162                    }
2163
2164                    // Special case: uni.validAt handles node fetching
2165                    if name.eq_ignore_ascii_case("uni.temporal.validAt")
2166                        || name.eq_ignore_ascii_case("uni.validAt")
2167                        || name.eq_ignore_ascii_case("validAt")
2168                    {
2169                        if args.len() != 4 {
2170                            return Err(anyhow!("validAt requires 4 arguments"));
2171                        }
2172                        let node_val = this
2173                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2174                            .await?;
2175                        let start_prop = this
2176                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2177                            .await?
2178                            .as_str()
2179                            .ok_or(anyhow!("start_prop must be string"))?
2180                            .to_string();
2181                        let end_prop = this
2182                            .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2183                            .await?
2184                            .as_str()
2185                            .ok_or(anyhow!("end_prop must be string"))?
2186                            .to_string();
2187                        let time_val = this
2188                            .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2189                            .await?;
2190
2191                        let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2192                            anyhow!("time argument must be a datetime value or string")
2193                        })?;
2194
2195                        // Fetch temporal property values - supports both vertices and edges
2196                        let valid_from_val: Option<Value> = if let Ok(vid) =
2197                            Self::vid_from_value(&node_val)
2198                        {
2199                            // Vertex case - VID string format
2200                            prop_manager
2201                                .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2202                                .await
2203                                .ok()
2204                        } else if let Value::Map(map) = &node_val {
2205                            // Check for embedded _vid or _eid in object
2206                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2207                                let vid = Vid::from(vid_val);
2208                                prop_manager
2209                                    .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2210                                    .await
2211                                    .ok()
2212                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2213                                // Edge case
2214                                let eid = uni_common::core::id::Eid::from(eid_val);
2215                                prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2216                            } else {
2217                                // Inline object - property embedded directly
2218                                map.get(&start_prop).cloned()
2219                            }
2220                        } else {
2221                            return Ok(Value::Bool(false));
2222                        };
2223
2224                        let valid_from = match valid_from_val {
2225                            Some(ref v) => match value_to_datetime_utc(v) {
2226                                Some(dt) => dt,
2227                                None if v.is_null() => return Ok(Value::Bool(false)),
2228                                None => {
2229                                    return Err(anyhow!(
2230                                        "Property {} must be a datetime value or string",
2231                                        start_prop
2232                                    ));
2233                                }
2234                            },
2235                            None => return Ok(Value::Bool(false)),
2236                        };
2237
2238                        let valid_to_val: Option<Value> = if let Ok(vid) =
2239                            Self::vid_from_value(&node_val)
2240                        {
2241                            // Vertex case - VID string format
2242                            prop_manager
2243                                .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2244                                .await
2245                                .ok()
2246                        } else if let Value::Map(map) = &node_val {
2247                            // Check for embedded _vid or _eid in object
2248                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2249                                let vid = Vid::from(vid_val);
2250                                prop_manager
2251                                    .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2252                                    .await
2253                                    .ok()
2254                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2255                                // Edge case
2256                                let eid = uni_common::core::id::Eid::from(eid_val);
2257                                prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2258                            } else {
2259                                // Inline object - property embedded directly
2260                                map.get(&end_prop).cloned()
2261                            }
2262                        } else {
2263                            return Ok(Value::Bool(false));
2264                        };
2265
2266                        let valid_to = match valid_to_val {
2267                            Some(ref v) => match value_to_datetime_utc(v) {
2268                                Some(dt) => Some(dt),
2269                                None if v.is_null() => None,
2270                                None => {
2271                                    return Err(anyhow!(
2272                                        "Property {} must be a datetime value or null",
2273                                        end_prop
2274                                    ));
2275                                }
2276                            },
2277                            None => None,
2278                        };
2279
2280                        let is_valid = valid_from <= query_time
2281                            && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2282                        return Ok(Value::Bool(is_valid));
2283                    }
2284
2285                    // For all other functions, evaluate arguments then call helper
2286                    let mut evaluated_args = Vec::with_capacity(args.len());
2287                    for arg in args {
2288                        let mut val = this
2289                            .evaluate_expr(arg, row, prop_manager, params, ctx)
2290                            .await?;
2291
2292                        // Eagerly hydrate edge/vertex maps if pushdown hydration didn't load properties.
2293                        // Functions like validAt() need access to properties like valid_from/valid_to.
2294                        if let Value::Map(ref mut map) = val {
2295                            hydrate_entity_if_needed(map, prop_manager, ctx).await;
2296                        }
2297
2298                        evaluated_args.push(val);
2299                    }
2300                    eval_scalar_function(
2301                        name,
2302                        &evaluated_args,
2303                        self.custom_function_registry.as_deref(),
2304                    )
2305                }
2306                Expr::Reduce {
2307                    accumulator,
2308                    init,
2309                    variable,
2310                    list,
2311                    expr,
2312                } => {
2313                    let mut acc = self
2314                        .evaluate_expr(init, row, prop_manager, params, ctx)
2315                        .await?;
2316                    let list_val = self
2317                        .evaluate_expr(list, row, prop_manager, params, ctx)
2318                        .await?;
2319
2320                    if let Value::List(items) = list_val {
2321                        for item in items {
2322                            // Create a temporary scope/row with accumulator and variable
2323                            // For simplicity in fallback executor, we can construct a new row map
2324                            // merging current row + new variables.
2325                            let mut scope = row.clone();
2326                            scope.insert(accumulator.clone(), acc.clone());
2327                            scope.insert(variable.clone(), item);
2328
2329                            acc = self
2330                                .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2331                                .await?;
2332                        }
2333                    } else {
2334                        return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2335                    }
2336                    Ok(acc)
2337                }
2338                Expr::ValidAt { .. } => {
2339                    // VALID_AT should have been transformed to a function call in the planner
2340                    Err(anyhow!(
2341                        "VALID_AT expression should have been transformed to function call in planner"
2342                    ))
2343                }
2344
2345                Expr::LabelCheck { expr, labels } => {
2346                    let val = this
2347                        .evaluate_expr(expr, row, prop_manager, params, ctx)
2348                        .await?;
2349                    match &val {
2350                        Value::Null => Ok(Value::Null),
2351                        Value::Map(map) => {
2352                            // Check if this is an edge (has _eid) or node (has _vid)
2353                            let is_edge = map.contains_key("_eid")
2354                                || map.contains_key("_type_name")
2355                                || (map.contains_key("_type") && !map.contains_key("_vid"));
2356
2357                            if is_edge {
2358                                // Edges have a single type
2359                                if labels.len() > 1 {
2360                                    return Ok(Value::Bool(false));
2361                                }
2362                                let label_to_check = &labels[0];
2363                                let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2364                                {
2365                                    t == label_to_check
2366                                } else if let Some(Value::String(t)) = map.get("_type") {
2367                                    t == label_to_check
2368                                } else {
2369                                    false
2370                                };
2371                                Ok(Value::Bool(has_type))
2372                            } else {
2373                                // Node: check all labels
2374                                let has_all = labels.iter().all(|label_to_check| {
2375                                    if let Some(Value::List(labels_arr)) = map.get("_labels") {
2376                                        labels_arr
2377                                            .iter()
2378                                            .any(|l| l.as_str() == Some(label_to_check.as_str()))
2379                                    } else {
2380                                        false
2381                                    }
2382                                });
2383                                Ok(Value::Bool(has_all))
2384                            }
2385                        }
2386                        _ => Ok(Value::Bool(false)),
2387                    }
2388                }
2389
2390                Expr::MapProjection { base, items } => {
2391                    let base_value = this
2392                        .evaluate_expr(base, row, prop_manager, params, ctx)
2393                        .await?;
2394
2395                    // Extract properties from the base object
2396                    let properties = match &base_value {
2397                        Value::Map(map) => map,
2398                        _ => {
2399                            return Err(anyhow!(
2400                                "Map projection requires object, got {:?}",
2401                                base_value
2402                            ));
2403                        }
2404                    };
2405
2406                    let mut result_map = HashMap::new();
2407
2408                    for item in items {
2409                        match item {
2410                            MapProjectionItem::Property(prop) => {
2411                                if let Some(value) = properties.get(prop.as_str()) {
2412                                    result_map.insert(prop.clone(), value.clone());
2413                                }
2414                            }
2415                            MapProjectionItem::AllProperties => {
2416                                // Include all properties except internal fields (those starting with _)
2417                                for (key, value) in properties.iter() {
2418                                    if !key.starts_with('_') {
2419                                        result_map.insert(key.clone(), value.clone());
2420                                    }
2421                                }
2422                            }
2423                            MapProjectionItem::LiteralEntry(key, expr) => {
2424                                let value = this
2425                                    .evaluate_expr(expr, row, prop_manager, params, ctx)
2426                                    .await?;
2427                                result_map.insert(key.clone(), value);
2428                            }
2429                            MapProjectionItem::Variable(var_name) => {
2430                                // Variable selector: include the value of the variable in the result
2431                                // e.g., person{.name, friend} includes the value of 'friend' variable
2432                                if let Some(value) = row.get(var_name.as_str()) {
2433                                    result_map.insert(var_name.clone(), value.clone());
2434                                }
2435                            }
2436                        }
2437                    }
2438
2439                    Ok(Value::Map(result_map))
2440                }
2441            }
2442        })
2443    }
2444
2445    pub(crate) fn execute_subplan<'a>(
2446        &'a self,
2447        plan: LogicalPlan,
2448        prop_manager: &'a PropertyManager,
2449        params: &'a HashMap<String, Value>,
2450        ctx: Option<&'a QueryContext>,
2451    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2452        Box::pin(async move {
2453            if let Some(ctx) = ctx {
2454                ctx.check_timeout()?;
2455            }
2456            match plan {
2457                LogicalPlan::Union { left, right, all } => {
2458                    self.execute_union(left, right, all, prop_manager, params, ctx)
2459                        .await
2460                }
2461                LogicalPlan::CreateVectorIndex {
2462                    config,
2463                    if_not_exists,
2464                } => {
2465                    if if_not_exists && self.index_exists_by_name(&config.name) {
2466                        return Ok(vec![]);
2467                    }
2468                    let idx_mgr = self.storage.index_manager();
2469                    idx_mgr.create_vector_index(config).await?;
2470                    Ok(vec![])
2471                }
2472                LogicalPlan::CreateFullTextIndex {
2473                    config,
2474                    if_not_exists,
2475                } => {
2476                    if if_not_exists && self.index_exists_by_name(&config.name) {
2477                        return Ok(vec![]);
2478                    }
2479                    let idx_mgr = self.storage.index_manager();
2480                    idx_mgr.create_fts_index(config).await?;
2481                    Ok(vec![])
2482                }
2483                LogicalPlan::CreateScalarIndex {
2484                    mut config,
2485                    if_not_exists,
2486                } => {
2487                    if if_not_exists && self.index_exists_by_name(&config.name) {
2488                        return Ok(vec![]);
2489                    }
2490
2491                    // Check for expression indexes - create generated columns
2492                    let mut modified_properties = Vec::new();
2493
2494                    for prop in &config.properties {
2495                        // Heuristic: if contains '(' and ')', it's an expression
2496                        if prop.contains('(') && prop.contains(')') {
2497                            let gen_col = SchemaManager::generated_column_name(prop);
2498
2499                            // Add generated property to schema
2500                            let sm = self.storage.schema_manager_arc();
2501                            if let Err(e) = sm.add_generated_property(
2502                                &config.label,
2503                                &gen_col,
2504                                DataType::String, // Default type for expressions
2505                                prop.clone(),
2506                            ) {
2507                                log::warn!("Failed to add generated property (might exist): {}", e);
2508                            }
2509
2510                            modified_properties.push(gen_col);
2511                        } else {
2512                            // Simple property - use as-is
2513                            modified_properties.push(prop.clone());
2514                        }
2515                    }
2516
2517                    config.properties = modified_properties;
2518
2519                    let idx_mgr = self.storage.index_manager();
2520                    idx_mgr.create_scalar_index(config).await?;
2521                    Ok(vec![])
2522                }
2523                LogicalPlan::CreateJsonFtsIndex {
2524                    config,
2525                    if_not_exists,
2526                } => {
2527                    if if_not_exists && self.index_exists_by_name(&config.name) {
2528                        return Ok(vec![]);
2529                    }
2530                    let idx_mgr = self.storage.index_manager();
2531                    idx_mgr.create_json_fts_index(config).await?;
2532                    Ok(vec![])
2533                }
2534                LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2535                LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2536                LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2537                LogicalPlan::Vacuum => {
2538                    self.execute_vacuum().await?;
2539                    Ok(vec![])
2540                }
2541                LogicalPlan::Checkpoint => {
2542                    self.execute_checkpoint().await?;
2543                    Ok(vec![])
2544                }
2545                LogicalPlan::CopyTo {
2546                    label,
2547                    path,
2548                    format,
2549                    options,
2550                } => {
2551                    let count = self
2552                        .execute_copy_to(&label, &path, &format, &options)
2553                        .await?;
2554                    let mut result = HashMap::new();
2555                    result.insert("count".to_string(), Value::Int(count as i64));
2556                    Ok(vec![result])
2557                }
2558                LogicalPlan::CopyFrom {
2559                    label,
2560                    path,
2561                    format,
2562                    options,
2563                } => {
2564                    let count = self
2565                        .execute_copy_from(&label, &path, &format, &options)
2566                        .await?;
2567                    let mut result = HashMap::new();
2568                    result.insert("count".to_string(), Value::Int(count as i64));
2569                    Ok(vec![result])
2570                }
2571                LogicalPlan::CreateLabel(clause) => {
2572                    self.execute_create_label(clause).await?;
2573                    Ok(vec![])
2574                }
2575                LogicalPlan::CreateEdgeType(clause) => {
2576                    self.execute_create_edge_type(clause).await?;
2577                    Ok(vec![])
2578                }
2579                LogicalPlan::AlterLabel(clause) => {
2580                    self.execute_alter_label(clause).await?;
2581                    Ok(vec![])
2582                }
2583                LogicalPlan::AlterEdgeType(clause) => {
2584                    self.execute_alter_edge_type(clause).await?;
2585                    Ok(vec![])
2586                }
2587                LogicalPlan::DropLabel(clause) => {
2588                    self.execute_drop_label(clause).await?;
2589                    Ok(vec![])
2590                }
2591                LogicalPlan::DropEdgeType(clause) => {
2592                    self.execute_drop_edge_type(clause).await?;
2593                    Ok(vec![])
2594                }
2595                LogicalPlan::CreateConstraint(clause) => {
2596                    self.execute_create_constraint(clause).await?;
2597                    Ok(vec![])
2598                }
2599                LogicalPlan::DropConstraint(clause) => {
2600                    self.execute_drop_constraint(clause).await?;
2601                    Ok(vec![])
2602                }
2603                LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2604                LogicalPlan::DropIndex { name, if_exists } => {
2605                    let idx_mgr = self.storage.index_manager();
2606                    match idx_mgr.drop_index(&name).await {
2607                        Ok(_) => Ok(vec![]),
2608                        Err(e) => {
2609                            if if_exists && e.to_string().contains("not found") {
2610                                Ok(vec![])
2611                            } else {
2612                                Err(e)
2613                            }
2614                        }
2615                    }
2616                }
2617                LogicalPlan::ShowIndexes { filter } => {
2618                    Ok(self.execute_show_indexes(filter.as_deref()))
2619                }
2620                // Scan/traverse nodes: delegate to DataFusion for data access,
2621                // then convert results to HashMaps for the fallback executor.
2622                LogicalPlan::Scan { .. }
2623                | LogicalPlan::FusedIndexScan { .. }
2624                | LogicalPlan::FusedIndexScanWrapped { .. }
2625                | LogicalPlan::ExtIdLookup { .. }
2626                | LogicalPlan::ScanAll { .. }
2627                | LogicalPlan::ScanMainByLabels { .. }
2628                | LogicalPlan::Traverse { .. }
2629                | LogicalPlan::TraverseMainByType { .. } => {
2630                    let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2631                    self.record_batches_to_rows(batches)
2632                }
2633                LogicalPlan::Filter {
2634                    input,
2635                    predicate,
2636                    optional_variables,
2637                } => {
2638                    let input_matches = self
2639                        .execute_subplan(*input, prop_manager, params, ctx)
2640                        .await?;
2641
2642                    tracing::debug!(
2643                        "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2644                        predicate,
2645                        input_matches.len(),
2646                        optional_variables
2647                    );
2648
2649                    // For OPTIONAL MATCH with WHERE: we need LEFT OUTER JOIN semantics.
2650                    // Group rows by non-optional variables, apply filter, and ensure
2651                    // at least one row per group (with NULLs if filter removes all).
2652                    if !optional_variables.is_empty() {
2653                        // Helper to check if a key belongs to an optional variable.
2654                        // Keys can be "var" or "var.field" (e.g., "m" or "m._vid").
2655                        let is_optional_key = |k: &str| -> bool {
2656                            optional_variables.contains(k)
2657                                || optional_variables
2658                                    .iter()
2659                                    .any(|var| k.starts_with(&format!("{}.", var)))
2660                        };
2661
2662                        // Helper to check if a key is internal (should not affect grouping)
2663                        let is_internal_key =
2664                            |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2665
2666                        // Compute the key (non-optional, non-internal variables) for grouping
2667                        let non_optional_vars: Vec<String> = input_matches
2668                            .first()
2669                            .map(|row| {
2670                                row.keys()
2671                                    .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2672                                    .cloned()
2673                                    .collect()
2674                            })
2675                            .unwrap_or_default();
2676
2677                        // Group rows by their non-optional variable values
2678                        let mut groups: std::collections::HashMap<
2679                            Vec<u8>,
2680                            Vec<HashMap<String, Value>>,
2681                        > = std::collections::HashMap::new();
2682
2683                        for row in &input_matches {
2684                            // Create a key from non-optional variable values
2685                            let key: Vec<u8> = non_optional_vars
2686                                .iter()
2687                                .map(|var| {
2688                                    row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
2689                                })
2690                                .collect::<Vec<_>>()
2691                                .join("|")
2692                                .into_bytes();
2693
2694                            groups.entry(key).or_default().push(row.clone());
2695                        }
2696
2697                        let mut filtered = Vec::new();
2698                        for (_key, group_rows) in groups {
2699                            let mut group_passed = Vec::new();
2700
2701                            for row in &group_rows {
2702                                // If optional variables are already NULL, preserve the row
2703                                let has_null_optional = optional_variables.iter().any(|var| {
2704                                    // Check both "var" and "var._vid" style keys
2705                                    let direct_null =
2706                                        matches!(row.get(var), Some(Value::Null) | None);
2707                                    let prefixed_null = row
2708                                        .keys()
2709                                        .filter(|k| k.starts_with(&format!("{}.", var)))
2710                                        .any(|k| matches!(row.get(k), Some(Value::Null)));
2711                                    direct_null || prefixed_null
2712                                });
2713
2714                                if has_null_optional {
2715                                    group_passed.push(row.clone());
2716                                    continue;
2717                                }
2718
2719                                let res = self
2720                                    .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2721                                    .await?;
2722
2723                                if res.as_bool().unwrap_or(false) {
2724                                    group_passed.push(row.clone());
2725                                }
2726                            }
2727
2728                            if group_passed.is_empty() {
2729                                // No rows passed - emit one row with NULLs for optional variables
2730                                // Use the first row's non-optional values as a template
2731                                if let Some(template) = group_rows.first() {
2732                                    let mut null_row = HashMap::new();
2733                                    for (k, v) in template {
2734                                        if is_optional_key(k) {
2735                                            null_row.insert(k.clone(), Value::Null);
2736                                        } else {
2737                                            null_row.insert(k.clone(), v.clone());
2738                                        }
2739                                    }
2740                                    filtered.push(null_row);
2741                                }
2742                            } else {
2743                                filtered.extend(group_passed);
2744                            }
2745                        }
2746
2747                        tracing::debug!(
2748                            "Filter (OPTIONAL): {} input rows -> {} output rows",
2749                            input_matches.len(),
2750                            filtered.len()
2751                        );
2752
2753                        return Ok(filtered);
2754                    }
2755
2756                    // Standard filter for non-OPTIONAL MATCH
2757                    let mut filtered = Vec::new();
2758                    for row in input_matches.iter() {
2759                        let res = self
2760                            .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2761                            .await?;
2762
2763                        let passes = res.as_bool().unwrap_or(false);
2764
2765                        if passes {
2766                            filtered.push(row.clone());
2767                        }
2768                    }
2769
2770                    tracing::debug!(
2771                        "Filter: {} input rows -> {} output rows",
2772                        input_matches.len(),
2773                        filtered.len()
2774                    );
2775
2776                    Ok(filtered)
2777                }
2778                LogicalPlan::ProcedureCall {
2779                    procedure_name,
2780                    arguments,
2781                    yield_items,
2782                } => {
2783                    let yield_names: Vec<String> =
2784                        yield_items.iter().map(|(n, _)| n.clone()).collect();
2785                    let results = self
2786                        .execute_procedure(
2787                            &procedure_name,
2788                            &arguments,
2789                            &yield_names,
2790                            prop_manager,
2791                            params,
2792                            ctx,
2793                        )
2794                        .await?;
2795
2796                    // Handle aliasing: collect all original values first, then
2797                    // build the aliased row in one pass. This avoids issues when
2798                    // an alias matches another yield item's original name (e.g.,
2799                    // YIELD a AS b, b AS d — renaming "a" to "b" must not
2800                    // clobber the original "b" before it is renamed to "d").
2801                    let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2802                    if !has_aliases {
2803                        // No aliases (includes YIELD * which produces empty yield_items) —
2804                        // pass through the procedure output rows unchanged.
2805                        Ok(results)
2806                    } else {
2807                        let mut aliased_results = Vec::with_capacity(results.len());
2808                        for row in results {
2809                            let mut new_row = HashMap::new();
2810                            for (name, alias) in &yield_items {
2811                                let col_name = alias.as_ref().unwrap_or(name);
2812                                let val = row.get(name).cloned().unwrap_or(Value::Null);
2813                                new_row.insert(col_name.clone(), val);
2814                            }
2815                            aliased_results.push(new_row);
2816                        }
2817                        Ok(aliased_results)
2818                    }
2819                }
2820                LogicalPlan::VectorKnn { .. } => {
2821                    unreachable!("VectorKnn is handled by DataFusion engine")
2822                }
2823                LogicalPlan::InvertedIndexLookup { .. } => {
2824                    unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2825                }
2826                LogicalPlan::Sort { input, order_by } => {
2827                    let rows = self
2828                        .execute_subplan(*input, prop_manager, params, ctx)
2829                        .await?;
2830                    self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2831                        .await
2832                }
2833                LogicalPlan::Limit { input, skip, fetch } => {
2834                    let rows = self
2835                        .execute_subplan(*input, prop_manager, params, ctx)
2836                        .await?;
2837                    let skip = skip.unwrap_or(0);
2838                    let take = fetch.unwrap_or(usize::MAX);
2839                    Ok(rows.into_iter().skip(skip).take(take).collect())
2840                }
2841                LogicalPlan::Aggregate {
2842                    input,
2843                    group_by,
2844                    aggregates,
2845                } => {
2846                    let rows = self
2847                        .execute_subplan(*input, prop_manager, params, ctx)
2848                        .await?;
2849                    self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2850                        .await
2851                }
2852                LogicalPlan::Window {
2853                    input,
2854                    window_exprs,
2855                } => {
2856                    let rows = self
2857                        .execute_subplan(*input, prop_manager, params, ctx)
2858                        .await?;
2859                    self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2860                        .await
2861                }
2862                LogicalPlan::Project { input, projections } => {
2863                    let matches = self
2864                        .execute_subplan(*input, prop_manager, params, ctx)
2865                        .await?;
2866                    self.execute_project(matches, &projections, prop_manager, params, ctx)
2867                        .await
2868                }
2869                LogicalPlan::Distinct { input } => {
2870                    let rows = self
2871                        .execute_subplan(*input, prop_manager, params, ctx)
2872                        .await?;
2873                    let mut seen = std::collections::HashSet::new();
2874                    let mut result = Vec::new();
2875                    for row in rows {
2876                        let key = Self::canonical_row_key(&row);
2877                        if seen.insert(key) {
2878                            result.push(row);
2879                        }
2880                    }
2881                    Ok(result)
2882                }
2883                LogicalPlan::Unwind {
2884                    input,
2885                    expr,
2886                    variable,
2887                } => {
2888                    let input_rows = self
2889                        .execute_subplan(*input, prop_manager, params, ctx)
2890                        .await?;
2891                    self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2892                        .await
2893                }
2894                LogicalPlan::Apply {
2895                    input,
2896                    subquery,
2897                    input_filter,
2898                } => {
2899                    let input_rows = self
2900                        .execute_subplan(*input, prop_manager, params, ctx)
2901                        .await?;
2902                    self.execute_apply(
2903                        input_rows,
2904                        &subquery,
2905                        input_filter.as_ref(),
2906                        prop_manager,
2907                        params,
2908                        ctx,
2909                    )
2910                    .await
2911                }
2912                LogicalPlan::SubqueryCall { input, subquery } => {
2913                    let input_rows = self
2914                        .execute_subplan(*input, prop_manager, params, ctx)
2915                        .await?;
2916                    // Execute subquery for each input row (correlated)
2917                    // No input_filter for CALL { }
2918                    self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2919                        .await
2920                }
2921                LogicalPlan::RecursiveCTE {
2922                    cte_name,
2923                    initial,
2924                    recursive,
2925                } => {
2926                    self.execute_recursive_cte(
2927                        &cte_name,
2928                        *initial,
2929                        *recursive,
2930                        prop_manager,
2931                        params,
2932                        ctx,
2933                    )
2934                    .await
2935                }
2936                LogicalPlan::CrossJoin { left, right } => {
2937                    self.execute_cross_join(left, right, prop_manager, params, ctx)
2938                        .await
2939                }
2940                LogicalPlan::Set { .. }
2941                | LogicalPlan::Remove { .. }
2942                | LogicalPlan::Merge { .. }
2943                | LogicalPlan::Create { .. }
2944                | LogicalPlan::CreateBatch { .. } => {
2945                    unreachable!("mutations are handled by DataFusion engine")
2946                }
2947                LogicalPlan::Delete { .. } => {
2948                    unreachable!("mutations are handled by DataFusion engine")
2949                }
2950                LogicalPlan::Copy {
2951                    target,
2952                    source,
2953                    is_export,
2954                    options,
2955                } => {
2956                    if is_export {
2957                        self.execute_export(&target, &source, &options, prop_manager, ctx)
2958                            .await
2959                    } else {
2960                        self.execute_copy(&target, &source, &options, prop_manager)
2961                            .await
2962                    }
2963                }
2964                LogicalPlan::Backup {
2965                    destination,
2966                    options,
2967                } => self.execute_backup(&destination, &options).await,
2968                LogicalPlan::Explain { plan } => {
2969                    let plan_str = format!("{:#?}", plan);
2970                    let mut row = HashMap::new();
2971                    row.insert("plan".to_string(), Value::String(plan_str));
2972                    Ok(vec![row])
2973                }
2974                LogicalPlan::ShortestPath { .. } => {
2975                    unreachable!("ShortestPath is handled by DataFusion engine")
2976                }
2977                LogicalPlan::AllShortestPaths { .. } => {
2978                    unreachable!("AllShortestPaths is handled by DataFusion engine")
2979                }
2980                LogicalPlan::Foreach { .. } => {
2981                    unreachable!("mutations are handled by DataFusion engine")
2982                }
2983                LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2984                LogicalPlan::BindZeroLengthPath { .. } => {
2985                    unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2986                }
2987                LogicalPlan::BindPath { .. } => {
2988                    unreachable!("BindPath is handled by DataFusion engine")
2989                }
2990                LogicalPlan::QuantifiedPattern { .. } => {
2991                    unreachable!("QuantifiedPattern is handled by DataFusion engine")
2992                }
2993                LogicalPlan::LocyProgram { .. }
2994                | LogicalPlan::LocyFold { .. }
2995                | LogicalPlan::LocyBestBy { .. }
2996                | LogicalPlan::LocyPriority { .. }
2997                | LogicalPlan::LocyDerivedScan { .. }
2998                | LogicalPlan::LocyProject { .. }
2999                | LogicalPlan::LocyModelInvoke { .. } => {
3000                    unreachable!("Locy operators are handled by DataFusion engine")
3001                }
3002            }
3003        })
3004    }
3005
    /// Execute a single plan from a FOREACH body with the given scope.
    ///
    /// Used by the DataFusion ForeachExec operator to delegate body clause
    /// execution back to the executor.
    ///
    /// Supported body plans are `Set`, `Remove`, `Delete`, `Create`,
    /// `CreateBatch`, `Merge` and nested `Foreach`. Any other plan variant
    /// yields an error.
    ///
    /// # Arguments
    /// * `plan` - the body clause to run (taken by value and destructured).
    /// * `scope` - current variable bindings. It is passed mutably to the
    ///   SET/REMOVE/CREATE helpers.
    /// * `writer`, `tx_l0` - write path forwarded unchanged to the mutation helpers.
    /// * `prop_manager`, `params`, `ctx` - expression evaluation context.
    ///
    /// # Errors
    /// Propagates any error from expression evaluation or the mutation helpers.
    /// Also errors when a nested FOREACH list expression evaluates to something
    /// other than a list or NULL, or when `plan` is an unsupported variant.
    #[expect(clippy::too_many_arguments)]
    pub(crate) async fn execute_foreach_body_plan(
        &self,
        plan: LogicalPlan,
        scope: &mut HashMap<String, Value>,
        writer: &uni_store::runtime::writer::Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    ) -> Result<()> {
        match plan {
            LogicalPlan::Set { items, .. } => {
                // No prefetched state is available in a FOREACH body, so an empty
                // (default) Prefetch is supplied.
                self.execute_set_items_locked(
                    &items,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                    &crate::query::df_graph::mutation_common::Prefetch::default(),
                )
                .await?;
            }
            LogicalPlan::Remove { items, .. } => {
                self.execute_remove_items_locked(
                    &items,
                    scope,
                    writer,
                    prop_manager,
                    ctx,
                    tx_l0,
                    &crate::query::df_graph::mutation_common::Prefetch::default(),
                )
                .await?;
            }
            LogicalPlan::Delete { items, detach, .. } => {
                // Each DELETE target expression is evaluated against the current
                // scope, then deleted (honouring DETACH) one at a time.
                for expr in &items {
                    let val = self
                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
                        .await?;
                    self.execute_delete_item_locked(&val, detach, writer, tx_l0)
                        .await?;
                }
            }
            LogicalPlan::Create { pattern, .. } => {
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
            }
            LogicalPlan::CreateBatch { patterns, .. } => {
                for pattern in &patterns {
                    self.execute_create_pattern(
                        pattern,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                    )
                    .await?;
                }
            }
            // NOTE(review): MERGE inside a FOREACH body runs the create path
            // unconditionally. No existing-match lookup is performed here, and
            // ON MATCH items are ignored (`on_match: _`). Confirm this is intended.
            LogicalPlan::Merge {
                pattern,
                on_match: _,
                on_create,
                ..
            } => {
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
                if let Some(on_create_clause) = on_create {
                    self.execute_set_items_locked(
                        &on_create_clause.items,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                        &crate::query::df_graph::mutation_common::Prefetch::default(),
                    )
                    .await?;
                }
            }
            LogicalPlan::Foreach {
                variable,
                list,
                body,
                ..
            } => {
                let list_val = self
                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
                    .await?;
                // A NULL list is a no-op; any non-list, non-NULL value is an error.
                let items = match list_val {
                    Value::List(arr) => arr,
                    Value::Null => return Ok(()),
                    _ => return Err(anyhow!("FOREACH requires a list")),
                };
                for item in items {
                    // Each iteration gets its own copy of the outer scope. The loop
                    // variable and anything the body binds do not leak back into `scope`.
                    let mut nested_scope = scope.clone();
                    nested_scope.insert(variable.clone(), item);
                    for nested_plan in &body {
                        // Recursive async call: boxing is required because an async
                        // fn cannot directly await itself (infinitely sized future).
                        Box::pin(self.execute_foreach_body_plan(
                            nested_plan.clone(),
                            &mut nested_scope,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0,
                        ))
                        .await?;
                    }
                }
            }
            _ => {
                return Err(anyhow!(
                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
                ));
            }
        }
        Ok(())
    }
3151
3152    fn canonical_row_key(row: &HashMap<String, Value>) -> String {
3153        let mut pairs: Vec<_> = row.iter().collect();
3154        pairs.sort_by_key(|(k, _)| *k);
3155
3156        pairs
3157            .into_iter()
3158            .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
3159            .collect::<Vec<_>>()
3160            .join("|")
3161    }
3162
3163    fn canonical_value_key(v: &Value) -> String {
3164        match v {
3165            Value::Null => "null".to_string(),
3166            Value::Bool(b) => format!("b:{b}"),
3167            Value::Int(i) => format!("n:{i}"),
3168            Value::Float(f) => {
3169                if f.is_nan() {
3170                    "nan".to_string()
3171                } else if f.is_infinite() {
3172                    if f.is_sign_positive() {
3173                        "inf:+".to_string()
3174                    } else {
3175                        "inf:-".to_string()
3176                    }
3177                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
3178                    format!("n:{}", *f as i64)
3179                } else {
3180                    format!("f:{f}")
3181                }
3182            }
3183            Value::String(s) => {
3184                if let Some(k) = Self::temporal_string_key(s) {
3185                    format!("temporal:{k}")
3186                } else {
3187                    format!("s:{s}")
3188                }
3189            }
3190            Value::Bytes(b) => format!("bytes:{:?}", b),
3191            Value::List(items) => format!(
3192                "list:[{}]",
3193                items
3194                    .iter()
3195                    .map(Self::canonical_value_key)
3196                    .collect::<Vec<_>>()
3197                    .join(",")
3198            ),
3199            Value::Map(map) => {
3200                let mut pairs: Vec<_> = map.iter().collect();
3201                pairs.sort_by_key(|(k, _)| *k);
3202                format!(
3203                    "map:{{{}}}",
3204                    pairs
3205                        .into_iter()
3206                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
3207                        .collect::<Vec<_>>()
3208                        .join(",")
3209                )
3210            }
3211            Value::Node(n) => {
3212                let mut labels = n.labels.clone();
3213                labels.sort();
3214                format!(
3215                    "node:{}:{}:{}",
3216                    n.vid.as_u64(),
3217                    labels.join(":"),
3218                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
3219                )
3220            }
3221            Value::Edge(e) => format!(
3222                "edge:{}:{}:{}:{}:{}",
3223                e.eid.as_u64(),
3224                e.edge_type,
3225                e.src.as_u64(),
3226                e.dst.as_u64(),
3227                Self::canonical_value_key(&Value::Map(e.properties.clone()))
3228            ),
3229            Value::Path(p) => format!(
3230                "path:nodes=[{}];edges=[{}]",
3231                p.nodes
3232                    .iter()
3233                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
3234                    .collect::<Vec<_>>()
3235                    .join(","),
3236                p.edges
3237                    .iter()
3238                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
3239                    .collect::<Vec<_>>()
3240                    .join(",")
3241            ),
3242            Value::Vector(vs) => format!("vec:{:?}", vs),
3243            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
3244            _ => format!("{v:?}"),
3245        }
3246    }
3247
3248    fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3249        match t {
3250            uni_common::TemporalValue::Date { days_since_epoch } => {
3251                format!("date:{days_since_epoch}")
3252            }
3253            uni_common::TemporalValue::LocalTime {
3254                nanos_since_midnight,
3255            } => format!("localtime:{nanos_since_midnight}"),
3256            uni_common::TemporalValue::Time {
3257                nanos_since_midnight,
3258                offset_seconds,
3259            } => {
3260                let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3261                format!("time:{utc_nanos}")
3262            }
3263            uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3264                format!("localdatetime:{nanos_since_epoch}")
3265            }
3266            uni_common::TemporalValue::DateTime {
3267                nanos_since_epoch, ..
3268            } => format!("datetime:{nanos_since_epoch}"),
3269            uni_common::TemporalValue::Duration {
3270                months,
3271                days,
3272                nanos,
3273            } => format!("duration:{months}:{days}:{nanos}"),
3274            uni_common::TemporalValue::Btic { lo, hi, meta } => {
3275                format!("btic:{lo}:{hi}:{meta}")
3276            }
3277        }
3278    }
3279
3280    fn temporal_string_key(s: &str) -> Option<String> {
3281        let fn_name = match classify_temporal(s)? {
3282            uni_common::TemporalType::Date => "DATE",
3283            uni_common::TemporalType::LocalTime => "LOCALTIME",
3284            uni_common::TemporalType::Time => "TIME",
3285            uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3286            uni_common::TemporalType::DateTime => "DATETIME",
3287            uni_common::TemporalType::Duration => "DURATION",
3288            uni_common::TemporalType::Btic => return None, // BTIC uses dedicated parsing
3289        };
3290        match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3291            Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3292            _ => None,
3293        }
3294    }
3295
3296    /// Execute aggregate operation: GROUP BY + aggregate functions.
3297    /// Interval for timeout checks in aggregate loops.
3298    pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3299
3300    pub(crate) async fn execute_aggregate(
3301        &self,
3302        rows: Vec<HashMap<String, Value>>,
3303        group_by: &[Expr],
3304        aggregates: &[Expr],
3305        prop_manager: &PropertyManager,
3306        params: &HashMap<String, Value>,
3307        ctx: Option<&QueryContext>,
3308    ) -> Result<Vec<HashMap<String, Value>>> {
3309        // CWE-400: Check timeout before aggregation
3310        if let Some(ctx) = ctx {
3311            ctx.check_timeout()?;
3312        }
3313
3314        let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3315
3316        // Cypher semantics: aggregation without grouping keys returns one row even
3317        // on empty input (e.g. `RETURN count(*)`, `RETURN avg(x)`).
3318        if rows.is_empty() {
3319            if group_by.is_empty() {
3320                let accs = Self::create_accumulators(aggregates);
3321                let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3322                return Ok(vec![row]);
3323            }
3324            return Ok(vec![]);
3325        }
3326
3327        for (idx, row) in rows.into_iter().enumerate() {
3328            // Periodic timeout check during aggregation
3329            if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3330                && let Some(ctx) = ctx
3331            {
3332                ctx.check_timeout()?;
3333            }
3334
3335            let key_vals = self
3336                .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3337                .await?;
3338            // Build a canonical key so grouping follows Cypher value semantics
3339            // (e.g. temporal equality by instant, numeric normalization where applicable).
3340            let key_str = format!(
3341                "[{}]",
3342                key_vals
3343                    .iter()
3344                    .map(Self::canonical_value_key)
3345                    .collect::<Vec<_>>()
3346                    .join(",")
3347            );
3348
3349            let entry = groups
3350                .entry(key_str)
3351                .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3352
3353            self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3354                .await?;
3355        }
3356
3357        let results = groups
3358            .values()
3359            .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3360            .collect();
3361
3362        Ok(results)
3363    }
3364
3365    pub(crate) async fn execute_window(
3366        &self,
3367        mut rows: Vec<HashMap<String, Value>>,
3368        window_exprs: &[Expr],
3369        _prop_manager: &PropertyManager,
3370        _params: &HashMap<String, Value>,
3371        ctx: Option<&QueryContext>,
3372    ) -> Result<Vec<HashMap<String, Value>>> {
3373        // CWE-400: Check timeout before window computation
3374        if let Some(ctx) = ctx {
3375            ctx.check_timeout()?;
3376        }
3377
3378        // If no rows or no window expressions, return as-is
3379        if rows.is_empty() || window_exprs.is_empty() {
3380            return Ok(rows);
3381        }
3382
3383        // Process each window function expression
3384        for window_expr in window_exprs {
3385            // Extract window function details
3386            let Expr::FunctionCall {
3387                name,
3388                args,
3389                window_spec: Some(window_spec),
3390                ..
3391            } = window_expr
3392            else {
3393                return Err(anyhow!(
3394                    "Window expression must be a FunctionCall with OVER clause: {:?}",
3395                    window_expr
3396                ));
3397            };
3398
3399            let name_upper = name.to_uppercase();
3400
3401            // Validate it's a supported window function
3402            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3403                return Err(anyhow!(
3404                    "Unsupported window function: {}. Supported functions: {}",
3405                    name,
3406                    WINDOW_FUNCTIONS.join(", ")
3407                ));
3408            }
3409
3410            // Build partition groups based on PARTITION BY clause
3411            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3412
3413            for (row_idx, row) in rows.iter().enumerate() {
3414                // Evaluate partition key
3415                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3416                    // No partitioning - all rows in one partition
3417                    vec![]
3418                } else {
3419                    window_spec
3420                        .partition_by
3421                        .iter()
3422                        .map(|expr| self.evaluate_simple_expr(expr, row))
3423                        .collect::<Result<Vec<_>>>()?
3424                };
3425
3426                partition_map
3427                    .entry(partition_key)
3428                    .or_default()
3429                    .push(row_idx);
3430            }
3431
3432            // Process each partition
3433            for (_partition_key, row_indices) in partition_map.iter_mut() {
3434                // Sort rows within partition by ORDER BY clause
3435                if !window_spec.order_by.is_empty() {
3436                    row_indices.sort_by(|&a, &b| {
3437                        for sort_item in &window_spec.order_by {
3438                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3439                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3440
3441                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3442                                let cmp = Executor::compare_values(&va, &vb);
3443                                let cmp = if sort_item.ascending {
3444                                    cmp
3445                                } else {
3446                                    cmp.reverse()
3447                                };
3448                                if cmp != std::cmp::Ordering::Equal {
3449                                    return cmp;
3450                                }
3451                            }
3452                        }
3453                        std::cmp::Ordering::Equal
3454                    });
3455                }
3456
3457                // Compute window function values for this partition
3458                for (position, &row_idx) in row_indices.iter().enumerate() {
3459                    let window_value = match name_upper.as_str() {
3460                        "ROW_NUMBER" => Value::from((position + 1) as i64),
3461                        "RANK" => {
3462                            // RANK: position (1-indexed) of first row in group of tied rows
3463                            let rank = if position == 0 {
3464                                1i64
3465                            } else {
3466                                let prev_row_idx = row_indices[position - 1];
3467                                let same_as_prev = self.rows_have_same_sort_keys(
3468                                    &window_spec.order_by,
3469                                    &rows,
3470                                    row_idx,
3471                                    prev_row_idx,
3472                                );
3473
3474                                if same_as_prev {
3475                                    // Walk backwards to find where this group started
3476                                    let mut group_start = position - 1;
3477                                    while group_start > 0 {
3478                                        let curr_idx = row_indices[group_start];
3479                                        let prev_idx = row_indices[group_start - 1];
3480                                        if !self.rows_have_same_sort_keys(
3481                                            &window_spec.order_by,
3482                                            &rows,
3483                                            curr_idx,
3484                                            prev_idx,
3485                                        ) {
3486                                            break;
3487                                        }
3488                                        group_start -= 1;
3489                                    }
3490                                    (group_start + 1) as i64
3491                                } else {
3492                                    (position + 1) as i64
3493                                }
3494                            };
3495                            Value::from(rank)
3496                        }
3497                        "DENSE_RANK" => {
3498                            // Dense rank: continuous ranking without gaps
3499                            let mut dense_rank = 1i64;
3500                            for i in 0..position {
3501                                let curr_idx = row_indices[i + 1];
3502                                let prev_idx = row_indices[i];
3503                                if !self.rows_have_same_sort_keys(
3504                                    &window_spec.order_by,
3505                                    &rows,
3506                                    curr_idx,
3507                                    prev_idx,
3508                                ) {
3509                                    dense_rank += 1;
3510                                }
3511                            }
3512                            Value::from(dense_rank)
3513                        }
3514                        "LAG" => {
3515                            let (value_expr, offset, default_value) =
3516                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3517
3518                            if position >= offset {
3519                                let target_idx = row_indices[position - offset];
3520                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3521                            } else {
3522                                default_value
3523                            }
3524                        }
3525                        "LEAD" => {
3526                            let (value_expr, offset, default_value) =
3527                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3528
3529                            if position + offset < row_indices.len() {
3530                                let target_idx = row_indices[position + offset];
3531                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3532                            } else {
3533                                default_value
3534                            }
3535                        }
3536                        "NTILE" => {
3537                            // Extract num_buckets argument: NTILE(num_buckets)
3538                            let num_buckets_expr = args.first().ok_or_else(|| {
3539                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3540                            })?;
3541                            let num_buckets_val =
3542                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3543                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3544                                anyhow!(
3545                                    "NTILE argument must be an integer, got: {:?}",
3546                                    num_buckets_val
3547                                )
3548                            })?;
3549
3550                            if num_buckets <= 0 {
3551                                return Err(anyhow!(
3552                                    "NTILE bucket count must be positive, got: {}",
3553                                    num_buckets
3554                                ));
3555                            }
3556
3557                            let num_buckets = num_buckets as usize;
3558                            let partition_size = row_indices.len();
3559
3560                            // Calculate bucket assignment using standard algorithm
3561                            // For N rows and B buckets:
3562                            // - Base size: N / B
3563                            // - Extra rows: N % B (go to first buckets)
3564                            let base_size = partition_size / num_buckets;
3565                            let extra_rows = partition_size % num_buckets;
3566
3567                            // Determine bucket for current row
3568                            let bucket = if position < extra_rows * (base_size + 1) {
3569                                // Row is in one of the larger buckets (first 'extra_rows' buckets)
3570                                position / (base_size + 1) + 1
3571                            } else {
3572                                // Row is in one of the normal-sized buckets
3573                                let adjusted_position = position - extra_rows * (base_size + 1);
3574                                extra_rows + (adjusted_position / base_size) + 1
3575                            };
3576
3577                            Value::from(bucket as i64)
3578                        }
3579                        "FIRST_VALUE" => {
3580                            // FIRST_VALUE returns the value of the expression from the first row in the window frame
3581                            let value_expr = args.first().ok_or_else(|| {
3582                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3583                            })?;
3584
3585                            // Get the first row in the partition (after ordering)
3586                            if row_indices.is_empty() {
3587                                Value::Null
3588                            } else {
3589                                let first_idx = row_indices[0];
3590                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3591                            }
3592                        }
3593                        "LAST_VALUE" => {
3594                            // LAST_VALUE returns the value of the expression from the last row in the window frame
3595                            let value_expr = args.first().ok_or_else(|| {
3596                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3597                            })?;
3598
3599                            // Get the last row in the partition (after ordering)
3600                            if row_indices.is_empty() {
3601                                Value::Null
3602                            } else {
3603                                let last_idx = row_indices[row_indices.len() - 1];
3604                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3605                            }
3606                        }
3607                        "NTH_VALUE" => {
3608                            // NTH_VALUE returns the value of the expression from the nth row in the window frame
3609                            if args.len() != 2 {
3610                                return Err(anyhow!(
3611                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3612                                ));
3613                            }
3614
3615                            let value_expr = &args[0];
3616                            let n_expr = &args[1];
3617
3618                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3619                            let n = n_val.as_i64().ok_or_else(|| {
3620                                anyhow!(
3621                                    "NTH_VALUE second argument must be an integer, got: {:?}",
3622                                    n_val
3623                                )
3624                            })?;
3625
3626                            if n <= 0 {
3627                                return Err(anyhow!(
3628                                    "NTH_VALUE position must be positive, got: {}",
3629                                    n
3630                                ));
3631                            }
3632
3633                            let nth_index = (n - 1) as usize; // Convert 1-based to 0-based
3634                            if nth_index < row_indices.len() {
3635                                let nth_idx = row_indices[nth_index];
3636                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3637                            } else {
3638                                Value::Null
3639                            }
3640                        }
3641                        _ => unreachable!("Window function {} already validated", name),
3642                    };
3643
3644                    // Add window function result to row
3645                    // Use the window expression's string representation as the column name
3646                    let col_name = window_expr.to_string_repr();
3647                    rows[row_idx].insert(col_name, window_value);
3648                }
3649            }
3650        }
3651
3652        Ok(rows)
3653    }
3654
3655    /// Helper to evaluate simple expressions for window function sorting/partitioning.
3656    ///
3657    /// Uses `&self` for consistency with other evaluation methods, though it only
3658    /// recurses for property access.
3659    fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3660        match expr {
3661            Expr::Variable(name) => row
3662                .get(name)
3663                .cloned()
3664                .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3665            Expr::Property(base, prop) => {
3666                let base_val = self.evaluate_simple_expr(base, row)?;
3667                if let Value::Map(map) = base_val {
3668                    map.get(prop)
3669                        .cloned()
3670                        .ok_or_else(|| anyhow!("Property not found: {}", prop))
3671                } else {
3672                    Err(anyhow!("Cannot access property on non-object"))
3673                }
3674            }
3675            Expr::Literal(lit) => Ok(lit.to_value()),
3676            _ => Err(anyhow!(
3677                "Unsupported expression in window function: {:?}",
3678                expr
3679            )),
3680        }
3681    }
3682
3683    /// Check if two rows have matching sort keys for ranking functions.
3684    fn rows_have_same_sort_keys(
3685        &self,
3686        order_by: &[uni_cypher::ast::SortItem],
3687        rows: &[HashMap<String, Value>],
3688        idx_a: usize,
3689        idx_b: usize,
3690    ) -> bool {
3691        order_by.iter().all(|sort_item| {
3692            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3693            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3694            matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3695        })
3696    }
3697
3698    /// Extract offset and default value for LAG/LEAD window functions.
3699    fn extract_lag_lead_params<'a>(
3700        &self,
3701        func_name: &str,
3702        args: &'a [Expr],
3703        row: &HashMap<String, Value>,
3704    ) -> Result<(&'a Expr, usize, Value)> {
3705        let value_expr = args.first().ok_or_else(|| {
3706            anyhow!(
3707                "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3708                func_name,
3709                func_name
3710            )
3711        })?;
3712
3713        let offset = if let Some(offset_expr) = args.get(1) {
3714            let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3715            offset_val.as_i64().ok_or_else(|| {
3716                anyhow!(
3717                    "{} offset must be an integer, got: {:?}",
3718                    func_name,
3719                    offset_val
3720                )
3721            })? as usize
3722        } else {
3723            1
3724        };
3725
3726        let default_value = if let Some(default_expr) = args.get(2) {
3727            self.evaluate_simple_expr(default_expr, row)?
3728        } else {
3729            Value::Null
3730        };
3731
3732        Ok((value_expr, offset, default_value))
3733    }
3734
3735    /// Evaluate group-by key expressions for a row.
3736    pub(crate) async fn evaluate_group_keys(
3737        &self,
3738        group_by: &[Expr],
3739        row: &HashMap<String, Value>,
3740        prop_manager: &PropertyManager,
3741        params: &HashMap<String, Value>,
3742        ctx: Option<&QueryContext>,
3743    ) -> Result<Vec<Value>> {
3744        let mut key_vals = Vec::new();
3745        for expr in group_by {
3746            key_vals.push(
3747                self.evaluate_expr(expr, row, prop_manager, params, ctx)
3748                    .await?,
3749            );
3750        }
3751        Ok(key_vals)
3752    }
3753
3754    /// Update accumulators with values from the current row.
3755    pub(crate) async fn update_accumulators(
3756        &self,
3757        accs: &mut [Accumulator],
3758        aggregates: &[Expr],
3759        row: &HashMap<String, Value>,
3760        prop_manager: &PropertyManager,
3761        params: &HashMap<String, Value>,
3762        ctx: Option<&QueryContext>,
3763    ) -> Result<()> {
3764        for (i, agg_expr) in aggregates.iter().enumerate() {
3765            if let Expr::FunctionCall { args, .. } = agg_expr {
3766                let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3767                let val = if is_wildcard {
3768                    Value::Null
3769                } else {
3770                    self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3771                        .await?
3772                };
3773                accs[i].update(&val, is_wildcard);
3774            }
3775        }
3776        Ok(())
3777    }
3778
3779    /// Execute sort operation with ORDER BY clauses.
3780    pub(crate) async fn execute_recursive_cte(
3781        &self,
3782        cte_name: &str,
3783        initial: LogicalPlan,
3784        recursive: LogicalPlan,
3785        prop_manager: &PropertyManager,
3786        params: &HashMap<String, Value>,
3787        ctx: Option<&QueryContext>,
3788    ) -> Result<Vec<HashMap<String, Value>>> {
3789        use std::collections::HashSet;
3790
3791        // Helper to create a stable key for cycle detection.
3792        // Uses sorted keys to ensure consistent ordering.
3793        pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3794            let mut pairs: Vec<_> = row.iter().collect();
3795            pairs.sort_by(|a, b| a.0.cmp(b.0));
3796            format!("{pairs:?}")
3797        }
3798
3799        // 1. Execute Anchor
3800        let mut working_table = self
3801            .execute_subplan(initial, prop_manager, params, ctx)
3802            .await?;
3803        let mut result_table = working_table.clone();
3804
3805        // Track seen rows for cycle detection
3806        let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3807
3808        // 2. Loop
3809        // Safety: Max iterations to prevent infinite loop
3810        let max_iterations = self.config.max_recursive_cte_iterations;
3811        for _iteration in 0..max_iterations {
3812            // CWE-400: Check timeout at each iteration to prevent resource exhaustion
3813            if let Some(ctx) = ctx {
3814                ctx.check_timeout()?;
3815            }
3816
3817            if working_table.is_empty() {
3818                break;
3819            }
3820
3821            // Bind working table to CTE name in params
3822            let working_val = Value::List(
3823                working_table
3824                    .iter()
3825                    .map(|row| {
3826                        if row.len() == 1 {
3827                            row.values().next().unwrap().clone()
3828                        } else {
3829                            Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3830                        }
3831                    })
3832                    .collect(),
3833            );
3834
3835            let mut next_params = params.clone();
3836            next_params.insert(cte_name.to_string(), working_val);
3837
3838            // Execute recursive part
3839            let next_result = self
3840                .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3841                .await?;
3842
3843            if next_result.is_empty() {
3844                break;
3845            }
3846
3847            // Filter out already-seen rows (cycle detection)
3848            let new_rows: Vec<_> = next_result
3849                .into_iter()
3850                .filter(|row| {
3851                    let key = row_key(row);
3852                    seen.insert(key) // Returns false if already present
3853                })
3854                .collect();
3855
3856            if new_rows.is_empty() {
3857                // All results were cycles - terminate
3858                break;
3859            }
3860
3861            result_table.extend(new_rows.clone());
3862            working_table = new_rows;
3863        }
3864
3865        // Output accumulated results as a variable
3866        let final_list = Value::List(
3867            result_table
3868                .into_iter()
3869                .map(|row| {
3870                    // If the CTE returns a single column and we want to treat it as a list of values?
3871                    // E.g. WITH RECURSIVE r AS (RETURN 1 UNION RETURN 2) -> [1, 2] or [{expr:1}, {expr:2}]?
3872                    // Cypher LISTs usually contain values.
3873                    // If the row has 1 column, maybe unwrap?
3874                    // But SQL CTEs are tables.
3875                    // Let's stick to List<Map> for consistency with how we pass it in.
3876                    // UNLESS the user extracts it.
3877                    // My parser test `MATCH (n) WHERE n IN hierarchy` implies `hierarchy` contains Nodes.
3878                    // If `row` contains `root` (Node), then `hierarchy` should be `[Node, Node]`.
3879                    // If row has multiple cols, `[ {a:1, b:2}, ... ]`.
3880                    // If row has 1 col, users expect `[val, val]`.
3881                    if row.len() == 1 {
3882                        row.values().next().unwrap().clone()
3883                    } else {
3884                        Value::Map(row.into_iter().collect())
3885                    }
3886                })
3887                .collect(),
3888        );
3889
3890        let mut final_row = HashMap::new();
3891        final_row.insert(cte_name.to_string(), final_list);
3892        Ok(vec![final_row])
3893    }
3894
3895    /// Interval for timeout checks in sort loops.
3896    const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3897
3898    pub(crate) async fn execute_sort(
3899        &self,
3900        rows: Vec<HashMap<String, Value>>,
3901        order_by: &[uni_cypher::ast::SortItem],
3902        prop_manager: &PropertyManager,
3903        params: &HashMap<String, Value>,
3904        ctx: Option<&QueryContext>,
3905    ) -> Result<Vec<HashMap<String, Value>>> {
3906        // CWE-400: Check timeout before potentially expensive sort
3907        if let Some(ctx) = ctx {
3908            ctx.check_timeout()?;
3909        }
3910
3911        let mut rows_with_keys = Vec::with_capacity(rows.len());
3912        for (idx, row) in rows.into_iter().enumerate() {
3913            // Periodic timeout check during key extraction
3914            if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3915                && let Some(ctx) = ctx
3916            {
3917                ctx.check_timeout()?;
3918            }
3919
3920            let mut keys = Vec::new();
3921            for item in order_by {
3922                let val = row
3923                    .get(&item.expr.to_string_repr())
3924                    .cloned()
3925                    .unwrap_or(Value::Null);
3926                let val = if val.is_null() {
3927                    self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3928                        .await
3929                        .unwrap_or(Value::Null)
3930                } else {
3931                    val
3932                };
3933                keys.push(val);
3934            }
3935            rows_with_keys.push((row, keys));
3936        }
3937
3938        // Check timeout again before synchronous sort (can't be interrupted)
3939        if let Some(ctx) = ctx {
3940            ctx.check_timeout()?;
3941        }
3942
3943        rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3944
3945        Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3946    }
3947
3948    /// Create accumulators for aggregate expressions.
3949    pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3950        aggregates
3951            .iter()
3952            .map(|expr| {
3953                if let Expr::FunctionCall { name, distinct, .. } = expr {
3954                    Accumulator::new(name, *distinct)
3955                } else {
3956                    Accumulator::new("COUNT", false)
3957                }
3958            })
3959            .collect()
3960    }
3961
3962    /// Build result row from group-by keys and accumulators.
3963    pub(crate) fn build_aggregate_result(
3964        group_by: &[Expr],
3965        aggregates: &[Expr],
3966        key_vals: &[Value],
3967        accs: &[Accumulator],
3968    ) -> HashMap<String, Value> {
3969        let mut res_row = HashMap::new();
3970        for (i, expr) in group_by.iter().enumerate() {
3971            res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3972        }
3973        for (i, expr) in aggregates.iter().enumerate() {
3974            // Use aggregate_column_name to ensure consistency with planner
3975            let col_name = crate::query::planner::aggregate_column_name(expr);
3976            res_row.insert(col_name, accs[i].finish());
3977        }
3978        res_row
3979    }
3980
3981    /// Compare and return ordering for sort operation.
3982    pub(crate) fn compare_sort_keys(
3983        a_keys: &[Value],
3984        b_keys: &[Value],
3985        order_by: &[uni_cypher::ast::SortItem],
3986    ) -> std::cmp::Ordering {
3987        for (i, item) in order_by.iter().enumerate() {
3988            let order = Self::compare_values(&a_keys[i], &b_keys[i]);
3989            if order != std::cmp::Ordering::Equal {
3990                return if item.ascending {
3991                    order
3992                } else {
3993                    order.reverse()
3994                };
3995            }
3996        }
3997        std::cmp::Ordering::Equal
3998    }
3999
4000    /// Executes BACKUP command to local or cloud storage.
4001    ///
4002    /// Supports both local filesystem paths and cloud URLs (s3://, gs://, az://).
4003    pub(crate) async fn execute_backup(
4004        &self,
4005        destination: &str,
4006        _options: &HashMap<String, Value>,
4007    ) -> Result<Vec<HashMap<String, Value>>> {
4008        // 1. Flush L0
4009        if let Some(writer_arc) = &self.writer {
4010            let writer: &uni_store::Writer = writer_arc.as_ref();
4011            writer.flush_to_l1(None).await?;
4012        }
4013
4014        // 2. Snapshot
4015        let snapshot_manager = self.storage.snapshot_manager();
4016        let snapshot = snapshot_manager
4017            .load_latest_snapshot()
4018            .await?
4019            .ok_or_else(|| anyhow!("No snapshot found"))?;
4020
4021        // 3. Copy files - cloud or local path
4022        if is_cloud_url(destination) {
4023            self.backup_to_cloud(destination, &snapshot.snapshot_id)
4024                .await?;
4025        } else {
4026            // Validate local destination path against sandbox
4027            let validated_dest = self.validate_path(destination)?;
4028            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
4029                .await?;
4030        }
4031
4032        let mut res = HashMap::new();
4033        res.insert(
4034            "status".to_string(),
4035            Value::String("Backup completed".to_string()),
4036        );
4037        res.insert(
4038            "snapshot_id".to_string(),
4039            Value::String(snapshot.snapshot_id),
4040        );
4041        Ok(vec![res])
4042    }
4043
4044    /// Backs up database to a local filesystem destination.
4045    async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
4046        let source_path = std::path::Path::new(self.storage.base_path());
4047
4048        if !dest_path.exists() {
4049            std::fs::create_dir_all(dest_path)?;
4050        }
4051
4052        // Recursive copy (local to local)
4053        if source_path.exists() {
4054            Self::copy_dir_all(source_path, dest_path)?;
4055        }
4056
4057        // Copy schema to destination/catalog/schema.json
4058        let schema_manager = self.storage.schema_manager();
4059        let dest_catalog = dest_path.join("catalog");
4060        if !dest_catalog.exists() {
4061            std::fs::create_dir_all(&dest_catalog)?;
4062        }
4063
4064        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4065        std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
4066
4067        Ok(())
4068    }
4069
4070    /// Backs up database to a cloud storage destination.
4071    ///
4072    /// Streams data from source to destination, supporting cross-cloud backups.
4073    async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
4074        use object_store::ObjectStore;
4075        use object_store::ObjectStoreExt;
4076        use object_store::local::LocalFileSystem;
4077        use object_store::path::Path as ObjPath;
4078
4079        let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
4080        let source_path = std::path::Path::new(self.storage.base_path());
4081
4082        // Create local store for source, coerced to dyn ObjectStore
4083        let src_store: Arc<dyn ObjectStore> =
4084            Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
4085
4086        // Copy catalog/ directory
4087        let catalog_src = ObjPath::from("catalog");
4088        let catalog_dst = if dest_prefix.as_ref().is_empty() {
4089            ObjPath::from("catalog")
4090        } else {
4091            ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
4092        };
4093        copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
4094
4095        // Copy storage/ directory
4096        let storage_src = ObjPath::from("storage");
4097        let storage_dst = if dest_prefix.as_ref().is_empty() {
4098            ObjPath::from("storage")
4099        } else {
4100            ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
4101        };
4102        copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
4103
4104        // Ensure schema is present at canonical catalog location.
4105        let schema_manager = self.storage.schema_manager();
4106        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4107        let schema_path = if dest_prefix.as_ref().is_empty() {
4108            ObjPath::from("catalog/schema.json")
4109        } else {
4110            ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
4111        };
4112        dest_store
4113            .put(&schema_path, bytes::Bytes::from(schema_content).into())
4114            .await?;
4115
4116        Ok(())
4117    }
4118
    /// Maximum directory depth for backup operations.
    ///
    /// **CWE-674 (Uncontrolled Recursion)**: `copy_dir_all_impl` recurses once per
    /// directory level and fails with `InvalidInput` once this depth is reached,
    /// preventing stack overflow from excessively deep directory structures.
    const MAX_BACKUP_DEPTH: usize = 100;

    /// Maximum number of directory entries processed by one backup copy.
    ///
    /// **CWE-400 (Resource Consumption)**: Prevents disk exhaustion and
    /// long-running operations from malicious or unexpectedly large directories.
    /// Despite the name, the budget counts every entry visited — subdirectories
    /// and skipped symlinks included — not only regular files.
    const MAX_BACKUP_FILES: usize = 100_000;
4130
4131    /// Recursively copies a directory with security limits.
4132    ///
4133    /// # Security
4134    ///
4135    /// - **CWE-674**: Depth limit prevents stack overflow
4136    /// - **CWE-400**: File count limit prevents resource exhaustion
4137    /// - **Symlink handling**: Symlinks are skipped to prevent loop attacks
4138    pub(crate) fn copy_dir_all(
4139        src: &std::path::Path,
4140        dst: &std::path::Path,
4141    ) -> std::io::Result<()> {
4142        let mut file_count = 0usize;
4143        Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
4144    }
4145
4146    /// Internal implementation with depth and file count tracking.
4147    pub(crate) fn copy_dir_all_impl(
4148        src: &std::path::Path,
4149        dst: &std::path::Path,
4150        depth: usize,
4151        file_count: &mut usize,
4152    ) -> std::io::Result<()> {
4153        if depth >= Self::MAX_BACKUP_DEPTH {
4154            return Err(std::io::Error::new(
4155                std::io::ErrorKind::InvalidInput,
4156                format!(
4157                    "Maximum backup depth {} exceeded at {:?}",
4158                    Self::MAX_BACKUP_DEPTH,
4159                    src
4160                ),
4161            ));
4162        }
4163
4164        std::fs::create_dir_all(dst)?;
4165
4166        for entry in std::fs::read_dir(src)? {
4167            if *file_count >= Self::MAX_BACKUP_FILES {
4168                return Err(std::io::Error::new(
4169                    std::io::ErrorKind::InvalidInput,
4170                    format!(
4171                        "Maximum backup file count {} exceeded",
4172                        Self::MAX_BACKUP_FILES
4173                    ),
4174                ));
4175            }
4176            *file_count += 1;
4177
4178            let entry = entry?;
4179            let metadata = entry.metadata()?;
4180
4181            // Skip symlinks to prevent loops and traversal attacks
4182            if metadata.file_type().is_symlink() {
4183                // Silently skip - logging would require tracing dependency
4184                continue;
4185            }
4186
4187            let dst_path = dst.join(entry.file_name());
4188            if metadata.is_dir() {
4189                Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4190            } else {
4191                std::fs::copy(entry.path(), dst_path)?;
4192            }
4193        }
4194        Ok(())
4195    }
4196
4197    pub(crate) async fn execute_copy(
4198        &self,
4199        target: &str,
4200        source: &str,
4201        options: &HashMap<String, Value>,
4202        prop_manager: &PropertyManager,
4203    ) -> Result<Vec<HashMap<String, Value>>> {
4204        let format = options
4205            .get("format")
4206            .and_then(|v| v.as_str())
4207            .unwrap_or_else(|| {
4208                if source.ends_with(".parquet") {
4209                    "parquet"
4210                } else {
4211                    "csv"
4212                }
4213            });
4214
4215        match format.to_lowercase().as_str() {
4216            "csv" => self.execute_csv_import(target, source, options).await,
4217            "parquet" => {
4218                self.execute_parquet_import(target, source, options, prop_manager)
4219                    .await
4220            }
4221            _ => Err(anyhow!("Unsupported format: {}", format)),
4222        }
4223    }
4224
4225    pub(crate) async fn execute_csv_import(
4226        &self,
4227        target: &str,
4228        source: &str,
4229        options: &HashMap<String, Value>,
4230    ) -> Result<Vec<HashMap<String, Value>>> {
4231        // Validate source path against sandbox
4232        let validated_source = self.validate_path(source)?;
4233
4234        let writer_lock = self
4235            .writer
4236            .as_ref()
4237            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4238
4239        let schema = self.storage.schema_manager().schema();
4240
4241        // 1. Determine if target is Label or EdgeType
4242        let label_meta = schema.labels.get(target);
4243        let edge_meta = schema.edge_types.get(target);
4244
4245        if label_meta.is_none() && edge_meta.is_none() {
4246            return Err(anyhow!("Target '{}' not found in schema", target));
4247        }
4248
4249        // 2. Open CSV
4250        let delimiter_str = options
4251            .get("delimiter")
4252            .and_then(|v| v.as_str())
4253            .unwrap_or(",");
4254        let delimiter = if delimiter_str.is_empty() {
4255            b','
4256        } else {
4257            delimiter_str.as_bytes()[0]
4258        };
4259        let has_header = options
4260            .get("header")
4261            .and_then(|v| v.as_bool())
4262            .unwrap_or(true);
4263
4264        let mut rdr = csv::ReaderBuilder::new()
4265            .delimiter(delimiter)
4266            .has_headers(has_header)
4267            .from_path(&validated_source)?;
4268
4269        let headers = rdr.headers()?.clone();
4270        let mut count = 0;
4271
4272        let writer: &uni_store::Writer = writer_lock.as_ref();
4273
4274        // Chunked VID/EID refill for streaming CSV: amortizes the IdAllocator
4275        // mutex over `CSV_ID_CHUNK` rows. End-of-iteration may waste up to
4276        // CSV_ID_CHUNK-1 IDs — harmless in the u64 space.
4277        const CSV_ID_CHUNK: usize = 256;
4278
4279        if label_meta.is_some() {
4280            let target_props = schema
4281                .properties
4282                .get(target)
4283                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4284
4285            let mut vid_chunk: std::collections::VecDeque<Vid> =
4286                std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4287            for result in rdr.records() {
4288                let record = result?;
4289                let mut props = HashMap::new();
4290
4291                for (i, header) in headers.iter().enumerate() {
4292                    if let Some(val_str) = record.get(i)
4293                        && let Some(prop_meta) = target_props.get(header)
4294                    {
4295                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4296                        props.insert(header.to_string(), val);
4297                    }
4298                }
4299
4300                if vid_chunk.is_empty() {
4301                    vid_chunk.extend(writer.allocate_vids(CSV_ID_CHUNK).await?);
4302                }
4303                let vid = vid_chunk.pop_front().unwrap();
4304                writer
4305                    .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4306                    .await?;
4307                count += 1;
4308            }
4309        } else if let Some(meta) = edge_meta {
4310            let type_id = meta.id;
4311            let target_props = schema
4312                .properties
4313                .get(target)
4314                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4315
4316            // For edges, we need src and dst VIDs.
4317            // Expecting columns '_src' and '_dst' or as specified in options.
4318            let src_col = options
4319                .get("src_col")
4320                .and_then(|v| v.as_str())
4321                .unwrap_or("_src");
4322            let dst_col = options
4323                .get("dst_col")
4324                .and_then(|v| v.as_str())
4325                .unwrap_or("_dst");
4326
4327            let mut eid_chunk: std::collections::VecDeque<Eid> =
4328                std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4329            for result in rdr.records() {
4330                let record = result?;
4331                let mut props = HashMap::new();
4332                let mut src_vid = None;
4333                let mut dst_vid = None;
4334
4335                for (i, header) in headers.iter().enumerate() {
4336                    if let Some(val_str) = record.get(i) {
4337                        if header == src_col {
4338                            src_vid =
4339                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4340                        } else if header == dst_col {
4341                            dst_vid =
4342                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4343                        } else if let Some(prop_meta) = target_props.get(header) {
4344                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4345                            props.insert(header.to_string(), val);
4346                        }
4347                    }
4348                }
4349
4350                let src =
4351                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4352                let dst = dst_vid
4353                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;
4354
4355                if eid_chunk.is_empty() {
4356                    eid_chunk.extend(writer.allocate_eids(CSV_ID_CHUNK).await?);
4357                }
4358                let eid = eid_chunk.pop_front().unwrap();
4359                writer
4360                    .insert_edge(
4361                        src,
4362                        dst,
4363                        type_id,
4364                        eid,
4365                        props,
4366                        Some(target.to_string()),
4367                        None,
4368                    )
4369                    .await?;
4370                count += 1;
4371            }
4372        }
4373
4374        let mut res = HashMap::new();
4375        res.insert("count".to_string(), Value::Int(count as i64));
4376        Ok(vec![res])
4377    }
4378
4379    /// Imports data from Parquet file to a label or edge type.
4380    ///
4381    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
4382    pub(crate) async fn execute_parquet_import(
4383        &self,
4384        target: &str,
4385        source: &str,
4386        options: &HashMap<String, Value>,
4387        _prop_manager: &PropertyManager,
4388    ) -> Result<Vec<HashMap<String, Value>>> {
4389        let writer_lock = self
4390            .writer
4391            .as_ref()
4392            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4393
4394        let schema = self.storage.schema_manager().schema();
4395
4396        // 1. Determine if target is Label or EdgeType
4397        let label_meta = schema.labels.get(target);
4398        let edge_meta = schema.edge_types.get(target);
4399
4400        if label_meta.is_none() && edge_meta.is_none() {
4401            return Err(anyhow!("Target '{}' not found in schema", target));
4402        }
4403
4404        // 2. Open Parquet - support both local and cloud URLs
4405        let reader = if is_cloud_url(source) {
4406            self.open_parquet_from_cloud(source).await?
4407        } else {
4408            // Validate local source path against sandbox
4409            let validated_source = self.validate_path(source)?;
4410            let file = std::fs::File::open(&validated_source)?;
4411            let builder =
4412                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
4413            builder.build()?
4414        };
4415        let mut reader = reader;
4416
4417        let mut count = 0;
4418        let writer: &uni_store::Writer = writer_lock.as_ref();
4419
4420        if label_meta.is_some() {
4421            let target_props = schema
4422                .properties
4423                .get(target)
4424                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4425
4426            for batch in reader.by_ref() {
4427                let batch = batch?;
4428                let num_rows = batch.num_rows();
4429                // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
4430                let vids = writer.allocate_vids(num_rows).await?;
4431                for (row, &vid) in vids.iter().enumerate().take(num_rows) {
4432                    let mut props = HashMap::new();
4433                    for field in batch.schema().fields() {
4434                        let name = field.name();
4435                        if target_props.contains_key(name) {
4436                            let col = batch.column_by_name(name).unwrap();
4437                            if !col.is_null(row) {
4438                                // Look up Uni DataType from schema for proper DateTime/Time decoding
4439                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
4440                                let val =
4441                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
4442                                props.insert(name.clone(), val);
4443                            }
4444                        }
4445                    }
4446                    writer
4447                        .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4448                        .await?;
4449                    count += 1;
4450                }
4451            }
4452        } else if let Some(meta) = edge_meta {
4453            let type_id = meta.id;
4454            let target_props = schema
4455                .properties
4456                .get(target)
4457                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4458
4459            let src_col = options
4460                .get("src_col")
4461                .and_then(|v| v.as_str())
4462                .unwrap_or("_src");
4463            let dst_col = options
4464                .get("dst_col")
4465                .and_then(|v| v.as_str())
4466                .unwrap_or("_dst");
4467
4468            for batch in reader {
4469                let batch = batch?;
4470                let num_rows = batch.num_rows();
4471                // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
4472                let eids = writer.allocate_eids(num_rows).await?;
4473                for (row, &eid) in eids.iter().enumerate().take(num_rows) {
4474                    let mut props = HashMap::new();
4475                    let mut src_vid = None;
4476                    let mut dst_vid = None;
4477
4478                    for field in batch.schema().fields() {
4479                        let name = field.name();
4480                        let col = batch.column_by_name(name).unwrap();
4481                        if col.is_null(row) {
4482                            continue;
4483                        }
4484
4485                        if name == src_col {
4486                            let val = Self::arrow_to_value(col.as_ref(), row);
4487                            src_vid = Some(Self::vid_from_value(&val)?);
4488                        } else if name == dst_col {
4489                            let val = Self::arrow_to_value(col.as_ref(), row);
4490                            dst_vid = Some(Self::vid_from_value(&val)?);
4491                        } else if let Some(pm) = target_props.get(name) {
4492                            // Look up Uni DataType from schema for proper DateTime/Time decoding
4493                            let val =
4494                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
4495                            props.insert(name.clone(), val);
4496                        }
4497                    }
4498
4499                    let src = src_vid
4500                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4501                    let dst = dst_vid.ok_or_else(|| {
4502                        anyhow!("Missing destination VID in column '{}'", dst_col)
4503                    })?;
4504
4505                    writer
4506                        .insert_edge(
4507                            src,
4508                            dst,
4509                            type_id,
4510                            eid,
4511                            props,
4512                            Some(target.to_string()),
4513                            None,
4514                        )
4515                        .await?;
4516                    count += 1;
4517                }
4518            }
4519        }
4520
4521        let mut res = HashMap::new();
4522        res.insert("count".to_string(), Value::Int(count as i64));
4523        Ok(vec![res])
4524    }
4525
4526    /// Opens a Parquet file from a cloud URL.
4527    ///
4528    /// Downloads the file to memory and creates a Parquet reader.
4529    async fn open_parquet_from_cloud(
4530        &self,
4531        source_url: &str,
4532    ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4533        use object_store::ObjectStoreExt;
4534
4535        let (store, path) = build_store_from_url(source_url)?;
4536
4537        // Download file contents
4538        let bytes = store.get(&path).await?.bytes().await?;
4539
4540        // Create a Parquet reader from the bytes
4541        let reader = bytes::Bytes::from(bytes.to_vec());
4542        let builder =
4543            parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4544        Ok(builder.build()?)
4545    }
4546
4547    pub(crate) async fn scan_edge_type(
4548        &self,
4549        edge_type: &str,
4550        ctx: Option<&QueryContext>,
4551    ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4552        let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4553
4554        // 1. Scan L2 (Base)
4555        self.scan_edge_type_l2(edge_type, &mut edges).await?;
4556
4557        // 2. Scan L1 (Delta)
4558        self.scan_edge_type_l1(edge_type, &mut edges).await?;
4559
4560        // 3. Scan L0 (Memory) and filter tombstoned vertices
4561        if let Some(ctx) = ctx {
4562            self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4563            self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4564        }
4565
4566        Ok(edges
4567            .into_iter()
4568            .map(|(eid, (src, dst))| (eid, src, dst))
4569            .collect())
4570    }
4571
4572    /// Scan L2 (base) storage for edges of a given type.
4573    ///
4574    /// Note: Edges are now stored exclusively in delta datasets (L1) via LanceDB.
4575    /// This L2 scan will typically find no data.
4576    pub(crate) async fn scan_edge_type_l2(
4577        &self,
4578        _edge_type: &str,
4579        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4580    ) -> Result<()> {
4581        // Edges are now stored in delta datasets (L1) via LanceDB.
4582        // Legacy L2 base edge storage is no longer used.
4583        Ok(())
4584    }
4585
4586    /// Scan L1 (delta) storage for edges of a given type.
4587    pub(crate) async fn scan_edge_type_l1(
4588        &self,
4589        edge_type: &str,
4590        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4591    ) -> Result<()> {
4592        if let Ok(Some(batch)) = self
4593            .storage
4594            .scan_delta_table(
4595                edge_type,
4596                "fwd",
4597                &["eid", "src_vid", "dst_vid", "op", "_version"],
4598                None,
4599            )
4600            .await
4601        {
4602            // Collect ops with versions: eid -> (version, op, src, dst)
4603            let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4604                HashMap::new();
4605
4606            self.process_delta_batch(&batch, &mut versioned_ops)?;
4607
4608            // Apply the winning ops
4609            for (eid, (_, op, src, dst)) in versioned_ops {
4610                if op == 0 {
4611                    edges.insert(eid, (src, dst));
4612                } else if op == 1 {
4613                    edges.remove(&eid);
4614                }
4615            }
4616        }
4617        Ok(())
4618    }
4619
4620    /// Process a delta batch, tracking versioned operations.
4621    pub(crate) fn process_delta_batch(
4622        &self,
4623        batch: &arrow_array::RecordBatch,
4624        versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4625    ) -> Result<()> {
4626        use arrow_array::UInt64Array;
4627        let eid_col = batch
4628            .column_by_name("eid")
4629            .ok_or(anyhow!("Missing eid"))?
4630            .as_any()
4631            .downcast_ref::<UInt64Array>()
4632            .ok_or(anyhow!("Invalid eid"))?;
4633        let src_col = batch
4634            .column_by_name("src_vid")
4635            .ok_or(anyhow!("Missing src_vid"))?
4636            .as_any()
4637            .downcast_ref::<UInt64Array>()
4638            .ok_or(anyhow!("Invalid src_vid"))?;
4639        let dst_col = batch
4640            .column_by_name("dst_vid")
4641            .ok_or(anyhow!("Missing dst_vid"))?
4642            .as_any()
4643            .downcast_ref::<UInt64Array>()
4644            .ok_or(anyhow!("Invalid dst_vid"))?;
4645        let op_col = batch
4646            .column_by_name("op")
4647            .ok_or(anyhow!("Missing op"))?
4648            .as_any()
4649            .downcast_ref::<arrow_array::UInt8Array>()
4650            .ok_or(anyhow!("Invalid op"))?;
4651        let version_col = batch
4652            .column_by_name("_version")
4653            .ok_or(anyhow!("Missing _version"))?
4654            .as_any()
4655            .downcast_ref::<UInt64Array>()
4656            .ok_or(anyhow!("Invalid _version"))?;
4657
4658        for i in 0..batch.num_rows() {
4659            let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4660            let version = version_col.value(i);
4661            let op = op_col.value(i);
4662            let src = Vid::from(src_col.value(i));
4663            let dst = Vid::from(dst_col.value(i));
4664
4665            match versioned_ops.entry(eid) {
4666                std::collections::hash_map::Entry::Vacant(e) => {
4667                    e.insert((version, op, src, dst));
4668                }
4669                std::collections::hash_map::Entry::Occupied(mut e) => {
4670                    if version > e.get().0 {
4671                        e.insert((version, op, src, dst));
4672                    }
4673                }
4674            }
4675        }
4676        Ok(())
4677    }
4678
4679    /// Scan L0 (memory) buffers for edges of a given type.
4680    pub(crate) fn scan_edge_type_l0(
4681        &self,
4682        edge_type: &str,
4683        ctx: &QueryContext,
4684        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4685    ) {
4686        let schema = self.storage.schema_manager().schema();
4687        let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4688
4689        if let Some(type_id) = type_id {
4690            // Main L0
4691            self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4692
4693            // Transaction L0
4694            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4695                self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4696            }
4697
4698            // Pending flush L0s
4699            for pending_l0_arc in &ctx.pending_flush_l0s {
4700                self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4701            }
4702        }
4703    }
4704
4705    /// Scan a single L0 buffer for edges and apply tombstones.
4706    pub(crate) fn scan_single_l0(
4707        &self,
4708        l0: &uni_store::runtime::L0Buffer,
4709        type_id: u32,
4710        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4711    ) {
4712        for edge_entry in l0.graph.edges() {
4713            if edge_entry.edge_type == type_id {
4714                edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4715            }
4716        }
4717        // Process Tombstones
4718        let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4719        for eid in eids_to_check {
4720            if l0.is_tombstoned(eid) {
4721                edges.remove(&eid);
4722            }
4723        }
4724    }
4725
4726    /// Filter out edges connected to tombstoned vertices.
4727    pub(crate) fn filter_tombstoned_vertex_edges(
4728        &self,
4729        ctx: &QueryContext,
4730        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4731    ) {
4732        let l0 = ctx.l0.read();
4733        let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4734
4735        // Include tx_l0 vertex tombstones if present
4736        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4737            let tx_l0 = tx_l0_arc.read();
4738            all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4739        }
4740
4741        // Include pending flush L0 vertex tombstones
4742        for pending_l0_arc in &ctx.pending_flush_l0s {
4743            let pending_l0 = pending_l0_arc.read();
4744            all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4745        }
4746
4747        edges.retain(|_, (src, dst)| {
4748            !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4749        });
4750    }
4751
4752    /// Execute a projection operation.
4753    pub(crate) async fn execute_project(
4754        &self,
4755        input_rows: Vec<HashMap<String, Value>>,
4756        projections: &[(Expr, Option<String>)],
4757        prop_manager: &PropertyManager,
4758        params: &HashMap<String, Value>,
4759        ctx: Option<&QueryContext>,
4760    ) -> Result<Vec<HashMap<String, Value>>> {
4761        let mut results = Vec::new();
4762        for m in input_rows {
4763            let mut row = HashMap::new();
4764            for (expr, alias) in projections {
4765                let val = self
4766                    .evaluate_expr(expr, &m, prop_manager, params, ctx)
4767                    .await?;
4768                let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4769                row.insert(name, val);
4770            }
4771            results.push(row);
4772        }
4773        Ok(results)
4774    }
4775
4776    /// Execute an UNWIND operation.
4777    pub(crate) async fn execute_unwind(
4778        &self,
4779        input_rows: Vec<HashMap<String, Value>>,
4780        expr: &Expr,
4781        variable: &str,
4782        prop_manager: &PropertyManager,
4783        params: &HashMap<String, Value>,
4784        ctx: Option<&QueryContext>,
4785    ) -> Result<Vec<HashMap<String, Value>>> {
4786        let mut results = Vec::new();
4787        for row in input_rows {
4788            let val = self
4789                .evaluate_expr(expr, &row, prop_manager, params, ctx)
4790                .await?;
4791            if let Value::List(items) = val {
4792                for item in items {
4793                    let mut new_row = row.clone();
4794                    new_row.insert(variable.to_string(), item);
4795                    results.push(new_row);
4796                }
4797            }
4798        }
4799        Ok(results)
4800    }
4801
4802    /// Execute an APPLY (correlated subquery) operation.
4803    pub(crate) async fn execute_apply(
4804        &self,
4805        input_rows: Vec<HashMap<String, Value>>,
4806        subquery: &LogicalPlan,
4807        input_filter: Option<&Expr>,
4808        prop_manager: &PropertyManager,
4809        params: &HashMap<String, Value>,
4810        ctx: Option<&QueryContext>,
4811    ) -> Result<Vec<HashMap<String, Value>>> {
4812        let mut filtered_rows = input_rows;
4813
4814        if let Some(filter) = input_filter {
4815            let mut filtered = Vec::new();
4816            for row in filtered_rows {
4817                let res = self
4818                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
4819                    .await?;
4820                if res.as_bool().unwrap_or(false) {
4821                    filtered.push(row);
4822                }
4823            }
4824            filtered_rows = filtered;
4825        }
4826
4827        // Handle empty input: execute subquery once with empty context
4828        // This is critical for standalone CALL statements at the beginning of a query
4829        if filtered_rows.is_empty() {
4830            let sub_rows = self
4831                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4832                .await?;
4833            return Ok(sub_rows);
4834        }
4835
4836        let mut results = Vec::new();
4837        for row in filtered_rows {
4838            let mut sub_params = params.clone();
4839            sub_params.extend(row.clone());
4840
4841            let sub_rows = self
4842                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4843                .await?;
4844
4845            for sub_row in sub_rows {
4846                let mut new_row = row.clone();
4847                new_row.extend(sub_row);
4848                results.push(new_row);
4849            }
4850        }
4851        Ok(results)
4852    }
4853
4854    /// Execute SHOW INDEXES command.
4855    pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4856        let schema = self.storage.schema_manager().schema();
4857        let mut rows = Vec::new();
4858        for idx in &schema.indexes {
4859            let (name, type_str, details) = match idx {
4860                uni_common::core::schema::IndexDefinition::Vector(c) => (
4861                    c.name.clone(),
4862                    "VECTOR",
4863                    format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4864                ),
4865                uni_common::core::schema::IndexDefinition::FullText(c) => (
4866                    c.name.clone(),
4867                    "FULLTEXT",
4868                    format!("on {}:{:?}", c.label, c.properties),
4869                ),
4870                uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4871                    cfg.name.clone(),
4872                    "SCALAR",
4873                    format!(":{}({:?})", cfg.label, cfg.properties),
4874                ),
4875                _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4876            };
4877
4878            if let Some(f) = filter
4879                && f != type_str
4880            {
4881                continue;
4882            }
4883
4884            let mut row = HashMap::new();
4885            row.insert("name".to_string(), Value::String(name));
4886            row.insert("type".to_string(), Value::String(type_str.to_string()));
4887            row.insert("details".to_string(), Value::String(details));
4888            rows.push(row);
4889        }
4890        rows
4891    }
4892
4893    pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4894        let mut row = HashMap::new();
4895        row.insert("name".to_string(), Value::String("uni".to_string()));
4896        // Could add storage path, etc.
4897        vec![row]
4898    }
4899
4900    pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4901        // Placeholder as we don't easy access to config struct from here
4902        vec![]
4903    }
4904
4905    pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4906        let snapshot = self
4907            .storage
4908            .snapshot_manager()
4909            .load_latest_snapshot()
4910            .await?;
4911        let mut results = Vec::new();
4912
4913        if let Some(snap) = snapshot {
4914            for (label, s) in &snap.vertices {
4915                let mut row = HashMap::new();
4916                row.insert("type".to_string(), Value::String("Label".to_string()));
4917                row.insert("name".to_string(), Value::String(label.clone()));
4918                row.insert("count".to_string(), Value::Int(s.count as i64));
4919                results.push(row);
4920            }
4921            for (edge, s) in &snap.edges {
4922                let mut row = HashMap::new();
4923                row.insert("type".to_string(), Value::String("Edge".to_string()));
4924                row.insert("name".to_string(), Value::String(edge.clone()));
4925                row.insert("count".to_string(), Value::Int(s.count as i64));
4926                results.push(row);
4927            }
4928        }
4929
4930        Ok(results)
4931    }
4932
4933    pub(crate) fn execute_show_constraints(
4934        &self,
4935        clause: ShowConstraints,
4936    ) -> Vec<HashMap<String, Value>> {
4937        let schema = self.storage.schema_manager().schema();
4938        let mut rows = Vec::new();
4939        for c in &schema.constraints {
4940            if let Some(target) = &clause.target {
4941                match (target, &c.target) {
4942                    (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4943                    (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4944                        if e1 == e2 => {}
4945                    _ => continue,
4946                }
4947            }
4948
4949            let mut row = HashMap::new();
4950            row.insert("name".to_string(), Value::String(c.name.clone()));
4951            let type_str = match c.constraint_type {
4952                ConstraintType::Unique { .. } => "UNIQUE",
4953                ConstraintType::Exists { .. } => "EXISTS",
4954                ConstraintType::Check { .. } => "CHECK",
4955                _ => "UNKNOWN",
4956            };
4957            row.insert("type".to_string(), Value::String(type_str.to_string()));
4958
4959            let target_str = match &c.target {
4960                ConstraintTarget::Label(l) => format!("(:{})", l),
4961                ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4962                _ => "UNKNOWN".to_string(),
4963            };
4964            row.insert("target".to_string(), Value::String(target_str));
4965
4966            rows.push(row);
4967        }
4968        rows
4969    }
4970
4971    /// Execute a MERGE operation.
4972    pub(crate) async fn execute_cross_join(
4973        &self,
4974        left: Box<LogicalPlan>,
4975        right: Box<LogicalPlan>,
4976        prop_manager: &PropertyManager,
4977        params: &HashMap<String, Value>,
4978        ctx: Option<&QueryContext>,
4979    ) -> Result<Vec<HashMap<String, Value>>> {
4980        let left_rows = self
4981            .execute_subplan(*left, prop_manager, params, ctx)
4982            .await?;
4983        let right_rows = self
4984            .execute_subplan(*right, prop_manager, params, ctx)
4985            .await?;
4986
4987        let mut results = Vec::new();
4988        for l in &left_rows {
4989            for r in &right_rows {
4990                let mut combined = l.clone();
4991                combined.extend(r.clone());
4992                results.push(combined);
4993            }
4994        }
4995        Ok(results)
4996    }
4997
4998    /// Execute a UNION operation with optional deduplication.
4999    pub(crate) async fn execute_union(
5000        &self,
5001        left: Box<LogicalPlan>,
5002        right: Box<LogicalPlan>,
5003        all: bool,
5004        prop_manager: &PropertyManager,
5005        params: &HashMap<String, Value>,
5006        ctx: Option<&QueryContext>,
5007    ) -> Result<Vec<HashMap<String, Value>>> {
5008        let mut left_rows = self
5009            .execute_subplan(*left, prop_manager, params, ctx)
5010            .await?;
5011        let mut right_rows = self
5012            .execute_subplan(*right, prop_manager, params, ctx)
5013            .await?;
5014
5015        left_rows.append(&mut right_rows);
5016
5017        if !all {
5018            let mut seen = HashSet::new();
5019            left_rows.retain(|row| {
5020                let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
5021                let key = format!("{sorted_row:?}");
5022                seen.insert(key)
5023            });
5024        }
5025        Ok(left_rows)
5026    }
5027
5028    /// Check if an index with the given name exists.
5029    pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
5030        let schema = self.storage.schema_manager().schema();
5031        schema.indexes.iter().any(|idx| match idx {
5032            uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
5033            uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
5034            uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
5035            _ => false,
5036        })
5037    }
5038
5039    pub(crate) async fn execute_export(
5040        &self,
5041        target: &str,
5042        source: &str,
5043        options: &HashMap<String, Value>,
5044        prop_manager: &PropertyManager,
5045        ctx: Option<&QueryContext>,
5046    ) -> Result<Vec<HashMap<String, Value>>> {
5047        let format = options
5048            .get("format")
5049            .and_then(|v| v.as_str())
5050            .unwrap_or("csv")
5051            .to_lowercase();
5052
5053        match format.as_str() {
5054            "csv" => {
5055                self.execute_csv_export(target, source, options, prop_manager, ctx)
5056                    .await
5057            }
5058            "parquet" => {
5059                self.execute_parquet_export(target, source, options, prop_manager, ctx)
5060                    .await
5061            }
5062            _ => Err(anyhow!("Unsupported export format: {}", format)),
5063        }
5064    }
5065
5066    pub(crate) async fn execute_csv_export(
5067        &self,
5068        target: &str,
5069        source: &str,
5070        options: &HashMap<String, Value>,
5071        prop_manager: &PropertyManager,
5072        ctx: Option<&QueryContext>,
5073    ) -> Result<Vec<HashMap<String, Value>>> {
5074        // Validate destination path against sandbox
5075        let validated_dest = self.validate_path(source)?;
5076
5077        let schema = self.storage.schema_manager().schema();
5078        let label_meta = schema.labels.get(target);
5079        let edge_meta = schema.edge_types.get(target);
5080
5081        if label_meta.is_none() && edge_meta.is_none() {
5082            return Err(anyhow!("Target '{}' not found in schema", target));
5083        }
5084
5085        let delimiter_str = options
5086            .get("delimiter")
5087            .and_then(|v| v.as_str())
5088            .unwrap_or(",");
5089        let delimiter = if delimiter_str.is_empty() {
5090            b','
5091        } else {
5092            delimiter_str.as_bytes()[0]
5093        };
5094        let has_header = options
5095            .get("header")
5096            .and_then(|v| v.as_bool())
5097            .unwrap_or(true);
5098
5099        let mut wtr = csv::WriterBuilder::new()
5100            .delimiter(delimiter)
5101            .from_path(&validated_dest)?;
5102
5103        let mut count = 0;
5104        // Empty properties map for labels/edge types without registered properties
5105        let empty_props = HashMap::new();
5106
5107        if let Some(meta) = label_meta {
5108            let label_id = meta.id;
5109            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5110            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5111            prop_names.sort();
5112
5113            let mut headers = vec!["_vid".to_string()];
5114            headers.extend(prop_names.clone());
5115
5116            if has_header {
5117                wtr.write_record(&headers)?;
5118            }
5119
5120            let vids = self
5121                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5122                .await?;
5123
5124            for vid in vids {
5125                let props = prop_manager
5126                    .get_all_vertex_props_with_ctx(vid, ctx)
5127                    .await?
5128                    .unwrap_or_default();
5129
5130                let mut row = Vec::with_capacity(headers.len());
5131                row.push(vid.to_string());
5132                for p_name in &prop_names {
5133                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5134                    row.push(self.format_csv_value(val));
5135                }
5136                wtr.write_record(&row)?;
5137                count += 1;
5138            }
5139        } else if let Some(meta) = edge_meta {
5140            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5141            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5142            prop_names.sort();
5143
5144            // Headers for Edge: _eid, _src, _dst, _type, ...props
5145            let mut headers = vec![
5146                "_eid".to_string(),
5147                "_src".to_string(),
5148                "_dst".to_string(),
5149                "_type".to_string(),
5150            ];
5151            headers.extend(prop_names.clone());
5152
5153            if has_header {
5154                wtr.write_record(&headers)?;
5155            }
5156
5157            let edges = self.scan_edge_type(target, ctx).await?;
5158
5159            for (eid, src, dst) in edges {
5160                let props = prop_manager
5161                    .get_all_edge_props_with_ctx(eid, ctx)
5162                    .await?
5163                    .unwrap_or_default();
5164
5165                let mut row = Vec::with_capacity(headers.len());
5166                row.push(eid.to_string());
5167                row.push(src.to_string());
5168                row.push(dst.to_string());
5169                row.push(meta.id.to_string());
5170
5171                for p_name in &prop_names {
5172                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5173                    row.push(self.format_csv_value(val));
5174                }
5175                wtr.write_record(&row)?;
5176                count += 1;
5177            }
5178        }
5179
5180        wtr.flush()?;
5181        let mut res = HashMap::new();
5182        res.insert("count".to_string(), Value::Int(count as i64));
5183        Ok(vec![res])
5184    }
5185
5186    /// Exports data to Parquet format.
5187    ///
5188    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
5189    pub(crate) async fn execute_parquet_export(
5190        &self,
5191        target: &str,
5192        destination: &str,
5193        _options: &HashMap<String, Value>,
5194        prop_manager: &PropertyManager,
5195        ctx: Option<&QueryContext>,
5196    ) -> Result<Vec<HashMap<String, Value>>> {
5197        let schema_manager = self.storage.schema_manager();
5198        let schema = schema_manager.schema();
5199        let label_meta = schema.labels.get(target);
5200        let edge_meta = schema.edge_types.get(target);
5201
5202        if label_meta.is_none() && edge_meta.is_none() {
5203            return Err(anyhow!("Target '{}' not found in schema", target));
5204        }
5205
5206        let arrow_schema = if label_meta.is_some() {
5207            let dataset = self.storage.vertex_dataset(target)?;
5208            dataset.get_arrow_schema(&schema)?
5209        } else {
5210            // Edge Schema
5211            let dataset = self.storage.edge_dataset(target, "", "")?;
5212            dataset.get_arrow_schema(&schema)?
5213        };
5214
5215        let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();
5216
5217        if let Some(meta) = label_meta {
5218            let label_id = meta.id;
5219            let vids = self
5220                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5221                .await?;
5222
5223            for vid in vids {
5224                let mut props = prop_manager
5225                    .get_all_vertex_props_with_ctx(vid, ctx)
5226                    .await?
5227                    .unwrap_or_default();
5228
5229                props.insert(
5230                    "_vid".to_string(),
5231                    uni_common::Value::Int(vid.as_u64() as i64),
5232                );
5233                if !props.contains_key("_uid") {
5234                    props.insert(
5235                        "_uid".to_string(),
5236                        uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
5237                    );
5238                }
5239                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5240                props.insert("_version".to_string(), uni_common::Value::Int(1));
5241                rows.push(props);
5242            }
5243        } else if edge_meta.is_some() {
5244            let edges = self.scan_edge_type(target, ctx).await?;
5245            for (eid, src, dst) in edges {
5246                let mut props = prop_manager
5247                    .get_all_edge_props_with_ctx(eid, ctx)
5248                    .await?
5249                    .unwrap_or_default();
5250
5251                props.insert(
5252                    "eid".to_string(),
5253                    uni_common::Value::Int(eid.as_u64() as i64),
5254                );
5255                props.insert(
5256                    "src_vid".to_string(),
5257                    uni_common::Value::Int(src.as_u64() as i64),
5258                );
5259                props.insert(
5260                    "dst_vid".to_string(),
5261                    uni_common::Value::Int(dst.as_u64() as i64),
5262                );
5263                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5264                props.insert("_version".to_string(), uni_common::Value::Int(1));
5265                rows.push(props);
5266            }
5267        }
5268
5269        // Write to cloud or local file
5270        if is_cloud_url(destination) {
5271            self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
5272                .await?;
5273        } else {
5274            // Validate local destination path against sandbox
5275            let validated_dest = self.validate_path(destination)?;
5276            let file = std::fs::File::create(&validated_dest)?;
5277            let mut writer =
5278                parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;
5279
5280            // Write all in one batch for now (simplification)
5281            if !rows.is_empty() {
5282                let batch = self.rows_to_batch(&rows, &arrow_schema)?;
5283                writer.write(&batch)?;
5284            }
5285
5286            writer.close()?;
5287        }
5288
5289        let mut res = HashMap::new();
5290        res.insert("count".to_string(), Value::Int(rows.len() as i64));
5291        Ok(vec![res])
5292    }
5293
5294    /// Writes Parquet data to a cloud storage destination.
5295    async fn write_parquet_to_cloud(
5296        &self,
5297        dest_url: &str,
5298        rows: &[HashMap<String, uni_common::Value>],
5299        arrow_schema: &arrow_schema::Schema,
5300    ) -> Result<()> {
5301        use object_store::ObjectStoreExt;
5302
5303        let (store, path) = build_store_from_url(dest_url)?;
5304
5305        // Write to an in-memory buffer
5306        let mut buffer = Vec::new();
5307        {
5308            let mut writer = parquet::arrow::ArrowWriter::try_new(
5309                &mut buffer,
5310                Arc::new(arrow_schema.clone()),
5311                None,
5312            )?;
5313
5314            if !rows.is_empty() {
5315                let batch = self.rows_to_batch(rows, arrow_schema)?;
5316                writer.write(&batch)?;
5317            }
5318
5319            writer.close()?;
5320        }
5321
5322        // Upload to cloud storage
5323        store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5324
5325        Ok(())
5326    }
5327
5328    pub(crate) fn rows_to_batch(
5329        &self,
5330        rows: &[HashMap<String, uni_common::Value>],
5331        schema: &arrow_schema::Schema,
5332    ) -> Result<RecordBatch> {
5333        let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5334
5335        for field in schema.fields() {
5336            let name = field.name();
5337            let dt = field.data_type();
5338
5339            let values: Vec<uni_common::Value> = rows
5340                .iter()
5341                .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5342                .collect();
5343            let array = self.values_to_array(&values, dt)?;
5344            columns.push(array);
5345        }
5346
5347        Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5348    }
5349
5350    /// Convert a slice of Values to an Arrow array.
5351    /// Delegates to the shared implementation in arrow_convert module.
5352    pub(crate) fn values_to_array(
5353        &self,
5354        values: &[uni_common::Value],
5355        dt: &arrow_schema::DataType,
5356    ) -> Result<Arc<dyn Array>> {
5357        arrow_convert::values_to_array(values, dt)
5358    }
5359
5360    pub(crate) fn format_csv_value(&self, val: Value) -> String {
5361        match val {
5362            Value::Null => "".to_string(),
5363            Value::String(s) => s,
5364            Value::Int(i) => i.to_string(),
5365            Value::Float(f) => f.to_string(),
5366            Value::Bool(b) => b.to_string(),
5367            _ => format!("{val}"),
5368        }
5369    }
5370
5371    pub(crate) fn parse_csv_value(
5372        &self,
5373        s: &str,
5374        data_type: &uni_common::core::schema::DataType,
5375        prop_name: &str,
5376    ) -> Result<Value> {
5377        if s.is_empty() || s.to_lowercase() == "null" {
5378            return Ok(Value::Null);
5379        }
5380
5381        use uni_common::core::schema::DataType;
5382        match data_type {
5383            DataType::String => Ok(Value::String(s.to_string())),
5384            DataType::Int32 | DataType::Int64 => {
5385                let i = s.parse::<i64>().map_err(|_| {
5386                    anyhow!(
5387                        "Failed to parse integer for property '{}': {}",
5388                        prop_name,
5389                        s
5390                    )
5391                })?;
5392                Ok(Value::Int(i))
5393            }
5394            DataType::Float32 | DataType::Float64 => {
5395                let f = s.parse::<f64>().map_err(|_| {
5396                    anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5397                })?;
5398                Ok(Value::Float(f))
5399            }
5400            DataType::Bool => {
5401                let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5402                    anyhow!(
5403                        "Failed to parse boolean for property '{}': {}",
5404                        prop_name,
5405                        s
5406                    )
5407                })?;
5408                Ok(Value::Bool(b))
5409            }
5410            DataType::CypherValue => {
5411                let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5412                    anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5413                })?;
5414                Ok(Value::from(json_val))
5415            }
5416            DataType::Vector { .. } => {
5417                let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5418                    anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5419                })?;
5420                Ok(Value::Vector(v))
5421            }
5422            _ => Ok(Value::String(s.to_string())),
5423        }
5424    }
5425
5426    pub(crate) async fn detach_delete_vertex(
5427        &self,
5428        vid: Vid,
5429        writer: &Writer,
5430        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5431    ) -> Result<()> {
5432        let schema = self.storage.schema_manager().schema();
5433        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5434
5435        // 1. Find and delete all outgoing edges
5436        let out_graph = self
5437            .storage
5438            .load_subgraph_cached(
5439                &[vid],
5440                &edge_type_ids,
5441                1,
5442                uni_store::runtime::Direction::Outgoing,
5443                Some(writer.l0_manager.get_current()),
5444            )
5445            .await?;
5446
5447        for edge in out_graph.edges() {
5448            writer
5449                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5450                .await?;
5451        }
5452
5453        // 2. Find and delete all incoming edges
5454        let in_graph = self
5455            .storage
5456            .load_subgraph_cached(
5457                &[vid],
5458                &edge_type_ids,
5459                1,
5460                uni_store::runtime::Direction::Incoming,
5461                Some(writer.l0_manager.get_current()),
5462            )
5463            .await?;
5464
5465        for edge in in_graph.edges() {
5466            writer
5467                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5468                .await?;
5469        }
5470
5471        Ok(())
5472    }
5473
5474    /// Load outgoing + incoming subgraphs (1-hop) for a batch of vertices
5475    /// in two scans — one per direction. Shared infrastructure for
5476    /// `batch_detach_delete_vertices` (which deletes the loaded edges) and
5477    /// `batch_check_vertices_have_no_edges` (which errors if any
5478    /// non-tombstoned edges exist).
5479    ///
5480    /// Returns raw `WorkingGraph`s with **no tombstone filtering**;
5481    /// callers that care about same-batch edge deletes (e.g., the
5482    /// non-detach check that runs after edges have been individually
5483    /// DELETEd) layer the filter on top.
5484    async fn batch_load_incident_edges(
5485        &self,
5486        vids: &[Vid],
5487        writer: &Writer,
5488    ) -> Result<(
5489        uni_store::runtime::working_graph::WorkingGraph,
5490        uni_store::runtime::working_graph::WorkingGraph,
5491    )> {
5492        let schema = self.storage.schema_manager().schema();
5493        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5494        let l0 = Some(writer.l0_manager.get_current());
5495
5496        let out_graph = self
5497            .storage
5498            .load_subgraph_cached(
5499                vids,
5500                &edge_type_ids,
5501                1,
5502                uni_store::runtime::Direction::Outgoing,
5503                l0.clone(),
5504            )
5505            .await?;
5506        let in_graph = self
5507            .storage
5508            .load_subgraph_cached(
5509                vids,
5510                &edge_type_ids,
5511                1,
5512                uni_store::runtime::Direction::Incoming,
5513                l0,
5514            )
5515            .await?;
5516        Ok((out_graph, in_graph))
5517    }
5518
5519    /// Batch detach-delete: load subgraphs for all VIDs at once, then delete edges and vertices.
5520    pub(crate) async fn batch_detach_delete_vertices(
5521        &self,
5522        vids: &[Vid],
5523        labels_per_vid: Vec<Option<Vec<String>>>,
5524        writer: &Writer,
5525        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5526    ) -> Result<()> {
5527        let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5528
5529        for edge in out_graph.edges() {
5530            writer
5531                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5532                .await?;
5533        }
5534        for edge in in_graph.edges() {
5535            writer
5536                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5537                .await?;
5538        }
5539
5540        for (vid, labels) in vids.iter().zip(labels_per_vid) {
5541            writer.delete_vertex(*vid, labels, tx_l0).await?;
5542        }
5543
5544        Ok(())
5545    }
5546
5547    /// Batch non-detach DELETE check: verify NO incident edges exist for
5548    /// any VID in `vids`, modulo tombstones from the writer's L0 and the
5549    /// tx-local L0 (same set the per-VID `check_vertex_has_no_edges`
5550    /// consults at write.rs:2650-2673).
5551    ///
5552    /// Why this exists: the per-VID `check_vertex_has_no_edges` does two
5553    /// `load_subgraph_cached(&[vid], ...)` calls per vertex (outgoing +
5554    /// incoming). At N vertices that's 2N subgraph loads. The batched
5555    /// path collapses to 2 loads regardless of N.
5556    ///
5557    /// Tombstone visibility: detach (which deletes everything) doesn't
5558    /// need the tombstone filter; non-detach does, because the caller
5559    /// may have already DELETEd some incident edges in the same
5560    /// statement (e.g. `MATCH (a)-[r]->(b) DELETE r, a` — edges are
5561    /// deleted before nodes per mutation_common.rs:720-724, so by the
5562    /// time this check runs, `r`'s eid is in `tx_l0.tombstones` and
5563    /// must be excluded from the "still has edges" set).
5564    pub(crate) async fn batch_check_vertices_have_no_edges(
5565        &self,
5566        vids: &[Vid],
5567        writer: &Writer,
5568        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5569    ) -> Result<()> {
5570        if vids.is_empty() {
5571            return Ok(());
5572        }
5573
5574        let mut tombstoned_eids: std::collections::HashSet<uni_common::core::id::Eid> =
5575            std::collections::HashSet::new();
5576        {
5577            let writer_l0 = writer.l0_manager.get_current();
5578            let guard = writer_l0.read();
5579            for &eid in guard.tombstones.keys() {
5580                tombstoned_eids.insert(eid);
5581            }
5582        }
5583        if let Some(tx) = tx_l0 {
5584            let guard = tx.read();
5585            for &eid in guard.tombstones.keys() {
5586                tombstoned_eids.insert(eid);
5587            }
5588        }
5589
5590        let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5591
5592        for edge in out_graph.edges().chain(in_graph.edges()) {
5593            if !tombstoned_eids.contains(&edge.eid) {
5594                return Err(anyhow!(
5595                    "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
5596                    edge.src_vid
5597                ));
5598            }
5599        }
5600        Ok(())
5601    }
5602}