Skip to main content

uni_query/query/executor/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7    eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14/// Convert a `Value` to `chrono::DateTime<Utc>`, handling both `Value::Temporal` and `Value::String`.
15fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16    match val {
17        Value::Temporal(tv) => {
18            use uni_common::TemporalValue;
19            match tv {
20                TemporalValue::DateTime {
21                    nanos_since_epoch, ..
22                }
23                | TemporalValue::LocalDateTime {
24                    nanos_since_epoch, ..
25                } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26                TemporalValue::Date { days_since_epoch } => {
27                    chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28                }
29                _ => None,
30            }
31        }
32        Value::String(s) => parse_datetime_utc(s).ok(),
33        _ => None,
34    }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46    BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47    ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54
55// DataFusion engine imports
56use crate::query::df_graph::L0Context;
57use crate::query::df_planner::HybridPhysicalPlanner;
58use datafusion::physical_plan::ExecutionPlanProperties;
59use datafusion::prelude::SessionContext;
60use parking_lot::RwLock as SyncRwLock;
61
62use arrow_array::{Array, RecordBatch};
63use csv;
64use parquet;
65
66use super::core::*;
67
/// How many system fields an edge map carries before any user property is
/// loaded: `_eid`, `_src`, `_dst`, `_type`, `_type_name`.
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
/// How many system fields a vertex map carries before any user property is
/// loaded: `_vid`, `_label`, `_uid`.
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
72
73/// Collect VIDs from all L0 buffers visible to a query context.
74///
75/// Applies `extractor` to each L0 buffer (main, transaction, pending flush) and
76/// collects the results. Returns an empty vec when no query context is present.
77fn collect_l0_vids(
78    ctx: Option<&QueryContext>,
79    extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
80) -> Vec<Vid> {
81    let mut vids = Vec::new();
82    if let Some(ctx) = ctx {
83        vids.extend(extractor(&ctx.l0.read()));
84        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
85            vids.extend(extractor(&tx_l0_arc.read()));
86        }
87        for pending_l0_arc in &ctx.pending_flush_l0s {
88            vids.extend(extractor(&pending_l0_arc.read()));
89        }
90    }
91    vids
92}
93
94/// Hydrate an entity map (vertex or edge) with properties if not already loaded.
95///
96/// This is the fallback for pushdown hydration - if the entity only has system fields
97/// (indicating pushdown didn't load properties), we load all properties here.
98///
99/// System field counts:
100/// - Edge: 5 fields (_eid, _src, _dst, _type, _type_name)
101/// - Vertex: 3 fields (_vid, _label, _uid)
102async fn hydrate_entity_if_needed(
103    map: &mut HashMap<String, Value>,
104    prop_manager: &PropertyManager,
105    ctx: Option<&QueryContext>,
106) {
107    // Check for edge entity
108    if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
109        if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
110            tracing::debug!(
111                "Pushdown fallback: hydrating edge {} at execution time",
112                eid_u64
113            );
114            if let Ok(Some(props)) = prop_manager
115                .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
116                .await
117            {
118                for (key, value) in props {
119                    map.entry(key).or_insert(value);
120                }
121            }
122        } else {
123            tracing::trace!(
124                "Pushdown success: edge {} already has {} properties",
125                eid_u64,
126                map.len() - EDGE_SYSTEM_FIELD_COUNT
127            );
128        }
129        return;
130    }
131
132    // Check for vertex entity
133    if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
134        if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
135            tracing::debug!(
136                "Pushdown fallback: hydrating vertex {} at execution time",
137                vid_u64
138            );
139            if let Ok(Some(props)) = prop_manager
140                .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
141                .await
142            {
143                for (key, value) in props {
144                    map.entry(key).or_insert(value);
145                }
146            }
147        } else {
148            tracing::trace!(
149                "Pushdown success: vertex {} already has {} properties",
150                vid_u64,
151                map.len() - VERTEX_SYSTEM_FIELD_COUNT
152            );
153        }
154    }
155}
156
157/// Promote a VID-string placeholder variable to a `Map`, draining its
158/// dotted system/property columns (`var.<prop>`) into the new map.
159///
160/// Used by `record_batches_to_rows` for search-procedure outputs, where the
161/// bare variable arrives as a VID string rather than a materialized Map.
162fn promote_vid_placeholder(row: &mut HashMap<String, Value>, var: &str) {
163    let prefix = format!("{}.", var);
164    let mut map = HashMap::new();
165
166    let dotted_keys: Vec<String> = row
167        .keys()
168        .filter(|k| k.starts_with(&prefix))
169        .cloned()
170        .collect();
171
172    for key in &dotted_keys {
173        let prop_name = &key[prefix.len()..];
174        if let Some(val) = row.remove(key) {
175            map.insert(prop_name.to_string(), val);
176        }
177    }
178
179    // Replace the VID-string placeholder with the constructed Map
180    row.insert(var.to_string(), Value::Map(map));
181}
182
183/// Merge the helper system columns (`var._vid`, `var._labels`, `var._eid`,
184/// `var._type`) and remaining USER property columns (`var.<prop>`) into the
185/// bare `Map` variable, removing the dotted helper columns from `row`.
186///
187/// Merging user properties keeps `RETURN var.prop` consistent after a
188/// unit-subquery SET refreshes the dotted columns: without it the (now stale)
189/// bare entity Struct from the outer scan would override the post-SET values.
190/// Internal helpers (`_all_props`, `_src_vid`, …) and `overflow_json` are
191/// dropped silently.
192fn merge_dotted_columns(row: &mut HashMap<String, Value>, var: &str) {
193    // Merge node system fields (_vid, _labels)
194    let vid_key = format!("{}._vid", var);
195    let labels_key = format!("{}._labels", var);
196
197    let vid_val = row.remove(&vid_key);
198    let labels_val = row.remove(&labels_key);
199
200    if let Some(Value::Map(map)) = row.get_mut(var) {
201        if let Some(v) = vid_val {
202            map.insert("_vid".to_string(), v);
203        }
204        if let Some(v) = labels_val {
205            map.insert("_labels".to_string(), v);
206        }
207    }
208
209    // Merge edge system fields (_eid, _type, _src_vid, _dst_vid).
210    // These are emitted as helper columns by the traverse exec.
211    // The structural projection already includes them in the struct,
212    // but we still need to remove the dotted helper columns.
213    let eid_key = format!("{}._eid", var);
214    let type_key = format!("{}._type", var);
215
216    let eid_val = row.remove(&eid_key);
217    let type_val = row.remove(&type_key);
218
219    if (eid_val.is_some() || type_val.is_some())
220        && let Some(Value::Map(map)) = row.get_mut(var)
221    {
222        if let Some(v) = eid_val {
223            map.entry("_eid".to_string()).or_insert(v);
224        }
225        if let Some(v) = type_val {
226            map.entry("_type".to_string()).or_insert(v);
227        }
228    }
229
230    // Drain remaining dotted columns, merging surviving USER properties.
231    let prefix = format!("{}.", var);
232    let dotted_keys: Vec<String> = row
233        .keys()
234        .filter(|k| k.starts_with(&prefix))
235        .cloned()
236        .collect();
237    for key in dotted_keys {
238        let prop_name = key[prefix.len()..].to_string();
239        let val = row.remove(&key);
240        if prop_name.starts_with('_') || prop_name == "overflow_json" {
241            continue;
242        }
243        if let (Some(val), Some(Value::Map(map))) = (val, row.get_mut(var)) {
244            map.insert(prop_name, val);
245        }
246    }
247}
248
249impl Executor {
250    /// Helper to verify and filter candidates against an optional predicate.
251    ///
252    /// Deduplicates candidates, loads properties, and evaluates the filter expression.
253    /// Returns only VIDs that pass the filter (or are not deleted).
254    async fn verify_and_filter_candidates(
255        &self,
256        mut candidates: Vec<Vid>,
257        variable: &str,
258        filter: Option<&Expr>,
259        ctx: Option<&QueryContext>,
260        prop_manager: &PropertyManager,
261        params: &HashMap<String, Value>,
262    ) -> Result<Vec<Vid>> {
263        candidates.sort_unstable();
264        candidates.dedup();
265
266        let mut verified_vids = Vec::new();
267        for vid in candidates {
268            let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
269                continue; // Deleted
270            };
271
272            if let Some(expr) = filter {
273                let mut props_map: HashMap<String, Value> = props;
274                props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
275
276                let mut row = HashMap::new();
277                row.insert(variable.to_string(), Value::Map(props_map));
278
279                let res = self
280                    .evaluate_expr(expr, &row, prop_manager, params, ctx)
281                    .await?;
282                if res.as_bool().unwrap_or(false) {
283                    verified_vids.push(vid);
284                }
285            } else {
286                verified_vids.push(vid);
287            }
288        }
289
290        Ok(verified_vids)
291    }
292
293    pub(crate) async fn scan_storage_candidates(
294        &self,
295        label_id: u16,
296        variable: &str,
297        filter: Option<&Expr>,
298    ) -> Result<Vec<Vid>> {
299        let schema = self.storage.schema_manager().schema();
300        let label_name = schema
301            .label_name_by_id(label_id)
302            .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
303
304        // Generate filter SQL from expression
305        let empty_props = std::collections::HashMap::new();
306        let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
307        let filter_sql = filter.and_then(|expr| {
308            LanceFilterGenerator::generate(std::slice::from_ref(expr), variable, Some(label_props))
309        });
310
311        self.storage
312            .scan_vertex_candidates(label_name, filter_sql.as_deref())
313            .await
314    }
315
316    pub(crate) async fn scan_label_with_filter(
317        &self,
318        label_id: u16,
319        variable: &str,
320        filter: Option<&Expr>,
321        ctx: Option<&QueryContext>,
322        prop_manager: &PropertyManager,
323        params: &HashMap<String, Value>,
324    ) -> Result<Vec<Vid>> {
325        let mut candidates = self
326            .scan_storage_candidates(label_id, variable, filter)
327            .await?;
328
329        // Convert label_id to label_name for L0 lookup
330        let schema = self.storage.schema_manager().schema();
331        if let Some(label_name) = schema.label_name_by_id(label_id) {
332            candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
333        }
334
335        self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
336            .await
337    }
338
339    pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
340        // Handle Value::Node directly (has vid field)
341        if let Value::Node(node) = val {
342            return Ok(node.vid);
343        }
344        // Handle Object (node) containing _vid field
345        if let Value::Map(map) = val
346            && let Some(vid_val) = map.get("_vid")
347            && let Some(v) = vid_val.as_u64()
348        {
349            return Ok(Vid::from(v));
350        }
351        // Handle string format
352        if let Some(s) = val.as_str()
353            && let Ok(id) = s.parse::<u64>()
354        {
355            return Ok(Vid::new(id));
356        }
357        // Handle raw u64
358        if let Some(v) = val.as_u64() {
359            return Ok(Vid::from(v));
360        }
361        Err(anyhow!("Invalid Vid format: {:?}", val))
362    }
363
364    /// Find a node value in the row by VID.
365    ///
366    /// Scans all values in the row, looking for a node (Map or Node) whose VID
367    /// matches the target. Returns the full node value if found, or a minimal
368    /// Map with just `_vid` as fallback.
369    fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
370        for val in row.values() {
371            if let Ok(vid) = Self::vid_from_value(val)
372                && vid == target_vid
373            {
374                return val.clone();
375            }
376        }
377        // Fallback: return minimal node map
378        Value::Map(HashMap::from([(
379            "_vid".to_string(),
380            Value::Int(target_vid.as_u64() as i64),
381        )]))
382    }
383
384    /// Create L0 context, session, and planner for DataFusion execution.
385    ///
386    /// This is the shared setup for `execute_datafusion` and `execute_merge_read_plan`.
387    /// Returns `(session_ctx, planner, prop_manager_arc)`.
388    pub async fn create_datafusion_planner(
389        &self,
390        prop_manager: &PropertyManager,
391        params: &HashMap<String, Value>,
392    ) -> Result<(
393        Arc<SyncRwLock<SessionContext>>,
394        HybridPhysicalPlanner,
395        Arc<PropertyManager>,
396    )> {
397        let query_ctx = self.get_context().await;
398        let l0_context = match query_ctx {
399            Some(ref ctx) => L0Context::from_query_context(ctx),
400            None => L0Context::empty(),
401        };
402
403        // C2: scans route through the version-pinned storage manager so
404        // post-snapshot L1 ROWS are invisible (the row-existence pin). The
405        // PropertyManager, by contrast, stays on LIVE storage: property
406        // point-reads must honor read-your-writes (a transaction's own
407        // uncommitted edge/vertex properties live in tx_l0, not L1, and a
408        // version filter would hide them — breaking e.g. MERGE's
409        // edge-property match). Cross-transaction property skew on an
410        // already-visible row is caught by OCC at commit, not the pin.
411        let effective_storage = self.effective_storage();
412
413        // Prefer the shared `Arc<PropertyManager>` installed via
414        // `Executor::set_prop_manager` — skips the LRU/Mutex allocation cost
415        // (~80 µs/query) of constructing a fresh, immediately-abandoned
416        // PropertyManager. Falls back to the legacy fresh-construction path
417        // for callers that have not installed one.
418        let prop_manager_arc = if let Some(shared) = self.prop_manager_arc.as_ref() {
419            shared.clone()
420        } else {
421            Arc::new(PropertyManager::new(
422                self.storage.clone(),
423                self.storage.schema_manager_arc(),
424                prop_manager.cache_size(),
425            ))
426        };
427
428        // Two-mode session construction (hot/cold template path, main)
429        // fused with plugin-framework scalar/optimizer registration
430        // (plugin-fw).
431        //
432        // Hot path: clone the pre-built `df_session_template` — an O(1)
433        // Arc bump on the inner `SessionState`. Skips the ~140 µs cost of
434        // `SessionContext::new()` + `register_cypher_udfs()`. The template
435        // only has the built-in Cypher UDFs baked in (see
436        // `Uni::build` → `register_cypher_udfs`), so it is ONLY safe to
437        // reuse when this query needs no per-query dynamic registration:
438        //   - no legacy `CustomFunctionRegistry` scalars,
439        //   - no plugin-registered optimizer rules,
440        //   - no host-level plugin scalars (`Uni::load_*_plugin`),
441        //   - no session-scoped plugin scalars.
442        //
443        // Cold path: construct a fresh, isolated `SessionContext` (folding
444        // in any plugin optimizer rules) and register the dynamic scalar
445        // sets so they do not leak into the shared template.
446        let has_custom_udfs = self
447            .custom_function_registry
448            .as_ref()
449            .is_some_and(|r| !r.is_empty());
450        let host_plugin_registry = self
451            .procedure_registry
452            .as_ref()
453            .and_then(|pr| pr.plugin_registry());
454        // Optimizer rules come from the host plugin registry (the legacy
455        // `CustomFunctionRegistry` only ever carried scalar fns). When no
456        // host registry is attached, there are no plugin optimizer rules.
457        let optimizer_providers = host_plugin_registry
458            .as_ref()
459            .map(|pr| pr.optimizer_rules())
460            .unwrap_or_default();
461        let session_local_registry =
462            crate::query::df_udfs_plugin::current_session_plugin_registry();
463        let needs_dynamic_registration = has_custom_udfs
464            || !optimizer_providers.is_empty()
465            || host_plugin_registry.is_some()
466            || session_local_registry.is_some();
467
468        let session = if let (Some(tmpl), false) = (
469            self.df_session_template.as_ref(),
470            needs_dynamic_registration,
471        ) {
472            // Hot path: clone the template (Cypher UDFs already registered).
473            (**tmpl).clone()
474        } else {
475            // Cold path: build a fresh session, optionally with any
476            // plugin-registered optimizer rules folded in.
477            //
478            // M5h: build a `SessionContext` that includes any
479            // plugin-registered optimizer rules. The rule chain is
480            // snapshotted at session-construction time; reload discipline
481            // is M10's problem.
482            let session = if optimizer_providers.is_empty() {
483                SessionContext::new()
484            } else {
485                use datafusion::execution::session_state::SessionStateBuilder;
486                use uni_plugin::traits::operator::OptimizerPhase;
487                let mut builder = SessionStateBuilder::new().with_default_features();
488                for provider in optimizer_providers.iter() {
489                    match provider.phase() {
490                        OptimizerPhase::Logical => {
491                            builder = builder.with_optimizer_rule(provider.rule());
492                        }
493                        OptimizerPhase::Physical => {
494                            if let Some(rule) = provider.physical_rule() {
495                                builder = builder.with_physical_optimizer_rule(rule);
496                            } else {
497                                tracing::debug!(
498                                    target: "uni.plugin.registry",
499                                    "physical-phase provider returned no physical_rule(); skipping"
500                                );
501                            }
502                        }
503                        OptimizerPhase::Both => {
504                            builder = builder.with_optimizer_rule(provider.rule());
505                            if let Some(rule) = provider.physical_rule() {
506                                builder = builder.with_physical_optimizer_rule(rule);
507                            }
508                        }
509                        _ => {
510                            tracing::debug!(
511                                target: "uni.plugin.registry",
512                                "skipping optimizer rule for unknown phase"
513                            );
514                        }
515                    }
516                }
517                let state = builder.build();
518                SessionContext::new_with_state(state)
519            };
520            crate::query::df_udfs::register_cypher_udfs(&session)?;
521            // Instance-scope, legacy `CustomFunctionRegistry` shadow path
522            // (db.register_function() entries + apoc-core mirrors). Routed
523            // through the plugin-registry adapter (plugin-fw).
524            if let Some(ref registry) = self.custom_function_registry {
525                crate::query::df_udfs_plugin::register_custom_functions_as_plugin_scalars(
526                    &session, registry,
527                )?;
528            }
529            // Instance-scope, **host** plugin registry (M8.6 follow-up):
530            // `Uni::load_python_plugin` / `Uni::load_rhai_plugin` /
531            // `Uni::load_wasm_*` and other host-level plugin-load paths
532            // register into `UniInner.plugin_registry`. The executor
533            // reaches that registry via the procedure-registry's
534            // attached handle (set by `Uni::build` at construction time).
535            // Without this branch, scalars added through
536            // `Uni::load_*_plugin` were invisible to Cypher's UDF
537            // resolution despite being directly invokable via
538            // `db.plugin_registry().scalar_fn(...)`.
539            if let Some(ref host_pr) = host_plugin_registry {
540                crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, host_pr)?;
541            }
542            // Session-scope (M8.6): when the call is inside a
543            // `scoped_with_session_plugin_registry` scope (set by a host
544            // crate's per-query execution path), register the session's
545            // local plugin scalars *last* so they shadow instance entries
546            // by name (DataFusion's `register_udf` is last-write-wins).
547            if let Some(ref session_local) = session_local_registry {
548                crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, session_local)?;
549            }
550            session
551        };
552        let session_ctx = Arc::new(SyncRwLock::new(session));
553
554        let mut planner = HybridPhysicalPlanner::with_l0_context(
555            session_ctx.clone(),
556            effective_storage.clone(),
557            l0_context,
558            prop_manager_arc.clone(),
559            effective_storage.schema_manager().schema(),
560            params.clone(),
561            HashMap::new(),
562        );
563
564        planner = planner.with_algo_registry(self.algo_registry.clone());
565        if let Some(ref registry) = self.procedure_registry {
566            planner = planner.with_procedure_registry(registry.clone());
567        }
568        // M5b follow-up #4: thread the host's plugin registry through to
569        // the physical planner so `plan_vector_knn` can consult
570        // `register_index_handle` for plugin-supplied vector index
571        // handles. The procedure registry's attached plugin registry is
572        // the host's actual instance (set by `Uni::build`, and what
573        // `db.plugin_registry()` returns to user code). The legacy
574        // `CustomFunctionRegistry` only ever carried scalar fns (no index
575        // handles / optimizer rules), so it is not a source here.
576        let host_plugin_registry = self
577            .procedure_registry
578            .as_ref()
579            .and_then(|r| r.plugin_registry());
580        if let Some(registry) = host_plugin_registry {
581            planner = planner.with_plugin_registry(registry);
582        }
583        if let Some(ref xervo_runtime) = self.xervo_runtime {
584            planner = planner.with_xervo_runtime(xervo_runtime.clone());
585        }
586        // FU-1 / M11 #6: thread the outer transaction's writer handle
587        // through so declared `WRITE`-mode procedures invoked from
588        // this query can run their Cypher bodies via the write-enabled
589        // inner-query host.
590        if let Some(writer) = &self.writer {
591            planner = planner.with_writer(Arc::clone(writer));
592        }
593
594        Ok((session_ctx, planner, prop_manager_arc))
595    }
596
597    /// Execute a DataFusion physical plan and collect all result batches.
598    pub fn collect_batches(
599        session_ctx: &Arc<SyncRwLock<SessionContext>>,
600        execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
601    ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
602        Box::pin(async move {
603            use futures::TryStreamExt;
604
605            let task_ctx = session_ctx.read().task_ctx();
606            let partition_count = execution_plan.output_partitioning().partition_count();
607            let mut all_batches = Vec::new();
608            for partition in 0..partition_count {
609                let stream = execution_plan.execute(partition, task_ctx.clone())?;
610                let batches: Vec<RecordBatch> = stream.try_collect().await?;
611                all_batches.extend(batches);
612            }
613            Ok(all_batches)
614        })
615    }
616
617    /// Executes a query using the DataFusion-based engine.
618    ///
619    /// Uses `HybridPhysicalPlanner` which produces DataFusion `ExecutionPlan`
620    /// trees with custom graph operators for graph-specific operations.
621    pub async fn execute_datafusion(
622        &self,
623        plan: LogicalPlan,
624        prop_manager: &PropertyManager,
625        params: &HashMap<String, Value>,
626    ) -> Result<Vec<RecordBatch>> {
627        let (batches, _plan) = self
628            .execute_datafusion_with_plan(plan, prop_manager, params)
629            .await?;
630        Ok(batches)
631    }
632
633    /// Executes a query using the DataFusion-based engine, returning both
634    /// result batches and the physical execution plan.
635    ///
636    /// The returned `Arc<dyn ExecutionPlan>` can be walked to extract per-operator
637    /// metrics (e.g., `output_rows`, `elapsed_compute`) that DataFusion's
638    /// `BaselineMetrics` recorded during execution.
639    pub async fn execute_datafusion_with_plan(
640        &self,
641        plan: LogicalPlan,
642        prop_manager: &PropertyManager,
643        params: &HashMap<String, Value>,
644    ) -> Result<(
645        Vec<RecordBatch>,
646        Arc<dyn datafusion::physical_plan::ExecutionPlan>,
647    )> {
648        let (session_ctx, mut planner, prop_manager_arc) =
649            self.create_datafusion_planner(prop_manager, params).await?;
650
651        // Build MutationContext when the plan contains write operations
652        if Self::contains_write_operations(&plan) {
653            let writer = self
654                .writer
655                .as_ref()
656                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
657                .clone();
658            let query_ctx = self.get_context().await;
659
660            debug_assert!(
661                query_ctx.is_some(),
662                "BUG: query_ctx is None for write operation"
663            );
664
665            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
666                executor: self.clone(),
667                writer,
668                prop_manager: prop_manager_arc,
669                params: params.clone(),
670                query_ctx,
671                tx_l0_override: self.transaction_l0_override.clone(),
672            });
673            planner = planner.with_mutation_context(mutation_ctx);
674            tracing::debug!(
675                plan_type = Self::get_plan_type(&plan),
676                "Mutation routed to DataFusion engine"
677            );
678        }
679
680        let execution_plan = planner.plan(&plan)?;
681        let plan_clone = Arc::clone(&execution_plan);
682        let result = Self::collect_batches(&session_ctx, execution_plan).await;
683
684        // Harvest warnings from the graph execution context after query completion.
685        let graph_warnings = planner.graph_ctx().take_warnings();
686        if !graph_warnings.is_empty()
687            && let Ok(mut w) = self.warnings.lock()
688        {
689            w.extend(graph_warnings);
690        }
691
692        result.map(|batches| (batches, plan_clone))
693    }
694
695    /// Execute a MERGE read sub-plan through the DataFusion engine.
696    ///
697    /// Plans and executes the MERGE pattern match using flat columnar output
698    /// (no structural projections), then groups dotted columns (`a._vid`,
699    /// `a._labels`, etc.) into per-variable Maps for downstream MERGE logic.
700    pub(crate) async fn execute_merge_read_plan(
701        &self,
702        plan: LogicalPlan,
703        prop_manager: &PropertyManager,
704        params: &HashMap<String, Value>,
705        merge_variables: Vec<String>,
706    ) -> Result<Vec<HashMap<String, Value>>> {
707        let (session_ctx, planner, _prop_manager_arc) =
708            self.create_datafusion_planner(prop_manager, params).await?;
709
710        // Plan with full property access ("*") for all merge variables so that
711        // ON MATCH SET / ON CREATE SET have complete entity Maps to work with.
712        let extra: HashMap<String, HashSet<String>> = merge_variables
713            .iter()
714            .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
715            .collect();
716        let execution_plan = planner.plan_with_properties(&plan, extra)?;
717        let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
718
719        // Convert to flat rows (dotted column names like "a._vid", "b._labels")
720        let flat_rows = self.record_batches_to_rows(all_batches)?;
721
722        // Group dotted columns into per-variable Maps for MERGE's match logic.
723        // E.g., {"a._vid": 0, "a._labels": ["A"]} → {"a": Map({"_vid": 0, "_labels": ["A"]})}
724        let rows = flat_rows
725            .into_iter()
726            .map(|mut row| {
727                for var in &merge_variables {
728                    // Skip if already materialized (e.g., by record_batches_to_rows)
729                    if row.contains_key(var) {
730                        continue;
731                    }
732                    let prefix = format!("{}.", var);
733                    let dotted_keys: Vec<String> = row
734                        .keys()
735                        .filter(|k| k.starts_with(&prefix))
736                        .cloned()
737                        .collect();
738                    if !dotted_keys.is_empty() {
739                        let mut map = HashMap::new();
740                        for key in dotted_keys {
741                            let prop_name = key[prefix.len()..].to_string();
742                            if let Some(val) = row.remove(&key) {
743                                map.insert(prop_name, val);
744                            }
745                        }
746                        row.insert(var.clone(), Value::Map(map));
747                    }
748                }
749                row
750            })
751            .collect();
752
753        Ok(rows)
754    }
755
756    /// Converts DataFusion RecordBatches to row-based HashMap format.
757    ///
758    /// Handles special metadata on fields:
759    /// - `cv_encoded=true`: Parse the string value as JSON to restore original type
760    ///
761    /// Also normalizes path structures to user-facing format (converts _vid to _id).
762    pub(crate) fn record_batches_to_rows(
763        &self,
764        batches: Vec<RecordBatch>,
765    ) -> Result<Vec<HashMap<String, Value>>> {
766        let mut rows = Vec::new();
767
768        for batch in batches {
769            let num_rows = batch.num_rows();
770            let schema = batch.schema();
771
772            for row_idx in 0..num_rows {
773                let mut row = HashMap::new();
774
775                for (col_idx, field) in schema.fields().iter().enumerate() {
776                    let column = batch.column(col_idx);
777                    // Recover the Uni DataType so LargeBinary/struct columns decode
778                    // correctly. Raw `Bytes` columns are tagged with `uni_raw_bytes` at
779                    // scan time (scan.rs `property_field`) because `Bytes`, `CypherValue`,
780                    // and `Duration` all map to Arrow `LargeBinary` and are otherwise
781                    // indistinguishable here; without this hint a raw `Bytes` value would
782                    // be run through the CypherValue codec and corrupted (issue #93).
783                    let data_type = if field
784                        .metadata()
785                        .get("uni_raw_bytes")
786                        .is_some_and(|v| v == "true")
787                    {
788                        Some(&uni_common::DataType::Bytes)
789                    } else if uni_common::core::schema::is_datetime_struct(field.data_type()) {
790                        Some(&uni_common::DataType::DateTime)
791                    } else if uni_common::core::schema::is_time_struct(field.data_type()) {
792                        Some(&uni_common::DataType::Time)
793                    } else {
794                        None
795                    };
796                    let mut value =
797                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);
798
799                    // Check if this field contains JSON-encoded values (e.g., from UNWIND)
800                    // Parse JSON string to restore the original type
801                    if field
802                        .metadata()
803                        .get("cv_encoded")
804                        .is_some_and(|v| v == "true")
805                        && let Value::String(s) = &value
806                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
807                    {
808                        value = Value::from(parsed);
809                    }
810
811                    // Normalize path structures to user-facing format
812                    value = Self::normalize_path_if_needed(value);
813
814                    row.insert(field.name().clone(), value);
815                }
816
817                // Merge system fields into bare variable maps.
818                // The projection step emits helper columns like "n._vid" and "n._labels"
819                // alongside the materialized "n" column (a Map of user properties).
820                // Here we merge those system fields into the map and remove the helpers.
821                //
822                // For search procedures (vector/FTS), the bare variable may be a VID
823                // string placeholder rather than a Map. In that case, promote it to a
824                // Map so we can merge system fields and properties into it.
825                let bare_vars: Vec<String> = row
826                    .keys()
827                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
828                    .cloned()
829                    .collect();
830
831                // Detect VID-placeholder variables that have system columns
832                // (e.g., "node" is String("1") but "node._vid" exists).
833                // Promote these to Maps so system fields can be merged in.
834                let vid_placeholder_vars: Vec<String> = row
835                    .keys()
836                    .filter(|k| {
837                        !k.contains('.')
838                            && matches!(row.get(*k), Some(Value::String(_)))
839                            && row.contains_key(&format!("{}._vid", k))
840                    })
841                    .cloned()
842                    .collect();
843
844                for var in &vid_placeholder_vars {
845                    promote_vid_placeholder(&mut row, var);
846                }
847
848                for var in &bare_vars {
849                    merge_dotted_columns(&mut row, var);
850                }
851
852                rows.push(row);
853            }
854        }
855
856        Ok(rows)
857    }
858
859    /// Normalize a value if it's a path structure, converting internal format to user-facing format.
860    ///
861    /// This only normalizes path structures (objects with "nodes" and "relationships" arrays).
862    /// Other values are returned unchanged to avoid interfering with query execution.
863    fn normalize_path_if_needed(value: Value) -> Value {
864        match value {
865            Value::Map(map)
866                if map.contains_key("nodes")
867                    && (map.contains_key("relationships") || map.contains_key("edges")) =>
868            {
869                Self::normalize_path_map(map)
870            }
871            other => other,
872        }
873    }
874
875    /// Normalize a path map object.
876    fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
877        // Normalize nodes array
878        if let Some(Value::List(nodes)) = map.remove("nodes") {
879            let normalized_nodes: Vec<Value> = nodes
880                .into_iter()
881                .map(|n| {
882                    if let Value::Map(node_map) = n {
883                        Self::normalize_path_node_map(node_map)
884                    } else {
885                        n
886                    }
887                })
888                .collect();
889            map.insert("nodes".to_string(), Value::List(normalized_nodes));
890        }
891
892        // Normalize relationships array (may be called "relationships" or "edges")
893        let rels_key = if map.contains_key("relationships") {
894            "relationships"
895        } else {
896            "edges"
897        };
898        if let Some(Value::List(rels)) = map.remove(rels_key) {
899            let normalized_rels: Vec<Value> = rels
900                .into_iter()
901                .map(|r| {
902                    if let Value::Map(rel_map) = r {
903                        Self::normalize_path_edge_map(rel_map)
904                    } else {
905                        r
906                    }
907                })
908                .collect();
909            map.insert("relationships".to_string(), Value::List(normalized_rels));
910        }
911
912        Value::Map(map)
913    }
914
915    /// Convert a Value to its string representation for path normalization.
916    fn value_to_id_string(val: Value) -> String {
917        match val {
918            Value::Int(n) => n.to_string(),
919            Value::Float(n) => n.to_string(),
920            Value::String(s) => s,
921            other => other.to_string(),
922        }
923    }
924
925    /// Move a map entry from `src_key` to `dst_key`, converting the value to a string.
926    /// When `src_key == dst_key`, this simply stringifies the value in place.
927    fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
928        if let Some(val) = map.remove(src_key) {
929            map.insert(
930                dst_key.to_string(),
931                Value::String(Self::value_to_id_string(val)),
932            );
933        }
934    }
935
936    /// Ensure the "properties" field is a non-null map.
937    fn ensure_properties_map(map: &mut HashMap<String, Value>) {
938        match map.get("properties") {
939            Some(props) if !props.is_null() => {}
940            _ => {
941                map.insert("properties".to_string(), Value::Map(HashMap::new()));
942            }
943        }
944    }
945
946    /// Normalize a node within a path to user-facing format.
947    fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
948        Self::stringify_map_field(&mut map, "_vid", "_id");
949        Self::ensure_properties_map(&mut map);
950        Value::Map(map)
951    }
952
953    /// Normalize an edge within a path to user-facing format.
954    fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
955        Self::stringify_map_field(&mut map, "_eid", "_id");
956        Self::stringify_map_field(&mut map, "_src", "_src");
957        Self::stringify_map_field(&mut map, "_dst", "_dst");
958
959        if let Some(type_name) = map.remove("_type_name") {
960            map.insert("_type".to_string(), type_name);
961        }
962
963        Self::ensure_properties_map(&mut map);
964        Value::Map(map)
965    }
966
967    #[instrument(
968        skip(self, prop_manager, params),
969        fields(rows_returned, duration_ms),
970        level = "info"
971    )]
972    pub fn execute<'a>(
973        &'a self,
974        plan: LogicalPlan,
975        prop_manager: &'a PropertyManager,
976        params: &'a HashMap<String, Value>,
977    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
978        Box::pin(async move {
979            let query_type = Self::get_plan_type(&plan);
980            let ctx = self.get_context().await;
981            let start = Instant::now();
982
983            // Route DDL/Admin queries to the fallback executor.
984            // All other queries (including similar_to) flow through DataFusion.
985            let res = if Self::is_ddl_or_admin(&plan) {
986                self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
987                    .await
988            } else {
989                let batches = self.execute_datafusion(plan, prop_manager, params).await?;
990                self.record_batches_to_rows(batches)
991            };
992
993            let duration = start.elapsed();
994            metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
995                .record(duration.as_secs_f64());
996
997            tracing::Span::current().record("duration_ms", duration.as_millis());
998            match &res {
999                Ok(rows) => {
1000                    tracing::Span::current().record("rows_returned", rows.len());
1001                    metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
1002                        .increment(rows.len() as u64);
1003                }
1004                Err(e) => {
1005                    let error_type = if e.to_string().contains("timed out") {
1006                        "timeout"
1007                    } else if e.to_string().contains("syntax") {
1008                        "syntax"
1009                    } else {
1010                        "execution"
1011                    };
1012                    metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
1013                }
1014            }
1015
1016            res
1017        })
1018    }
1019
1020    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
1021        match plan {
1022            LogicalPlan::Scan { .. } => "read_scan",
1023            LogicalPlan::FusedIndexScan { .. } => "read_fused_index_scan",
1024            LogicalPlan::FusedIndexScanWrapped { .. } => "read_fused_index_scan_wrapped",
1025            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
1026            LogicalPlan::Traverse { .. } => "read_traverse",
1027            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
1028            LogicalPlan::ScanAll { .. } => "read_scan_all",
1029            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
1030            LogicalPlan::VectorKnn { .. } => "read_vector",
1031            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
1032            LogicalPlan::Merge { .. } => "write_merge",
1033            LogicalPlan::Delete { .. } => "write_delete",
1034            LogicalPlan::Set { .. } => "write_set",
1035            LogicalPlan::Remove { .. } => "write_remove",
1036            LogicalPlan::ProcedureCall { .. } => "call",
1037            LogicalPlan::Copy { .. } => "copy",
1038            LogicalPlan::Backup { .. } => "backup",
1039            _ => "other",
1040        }
1041    }
1042
1043    /// Return all direct child plan references from a `LogicalPlan`.
1044    ///
1045    /// This centralizes the variant→children mapping so that recursive walkers
1046    /// (e.g., `contains_write_operations`) can delegate the
1047    /// "recurse into children" logic instead of duplicating the match arms.
1048    ///
1049    /// Note: `Foreach` returns only its `input`; the `body: Vec<LogicalPlan>`
1050    /// is not included because it requires special iteration. Callers that
1051    /// need to inspect the body should handle `Foreach` before falling through.
1052    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
1053        match plan {
1054            // Single-input wrappers
1055            LogicalPlan::Project { input, .. }
1056            | LogicalPlan::Sort { input, .. }
1057            | LogicalPlan::Limit { input, .. }
1058            | LogicalPlan::Distinct { input }
1059            | LogicalPlan::Aggregate { input, .. }
1060            | LogicalPlan::Window { input, .. }
1061            | LogicalPlan::Unwind { input, .. }
1062            | LogicalPlan::Filter { input, .. }
1063            | LogicalPlan::Create { input, .. }
1064            | LogicalPlan::CreateBatch { input, .. }
1065            | LogicalPlan::Set { input, .. }
1066            | LogicalPlan::Remove { input, .. }
1067            | LogicalPlan::Delete { input, .. }
1068            | LogicalPlan::Merge { input, .. }
1069            | LogicalPlan::Foreach { input, .. }
1070            | LogicalPlan::Traverse { input, .. }
1071            | LogicalPlan::TraverseMainByType { input, .. }
1072            | LogicalPlan::BindZeroLengthPath { input, .. }
1073            | LogicalPlan::BindPath { input, .. }
1074            | LogicalPlan::ShortestPath { input, .. }
1075            | LogicalPlan::AllShortestPaths { input, .. }
1076            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],
1077
1078            // Two-input wrappers
1079            LogicalPlan::Apply {
1080                input, subquery, ..
1081            }
1082            | LogicalPlan::SubqueryCall { input, subquery } => {
1083                vec![input.as_ref(), subquery.as_ref()]
1084            }
1085            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
1086                vec![left.as_ref(), right.as_ref()]
1087            }
1088            LogicalPlan::RecursiveCTE {
1089                initial, recursive, ..
1090            } => vec![initial.as_ref(), recursive.as_ref()],
1091            LogicalPlan::QuantifiedPattern {
1092                input,
1093                pattern_plan,
1094                ..
1095            } => vec![input.as_ref(), pattern_plan.as_ref()],
1096
1097            // Leaf nodes (scans, DDL, admin, etc.)
1098            _ => vec![],
1099        }
1100    }
1101
1102    /// Check if a plan is a DDL or admin operation that should skip DataFusion.
1103    ///
1104    /// These operations don't produce data streams and aren't supported by the
1105    /// DataFusion planner. Recurses through wrapper nodes (`Project`, `Sort`,
1106    /// `Limit`, etc.) to detect DDL/admin operations nested inside read
1107    /// wrappers (e.g. `CALL procedure(...) YIELD x RETURN x`).
1108    pub(crate) fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
1109        match plan {
1110            // DDL / schema operations
1111            LogicalPlan::CreateLabel(_)
1112            | LogicalPlan::CreateEdgeType(_)
1113            | LogicalPlan::AlterLabel(_)
1114            | LogicalPlan::AlterEdgeType(_)
1115            | LogicalPlan::DropLabel(_)
1116            | LogicalPlan::DropEdgeType(_)
1117            | LogicalPlan::CreateConstraint(_)
1118            | LogicalPlan::DropConstraint(_)
1119            | LogicalPlan::ShowConstraints(_) => true,
1120
1121            // Index operations
1122            LogicalPlan::CreateVectorIndex { .. }
1123            | LogicalPlan::CreateSparseIndex { .. }
1124            | LogicalPlan::CreateFullTextIndex { .. }
1125            | LogicalPlan::CreateScalarIndex { .. }
1126            | LogicalPlan::CreateJsonFtsIndex { .. }
1127            | LogicalPlan::DropIndex { .. }
1128            | LogicalPlan::ShowIndexes { .. } => true,
1129
1130            // Admin / utility operations
1131            LogicalPlan::ShowDatabase
1132            | LogicalPlan::ShowConfig
1133            | LogicalPlan::ShowStatistics
1134            | LogicalPlan::Vacuum
1135            | LogicalPlan::Checkpoint
1136            | LogicalPlan::Copy { .. }
1137            | LogicalPlan::CopyTo { .. }
1138            | LogicalPlan::CopyFrom { .. }
1139            | LogicalPlan::Backup { .. }
1140            | LogicalPlan::Explain { .. } => true,
1141
1142            // Procedure calls: DF-eligible procedures go through DataFusion,
1143            // everything else (DDL, admin, unknown) stays on fallback.
1144            LogicalPlan::ProcedureCall { procedure_name, .. } => {
1145                !Self::is_df_eligible_procedure(procedure_name)
1146            }
1147
1148            // Recurse through children using plan_children
1149            _ => Self::plan_children(plan)
1150                .iter()
1151                .any(|child| Self::is_ddl_or_admin(child)),
1152        }
1153    }
1154
/// Tell whether a procedure name is routed through the DataFusion engine.
///
/// This is a **positive allowlist**: the exact names listed below plus every
/// name starting with `uni.algo.` are eligible. Any other name — including
/// unknown ones — is not, and therefore stays on the fallback executor
/// (safe for TCK test procedures, future DDL, and admin).
fn is_df_eligible_procedure(name: &str) -> bool {
    const DF_ELIGIBLE: &[&str] = &[
        "uni.schema.labels",
        "uni.schema.edgeTypes",
        "uni.schema.relationshipTypes",
        "uni.schema.indexes",
        "uni.schema.constraints",
        "uni.schema.labelInfo",
        "uni.vector.query",
        "uni.fts.query",
        "uni.sparse.query",
        "uni.search",
        // M5g — `uni.create.vNode` produces a typed Node yield via the
        // planner-level node-shape expansion in `GraphProcedureCallExec`;
        // only the DataFusion path honours that, so route it there
        // explicitly. (vEdge emits a self-contained Struct column and works
        // on either path — list both for symmetry.)
        "uni.create.vNode",
        "uni.create.vEdge",
    ];

    DF_ELIGIBLE.contains(&name) || name.starts_with("uni.algo.")
}
1183
1184    /// Check if a plan contains write/mutation operations anywhere in the tree.
1185    ///
1186    /// Write operations (`CREATE`, `MERGE`, `DELETE`, `SET`, `REMOVE`, `FOREACH`)
1187    /// are used to determine when a MutationContext needs to be built for DataFusion.
1188    /// This recurses through read-only wrapper nodes to detect writes nested inside
1189    /// projections (e.g. `CREATE (n:Person) RETURN n` produces `Project { Create { ... } }`).
1190    fn contains_write_operations(plan: &LogicalPlan) -> bool {
1191        match plan {
1192            LogicalPlan::Create { .. }
1193            | LogicalPlan::CreateBatch { .. }
1194            | LogicalPlan::Merge { .. }
1195            | LogicalPlan::Delete { .. }
1196            | LogicalPlan::Set { .. }
1197            | LogicalPlan::Remove { .. }
1198            | LogicalPlan::Foreach { .. } => true,
1199            _ => Self::plan_children(plan)
1200                .iter()
1201                .any(|child| Self::contains_write_operations(child)),
1202        }
1203    }
1204
1205    /// Executes a query as a stream of result batches.
1206    ///
1207    /// Routes DDL/Admin through the fallback executor and everything else
1208    /// through DataFusion.
1209    pub fn execute_stream(
1210        self,
1211        plan: LogicalPlan,
1212        prop_manager: Arc<PropertyManager>,
1213        params: HashMap<String, Value>,
1214    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1215        let this = self;
1216        let this_for_ctx = this.clone();
1217
1218        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1219
1220        ctx_stream
1221            .flat_map(move |ctx| {
1222                let plan = plan.clone();
1223                let this = this.clone();
1224                let prop_manager = prop_manager.clone();
1225                let params = params.clone();
1226
1227                let fut = async move {
1228                    if Self::is_ddl_or_admin(&plan) {
1229                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
1230                            .await
1231                    } else {
1232                        let batches = this
1233                            .execute_datafusion(plan, &prop_manager, &params)
1234                            .await?;
1235                        this.record_batches_to_rows(batches)
1236                    }
1237                };
1238                stream::once(fut).boxed()
1239            })
1240            .boxed()
1241    }
1242
1243    /// Converts an Arrow array element at a given row index to a Value.
1244    /// Delegates to the shared implementation in arrow_convert module.
1245    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
1246        arrow_convert::arrow_to_value(col, row, None)
1247    }
1248
1249    pub(crate) fn evaluate_expr<'a>(
1250        &'a self,
1251        expr: &'a Expr,
1252        row: &'a HashMap<String, Value>,
1253        prop_manager: &'a PropertyManager,
1254        params: &'a HashMap<String, Value>,
1255        ctx: Option<&'a QueryContext>,
1256    ) -> BoxFuture<'a, Result<Value>> {
1257        let this = self;
1258        Box::pin(async move {
1259            // First check if the expression itself is already pre-computed in the row
1260            let repr = expr.to_string_repr();
1261            if let Some(val) = row.get(&repr) {
1262                return Ok(val.clone());
1263            }
1264
1265            match expr {
1266                Expr::PatternComprehension { .. } => {
1267                    // Handled by DataFusion path via PatternComprehensionExecExpr
1268                    Err(anyhow::anyhow!(
1269                        "Pattern comprehensions are handled by DataFusion executor"
1270                    ))
1271                }
1272                Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1273                    "COLLECT subqueries not yet supported in executor"
1274                )),
1275                Expr::Variable(name) => {
1276                    if let Some(val) = row.get(name) {
1277                        Ok(val.clone())
1278                    } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1279                        // Fallback: scan results may have system columns like "d._vid"
1280                        // without a materialized "d" Map. Return the VID so Property
1281                        // evaluation can fetch properties from storage.
1282                        Ok(vid_val.clone())
1283                    } else {
1284                        Ok(params.get(name).cloned().unwrap_or(Value::Null))
1285                    }
1286                }
1287                Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1288                Expr::Property(var_expr, prop_name) => {
1289                    // Fast path: if the base is a Variable, try flat-key lookup first.
1290                    // DataFusion scan results use flat keys like "d.embedding" rather than
1291                    // nested maps, so "d.embedding" won't be found via Variable("d") -> Property.
1292                    if let Expr::Variable(var_name) = var_expr.as_ref() {
1293                        let flat_key = format!("{}.{}", var_name, prop_name);
1294                        if let Some(val) = row.get(flat_key.as_str()) {
1295                            return Ok(val.clone());
1296                        }
1297                    }
1298
1299                    let base_val = this
1300                        .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1301                        .await?;
1302
1303                    // Handle system properties _vid and _id directly
1304                    if (prop_name == "_vid" || prop_name == "_id")
1305                        && let Ok(vid) = Self::vid_from_value(&base_val)
1306                    {
1307                        return Ok(Value::Int(vid.as_u64() as i64));
1308                    }
1309
1310                    // Handle Value::Node - access properties directly or via prop manager
1311                    if let Value::Node(node) = &base_val {
1312                        // Handle system properties
1313                        if prop_name == "_vid" || prop_name == "_id" {
1314                            return Ok(Value::Int(node.vid.as_u64() as i64));
1315                        }
1316                        if prop_name == "_labels" {
1317                            return Ok(Value::List(
1318                                node.labels
1319                                    .iter()
1320                                    .map(|l| Value::String(l.clone()))
1321                                    .collect(),
1322                            ));
1323                        }
1324                        // Check in-memory properties first
1325                        if let Some(val) = node.properties.get(prop_name.as_str()) {
1326                            return Ok(val.clone());
1327                        }
1328                        // Fallback to storage lookup
1329                        if let Ok(val) = prop_manager
1330                            .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1331                            .await
1332                        {
1333                            return Ok(val);
1334                        }
1335                        return Ok(Value::Null);
1336                    }
1337
1338                    // Handle Value::Edge - access properties directly or via prop manager
1339                    if let Value::Edge(edge) = &base_val {
1340                        // Handle system properties
1341                        if prop_name == "_eid" || prop_name == "_id" {
1342                            return Ok(Value::Int(edge.eid.as_u64() as i64));
1343                        }
1344                        if prop_name == "_type" {
1345                            return Ok(Value::String(edge.edge_type.clone()));
1346                        }
1347                        if prop_name == "_src" {
1348                            return Ok(Value::Int(edge.src.as_u64() as i64));
1349                        }
1350                        if prop_name == "_dst" {
1351                            return Ok(Value::Int(edge.dst.as_u64() as i64));
1352                        }
1353                        // Check in-memory properties first
1354                        if let Some(val) = edge.properties.get(prop_name.as_str()) {
1355                            return Ok(val.clone());
1356                        }
1357                        // Fallback to storage lookup
1358                        if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1359                        {
1360                            return Ok(val);
1361                        }
1362                        return Ok(Value::Null);
1363                    }
1364
1365                    // If base_val is an object (node/edge), check its properties first
1366                    // This handles properties from CREATE/MERGE that may not be persisted yet
1367                    if let Value::Map(map) = &base_val {
1368                        // First check top-level (for system properties like _id, _label, etc.)
1369                        if let Some(val) = map.get(prop_name.as_str()) {
1370                            return Ok(val.clone());
1371                        }
1372                        // Then check inside "properties" object (for user properties)
1373                        if let Some(Value::Map(props)) = map.get("properties")
1374                            && let Some(val) = props.get(prop_name.as_str())
1375                        {
1376                            return Ok(val.clone());
1377                        }
1378                        // Fallback to storage lookup using _vid or _id
1379                        let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1380                            map.get("_id")
1381                                .and_then(|v| v.as_str())
1382                                .and_then(|s| s.parse::<u64>().ok())
1383                        });
1384                        if let Some(id) = vid_opt {
1385                            let vid = Vid::from(id);
1386                            if let Ok(val) = prop_manager
1387                                .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1388                                .await
1389                            {
1390                                return Ok(val);
1391                            }
1392                        } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1393                            let eid = uni_common::core::id::Eid::from(id);
1394                            if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1395                                return Ok(val);
1396                            }
1397                        }
1398                        return Ok(Value::Null);
1399                    }
1400
1401                    // If base_val is just a VID, fetch from property manager
1402                    if let Ok(vid) = Self::vid_from_value(&base_val) {
1403                        return prop_manager
1404                            .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1405                            .await;
1406                    }
1407
1408                    if base_val.is_null() {
1409                        return Ok(Value::Null);
1410                    }
1411
1412                    // Check if base_val is a temporal value and prop_name is a temporal accessor
1413                    {
1414                        use crate::query::datetime::{
1415                            eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1416                            is_duration_string, is_temporal_accessor, is_temporal_string,
1417                        };
1418
1419                        // Handle Value::Temporal directly (no string parsing needed)
1420                        if let Value::Temporal(tv) = &base_val {
1421                            if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1422                                if is_duration_accessor(prop_name) {
1423                                    // Convert to string for the existing accessor logic
1424                                    return eval_duration_accessor(
1425                                        &base_val.to_string(),
1426                                        prop_name,
1427                                    );
1428                                }
1429                            } else if is_temporal_accessor(prop_name) {
1430                                return eval_temporal_accessor(&base_val.to_string(), prop_name);
1431                            }
1432                        }
1433
1434                        // Handle Value::String temporal (backward compat)
1435                        if let Value::String(s) = &base_val {
1436                            if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1437                                return eval_temporal_accessor(s, prop_name);
1438                            }
1439                            if is_duration_string(s) && is_duration_accessor(prop_name) {
1440                                return eval_duration_accessor(s, prop_name);
1441                            }
1442                        }
1443                    }
1444
1445                    Err(anyhow!(
1446                        "Cannot access property '{}' on {:?}",
1447                        prop_name,
1448                        base_val
1449                    ))
1450                }
1451                Expr::ArrayIndex {
1452                    array: arr_expr,
1453                    index: idx_expr,
1454                } => {
1455                    let arr_val = this
1456                        .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1457                        .await?;
1458                    let idx_val = this
1459                        .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1460                        .await?;
1461
1462                    if let Value::List(arr) = &arr_val {
1463                        // Handle signed indices (allow negative)
1464                        if let Some(i) = idx_val.as_i64() {
1465                            let idx = if i < 0 {
1466                                // Negative index: -1 = last element, -2 = second to last, etc.
1467                                let positive_idx = arr.len() as i64 + i;
1468                                if positive_idx < 0 {
1469                                    return Ok(Value::Null); // Out of bounds
1470                                }
1471                                positive_idx as usize
1472                            } else {
1473                                i as usize
1474                            };
1475                            if idx < arr.len() {
1476                                return Ok(arr[idx].clone());
1477                            }
1478                            return Ok(Value::Null);
1479                        } else if idx_val.is_null() {
1480                            return Ok(Value::Null);
1481                        } else {
1482                            return Err(anyhow::anyhow!(
1483                                "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1484                                idx_val
1485                            ));
1486                        }
1487                    }
1488                    if let Value::Map(map) = &arr_val {
1489                        if let Some(key) = idx_val.as_str() {
1490                            return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1491                        } else if !idx_val.is_null() {
1492                            return Err(anyhow::anyhow!(
1493                                "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1494                                idx_val
1495                            ));
1496                        }
1497                    }
1498                    // Handle bracket access on Node: n['name'] returns property
1499                    if let Value::Node(node) = &arr_val {
1500                        if let Some(key) = idx_val.as_str() {
1501                            // Check in-memory properties first
1502                            if let Some(val) = node.properties.get(key) {
1503                                return Ok(val.clone());
1504                            }
1505                            // Fallback to property manager
1506                            if let Ok(val) = prop_manager
1507                                .get_vertex_prop_with_ctx(node.vid, key, ctx)
1508                                .await
1509                            {
1510                                return Ok(val);
1511                            }
1512                            return Ok(Value::Null);
1513                        } else if !idx_val.is_null() {
1514                            return Err(anyhow::anyhow!(
1515                                "TypeError: Node index must be a string, got: {:?}",
1516                                idx_val
1517                            ));
1518                        }
1519                    }
1520                    // Handle bracket access on Edge: e['property'] returns property
1521                    if let Value::Edge(edge) = &arr_val {
1522                        if let Some(key) = idx_val.as_str() {
1523                            // Check in-memory properties first
1524                            if let Some(val) = edge.properties.get(key) {
1525                                return Ok(val.clone());
1526                            }
1527                            // Fallback to property manager
1528                            if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1529                                return Ok(val);
1530                            }
1531                            return Ok(Value::Null);
1532                        } else if !idx_val.is_null() {
1533                            return Err(anyhow::anyhow!(
1534                                "TypeError: Edge index must be a string, got: {:?}",
1535                                idx_val
1536                            ));
1537                        }
1538                    }
1539                    // Handle bracket access on VID (integer): n['name'] where n is a VID
1540                    if let Ok(vid) = Self::vid_from_value(&arr_val)
1541                        && let Some(key) = idx_val.as_str()
1542                    {
1543                        if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1544                        {
1545                            return Ok(val);
1546                        }
1547                        return Ok(Value::Null);
1548                    }
1549                    if arr_val.is_null() {
1550                        return Ok(Value::Null);
1551                    }
1552                    Err(anyhow!(
1553                        "TypeError: InvalidArgumentType - cannot index into {:?}",
1554                        arr_val
1555                    ))
1556                }
1557                Expr::ArraySlice { array, start, end } => {
1558                    let arr_val = this
1559                        .evaluate_expr(array, row, prop_manager, params, ctx)
1560                        .await?;
1561
1562                    if let Value::List(arr) = &arr_val {
1563                        let len = arr.len();
1564
1565                        // Evaluate start index (default to 0), null → null result
1566                        let start_idx = if let Some(s) = start {
1567                            let v = this
1568                                .evaluate_expr(s, row, prop_manager, params, ctx)
1569                                .await?;
1570                            if v.is_null() {
1571                                return Ok(Value::Null);
1572                            }
1573                            let raw = v.as_i64().unwrap_or(0);
1574                            if raw < 0 {
1575                                (len as i64 + raw).max(0) as usize
1576                            } else {
1577                                (raw as usize).min(len)
1578                            }
1579                        } else {
1580                            0
1581                        };
1582
1583                        // Evaluate end index (default to length), null → null result
1584                        let end_idx = if let Some(e) = end {
1585                            let v = this
1586                                .evaluate_expr(e, row, prop_manager, params, ctx)
1587                                .await?;
1588                            if v.is_null() {
1589                                return Ok(Value::Null);
1590                            }
1591                            let raw = v.as_i64().unwrap_or(len as i64);
1592                            if raw < 0 {
1593                                (len as i64 + raw).max(0) as usize
1594                            } else {
1595                                (raw as usize).min(len)
1596                            }
1597                        } else {
1598                            len
1599                        };
1600
1601                        // Return sliced array
1602                        if start_idx >= end_idx {
1603                            return Ok(Value::List(vec![]));
1604                        }
1605                        let end_idx = end_idx.min(len);
1606                        return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1607                    }
1608
1609                    if arr_val.is_null() {
1610                        return Ok(Value::Null);
1611                    }
1612                    Err(anyhow!("Cannot slice {:?}", arr_val))
1613                }
1614                Expr::Literal(lit) => Ok(lit.to_value()),
1615                Expr::List(items) => {
1616                    let mut vals = Vec::new();
1617                    for item in items {
1618                        vals.push(
1619                            this.evaluate_expr(item, row, prop_manager, params, ctx)
1620                                .await?,
1621                        );
1622                    }
1623                    Ok(Value::List(vals))
1624                }
1625                Expr::Map(items) => {
1626                    let mut map = HashMap::new();
1627                    for (key, value_expr) in items {
1628                        let val = this
1629                            .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1630                            .await?;
1631                        map.insert(key.clone(), val);
1632                    }
1633                    Ok(Value::Map(map))
1634                }
1635                Expr::Exists { query, .. } => {
1636                    // Plan and execute subquery; failures return false (pattern doesn't match)
1637                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1638                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1639
1640                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1641                        Ok(plan) => {
1642                            let mut sub_params = params.clone();
1643                            sub_params.extend(row.clone());
1644
1645                            match this.execute(plan, prop_manager, &sub_params).await {
1646                                Ok(results) => Ok(Value::Bool(!results.is_empty())),
1647                                Err(e) => {
1648                                    log::debug!("EXISTS subquery execution failed: {}", e);
1649                                    Ok(Value::Bool(false))
1650                                }
1651                            }
1652                        }
1653                        Err(e) => {
1654                            log::debug!("EXISTS subquery planning failed: {}", e);
1655                            Ok(Value::Bool(false))
1656                        }
1657                    }
1658                }
1659                Expr::CountSubquery(query) => {
1660                    // Similar to Exists but returns count
1661                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1662
1663                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1664
1665                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1666                        Ok(plan) => {
1667                            let mut sub_params = params.clone();
1668                            sub_params.extend(row.clone());
1669
1670                            match this.execute(plan, prop_manager, &sub_params).await {
1671                                Ok(results) => Ok(Value::from(results.len() as i64)),
1672                                Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1673                            }
1674                        }
1675                        Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1676                    }
1677                }
1678                Expr::Quantifier {
1679                    quantifier,
1680                    variable,
1681                    list,
1682                    predicate,
1683                } => {
1684                    // Quantifier expression evaluation (ALL/ANY/SINGLE/NONE)
1685                    //
1686                    // This is the primary execution path for quantifiers because DataFusion
1687                    // does not support lambda functions yet. Queries with quantifiers attempt
1688                    // DataFusion translation first, fail (see df_expr.rs:289), then fall back
1689                    // to this fallback executor path.
1690                    //
1691                    // This is intentional design - we get correct semantics with row-by-row
1692                    // evaluation until DataFusion adds lambda support.
1693                    //
1694                    // See: https://github.com/apache/datafusion/issues/14205
1695
1696                    // Evaluate the list expression
1697                    let list_val = this
1698                        .evaluate_expr(list, row, prop_manager, params, ctx)
1699                        .await?;
1700
1701                    // Handle null propagation
1702                    if list_val.is_null() {
1703                        return Ok(Value::Null);
1704                    }
1705
1706                    // Convert to array
1707                    let items = match list_val {
1708                        Value::List(arr) => arr,
1709                        _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1710                    };
1711
1712                    // Evaluate predicate for each item
1713                    let mut satisfied_count = 0;
1714                    for item in &items {
1715                        // Create new row with bound variable
1716                        let mut item_row = row.clone();
1717                        item_row.insert(variable.clone(), item.clone());
1718
1719                        // Evaluate predicate with bound variable
1720                        let pred_result = this
1721                            .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1722                            .await?;
1723
1724                        // Check if predicate is satisfied
1725                        if let Value::Bool(true) = pred_result {
1726                            satisfied_count += 1;
1727                        }
1728                    }
1729
1730                    // Return based on quantifier type
1731                    let result = match quantifier {
1732                        Quantifier::All => satisfied_count == items.len(),
1733                        Quantifier::Any => satisfied_count > 0,
1734                        Quantifier::Single => satisfied_count == 1,
1735                        Quantifier::None => satisfied_count == 0,
1736                    };
1737
1738                    Ok(Value::Bool(result))
1739                }
1740                Expr::ListComprehension {
1741                    variable,
1742                    list,
1743                    where_clause,
1744                    map_expr,
1745                } => {
1746                    // List comprehension evaluation: [x IN list WHERE pred | expr]
1747                    //
1748                    // Similar to quantifiers, this requires lambda-like evaluation
1749                    // which DataFusion doesn't support yet. This is the primary execution path.
1750
1751                    // Evaluate the list expression
1752                    let list_val = this
1753                        .evaluate_expr(list, row, prop_manager, params, ctx)
1754                        .await?;
1755
1756                    // Handle null propagation
1757                    if list_val.is_null() {
1758                        return Ok(Value::Null);
1759                    }
1760
1761                    // Convert to array
1762                    let items = match list_val {
1763                        Value::List(arr) => arr,
1764                        _ => {
1765                            return Err(anyhow!(
1766                                "List comprehension expects a list, got: {:?}",
1767                                list_val
1768                            ));
1769                        }
1770                    };
1771
1772                    // Collect mapped values
1773                    let mut results = Vec::new();
1774                    for item in &items {
1775                        // Create new row with bound variable
1776                        let mut item_row = row.clone();
1777                        item_row.insert(variable.clone(), item.clone());
1778
1779                        // Apply WHERE filter if present
1780                        if let Some(predicate) = where_clause {
1781                            let pred_result = this
1782                                .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1783                                .await?;
1784
1785                            // Skip items that don't match the filter
1786                            if !matches!(pred_result, Value::Bool(true)) {
1787                                continue;
1788                            }
1789                        }
1790
1791                        // Apply map expression
1792                        let mapped_val = this
1793                            .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1794                            .await?;
1795                        results.push(mapped_val);
1796                    }
1797
1798                    Ok(Value::List(results))
1799                }
1800                Expr::BinaryOp { left, op, right } => {
1801                    // Short-circuit evaluation for AND/OR
1802                    match op {
1803                        BinaryOp::And => {
1804                            let l_val = this
1805                                .evaluate_expr(left, row, prop_manager, params, ctx)
1806                                .await?;
1807                            // Short-circuit: if left is false, don't evaluate right
1808                            if let Some(false) = l_val.as_bool() {
1809                                return Ok(Value::Bool(false));
1810                            }
1811                            let r_val = this
1812                                .evaluate_expr(right, row, prop_manager, params, ctx)
1813                                .await?;
1814                            eval_binary_op(&l_val, op, &r_val)
1815                        }
1816                        BinaryOp::Or => {
1817                            let l_val = this
1818                                .evaluate_expr(left, row, prop_manager, params, ctx)
1819                                .await?;
1820                            // Short-circuit: if left is true, don't evaluate right
1821                            if let Some(true) = l_val.as_bool() {
1822                                return Ok(Value::Bool(true));
1823                            }
1824                            let r_val = this
1825                                .evaluate_expr(right, row, prop_manager, params, ctx)
1826                                .await?;
1827                            eval_binary_op(&l_val, op, &r_val)
1828                        }
1829                        _ => {
1830                            // For all other operators, evaluate both sides
1831                            let l_val = this
1832                                .evaluate_expr(left, row, prop_manager, params, ctx)
1833                                .await?;
1834                            let r_val = this
1835                                .evaluate_expr(right, row, prop_manager, params, ctx)
1836                                .await?;
1837                            eval_binary_op(&l_val, op, &r_val)
1838                        }
1839                    }
1840                }
1841                Expr::In { expr, list } => {
1842                    let l_val = this
1843                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1844                        .await?;
1845                    let r_val = this
1846                        .evaluate_expr(list, row, prop_manager, params, ctx)
1847                        .await?;
1848                    eval_in_op(&l_val, &r_val)
1849                }
1850                Expr::UnaryOp { op, expr } => {
1851                    let val = this
1852                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1853                        .await?;
1854                    match op {
1855                        UnaryOp::Not => {
1856                            // Three-valued logic: NOT null = null
1857                            match val.as_bool() {
1858                                Some(b) => Ok(Value::Bool(!b)),
1859                                None if val.is_null() => Ok(Value::Null),
1860                                None => Err(anyhow!(
1861                                    "InvalidArgumentType: NOT requires a boolean argument"
1862                                )),
1863                            }
1864                        }
1865                        UnaryOp::Neg => {
1866                            if let Some(i) = val.as_i64() {
1867                                Ok(Value::Int(-i))
1868                            } else if let Some(f) = val.as_f64() {
1869                                Ok(Value::Float(-f))
1870                            } else {
1871                                Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1872                            }
1873                        }
1874                    }
1875                }
1876                Expr::IsNull(expr) => {
1877                    let val = this
1878                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1879                        .await?;
1880                    Ok(Value::Bool(val.is_null()))
1881                }
1882                Expr::IsNotNull(expr) => {
1883                    let val = this
1884                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1885                        .await?;
1886                    Ok(Value::Bool(!val.is_null()))
1887                }
1888                Expr::IsUnique(_) => {
1889                    // IS UNIQUE is only valid in constraint definitions, not in query expressions
1890                    Err(anyhow!(
1891                        "IS UNIQUE can only be used in constraint definitions"
1892                    ))
1893                }
1894                Expr::Case {
1895                    expr,
1896                    when_then,
1897                    else_expr,
1898                } => {
1899                    if let Some(base_expr) = expr {
1900                        let base_val = this
1901                            .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1902                            .await?;
1903                        for (w, t) in when_then {
1904                            let w_val = this
1905                                .evaluate_expr(w, row, prop_manager, params, ctx)
1906                                .await?;
1907                            if base_val == w_val {
1908                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1909                            }
1910                        }
1911                    } else {
1912                        for (w, t) in when_then {
1913                            let w_val = this
1914                                .evaluate_expr(w, row, prop_manager, params, ctx)
1915                                .await?;
1916                            if w_val.as_bool() == Some(true) {
1917                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1918                            }
1919                        }
1920                    }
1921                    if let Some(e) = else_expr {
1922                        return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1923                    }
1924                    Ok(Value::Null)
1925                }
1926                Expr::Wildcard => Ok(Value::Null),
1927                Expr::FunctionCall { name, args, .. } => {
1928                    // Special case: id() returns VID for nodes and EID for relationships
1929                    if name.eq_ignore_ascii_case("ID") {
1930                        if args.len() != 1 {
1931                            return Err(anyhow!("id() requires exactly 1 argument"));
1932                        }
1933                        let val = this
1934                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1935                            .await?;
1936                        if let Value::Map(map) = &val {
1937                            // Check for _vid (vertex) first
1938                            if let Some(vid_val) = map.get("_vid") {
1939                                return Ok(vid_val.clone());
1940                            }
1941                            // Check for _eid (edge/relationship)
1942                            if let Some(eid_val) = map.get("_eid") {
1943                                return Ok(eid_val.clone());
1944                            }
1945                            // Check for _id (fallback)
1946                            if let Some(id_val) = map.get("_id") {
1947                                return Ok(id_val.clone());
1948                            }
1949                        }
1950                        return Ok(Value::Null);
1951                    }
1952
1953                    // Special case: elementId() returns string format "label_id:local_offset"
1954                    if name.eq_ignore_ascii_case("ELEMENTID") {
1955                        if args.len() != 1 {
1956                            return Err(anyhow!("elementId() requires exactly 1 argument"));
1957                        }
1958                        let val = this
1959                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1960                            .await?;
1961                        if let Value::Map(map) = &val {
1962                            // Check for _vid (vertex) first
1963                            // In new storage model, VIDs are pure auto-increment - return as simple ID string
1964                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1965                                return Ok(Value::String(vid_val.to_string()));
1966                            }
1967                            // Check for _eid (edge/relationship)
1968                            // In new storage model, EIDs are pure auto-increment - return as simple ID string
1969                            if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1970                                return Ok(Value::String(eid_val.to_string()));
1971                            }
1972                        }
1973                        return Ok(Value::Null);
1974                    }
1975
1976                    // Special case: type() returns the relationship type name
1977                    if name.eq_ignore_ascii_case("TYPE") {
1978                        if args.len() != 1 {
1979                            return Err(anyhow!("type() requires exactly 1 argument"));
1980                        }
1981                        let val = this
1982                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1983                            .await?;
1984                        if let Value::Map(map) = &val
1985                            && let Some(type_val) = map.get("_type")
1986                        {
1987                            // Numeric _type is an edge type ID; string _type is already a name
1988                            if let Some(type_id) =
1989                                type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1990                            {
1991                                if let Some(name) = this
1992                                    .storage
1993                                    .schema_manager()
1994                                    .edge_type_name_by_id_unified(type_id)
1995                                {
1996                                    return Ok(Value::String(name));
1997                                }
1998                            } else if let Some(name) = type_val.as_str() {
1999                                return Ok(Value::String(name.to_string()));
2000                            }
2001                        }
2002                        return Ok(Value::Null);
2003                    }
2004
2005                    // Special case: labels() returns the labels of a node
2006                    if name.eq_ignore_ascii_case("LABELS") {
2007                        if args.len() != 1 {
2008                            return Err(anyhow!("labels() requires exactly 1 argument"));
2009                        }
2010                        let val = this
2011                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2012                            .await?;
2013                        if let Value::Map(map) = &val
2014                            && let Some(labels_val) = map.get("_labels")
2015                        {
2016                            return Ok(labels_val.clone());
2017                        }
2018                        return Ok(Value::Null);
2019                    }
2020
2021                    // Special case: properties() returns the properties map of a node/edge
2022                    if name.eq_ignore_ascii_case("PROPERTIES") {
2023                        if args.len() != 1 {
2024                            return Err(anyhow!("properties() requires exactly 1 argument"));
2025                        }
2026                        let val = this
2027                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2028                            .await?;
2029                        if let Value::Map(map) = &val {
2030                            // Filter out internal properties (those starting with _)
2031                            let mut props = HashMap::new();
2032                            for (k, v) in map.iter() {
2033                                if !k.starts_with('_') {
2034                                    props.insert(k.clone(), v.clone());
2035                                }
2036                            }
2037                            return Ok(Value::Map(props));
2038                        }
2039                        return Ok(Value::Null);
2040                    }
2041
2042                    // Special case: startNode() returns the start node of a relationship
2043                    if name.eq_ignore_ascii_case("STARTNODE") {
2044                        if args.len() != 1 {
2045                            return Err(anyhow!("startNode() requires exactly 1 argument"));
2046                        }
2047                        let val = this
2048                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2049                            .await?;
2050                        if let Value::Edge(edge) = &val {
2051                            return Ok(Self::find_node_by_vid(row, edge.src));
2052                        }
2053                        if let Value::Map(map) = &val {
2054                            if let Some(start_node) = map.get("_startNode") {
2055                                return Ok(start_node.clone());
2056                            }
2057                            if let Some(src_vid) = map.get("_src_vid") {
2058                                return Ok(Value::Map(HashMap::from([(
2059                                    "_vid".to_string(),
2060                                    src_vid.clone(),
2061                                )])));
2062                            }
2063                            // Resolve _src VID by looking up node in row
2064                            if let Some(src_id) = map.get("_src")
2065                                && let Some(u) = src_id.as_u64()
2066                            {
2067                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2068                            }
2069                        }
2070                        return Ok(Value::Null);
2071                    }
2072
2073                    // Special case: endNode() returns the end node of a relationship
2074                    if name.eq_ignore_ascii_case("ENDNODE") {
2075                        if args.len() != 1 {
2076                            return Err(anyhow!("endNode() requires exactly 1 argument"));
2077                        }
2078                        let val = this
2079                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2080                            .await?;
2081                        if let Value::Edge(edge) = &val {
2082                            return Ok(Self::find_node_by_vid(row, edge.dst));
2083                        }
2084                        if let Value::Map(map) = &val {
2085                            if let Some(end_node) = map.get("_endNode") {
2086                                return Ok(end_node.clone());
2087                            }
2088                            if let Some(dst_vid) = map.get("_dst_vid") {
2089                                return Ok(Value::Map(HashMap::from([(
2090                                    "_vid".to_string(),
2091                                    dst_vid.clone(),
2092                                )])));
2093                            }
2094                            // Resolve _dst VID by looking up node in row
2095                            if let Some(dst_id) = map.get("_dst")
2096                                && let Some(u) = dst_id.as_u64()
2097                            {
2098                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2099                            }
2100                        }
2101                        return Ok(Value::Null);
2102                    }
2103
2104                    // Special case: hasLabel() checks if a node has a specific label
2105                    // Used for WHERE n:Label predicates
2106                    if name.eq_ignore_ascii_case("HASLABEL") {
2107                        if args.len() != 2 {
2108                            return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
2109                        }
2110                        let node_val = this
2111                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2112                            .await?;
2113                        let label_val = this
2114                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2115                            .await?;
2116
2117                        let label_to_check = label_val.as_str().ok_or_else(|| {
2118                            anyhow!("Second argument to hasLabel must be a string")
2119                        })?;
2120
2121                        let has_label = match &node_val {
2122                            // Handle proper Value::Node type (from result normalization)
2123                            Value::Map(map) if map.contains_key("_vid") => {
2124                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
2125                                    labels_arr
2126                                        .iter()
2127                                        .any(|l| l.as_str() == Some(label_to_check))
2128                                } else {
2129                                    false
2130                                }
2131                            }
2132                            // Also handle legacy Object format
2133                            Value::Map(map) => {
2134                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
2135                                    labels_arr
2136                                        .iter()
2137                                        .any(|l| l.as_str() == Some(label_to_check))
2138                                } else {
2139                                    false
2140                                }
2141                            }
2142                            _ => false,
2143                        };
2144                        return Ok(Value::Bool(has_label));
2145                    }
2146
2147                    // Quantifier functions (ANY/ALL/NONE/SINGLE) as function calls are not supported.
2148                    // These should be parsed as Expr::Quantifier instead.
2149                    if matches!(
2150                        name.to_uppercase().as_str(),
2151                        "ANY" | "ALL" | "NONE" | "SINGLE"
2152                    ) {
2153                        return Err(anyhow!(
2154                            "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
2155                            name.to_lowercase()
2156                        ));
2157                    }
2158
2159                    // Special case: COALESCE needs short-circuit evaluation
2160                    if name.eq_ignore_ascii_case("COALESCE") {
2161                        for arg in args {
2162                            let val = this
2163                                .evaluate_expr(arg, row, prop_manager, params, ctx)
2164                                .await?;
2165                            if !val.is_null() {
2166                                return Ok(val);
2167                            }
2168                        }
2169                        return Ok(Value::Null);
2170                    }
2171
2172                    // Special case: vector_similarity has dedicated implementation
2173                    if name.eq_ignore_ascii_case("vector_similarity") {
2174                        if args.len() != 2 {
2175                            return Err(anyhow!("vector_similarity takes 2 arguments"));
2176                        }
2177                        let v1 = this
2178                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2179                            .await?;
2180                        let v2 = this
2181                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2182                            .await?;
2183                        return eval_vector_similarity(&v1, &v2);
2184                    }
2185
2186                    // Special case: uni.validAt handles node fetching
2187                    if name.eq_ignore_ascii_case("uni.temporal.validAt")
2188                        || name.eq_ignore_ascii_case("uni.validAt")
2189                        || name.eq_ignore_ascii_case("validAt")
2190                    {
2191                        if args.len() != 4 {
2192                            return Err(anyhow!("validAt requires 4 arguments"));
2193                        }
2194                        let node_val = this
2195                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2196                            .await?;
2197                        let start_prop = this
2198                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2199                            .await?
2200                            .as_str()
2201                            .ok_or(anyhow!("start_prop must be string"))?
2202                            .to_string();
2203                        let end_prop = this
2204                            .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2205                            .await?
2206                            .as_str()
2207                            .ok_or(anyhow!("end_prop must be string"))?
2208                            .to_string();
2209                        let time_val = this
2210                            .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2211                            .await?;
2212
2213                        let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2214                            anyhow!("time argument must be a datetime value or string")
2215                        })?;
2216
2217                        // Fetch temporal property values - supports both vertices and edges
2218                        let valid_from_val: Option<Value> = if let Ok(vid) =
2219                            Self::vid_from_value(&node_val)
2220                        {
2221                            // Vertex case - VID string format
2222                            prop_manager
2223                                .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2224                                .await
2225                                .ok()
2226                        } else if let Value::Map(map) = &node_val {
2227                            // Check for embedded _vid or _eid in object
2228                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2229                                let vid = Vid::from(vid_val);
2230                                prop_manager
2231                                    .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2232                                    .await
2233                                    .ok()
2234                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2235                                // Edge case
2236                                let eid = uni_common::core::id::Eid::from(eid_val);
2237                                prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2238                            } else {
2239                                // Inline object - property embedded directly
2240                                map.get(&start_prop).cloned()
2241                            }
2242                        } else {
2243                            return Ok(Value::Bool(false));
2244                        };
2245
2246                        let valid_from = match valid_from_val {
2247                            Some(ref v) => match value_to_datetime_utc(v) {
2248                                Some(dt) => dt,
2249                                None if v.is_null() => return Ok(Value::Bool(false)),
2250                                None => {
2251                                    return Err(anyhow!(
2252                                        "Property {} must be a datetime value or string",
2253                                        start_prop
2254                                    ));
2255                                }
2256                            },
2257                            None => return Ok(Value::Bool(false)),
2258                        };
2259
2260                        let valid_to_val: Option<Value> = if let Ok(vid) =
2261                            Self::vid_from_value(&node_val)
2262                        {
2263                            // Vertex case - VID string format
2264                            prop_manager
2265                                .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2266                                .await
2267                                .ok()
2268                        } else if let Value::Map(map) = &node_val {
2269                            // Check for embedded _vid or _eid in object
2270                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2271                                let vid = Vid::from(vid_val);
2272                                prop_manager
2273                                    .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2274                                    .await
2275                                    .ok()
2276                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2277                                // Edge case
2278                                let eid = uni_common::core::id::Eid::from(eid_val);
2279                                prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2280                            } else {
2281                                // Inline object - property embedded directly
2282                                map.get(&end_prop).cloned()
2283                            }
2284                        } else {
2285                            return Ok(Value::Bool(false));
2286                        };
2287
2288                        let valid_to = match valid_to_val {
2289                            Some(ref v) => match value_to_datetime_utc(v) {
2290                                Some(dt) => Some(dt),
2291                                None if v.is_null() => None,
2292                                None => {
2293                                    return Err(anyhow!(
2294                                        "Property {} must be a datetime value or null",
2295                                        end_prop
2296                                    ));
2297                                }
2298                            },
2299                            None => None,
2300                        };
2301
2302                        let is_valid = valid_from <= query_time
2303                            && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2304                        return Ok(Value::Bool(is_valid));
2305                    }
2306
2307                    // For all other functions, evaluate arguments then call helper
2308                    let mut evaluated_args = Vec::with_capacity(args.len());
2309                    for arg in args {
2310                        let mut val = this
2311                            .evaluate_expr(arg, row, prop_manager, params, ctx)
2312                            .await?;
2313
2314                        // Eagerly hydrate edge/vertex maps if pushdown hydration didn't load properties.
2315                        // Functions like validAt() need access to properties like valid_from/valid_to.
2316                        if let Value::Map(ref mut map) = val {
2317                            hydrate_entity_if_needed(map, prop_manager, ctx).await;
2318                        }
2319
2320                        evaluated_args.push(val);
2321                    }
2322                    eval_scalar_function(
2323                        name,
2324                        &evaluated_args,
2325                        self.custom_function_registry.as_deref(),
2326                    )
2327                }
2328                Expr::Reduce {
2329                    accumulator,
2330                    init,
2331                    variable,
2332                    list,
2333                    expr,
2334                } => {
2335                    let mut acc = self
2336                        .evaluate_expr(init, row, prop_manager, params, ctx)
2337                        .await?;
2338                    let list_val = self
2339                        .evaluate_expr(list, row, prop_manager, params, ctx)
2340                        .await?;
2341
2342                    if let Value::List(items) = list_val {
2343                        for item in items {
2344                            // Create a temporary scope/row with accumulator and variable
2345                            // For simplicity in fallback executor, we can construct a new row map
2346                            // merging current row + new variables.
2347                            let mut scope = row.clone();
2348                            scope.insert(accumulator.clone(), acc.clone());
2349                            scope.insert(variable.clone(), item);
2350
2351                            acc = self
2352                                .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2353                                .await?;
2354                        }
2355                    } else {
2356                        return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2357                    }
2358                    Ok(acc)
2359                }
2360                Expr::ValidAt { .. } => {
2361                    // VALID_AT should have been transformed to a function call in the planner
2362                    Err(anyhow!(
2363                        "VALID_AT expression should have been transformed to function call in planner"
2364                    ))
2365                }
2366
2367                Expr::LabelCheck { expr, labels } => {
2368                    let val = this
2369                        .evaluate_expr(expr, row, prop_manager, params, ctx)
2370                        .await?;
2371                    match &val {
2372                        Value::Null => Ok(Value::Null),
2373                        Value::Map(map) => {
2374                            // Check if this is an edge (has _eid) or node (has _vid)
2375                            let is_edge = map.contains_key("_eid")
2376                                || map.contains_key("_type_name")
2377                                || (map.contains_key("_type") && !map.contains_key("_vid"));
2378
2379                            if is_edge {
2380                                // Edges have a single type
2381                                if labels.len() > 1 {
2382                                    return Ok(Value::Bool(false));
2383                                }
2384                                let label_to_check = &labels[0];
2385                                let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2386                                {
2387                                    t == label_to_check
2388                                } else if let Some(Value::String(t)) = map.get("_type") {
2389                                    t == label_to_check
2390                                } else {
2391                                    false
2392                                };
2393                                Ok(Value::Bool(has_type))
2394                            } else {
2395                                // Node: check all labels
2396                                let has_all = labels.iter().all(|label_to_check| {
2397                                    if let Some(Value::List(labels_arr)) = map.get("_labels") {
2398                                        labels_arr
2399                                            .iter()
2400                                            .any(|l| l.as_str() == Some(label_to_check.as_str()))
2401                                    } else {
2402                                        false
2403                                    }
2404                                });
2405                                Ok(Value::Bool(has_all))
2406                            }
2407                        }
2408                        _ => Ok(Value::Bool(false)),
2409                    }
2410                }
2411
2412                Expr::MapProjection { base, items } => {
2413                    let base_value = this
2414                        .evaluate_expr(base, row, prop_manager, params, ctx)
2415                        .await?;
2416
2417                    // Extract properties from the base object
2418                    let properties = match &base_value {
2419                        Value::Map(map) => map,
2420                        _ => {
2421                            return Err(anyhow!(
2422                                "Map projection requires object, got {:?}",
2423                                base_value
2424                            ));
2425                        }
2426                    };
2427
2428                    let mut result_map = HashMap::new();
2429
2430                    for item in items {
2431                        match item {
2432                            MapProjectionItem::Property(prop) => {
2433                                if let Some(value) = properties.get(prop.as_str()) {
2434                                    result_map.insert(prop.clone(), value.clone());
2435                                }
2436                            }
2437                            MapProjectionItem::AllProperties => {
2438                                // Include all properties except internal fields (those starting with _)
2439                                for (key, value) in properties.iter() {
2440                                    if !key.starts_with('_') {
2441                                        result_map.insert(key.clone(), value.clone());
2442                                    }
2443                                }
2444                            }
2445                            MapProjectionItem::LiteralEntry(key, expr) => {
2446                                let value = this
2447                                    .evaluate_expr(expr, row, prop_manager, params, ctx)
2448                                    .await?;
2449                                result_map.insert(key.clone(), value);
2450                            }
2451                            MapProjectionItem::Variable(var_name) => {
2452                                // Variable selector: include the value of the variable in the result
2453                                // e.g., person{.name, friend} includes the value of 'friend' variable
2454                                if let Some(value) = row.get(var_name.as_str()) {
2455                                    result_map.insert(var_name.clone(), value.clone());
2456                                }
2457                            }
2458                        }
2459                    }
2460
2461                    Ok(Value::Map(result_map))
2462                }
2463            }
2464        })
2465    }
2466
2467    pub(crate) fn execute_subplan<'a>(
2468        &'a self,
2469        plan: LogicalPlan,
2470        prop_manager: &'a PropertyManager,
2471        params: &'a HashMap<String, Value>,
2472        ctx: Option<&'a QueryContext>,
2473    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2474        Box::pin(async move {
2475            if let Some(ctx) = ctx {
2476                ctx.check_timeout()?;
2477            }
2478            match plan {
2479                LogicalPlan::Union { left, right, all } => {
2480                    self.execute_union(left, right, all, prop_manager, params, ctx)
2481                        .await
2482                }
2483                LogicalPlan::CreateVectorIndex {
2484                    config,
2485                    if_not_exists,
2486                } => {
2487                    if if_not_exists && self.index_exists_by_name(&config.name) {
2488                        return Ok(vec![]);
2489                    }
2490                    let idx_mgr = self.storage.index_manager();
2491                    idx_mgr.create_vector_index(config).await?;
2492                    Ok(vec![])
2493                }
2494                LogicalPlan::CreateSparseIndex {
2495                    config,
2496                    if_not_exists,
2497                } => {
2498                    if if_not_exists && self.index_exists_by_name(&config.name) {
2499                        return Ok(vec![]);
2500                    }
2501                    // `create_sparse_vector_index` registers the index, backfills any
2502                    // already-flushed rows via the backend, handles the empty
2503                    // (create-before-ingest) case, and persists the schema.
2504                    let idx_mgr = self.storage.index_manager();
2505                    idx_mgr.create_sparse_vector_index(config).await?;
2506                    Ok(vec![])
2507                }
2508                LogicalPlan::CreateFullTextIndex {
2509                    config,
2510                    if_not_exists,
2511                } => {
2512                    if if_not_exists && self.index_exists_by_name(&config.name) {
2513                        return Ok(vec![]);
2514                    }
2515                    let idx_mgr = self.storage.index_manager();
2516                    idx_mgr.create_fts_index(config).await?;
2517                    Ok(vec![])
2518                }
2519                LogicalPlan::CreateScalarIndex {
2520                    mut config,
2521                    if_not_exists,
2522                } => {
2523                    if if_not_exists && self.index_exists_by_name(&config.name) {
2524                        return Ok(vec![]);
2525                    }
2526
2527                    // Check for expression indexes - create generated columns
2528                    let mut modified_properties = Vec::new();
2529
2530                    for prop in &config.properties {
2531                        // Heuristic: if contains '(' and ')', it's an expression
2532                        if prop.contains('(') && prop.contains(')') {
2533                            let gen_col = SchemaManager::generated_column_name(prop);
2534
2535                            // Add generated property to schema
2536                            let sm = self.storage.schema_manager_arc();
2537                            if let Err(e) = sm.add_generated_property(
2538                                &config.label,
2539                                &gen_col,
2540                                DataType::String, // Default type for expressions
2541                                prop.clone(),
2542                            ) {
2543                                log::warn!("Failed to add generated property (might exist): {}", e);
2544                            }
2545
2546                            modified_properties.push(gen_col);
2547                        } else {
2548                            // Simple property - use as-is
2549                            modified_properties.push(prop.clone());
2550                        }
2551                    }
2552
2553                    config.properties = modified_properties;
2554
2555                    let idx_mgr = self.storage.index_manager();
2556                    idx_mgr.create_scalar_index(config).await?;
2557                    Ok(vec![])
2558                }
2559                LogicalPlan::CreateJsonFtsIndex {
2560                    config,
2561                    if_not_exists,
2562                } => {
2563                    if if_not_exists && self.index_exists_by_name(&config.name) {
2564                        return Ok(vec![]);
2565                    }
2566                    let idx_mgr = self.storage.index_manager();
2567                    idx_mgr.create_json_fts_index(config).await?;
2568                    Ok(vec![])
2569                }
2570                LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2571                LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2572                LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2573                LogicalPlan::Vacuum => {
2574                    self.execute_vacuum().await?;
2575                    Ok(vec![])
2576                }
2577                LogicalPlan::Checkpoint => {
2578                    self.execute_checkpoint().await?;
2579                    Ok(vec![])
2580                }
2581                LogicalPlan::CopyTo {
2582                    label,
2583                    path,
2584                    format,
2585                    options,
2586                } => {
2587                    let count = self
2588                        .execute_copy_to(&label, &path, &format, &options)
2589                        .await?;
2590                    let mut result = HashMap::new();
2591                    result.insert("count".to_string(), Value::Int(count as i64));
2592                    Ok(vec![result])
2593                }
2594                LogicalPlan::CopyFrom {
2595                    label,
2596                    path,
2597                    format,
2598                    options,
2599                } => {
2600                    let count = self
2601                        .execute_copy_from(&label, &path, &format, &options)
2602                        .await?;
2603                    let mut result = HashMap::new();
2604                    result.insert("count".to_string(), Value::Int(count as i64));
2605                    Ok(vec![result])
2606                }
2607                LogicalPlan::CreateLabel(clause) => {
2608                    self.execute_create_label(clause).await?;
2609                    Ok(vec![])
2610                }
2611                LogicalPlan::CreateEdgeType(clause) => {
2612                    self.execute_create_edge_type(clause).await?;
2613                    Ok(vec![])
2614                }
2615                LogicalPlan::AlterLabel(clause) => {
2616                    self.execute_alter_label(clause).await?;
2617                    Ok(vec![])
2618                }
2619                LogicalPlan::AlterEdgeType(clause) => {
2620                    self.execute_alter_edge_type(clause).await?;
2621                    Ok(vec![])
2622                }
2623                LogicalPlan::DropLabel(clause) => {
2624                    self.execute_drop_label(clause).await?;
2625                    Ok(vec![])
2626                }
2627                LogicalPlan::DropEdgeType(clause) => {
2628                    self.execute_drop_edge_type(clause).await?;
2629                    Ok(vec![])
2630                }
2631                LogicalPlan::CreateConstraint(clause) => {
2632                    self.execute_create_constraint(clause).await?;
2633                    Ok(vec![])
2634                }
2635                LogicalPlan::DropConstraint(clause) => {
2636                    self.execute_drop_constraint(clause).await?;
2637                    Ok(vec![])
2638                }
2639                LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2640                LogicalPlan::DropIndex { name, if_exists } => {
2641                    let idx_mgr = self.storage.index_manager();
2642                    match idx_mgr.drop_index(&name).await {
2643                        Ok(_) => Ok(vec![]),
2644                        Err(e) => {
2645                            if if_exists && e.to_string().contains("not found") {
2646                                Ok(vec![])
2647                            } else {
2648                                Err(e)
2649                            }
2650                        }
2651                    }
2652                }
2653                LogicalPlan::ShowIndexes { filter } => {
2654                    Ok(self.execute_show_indexes(filter.as_deref()))
2655                }
2656                // Scan/traverse nodes: delegate to DataFusion for data access,
2657                // then convert results to HashMaps for the fallback executor.
2658                LogicalPlan::Scan { .. }
2659                | LogicalPlan::FusedIndexScan { .. }
2660                | LogicalPlan::FusedIndexScanWrapped { .. }
2661                | LogicalPlan::ExtIdLookup { .. }
2662                | LogicalPlan::ScanAll { .. }
2663                | LogicalPlan::ScanMainByLabels { .. }
2664                | LogicalPlan::Traverse { .. }
2665                | LogicalPlan::TraverseMainByType { .. } => {
2666                    let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2667                    self.record_batches_to_rows(batches)
2668                }
2669                LogicalPlan::Filter {
2670                    input,
2671                    predicate,
2672                    optional_variables,
2673                } => {
2674                    let input_matches = self
2675                        .execute_subplan(*input, prop_manager, params, ctx)
2676                        .await?;
2677
2678                    tracing::debug!(
2679                        "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2680                        predicate,
2681                        input_matches.len(),
2682                        optional_variables
2683                    );
2684
2685                    // For OPTIONAL MATCH with WHERE: we need LEFT OUTER JOIN semantics.
2686                    // Group rows by non-optional variables, apply filter, and ensure
2687                    // at least one row per group (with NULLs if filter removes all).
2688                    if !optional_variables.is_empty() {
2689                        // Helper to check if a key belongs to an optional variable.
2690                        // Keys can be "var" or "var.field" (e.g., "m" or "m._vid").
2691                        let is_optional_key = |k: &str| -> bool {
2692                            optional_variables.contains(k)
2693                                || optional_variables
2694                                    .iter()
2695                                    .any(|var| k.starts_with(&format!("{}.", var)))
2696                        };
2697
2698                        // Helper to check if a key is internal (should not affect grouping)
2699                        let is_internal_key =
2700                            |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2701
2702                        // Compute the key (non-optional, non-internal variables) for grouping
2703                        let non_optional_vars: Vec<String> = input_matches
2704                            .first()
2705                            .map(|row| {
2706                                row.keys()
2707                                    .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2708                                    .cloned()
2709                                    .collect()
2710                            })
2711                            .unwrap_or_default();
2712
2713                        // Group rows by their non-optional variable values
2714                        let mut groups: std::collections::HashMap<
2715                            Vec<u8>,
2716                            Vec<HashMap<String, Value>>,
2717                        > = std::collections::HashMap::new();
2718
2719                        for row in &input_matches {
2720                            // Create a key from non-optional variable values
2721                            let key: Vec<u8> = non_optional_vars
2722                                .iter()
2723                                .map(|var| {
2724                                    row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
2725                                })
2726                                .collect::<Vec<_>>()
2727                                .join("|")
2728                                .into_bytes();
2729
2730                            groups.entry(key).or_default().push(row.clone());
2731                        }
2732
2733                        let mut filtered = Vec::new();
2734                        for (_key, group_rows) in groups {
2735                            let mut group_passed = Vec::new();
2736
2737                            for row in &group_rows {
2738                                // If optional variables are already NULL, preserve the row
2739                                let has_null_optional = optional_variables.iter().any(|var| {
2740                                    // Check both "var" and "var._vid" style keys
2741                                    let direct_null =
2742                                        matches!(row.get(var), Some(Value::Null) | None);
2743                                    let prefixed_null = row
2744                                        .keys()
2745                                        .filter(|k| k.starts_with(&format!("{}.", var)))
2746                                        .any(|k| matches!(row.get(k), Some(Value::Null)));
2747                                    direct_null || prefixed_null
2748                                });
2749
2750                                if has_null_optional {
2751                                    group_passed.push(row.clone());
2752                                    continue;
2753                                }
2754
2755                                let res = self
2756                                    .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2757                                    .await?;
2758
2759                                if res.as_bool().unwrap_or(false) {
2760                                    group_passed.push(row.clone());
2761                                }
2762                            }
2763
2764                            if group_passed.is_empty() {
2765                                // No rows passed - emit one row with NULLs for optional variables
2766                                // Use the first row's non-optional values as a template
2767                                if let Some(template) = group_rows.first() {
2768                                    let mut null_row = HashMap::new();
2769                                    for (k, v) in template {
2770                                        if is_optional_key(k) {
2771                                            null_row.insert(k.clone(), Value::Null);
2772                                        } else {
2773                                            null_row.insert(k.clone(), v.clone());
2774                                        }
2775                                    }
2776                                    filtered.push(null_row);
2777                                }
2778                            } else {
2779                                filtered.extend(group_passed);
2780                            }
2781                        }
2782
2783                        tracing::debug!(
2784                            "Filter (OPTIONAL): {} input rows -> {} output rows",
2785                            input_matches.len(),
2786                            filtered.len()
2787                        );
2788
2789                        return Ok(filtered);
2790                    }
2791
2792                    // Standard filter for non-OPTIONAL MATCH
2793                    let mut filtered = Vec::new();
2794                    for row in input_matches.iter() {
2795                        let res = self
2796                            .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2797                            .await?;
2798
2799                        let passes = res.as_bool().unwrap_or(false);
2800
2801                        if passes {
2802                            filtered.push(row.clone());
2803                        }
2804                    }
2805
2806                    tracing::debug!(
2807                        "Filter: {} input rows -> {} output rows",
2808                        input_matches.len(),
2809                        filtered.len()
2810                    );
2811
2812                    Ok(filtered)
2813                }
2814                LogicalPlan::ProcedureCall {
2815                    procedure_name,
2816                    arguments,
2817                    yield_items,
2818                } => {
2819                    let yield_names: Vec<String> =
2820                        yield_items.iter().map(|(n, _)| n.clone()).collect();
2821                    let results = self
2822                        .execute_procedure(
2823                            &procedure_name,
2824                            &arguments,
2825                            &yield_names,
2826                            prop_manager,
2827                            params,
2828                            ctx,
2829                        )
2830                        .await?;
2831
2832                    // Handle aliasing: collect all original values first, then
2833                    // build the aliased row in one pass. This avoids issues when
2834                    // an alias matches another yield item's original name (e.g.,
2835                    // YIELD a AS b, b AS d — renaming "a" to "b" must not
2836                    // clobber the original "b" before it is renamed to "d").
2837                    let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2838                    if !has_aliases {
2839                        // No aliases (includes YIELD * which produces empty yield_items) —
2840                        // pass through the procedure output rows unchanged.
2841                        Ok(results)
2842                    } else {
2843                        let mut aliased_results = Vec::with_capacity(results.len());
2844                        for row in results {
2845                            let mut new_row = HashMap::new();
2846                            for (name, alias) in &yield_items {
2847                                let col_name = alias.as_ref().unwrap_or(name);
2848                                let val = row.get(name).cloned().unwrap_or(Value::Null);
2849                                new_row.insert(col_name.clone(), val);
2850                            }
2851                            aliased_results.push(new_row);
2852                        }
2853                        Ok(aliased_results)
2854                    }
2855                }
2856                LogicalPlan::VectorKnn { .. } => {
2857                    unreachable!("VectorKnn is handled by DataFusion engine")
2858                }
2859                LogicalPlan::InvertedIndexLookup { .. } => {
2860                    unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2861                }
2862                LogicalPlan::Sort { input, order_by } => {
2863                    let rows = self
2864                        .execute_subplan(*input, prop_manager, params, ctx)
2865                        .await?;
2866                    self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2867                        .await
2868                }
2869                LogicalPlan::Limit { input, skip, fetch } => {
2870                    let rows = self
2871                        .execute_subplan(*input, prop_manager, params, ctx)
2872                        .await?;
2873                    let skip = skip.unwrap_or(0);
2874                    let take = fetch.unwrap_or(usize::MAX);
2875                    Ok(rows.into_iter().skip(skip).take(take).collect())
2876                }
2877                LogicalPlan::Aggregate {
2878                    input,
2879                    group_by,
2880                    aggregates,
2881                } => {
2882                    let rows = self
2883                        .execute_subplan(*input, prop_manager, params, ctx)
2884                        .await?;
2885                    self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2886                        .await
2887                }
2888                LogicalPlan::Window {
2889                    input,
2890                    window_exprs,
2891                } => {
2892                    let rows = self
2893                        .execute_subplan(*input, prop_manager, params, ctx)
2894                        .await?;
2895                    self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2896                        .await
2897                }
2898                LogicalPlan::Project { input, projections } => {
2899                    let matches = self
2900                        .execute_subplan(*input, prop_manager, params, ctx)
2901                        .await?;
2902                    self.execute_project(matches, &projections, prop_manager, params, ctx)
2903                        .await
2904                }
2905                LogicalPlan::Distinct { input } => {
2906                    let rows = self
2907                        .execute_subplan(*input, prop_manager, params, ctx)
2908                        .await?;
2909                    let mut seen = std::collections::HashSet::new();
2910                    let mut result = Vec::new();
2911                    for row in rows {
2912                        let key = Self::canonical_row_key(&row);
2913                        if seen.insert(key) {
2914                            result.push(row);
2915                        }
2916                    }
2917                    Ok(result)
2918                }
2919                LogicalPlan::Unwind {
2920                    input,
2921                    expr,
2922                    variable,
2923                } => {
2924                    let input_rows = self
2925                        .execute_subplan(*input, prop_manager, params, ctx)
2926                        .await?;
2927                    self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2928                        .await
2929                }
2930                LogicalPlan::Apply {
2931                    input,
2932                    subquery,
2933                    input_filter,
2934                } => {
2935                    let input_rows = self
2936                        .execute_subplan(*input, prop_manager, params, ctx)
2937                        .await?;
2938                    self.execute_apply(
2939                        input_rows,
2940                        &subquery,
2941                        input_filter.as_ref(),
2942                        prop_manager,
2943                        params,
2944                        ctx,
2945                    )
2946                    .await
2947                }
2948                LogicalPlan::SubqueryCall { input, subquery } => {
2949                    let input_rows = self
2950                        .execute_subplan(*input, prop_manager, params, ctx)
2951                        .await?;
2952                    // Execute subquery for each input row (correlated)
2953                    // No input_filter for CALL { }
2954                    self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2955                        .await
2956                }
2957                LogicalPlan::RecursiveCTE {
2958                    cte_name,
2959                    initial,
2960                    recursive,
2961                } => {
2962                    self.execute_recursive_cte(
2963                        &cte_name,
2964                        *initial,
2965                        *recursive,
2966                        prop_manager,
2967                        params,
2968                        ctx,
2969                    )
2970                    .await
2971                }
2972                LogicalPlan::CrossJoin { left, right } => {
2973                    self.execute_cross_join(left, right, prop_manager, params, ctx)
2974                        .await
2975                }
2976                LogicalPlan::Set { .. }
2977                | LogicalPlan::Remove { .. }
2978                | LogicalPlan::Merge { .. }
2979                | LogicalPlan::Create { .. }
2980                | LogicalPlan::CreateBatch { .. } => {
2981                    unreachable!("mutations are handled by DataFusion engine")
2982                }
2983                LogicalPlan::Delete { .. } => {
2984                    unreachable!("mutations are handled by DataFusion engine")
2985                }
2986                LogicalPlan::Copy {
2987                    target,
2988                    source,
2989                    is_export,
2990                    options,
2991                } => {
2992                    if is_export {
2993                        self.execute_export(&target, &source, &options, prop_manager, ctx)
2994                            .await
2995                    } else {
2996                        self.execute_copy(&target, &source, &options, prop_manager)
2997                            .await
2998                    }
2999                }
3000                LogicalPlan::Backup {
3001                    destination,
3002                    options,
3003                } => self.execute_backup(&destination, &options).await,
3004                LogicalPlan::Explain { plan } => {
3005                    let plan_str = format!("{:#?}", plan);
3006                    let mut row = HashMap::new();
3007                    row.insert("plan".to_string(), Value::String(plan_str));
3008                    Ok(vec![row])
3009                }
3010                LogicalPlan::ShortestPath { .. } => {
3011                    unreachable!("ShortestPath is handled by DataFusion engine")
3012                }
3013                LogicalPlan::AllShortestPaths { .. } => {
3014                    unreachable!("AllShortestPaths is handled by DataFusion engine")
3015                }
3016                LogicalPlan::Foreach { .. } => {
3017                    unreachable!("mutations are handled by DataFusion engine")
3018                }
3019                LogicalPlan::Empty => Ok(vec![HashMap::new()]),
3020                LogicalPlan::BindZeroLengthPath { .. } => {
3021                    unreachable!("BindZeroLengthPath is handled by DataFusion engine")
3022                }
3023                LogicalPlan::BindPath { .. } => {
3024                    unreachable!("BindPath is handled by DataFusion engine")
3025                }
3026                LogicalPlan::QuantifiedPattern { .. } => {
3027                    unreachable!("QuantifiedPattern is handled by DataFusion engine")
3028                }
3029                LogicalPlan::LocyProgram { .. }
3030                | LogicalPlan::LocyFold { .. }
3031                | LogicalPlan::LocyBestBy { .. }
3032                | LogicalPlan::LocyPriority { .. }
3033                | LogicalPlan::LocyDerivedScan { .. }
3034                | LogicalPlan::LocyProject { .. }
3035                | LogicalPlan::LocyModelInvoke { .. } => {
3036                    unreachable!("Locy operators are handled by DataFusion engine")
3037                }
3038            }
3039        })
3040    }
3041
3042    /// Execute a single plan from a FOREACH body with the given scope.
3043    ///
3044    /// Used by the DataFusion ForeachExec operator to delegate body clause
3045    /// execution back to the executor.
3046    #[expect(clippy::too_many_arguments)]
3047    pub(crate) async fn execute_foreach_body_plan(
3048        &self,
3049        plan: LogicalPlan,
3050        scope: &mut HashMap<String, Value>,
3051        writer: &uni_store::runtime::writer::Writer,
3052        prop_manager: &PropertyManager,
3053        params: &HashMap<String, Value>,
3054        ctx: Option<&QueryContext>,
3055        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3056    ) -> Result<()> {
3057        match plan {
3058            LogicalPlan::Set { items, .. } => {
3059                self.execute_set_items_locked(
3060                    &items,
3061                    scope,
3062                    writer,
3063                    prop_manager,
3064                    params,
3065                    ctx,
3066                    tx_l0,
3067                    &crate::query::df_graph::mutation_common::Prefetch::default(),
3068                )
3069                .await?;
3070            }
3071            LogicalPlan::Remove { items, .. } => {
3072                self.execute_remove_items_locked(
3073                    &items,
3074                    scope,
3075                    writer,
3076                    prop_manager,
3077                    ctx,
3078                    tx_l0,
3079                    &crate::query::df_graph::mutation_common::Prefetch::default(),
3080                )
3081                .await?;
3082            }
3083            LogicalPlan::Delete { items, detach, .. } => {
3084                for expr in &items {
3085                    let val = self
3086                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
3087                        .await?;
3088                    self.execute_delete_item_locked(&val, detach, writer, tx_l0)
3089                        .await?;
3090                }
3091            }
3092            LogicalPlan::Create { pattern, .. } => {
3093                self.execute_create_pattern(
3094                    &pattern,
3095                    scope,
3096                    writer,
3097                    prop_manager,
3098                    params,
3099                    ctx,
3100                    tx_l0,
3101                    None,
3102                )
3103                .await?;
3104            }
3105            LogicalPlan::CreateBatch { patterns, .. } => {
3106                for pattern in &patterns {
3107                    self.execute_create_pattern(
3108                        pattern,
3109                        scope,
3110                        writer,
3111                        prop_manager,
3112                        params,
3113                        ctx,
3114                        tx_l0,
3115                        None,
3116                    )
3117                    .await?;
3118                }
3119            }
3120            LogicalPlan::Merge {
3121                pattern,
3122                on_match: _,
3123                on_create,
3124                ..
3125            } => {
3126                // Fold ON CREATE SET so a NOT-NULL property set only by
3127                // ON CREATE SET passes create-time validation (RC4); the
3128                // post-create SET below settles the final values.
3129                let seed_props = self
3130                    .on_create_seed_props(on_create.as_ref(), scope, prop_manager, params, ctx)
3131                    .await?;
3132                self.execute_create_pattern(
3133                    &pattern,
3134                    scope,
3135                    writer,
3136                    prop_manager,
3137                    params,
3138                    ctx,
3139                    tx_l0,
3140                    Some(&seed_props),
3141                )
3142                .await?;
3143                if let Some(on_create_clause) = on_create {
3144                    self.execute_set_items_locked(
3145                        &on_create_clause.items,
3146                        scope,
3147                        writer,
3148                        prop_manager,
3149                        params,
3150                        ctx,
3151                        tx_l0,
3152                        &crate::query::df_graph::mutation_common::Prefetch::default(),
3153                    )
3154                    .await?;
3155                }
3156            }
3157            LogicalPlan::Foreach {
3158                variable,
3159                list,
3160                body,
3161                ..
3162            } => {
3163                let list_val = self
3164                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
3165                    .await?;
3166                let items = match list_val {
3167                    Value::List(arr) => arr,
3168                    Value::Null => return Ok(()),
3169                    _ => return Err(anyhow!("FOREACH requires a list")),
3170                };
3171                for item in items {
3172                    let mut nested_scope = scope.clone();
3173                    nested_scope.insert(variable.clone(), item);
3174                    for nested_plan in &body {
3175                        Box::pin(self.execute_foreach_body_plan(
3176                            nested_plan.clone(),
3177                            &mut nested_scope,
3178                            writer,
3179                            prop_manager,
3180                            params,
3181                            ctx,
3182                            tx_l0,
3183                        ))
3184                        .await?;
3185                    }
3186                }
3187            }
3188            _ => {
3189                return Err(anyhow!(
3190                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
3191                ));
3192            }
3193        }
3194        Ok(())
3195    }
3196
3197    fn canonical_row_key(row: &HashMap<String, Value>) -> String {
3198        let mut pairs: Vec<_> = row.iter().collect();
3199        pairs.sort_by_key(|(k, _)| *k);
3200
3201        pairs
3202            .into_iter()
3203            .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
3204            .collect::<Vec<_>>()
3205            .join("|")
3206    }
3207
3208    fn canonical_value_key(v: &Value) -> String {
3209        match v {
3210            Value::Null => "null".to_string(),
3211            Value::Bool(b) => format!("b:{b}"),
3212            Value::Int(i) => format!("n:{i}"),
3213            Value::Float(f) => {
3214                if f.is_nan() {
3215                    "nan".to_string()
3216                } else if f.is_infinite() {
3217                    if f.is_sign_positive() {
3218                        "inf:+".to_string()
3219                    } else {
3220                        "inf:-".to_string()
3221                    }
3222                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
3223                    format!("n:{}", *f as i64)
3224                } else {
3225                    format!("f:{f}")
3226                }
3227            }
3228            Value::String(s) => {
3229                if let Some(k) = Self::temporal_string_key(s) {
3230                    format!("temporal:{k}")
3231                } else {
3232                    format!("s:{s}")
3233                }
3234            }
3235            Value::Bytes(b) => format!("bytes:{:?}", b),
3236            Value::List(items) => format!(
3237                "list:[{}]",
3238                items
3239                    .iter()
3240                    .map(Self::canonical_value_key)
3241                    .collect::<Vec<_>>()
3242                    .join(",")
3243            ),
3244            Value::Map(map) => {
3245                let mut pairs: Vec<_> = map.iter().collect();
3246                pairs.sort_by_key(|(k, _)| *k);
3247                format!(
3248                    "map:{{{}}}",
3249                    pairs
3250                        .into_iter()
3251                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
3252                        .collect::<Vec<_>>()
3253                        .join(",")
3254                )
3255            }
3256            Value::Node(n) => {
3257                let mut labels = n.labels.clone();
3258                labels.sort();
3259                format!(
3260                    "node:{}:{}:{}",
3261                    n.vid.as_u64(),
3262                    labels.join(":"),
3263                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
3264                )
3265            }
3266            Value::Edge(e) => format!(
3267                "edge:{}:{}:{}:{}:{}",
3268                e.eid.as_u64(),
3269                e.edge_type,
3270                e.src.as_u64(),
3271                e.dst.as_u64(),
3272                Self::canonical_value_key(&Value::Map(e.properties.clone()))
3273            ),
3274            Value::Path(p) => format!(
3275                "path:nodes=[{}];edges=[{}]",
3276                p.nodes
3277                    .iter()
3278                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
3279                    .collect::<Vec<_>>()
3280                    .join(","),
3281                p.edges
3282                    .iter()
3283                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
3284                    .collect::<Vec<_>>()
3285                    .join(",")
3286            ),
3287            Value::Vector(vs) => format!("vec:{:?}", vs),
3288            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
3289            _ => format!("{v:?}"),
3290        }
3291    }
3292
3293    fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3294        match t {
3295            uni_common::TemporalValue::Date { days_since_epoch } => {
3296                format!("date:{days_since_epoch}")
3297            }
3298            uni_common::TemporalValue::LocalTime {
3299                nanos_since_midnight,
3300            } => format!("localtime:{nanos_since_midnight}"),
3301            uni_common::TemporalValue::Time {
3302                nanos_since_midnight,
3303                offset_seconds,
3304            } => {
3305                let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3306                format!("time:{utc_nanos}")
3307            }
3308            uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3309                format!("localdatetime:{nanos_since_epoch}")
3310            }
3311            uni_common::TemporalValue::DateTime {
3312                nanos_since_epoch, ..
3313            } => format!("datetime:{nanos_since_epoch}"),
3314            uni_common::TemporalValue::Duration {
3315                months,
3316                days,
3317                nanos,
3318            } => format!("duration:{months}:{days}:{nanos}"),
3319            uni_common::TemporalValue::Btic { lo, hi, meta } => {
3320                format!("btic:{lo}:{hi}:{meta}")
3321            }
3322        }
3323    }
3324
3325    fn temporal_string_key(s: &str) -> Option<String> {
3326        let fn_name = match classify_temporal(s)? {
3327            uni_common::TemporalType::Date => "DATE",
3328            uni_common::TemporalType::LocalTime => "LOCALTIME",
3329            uni_common::TemporalType::Time => "TIME",
3330            uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3331            uni_common::TemporalType::DateTime => "DATETIME",
3332            uni_common::TemporalType::Duration => "DURATION",
3333            uni_common::TemporalType::Btic => return None, // BTIC uses dedicated parsing
3334        };
3335        match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3336            Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3337            _ => None,
3338        }
3339    }
3340
3341    /// Execute aggregate operation: GROUP BY + aggregate functions.
3342    /// Interval for timeout checks in aggregate loops.
3343    pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3344
3345    pub(crate) async fn execute_aggregate(
3346        &self,
3347        rows: Vec<HashMap<String, Value>>,
3348        group_by: &[Expr],
3349        aggregates: &[Expr],
3350        prop_manager: &PropertyManager,
3351        params: &HashMap<String, Value>,
3352        ctx: Option<&QueryContext>,
3353    ) -> Result<Vec<HashMap<String, Value>>> {
3354        // CWE-400: Check timeout before aggregation
3355        if let Some(ctx) = ctx {
3356            ctx.check_timeout()?;
3357        }
3358
3359        let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3360
3361        // Cypher semantics: aggregation without grouping keys returns one row even
3362        // on empty input (e.g. `RETURN count(*)`, `RETURN avg(x)`).
3363        if rows.is_empty() {
3364            if group_by.is_empty() {
3365                let accs = Self::create_accumulators(aggregates);
3366                let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3367                return Ok(vec![row]);
3368            }
3369            return Ok(vec![]);
3370        }
3371
3372        for (idx, row) in rows.into_iter().enumerate() {
3373            // Periodic timeout check during aggregation
3374            if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3375                && let Some(ctx) = ctx
3376            {
3377                ctx.check_timeout()?;
3378            }
3379
3380            let key_vals = self
3381                .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3382                .await?;
3383            // Build a canonical key so grouping follows Cypher value semantics
3384            // (e.g. temporal equality by instant, numeric normalization where applicable).
3385            let key_str = format!(
3386                "[{}]",
3387                key_vals
3388                    .iter()
3389                    .map(Self::canonical_value_key)
3390                    .collect::<Vec<_>>()
3391                    .join(",")
3392            );
3393
3394            let entry = groups
3395                .entry(key_str)
3396                .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3397
3398            self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3399                .await?;
3400        }
3401
3402        let results = groups
3403            .values()
3404            .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3405            .collect();
3406
3407        Ok(results)
3408    }
3409
3410    pub(crate) async fn execute_window(
3411        &self,
3412        mut rows: Vec<HashMap<String, Value>>,
3413        window_exprs: &[Expr],
3414        _prop_manager: &PropertyManager,
3415        _params: &HashMap<String, Value>,
3416        ctx: Option<&QueryContext>,
3417    ) -> Result<Vec<HashMap<String, Value>>> {
3418        // CWE-400: Check timeout before window computation
3419        if let Some(ctx) = ctx {
3420            ctx.check_timeout()?;
3421        }
3422
3423        // If no rows or no window expressions, return as-is
3424        if rows.is_empty() || window_exprs.is_empty() {
3425            return Ok(rows);
3426        }
3427
3428        // Process each window function expression
3429        for window_expr in window_exprs {
3430            // Extract window function details
3431            let Expr::FunctionCall {
3432                name,
3433                args,
3434                window_spec: Some(window_spec),
3435                ..
3436            } = window_expr
3437            else {
3438                return Err(anyhow!(
3439                    "Window expression must be a FunctionCall with OVER clause: {:?}",
3440                    window_expr
3441                ));
3442            };
3443
3444            let name_upper = name.to_uppercase();
3445
3446            // Validate it's a supported window function
3447            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3448                return Err(anyhow!(
3449                    "Unsupported window function: {}. Supported functions: {}",
3450                    name,
3451                    WINDOW_FUNCTIONS.join(", ")
3452                ));
3453            }
3454
3455            // Build partition groups based on PARTITION BY clause
3456            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3457
3458            for (row_idx, row) in rows.iter().enumerate() {
3459                // Evaluate partition key
3460                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3461                    // No partitioning - all rows in one partition
3462                    vec![]
3463                } else {
3464                    window_spec
3465                        .partition_by
3466                        .iter()
3467                        .map(|expr| self.evaluate_simple_expr(expr, row))
3468                        .collect::<Result<Vec<_>>>()?
3469                };
3470
3471                partition_map
3472                    .entry(partition_key)
3473                    .or_default()
3474                    .push(row_idx);
3475            }
3476
3477            // Process each partition
3478            for (_partition_key, row_indices) in partition_map.iter_mut() {
3479                // Sort rows within partition by ORDER BY clause
3480                if !window_spec.order_by.is_empty() {
3481                    row_indices.sort_by(|&a, &b| {
3482                        for sort_item in &window_spec.order_by {
3483                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3484                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3485
3486                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3487                                let cmp = Executor::compare_values(&va, &vb);
3488                                let cmp = if sort_item.ascending {
3489                                    cmp
3490                                } else {
3491                                    cmp.reverse()
3492                                };
3493                                if cmp != std::cmp::Ordering::Equal {
3494                                    return cmp;
3495                                }
3496                            }
3497                        }
3498                        std::cmp::Ordering::Equal
3499                    });
3500                }
3501
3502                // Compute window function values for this partition
3503                for (position, &row_idx) in row_indices.iter().enumerate() {
3504                    let window_value = match name_upper.as_str() {
3505                        "ROW_NUMBER" => Value::from((position + 1) as i64),
3506                        "RANK" => {
3507                            // RANK: position (1-indexed) of first row in group of tied rows
3508                            let rank = if position == 0 {
3509                                1i64
3510                            } else {
3511                                let prev_row_idx = row_indices[position - 1];
3512                                let same_as_prev = self.rows_have_same_sort_keys(
3513                                    &window_spec.order_by,
3514                                    &rows,
3515                                    row_idx,
3516                                    prev_row_idx,
3517                                );
3518
3519                                if same_as_prev {
3520                                    // Walk backwards to find where this group started
3521                                    let mut group_start = position - 1;
3522                                    while group_start > 0 {
3523                                        let curr_idx = row_indices[group_start];
3524                                        let prev_idx = row_indices[group_start - 1];
3525                                        if !self.rows_have_same_sort_keys(
3526                                            &window_spec.order_by,
3527                                            &rows,
3528                                            curr_idx,
3529                                            prev_idx,
3530                                        ) {
3531                                            break;
3532                                        }
3533                                        group_start -= 1;
3534                                    }
3535                                    (group_start + 1) as i64
3536                                } else {
3537                                    (position + 1) as i64
3538                                }
3539                            };
3540                            Value::from(rank)
3541                        }
3542                        "DENSE_RANK" => {
3543                            // Dense rank: continuous ranking without gaps
3544                            let mut dense_rank = 1i64;
3545                            for i in 0..position {
3546                                let curr_idx = row_indices[i + 1];
3547                                let prev_idx = row_indices[i];
3548                                if !self.rows_have_same_sort_keys(
3549                                    &window_spec.order_by,
3550                                    &rows,
3551                                    curr_idx,
3552                                    prev_idx,
3553                                ) {
3554                                    dense_rank += 1;
3555                                }
3556                            }
3557                            Value::from(dense_rank)
3558                        }
3559                        "LAG" => {
3560                            let (value_expr, offset, default_value) =
3561                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3562
3563                            if position >= offset {
3564                                let target_idx = row_indices[position - offset];
3565                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3566                            } else {
3567                                default_value
3568                            }
3569                        }
3570                        "LEAD" => {
3571                            let (value_expr, offset, default_value) =
3572                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3573
3574                            if position + offset < row_indices.len() {
3575                                let target_idx = row_indices[position + offset];
3576                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3577                            } else {
3578                                default_value
3579                            }
3580                        }
3581                        "NTILE" => {
3582                            // Extract num_buckets argument: NTILE(num_buckets)
3583                            let num_buckets_expr = args.first().ok_or_else(|| {
3584                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3585                            })?;
3586                            let num_buckets_val =
3587                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3588                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3589                                anyhow!(
3590                                    "NTILE argument must be an integer, got: {:?}",
3591                                    num_buckets_val
3592                                )
3593                            })?;
3594
3595                            if num_buckets <= 0 {
3596                                return Err(anyhow!(
3597                                    "NTILE bucket count must be positive, got: {}",
3598                                    num_buckets
3599                                ));
3600                            }
3601
3602                            let num_buckets = num_buckets as usize;
3603                            let partition_size = row_indices.len();
3604
3605                            // Calculate bucket assignment using standard algorithm
3606                            // For N rows and B buckets:
3607                            // - Base size: N / B
3608                            // - Extra rows: N % B (go to first buckets)
3609                            let base_size = partition_size / num_buckets;
3610                            let extra_rows = partition_size % num_buckets;
3611
3612                            // Determine bucket for current row
3613                            let bucket = if position < extra_rows * (base_size + 1) {
3614                                // Row is in one of the larger buckets (first 'extra_rows' buckets)
3615                                position / (base_size + 1) + 1
3616                            } else {
3617                                // Row is in one of the normal-sized buckets
3618                                let adjusted_position = position - extra_rows * (base_size + 1);
3619                                extra_rows + (adjusted_position / base_size) + 1
3620                            };
3621
3622                            Value::from(bucket as i64)
3623                        }
3624                        "FIRST_VALUE" => {
3625                            // FIRST_VALUE returns the value of the expression from the first row in the window frame
3626                            let value_expr = args.first().ok_or_else(|| {
3627                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3628                            })?;
3629
3630                            // Get the first row in the partition (after ordering)
3631                            if row_indices.is_empty() {
3632                                Value::Null
3633                            } else {
3634                                let first_idx = row_indices[0];
3635                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3636                            }
3637                        }
3638                        "LAST_VALUE" => {
3639                            // LAST_VALUE returns the value of the expression from the last row in the window frame
3640                            let value_expr = args.first().ok_or_else(|| {
3641                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3642                            })?;
3643
3644                            // Get the last row in the partition (after ordering)
3645                            if row_indices.is_empty() {
3646                                Value::Null
3647                            } else {
3648                                let last_idx = row_indices[row_indices.len() - 1];
3649                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3650                            }
3651                        }
3652                        "NTH_VALUE" => {
3653                            // NTH_VALUE returns the value of the expression from the nth row in the window frame
3654                            if args.len() != 2 {
3655                                return Err(anyhow!(
3656                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3657                                ));
3658                            }
3659
3660                            let value_expr = &args[0];
3661                            let n_expr = &args[1];
3662
3663                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3664                            let n = n_val.as_i64().ok_or_else(|| {
3665                                anyhow!(
3666                                    "NTH_VALUE second argument must be an integer, got: {:?}",
3667                                    n_val
3668                                )
3669                            })?;
3670
3671                            if n <= 0 {
3672                                return Err(anyhow!(
3673                                    "NTH_VALUE position must be positive, got: {}",
3674                                    n
3675                                ));
3676                            }
3677
3678                            let nth_index = (n - 1) as usize; // Convert 1-based to 0-based
3679                            if nth_index < row_indices.len() {
3680                                let nth_idx = row_indices[nth_index];
3681                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3682                            } else {
3683                                Value::Null
3684                            }
3685                        }
3686                        _ => unreachable!("Window function {} already validated", name),
3687                    };
3688
3689                    // Add window function result to row
3690                    // Use the window expression's string representation as the column name
3691                    let col_name = window_expr.to_string_repr();
3692                    rows[row_idx].insert(col_name, window_value);
3693                }
3694            }
3695        }
3696
3697        Ok(rows)
3698    }
3699
3700    /// Helper to evaluate simple expressions for window function sorting/partitioning.
3701    ///
3702    /// Uses `&self` for consistency with other evaluation methods, though it only
3703    /// recurses for property access.
3704    fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3705        match expr {
3706            Expr::Variable(name) => row
3707                .get(name)
3708                .cloned()
3709                .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3710            Expr::Property(base, prop) => {
3711                let base_val = self.evaluate_simple_expr(base, row)?;
3712                if let Value::Map(map) = base_val {
3713                    map.get(prop)
3714                        .cloned()
3715                        .ok_or_else(|| anyhow!("Property not found: {}", prop))
3716                } else {
3717                    Err(anyhow!("Cannot access property on non-object"))
3718                }
3719            }
3720            Expr::Literal(lit) => Ok(lit.to_value()),
3721            _ => Err(anyhow!(
3722                "Unsupported expression in window function: {:?}",
3723                expr
3724            )),
3725        }
3726    }
3727
3728    /// Check if two rows have matching sort keys for ranking functions.
3729    fn rows_have_same_sort_keys(
3730        &self,
3731        order_by: &[uni_cypher::ast::SortItem],
3732        rows: &[HashMap<String, Value>],
3733        idx_a: usize,
3734        idx_b: usize,
3735    ) -> bool {
3736        order_by.iter().all(|sort_item| {
3737            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3738            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3739            matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3740        })
3741    }
3742
3743    /// Extract offset and default value for LAG/LEAD window functions.
3744    fn extract_lag_lead_params<'a>(
3745        &self,
3746        func_name: &str,
3747        args: &'a [Expr],
3748        row: &HashMap<String, Value>,
3749    ) -> Result<(&'a Expr, usize, Value)> {
3750        let value_expr = args.first().ok_or_else(|| {
3751            anyhow!(
3752                "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3753                func_name,
3754                func_name
3755            )
3756        })?;
3757
3758        let offset = if let Some(offset_expr) = args.get(1) {
3759            let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3760            offset_val.as_i64().ok_or_else(|| {
3761                anyhow!(
3762                    "{} offset must be an integer, got: {:?}",
3763                    func_name,
3764                    offset_val
3765                )
3766            })? as usize
3767        } else {
3768            1
3769        };
3770
3771        let default_value = if let Some(default_expr) = args.get(2) {
3772            self.evaluate_simple_expr(default_expr, row)?
3773        } else {
3774            Value::Null
3775        };
3776
3777        Ok((value_expr, offset, default_value))
3778    }
3779
3780    /// Evaluate group-by key expressions for a row.
3781    pub(crate) async fn evaluate_group_keys(
3782        &self,
3783        group_by: &[Expr],
3784        row: &HashMap<String, Value>,
3785        prop_manager: &PropertyManager,
3786        params: &HashMap<String, Value>,
3787        ctx: Option<&QueryContext>,
3788    ) -> Result<Vec<Value>> {
3789        let mut key_vals = Vec::new();
3790        for expr in group_by {
3791            key_vals.push(
3792                self.evaluate_expr(expr, row, prop_manager, params, ctx)
3793                    .await?,
3794            );
3795        }
3796        Ok(key_vals)
3797    }
3798
3799    /// Update accumulators with values from the current row.
3800    pub(crate) async fn update_accumulators(
3801        &self,
3802        accs: &mut [Accumulator],
3803        aggregates: &[Expr],
3804        row: &HashMap<String, Value>,
3805        prop_manager: &PropertyManager,
3806        params: &HashMap<String, Value>,
3807        ctx: Option<&QueryContext>,
3808    ) -> Result<()> {
3809        for (i, agg_expr) in aggregates.iter().enumerate() {
3810            if let Expr::FunctionCall { args, .. } = agg_expr {
3811                let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3812                let val = if is_wildcard {
3813                    Value::Null
3814                } else {
3815                    self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3816                        .await?
3817                };
3818                accs[i].update(&val, is_wildcard);
3819            }
3820        }
3821        Ok(())
3822    }
3823
3824    /// Execute sort operation with ORDER BY clauses.
3825    pub(crate) async fn execute_recursive_cte(
3826        &self,
3827        cte_name: &str,
3828        initial: LogicalPlan,
3829        recursive: LogicalPlan,
3830        prop_manager: &PropertyManager,
3831        params: &HashMap<String, Value>,
3832        ctx: Option<&QueryContext>,
3833    ) -> Result<Vec<HashMap<String, Value>>> {
3834        use std::collections::HashSet;
3835
3836        // Helper to create a stable key for cycle detection.
3837        // Uses sorted keys to ensure consistent ordering.
3838        pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3839            let mut pairs: Vec<_> = row.iter().collect();
3840            pairs.sort_by(|a, b| a.0.cmp(b.0));
3841            format!("{pairs:?}")
3842        }
3843
3844        // 1. Execute Anchor
3845        let mut working_table = self
3846            .execute_subplan(initial, prop_manager, params, ctx)
3847            .await?;
3848        let mut result_table = working_table.clone();
3849
3850        // Track seen rows for cycle detection
3851        let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3852
3853        // 2. Loop
3854        // Safety: Max iterations to prevent infinite loop
3855        let max_iterations = self.config.max_recursive_cte_iterations;
3856        for _iteration in 0..max_iterations {
3857            // CWE-400: Check timeout at each iteration to prevent resource exhaustion
3858            if let Some(ctx) = ctx {
3859                ctx.check_timeout()?;
3860            }
3861
3862            if working_table.is_empty() {
3863                break;
3864            }
3865
3866            // Bind working table to CTE name in params
3867            let working_val = Value::List(
3868                working_table
3869                    .iter()
3870                    .map(|row| {
3871                        if row.len() == 1 {
3872                            row.values().next().unwrap().clone()
3873                        } else {
3874                            Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3875                        }
3876                    })
3877                    .collect(),
3878            );
3879
3880            let mut next_params = params.clone();
3881            next_params.insert(cte_name.to_string(), working_val);
3882
3883            // Execute recursive part
3884            let next_result = self
3885                .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3886                .await?;
3887
3888            if next_result.is_empty() {
3889                break;
3890            }
3891
3892            // Filter out already-seen rows (cycle detection)
3893            let new_rows: Vec<_> = next_result
3894                .into_iter()
3895                .filter(|row| {
3896                    let key = row_key(row);
3897                    seen.insert(key) // Returns false if already present
3898                })
3899                .collect();
3900
3901            if new_rows.is_empty() {
3902                // All results were cycles - terminate
3903                break;
3904            }
3905
3906            result_table.extend(new_rows.clone());
3907            working_table = new_rows;
3908        }
3909
3910        // Output accumulated results as a variable
3911        let final_list = Value::List(
3912            result_table
3913                .into_iter()
3914                .map(|row| {
3915                    // If the CTE returns a single column and we want to treat it as a list of values?
3916                    // E.g. WITH RECURSIVE r AS (RETURN 1 UNION RETURN 2) -> [1, 2] or [{expr:1}, {expr:2}]?
3917                    // Cypher LISTs usually contain values.
3918                    // If the row has 1 column, maybe unwrap?
3919                    // But SQL CTEs are tables.
3920                    // Let's stick to List<Map> for consistency with how we pass it in.
3921                    // UNLESS the user extracts it.
3922                    // My parser test `MATCH (n) WHERE n IN hierarchy` implies `hierarchy` contains Nodes.
3923                    // If `row` contains `root` (Node), then `hierarchy` should be `[Node, Node]`.
3924                    // If row has multiple cols, `[ {a:1, b:2}, ... ]`.
3925                    // If row has 1 col, users expect `[val, val]`.
3926                    if row.len() == 1 {
3927                        row.values().next().unwrap().clone()
3928                    } else {
3929                        Value::Map(row.into_iter().collect())
3930                    }
3931                })
3932                .collect(),
3933        );
3934
3935        let mut final_row = HashMap::new();
3936        final_row.insert(cte_name.to_string(), final_list);
3937        Ok(vec![final_row])
3938    }
3939
3940    /// Interval for timeout checks in sort loops.
3941    const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3942
3943    pub(crate) async fn execute_sort(
3944        &self,
3945        rows: Vec<HashMap<String, Value>>,
3946        order_by: &[uni_cypher::ast::SortItem],
3947        prop_manager: &PropertyManager,
3948        params: &HashMap<String, Value>,
3949        ctx: Option<&QueryContext>,
3950    ) -> Result<Vec<HashMap<String, Value>>> {
3951        // CWE-400: Check timeout before potentially expensive sort
3952        if let Some(ctx) = ctx {
3953            ctx.check_timeout()?;
3954        }
3955
3956        let mut rows_with_keys = Vec::with_capacity(rows.len());
3957        for (idx, row) in rows.into_iter().enumerate() {
3958            // Periodic timeout check during key extraction
3959            if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3960                && let Some(ctx) = ctx
3961            {
3962                ctx.check_timeout()?;
3963            }
3964
3965            let mut keys = Vec::new();
3966            for item in order_by {
3967                let val = row
3968                    .get(&item.expr.to_string_repr())
3969                    .cloned()
3970                    .unwrap_or(Value::Null);
3971                let val = if val.is_null() {
3972                    self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3973                        .await
3974                        .unwrap_or(Value::Null)
3975                } else {
3976                    val
3977                };
3978                keys.push(val);
3979            }
3980            rows_with_keys.push((row, keys));
3981        }
3982
3983        // Check timeout again before synchronous sort (can't be interrupted)
3984        if let Some(ctx) = ctx {
3985            ctx.check_timeout()?;
3986        }
3987
3988        rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3989
3990        Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3991    }
3992
3993    /// Create accumulators for aggregate expressions.
3994    pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3995        aggregates
3996            .iter()
3997            .map(|expr| {
3998                if let Expr::FunctionCall { name, distinct, .. } = expr {
3999                    Accumulator::new(name, *distinct)
4000                } else {
4001                    Accumulator::new("COUNT", false)
4002                }
4003            })
4004            .collect()
4005    }
4006
4007    /// Build result row from group-by keys and accumulators.
4008    pub(crate) fn build_aggregate_result(
4009        group_by: &[Expr],
4010        aggregates: &[Expr],
4011        key_vals: &[Value],
4012        accs: &[Accumulator],
4013    ) -> HashMap<String, Value> {
4014        let mut res_row = HashMap::new();
4015        for (i, expr) in group_by.iter().enumerate() {
4016            res_row.insert(expr.to_string_repr(), key_vals[i].clone());
4017        }
4018        for (i, expr) in aggregates.iter().enumerate() {
4019            // Use aggregate_column_name to ensure consistency with planner
4020            let col_name = crate::query::planner::aggregate_column_name(expr);
4021            res_row.insert(col_name, accs[i].finish());
4022        }
4023        res_row
4024    }
4025
4026    /// Compare and return ordering for sort operation.
4027    pub(crate) fn compare_sort_keys(
4028        a_keys: &[Value],
4029        b_keys: &[Value],
4030        order_by: &[uni_cypher::ast::SortItem],
4031    ) -> std::cmp::Ordering {
4032        for (i, item) in order_by.iter().enumerate() {
4033            let order = Self::compare_values(&a_keys[i], &b_keys[i]);
4034            if order != std::cmp::Ordering::Equal {
4035                return if item.ascending {
4036                    order
4037                } else {
4038                    order.reverse()
4039                };
4040            }
4041        }
4042        std::cmp::Ordering::Equal
4043    }
4044
4045    /// Executes BACKUP command to local or cloud storage.
4046    ///
4047    /// Supports both local filesystem paths and cloud URLs (s3://, gs://, az://).
4048    pub(crate) async fn execute_backup(
4049        &self,
4050        destination: &str,
4051        _options: &HashMap<String, Value>,
4052    ) -> Result<Vec<HashMap<String, Value>>> {
4053        // 1. Flush L0
4054        if let Some(writer_arc) = &self.writer {
4055            let writer: &uni_store::Writer = writer_arc.as_ref();
4056            writer.flush_to_l1(None).await?;
4057        }
4058
4059        // 2. Snapshot
4060        let snapshot_manager = self.storage.snapshot_manager();
4061        let snapshot = snapshot_manager
4062            .load_latest_snapshot()
4063            .await?
4064            .ok_or_else(|| anyhow!("No snapshot found"))?;
4065
4066        // 3. Copy files - cloud or local path
4067        if is_cloud_url(destination) {
4068            self.backup_to_cloud(destination, &snapshot.snapshot_id)
4069                .await?;
4070        } else {
4071            // Validate local destination path against sandbox
4072            let validated_dest = self.validate_path(destination)?;
4073            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
4074                .await?;
4075        }
4076
4077        let mut res = HashMap::new();
4078        res.insert(
4079            "status".to_string(),
4080            Value::String("Backup completed".to_string()),
4081        );
4082        res.insert(
4083            "snapshot_id".to_string(),
4084            Value::String(snapshot.snapshot_id),
4085        );
4086        Ok(vec![res])
4087    }
4088
4089    /// Backs up database to a local filesystem destination.
4090    async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
4091        let source_path = std::path::Path::new(self.storage.base_path());
4092
4093        if !dest_path.exists() {
4094            std::fs::create_dir_all(dest_path)?;
4095        }
4096
4097        // Recursive copy (local to local)
4098        if source_path.exists() {
4099            Self::copy_dir_all(source_path, dest_path)?;
4100        }
4101
4102        // Copy schema to destination/catalog/schema.json
4103        let schema_manager = self.storage.schema_manager();
4104        let dest_catalog = dest_path.join("catalog");
4105        if !dest_catalog.exists() {
4106            std::fs::create_dir_all(&dest_catalog)?;
4107        }
4108
4109        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4110        std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
4111
4112        Ok(())
4113    }
4114
4115    /// Backs up database to a cloud storage destination.
4116    ///
4117    /// Streams data from source to destination, supporting cross-cloud backups.
4118    async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
4119        use object_store::ObjectStore;
4120        use object_store::ObjectStoreExt;
4121        use object_store::local::LocalFileSystem;
4122        use object_store::path::Path as ObjPath;
4123
4124        let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
4125        let source_path = std::path::Path::new(self.storage.base_path());
4126
4127        // Create local store for source, coerced to dyn ObjectStore
4128        let src_store: Arc<dyn ObjectStore> =
4129            Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
4130
4131        // Copy catalog/ directory
4132        let catalog_src = ObjPath::from("catalog");
4133        let catalog_dst = if dest_prefix.as_ref().is_empty() {
4134            ObjPath::from("catalog")
4135        } else {
4136            ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
4137        };
4138        copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
4139
4140        // Copy storage/ directory
4141        let storage_src = ObjPath::from("storage");
4142        let storage_dst = if dest_prefix.as_ref().is_empty() {
4143            ObjPath::from("storage")
4144        } else {
4145            ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
4146        };
4147        copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
4148
4149        // Ensure schema is present at canonical catalog location.
4150        let schema_manager = self.storage.schema_manager();
4151        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4152        let schema_path = if dest_prefix.as_ref().is_empty() {
4153            ObjPath::from("catalog/schema.json")
4154        } else {
4155            ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
4156        };
4157        dest_store
4158            .put(&schema_path, bytes::Bytes::from(schema_content).into())
4159            .await?;
4160
4161        Ok(())
4162    }
4163
4164    /// Maximum directory depth for backup operations.
4165    ///
4166    /// **CWE-674 (Uncontrolled Recursion)**: Prevents stack overflow from
4167    /// excessively deep directory structures.
4168    const MAX_BACKUP_DEPTH: usize = 100;
4169
4170    /// Maximum file count for backup operations.
4171    ///
4172    /// **CWE-400 (Resource Consumption)**: Prevents disk exhaustion and
4173    /// long-running operations from malicious or unexpectedly large directories.
4174    const MAX_BACKUP_FILES: usize = 100_000;
4175
4176    /// Recursively copies a directory with security limits.
4177    ///
4178    /// # Security
4179    ///
4180    /// - **CWE-674**: Depth limit prevents stack overflow
4181    /// - **CWE-400**: File count limit prevents resource exhaustion
4182    /// - **Symlink handling**: Symlinks are skipped to prevent loop attacks
4183    pub(crate) fn copy_dir_all(
4184        src: &std::path::Path,
4185        dst: &std::path::Path,
4186    ) -> std::io::Result<()> {
4187        let mut file_count = 0usize;
4188        Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
4189    }
4190
4191    /// Internal implementation with depth and file count tracking.
4192    pub(crate) fn copy_dir_all_impl(
4193        src: &std::path::Path,
4194        dst: &std::path::Path,
4195        depth: usize,
4196        file_count: &mut usize,
4197    ) -> std::io::Result<()> {
4198        if depth >= Self::MAX_BACKUP_DEPTH {
4199            return Err(std::io::Error::new(
4200                std::io::ErrorKind::InvalidInput,
4201                format!(
4202                    "Maximum backup depth {} exceeded at {:?}",
4203                    Self::MAX_BACKUP_DEPTH,
4204                    src
4205                ),
4206            ));
4207        }
4208
4209        std::fs::create_dir_all(dst)?;
4210
4211        for entry in std::fs::read_dir(src)? {
4212            if *file_count >= Self::MAX_BACKUP_FILES {
4213                return Err(std::io::Error::new(
4214                    std::io::ErrorKind::InvalidInput,
4215                    format!(
4216                        "Maximum backup file count {} exceeded",
4217                        Self::MAX_BACKUP_FILES
4218                    ),
4219                ));
4220            }
4221            *file_count += 1;
4222
4223            let entry = entry?;
4224            let metadata = entry.metadata()?;
4225
4226            // Skip symlinks to prevent loops and traversal attacks
4227            if metadata.file_type().is_symlink() {
4228                // Silently skip - logging would require tracing dependency
4229                continue;
4230            }
4231
4232            let dst_path = dst.join(entry.file_name());
4233            if metadata.is_dir() {
4234                Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4235            } else {
4236                std::fs::copy(entry.path(), dst_path)?;
4237            }
4238        }
4239        Ok(())
4240    }
4241
4242    pub(crate) async fn execute_copy(
4243        &self,
4244        target: &str,
4245        source: &str,
4246        options: &HashMap<String, Value>,
4247        prop_manager: &PropertyManager,
4248    ) -> Result<Vec<HashMap<String, Value>>> {
4249        let format = options
4250            .get("format")
4251            .and_then(|v| v.as_str())
4252            .unwrap_or_else(|| {
4253                if source.ends_with(".parquet") {
4254                    "parquet"
4255                } else {
4256                    "csv"
4257                }
4258            });
4259
4260        match format.to_lowercase().as_str() {
4261            "csv" => self.execute_csv_import(target, source, options).await,
4262            "parquet" => {
4263                self.execute_parquet_import(target, source, options, prop_manager)
4264                    .await
4265            }
4266            _ => Err(anyhow!("Unsupported format: {}", format)),
4267        }
4268    }
4269
4270    pub(crate) async fn execute_csv_import(
4271        &self,
4272        target: &str,
4273        source: &str,
4274        options: &HashMap<String, Value>,
4275    ) -> Result<Vec<HashMap<String, Value>>> {
4276        // Validate source path against sandbox
4277        let validated_source = self.validate_path(source)?;
4278
4279        let writer_lock = self
4280            .writer
4281            .as_ref()
4282            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4283
4284        let schema = self.storage.schema_manager().schema();
4285
4286        // 1. Determine if target is Label or EdgeType
4287        let label_meta = schema.labels.get(target);
4288        let edge_meta = schema.edge_types.get(target);
4289
4290        if label_meta.is_none() && edge_meta.is_none() {
4291            return Err(anyhow!("Target '{}' not found in schema", target));
4292        }
4293
4294        // 2. Open CSV
4295        let delimiter_str = options
4296            .get("delimiter")
4297            .and_then(|v| v.as_str())
4298            .unwrap_or(",");
4299        let delimiter = if delimiter_str.is_empty() {
4300            b','
4301        } else {
4302            delimiter_str.as_bytes()[0]
4303        };
4304        let has_header = options
4305            .get("header")
4306            .and_then(|v| v.as_bool())
4307            .unwrap_or(true);
4308
4309        let mut rdr = csv::ReaderBuilder::new()
4310            .delimiter(delimiter)
4311            .has_headers(has_header)
4312            .from_path(&validated_source)?;
4313
4314        let headers = rdr.headers()?.clone();
4315        let mut count = 0;
4316
4317        let writer: &uni_store::Writer = writer_lock.as_ref();
4318
4319        // Chunked VID/EID refill for streaming CSV: amortizes the IdAllocator
4320        // mutex over `CSV_ID_CHUNK` rows. End-of-iteration may waste up to
4321        // CSV_ID_CHUNK-1 IDs — harmless in the u64 space.
4322        const CSV_ID_CHUNK: usize = 256;
4323
4324        if label_meta.is_some() {
4325            let target_props = schema
4326                .properties
4327                .get(target)
4328                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4329
4330            let mut vid_chunk: std::collections::VecDeque<Vid> =
4331                std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4332            for result in rdr.records() {
4333                let record = result?;
4334                let mut props = HashMap::new();
4335
4336                for (i, header) in headers.iter().enumerate() {
4337                    if let Some(val_str) = record.get(i)
4338                        && let Some(prop_meta) = target_props.get(header)
4339                    {
4340                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4341                        props.insert(header.to_string(), val);
4342                    }
4343                }
4344
4345                if vid_chunk.is_empty() {
4346                    vid_chunk.extend(writer.allocate_vids(CSV_ID_CHUNK).await?);
4347                }
4348                let vid = vid_chunk.pop_front().unwrap();
4349                writer
4350                    .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4351                    .await?;
4352                count += 1;
4353            }
4354        } else if let Some(meta) = edge_meta {
4355            let type_id = meta.id;
4356            let target_props = schema
4357                .properties
4358                .get(target)
4359                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4360
4361            // For edges, we need src and dst VIDs.
4362            // Expecting columns '_src' and '_dst' or as specified in options.
4363            let src_col = options
4364                .get("src_col")
4365                .and_then(|v| v.as_str())
4366                .unwrap_or("_src");
4367            let dst_col = options
4368                .get("dst_col")
4369                .and_then(|v| v.as_str())
4370                .unwrap_or("_dst");
4371
4372            let mut eid_chunk: std::collections::VecDeque<Eid> =
4373                std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4374            for result in rdr.records() {
4375                let record = result?;
4376                let mut props = HashMap::new();
4377                let mut src_vid = None;
4378                let mut dst_vid = None;
4379
4380                for (i, header) in headers.iter().enumerate() {
4381                    if let Some(val_str) = record.get(i) {
4382                        if header == src_col {
4383                            src_vid =
4384                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4385                        } else if header == dst_col {
4386                            dst_vid =
4387                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4388                        } else if let Some(prop_meta) = target_props.get(header) {
4389                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4390                            props.insert(header.to_string(), val);
4391                        }
4392                    }
4393                }
4394
4395                let src =
4396                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4397                let dst = dst_vid
4398                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;
4399
4400                if eid_chunk.is_empty() {
4401                    eid_chunk.extend(writer.allocate_eids(CSV_ID_CHUNK).await?);
4402                }
4403                let eid = eid_chunk.pop_front().unwrap();
4404                writer
4405                    .insert_edge(
4406                        src,
4407                        dst,
4408                        type_id,
4409                        eid,
4410                        props,
4411                        Some(target.to_string()),
4412                        None,
4413                    )
4414                    .await?;
4415                count += 1;
4416            }
4417        }
4418
4419        let mut res = HashMap::new();
4420        res.insert("count".to_string(), Value::Int(count as i64));
4421        Ok(vec![res])
4422    }
4423
4424    /// Imports data from Parquet file to a label or edge type.
4425    ///
4426    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
4427    pub(crate) async fn execute_parquet_import(
4428        &self,
4429        target: &str,
4430        source: &str,
4431        options: &HashMap<String, Value>,
4432        _prop_manager: &PropertyManager,
4433    ) -> Result<Vec<HashMap<String, Value>>> {
4434        let writer_lock = self
4435            .writer
4436            .as_ref()
4437            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4438
4439        let schema = self.storage.schema_manager().schema();
4440
4441        // 1. Determine if target is Label or EdgeType
4442        let label_meta = schema.labels.get(target);
4443        let edge_meta = schema.edge_types.get(target);
4444
4445        if label_meta.is_none() && edge_meta.is_none() {
4446            return Err(anyhow!("Target '{}' not found in schema", target));
4447        }
4448
4449        // 2. Open Parquet - support both local and cloud URLs
4450        let reader = if is_cloud_url(source) {
4451            self.open_parquet_from_cloud(source).await?
4452        } else {
4453            // Validate local source path against sandbox
4454            let validated_source = self.validate_path(source)?;
4455            let file = std::fs::File::open(&validated_source)?;
4456            let builder =
4457                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
4458            builder.build()?
4459        };
4460        let mut reader = reader;
4461
4462        let mut count = 0;
4463        let writer: &uni_store::Writer = writer_lock.as_ref();
4464
4465        if label_meta.is_some() {
4466            let target_props = schema
4467                .properties
4468                .get(target)
4469                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4470
4471            for batch in reader.by_ref() {
4472                let batch = batch?;
4473                let num_rows = batch.num_rows();
4474                // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
4475                let vids = writer.allocate_vids(num_rows).await?;
4476                for (row, &vid) in vids.iter().enumerate().take(num_rows) {
4477                    let mut props = HashMap::new();
4478                    for field in batch.schema().fields() {
4479                        let name = field.name();
4480                        if target_props.contains_key(name) {
4481                            let col = batch.column_by_name(name).unwrap();
4482                            if !col.is_null(row) {
4483                                // Look up Uni DataType from schema for proper DateTime/Time decoding
4484                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
4485                                let val =
4486                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
4487                                props.insert(name.clone(), val);
4488                            }
4489                        }
4490                    }
4491                    writer
4492                        .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4493                        .await?;
4494                    count += 1;
4495                }
4496            }
4497        } else if let Some(meta) = edge_meta {
4498            let type_id = meta.id;
4499            let target_props = schema
4500                .properties
4501                .get(target)
4502                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4503
4504            let src_col = options
4505                .get("src_col")
4506                .and_then(|v| v.as_str())
4507                .unwrap_or("_src");
4508            let dst_col = options
4509                .get("dst_col")
4510                .and_then(|v| v.as_str())
4511                .unwrap_or("_dst");
4512
4513            for batch in reader {
4514                let batch = batch?;
4515                let num_rows = batch.num_rows();
4516                // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
4517                let eids = writer.allocate_eids(num_rows).await?;
4518                for (row, &eid) in eids.iter().enumerate().take(num_rows) {
4519                    let mut props = HashMap::new();
4520                    let mut src_vid = None;
4521                    let mut dst_vid = None;
4522
4523                    for field in batch.schema().fields() {
4524                        let name = field.name();
4525                        let col = batch.column_by_name(name).unwrap();
4526                        if col.is_null(row) {
4527                            continue;
4528                        }
4529
4530                        if name == src_col {
4531                            let val = Self::arrow_to_value(col.as_ref(), row);
4532                            src_vid = Some(Self::vid_from_value(&val)?);
4533                        } else if name == dst_col {
4534                            let val = Self::arrow_to_value(col.as_ref(), row);
4535                            dst_vid = Some(Self::vid_from_value(&val)?);
4536                        } else if let Some(pm) = target_props.get(name) {
4537                            // Look up Uni DataType from schema for proper DateTime/Time decoding
4538                            let val =
4539                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
4540                            props.insert(name.clone(), val);
4541                        }
4542                    }
4543
4544                    let src = src_vid
4545                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4546                    let dst = dst_vid.ok_or_else(|| {
4547                        anyhow!("Missing destination VID in column '{}'", dst_col)
4548                    })?;
4549
4550                    writer
4551                        .insert_edge(
4552                            src,
4553                            dst,
4554                            type_id,
4555                            eid,
4556                            props,
4557                            Some(target.to_string()),
4558                            None,
4559                        )
4560                        .await?;
4561                    count += 1;
4562                }
4563            }
4564        }
4565
4566        let mut res = HashMap::new();
4567        res.insert("count".to_string(), Value::Int(count as i64));
4568        Ok(vec![res])
4569    }
4570
4571    /// Opens a Parquet file from a cloud URL.
4572    ///
4573    /// Downloads the file to memory and creates a Parquet reader.
4574    async fn open_parquet_from_cloud(
4575        &self,
4576        source_url: &str,
4577    ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4578        use object_store::ObjectStoreExt;
4579
4580        let (store, path) = build_store_from_url(source_url)?;
4581
4582        // Download file contents
4583        let bytes = store.get(&path).await?.bytes().await?;
4584
4585        // Create a Parquet reader from the bytes
4586        let reader = bytes::Bytes::from(bytes.to_vec());
4587        let builder =
4588            parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4589        Ok(builder.build()?)
4590    }
4591
4592    pub(crate) async fn scan_edge_type(
4593        &self,
4594        edge_type: &str,
4595        ctx: Option<&QueryContext>,
4596    ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4597        let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4598
4599        // 1. Scan L2 (Base)
4600        self.scan_edge_type_l2(edge_type, &mut edges).await?;
4601
4602        // 2. Scan L1 (Delta)
4603        self.scan_edge_type_l1(edge_type, &mut edges).await?;
4604
4605        // 3. Scan L0 (Memory) and filter tombstoned vertices
4606        if let Some(ctx) = ctx {
4607            self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4608            self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4609        }
4610
4611        Ok(edges
4612            .into_iter()
4613            .map(|(eid, (src, dst))| (eid, src, dst))
4614            .collect())
4615    }
4616
4617    /// Scan L2 (base) storage for edges of a given type.
4618    ///
4619    /// Note: Edges are now stored exclusively in delta datasets (L1) via LanceDB.
4620    /// This L2 scan will typically find no data.
4621    pub(crate) async fn scan_edge_type_l2(
4622        &self,
4623        _edge_type: &str,
4624        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4625    ) -> Result<()> {
4626        // Edges are now stored in delta datasets (L1) via LanceDB.
4627        // Legacy L2 base edge storage is no longer used.
4628        Ok(())
4629    }
4630
4631    /// Scan L1 (delta) storage for edges of a given type.
4632    pub(crate) async fn scan_edge_type_l1(
4633        &self,
4634        edge_type: &str,
4635        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4636    ) -> Result<()> {
4637        if let Ok(Some(batch)) = self
4638            .storage
4639            .scan_delta_table(
4640                edge_type,
4641                "fwd",
4642                &["eid", "src_vid", "dst_vid", "op", "_version"],
4643                None,
4644            )
4645            .await
4646        {
4647            // Collect ops with versions: eid -> (version, op, src, dst)
4648            let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4649                HashMap::new();
4650
4651            self.process_delta_batch(&batch, &mut versioned_ops)?;
4652
4653            // Apply the winning ops
4654            for (eid, (_, op, src, dst)) in versioned_ops {
4655                if op == 0 {
4656                    edges.insert(eid, (src, dst));
4657                } else if op == 1 {
4658                    edges.remove(&eid);
4659                }
4660            }
4661        }
4662        Ok(())
4663    }
4664
4665    /// Process a delta batch, tracking versioned operations.
4666    pub(crate) fn process_delta_batch(
4667        &self,
4668        batch: &arrow_array::RecordBatch,
4669        versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4670    ) -> Result<()> {
4671        use arrow_array::UInt64Array;
4672        let eid_col = batch
4673            .column_by_name("eid")
4674            .ok_or(anyhow!("Missing eid"))?
4675            .as_any()
4676            .downcast_ref::<UInt64Array>()
4677            .ok_or(anyhow!("Invalid eid"))?;
4678        let src_col = batch
4679            .column_by_name("src_vid")
4680            .ok_or(anyhow!("Missing src_vid"))?
4681            .as_any()
4682            .downcast_ref::<UInt64Array>()
4683            .ok_or(anyhow!("Invalid src_vid"))?;
4684        let dst_col = batch
4685            .column_by_name("dst_vid")
4686            .ok_or(anyhow!("Missing dst_vid"))?
4687            .as_any()
4688            .downcast_ref::<UInt64Array>()
4689            .ok_or(anyhow!("Invalid dst_vid"))?;
4690        let op_col = batch
4691            .column_by_name("op")
4692            .ok_or(anyhow!("Missing op"))?
4693            .as_any()
4694            .downcast_ref::<arrow_array::UInt8Array>()
4695            .ok_or(anyhow!("Invalid op"))?;
4696        let version_col = batch
4697            .column_by_name("_version")
4698            .ok_or(anyhow!("Missing _version"))?
4699            .as_any()
4700            .downcast_ref::<UInt64Array>()
4701            .ok_or(anyhow!("Invalid _version"))?;
4702
4703        for i in 0..batch.num_rows() {
4704            let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4705            let version = version_col.value(i);
4706            let op = op_col.value(i);
4707            let src = Vid::from(src_col.value(i));
4708            let dst = Vid::from(dst_col.value(i));
4709
4710            match versioned_ops.entry(eid) {
4711                std::collections::hash_map::Entry::Vacant(e) => {
4712                    e.insert((version, op, src, dst));
4713                }
4714                std::collections::hash_map::Entry::Occupied(mut e) => {
4715                    if version > e.get().0 {
4716                        e.insert((version, op, src, dst));
4717                    }
4718                }
4719            }
4720        }
4721        Ok(())
4722    }
4723
4724    /// Scan L0 (memory) buffers for edges of a given type.
4725    pub(crate) fn scan_edge_type_l0(
4726        &self,
4727        edge_type: &str,
4728        ctx: &QueryContext,
4729        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4730    ) {
4731        let schema = self.storage.schema_manager().schema();
4732        let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4733
4734        if let Some(type_id) = type_id {
4735            // Main L0
4736            self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4737
4738            // Transaction L0
4739            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4740                self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4741            }
4742
4743            // Pending flush L0s
4744            for pending_l0_arc in &ctx.pending_flush_l0s {
4745                self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4746            }
4747        }
4748    }
4749
4750    /// Scan a single L0 buffer for edges and apply tombstones.
4751    pub(crate) fn scan_single_l0(
4752        &self,
4753        l0: &uni_store::runtime::L0Buffer,
4754        type_id: u32,
4755        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4756    ) {
4757        for edge_entry in l0.graph.edges() {
4758            if edge_entry.edge_type == type_id {
4759                edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4760            }
4761        }
4762        // Process Tombstones
4763        let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4764        for eid in eids_to_check {
4765            if l0.is_tombstoned(eid) {
4766                edges.remove(&eid);
4767            }
4768        }
4769    }
4770
4771    /// Filter out edges connected to tombstoned vertices.
4772    pub(crate) fn filter_tombstoned_vertex_edges(
4773        &self,
4774        ctx: &QueryContext,
4775        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4776    ) {
4777        let l0 = ctx.l0.read();
4778        let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4779
4780        // Include tx_l0 vertex tombstones if present
4781        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4782            let tx_l0 = tx_l0_arc.read();
4783            all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4784        }
4785
4786        // Include pending flush L0 vertex tombstones
4787        for pending_l0_arc in &ctx.pending_flush_l0s {
4788            let pending_l0 = pending_l0_arc.read();
4789            all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4790        }
4791
4792        edges.retain(|_, (src, dst)| {
4793            !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4794        });
4795    }
4796
4797    /// Execute a projection operation.
4798    pub(crate) async fn execute_project(
4799        &self,
4800        input_rows: Vec<HashMap<String, Value>>,
4801        projections: &[(Expr, Option<String>)],
4802        prop_manager: &PropertyManager,
4803        params: &HashMap<String, Value>,
4804        ctx: Option<&QueryContext>,
4805    ) -> Result<Vec<HashMap<String, Value>>> {
4806        let mut results = Vec::new();
4807        for m in input_rows {
4808            let mut row = HashMap::new();
4809            for (expr, alias) in projections {
4810                let val = self
4811                    .evaluate_expr(expr, &m, prop_manager, params, ctx)
4812                    .await?;
4813                let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4814                row.insert(name, val);
4815            }
4816            results.push(row);
4817        }
4818        Ok(results)
4819    }
4820
4821    /// Execute an UNWIND operation.
4822    pub(crate) async fn execute_unwind(
4823        &self,
4824        input_rows: Vec<HashMap<String, Value>>,
4825        expr: &Expr,
4826        variable: &str,
4827        prop_manager: &PropertyManager,
4828        params: &HashMap<String, Value>,
4829        ctx: Option<&QueryContext>,
4830    ) -> Result<Vec<HashMap<String, Value>>> {
4831        let mut results = Vec::new();
4832        for row in input_rows {
4833            let val = self
4834                .evaluate_expr(expr, &row, prop_manager, params, ctx)
4835                .await?;
4836            if let Value::List(items) = val {
4837                for item in items {
4838                    let mut new_row = row.clone();
4839                    new_row.insert(variable.to_string(), item);
4840                    results.push(new_row);
4841                }
4842            }
4843        }
4844        Ok(results)
4845    }
4846
4847    /// Execute an APPLY (correlated subquery) operation.
4848    pub(crate) async fn execute_apply(
4849        &self,
4850        input_rows: Vec<HashMap<String, Value>>,
4851        subquery: &LogicalPlan,
4852        input_filter: Option<&Expr>,
4853        prop_manager: &PropertyManager,
4854        params: &HashMap<String, Value>,
4855        ctx: Option<&QueryContext>,
4856    ) -> Result<Vec<HashMap<String, Value>>> {
4857        let mut filtered_rows = input_rows;
4858
4859        if let Some(filter) = input_filter {
4860            let mut filtered = Vec::new();
4861            for row in filtered_rows {
4862                let res = self
4863                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
4864                    .await?;
4865                if res.as_bool().unwrap_or(false) {
4866                    filtered.push(row);
4867                }
4868            }
4869            filtered_rows = filtered;
4870        }
4871
4872        // Handle empty input: execute subquery once with empty context
4873        // This is critical for standalone CALL statements at the beginning of a query
4874        if filtered_rows.is_empty() {
4875            let sub_rows = self
4876                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4877                .await?;
4878            return Ok(sub_rows);
4879        }
4880
4881        let mut results = Vec::new();
4882        for row in filtered_rows {
4883            let mut sub_params = params.clone();
4884            sub_params.extend(row.clone());
4885
4886            let sub_rows = self
4887                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4888                .await?;
4889
4890            for sub_row in sub_rows {
4891                let mut new_row = row.clone();
4892                new_row.extend(sub_row);
4893                results.push(new_row);
4894            }
4895        }
4896        Ok(results)
4897    }
4898
4899    /// Execute SHOW INDEXES command.
4900    pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4901        let schema = self.storage.schema_manager().schema();
4902        let mut rows = Vec::new();
4903        for idx in &schema.indexes {
4904            let (name, type_str, details) = match idx {
4905                uni_common::core::schema::IndexDefinition::Vector(c) => (
4906                    c.name.clone(),
4907                    "VECTOR",
4908                    format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4909                ),
4910                uni_common::core::schema::IndexDefinition::FullText(c) => (
4911                    c.name.clone(),
4912                    "FULLTEXT",
4913                    format!("on {}:{:?}", c.label, c.properties),
4914                ),
4915                uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4916                    cfg.name.clone(),
4917                    "SCALAR",
4918                    format!(":{}({:?})", cfg.label, cfg.properties),
4919                ),
4920                _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4921            };
4922
4923            if let Some(f) = filter
4924                && f != type_str
4925            {
4926                continue;
4927            }
4928
4929            let mut row = HashMap::new();
4930            row.insert("name".to_string(), Value::String(name));
4931            row.insert("type".to_string(), Value::String(type_str.to_string()));
4932            row.insert("details".to_string(), Value::String(details));
4933            rows.push(row);
4934        }
4935        rows
4936    }
4937
4938    pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4939        let mut row = HashMap::new();
4940        row.insert("name".to_string(), Value::String("uni".to_string()));
4941        // Could add storage path, etc.
4942        vec![row]
4943    }
4944
4945    pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4946        // Placeholder as we don't easy access to config struct from here
4947        vec![]
4948    }
4949
4950    pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4951        let snapshot = self
4952            .storage
4953            .snapshot_manager()
4954            .load_latest_snapshot()
4955            .await?;
4956        let mut results = Vec::new();
4957
4958        if let Some(snap) = snapshot {
4959            for (label, s) in &snap.vertices {
4960                let mut row = HashMap::new();
4961                row.insert("type".to_string(), Value::String("Label".to_string()));
4962                row.insert("name".to_string(), Value::String(label.clone()));
4963                row.insert("count".to_string(), Value::Int(s.count as i64));
4964                results.push(row);
4965            }
4966            for (edge, s) in &snap.edges {
4967                let mut row = HashMap::new();
4968                row.insert("type".to_string(), Value::String("Edge".to_string()));
4969                row.insert("name".to_string(), Value::String(edge.clone()));
4970                row.insert("count".to_string(), Value::Int(s.count as i64));
4971                results.push(row);
4972            }
4973        }
4974
4975        Ok(results)
4976    }
4977
4978    pub(crate) fn execute_show_constraints(
4979        &self,
4980        clause: ShowConstraints,
4981    ) -> Vec<HashMap<String, Value>> {
4982        let schema = self.storage.schema_manager().schema();
4983        let mut rows = Vec::new();
4984        for c in &schema.constraints {
4985            if let Some(target) = &clause.target {
4986                match (target, &c.target) {
4987                    (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4988                    (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4989                        if e1 == e2 => {}
4990                    _ => continue,
4991                }
4992            }
4993
4994            let mut row = HashMap::new();
4995            row.insert("name".to_string(), Value::String(c.name.clone()));
4996            let type_str = match c.constraint_type {
4997                ConstraintType::Unique { .. } => "UNIQUE",
4998                ConstraintType::Exists { .. } => "EXISTS",
4999                ConstraintType::Check { .. } => "CHECK",
5000                _ => "UNKNOWN",
5001            };
5002            row.insert("type".to_string(), Value::String(type_str.to_string()));
5003
5004            let target_str = match &c.target {
5005                ConstraintTarget::Label(l) => format!("(:{})", l),
5006                ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
5007                _ => "UNKNOWN".to_string(),
5008            };
5009            row.insert("target".to_string(), Value::String(target_str));
5010
5011            rows.push(row);
5012        }
5013        rows
5014    }
5015
5016    /// Execute a MERGE operation.
5017    pub(crate) async fn execute_cross_join(
5018        &self,
5019        left: Box<LogicalPlan>,
5020        right: Box<LogicalPlan>,
5021        prop_manager: &PropertyManager,
5022        params: &HashMap<String, Value>,
5023        ctx: Option<&QueryContext>,
5024    ) -> Result<Vec<HashMap<String, Value>>> {
5025        let left_rows = self
5026            .execute_subplan(*left, prop_manager, params, ctx)
5027            .await?;
5028        let right_rows = self
5029            .execute_subplan(*right, prop_manager, params, ctx)
5030            .await?;
5031
5032        let mut results = Vec::new();
5033        for l in &left_rows {
5034            for r in &right_rows {
5035                let mut combined = l.clone();
5036                combined.extend(r.clone());
5037                results.push(combined);
5038            }
5039        }
5040        Ok(results)
5041    }
5042
5043    /// Execute a UNION operation with optional deduplication.
5044    pub(crate) async fn execute_union(
5045        &self,
5046        left: Box<LogicalPlan>,
5047        right: Box<LogicalPlan>,
5048        all: bool,
5049        prop_manager: &PropertyManager,
5050        params: &HashMap<String, Value>,
5051        ctx: Option<&QueryContext>,
5052    ) -> Result<Vec<HashMap<String, Value>>> {
5053        let mut left_rows = self
5054            .execute_subplan(*left, prop_manager, params, ctx)
5055            .await?;
5056        let mut right_rows = self
5057            .execute_subplan(*right, prop_manager, params, ctx)
5058            .await?;
5059
5060        left_rows.append(&mut right_rows);
5061
5062        if !all {
5063            let mut seen = HashSet::new();
5064            left_rows.retain(|row| {
5065                let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
5066                let key = format!("{sorted_row:?}");
5067                seen.insert(key)
5068            });
5069        }
5070        Ok(left_rows)
5071    }
5072
5073    /// Check if an index with the given name exists.
5074    pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
5075        let schema = self.storage.schema_manager().schema();
5076        schema.indexes.iter().any(|idx| match idx {
5077            uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
5078            uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
5079            uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
5080            _ => false,
5081        })
5082    }
5083
5084    pub(crate) async fn execute_export(
5085        &self,
5086        target: &str,
5087        source: &str,
5088        options: &HashMap<String, Value>,
5089        prop_manager: &PropertyManager,
5090        ctx: Option<&QueryContext>,
5091    ) -> Result<Vec<HashMap<String, Value>>> {
5092        let format = options
5093            .get("format")
5094            .and_then(|v| v.as_str())
5095            .unwrap_or("csv")
5096            .to_lowercase();
5097
5098        match format.as_str() {
5099            "csv" => {
5100                self.execute_csv_export(target, source, options, prop_manager, ctx)
5101                    .await
5102            }
5103            "parquet" => {
5104                self.execute_parquet_export(target, source, options, prop_manager, ctx)
5105                    .await
5106            }
5107            _ => Err(anyhow!("Unsupported export format: {}", format)),
5108        }
5109    }
5110
5111    pub(crate) async fn execute_csv_export(
5112        &self,
5113        target: &str,
5114        source: &str,
5115        options: &HashMap<String, Value>,
5116        prop_manager: &PropertyManager,
5117        ctx: Option<&QueryContext>,
5118    ) -> Result<Vec<HashMap<String, Value>>> {
5119        // Validate destination path against sandbox
5120        let validated_dest = self.validate_path(source)?;
5121
5122        let schema = self.storage.schema_manager().schema();
5123        let label_meta = schema.labels.get(target);
5124        let edge_meta = schema.edge_types.get(target);
5125
5126        if label_meta.is_none() && edge_meta.is_none() {
5127            return Err(anyhow!("Target '{}' not found in schema", target));
5128        }
5129
5130        let delimiter_str = options
5131            .get("delimiter")
5132            .and_then(|v| v.as_str())
5133            .unwrap_or(",");
5134        let delimiter = if delimiter_str.is_empty() {
5135            b','
5136        } else {
5137            delimiter_str.as_bytes()[0]
5138        };
5139        let has_header = options
5140            .get("header")
5141            .and_then(|v| v.as_bool())
5142            .unwrap_or(true);
5143
5144        let mut wtr = csv::WriterBuilder::new()
5145            .delimiter(delimiter)
5146            .from_path(&validated_dest)?;
5147
5148        let mut count = 0;
5149        // Empty properties map for labels/edge types without registered properties
5150        let empty_props = HashMap::new();
5151
5152        if let Some(meta) = label_meta {
5153            let label_id = meta.id;
5154            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5155            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5156            prop_names.sort();
5157
5158            let mut headers = vec!["_vid".to_string()];
5159            headers.extend(prop_names.clone());
5160
5161            if has_header {
5162                wtr.write_record(&headers)?;
5163            }
5164
5165            let vids = self
5166                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5167                .await?;
5168
5169            for vid in vids {
5170                let props = prop_manager
5171                    .get_all_vertex_props_with_ctx(vid, ctx)
5172                    .await?
5173                    .unwrap_or_default();
5174
5175                let mut row = Vec::with_capacity(headers.len());
5176                row.push(vid.to_string());
5177                for p_name in &prop_names {
5178                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5179                    row.push(self.format_csv_value(val));
5180                }
5181                wtr.write_record(&row)?;
5182                count += 1;
5183            }
5184        } else if let Some(meta) = edge_meta {
5185            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5186            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5187            prop_names.sort();
5188
5189            // Headers for Edge: _eid, _src, _dst, _type, ...props
5190            let mut headers = vec![
5191                "_eid".to_string(),
5192                "_src".to_string(),
5193                "_dst".to_string(),
5194                "_type".to_string(),
5195            ];
5196            headers.extend(prop_names.clone());
5197
5198            if has_header {
5199                wtr.write_record(&headers)?;
5200            }
5201
5202            let edges = self.scan_edge_type(target, ctx).await?;
5203
5204            for (eid, src, dst) in edges {
5205                let props = prop_manager
5206                    .get_all_edge_props_with_ctx(eid, ctx)
5207                    .await?
5208                    .unwrap_or_default();
5209
5210                let mut row = Vec::with_capacity(headers.len());
5211                row.push(eid.to_string());
5212                row.push(src.to_string());
5213                row.push(dst.to_string());
5214                row.push(meta.id.to_string());
5215
5216                for p_name in &prop_names {
5217                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5218                    row.push(self.format_csv_value(val));
5219                }
5220                wtr.write_record(&row)?;
5221                count += 1;
5222            }
5223        }
5224
5225        wtr.flush()?;
5226        let mut res = HashMap::new();
5227        res.insert("count".to_string(), Value::Int(count as i64));
5228        Ok(vec![res])
5229    }
5230
5231    /// Exports data to Parquet format.
5232    ///
5233    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
5234    pub(crate) async fn execute_parquet_export(
5235        &self,
5236        target: &str,
5237        destination: &str,
5238        _options: &HashMap<String, Value>,
5239        prop_manager: &PropertyManager,
5240        ctx: Option<&QueryContext>,
5241    ) -> Result<Vec<HashMap<String, Value>>> {
5242        let schema_manager = self.storage.schema_manager();
5243        let schema = schema_manager.schema();
5244        let label_meta = schema.labels.get(target);
5245        let edge_meta = schema.edge_types.get(target);
5246
5247        if label_meta.is_none() && edge_meta.is_none() {
5248            return Err(anyhow!("Target '{}' not found in schema", target));
5249        }
5250
5251        let arrow_schema = if label_meta.is_some() {
5252            let dataset = self.storage.vertex_dataset(target)?;
5253            dataset.get_arrow_schema(&schema)?
5254        } else {
5255            // Edge Schema
5256            let dataset = self.storage.edge_dataset(target, "", "")?;
5257            dataset.get_arrow_schema(&schema)?
5258        };
5259
5260        let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();
5261
5262        if let Some(meta) = label_meta {
5263            let label_id = meta.id;
5264            let vids = self
5265                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5266                .await?;
5267
5268            for vid in vids {
5269                let mut props = prop_manager
5270                    .get_all_vertex_props_with_ctx(vid, ctx)
5271                    .await?
5272                    .unwrap_or_default();
5273
5274                props.insert(
5275                    "_vid".to_string(),
5276                    uni_common::Value::Int(vid.as_u64() as i64),
5277                );
5278                if !props.contains_key("_uid") {
5279                    props.insert(
5280                        "_uid".to_string(),
5281                        uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
5282                    );
5283                }
5284                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5285                props.insert("_version".to_string(), uni_common::Value::Int(1));
5286                rows.push(props);
5287            }
5288        } else if edge_meta.is_some() {
5289            let edges = self.scan_edge_type(target, ctx).await?;
5290            for (eid, src, dst) in edges {
5291                let mut props = prop_manager
5292                    .get_all_edge_props_with_ctx(eid, ctx)
5293                    .await?
5294                    .unwrap_or_default();
5295
5296                props.insert(
5297                    "eid".to_string(),
5298                    uni_common::Value::Int(eid.as_u64() as i64),
5299                );
5300                props.insert(
5301                    "src_vid".to_string(),
5302                    uni_common::Value::Int(src.as_u64() as i64),
5303                );
5304                props.insert(
5305                    "dst_vid".to_string(),
5306                    uni_common::Value::Int(dst.as_u64() as i64),
5307                );
5308                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5309                props.insert("_version".to_string(), uni_common::Value::Int(1));
5310                rows.push(props);
5311            }
5312        }
5313
5314        // Write to cloud or local file
5315        if is_cloud_url(destination) {
5316            self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
5317                .await?;
5318        } else {
5319            // Validate local destination path against sandbox
5320            let validated_dest = self.validate_path(destination)?;
5321            let file = std::fs::File::create(&validated_dest)?;
5322            let mut writer =
5323                parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;
5324
5325            // Write all in one batch for now (simplification)
5326            if !rows.is_empty() {
5327                let batch = self.rows_to_batch(&rows, &arrow_schema)?;
5328                writer.write(&batch)?;
5329            }
5330
5331            writer.close()?;
5332        }
5333
5334        let mut res = HashMap::new();
5335        res.insert("count".to_string(), Value::Int(rows.len() as i64));
5336        Ok(vec![res])
5337    }
5338
5339    /// Writes Parquet data to a cloud storage destination.
5340    async fn write_parquet_to_cloud(
5341        &self,
5342        dest_url: &str,
5343        rows: &[HashMap<String, uni_common::Value>],
5344        arrow_schema: &arrow_schema::Schema,
5345    ) -> Result<()> {
5346        use object_store::ObjectStoreExt;
5347
5348        let (store, path) = build_store_from_url(dest_url)?;
5349
5350        // Write to an in-memory buffer
5351        let mut buffer = Vec::new();
5352        {
5353            let mut writer = parquet::arrow::ArrowWriter::try_new(
5354                &mut buffer,
5355                Arc::new(arrow_schema.clone()),
5356                None,
5357            )?;
5358
5359            if !rows.is_empty() {
5360                let batch = self.rows_to_batch(rows, arrow_schema)?;
5361                writer.write(&batch)?;
5362            }
5363
5364            writer.close()?;
5365        }
5366
5367        // Upload to cloud storage
5368        store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5369
5370        Ok(())
5371    }
5372
5373    pub(crate) fn rows_to_batch(
5374        &self,
5375        rows: &[HashMap<String, uni_common::Value>],
5376        schema: &arrow_schema::Schema,
5377    ) -> Result<RecordBatch> {
5378        let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5379
5380        for field in schema.fields() {
5381            let name = field.name();
5382            let dt = field.data_type();
5383
5384            let values: Vec<uni_common::Value> = rows
5385                .iter()
5386                .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5387                .collect();
5388            let array = self.values_to_array(&values, dt)?;
5389            columns.push(array);
5390        }
5391
5392        Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5393    }
5394
5395    /// Convert a slice of Values to an Arrow array.
5396    /// Delegates to the shared implementation in arrow_convert module.
5397    pub(crate) fn values_to_array(
5398        &self,
5399        values: &[uni_common::Value],
5400        dt: &arrow_schema::DataType,
5401    ) -> Result<Arc<dyn Array>> {
5402        arrow_convert::values_to_array(values, dt)
5403    }
5404
5405    pub(crate) fn format_csv_value(&self, val: Value) -> String {
5406        match val {
5407            Value::Null => "".to_string(),
5408            Value::String(s) => s,
5409            Value::Int(i) => i.to_string(),
5410            Value::Float(f) => f.to_string(),
5411            Value::Bool(b) => b.to_string(),
5412            _ => format!("{val}"),
5413        }
5414    }
5415
5416    pub(crate) fn parse_csv_value(
5417        &self,
5418        s: &str,
5419        data_type: &uni_common::core::schema::DataType,
5420        prop_name: &str,
5421    ) -> Result<Value> {
5422        if s.is_empty() || s.to_lowercase() == "null" {
5423            return Ok(Value::Null);
5424        }
5425
5426        use uni_common::core::schema::DataType;
5427        match data_type {
5428            DataType::String => Ok(Value::String(s.to_string())),
5429            DataType::Int32 | DataType::Int64 => {
5430                let i = s.parse::<i64>().map_err(|_| {
5431                    anyhow!(
5432                        "Failed to parse integer for property '{}': {}",
5433                        prop_name,
5434                        s
5435                    )
5436                })?;
5437                Ok(Value::Int(i))
5438            }
5439            DataType::Float32 | DataType::Float64 => {
5440                let f = s.parse::<f64>().map_err(|_| {
5441                    anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5442                })?;
5443                Ok(Value::Float(f))
5444            }
5445            DataType::Bool => {
5446                let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5447                    anyhow!(
5448                        "Failed to parse boolean for property '{}': {}",
5449                        prop_name,
5450                        s
5451                    )
5452                })?;
5453                Ok(Value::Bool(b))
5454            }
5455            DataType::CypherValue => {
5456                let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5457                    anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5458                })?;
5459                Ok(Value::from(json_val))
5460            }
5461            DataType::Vector { .. } => {
5462                let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5463                    anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5464                })?;
5465                Ok(Value::Vector(v))
5466            }
5467            _ => Ok(Value::String(s.to_string())),
5468        }
5469    }
5470
5471    pub(crate) async fn detach_delete_vertex(
5472        &self,
5473        vid: Vid,
5474        writer: &Writer,
5475        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5476    ) -> Result<()> {
5477        let schema = self.storage.schema_manager().schema();
5478        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5479
5480        // 1. Find and delete all outgoing edges
5481        let out_graph = self
5482            .storage
5483            .load_subgraph_cached(
5484                &[vid],
5485                &edge_type_ids,
5486                1,
5487                uni_store::runtime::Direction::Outgoing,
5488                Some(writer.l0_manager.get_current()),
5489            )
5490            .await?;
5491
5492        for edge in out_graph.edges() {
5493            writer
5494                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5495                .await?;
5496        }
5497
5498        // 2. Find and delete all incoming edges
5499        let in_graph = self
5500            .storage
5501            .load_subgraph_cached(
5502                &[vid],
5503                &edge_type_ids,
5504                1,
5505                uni_store::runtime::Direction::Incoming,
5506                Some(writer.l0_manager.get_current()),
5507            )
5508            .await?;
5509
5510        for edge in in_graph.edges() {
5511            writer
5512                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5513                .await?;
5514        }
5515
5516        Ok(())
5517    }
5518
5519    /// Load outgoing + incoming subgraphs (1-hop) for a batch of vertices
5520    /// in two scans — one per direction. Shared infrastructure for
5521    /// `batch_detach_delete_vertices` (which deletes the loaded edges) and
5522    /// `batch_check_vertices_have_no_edges` (which errors if any
5523    /// non-tombstoned edges exist).
5524    ///
5525    /// Returns raw `WorkingGraph`s with **no tombstone filtering**;
5526    /// callers that care about same-batch edge deletes (e.g., the
5527    /// non-detach check that runs after edges have been individually
5528    /// DELETEd) layer the filter on top.
5529    async fn batch_load_incident_edges(
5530        &self,
5531        vids: &[Vid],
5532        writer: &Writer,
5533    ) -> Result<(
5534        uni_store::runtime::working_graph::WorkingGraph,
5535        uni_store::runtime::working_graph::WorkingGraph,
5536    )> {
5537        let schema = self.storage.schema_manager().schema();
5538        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5539        let l0 = Some(writer.l0_manager.get_current());
5540
5541        let out_graph = self
5542            .storage
5543            .load_subgraph_cached(
5544                vids,
5545                &edge_type_ids,
5546                1,
5547                uni_store::runtime::Direction::Outgoing,
5548                l0.clone(),
5549            )
5550            .await?;
5551        let in_graph = self
5552            .storage
5553            .load_subgraph_cached(
5554                vids,
5555                &edge_type_ids,
5556                1,
5557                uni_store::runtime::Direction::Incoming,
5558                l0,
5559            )
5560            .await?;
5561        Ok((out_graph, in_graph))
5562    }
5563
5564    /// Batch detach-delete: load subgraphs for all VIDs at once, then delete edges and vertices.
5565    pub(crate) async fn batch_detach_delete_vertices(
5566        &self,
5567        vids: &[Vid],
5568        labels_per_vid: Vec<Option<Vec<String>>>,
5569        writer: &Writer,
5570        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5571    ) -> Result<()> {
5572        let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5573
5574        for edge in out_graph.edges() {
5575            writer
5576                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5577                .await?;
5578        }
5579        for edge in in_graph.edges() {
5580            writer
5581                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5582                .await?;
5583        }
5584
5585        for (vid, labels) in vids.iter().zip(labels_per_vid) {
5586            writer.delete_vertex(*vid, labels, tx_l0).await?;
5587        }
5588
5589        Ok(())
5590    }
5591
5592    /// Batch non-detach DELETE check: verify NO incident edges exist for
5593    /// any VID in `vids`, modulo tombstones from the writer's L0 and the
5594    /// tx-local L0 (same set the per-VID `check_vertex_has_no_edges`
5595    /// consults at write.rs:2650-2673).
5596    ///
5597    /// Why this exists: the per-VID `check_vertex_has_no_edges` does two
5598    /// `load_subgraph_cached(&[vid], ...)` calls per vertex (outgoing +
5599    /// incoming). At N vertices that's 2N subgraph loads. The batched
5600    /// path collapses to 2 loads regardless of N.
5601    ///
5602    /// Tombstone visibility: detach (which deletes everything) doesn't
5603    /// need the tombstone filter; non-detach does, because the caller
5604    /// may have already DELETEd some incident edges in the same
5605    /// statement (e.g. `MATCH (a)-[r]->(b) DELETE r, a` — edges are
5606    /// deleted before nodes per mutation_common.rs:720-724, so by the
5607    /// time this check runs, `r`'s eid is in `tx_l0.tombstones` and
5608    /// must be excluded from the "still has edges" set).
5609    pub(crate) async fn batch_check_vertices_have_no_edges(
5610        &self,
5611        vids: &[Vid],
5612        writer: &Writer,
5613        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5614    ) -> Result<()> {
5615        if vids.is_empty() {
5616            return Ok(());
5617        }
5618
5619        let mut tombstoned_eids: std::collections::HashSet<uni_common::core::id::Eid> =
5620            std::collections::HashSet::new();
5621        {
5622            let writer_l0 = writer.l0_manager.get_current();
5623            let guard = writer_l0.read();
5624            for &eid in guard.tombstones.keys() {
5625                tombstoned_eids.insert(eid);
5626            }
5627        }
5628        if let Some(tx) = tx_l0 {
5629            let guard = tx.read();
5630            for &eid in guard.tombstones.keys() {
5631                tombstoned_eids.insert(eid);
5632            }
5633        }
5634
5635        let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5636
5637        for edge in out_graph.edges().chain(in_graph.edges()) {
5638            if !tombstoned_eids.contains(&edge.eid) {
5639                return Err(anyhow!(
5640                    "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
5641                    edge.src_vid
5642                ));
5643            }
5644        }
5645        Ok(())
5646    }
5647}