Skip to main content

uni_query/query/executor/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7    eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14/// Convert a `Value` to `chrono::DateTime<Utc>`, handling both `Value::Temporal` and `Value::String`.
15fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16    match val {
17        Value::Temporal(tv) => {
18            use uni_common::TemporalValue;
19            match tv {
20                TemporalValue::DateTime {
21                    nanos_since_epoch, ..
22                }
23                | TemporalValue::LocalDateTime {
24                    nanos_since_epoch, ..
25                } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26                TemporalValue::Date { days_since_epoch } => {
27                    chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28                }
29                _ => None,
30            }
31        }
32        Value::String(s) => parse_datetime_utc(s).ok(),
33        _ => None,
34    }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46    BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47    ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54
55// DataFusion engine imports
56use crate::query::df_graph::L0Context;
57use crate::query::df_planner::HybridPhysicalPlanner;
58use datafusion::physical_plan::ExecutionPlanProperties;
59use datafusion::prelude::SessionContext;
60use parking_lot::RwLock as SyncRwLock;
61
62use arrow_array::{Array, RecordBatch};
63use csv;
64use parquet;
65
66use super::core::*;
67
68/// Number of system fields on an edge map: `_eid`, `_src`, `_dst`, `_type`, `_type_name`.
69const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
70/// Number of system fields on a vertex map: `_vid`, `_label`, `_uid`.
71const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
72
73/// Collect VIDs from all L0 buffers visible to a query context.
74///
75/// Applies `extractor` to each L0 buffer (main, transaction, pending flush) and
76/// collects the results. Returns an empty vec when no query context is present.
77fn collect_l0_vids(
78    ctx: Option<&QueryContext>,
79    extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
80) -> Vec<Vid> {
81    let mut vids = Vec::new();
82    if let Some(ctx) = ctx {
83        vids.extend(extractor(&ctx.l0.read()));
84        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
85            vids.extend(extractor(&tx_l0_arc.read()));
86        }
87        for pending_l0_arc in &ctx.pending_flush_l0s {
88            vids.extend(extractor(&pending_l0_arc.read()));
89        }
90    }
91    vids
92}
93
94/// Hydrate an entity map (vertex or edge) with properties if not already loaded.
95///
96/// This is the fallback for pushdown hydration - if the entity only has system fields
97/// (indicating pushdown didn't load properties), we load all properties here.
98///
99/// System field counts:
100/// - Edge: 5 fields (_eid, _src, _dst, _type, _type_name)
101/// - Vertex: 3 fields (_vid, _label, _uid)
102async fn hydrate_entity_if_needed(
103    map: &mut HashMap<String, Value>,
104    prop_manager: &PropertyManager,
105    ctx: Option<&QueryContext>,
106) {
107    // Check for edge entity
108    if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
109        if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
110            tracing::debug!(
111                "Pushdown fallback: hydrating edge {} at execution time",
112                eid_u64
113            );
114            if let Ok(Some(props)) = prop_manager
115                .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
116                .await
117            {
118                for (key, value) in props {
119                    map.entry(key).or_insert(value);
120                }
121            }
122        } else {
123            tracing::trace!(
124                "Pushdown success: edge {} already has {} properties",
125                eid_u64,
126                map.len() - EDGE_SYSTEM_FIELD_COUNT
127            );
128        }
129        return;
130    }
131
132    // Check for vertex entity
133    if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
134        if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
135            tracing::debug!(
136                "Pushdown fallback: hydrating vertex {} at execution time",
137                vid_u64
138            );
139            if let Ok(Some(props)) = prop_manager
140                .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
141                .await
142            {
143                for (key, value) in props {
144                    map.entry(key).or_insert(value);
145                }
146            }
147        } else {
148            tracing::trace!(
149                "Pushdown success: vertex {} already has {} properties",
150                vid_u64,
151                map.len() - VERTEX_SYSTEM_FIELD_COUNT
152            );
153        }
154    }
155}
156
157impl Executor {
158    /// Helper to verify and filter candidates against an optional predicate.
159    ///
160    /// Deduplicates candidates, loads properties, and evaluates the filter expression.
161    /// Returns only VIDs that pass the filter (or are not deleted).
162    async fn verify_and_filter_candidates(
163        &self,
164        mut candidates: Vec<Vid>,
165        variable: &str,
166        filter: Option<&Expr>,
167        ctx: Option<&QueryContext>,
168        prop_manager: &PropertyManager,
169        params: &HashMap<String, Value>,
170    ) -> Result<Vec<Vid>> {
171        candidates.sort_unstable();
172        candidates.dedup();
173
174        let mut verified_vids = Vec::new();
175        for vid in candidates {
176            let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
177                continue; // Deleted
178            };
179
180            if let Some(expr) = filter {
181                let mut props_map: HashMap<String, Value> = props;
182                props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
183
184                let mut row = HashMap::new();
185                row.insert(variable.to_string(), Value::Map(props_map));
186
187                let res = self
188                    .evaluate_expr(expr, &row, prop_manager, params, ctx)
189                    .await?;
190                if res.as_bool().unwrap_or(false) {
191                    verified_vids.push(vid);
192                }
193            } else {
194                verified_vids.push(vid);
195            }
196        }
197
198        Ok(verified_vids)
199    }
200
201    pub(crate) async fn scan_storage_candidates(
202        &self,
203        label_id: u16,
204        variable: &str,
205        filter: Option<&Expr>,
206    ) -> Result<Vec<Vid>> {
207        let schema = self.storage.schema_manager().schema();
208        let label_name = schema
209            .label_name_by_id(label_id)
210            .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
211
212        // Generate filter SQL from expression
213        let empty_props = std::collections::HashMap::new();
214        let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
215        let filter_sql = filter.and_then(|expr| {
216            LanceFilterGenerator::generate(std::slice::from_ref(expr), variable, Some(label_props))
217        });
218
219        self.storage
220            .scan_vertex_candidates(label_name, filter_sql.as_deref())
221            .await
222    }
223
224    pub(crate) async fn scan_label_with_filter(
225        &self,
226        label_id: u16,
227        variable: &str,
228        filter: Option<&Expr>,
229        ctx: Option<&QueryContext>,
230        prop_manager: &PropertyManager,
231        params: &HashMap<String, Value>,
232    ) -> Result<Vec<Vid>> {
233        let mut candidates = self
234            .scan_storage_candidates(label_id, variable, filter)
235            .await?;
236
237        // Convert label_id to label_name for L0 lookup
238        let schema = self.storage.schema_manager().schema();
239        if let Some(label_name) = schema.label_name_by_id(label_id) {
240            candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
241        }
242
243        self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
244            .await
245    }
246
247    pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
248        // Handle Value::Node directly (has vid field)
249        if let Value::Node(node) = val {
250            return Ok(node.vid);
251        }
252        // Handle Object (node) containing _vid field
253        if let Value::Map(map) = val
254            && let Some(vid_val) = map.get("_vid")
255            && let Some(v) = vid_val.as_u64()
256        {
257            return Ok(Vid::from(v));
258        }
259        // Handle string format
260        if let Some(s) = val.as_str()
261            && let Ok(id) = s.parse::<u64>()
262        {
263            return Ok(Vid::new(id));
264        }
265        // Handle raw u64
266        if let Some(v) = val.as_u64() {
267            return Ok(Vid::from(v));
268        }
269        Err(anyhow!("Invalid Vid format: {:?}", val))
270    }
271
272    /// Find a node value in the row by VID.
273    ///
274    /// Scans all values in the row, looking for a node (Map or Node) whose VID
275    /// matches the target. Returns the full node value if found, or a minimal
276    /// Map with just `_vid` as fallback.
277    fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
278        for val in row.values() {
279            if let Ok(vid) = Self::vid_from_value(val)
280                && vid == target_vid
281            {
282                return val.clone();
283            }
284        }
285        // Fallback: return minimal node map
286        Value::Map(HashMap::from([(
287            "_vid".to_string(),
288            Value::Int(target_vid.as_u64() as i64),
289        )]))
290    }
291
292    /// Create L0 context, session, and planner for DataFusion execution.
293    ///
294    /// This is the shared setup for `execute_datafusion` and `execute_merge_read_plan`.
295    /// Returns `(session_ctx, planner, prop_manager_arc)`.
296    pub async fn create_datafusion_planner(
297        &self,
298        prop_manager: &PropertyManager,
299        params: &HashMap<String, Value>,
300    ) -> Result<(
301        Arc<SyncRwLock<SessionContext>>,
302        HybridPhysicalPlanner,
303        Arc<PropertyManager>,
304    )> {
305        let query_ctx = self.get_context().await;
306        let l0_context = match query_ctx {
307            Some(ref ctx) => L0Context::from_query_context(ctx),
308            None => L0Context::empty(),
309        };
310
311        let prop_manager_arc = Arc::new(PropertyManager::new(
312            self.storage.clone(),
313            self.storage.schema_manager_arc(),
314            prop_manager.cache_size(),
315        ));
316
317        let session = SessionContext::new();
318        crate::query::df_udfs::register_cypher_udfs(&session)?;
319        if let Some(ref registry) = self.custom_function_registry {
320            crate::query::df_udfs::register_custom_udfs(&session, registry)?;
321        }
322        let session_ctx = Arc::new(SyncRwLock::new(session));
323
324        let mut planner = HybridPhysicalPlanner::with_l0_context(
325            session_ctx.clone(),
326            self.storage.clone(),
327            l0_context,
328            prop_manager_arc.clone(),
329            self.storage.schema_manager().schema(),
330            params.clone(),
331            HashMap::new(),
332        );
333
334        planner = planner.with_algo_registry(self.algo_registry.clone());
335        if let Some(ref registry) = self.procedure_registry {
336            planner = planner.with_procedure_registry(registry.clone());
337        }
338        if let Some(ref xervo_runtime) = self.xervo_runtime {
339            planner = planner.with_xervo_runtime(xervo_runtime.clone());
340        }
341
342        Ok((session_ctx, planner, prop_manager_arc))
343    }
344
345    /// Execute a DataFusion physical plan and collect all result batches.
346    pub fn collect_batches(
347        session_ctx: &Arc<SyncRwLock<SessionContext>>,
348        execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
349    ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
350        Box::pin(async move {
351            use futures::TryStreamExt;
352
353            let task_ctx = session_ctx.read().task_ctx();
354            let partition_count = execution_plan.output_partitioning().partition_count();
355            let mut all_batches = Vec::new();
356            for partition in 0..partition_count {
357                let stream = execution_plan.execute(partition, task_ctx.clone())?;
358                let batches: Vec<RecordBatch> = stream.try_collect().await?;
359                all_batches.extend(batches);
360            }
361            Ok(all_batches)
362        })
363    }
364
365    /// Executes a query using the DataFusion-based engine.
366    ///
367    /// Uses `HybridPhysicalPlanner` which produces DataFusion `ExecutionPlan`
368    /// trees with custom graph operators for graph-specific operations.
369    pub async fn execute_datafusion(
370        &self,
371        plan: LogicalPlan,
372        prop_manager: &PropertyManager,
373        params: &HashMap<String, Value>,
374    ) -> Result<Vec<RecordBatch>> {
375        let (batches, _plan) = self
376            .execute_datafusion_with_plan(plan, prop_manager, params)
377            .await?;
378        Ok(batches)
379    }
380
    /// Executes a query using the DataFusion-based engine, returning both
    /// result batches and the physical execution plan.
    ///
    /// The returned `Arc<dyn ExecutionPlan>` can be walked to extract per-operator
    /// metrics (e.g., `output_rows`, `elapsed_compute`) that DataFusion's
    /// `BaselineMetrics` recorded during execution.
    pub async fn execute_datafusion_with_plan(
        &self,
        plan: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<(
        Vec<RecordBatch>,
        Arc<dyn datafusion::physical_plan::ExecutionPlan>,
    )> {
        let (session_ctx, mut planner, prop_manager_arc) =
            self.create_datafusion_planner(prop_manager, params).await?;

        // Build MutationContext when the plan contains write operations
        if Self::contains_write_operations(&plan) {
            // A write without a configured Writer is a hard error.
            let writer = self
                .writer
                .as_ref()
                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
                .clone();
            let query_ctx = self.get_context().await;

            // Writes are expected to run inside a query context; a missing one
            // indicates a caller bug, so this only fires in debug builds.
            debug_assert!(
                query_ctx.is_some(),
                "BUG: query_ctx is None for write operation"
            );

            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
                executor: self.clone(),
                writer,
                prop_manager: prop_manager_arc,
                params: params.clone(),
                query_ctx,
                tx_l0_override: self.transaction_l0_override.clone(),
            });
            planner = planner.with_mutation_context(mutation_ctx);
            tracing::debug!(
                plan_type = Self::get_plan_type(&plan),
                "Mutation routed to DataFusion engine"
            );
        }

        let execution_plan = planner.plan(&plan)?;
        // Keep a handle to the physical plan so callers can read operator
        // metrics after execution completes.
        let plan_clone = Arc::clone(&execution_plan);
        let result = Self::collect_batches(&session_ctx, execution_plan).await;

        // Harvest warnings from the graph execution context after query completion.
        // This runs before `?`-style propagation so warnings are captured even
        // when execution returned an error.
        let graph_warnings = planner.graph_ctx().take_warnings();
        if !graph_warnings.is_empty()
            && let Ok(mut w) = self.warnings.lock()
        {
            w.extend(graph_warnings);
        }

        result.map(|batches| (batches, plan_clone))
    }
442
443    /// Execute a MERGE read sub-plan through the DataFusion engine.
444    ///
445    /// Plans and executes the MERGE pattern match using flat columnar output
446    /// (no structural projections), then groups dotted columns (`a._vid`,
447    /// `a._labels`, etc.) into per-variable Maps for downstream MERGE logic.
448    pub(crate) async fn execute_merge_read_plan(
449        &self,
450        plan: LogicalPlan,
451        prop_manager: &PropertyManager,
452        params: &HashMap<String, Value>,
453        merge_variables: Vec<String>,
454    ) -> Result<Vec<HashMap<String, Value>>> {
455        let (session_ctx, planner, _prop_manager_arc) =
456            self.create_datafusion_planner(prop_manager, params).await?;
457
458        // Plan with full property access ("*") for all merge variables so that
459        // ON MATCH SET / ON CREATE SET have complete entity Maps to work with.
460        let extra: HashMap<String, HashSet<String>> = merge_variables
461            .iter()
462            .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
463            .collect();
464        let execution_plan = planner.plan_with_properties(&plan, extra)?;
465        let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
466
467        // Convert to flat rows (dotted column names like "a._vid", "b._labels")
468        let flat_rows = self.record_batches_to_rows(all_batches)?;
469
470        // Group dotted columns into per-variable Maps for MERGE's match logic.
471        // E.g., {"a._vid": 0, "a._labels": ["A"]} → {"a": Map({"_vid": 0, "_labels": ["A"]})}
472        let rows = flat_rows
473            .into_iter()
474            .map(|mut row| {
475                for var in &merge_variables {
476                    // Skip if already materialized (e.g., by record_batches_to_rows)
477                    if row.contains_key(var) {
478                        continue;
479                    }
480                    let prefix = format!("{}.", var);
481                    let dotted_keys: Vec<String> = row
482                        .keys()
483                        .filter(|k| k.starts_with(&prefix))
484                        .cloned()
485                        .collect();
486                    if !dotted_keys.is_empty() {
487                        let mut map = HashMap::new();
488                        for key in dotted_keys {
489                            let prop_name = key[prefix.len()..].to_string();
490                            if let Some(val) = row.remove(&key) {
491                                map.insert(prop_name, val);
492                            }
493                        }
494                        row.insert(var.clone(), Value::Map(map));
495                    }
496                }
497                row
498            })
499            .collect();
500
501        Ok(rows)
502    }
503
    /// Converts DataFusion RecordBatches to row-based HashMap format.
    ///
    /// Handles special metadata on fields:
    /// - `cv_encoded=true`: Parse the string value as JSON to restore original type
    ///
    /// Also normalizes path structures to user-facing format (converts _vid to _id),
    /// merges dotted system-field helper columns (e.g. `n._vid`) into their bare
    /// variable Maps, and promotes VID-string placeholders to Maps.
    pub(crate) fn record_batches_to_rows(
        &self,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut rows = Vec::new();

        for batch in batches {
            let num_rows = batch.num_rows();
            let schema = batch.schema();

            for row_idx in 0..num_rows {
                let mut row = HashMap::new();

                for (col_idx, field) in schema.fields().iter().enumerate() {
                    let column = batch.column(col_idx);
                    // Infer Uni DataType from Arrow type for DateTime/Time struct decoding
                    let data_type =
                        if uni_common::core::schema::is_datetime_struct(field.data_type()) {
                            Some(&uni_common::DataType::DateTime)
                        } else if uni_common::core::schema::is_time_struct(field.data_type()) {
                            Some(&uni_common::DataType::Time)
                        } else {
                            None
                        };
                    let mut value =
                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);

                    // Check if this field contains JSON-encoded values (e.g., from UNWIND)
                    // Parse JSON string to restore the original type
                    if field
                        .metadata()
                        .get("cv_encoded")
                        .is_some_and(|v| v == "true")
                        && let Value::String(s) = &value
                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
                    {
                        value = Value::from(parsed);
                    }

                    // Normalize path structures to user-facing format
                    value = Self::normalize_path_if_needed(value);

                    row.insert(field.name().clone(), value);
                }

                // Merge system fields into bare variable maps.
                // The projection step emits helper columns like "n._vid" and "n._labels"
                // alongside the materialized "n" column (a Map of user properties).
                // Here we merge those system fields into the map and remove the helpers.
                //
                // For search procedures (vector/FTS), the bare variable may be a VID
                // string placeholder rather than a Map. In that case, promote it to a
                // Map so we can merge system fields and properties into it.
                //
                // NOTE: `bare_vars` is captured BEFORE placeholder promotion below,
                // so variables promoted from placeholders are not re-processed by
                // the bare-variable merge loop (their dotted columns are already
                // consumed during promotion).
                let bare_vars: Vec<String> = row
                    .keys()
                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
                    .cloned()
                    .collect();

                // Detect VID-placeholder variables that have system columns
                // (e.g., "node" is String("1") but "node._vid" exists).
                // Promote these to Maps so system fields can be merged in.
                let vid_placeholder_vars: Vec<String> = row
                    .keys()
                    .filter(|k| {
                        !k.contains('.')
                            && matches!(row.get(*k), Some(Value::String(_)))
                            && row.contains_key(&format!("{}._vid", k))
                    })
                    .cloned()
                    .collect();

                for var in &vid_placeholder_vars {
                    // Build a Map from system and property columns
                    let prefix = format!("{}.", var);
                    let mut map = HashMap::new();

                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();

                    for key in &dotted_keys {
                        // Strip the "var." prefix to get the bare property name.
                        let prop_name = &key[prefix.len()..];
                        if let Some(val) = row.remove(key) {
                            map.insert(prop_name.to_string(), val);
                        }
                    }

                    // Replace the VID-string placeholder with the constructed Map
                    row.insert(var.clone(), Value::Map(map));
                }

                for var in &bare_vars {
                    // Merge node system fields (_vid, _labels)
                    let vid_key = format!("{}._vid", var);
                    let labels_key = format!("{}._labels", var);

                    let vid_val = row.remove(&vid_key);
                    let labels_val = row.remove(&labels_key);

                    if let Some(Value::Map(map)) = row.get_mut(var) {
                        if let Some(v) = vid_val {
                            map.insert("_vid".to_string(), v);
                        }
                        if let Some(v) = labels_val {
                            map.insert("_labels".to_string(), v);
                        }
                    }

                    // Merge edge system fields (_eid, _type, _src_vid, _dst_vid).
                    // These are emitted as helper columns by the traverse exec.
                    // The structural projection already includes them in the struct,
                    // but we still need to remove the dotted helper columns.
                    let eid_key = format!("{}._eid", var);
                    let type_key = format!("{}._type", var);

                    let eid_val = row.remove(&eid_key);
                    let type_val = row.remove(&type_key);

                    if (eid_val.is_some() || type_val.is_some())
                        && let Some(Value::Map(map)) = row.get_mut(var)
                    {
                        // `or_insert` (unlike the node merge above) keeps any value
                        // already present from the structural projection.
                        if let Some(v) = eid_val {
                            map.entry("_eid".to_string()).or_insert(v);
                        }
                        if let Some(v) = type_val {
                            map.entry("_type".to_string()).or_insert(v);
                        }
                    }

                    // Remove remaining dotted helper columns (e.g. _all_props, _src_vid, _dst_vid)
                    let prefix = format!("{}.", var);
                    let helper_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    for key in helper_keys {
                        row.remove(&key);
                    }
                }

                rows.push(row);
            }
        }

        Ok(rows)
    }
660
661    /// Normalize a value if it's a path structure, converting internal format to user-facing format.
662    ///
663    /// This only normalizes path structures (objects with "nodes" and "relationships" arrays).
664    /// Other values are returned unchanged to avoid interfering with query execution.
665    fn normalize_path_if_needed(value: Value) -> Value {
666        match value {
667            Value::Map(map)
668                if map.contains_key("nodes")
669                    && (map.contains_key("relationships") || map.contains_key("edges")) =>
670            {
671                Self::normalize_path_map(map)
672            }
673            other => other,
674        }
675    }
676
677    /// Normalize a path map object.
678    fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
679        // Normalize nodes array
680        if let Some(Value::List(nodes)) = map.remove("nodes") {
681            let normalized_nodes: Vec<Value> = nodes
682                .into_iter()
683                .map(|n| {
684                    if let Value::Map(node_map) = n {
685                        Self::normalize_path_node_map(node_map)
686                    } else {
687                        n
688                    }
689                })
690                .collect();
691            map.insert("nodes".to_string(), Value::List(normalized_nodes));
692        }
693
694        // Normalize relationships array (may be called "relationships" or "edges")
695        let rels_key = if map.contains_key("relationships") {
696            "relationships"
697        } else {
698            "edges"
699        };
700        if let Some(Value::List(rels)) = map.remove(rels_key) {
701            let normalized_rels: Vec<Value> = rels
702                .into_iter()
703                .map(|r| {
704                    if let Value::Map(rel_map) = r {
705                        Self::normalize_path_edge_map(rel_map)
706                    } else {
707                        r
708                    }
709                })
710                .collect();
711            map.insert("relationships".to_string(), Value::List(normalized_rels));
712        }
713
714        Value::Map(map)
715    }
716
717    /// Convert a Value to its string representation for path normalization.
718    fn value_to_id_string(val: Value) -> String {
719        match val {
720            Value::Int(n) => n.to_string(),
721            Value::Float(n) => n.to_string(),
722            Value::String(s) => s,
723            other => other.to_string(),
724        }
725    }
726
727    /// Move a map entry from `src_key` to `dst_key`, converting the value to a string.
728    /// When `src_key == dst_key`, this simply stringifies the value in place.
729    fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
730        if let Some(val) = map.remove(src_key) {
731            map.insert(
732                dst_key.to_string(),
733                Value::String(Self::value_to_id_string(val)),
734            );
735        }
736    }
737
738    /// Ensure the "properties" field is a non-null map.
739    fn ensure_properties_map(map: &mut HashMap<String, Value>) {
740        match map.get("properties") {
741            Some(props) if !props.is_null() => {}
742            _ => {
743                map.insert("properties".to_string(), Value::Map(HashMap::new()));
744            }
745        }
746    }
747
748    /// Normalize a node within a path to user-facing format.
749    fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
750        Self::stringify_map_field(&mut map, "_vid", "_id");
751        Self::ensure_properties_map(&mut map);
752        Value::Map(map)
753    }
754
755    /// Normalize an edge within a path to user-facing format.
756    fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
757        Self::stringify_map_field(&mut map, "_eid", "_id");
758        Self::stringify_map_field(&mut map, "_src", "_src");
759        Self::stringify_map_field(&mut map, "_dst", "_dst");
760
761        if let Some(type_name) = map.remove("_type_name") {
762            map.insert("_type".to_string(), type_name);
763        }
764
765        Self::ensure_properties_map(&mut map);
766        Value::Map(map)
767    }
768
769    #[instrument(
770        skip(self, prop_manager, params),
771        fields(rows_returned, duration_ms),
772        level = "info"
773    )]
774    pub fn execute<'a>(
775        &'a self,
776        plan: LogicalPlan,
777        prop_manager: &'a PropertyManager,
778        params: &'a HashMap<String, Value>,
779    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
780        Box::pin(async move {
781            let query_type = Self::get_plan_type(&plan);
782            let ctx = self.get_context().await;
783            let start = Instant::now();
784
785            // Route DDL/Admin queries to the fallback executor.
786            // All other queries (including similar_to) flow through DataFusion.
787            let res = if Self::is_ddl_or_admin(&plan) {
788                self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
789                    .await
790            } else {
791                let batches = self
792                    .execute_datafusion(plan.clone(), prop_manager, params)
793                    .await?;
794                self.record_batches_to_rows(batches)
795            };
796
797            let duration = start.elapsed();
798            metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
799                .record(duration.as_secs_f64());
800
801            tracing::Span::current().record("duration_ms", duration.as_millis());
802            match &res {
803                Ok(rows) => {
804                    tracing::Span::current().record("rows_returned", rows.len());
805                    metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
806                        .increment(rows.len() as u64);
807                }
808                Err(e) => {
809                    let error_type = if e.to_string().contains("timed out") {
810                        "timeout"
811                    } else if e.to_string().contains("syntax") {
812                        "syntax"
813                    } else {
814                        "execution"
815                    };
816                    metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
817                }
818            }
819
820            res
821        })
822    }
823
824    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
825        match plan {
826            LogicalPlan::Scan { .. } => "read_scan",
827            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
828            LogicalPlan::Traverse { .. } => "read_traverse",
829            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
830            LogicalPlan::ScanAll { .. } => "read_scan_all",
831            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
832            LogicalPlan::VectorKnn { .. } => "read_vector",
833            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
834            LogicalPlan::Merge { .. } => "write_merge",
835            LogicalPlan::Delete { .. } => "write_delete",
836            LogicalPlan::Set { .. } => "write_set",
837            LogicalPlan::Remove { .. } => "write_remove",
838            LogicalPlan::ProcedureCall { .. } => "call",
839            LogicalPlan::Copy { .. } => "copy",
840            LogicalPlan::Backup { .. } => "backup",
841            _ => "other",
842        }
843    }
844
    /// Return all direct child plan references from a `LogicalPlan`.
    ///
    /// This centralizes the variant→children mapping so that recursive walkers
    /// (e.g., `contains_write_operations`) can delegate the
    /// "recurse into children" logic instead of duplicating the match arms.
    ///
    /// Note: `Foreach` returns only its `input`; the `body: Vec<LogicalPlan>`
    /// is not included because it requires special iteration. Callers that
    /// need to inspect the body should handle `Foreach` before falling through.
    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
        match plan {
            // Single-input wrappers
            LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Distinct { input }
            | LogicalPlan::Aggregate { input, .. }
            | LogicalPlan::Window { input, .. }
            | LogicalPlan::Unwind { input, .. }
            | LogicalPlan::Filter { input, .. }
            | LogicalPlan::Create { input, .. }
            | LogicalPlan::CreateBatch { input, .. }
            | LogicalPlan::Set { input, .. }
            | LogicalPlan::Remove { input, .. }
            | LogicalPlan::Delete { input, .. }
            | LogicalPlan::Merge { input, .. }
            | LogicalPlan::Foreach { input, .. }
            | LogicalPlan::Traverse { input, .. }
            | LogicalPlan::TraverseMainByType { input, .. }
            | LogicalPlan::BindZeroLengthPath { input, .. }
            | LogicalPlan::BindPath { input, .. }
            | LogicalPlan::ShortestPath { input, .. }
            | LogicalPlan::AllShortestPaths { input, .. }
            // `Explain` stores its child under the field name `plan`;
            // the binding is renamed to `input` to join this or-pattern.
            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],

            // Two-input wrappers
            LogicalPlan::Apply {
                input, subquery, ..
            }
            | LogicalPlan::SubqueryCall { input, subquery } => {
                vec![input.as_ref(), subquery.as_ref()]
            }
            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
                vec![left.as_ref(), right.as_ref()]
            }
            LogicalPlan::RecursiveCTE {
                initial, recursive, ..
            } => vec![initial.as_ref(), recursive.as_ref()],
            LogicalPlan::QuantifiedPattern {
                input,
                pattern_plan,
                ..
            } => vec![input.as_ref(), pattern_plan.as_ref()],

            // Leaf nodes (scans, DDL, admin, etc.)
            _ => vec![],
        }
    }
903
904    /// Check if a plan is a DDL or admin operation that should skip DataFusion.
905    ///
906    /// These operations don't produce data streams and aren't supported by the
907    /// DataFusion planner. Recurses through wrapper nodes (`Project`, `Sort`,
908    /// `Limit`, etc.) to detect DDL/admin operations nested inside read
909    /// wrappers (e.g. `CALL procedure(...) YIELD x RETURN x`).
910    pub(crate) fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
911        match plan {
912            // DDL / schema operations
913            LogicalPlan::CreateLabel(_)
914            | LogicalPlan::CreateEdgeType(_)
915            | LogicalPlan::AlterLabel(_)
916            | LogicalPlan::AlterEdgeType(_)
917            | LogicalPlan::DropLabel(_)
918            | LogicalPlan::DropEdgeType(_)
919            | LogicalPlan::CreateConstraint(_)
920            | LogicalPlan::DropConstraint(_)
921            | LogicalPlan::ShowConstraints(_) => true,
922
923            // Index operations
924            LogicalPlan::CreateVectorIndex { .. }
925            | LogicalPlan::CreateFullTextIndex { .. }
926            | LogicalPlan::CreateScalarIndex { .. }
927            | LogicalPlan::CreateJsonFtsIndex { .. }
928            | LogicalPlan::DropIndex { .. }
929            | LogicalPlan::ShowIndexes { .. } => true,
930
931            // Admin / utility operations
932            LogicalPlan::ShowDatabase
933            | LogicalPlan::ShowConfig
934            | LogicalPlan::ShowStatistics
935            | LogicalPlan::Vacuum
936            | LogicalPlan::Checkpoint
937            | LogicalPlan::Copy { .. }
938            | LogicalPlan::CopyTo { .. }
939            | LogicalPlan::CopyFrom { .. }
940            | LogicalPlan::Backup { .. }
941            | LogicalPlan::Explain { .. } => true,
942
943            // Procedure calls: DF-eligible procedures go through DataFusion,
944            // everything else (DDL, admin, unknown) stays on fallback.
945            LogicalPlan::ProcedureCall { procedure_name, .. } => {
946                !Self::is_df_eligible_procedure(procedure_name)
947            }
948
949            // Recurse through children using plan_children
950            _ => Self::plan_children(plan)
951                .iter()
952                .any(|child| Self::is_ddl_or_admin(child)),
953        }
954    }
955
956    /// Returns `true` if the procedure is a read-only, data-producing procedure
957    /// that can be executed through the DataFusion engine.
958    ///
959    /// This is a **positive allowlist** — unknown procedures default to the
960    /// fallback executor (safe for TCK test procedures, future DDL, and admin).
961    fn is_df_eligible_procedure(name: &str) -> bool {
962        matches!(
963            name,
964            "uni.schema.labels"
965                | "uni.schema.edgeTypes"
966                | "uni.schema.relationshipTypes"
967                | "uni.schema.indexes"
968                | "uni.schema.constraints"
969                | "uni.schema.labelInfo"
970                | "uni.vector.query"
971                | "uni.fts.query"
972                | "uni.search"
973        ) || name.starts_with("uni.algo.")
974    }
975
976    /// Check if a plan contains write/mutation operations anywhere in the tree.
977    ///
978    /// Write operations (`CREATE`, `MERGE`, `DELETE`, `SET`, `REMOVE`, `FOREACH`)
979    /// are used to determine when a MutationContext needs to be built for DataFusion.
980    /// This recurses through read-only wrapper nodes to detect writes nested inside
981    /// projections (e.g. `CREATE (n:Person) RETURN n` produces `Project { Create { ... } }`).
982    fn contains_write_operations(plan: &LogicalPlan) -> bool {
983        match plan {
984            LogicalPlan::Create { .. }
985            | LogicalPlan::CreateBatch { .. }
986            | LogicalPlan::Merge { .. }
987            | LogicalPlan::Delete { .. }
988            | LogicalPlan::Set { .. }
989            | LogicalPlan::Remove { .. }
990            | LogicalPlan::Foreach { .. } => true,
991            _ => Self::plan_children(plan)
992                .iter()
993                .any(|child| Self::contains_write_operations(child)),
994        }
995    }
996
997    /// Executes a query as a stream of result batches.
998    ///
999    /// Routes DDL/Admin through the fallback executor and everything else
1000    /// through DataFusion.
1001    pub fn execute_stream(
1002        self,
1003        plan: LogicalPlan,
1004        prop_manager: Arc<PropertyManager>,
1005        params: HashMap<String, Value>,
1006    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1007        let this = self;
1008        let this_for_ctx = this.clone();
1009
1010        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1011
1012        ctx_stream
1013            .flat_map(move |ctx| {
1014                let plan = plan.clone();
1015                let this = this.clone();
1016                let prop_manager = prop_manager.clone();
1017                let params = params.clone();
1018
1019                let fut = async move {
1020                    if Self::is_ddl_or_admin(&plan) {
1021                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
1022                            .await
1023                    } else {
1024                        let batches = this
1025                            .execute_datafusion(plan, &prop_manager, &params)
1026                            .await?;
1027                        this.record_batches_to_rows(batches)
1028                    }
1029                };
1030                stream::once(fut).boxed()
1031            })
1032            .boxed()
1033    }
1034
    /// Converts an Arrow array element at a given row index to a Value.
    /// Delegates to the shared implementation in arrow_convert module.
    /// The trailing `None` fills the helper's optional third argument —
    /// NOTE(review): presumably a type/schema hint; confirm against
    /// `arrow_convert::arrow_to_value`.
    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
        arrow_convert::arrow_to_value(col, row, None)
    }
1040
1041    pub(crate) fn evaluate_expr<'a>(
1042        &'a self,
1043        expr: &'a Expr,
1044        row: &'a HashMap<String, Value>,
1045        prop_manager: &'a PropertyManager,
1046        params: &'a HashMap<String, Value>,
1047        ctx: Option<&'a QueryContext>,
1048    ) -> BoxFuture<'a, Result<Value>> {
1049        let this = self;
1050        Box::pin(async move {
1051            // First check if the expression itself is already pre-computed in the row
1052            let repr = expr.to_string_repr();
1053            if let Some(val) = row.get(&repr) {
1054                return Ok(val.clone());
1055            }
1056
1057            match expr {
1058                Expr::PatternComprehension { .. } => {
1059                    // Handled by DataFusion path via PatternComprehensionExecExpr
1060                    Err(anyhow::anyhow!(
1061                        "Pattern comprehensions are handled by DataFusion executor"
1062                    ))
1063                }
1064                Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1065                    "COLLECT subqueries not yet supported in executor"
1066                )),
1067                Expr::Variable(name) => {
1068                    if let Some(val) = row.get(name) {
1069                        Ok(val.clone())
1070                    } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1071                        // Fallback: scan results may have system columns like "d._vid"
1072                        // without a materialized "d" Map. Return the VID so Property
1073                        // evaluation can fetch properties from storage.
1074                        Ok(vid_val.clone())
1075                    } else {
1076                        Ok(params.get(name).cloned().unwrap_or(Value::Null))
1077                    }
1078                }
1079                Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1080                Expr::Property(var_expr, prop_name) => {
1081                    // Fast path: if the base is a Variable, try flat-key lookup first.
1082                    // DataFusion scan results use flat keys like "d.embedding" rather than
1083                    // nested maps, so "d.embedding" won't be found via Variable("d") -> Property.
1084                    if let Expr::Variable(var_name) = var_expr.as_ref() {
1085                        let flat_key = format!("{}.{}", var_name, prop_name);
1086                        if let Some(val) = row.get(flat_key.as_str()) {
1087                            return Ok(val.clone());
1088                        }
1089                    }
1090
1091                    let base_val = this
1092                        .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1093                        .await?;
1094
1095                    // Handle system properties _vid and _id directly
1096                    if (prop_name == "_vid" || prop_name == "_id")
1097                        && let Ok(vid) = Self::vid_from_value(&base_val)
1098                    {
1099                        return Ok(Value::Int(vid.as_u64() as i64));
1100                    }
1101
1102                    // Handle Value::Node - access properties directly or via prop manager
1103                    if let Value::Node(node) = &base_val {
1104                        // Handle system properties
1105                        if prop_name == "_vid" || prop_name == "_id" {
1106                            return Ok(Value::Int(node.vid.as_u64() as i64));
1107                        }
1108                        if prop_name == "_labels" {
1109                            return Ok(Value::List(
1110                                node.labels
1111                                    .iter()
1112                                    .map(|l| Value::String(l.clone()))
1113                                    .collect(),
1114                            ));
1115                        }
1116                        // Check in-memory properties first
1117                        if let Some(val) = node.properties.get(prop_name.as_str()) {
1118                            return Ok(val.clone());
1119                        }
1120                        // Fallback to storage lookup
1121                        if let Ok(val) = prop_manager
1122                            .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1123                            .await
1124                        {
1125                            return Ok(val);
1126                        }
1127                        return Ok(Value::Null);
1128                    }
1129
1130                    // Handle Value::Edge - access properties directly or via prop manager
1131                    if let Value::Edge(edge) = &base_val {
1132                        // Handle system properties
1133                        if prop_name == "_eid" || prop_name == "_id" {
1134                            return Ok(Value::Int(edge.eid.as_u64() as i64));
1135                        }
1136                        if prop_name == "_type" {
1137                            return Ok(Value::String(edge.edge_type.clone()));
1138                        }
1139                        if prop_name == "_src" {
1140                            return Ok(Value::Int(edge.src.as_u64() as i64));
1141                        }
1142                        if prop_name == "_dst" {
1143                            return Ok(Value::Int(edge.dst.as_u64() as i64));
1144                        }
1145                        // Check in-memory properties first
1146                        if let Some(val) = edge.properties.get(prop_name.as_str()) {
1147                            return Ok(val.clone());
1148                        }
1149                        // Fallback to storage lookup
1150                        if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1151                        {
1152                            return Ok(val);
1153                        }
1154                        return Ok(Value::Null);
1155                    }
1156
1157                    // If base_val is an object (node/edge), check its properties first
1158                    // This handles properties from CREATE/MERGE that may not be persisted yet
1159                    if let Value::Map(map) = &base_val {
1160                        // First check top-level (for system properties like _id, _label, etc.)
1161                        if let Some(val) = map.get(prop_name.as_str()) {
1162                            return Ok(val.clone());
1163                        }
1164                        // Then check inside "properties" object (for user properties)
1165                        if let Some(Value::Map(props)) = map.get("properties")
1166                            && let Some(val) = props.get(prop_name.as_str())
1167                        {
1168                            return Ok(val.clone());
1169                        }
1170                        // Fallback to storage lookup using _vid or _id
1171                        let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1172                            map.get("_id")
1173                                .and_then(|v| v.as_str())
1174                                .and_then(|s| s.parse::<u64>().ok())
1175                        });
1176                        if let Some(id) = vid_opt {
1177                            let vid = Vid::from(id);
1178                            if let Ok(val) = prop_manager
1179                                .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1180                                .await
1181                            {
1182                                return Ok(val);
1183                            }
1184                        } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1185                            let eid = uni_common::core::id::Eid::from(id);
1186                            if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1187                                return Ok(val);
1188                            }
1189                        }
1190                        return Ok(Value::Null);
1191                    }
1192
1193                    // If base_val is just a VID, fetch from property manager
1194                    if let Ok(vid) = Self::vid_from_value(&base_val) {
1195                        return prop_manager
1196                            .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1197                            .await;
1198                    }
1199
1200                    if base_val.is_null() {
1201                        return Ok(Value::Null);
1202                    }
1203
1204                    // Check if base_val is a temporal value and prop_name is a temporal accessor
1205                    {
1206                        use crate::query::datetime::{
1207                            eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1208                            is_duration_string, is_temporal_accessor, is_temporal_string,
1209                        };
1210
1211                        // Handle Value::Temporal directly (no string parsing needed)
1212                        if let Value::Temporal(tv) = &base_val {
1213                            if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1214                                if is_duration_accessor(prop_name) {
1215                                    // Convert to string for the existing accessor logic
1216                                    return eval_duration_accessor(
1217                                        &base_val.to_string(),
1218                                        prop_name,
1219                                    );
1220                                }
1221                            } else if is_temporal_accessor(prop_name) {
1222                                return eval_temporal_accessor(&base_val.to_string(), prop_name);
1223                            }
1224                        }
1225
1226                        // Handle Value::String temporal (backward compat)
1227                        if let Value::String(s) = &base_val {
1228                            if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1229                                return eval_temporal_accessor(s, prop_name);
1230                            }
1231                            if is_duration_string(s) && is_duration_accessor(prop_name) {
1232                                return eval_duration_accessor(s, prop_name);
1233                            }
1234                        }
1235                    }
1236
1237                    Err(anyhow!(
1238                        "Cannot access property '{}' on {:?}",
1239                        prop_name,
1240                        base_val
1241                    ))
1242                }
1243                Expr::ArrayIndex {
1244                    array: arr_expr,
1245                    index: idx_expr,
1246                } => {
1247                    let arr_val = this
1248                        .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1249                        .await?;
1250                    let idx_val = this
1251                        .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1252                        .await?;
1253
1254                    if let Value::List(arr) = &arr_val {
1255                        // Handle signed indices (allow negative)
1256                        if let Some(i) = idx_val.as_i64() {
1257                            let idx = if i < 0 {
1258                                // Negative index: -1 = last element, -2 = second to last, etc.
1259                                let positive_idx = arr.len() as i64 + i;
1260                                if positive_idx < 0 {
1261                                    return Ok(Value::Null); // Out of bounds
1262                                }
1263                                positive_idx as usize
1264                            } else {
1265                                i as usize
1266                            };
1267                            if idx < arr.len() {
1268                                return Ok(arr[idx].clone());
1269                            }
1270                            return Ok(Value::Null);
1271                        } else if idx_val.is_null() {
1272                            return Ok(Value::Null);
1273                        } else {
1274                            return Err(anyhow::anyhow!(
1275                                "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1276                                idx_val
1277                            ));
1278                        }
1279                    }
1280                    if let Value::Map(map) = &arr_val {
1281                        if let Some(key) = idx_val.as_str() {
1282                            return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1283                        } else if !idx_val.is_null() {
1284                            return Err(anyhow::anyhow!(
1285                                "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1286                                idx_val
1287                            ));
1288                        }
1289                    }
1290                    // Handle bracket access on Node: n['name'] returns property
1291                    if let Value::Node(node) = &arr_val {
1292                        if let Some(key) = idx_val.as_str() {
1293                            // Check in-memory properties first
1294                            if let Some(val) = node.properties.get(key) {
1295                                return Ok(val.clone());
1296                            }
1297                            // Fallback to property manager
1298                            if let Ok(val) = prop_manager
1299                                .get_vertex_prop_with_ctx(node.vid, key, ctx)
1300                                .await
1301                            {
1302                                return Ok(val);
1303                            }
1304                            return Ok(Value::Null);
1305                        } else if !idx_val.is_null() {
1306                            return Err(anyhow::anyhow!(
1307                                "TypeError: Node index must be a string, got: {:?}",
1308                                idx_val
1309                            ));
1310                        }
1311                    }
1312                    // Handle bracket access on Edge: e['property'] returns property
1313                    if let Value::Edge(edge) = &arr_val {
1314                        if let Some(key) = idx_val.as_str() {
1315                            // Check in-memory properties first
1316                            if let Some(val) = edge.properties.get(key) {
1317                                return Ok(val.clone());
1318                            }
1319                            // Fallback to property manager
1320                            if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1321                                return Ok(val);
1322                            }
1323                            return Ok(Value::Null);
1324                        } else if !idx_val.is_null() {
1325                            return Err(anyhow::anyhow!(
1326                                "TypeError: Edge index must be a string, got: {:?}",
1327                                idx_val
1328                            ));
1329                        }
1330                    }
1331                    // Handle bracket access on VID (integer): n['name'] where n is a VID
1332                    if let Ok(vid) = Self::vid_from_value(&arr_val)
1333                        && let Some(key) = idx_val.as_str()
1334                    {
1335                        if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1336                        {
1337                            return Ok(val);
1338                        }
1339                        return Ok(Value::Null);
1340                    }
1341                    if arr_val.is_null() {
1342                        return Ok(Value::Null);
1343                    }
1344                    Err(anyhow!(
1345                        "TypeError: InvalidArgumentType - cannot index into {:?}",
1346                        arr_val
1347                    ))
1348                }
1349                Expr::ArraySlice { array, start, end } => {
1350                    let arr_val = this
1351                        .evaluate_expr(array, row, prop_manager, params, ctx)
1352                        .await?;
1353
1354                    if let Value::List(arr) = &arr_val {
1355                        let len = arr.len();
1356
1357                        // Evaluate start index (default to 0), null → null result
1358                        let start_idx = if let Some(s) = start {
1359                            let v = this
1360                                .evaluate_expr(s, row, prop_manager, params, ctx)
1361                                .await?;
1362                            if v.is_null() {
1363                                return Ok(Value::Null);
1364                            }
1365                            let raw = v.as_i64().unwrap_or(0);
1366                            if raw < 0 {
1367                                (len as i64 + raw).max(0) as usize
1368                            } else {
1369                                (raw as usize).min(len)
1370                            }
1371                        } else {
1372                            0
1373                        };
1374
1375                        // Evaluate end index (default to length), null → null result
1376                        let end_idx = if let Some(e) = end {
1377                            let v = this
1378                                .evaluate_expr(e, row, prop_manager, params, ctx)
1379                                .await?;
1380                            if v.is_null() {
1381                                return Ok(Value::Null);
1382                            }
1383                            let raw = v.as_i64().unwrap_or(len as i64);
1384                            if raw < 0 {
1385                                (len as i64 + raw).max(0) as usize
1386                            } else {
1387                                (raw as usize).min(len)
1388                            }
1389                        } else {
1390                            len
1391                        };
1392
1393                        // Return sliced array
1394                        if start_idx >= end_idx {
1395                            return Ok(Value::List(vec![]));
1396                        }
1397                        let end_idx = end_idx.min(len);
1398                        return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1399                    }
1400
1401                    if arr_val.is_null() {
1402                        return Ok(Value::Null);
1403                    }
1404                    Err(anyhow!("Cannot slice {:?}", arr_val))
1405                }
                // Literal: convert the AST literal directly to a runtime Value.
                Expr::Literal(lit) => Ok(lit.to_value()),
                // List constructor: evaluate each element left-to-right
                // (sequential awaits), collecting into a Value::List.
                Expr::List(items) => {
                    let mut vals = Vec::new();
                    for item in items {
                        vals.push(
                            this.evaluate_expr(item, row, prop_manager, params, ctx)
                                .await?,
                        );
                    }
                    Ok(Value::List(vals))
                }
                // Map constructor: evaluate each value expression; a duplicate key
                // overwrites the earlier entry (HashMap insert semantics).
                Expr::Map(items) => {
                    let mut map = HashMap::new();
                    for (key, value_expr) in items {
                        let val = this
                            .evaluate_expr(value_expr, row, prop_manager, params, ctx)
                            .await?;
                        map.insert(key.clone(), val);
                    }
                    Ok(Value::Map(map))
                }
                Expr::Exists { query, .. } => {
                    // Plan and execute subquery; failures return false (pattern doesn't match)
                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();

                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
                        Ok(plan) => {
                            // Pass the outer row's bindings down as parameters so the
                            // subquery can reference correlated variables.
                            let mut sub_params = params.clone();
                            sub_params.extend(row.clone());

                            match this.execute(plan, prop_manager, &sub_params).await {
                                // EXISTS is true iff the subquery yields at least one row.
                                Ok(results) => Ok(Value::Bool(!results.is_empty())),
                                Err(e) => {
                                    // Deliberate best-effort: execution errors are
                                    // logged and treated as "pattern doesn't match".
                                    log::debug!("EXISTS subquery execution failed: {}", e);
                                    Ok(Value::Bool(false))
                                }
                            }
                        }
                        Err(e) => {
                            log::debug!("EXISTS subquery planning failed: {}", e);
                            Ok(Value::Bool(false))
                        }
                    }
                }
                Expr::CountSubquery(query) => {
                    // Similar to Exists but returns count
                    // Unlike EXISTS above, planning/execution errors here are
                    // propagated to the caller instead of mapped to a default.
                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());

                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();

                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
                        Ok(plan) => {
                            // Correlated variables travel via the parameter map.
                            let mut sub_params = params.clone();
                            sub_params.extend(row.clone());

                            match this.execute(plan, prop_manager, &sub_params).await {
                                Ok(results) => Ok(Value::from(results.len() as i64)),
                                Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
                            }
                        }
                        Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
                    }
                }
                Expr::Quantifier {
                    quantifier,
                    variable,
                    list,
                    predicate,
                } => {
                    // Quantifier expression evaluation (ALL/ANY/SINGLE/NONE)
                    //
                    // This is the primary execution path for quantifiers because DataFusion
                    // does not support lambda functions yet. Queries with quantifiers attempt
                    // DataFusion translation first, fail (see df_expr.rs:289), then fall back
                    // to this fallback executor path.
                    //
                    // This is intentional design - we get correct semantics with row-by-row
                    // evaluation until DataFusion adds lambda support.
                    //
                    // See: https://github.com/apache/datafusion/issues/14205

                    // Evaluate the list expression
                    let list_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;

                    // Handle null propagation: a null list yields null, not false.
                    if list_val.is_null() {
                        return Ok(Value::Null);
                    }

                    // Convert to array
                    let items = match list_val {
                        Value::List(arr) => arr,
                        _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
                    };

                    // Evaluate predicate for each item, counting how many satisfy it.
                    // NOTE(review): a null predicate result counts as "unsatisfied"
                    // here; strict Cypher three-valued semantics would propagate null
                    // in some cases (e.g. all() over a list with nulls) — confirm
                    // this simplification is intended.
                    let mut satisfied_count = 0;
                    for item in &items {
                        // Create new row with bound variable
                        let mut item_row = row.clone();
                        item_row.insert(variable.clone(), item.clone());

                        // Evaluate predicate with bound variable
                        let pred_result = this
                            .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
                            .await?;

                        // Check if predicate is satisfied (only a definite true counts)
                        if let Value::Bool(true) = pred_result {
                            satisfied_count += 1;
                        }
                    }

                    // Return based on quantifier type
                    let result = match quantifier {
                        Quantifier::All => satisfied_count == items.len(),
                        Quantifier::Any => satisfied_count > 0,
                        Quantifier::Single => satisfied_count == 1,
                        Quantifier::None => satisfied_count == 0,
                    };

                    Ok(Value::Bool(result))
                }
                Expr::ListComprehension {
                    variable,
                    list,
                    where_clause,
                    map_expr,
                } => {
                    // List comprehension evaluation: [x IN list WHERE pred | expr]
                    //
                    // Similar to quantifiers, this requires lambda-like evaluation
                    // which DataFusion doesn't support yet. This is the primary execution path.

                    // Evaluate the list expression
                    let list_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;

                    // Handle null propagation: a null source list yields null.
                    if list_val.is_null() {
                        return Ok(Value::Null);
                    }

                    // Convert to array
                    let items = match list_val {
                        Value::List(arr) => arr,
                        _ => {
                            return Err(anyhow!(
                                "List comprehension expects a list, got: {:?}",
                                list_val
                            ));
                        }
                    };

                    // Collect mapped values
                    let mut results = Vec::new();
                    for item in &items {
                        // Create new row with bound variable
                        let mut item_row = row.clone();
                        item_row.insert(variable.clone(), item.clone());

                        // Apply WHERE filter if present
                        if let Some(predicate) = where_clause {
                            let pred_result = this
                                .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
                                .await?;

                            // Skip items that don't match the filter
                            // (false AND null both fall through to `continue`).
                            if !matches!(pred_result, Value::Bool(true)) {
                                continue;
                            }
                        }

                        // Apply map expression (the `| expr` part) to the surviving item.
                        let mapped_val = this
                            .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
                            .await?;
                        results.push(mapped_val);
                    }

                    Ok(Value::List(results))
                }
                Expr::BinaryOp { left, op, right } => {
                    // Short-circuit evaluation for AND/OR; every other operator
                    // evaluates both operands and delegates to eval_binary_op.
                    match op {
                        BinaryOp::And => {
                            let l_val = this
                                .evaluate_expr(left, row, prop_manager, params, ctx)
                                .await?;
                            // Short-circuit: if left is false, don't evaluate right
                            // (a definitive false decides AND regardless of right).
                            if let Some(false) = l_val.as_bool() {
                                return Ok(Value::Bool(false));
                            }
                            // Left was true or null: eval_binary_op resolves the
                            // remaining three-valued cases.
                            let r_val = this
                                .evaluate_expr(right, row, prop_manager, params, ctx)
                                .await?;
                            eval_binary_op(&l_val, op, &r_val)
                        }
                        BinaryOp::Or => {
                            let l_val = this
                                .evaluate_expr(left, row, prop_manager, params, ctx)
                                .await?;
                            // Short-circuit: if left is true, don't evaluate right
                            // (a definitive true decides OR regardless of right).
                            if let Some(true) = l_val.as_bool() {
                                return Ok(Value::Bool(true));
                            }
                            let r_val = this
                                .evaluate_expr(right, row, prop_manager, params, ctx)
                                .await?;
                            eval_binary_op(&l_val, op, &r_val)
                        }
                        _ => {
                            // For all other operators, evaluate both sides
                            let l_val = this
                                .evaluate_expr(left, row, prop_manager, params, ctx)
                                .await?;
                            let r_val = this
                                .evaluate_expr(right, row, prop_manager, params, ctx)
                                .await?;
                            eval_binary_op(&l_val, op, &r_val)
                        }
                    }
                }
                // IN operator: membership test, delegated to eval_in_op
                // (which owns the null/collection semantics).
                Expr::In { expr, list } => {
                    let l_val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    let r_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;
                    eval_in_op(&l_val, &r_val)
                }
1642                Expr::UnaryOp { op, expr } => {
1643                    let val = this
1644                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1645                        .await?;
1646                    match op {
1647                        UnaryOp::Not => {
1648                            // Three-valued logic: NOT null = null
1649                            match val.as_bool() {
1650                                Some(b) => Ok(Value::Bool(!b)),
1651                                None if val.is_null() => Ok(Value::Null),
1652                                None => Err(anyhow!(
1653                                    "InvalidArgumentType: NOT requires a boolean argument"
1654                                )),
1655                            }
1656                        }
1657                        UnaryOp::Neg => {
1658                            if let Some(i) = val.as_i64() {
1659                                Ok(Value::Int(-i))
1660                            } else if let Some(f) = val.as_f64() {
1661                                Ok(Value::Float(-f))
1662                            } else {
1663                                Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1664                            }
1665                        }
1666                    }
1667                }
                // IS NULL: evaluate the operand and test for null (never errors).
                Expr::IsNull(expr) => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    Ok(Value::Bool(val.is_null()))
                }
                // IS NOT NULL: complement of IsNull.
                Expr::IsNotNull(expr) => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    Ok(Value::Bool(!val.is_null()))
                }
                Expr::IsUnique(_) => {
                    // IS UNIQUE is only valid in constraint definitions, not in query expressions
                    Err(anyhow!(
                        "IS UNIQUE can only be used in constraint definitions"
                    ))
                }
                Expr::Case {
                    expr,
                    when_then,
                    else_expr,
                } => {
                    // CASE expression, both forms:
                    //   simple:   CASE <base> WHEN <w> THEN <t> ... END  (expr is Some)
                    //   searched: CASE WHEN <cond> THEN <t> ... END      (expr is None)
                    if let Some(base_expr) = expr {
                        let base_val = this
                            .evaluate_expr(base_expr, row, prop_manager, params, ctx)
                            .await?;
                        for (w, t) in when_then {
                            let w_val = this
                                .evaluate_expr(w, row, prop_manager, params, ctx)
                                .await?;
                            // NOTE(review): this uses Rust Value equality, so a null
                            // base can match a null WHEN; strict Cypher treats
                            // null = null as null (no match) — confirm intended.
                            if base_val == w_val {
                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
                            }
                        }
                    } else {
                        for (w, t) in when_then {
                            let w_val = this
                                .evaluate_expr(w, row, prop_manager, params, ctx)
                                .await?;
                            // Only a definite true takes the branch; false and null fall through.
                            if w_val.as_bool() == Some(true) {
                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
                            }
                        }
                    }
                    // No WHEN matched: use ELSE if present, otherwise null.
                    if let Some(e) = else_expr {
                        return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
                    }
                    Ok(Value::Null)
                }
                // Wildcard (*) carries no standalone value in expression position.
                Expr::Wildcard => Ok(Value::Null),
                Expr::FunctionCall { name, args, .. } => {
                    // Special case: id() returns VID for nodes and EID for relationships
                    if name.eq_ignore_ascii_case("ID") {
                        if args.len() != 1 {
                            return Err(anyhow!("id() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        // Entities are represented as maps carrying internal keys;
                        // probe them in priority order.
                        if let Value::Map(map) = &val {
                            // Check for _vid (vertex) first
                            if let Some(vid_val) = map.get("_vid") {
                                return Ok(vid_val.clone());
                            }
                            // Check for _eid (edge/relationship)
                            if let Some(eid_val) = map.get("_eid") {
                                return Ok(eid_val.clone());
                            }
                            // Check for _id (fallback)
                            if let Some(id_val) = map.get("_id") {
                                return Ok(id_val.clone());
                            }
                        }
                        // Non-entity or unidentifiable input: id() yields null.
                        return Ok(Value::Null);
                    }

                    // Special case: elementId() returns string format "label_id:local_offset"
                    if name.eq_ignore_ascii_case("ELEMENTID") {
                        if args.len() != 1 {
                            return Err(anyhow!("elementId() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        if let Value::Map(map) = &val {
                            // Check for _vid (vertex) first
                            // In new storage model, VIDs are pure auto-increment - return as simple ID string
                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
                                return Ok(Value::String(vid_val.to_string()));
                            }
                            // Check for _eid (edge/relationship)
                            // In new storage model, EIDs are pure auto-increment - return as simple ID string
                            if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
                                return Ok(Value::String(eid_val.to_string()));
                            }
                        }
                        // Non-entity input: elementId() yields null.
                        return Ok(Value::Null);
                    }
1767
                    // Special case: type() returns the relationship type name
                    if name.eq_ignore_ascii_case("TYPE") {
                        if args.len() != 1 {
                            return Err(anyhow!("type() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        if let Value::Map(map) = &val
                            && let Some(type_val) = map.get("_type")
                        {
                            // Numeric _type is an edge type ID; string _type is already a name
                            if let Some(type_id) =
                                type_val.as_u64().and_then(|v| u32::try_from(v).ok())
                            {
                                // Resolve the numeric ID through the schema manager.
                                if let Some(name) = this
                                    .storage
                                    .schema_manager()
                                    .edge_type_name_by_id_unified(type_id)
                                {
                                    return Ok(Value::String(name));
                                }
                            } else if let Some(name) = type_val.as_str() {
                                return Ok(Value::String(name.to_string()));
                            }
                        }
                        // Missing/unresolvable _type: type() yields null.
                        return Ok(Value::Null);
                    }

                    // Special case: labels() returns the labels of a node
                    if name.eq_ignore_ascii_case("LABELS") {
                        if args.len() != 1 {
                            return Err(anyhow!("labels() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        // Labels are stored verbatim under the internal "_labels" key.
                        if let Value::Map(map) = &val
                            && let Some(labels_val) = map.get("_labels")
                        {
                            return Ok(labels_val.clone());
                        }
                        return Ok(Value::Null);
                    }

                    // Special case: properties() returns the properties map of a node/edge
                    if name.eq_ignore_ascii_case("PROPERTIES") {
                        if args.len() != 1 {
                            return Err(anyhow!("properties() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        if let Value::Map(map) = &val {
                            // Filter out internal properties (those starting with _)
                            // so callers only see user-visible keys.
                            let mut props = HashMap::new();
                            for (k, v) in map.iter() {
                                if !k.starts_with('_') {
                                    props.insert(k.clone(), v.clone());
                                }
                            }
                            return Ok(Value::Map(props));
                        }
                        return Ok(Value::Null);
                    }
1833
                    // Special case: startNode() returns the start node of a relationship
                    if name.eq_ignore_ascii_case("STARTNODE") {
                        if args.len() != 1 {
                            return Err(anyhow!("startNode() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        // Typed edge value: look the source node up among the row's bindings.
                        if let Value::Edge(edge) = &val {
                            return Ok(Self::find_node_by_vid(row, edge.src));
                        }
                        // Map-shaped edge: try progressively weaker representations.
                        if let Value::Map(map) = &val {
                            // 1. Fully materialized start node embedded in the edge.
                            if let Some(start_node) = map.get("_startNode") {
                                return Ok(start_node.clone());
                            }
                            // 2. Source VID only: synthesize a minimal node map.
                            if let Some(src_vid) = map.get("_src_vid") {
                                return Ok(Value::Map(HashMap::from([(
                                    "_vid".to_string(),
                                    src_vid.clone(),
                                )])));
                            }
                            // 3. Resolve _src VID by looking up node in row
                            if let Some(src_id) = map.get("_src")
                                && let Some(u) = src_id.as_u64()
                            {
                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
                            }
                        }
                        return Ok(Value::Null);
                    }

                    // Special case: endNode() returns the end node of a relationship
                    // (mirror of startNode() using the destination side).
                    if name.eq_ignore_ascii_case("ENDNODE") {
                        if args.len() != 1 {
                            return Err(anyhow!("endNode() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        if let Value::Edge(edge) = &val {
                            return Ok(Self::find_node_by_vid(row, edge.dst));
                        }
                        if let Value::Map(map) = &val {
                            if let Some(end_node) = map.get("_endNode") {
                                return Ok(end_node.clone());
                            }
                            if let Some(dst_vid) = map.get("_dst_vid") {
                                return Ok(Value::Map(HashMap::from([(
                                    "_vid".to_string(),
                                    dst_vid.clone(),
                                )])));
                            }
                            // Resolve _dst VID by looking up node in row
                            if let Some(dst_id) = map.get("_dst")
                                && let Some(u) = dst_id.as_u64()
                            {
                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
                            }
                        }
                        return Ok(Value::Null);
                    }
1895
1896                    // Special case: hasLabel() checks if a node has a specific label
1897                    // Used for WHERE n:Label predicates
1898                    if name.eq_ignore_ascii_case("HASLABEL") {
1899                        if args.len() != 2 {
1900                            return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
1901                        }
1902                        let node_val = this
1903                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1904                            .await?;
1905                        let label_val = this
1906                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1907                            .await?;
1908
1909                        let label_to_check = label_val.as_str().ok_or_else(|| {
1910                            anyhow!("Second argument to hasLabel must be a string")
1911                        })?;
1912
1913                        let has_label = match &node_val {
1914                            // Handle proper Value::Node type (from result normalization)
1915                            Value::Map(map) if map.contains_key("_vid") => {
1916                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
1917                                    labels_arr
1918                                        .iter()
1919                                        .any(|l| l.as_str() == Some(label_to_check))
1920                                } else {
1921                                    false
1922                                }
1923                            }
1924                            // Also handle legacy Object format
1925                            Value::Map(map) => {
1926                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
1927                                    labels_arr
1928                                        .iter()
1929                                        .any(|l| l.as_str() == Some(label_to_check))
1930                                } else {
1931                                    false
1932                                }
1933                            }
1934                            _ => false,
1935                        };
1936                        return Ok(Value::Bool(has_label));
1937                    }
1938
                    // Quantifier functions (ANY/ALL/NONE/SINGLE) as function calls are not supported.
                    // These should be parsed as Expr::Quantifier instead, so reaching
                    // here means the query used an unsupported call form.
                    if matches!(
                        name.to_uppercase().as_str(),
                        "ANY" | "ALL" | "NONE" | "SINGLE"
                    ) {
                        return Err(anyhow!(
                            "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
                            name.to_lowercase()
                        ));
                    }

                    // Special case: COALESCE needs short-circuit evaluation
                    // (arguments are evaluated left-to-right and evaluation stops at
                    // the first non-null value, so later arguments may never run).
                    if name.eq_ignore_ascii_case("COALESCE") {
                        for arg in args {
                            let val = this
                                .evaluate_expr(arg, row, prop_manager, params, ctx)
                                .await?;
                            if !val.is_null() {
                                return Ok(val);
                            }
                        }
                        // All arguments were null (or there were none).
                        return Ok(Value::Null);
                    }

                    // Special case: vector_similarity has dedicated implementation
                    if name.eq_ignore_ascii_case("vector_similarity") {
                        if args.len() != 2 {
                            return Err(anyhow!("vector_similarity takes 2 arguments"));
                        }
                        let v1 = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        let v2 = this
                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
                            .await?;
                        return eval_vector_similarity(&v1, &v2);
                    }
1977
                    // Special case: uni.validAt handles node fetching.
                    // Signature: validAt(entity, start_prop, end_prop, time); accepted
                    // under three aliases. NOTE: this branch continues past this
                    // excerpt (end_prop is consumed further below).
                    if name.eq_ignore_ascii_case("uni.temporal.validAt")
                        || name.eq_ignore_ascii_case("uni.validAt")
                        || name.eq_ignore_ascii_case("validAt")
                    {
                        if args.len() != 4 {
                            return Err(anyhow!("validAt requires 4 arguments"));
                        }
                        // Arg 0: the node/edge whose validity window is being tested.
                        let node_val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        // Arg 1: name of the property holding the window start.
                        let start_prop = this
                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
                            .await?
                            .as_str()
                            .ok_or(anyhow!("start_prop must be string"))?
                            .to_string();
                        // Arg 2: name of the property holding the window end.
                        let end_prop = this
                            .evaluate_expr(&args[2], row, prop_manager, params, ctx)
                            .await?
                            .as_str()
                            .ok_or(anyhow!("end_prop must be string"))?
                            .to_string();
                        // Arg 3: the instant to test (datetime value or parseable string).
                        let time_val = this
                            .evaluate_expr(&args[3], row, prop_manager, params, ctx)
                            .await?;

                        let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
                            anyhow!("time argument must be a datetime value or string")
                        })?;

                        // Fetch temporal property values - supports both vertices and edges.
                        // Lookup failures are mapped to None via .ok() and treated as
                        // "property absent" rather than hard errors.
                        let valid_from_val: Option<Value> = if let Ok(vid) =
                            Self::vid_from_value(&node_val)
                        {
                            // Vertex case - VID string format
                            prop_manager
                                .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
                                .await
                                .ok()
                        } else if let Value::Map(map) = &node_val {
                            // Check for embedded _vid or _eid in object
                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
                                let vid = Vid::from(vid_val);
                                prop_manager
                                    .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
                                    .await
                                    .ok()
                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
                                // Edge case
                                let eid = uni_common::core::id::Eid::from(eid_val);
                                prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
                            } else {
                                // Inline object - property embedded directly
                                map.get(&start_prop).cloned()
                            }
                        } else {
                            // Not an entity at all: cannot be valid at any time.
                            return Ok(Value::Bool(false));
                        };
2037
2038                        let valid_from = match valid_from_val {
2039                            Some(ref v) => match value_to_datetime_utc(v) {
2040                                Some(dt) => dt,
2041                                None if v.is_null() => return Ok(Value::Bool(false)),
2042                                None => {
2043                                    return Err(anyhow!(
2044                                        "Property {} must be a datetime value or string",
2045                                        start_prop
2046                                    ));
2047                                }
2048                            },
2049                            None => return Ok(Value::Bool(false)),
2050                        };
2051
2052                        let valid_to_val: Option<Value> = if let Ok(vid) =
2053                            Self::vid_from_value(&node_val)
2054                        {
2055                            // Vertex case - VID string format
2056                            prop_manager
2057                                .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2058                                .await
2059                                .ok()
2060                        } else if let Value::Map(map) = &node_val {
2061                            // Check for embedded _vid or _eid in object
2062                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2063                                let vid = Vid::from(vid_val);
2064                                prop_manager
2065                                    .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2066                                    .await
2067                                    .ok()
2068                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2069                                // Edge case
2070                                let eid = uni_common::core::id::Eid::from(eid_val);
2071                                prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2072                            } else {
2073                                // Inline object - property embedded directly
2074                                map.get(&end_prop).cloned()
2075                            }
2076                        } else {
2077                            return Ok(Value::Bool(false));
2078                        };
2079
2080                        let valid_to = match valid_to_val {
2081                            Some(ref v) => match value_to_datetime_utc(v) {
2082                                Some(dt) => Some(dt),
2083                                None if v.is_null() => None,
2084                                None => {
2085                                    return Err(anyhow!(
2086                                        "Property {} must be a datetime value or null",
2087                                        end_prop
2088                                    ));
2089                                }
2090                            },
2091                            None => None,
2092                        };
2093
2094                        let is_valid = valid_from <= query_time
2095                            && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2096                        return Ok(Value::Bool(is_valid));
2097                    }
2098
2099                    // For all other functions, evaluate arguments then call helper
2100                    let mut evaluated_args = Vec::with_capacity(args.len());
2101                    for arg in args {
2102                        let mut val = this
2103                            .evaluate_expr(arg, row, prop_manager, params, ctx)
2104                            .await?;
2105
2106                        // Eagerly hydrate edge/vertex maps if pushdown hydration didn't load properties.
2107                        // Functions like validAt() need access to properties like valid_from/valid_to.
2108                        if let Value::Map(ref mut map) = val {
2109                            hydrate_entity_if_needed(map, prop_manager, ctx).await;
2110                        }
2111
2112                        evaluated_args.push(val);
2113                    }
2114                    eval_scalar_function(
2115                        name,
2116                        &evaluated_args,
2117                        self.custom_function_registry.as_deref(),
2118                    )
2119                }
2120                Expr::Reduce {
2121                    accumulator,
2122                    init,
2123                    variable,
2124                    list,
2125                    expr,
2126                } => {
2127                    let mut acc = self
2128                        .evaluate_expr(init, row, prop_manager, params, ctx)
2129                        .await?;
2130                    let list_val = self
2131                        .evaluate_expr(list, row, prop_manager, params, ctx)
2132                        .await?;
2133
2134                    if let Value::List(items) = list_val {
2135                        for item in items {
2136                            // Create a temporary scope/row with accumulator and variable
2137                            // For simplicity in fallback executor, we can construct a new row map
2138                            // merging current row + new variables.
2139                            let mut scope = row.clone();
2140                            scope.insert(accumulator.clone(), acc.clone());
2141                            scope.insert(variable.clone(), item);
2142
2143                            acc = self
2144                                .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2145                                .await?;
2146                        }
2147                    } else {
2148                        return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2149                    }
2150                    Ok(acc)
2151                }
2152                Expr::ValidAt { .. } => {
2153                    // VALID_AT should have been transformed to a function call in the planner
2154                    Err(anyhow!(
2155                        "VALID_AT expression should have been transformed to function call in planner"
2156                    ))
2157                }
2158
2159                Expr::LabelCheck { expr, labels } => {
2160                    let val = this
2161                        .evaluate_expr(expr, row, prop_manager, params, ctx)
2162                        .await?;
2163                    match &val {
2164                        Value::Null => Ok(Value::Null),
2165                        Value::Map(map) => {
2166                            // Check if this is an edge (has _eid) or node (has _vid)
2167                            let is_edge = map.contains_key("_eid")
2168                                || map.contains_key("_type_name")
2169                                || (map.contains_key("_type") && !map.contains_key("_vid"));
2170
2171                            if is_edge {
2172                                // Edges have a single type
2173                                if labels.len() > 1 {
2174                                    return Ok(Value::Bool(false));
2175                                }
2176                                let label_to_check = &labels[0];
2177                                let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2178                                {
2179                                    t == label_to_check
2180                                } else if let Some(Value::String(t)) = map.get("_type") {
2181                                    t == label_to_check
2182                                } else {
2183                                    false
2184                                };
2185                                Ok(Value::Bool(has_type))
2186                            } else {
2187                                // Node: check all labels
2188                                let has_all = labels.iter().all(|label_to_check| {
2189                                    if let Some(Value::List(labels_arr)) = map.get("_labels") {
2190                                        labels_arr
2191                                            .iter()
2192                                            .any(|l| l.as_str() == Some(label_to_check.as_str()))
2193                                    } else {
2194                                        false
2195                                    }
2196                                });
2197                                Ok(Value::Bool(has_all))
2198                            }
2199                        }
2200                        _ => Ok(Value::Bool(false)),
2201                    }
2202                }
2203
2204                Expr::MapProjection { base, items } => {
2205                    let base_value = this
2206                        .evaluate_expr(base, row, prop_manager, params, ctx)
2207                        .await?;
2208
2209                    // Extract properties from the base object
2210                    let properties = match &base_value {
2211                        Value::Map(map) => map,
2212                        _ => {
2213                            return Err(anyhow!(
2214                                "Map projection requires object, got {:?}",
2215                                base_value
2216                            ));
2217                        }
2218                    };
2219
2220                    let mut result_map = HashMap::new();
2221
2222                    for item in items {
2223                        match item {
2224                            MapProjectionItem::Property(prop) => {
2225                                if let Some(value) = properties.get(prop.as_str()) {
2226                                    result_map.insert(prop.clone(), value.clone());
2227                                }
2228                            }
2229                            MapProjectionItem::AllProperties => {
2230                                // Include all properties except internal fields (those starting with _)
2231                                for (key, value) in properties.iter() {
2232                                    if !key.starts_with('_') {
2233                                        result_map.insert(key.clone(), value.clone());
2234                                    }
2235                                }
2236                            }
2237                            MapProjectionItem::LiteralEntry(key, expr) => {
2238                                let value = this
2239                                    .evaluate_expr(expr, row, prop_manager, params, ctx)
2240                                    .await?;
2241                                result_map.insert(key.clone(), value);
2242                            }
2243                            MapProjectionItem::Variable(var_name) => {
2244                                // Variable selector: include the value of the variable in the result
2245                                // e.g., person{.name, friend} includes the value of 'friend' variable
2246                                if let Some(value) = row.get(var_name.as_str()) {
2247                                    result_map.insert(var_name.clone(), value.clone());
2248                                }
2249                            }
2250                        }
2251                    }
2252
2253                    Ok(Value::Map(result_map))
2254                }
2255            }
2256        })
2257    }
2258
2259    pub(crate) fn execute_subplan<'a>(
2260        &'a self,
2261        plan: LogicalPlan,
2262        prop_manager: &'a PropertyManager,
2263        params: &'a HashMap<String, Value>,
2264        ctx: Option<&'a QueryContext>,
2265    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2266        Box::pin(async move {
2267            if let Some(ctx) = ctx {
2268                ctx.check_timeout()?;
2269            }
2270            match plan {
2271                LogicalPlan::Union { left, right, all } => {
2272                    self.execute_union(left, right, all, prop_manager, params, ctx)
2273                        .await
2274                }
2275                LogicalPlan::CreateVectorIndex {
2276                    config,
2277                    if_not_exists,
2278                } => {
2279                    if if_not_exists && self.index_exists_by_name(&config.name) {
2280                        return Ok(vec![]);
2281                    }
2282                    let idx_mgr = self.storage.index_manager();
2283                    idx_mgr.create_vector_index(config).await?;
2284                    Ok(vec![])
2285                }
2286                LogicalPlan::CreateFullTextIndex {
2287                    config,
2288                    if_not_exists,
2289                } => {
2290                    if if_not_exists && self.index_exists_by_name(&config.name) {
2291                        return Ok(vec![]);
2292                    }
2293                    let idx_mgr = self.storage.index_manager();
2294                    idx_mgr.create_fts_index(config).await?;
2295                    Ok(vec![])
2296                }
2297                LogicalPlan::CreateScalarIndex {
2298                    mut config,
2299                    if_not_exists,
2300                } => {
2301                    if if_not_exists && self.index_exists_by_name(&config.name) {
2302                        return Ok(vec![]);
2303                    }
2304
2305                    // Check for expression indexes - create generated columns
2306                    let mut modified_properties = Vec::new();
2307
2308                    for prop in &config.properties {
2309                        // Heuristic: if contains '(' and ')', it's an expression
2310                        if prop.contains('(') && prop.contains(')') {
2311                            let gen_col = SchemaManager::generated_column_name(prop);
2312
2313                            // Add generated property to schema
2314                            let sm = self.storage.schema_manager_arc();
2315                            if let Err(e) = sm.add_generated_property(
2316                                &config.label,
2317                                &gen_col,
2318                                DataType::String, // Default type for expressions
2319                                prop.clone(),
2320                            ) {
2321                                log::warn!("Failed to add generated property (might exist): {}", e);
2322                            }
2323
2324                            modified_properties.push(gen_col);
2325                        } else {
2326                            // Simple property - use as-is
2327                            modified_properties.push(prop.clone());
2328                        }
2329                    }
2330
2331                    config.properties = modified_properties;
2332
2333                    let idx_mgr = self.storage.index_manager();
2334                    idx_mgr.create_scalar_index(config).await?;
2335                    Ok(vec![])
2336                }
2337                LogicalPlan::CreateJsonFtsIndex {
2338                    config,
2339                    if_not_exists,
2340                } => {
2341                    if if_not_exists && self.index_exists_by_name(&config.name) {
2342                        return Ok(vec![]);
2343                    }
2344                    let idx_mgr = self.storage.index_manager();
2345                    idx_mgr.create_json_fts_index(config).await?;
2346                    Ok(vec![])
2347                }
2348                LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2349                LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2350                LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2351                LogicalPlan::Vacuum => {
2352                    self.execute_vacuum().await?;
2353                    Ok(vec![])
2354                }
2355                LogicalPlan::Checkpoint => {
2356                    self.execute_checkpoint().await?;
2357                    Ok(vec![])
2358                }
2359                LogicalPlan::CopyTo {
2360                    label,
2361                    path,
2362                    format,
2363                    options,
2364                } => {
2365                    let count = self
2366                        .execute_copy_to(&label, &path, &format, &options)
2367                        .await?;
2368                    let mut result = HashMap::new();
2369                    result.insert("count".to_string(), Value::Int(count as i64));
2370                    Ok(vec![result])
2371                }
2372                LogicalPlan::CopyFrom {
2373                    label,
2374                    path,
2375                    format,
2376                    options,
2377                } => {
2378                    let count = self
2379                        .execute_copy_from(&label, &path, &format, &options)
2380                        .await?;
2381                    let mut result = HashMap::new();
2382                    result.insert("count".to_string(), Value::Int(count as i64));
2383                    Ok(vec![result])
2384                }
2385                LogicalPlan::CreateLabel(clause) => {
2386                    self.execute_create_label(clause).await?;
2387                    Ok(vec![])
2388                }
2389                LogicalPlan::CreateEdgeType(clause) => {
2390                    self.execute_create_edge_type(clause).await?;
2391                    Ok(vec![])
2392                }
2393                LogicalPlan::AlterLabel(clause) => {
2394                    self.execute_alter_label(clause).await?;
2395                    Ok(vec![])
2396                }
2397                LogicalPlan::AlterEdgeType(clause) => {
2398                    self.execute_alter_edge_type(clause).await?;
2399                    Ok(vec![])
2400                }
2401                LogicalPlan::DropLabel(clause) => {
2402                    self.execute_drop_label(clause).await?;
2403                    Ok(vec![])
2404                }
2405                LogicalPlan::DropEdgeType(clause) => {
2406                    self.execute_drop_edge_type(clause).await?;
2407                    Ok(vec![])
2408                }
2409                LogicalPlan::CreateConstraint(clause) => {
2410                    self.execute_create_constraint(clause).await?;
2411                    Ok(vec![])
2412                }
2413                LogicalPlan::DropConstraint(clause) => {
2414                    self.execute_drop_constraint(clause).await?;
2415                    Ok(vec![])
2416                }
2417                LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2418                LogicalPlan::DropIndex { name, if_exists } => {
2419                    let idx_mgr = self.storage.index_manager();
2420                    match idx_mgr.drop_index(&name).await {
2421                        Ok(_) => Ok(vec![]),
2422                        Err(e) => {
2423                            if if_exists && e.to_string().contains("not found") {
2424                                Ok(vec![])
2425                            } else {
2426                                Err(e)
2427                            }
2428                        }
2429                    }
2430                }
2431                LogicalPlan::ShowIndexes { filter } => {
2432                    Ok(self.execute_show_indexes(filter.as_deref()))
2433                }
2434                // Scan/traverse nodes: delegate to DataFusion for data access,
2435                // then convert results to HashMaps for the fallback executor.
2436                LogicalPlan::Scan { .. }
2437                | LogicalPlan::ExtIdLookup { .. }
2438                | LogicalPlan::ScanAll { .. }
2439                | LogicalPlan::ScanMainByLabels { .. }
2440                | LogicalPlan::Traverse { .. }
2441                | LogicalPlan::TraverseMainByType { .. } => {
2442                    let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2443                    self.record_batches_to_rows(batches)
2444                }
2445                LogicalPlan::Filter {
2446                    input,
2447                    predicate,
2448                    optional_variables,
2449                } => {
2450                    let input_matches = self
2451                        .execute_subplan(*input, prop_manager, params, ctx)
2452                        .await?;
2453
2454                    tracing::debug!(
2455                        "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2456                        predicate,
2457                        input_matches.len(),
2458                        optional_variables
2459                    );
2460
2461                    // For OPTIONAL MATCH with WHERE: we need LEFT OUTER JOIN semantics.
2462                    // Group rows by non-optional variables, apply filter, and ensure
2463                    // at least one row per group (with NULLs if filter removes all).
2464                    if !optional_variables.is_empty() {
2465                        // Helper to check if a key belongs to an optional variable.
2466                        // Keys can be "var" or "var.field" (e.g., "m" or "m._vid").
2467                        let is_optional_key = |k: &str| -> bool {
2468                            optional_variables.contains(k)
2469                                || optional_variables
2470                                    .iter()
2471                                    .any(|var| k.starts_with(&format!("{}.", var)))
2472                        };
2473
2474                        // Helper to check if a key is internal (should not affect grouping)
2475                        let is_internal_key =
2476                            |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2477
2478                        // Compute the key (non-optional, non-internal variables) for grouping
2479                        let non_optional_vars: Vec<String> = input_matches
2480                            .first()
2481                            .map(|row| {
2482                                row.keys()
2483                                    .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2484                                    .cloned()
2485                                    .collect()
2486                            })
2487                            .unwrap_or_default();
2488
2489                        // Group rows by their non-optional variable values
2490                        let mut groups: std::collections::HashMap<
2491                            Vec<u8>,
2492                            Vec<HashMap<String, Value>>,
2493                        > = std::collections::HashMap::new();
2494
2495                        for row in &input_matches {
2496                            // Create a key from non-optional variable values
2497                            let key: Vec<u8> = non_optional_vars
2498                                .iter()
2499                                .map(|var| {
2500                                    row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
2501                                })
2502                                .collect::<Vec<_>>()
2503                                .join("|")
2504                                .into_bytes();
2505
2506                            groups.entry(key).or_default().push(row.clone());
2507                        }
2508
2509                        let mut filtered = Vec::new();
2510                        for (_key, group_rows) in groups {
2511                            let mut group_passed = Vec::new();
2512
2513                            for row in &group_rows {
2514                                // If optional variables are already NULL, preserve the row
2515                                let has_null_optional = optional_variables.iter().any(|var| {
2516                                    // Check both "var" and "var._vid" style keys
2517                                    let direct_null =
2518                                        matches!(row.get(var), Some(Value::Null) | None);
2519                                    let prefixed_null = row
2520                                        .keys()
2521                                        .filter(|k| k.starts_with(&format!("{}.", var)))
2522                                        .any(|k| matches!(row.get(k), Some(Value::Null)));
2523                                    direct_null || prefixed_null
2524                                });
2525
2526                                if has_null_optional {
2527                                    group_passed.push(row.clone());
2528                                    continue;
2529                                }
2530
2531                                let res = self
2532                                    .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2533                                    .await?;
2534
2535                                if res.as_bool().unwrap_or(false) {
2536                                    group_passed.push(row.clone());
2537                                }
2538                            }
2539
2540                            if group_passed.is_empty() {
2541                                // No rows passed - emit one row with NULLs for optional variables
2542                                // Use the first row's non-optional values as a template
2543                                if let Some(template) = group_rows.first() {
2544                                    let mut null_row = HashMap::new();
2545                                    for (k, v) in template {
2546                                        if is_optional_key(k) {
2547                                            null_row.insert(k.clone(), Value::Null);
2548                                        } else {
2549                                            null_row.insert(k.clone(), v.clone());
2550                                        }
2551                                    }
2552                                    filtered.push(null_row);
2553                                }
2554                            } else {
2555                                filtered.extend(group_passed);
2556                            }
2557                        }
2558
2559                        tracing::debug!(
2560                            "Filter (OPTIONAL): {} input rows -> {} output rows",
2561                            input_matches.len(),
2562                            filtered.len()
2563                        );
2564
2565                        return Ok(filtered);
2566                    }
2567
2568                    // Standard filter for non-OPTIONAL MATCH
2569                    let mut filtered = Vec::new();
2570                    for row in input_matches.iter() {
2571                        let res = self
2572                            .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2573                            .await?;
2574
2575                        let passes = res.as_bool().unwrap_or(false);
2576
2577                        if passes {
2578                            filtered.push(row.clone());
2579                        }
2580                    }
2581
2582                    tracing::debug!(
2583                        "Filter: {} input rows -> {} output rows",
2584                        input_matches.len(),
2585                        filtered.len()
2586                    );
2587
2588                    Ok(filtered)
2589                }
2590                LogicalPlan::ProcedureCall {
2591                    procedure_name,
2592                    arguments,
2593                    yield_items,
2594                } => {
2595                    let yield_names: Vec<String> =
2596                        yield_items.iter().map(|(n, _)| n.clone()).collect();
2597                    let results = self
2598                        .execute_procedure(
2599                            &procedure_name,
2600                            &arguments,
2601                            &yield_names,
2602                            prop_manager,
2603                            params,
2604                            ctx,
2605                        )
2606                        .await?;
2607
2608                    // Handle aliasing: collect all original values first, then
2609                    // build the aliased row in one pass. This avoids issues when
2610                    // an alias matches another yield item's original name (e.g.,
2611                    // YIELD a AS b, b AS d — renaming "a" to "b" must not
2612                    // clobber the original "b" before it is renamed to "d").
2613                    let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2614                    if !has_aliases {
2615                        // No aliases (includes YIELD * which produces empty yield_items) —
2616                        // pass through the procedure output rows unchanged.
2617                        Ok(results)
2618                    } else {
2619                        let mut aliased_results = Vec::with_capacity(results.len());
2620                        for row in results {
2621                            let mut new_row = HashMap::new();
2622                            for (name, alias) in &yield_items {
2623                                let col_name = alias.as_ref().unwrap_or(name);
2624                                let val = row.get(name).cloned().unwrap_or(Value::Null);
2625                                new_row.insert(col_name.clone(), val);
2626                            }
2627                            aliased_results.push(new_row);
2628                        }
2629                        Ok(aliased_results)
2630                    }
2631                }
2632                LogicalPlan::VectorKnn { .. } => {
2633                    unreachable!("VectorKnn is handled by DataFusion engine")
2634                }
2635                LogicalPlan::InvertedIndexLookup { .. } => {
2636                    unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2637                }
2638                LogicalPlan::Sort { input, order_by } => {
2639                    let rows = self
2640                        .execute_subplan(*input, prop_manager, params, ctx)
2641                        .await?;
2642                    self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2643                        .await
2644                }
2645                LogicalPlan::Limit { input, skip, fetch } => {
2646                    let rows = self
2647                        .execute_subplan(*input, prop_manager, params, ctx)
2648                        .await?;
2649                    let skip = skip.unwrap_or(0);
2650                    let take = fetch.unwrap_or(usize::MAX);
2651                    Ok(rows.into_iter().skip(skip).take(take).collect())
2652                }
2653                LogicalPlan::Aggregate {
2654                    input,
2655                    group_by,
2656                    aggregates,
2657                } => {
2658                    let rows = self
2659                        .execute_subplan(*input, prop_manager, params, ctx)
2660                        .await?;
2661                    self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2662                        .await
2663                }
2664                LogicalPlan::Window {
2665                    input,
2666                    window_exprs,
2667                } => {
2668                    let rows = self
2669                        .execute_subplan(*input, prop_manager, params, ctx)
2670                        .await?;
2671                    self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2672                        .await
2673                }
2674                LogicalPlan::Project { input, projections } => {
2675                    let matches = self
2676                        .execute_subplan(*input, prop_manager, params, ctx)
2677                        .await?;
2678                    self.execute_project(matches, &projections, prop_manager, params, ctx)
2679                        .await
2680                }
2681                LogicalPlan::Distinct { input } => {
2682                    let rows = self
2683                        .execute_subplan(*input, prop_manager, params, ctx)
2684                        .await?;
2685                    let mut seen = std::collections::HashSet::new();
2686                    let mut result = Vec::new();
2687                    for row in rows {
2688                        let key = Self::canonical_row_key(&row);
2689                        if seen.insert(key) {
2690                            result.push(row);
2691                        }
2692                    }
2693                    Ok(result)
2694                }
2695                LogicalPlan::Unwind {
2696                    input,
2697                    expr,
2698                    variable,
2699                } => {
2700                    let input_rows = self
2701                        .execute_subplan(*input, prop_manager, params, ctx)
2702                        .await?;
2703                    self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2704                        .await
2705                }
2706                LogicalPlan::Apply {
2707                    input,
2708                    subquery,
2709                    input_filter,
2710                } => {
2711                    let input_rows = self
2712                        .execute_subplan(*input, prop_manager, params, ctx)
2713                        .await?;
2714                    self.execute_apply(
2715                        input_rows,
2716                        &subquery,
2717                        input_filter.as_ref(),
2718                        prop_manager,
2719                        params,
2720                        ctx,
2721                    )
2722                    .await
2723                }
2724                LogicalPlan::SubqueryCall { input, subquery } => {
2725                    let input_rows = self
2726                        .execute_subplan(*input, prop_manager, params, ctx)
2727                        .await?;
2728                    // Execute subquery for each input row (correlated)
2729                    // No input_filter for CALL { }
2730                    self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2731                        .await
2732                }
2733                LogicalPlan::RecursiveCTE {
2734                    cte_name,
2735                    initial,
2736                    recursive,
2737                } => {
2738                    self.execute_recursive_cte(
2739                        &cte_name,
2740                        *initial,
2741                        *recursive,
2742                        prop_manager,
2743                        params,
2744                        ctx,
2745                    )
2746                    .await
2747                }
2748                LogicalPlan::CrossJoin { left, right } => {
2749                    self.execute_cross_join(left, right, prop_manager, params, ctx)
2750                        .await
2751                }
2752                LogicalPlan::Set { .. }
2753                | LogicalPlan::Remove { .. }
2754                | LogicalPlan::Merge { .. }
2755                | LogicalPlan::Create { .. }
2756                | LogicalPlan::CreateBatch { .. } => {
2757                    unreachable!("mutations are handled by DataFusion engine")
2758                }
2759                LogicalPlan::Delete { .. } => {
2760                    unreachable!("mutations are handled by DataFusion engine")
2761                }
2762                LogicalPlan::Copy {
2763                    target,
2764                    source,
2765                    is_export,
2766                    options,
2767                } => {
2768                    if is_export {
2769                        self.execute_export(&target, &source, &options, prop_manager, ctx)
2770                            .await
2771                    } else {
2772                        self.execute_copy(&target, &source, &options, prop_manager)
2773                            .await
2774                    }
2775                }
2776                LogicalPlan::Backup {
2777                    destination,
2778                    options,
2779                } => self.execute_backup(&destination, &options).await,
2780                LogicalPlan::Explain { plan } => {
2781                    let plan_str = format!("{:#?}", plan);
2782                    let mut row = HashMap::new();
2783                    row.insert("plan".to_string(), Value::String(plan_str));
2784                    Ok(vec![row])
2785                }
2786                LogicalPlan::ShortestPath { .. } => {
2787                    unreachable!("ShortestPath is handled by DataFusion engine")
2788                }
2789                LogicalPlan::AllShortestPaths { .. } => {
2790                    unreachable!("AllShortestPaths is handled by DataFusion engine")
2791                }
2792                LogicalPlan::Foreach { .. } => {
2793                    unreachable!("mutations are handled by DataFusion engine")
2794                }
2795                LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2796                LogicalPlan::BindZeroLengthPath { .. } => {
2797                    unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2798                }
2799                LogicalPlan::BindPath { .. } => {
2800                    unreachable!("BindPath is handled by DataFusion engine")
2801                }
2802                LogicalPlan::QuantifiedPattern { .. } => {
2803                    unreachable!("QuantifiedPattern is handled by DataFusion engine")
2804                }
2805                LogicalPlan::LocyProgram { .. }
2806                | LogicalPlan::LocyFold { .. }
2807                | LogicalPlan::LocyBestBy { .. }
2808                | LogicalPlan::LocyPriority { .. }
2809                | LogicalPlan::LocyDerivedScan { .. }
2810                | LogicalPlan::LocyProject { .. } => {
2811                    unreachable!("Locy operators are handled by DataFusion engine")
2812                }
2813            }
2814        })
2815    }
2816
    /// Execute a single plan from a FOREACH body with the given scope.
    ///
    /// Used by the DataFusion ForeachExec operator to delegate body clause
    /// execution back to the executor.
    ///
    /// Only mutation clauses are accepted here: SET, REMOVE, DELETE, CREATE,
    /// CREATE (batch), MERGE, and nested FOREACH. Any other plan node yields
    /// an error.
    ///
    /// * `plan` — the single body clause to execute.
    /// * `scope` — variable bindings visible to the clause; nested FOREACH
    ///   iterations clone this map and add their own loop variable.
    /// * `writer` — write handle all mutations go through.
    /// * `tx_l0` — optional transaction-local L0 buffer passed down to the
    ///   locked mutation helpers.
    #[expect(clippy::too_many_arguments)]
    pub(crate) async fn execute_foreach_body_plan(
        &self,
        plan: LogicalPlan,
        scope: &mut HashMap<String, Value>,
        writer: &mut uni_store::runtime::writer::Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    ) -> Result<()> {
        match plan {
            LogicalPlan::Set { items, .. } => {
                self.execute_set_items_locked(
                    &items,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
            }
            LogicalPlan::Remove { items, .. } => {
                self.execute_remove_items_locked(&items, scope, writer, prop_manager, ctx, tx_l0)
                    .await?;
            }
            LogicalPlan::Delete { items, detach, .. } => {
                // Each DELETE target expression is evaluated in the current
                // scope, then deleted individually (honoring DETACH).
                for expr in &items {
                    let val = self
                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
                        .await?;
                    self.execute_delete_item_locked(&val, detach, writer, tx_l0)
                        .await?;
                }
            }
            LogicalPlan::Create { pattern, .. } => {
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
            }
            LogicalPlan::CreateBatch { patterns, .. } => {
                // Batched CREATE: each pattern is created sequentially against
                // the same scope, so later patterns see earlier bindings.
                for pattern in &patterns {
                    self.execute_create_pattern(
                        pattern,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                    )
                    .await?;
                }
            }
            // NOTE(review): MERGE inside FOREACH is executed as an
            // unconditional CREATE — `on_match` is discarded and the
            // ON CREATE SET items always run. Confirm this simplification
            // (no match-first attempt) is intentional for FOREACH bodies.
            LogicalPlan::Merge {
                pattern,
                on_match: _,
                on_create,
                ..
            } => {
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
                if let Some(on_create_clause) = on_create {
                    self.execute_set_items_locked(
                        &on_create_clause.items,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                    )
                    .await?;
                }
            }
            LogicalPlan::Foreach {
                variable,
                list,
                body,
                ..
            } => {
                let list_val = self
                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
                    .await?;
                // NULL list is a no-op (Cypher FOREACH over null iterates zero
                // times); any non-list, non-null value is an error.
                let items = match list_val {
                    Value::List(arr) => arr,
                    Value::Null => return Ok(()),
                    _ => return Err(anyhow!("FOREACH requires a list")),
                };
                for item in items {
                    // Fresh scope per iteration: the loop variable must not
                    // leak into (or clobber) the outer scope.
                    let mut nested_scope = scope.clone();
                    nested_scope.insert(variable.clone(), item);
                    for nested_plan in &body {
                        // Box::pin is required for async recursion (the future
                        // type would otherwise be infinitely sized).
                        Box::pin(self.execute_foreach_body_plan(
                            nested_plan.clone(),
                            &mut nested_scope,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0,
                        ))
                        .await?;
                    }
                }
            }
            _ => {
                return Err(anyhow!(
                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
                ));
            }
        }
        Ok(())
    }
2952
2953    fn canonical_row_key(row: &HashMap<String, Value>) -> String {
2954        let mut pairs: Vec<_> = row.iter().collect();
2955        pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
2956
2957        pairs
2958            .into_iter()
2959            .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
2960            .collect::<Vec<_>>()
2961            .join("|")
2962    }
2963
    /// Build a canonical string key for a single `Value`.
    ///
    /// Used by DISTINCT and GROUP BY: two values that are equal under the
    /// engine's value semantics must map to the same key. Normalizations
    /// visible below: whole-number floats share the `n:` prefix with ints
    /// (so 1 and 1.0 group together), temporal-looking strings share the
    /// `temporal:` prefix with real temporal values, and map/node/edge
    /// entries are sorted so insertion order is irrelevant.
    fn canonical_value_key(v: &Value) -> String {
        match v {
            Value::Null => "null".to_string(),
            Value::Bool(b) => format!("b:{b}"),
            Value::Int(i) => format!("n:{i}"),
            Value::Float(f) => {
                if f.is_nan() {
                    // All NaN payloads collapse to a single key.
                    "nan".to_string()
                } else if f.is_infinite() {
                    if f.is_sign_positive() {
                        "inf:+".to_string()
                    } else {
                        "inf:-".to_string()
                    }
                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
                    // Whole-number float: share the integer key space so that
                    // e.g. Int(2) and Float(2.0) land in the same group.
                    // NOTE(review): `i64::MAX as f64` rounds up to 2^63, and
                    // Rust's float→int `as` cast saturates, so Float(2^63)
                    // keys as n:i64::MAX here — confirm this matches the
                    // intended int/float equality semantics at the boundary.
                    format!("n:{}", *f as i64)
                } else {
                    format!("f:{f}")
                }
            }
            Value::String(s) => {
                // Strings that parse as temporal literals key like temporals,
                // so e.g. "2024-01-01" groups with the equivalent date value.
                if let Some(k) = Self::temporal_string_key(s) {
                    format!("temporal:{k}")
                } else {
                    format!("s:{s}")
                }
            }
            Value::Bytes(b) => format!("bytes:{:?}", b),
            Value::List(items) => format!(
                "list:[{}]",
                items
                    .iter()
                    .map(Self::canonical_value_key)
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Map(map) => {
                // Sort entries by key so logically-equal maps built in
                // different insertion orders produce identical keys.
                let mut pairs: Vec<_> = map.iter().collect();
                pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
                format!(
                    "map:{{{}}}",
                    pairs
                        .into_iter()
                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
                        .collect::<Vec<_>>()
                        .join(",")
                )
            }
            Value::Node(n) => {
                // Labels are sorted: label order is not significant.
                let mut labels = n.labels.clone();
                labels.sort();
                format!(
                    "node:{}:{}:{}",
                    n.vid.as_u64(),
                    labels.join(":"),
                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
                )
            }
            Value::Edge(e) => format!(
                "edge:{}:{}:{}:{}:{}",
                e.eid.as_u64(),
                e.edge_type,
                e.src.as_u64(),
                e.dst.as_u64(),
                Self::canonical_value_key(&Value::Map(e.properties.clone()))
            ),
            Value::Path(p) => format!(
                "path:nodes=[{}];edges=[{}]",
                p.nodes
                    .iter()
                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
                    .collect::<Vec<_>>()
                    .join(","),
                p.edges
                    .iter()
                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Vector(vs) => format!("vec:{:?}", vs),
            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
            // Fallback for variants without a bespoke canonical form; relies
            // on the Debug representation being deterministic.
            _ => format!("{v:?}"),
        }
    }
3048
3049    fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3050        match t {
3051            uni_common::TemporalValue::Date { days_since_epoch } => {
3052                format!("date:{days_since_epoch}")
3053            }
3054            uni_common::TemporalValue::LocalTime {
3055                nanos_since_midnight,
3056            } => format!("localtime:{nanos_since_midnight}"),
3057            uni_common::TemporalValue::Time {
3058                nanos_since_midnight,
3059                offset_seconds,
3060            } => {
3061                let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3062                format!("time:{utc_nanos}")
3063            }
3064            uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3065                format!("localdatetime:{nanos_since_epoch}")
3066            }
3067            uni_common::TemporalValue::DateTime {
3068                nanos_since_epoch, ..
3069            } => format!("datetime:{nanos_since_epoch}"),
3070            uni_common::TemporalValue::Duration {
3071                months,
3072                days,
3073                nanos,
3074            } => format!("duration:{months}:{days}:{nanos}"),
3075            uni_common::TemporalValue::Btic { lo, hi, meta } => {
3076                format!("btic:{lo}:{hi}:{meta}")
3077            }
3078        }
3079    }
3080
3081    fn temporal_string_key(s: &str) -> Option<String> {
3082        let fn_name = match classify_temporal(s)? {
3083            uni_common::TemporalType::Date => "DATE",
3084            uni_common::TemporalType::LocalTime => "LOCALTIME",
3085            uni_common::TemporalType::Time => "TIME",
3086            uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3087            uni_common::TemporalType::DateTime => "DATETIME",
3088            uni_common::TemporalType::Duration => "DURATION",
3089            uni_common::TemporalType::Btic => return None, // BTIC uses dedicated parsing
3090        };
3091        match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3092            Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3093            _ => None,
3094        }
3095    }
3096
    /// Interval (in rows) between timeout re-checks inside aggregate loops.
    ///
    /// Checking the deadline on every row would be needlessly expensive;
    /// checking every 1000 rows bounds how far a runaway aggregation can
    /// overshoot its timeout.
    pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3100
3101    pub(crate) async fn execute_aggregate(
3102        &self,
3103        rows: Vec<HashMap<String, Value>>,
3104        group_by: &[Expr],
3105        aggregates: &[Expr],
3106        prop_manager: &PropertyManager,
3107        params: &HashMap<String, Value>,
3108        ctx: Option<&QueryContext>,
3109    ) -> Result<Vec<HashMap<String, Value>>> {
3110        // CWE-400: Check timeout before aggregation
3111        if let Some(ctx) = ctx {
3112            ctx.check_timeout()?;
3113        }
3114
3115        let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3116
3117        // Cypher semantics: aggregation without grouping keys returns one row even
3118        // on empty input (e.g. `RETURN count(*)`, `RETURN avg(x)`).
3119        if rows.is_empty() {
3120            if group_by.is_empty() {
3121                let accs = Self::create_accumulators(aggregates);
3122                let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3123                return Ok(vec![row]);
3124            }
3125            return Ok(vec![]);
3126        }
3127
3128        for (idx, row) in rows.into_iter().enumerate() {
3129            // Periodic timeout check during aggregation
3130            if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3131                && let Some(ctx) = ctx
3132            {
3133                ctx.check_timeout()?;
3134            }
3135
3136            let key_vals = self
3137                .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3138                .await?;
3139            // Build a canonical key so grouping follows Cypher value semantics
3140            // (e.g. temporal equality by instant, numeric normalization where applicable).
3141            let key_str = format!(
3142                "[{}]",
3143                key_vals
3144                    .iter()
3145                    .map(Self::canonical_value_key)
3146                    .collect::<Vec<_>>()
3147                    .join(",")
3148            );
3149
3150            let entry = groups
3151                .entry(key_str)
3152                .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3153
3154            self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3155                .await?;
3156        }
3157
3158        let results = groups
3159            .values()
3160            .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3161            .collect();
3162
3163        Ok(results)
3164    }
3165
3166    pub(crate) async fn execute_window(
3167        &self,
3168        mut rows: Vec<HashMap<String, Value>>,
3169        window_exprs: &[Expr],
3170        _prop_manager: &PropertyManager,
3171        _params: &HashMap<String, Value>,
3172        ctx: Option<&QueryContext>,
3173    ) -> Result<Vec<HashMap<String, Value>>> {
3174        // CWE-400: Check timeout before window computation
3175        if let Some(ctx) = ctx {
3176            ctx.check_timeout()?;
3177        }
3178
3179        // If no rows or no window expressions, return as-is
3180        if rows.is_empty() || window_exprs.is_empty() {
3181            return Ok(rows);
3182        }
3183
3184        // Process each window function expression
3185        for window_expr in window_exprs {
3186            // Extract window function details
3187            let Expr::FunctionCall {
3188                name,
3189                args,
3190                window_spec: Some(window_spec),
3191                ..
3192            } = window_expr
3193            else {
3194                return Err(anyhow!(
3195                    "Window expression must be a FunctionCall with OVER clause: {:?}",
3196                    window_expr
3197                ));
3198            };
3199
3200            let name_upper = name.to_uppercase();
3201
3202            // Validate it's a supported window function
3203            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3204                return Err(anyhow!(
3205                    "Unsupported window function: {}. Supported functions: {}",
3206                    name,
3207                    WINDOW_FUNCTIONS.join(", ")
3208                ));
3209            }
3210
3211            // Build partition groups based on PARTITION BY clause
3212            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3213
3214            for (row_idx, row) in rows.iter().enumerate() {
3215                // Evaluate partition key
3216                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3217                    // No partitioning - all rows in one partition
3218                    vec![]
3219                } else {
3220                    window_spec
3221                        .partition_by
3222                        .iter()
3223                        .map(|expr| self.evaluate_simple_expr(expr, row))
3224                        .collect::<Result<Vec<_>>>()?
3225                };
3226
3227                partition_map
3228                    .entry(partition_key)
3229                    .or_default()
3230                    .push(row_idx);
3231            }
3232
3233            // Process each partition
3234            for (_partition_key, row_indices) in partition_map.iter_mut() {
3235                // Sort rows within partition by ORDER BY clause
3236                if !window_spec.order_by.is_empty() {
3237                    row_indices.sort_by(|&a, &b| {
3238                        for sort_item in &window_spec.order_by {
3239                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3240                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3241
3242                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3243                                let cmp = Executor::compare_values(&va, &vb);
3244                                let cmp = if sort_item.ascending {
3245                                    cmp
3246                                } else {
3247                                    cmp.reverse()
3248                                };
3249                                if cmp != std::cmp::Ordering::Equal {
3250                                    return cmp;
3251                                }
3252                            }
3253                        }
3254                        std::cmp::Ordering::Equal
3255                    });
3256                }
3257
3258                // Compute window function values for this partition
3259                for (position, &row_idx) in row_indices.iter().enumerate() {
3260                    let window_value = match name_upper.as_str() {
3261                        "ROW_NUMBER" => Value::from((position + 1) as i64),
3262                        "RANK" => {
3263                            // RANK: position (1-indexed) of first row in group of tied rows
3264                            let rank = if position == 0 {
3265                                1i64
3266                            } else {
3267                                let prev_row_idx = row_indices[position - 1];
3268                                let same_as_prev = self.rows_have_same_sort_keys(
3269                                    &window_spec.order_by,
3270                                    &rows,
3271                                    row_idx,
3272                                    prev_row_idx,
3273                                );
3274
3275                                if same_as_prev {
3276                                    // Walk backwards to find where this group started
3277                                    let mut group_start = position - 1;
3278                                    while group_start > 0 {
3279                                        let curr_idx = row_indices[group_start];
3280                                        let prev_idx = row_indices[group_start - 1];
3281                                        if !self.rows_have_same_sort_keys(
3282                                            &window_spec.order_by,
3283                                            &rows,
3284                                            curr_idx,
3285                                            prev_idx,
3286                                        ) {
3287                                            break;
3288                                        }
3289                                        group_start -= 1;
3290                                    }
3291                                    (group_start + 1) as i64
3292                                } else {
3293                                    (position + 1) as i64
3294                                }
3295                            };
3296                            Value::from(rank)
3297                        }
3298                        "DENSE_RANK" => {
3299                            // Dense rank: continuous ranking without gaps
3300                            let mut dense_rank = 1i64;
3301                            for i in 0..position {
3302                                let curr_idx = row_indices[i + 1];
3303                                let prev_idx = row_indices[i];
3304                                if !self.rows_have_same_sort_keys(
3305                                    &window_spec.order_by,
3306                                    &rows,
3307                                    curr_idx,
3308                                    prev_idx,
3309                                ) {
3310                                    dense_rank += 1;
3311                                }
3312                            }
3313                            Value::from(dense_rank)
3314                        }
3315                        "LAG" => {
3316                            let (value_expr, offset, default_value) =
3317                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3318
3319                            if position >= offset {
3320                                let target_idx = row_indices[position - offset];
3321                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3322                            } else {
3323                                default_value
3324                            }
3325                        }
3326                        "LEAD" => {
3327                            let (value_expr, offset, default_value) =
3328                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3329
3330                            if position + offset < row_indices.len() {
3331                                let target_idx = row_indices[position + offset];
3332                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3333                            } else {
3334                                default_value
3335                            }
3336                        }
3337                        "NTILE" => {
3338                            // Extract num_buckets argument: NTILE(num_buckets)
3339                            let num_buckets_expr = args.first().ok_or_else(|| {
3340                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3341                            })?;
3342                            let num_buckets_val =
3343                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3344                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3345                                anyhow!(
3346                                    "NTILE argument must be an integer, got: {:?}",
3347                                    num_buckets_val
3348                                )
3349                            })?;
3350
3351                            if num_buckets <= 0 {
3352                                return Err(anyhow!(
3353                                    "NTILE bucket count must be positive, got: {}",
3354                                    num_buckets
3355                                ));
3356                            }
3357
3358                            let num_buckets = num_buckets as usize;
3359                            let partition_size = row_indices.len();
3360
3361                            // Calculate bucket assignment using standard algorithm
3362                            // For N rows and B buckets:
3363                            // - Base size: N / B
3364                            // - Extra rows: N % B (go to first buckets)
3365                            let base_size = partition_size / num_buckets;
3366                            let extra_rows = partition_size % num_buckets;
3367
3368                            // Determine bucket for current row
3369                            let bucket = if position < extra_rows * (base_size + 1) {
3370                                // Row is in one of the larger buckets (first 'extra_rows' buckets)
3371                                position / (base_size + 1) + 1
3372                            } else {
3373                                // Row is in one of the normal-sized buckets
3374                                let adjusted_position = position - extra_rows * (base_size + 1);
3375                                extra_rows + (adjusted_position / base_size) + 1
3376                            };
3377
3378                            Value::from(bucket as i64)
3379                        }
3380                        "FIRST_VALUE" => {
3381                            // FIRST_VALUE returns the value of the expression from the first row in the window frame
3382                            let value_expr = args.first().ok_or_else(|| {
3383                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3384                            })?;
3385
3386                            // Get the first row in the partition (after ordering)
3387                            if row_indices.is_empty() {
3388                                Value::Null
3389                            } else {
3390                                let first_idx = row_indices[0];
3391                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3392                            }
3393                        }
3394                        "LAST_VALUE" => {
3395                            // LAST_VALUE returns the value of the expression from the last row in the window frame
3396                            let value_expr = args.first().ok_or_else(|| {
3397                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3398                            })?;
3399
3400                            // Get the last row in the partition (after ordering)
3401                            if row_indices.is_empty() {
3402                                Value::Null
3403                            } else {
3404                                let last_idx = row_indices[row_indices.len() - 1];
3405                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3406                            }
3407                        }
3408                        "NTH_VALUE" => {
3409                            // NTH_VALUE returns the value of the expression from the nth row in the window frame
3410                            if args.len() != 2 {
3411                                return Err(anyhow!(
3412                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3413                                ));
3414                            }
3415
3416                            let value_expr = &args[0];
3417                            let n_expr = &args[1];
3418
3419                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3420                            let n = n_val.as_i64().ok_or_else(|| {
3421                                anyhow!(
3422                                    "NTH_VALUE second argument must be an integer, got: {:?}",
3423                                    n_val
3424                                )
3425                            })?;
3426
3427                            if n <= 0 {
3428                                return Err(anyhow!(
3429                                    "NTH_VALUE position must be positive, got: {}",
3430                                    n
3431                                ));
3432                            }
3433
3434                            let nth_index = (n - 1) as usize; // Convert 1-based to 0-based
3435                            if nth_index < row_indices.len() {
3436                                let nth_idx = row_indices[nth_index];
3437                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3438                            } else {
3439                                Value::Null
3440                            }
3441                        }
3442                        _ => unreachable!("Window function {} already validated", name),
3443                    };
3444
3445                    // Add window function result to row
3446                    // Use the window expression's string representation as the column name
3447                    let col_name = window_expr.to_string_repr();
3448                    rows[row_idx].insert(col_name, window_value);
3449                }
3450            }
3451        }
3452
3453        Ok(rows)
3454    }
3455
3456    /// Helper to evaluate simple expressions for window function sorting/partitioning.
3457    ///
3458    /// Uses `&self` for consistency with other evaluation methods, though it only
3459    /// recurses for property access.
3460    fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3461        match expr {
3462            Expr::Variable(name) => row
3463                .get(name)
3464                .cloned()
3465                .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3466            Expr::Property(base, prop) => {
3467                let base_val = self.evaluate_simple_expr(base, row)?;
3468                if let Value::Map(map) = base_val {
3469                    map.get(prop)
3470                        .cloned()
3471                        .ok_or_else(|| anyhow!("Property not found: {}", prop))
3472                } else {
3473                    Err(anyhow!("Cannot access property on non-object"))
3474                }
3475            }
3476            Expr::Literal(lit) => Ok(lit.to_value()),
3477            _ => Err(anyhow!(
3478                "Unsupported expression in window function: {:?}",
3479                expr
3480            )),
3481        }
3482    }
3483
3484    /// Check if two rows have matching sort keys for ranking functions.
3485    fn rows_have_same_sort_keys(
3486        &self,
3487        order_by: &[uni_cypher::ast::SortItem],
3488        rows: &[HashMap<String, Value>],
3489        idx_a: usize,
3490        idx_b: usize,
3491    ) -> bool {
3492        order_by.iter().all(|sort_item| {
3493            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3494            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3495            matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3496        })
3497    }
3498
3499    /// Extract offset and default value for LAG/LEAD window functions.
3500    fn extract_lag_lead_params<'a>(
3501        &self,
3502        func_name: &str,
3503        args: &'a [Expr],
3504        row: &HashMap<String, Value>,
3505    ) -> Result<(&'a Expr, usize, Value)> {
3506        let value_expr = args.first().ok_or_else(|| {
3507            anyhow!(
3508                "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3509                func_name,
3510                func_name
3511            )
3512        })?;
3513
3514        let offset = if let Some(offset_expr) = args.get(1) {
3515            let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3516            offset_val.as_i64().ok_or_else(|| {
3517                anyhow!(
3518                    "{} offset must be an integer, got: {:?}",
3519                    func_name,
3520                    offset_val
3521                )
3522            })? as usize
3523        } else {
3524            1
3525        };
3526
3527        let default_value = if let Some(default_expr) = args.get(2) {
3528            self.evaluate_simple_expr(default_expr, row)?
3529        } else {
3530            Value::Null
3531        };
3532
3533        Ok((value_expr, offset, default_value))
3534    }
3535
3536    /// Evaluate group-by key expressions for a row.
3537    pub(crate) async fn evaluate_group_keys(
3538        &self,
3539        group_by: &[Expr],
3540        row: &HashMap<String, Value>,
3541        prop_manager: &PropertyManager,
3542        params: &HashMap<String, Value>,
3543        ctx: Option<&QueryContext>,
3544    ) -> Result<Vec<Value>> {
3545        let mut key_vals = Vec::new();
3546        for expr in group_by {
3547            key_vals.push(
3548                self.evaluate_expr(expr, row, prop_manager, params, ctx)
3549                    .await?,
3550            );
3551        }
3552        Ok(key_vals)
3553    }
3554
3555    /// Update accumulators with values from the current row.
3556    pub(crate) async fn update_accumulators(
3557        &self,
3558        accs: &mut [Accumulator],
3559        aggregates: &[Expr],
3560        row: &HashMap<String, Value>,
3561        prop_manager: &PropertyManager,
3562        params: &HashMap<String, Value>,
3563        ctx: Option<&QueryContext>,
3564    ) -> Result<()> {
3565        for (i, agg_expr) in aggregates.iter().enumerate() {
3566            if let Expr::FunctionCall { args, .. } = agg_expr {
3567                let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3568                let val = if is_wildcard {
3569                    Value::Null
3570                } else {
3571                    self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3572                        .await?
3573                };
3574                accs[i].update(&val, is_wildcard);
3575            }
3576        }
3577        Ok(())
3578    }
3579
    /// Execute a recursive CTE: run the anchor plan once, then iterate the
    /// recursive plan until a fixpoint is reached.
    ///
    /// Each iteration binds the previous iteration's rows to `cte_name` in
    /// the parameter map, executes the recursive plan, and keeps only rows
    /// not previously seen (cycle detection). Iteration stops when the
    /// working set is empty, when a round produces no new rows, or after
    /// `config.max_recursive_cte_iterations` rounds.
    ///
    /// Returns a single row mapping `cte_name` to a `Value::List` of all
    /// accumulated results: single-column rows contribute their bare value,
    /// multi-column rows contribute a map of column name to value.
    pub(crate) async fn execute_recursive_cte(
        &self,
        cte_name: &str,
        initial: LogicalPlan,
        recursive: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        use std::collections::HashSet;

        // Helper to create a stable key for cycle detection.
        // Sorting the (key, value) pairs makes the Debug rendering
        // deterministic regardless of HashMap iteration order.
        pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
            let mut pairs: Vec<_> = row.iter().collect();
            pairs.sort_by(|a, b| a.0.cmp(b.0));
            format!("{pairs:?}")
        }

        // 1. Execute Anchor
        let mut working_table = self
            .execute_subplan(initial, prop_manager, params, ctx)
            .await?;
        let mut result_table = working_table.clone();

        // Track seen rows for cycle detection
        let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();

        // 2. Loop
        // Safety: Max iterations to prevent infinite loop
        let max_iterations = self.config.max_recursive_cte_iterations;
        for _iteration in 0..max_iterations {
            // CWE-400: Check timeout at each iteration to prevent resource exhaustion
            if let Some(ctx) = ctx {
                ctx.check_timeout()?;
            }

            if working_table.is_empty() {
                break;
            }

            // Bind working table to CTE name in params. Single-column rows
            // are unwrapped to their bare value so expressions like
            // `n IN cte` see a list of values rather than a list of maps.
            let working_val = Value::List(
                working_table
                    .iter()
                    .map(|row| {
                        if row.len() == 1 {
                            row.values().next().unwrap().clone()
                        } else {
                            Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
                        }
                    })
                    .collect(),
            );

            let mut next_params = params.clone();
            next_params.insert(cte_name.to_string(), working_val);

            // Execute recursive part
            let next_result = self
                .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
                .await?;

            if next_result.is_empty() {
                break;
            }

            // Filter out already-seen rows (cycle detection)
            let new_rows: Vec<_> = next_result
                .into_iter()
                .filter(|row| {
                    let key = row_key(row);
                    seen.insert(key) // Returns false if already present
                })
                .collect();

            if new_rows.is_empty() {
                // All results were cycles - terminate
                break;
            }

            result_table.extend(new_rows.clone());
            working_table = new_rows;
        }

        // Output accumulated results as a single list bound to the CTE name.
        // Mirror the binding shape used above: a row with one column
        // contributes its bare value; a multi-column row contributes a map.
        let final_list = Value::List(
            result_table
                .into_iter()
                .map(|row| {
                    if row.len() == 1 {
                        row.values().next().unwrap().clone()
                    } else {
                        Value::Map(row.into_iter().collect())
                    }
                })
                .collect(),
        );

        let mut final_row = HashMap::new();
        final_row.insert(cte_name.to_string(), final_list);
        Ok(vec![final_row])
    }
3695
    /// Interval (in rows) between timeout checks during sort-key extraction.
    const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;

    /// Sort rows according to the ORDER BY clause.
    ///
    /// For each sort item, the key is taken from a precomputed column whose
    /// name is the expression's string representation when present and
    /// non-null; otherwise the expression is evaluated per row. Evaluation
    /// errors are treated as NULL so sorting is best-effort rather than
    /// failing the whole query.
    ///
    /// # Errors
    ///
    /// Returns an error only if the query context reports a timeout.
    pub(crate) async fn execute_sort(
        &self,
        rows: Vec<HashMap<String, Value>>,
        order_by: &[uni_cypher::ast::SortItem],
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // CWE-400: Check timeout before potentially expensive sort
        if let Some(ctx) = ctx {
            ctx.check_timeout()?;
        }

        let mut rows_with_keys = Vec::with_capacity(rows.len());
        for (idx, row) in rows.into_iter().enumerate() {
            // Periodic timeout check during key extraction
            if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
                && let Some(ctx) = ctx
            {
                ctx.check_timeout()?;
            }

            let mut keys = Vec::new();
            for item in order_by {
                // Prefer a precomputed column (e.g. from a projection stage).
                let val = row
                    .get(&item.expr.to_string_repr())
                    .cloned()
                    .unwrap_or(Value::Null);
                // Fall back to full evaluation; errors degrade to NULL.
                let val = if val.is_null() {
                    self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
                        .await
                        .unwrap_or(Value::Null)
                } else {
                    val
                };
                keys.push(val);
            }
            rows_with_keys.push((row, keys));
        }

        // Check timeout again before synchronous sort (can't be interrupted)
        if let Some(ctx) = ctx {
            ctx.check_timeout()?;
        }

        rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));

        Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
    }
3748
3749    /// Create accumulators for aggregate expressions.
3750    pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3751        aggregates
3752            .iter()
3753            .map(|expr| {
3754                if let Expr::FunctionCall { name, distinct, .. } = expr {
3755                    Accumulator::new(name, *distinct)
3756                } else {
3757                    Accumulator::new("COUNT", false)
3758                }
3759            })
3760            .collect()
3761    }
3762
3763    /// Build result row from group-by keys and accumulators.
3764    pub(crate) fn build_aggregate_result(
3765        group_by: &[Expr],
3766        aggregates: &[Expr],
3767        key_vals: &[Value],
3768        accs: &[Accumulator],
3769    ) -> HashMap<String, Value> {
3770        let mut res_row = HashMap::new();
3771        for (i, expr) in group_by.iter().enumerate() {
3772            res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3773        }
3774        for (i, expr) in aggregates.iter().enumerate() {
3775            // Use aggregate_column_name to ensure consistency with planner
3776            let col_name = crate::query::planner::aggregate_column_name(expr);
3777            res_row.insert(col_name, accs[i].finish());
3778        }
3779        res_row
3780    }
3781
3782    /// Compare and return ordering for sort operation.
3783    pub(crate) fn compare_sort_keys(
3784        a_keys: &[Value],
3785        b_keys: &[Value],
3786        order_by: &[uni_cypher::ast::SortItem],
3787    ) -> std::cmp::Ordering {
3788        for (i, item) in order_by.iter().enumerate() {
3789            let order = Self::compare_values(&a_keys[i], &b_keys[i]);
3790            if order != std::cmp::Ordering::Equal {
3791                return if item.ascending {
3792                    order
3793                } else {
3794                    order.reverse()
3795                };
3796            }
3797        }
3798        std::cmp::Ordering::Equal
3799    }
3800
3801    /// Executes BACKUP command to local or cloud storage.
3802    ///
3803    /// Supports both local filesystem paths and cloud URLs (s3://, gs://, az://).
3804    pub(crate) async fn execute_backup(
3805        &self,
3806        destination: &str,
3807        _options: &HashMap<String, Value>,
3808    ) -> Result<Vec<HashMap<String, Value>>> {
3809        // 1. Flush L0
3810        if let Some(writer_arc) = &self.writer {
3811            let mut writer = writer_arc.write().await;
3812            writer.flush_to_l1(None).await?;
3813        }
3814
3815        // 2. Snapshot
3816        let snapshot_manager = self.storage.snapshot_manager();
3817        let snapshot = snapshot_manager
3818            .load_latest_snapshot()
3819            .await?
3820            .ok_or_else(|| anyhow!("No snapshot found"))?;
3821
3822        // 3. Copy files - cloud or local path
3823        if is_cloud_url(destination) {
3824            self.backup_to_cloud(destination, &snapshot.snapshot_id)
3825                .await?;
3826        } else {
3827            // Validate local destination path against sandbox
3828            let validated_dest = self.validate_path(destination)?;
3829            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
3830                .await?;
3831        }
3832
3833        let mut res = HashMap::new();
3834        res.insert(
3835            "status".to_string(),
3836            Value::String("Backup completed".to_string()),
3837        );
3838        res.insert(
3839            "snapshot_id".to_string(),
3840            Value::String(snapshot.snapshot_id),
3841        );
3842        Ok(vec![res])
3843    }
3844
3845    /// Backs up database to a local filesystem destination.
3846    async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
3847        let source_path = std::path::Path::new(self.storage.base_path());
3848
3849        if !dest_path.exists() {
3850            std::fs::create_dir_all(dest_path)?;
3851        }
3852
3853        // Recursive copy (local to local)
3854        if source_path.exists() {
3855            Self::copy_dir_all(source_path, dest_path)?;
3856        }
3857
3858        // Copy schema to destination/catalog/schema.json
3859        let schema_manager = self.storage.schema_manager();
3860        let dest_catalog = dest_path.join("catalog");
3861        if !dest_catalog.exists() {
3862            std::fs::create_dir_all(&dest_catalog)?;
3863        }
3864
3865        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3866        std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
3867
3868        Ok(())
3869    }
3870
3871    /// Backs up database to a cloud storage destination.
3872    ///
3873    /// Streams data from source to destination, supporting cross-cloud backups.
3874    async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
3875        use object_store::ObjectStore;
3876        use object_store::local::LocalFileSystem;
3877        use object_store::path::Path as ObjPath;
3878
3879        let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
3880        let source_path = std::path::Path::new(self.storage.base_path());
3881
3882        // Create local store for source, coerced to dyn ObjectStore
3883        let src_store: Arc<dyn ObjectStore> =
3884            Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
3885
3886        // Copy catalog/ directory
3887        let catalog_src = ObjPath::from("catalog");
3888        let catalog_dst = if dest_prefix.as_ref().is_empty() {
3889            ObjPath::from("catalog")
3890        } else {
3891            ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
3892        };
3893        copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
3894
3895        // Copy storage/ directory
3896        let storage_src = ObjPath::from("storage");
3897        let storage_dst = if dest_prefix.as_ref().is_empty() {
3898            ObjPath::from("storage")
3899        } else {
3900            ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
3901        };
3902        copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
3903
3904        // Ensure schema is present at canonical catalog location.
3905        let schema_manager = self.storage.schema_manager();
3906        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3907        let schema_path = if dest_prefix.as_ref().is_empty() {
3908            ObjPath::from("catalog/schema.json")
3909        } else {
3910            ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
3911        };
3912        dest_store
3913            .put(&schema_path, bytes::Bytes::from(schema_content).into())
3914            .await?;
3915
3916        Ok(())
3917    }
3918
3919    /// Maximum directory depth for backup operations.
3920    ///
3921    /// **CWE-674 (Uncontrolled Recursion)**: Prevents stack overflow from
3922    /// excessively deep directory structures.
3923    const MAX_BACKUP_DEPTH: usize = 100;
3924
3925    /// Maximum file count for backup operations.
3926    ///
3927    /// **CWE-400 (Resource Consumption)**: Prevents disk exhaustion and
3928    /// long-running operations from malicious or unexpectedly large directories.
3929    const MAX_BACKUP_FILES: usize = 100_000;
3930
3931    /// Recursively copies a directory with security limits.
3932    ///
3933    /// # Security
3934    ///
3935    /// - **CWE-674**: Depth limit prevents stack overflow
3936    /// - **CWE-400**: File count limit prevents resource exhaustion
3937    /// - **Symlink handling**: Symlinks are skipped to prevent loop attacks
3938    pub(crate) fn copy_dir_all(
3939        src: &std::path::Path,
3940        dst: &std::path::Path,
3941    ) -> std::io::Result<()> {
3942        let mut file_count = 0usize;
3943        Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
3944    }
3945
3946    /// Internal implementation with depth and file count tracking.
3947    pub(crate) fn copy_dir_all_impl(
3948        src: &std::path::Path,
3949        dst: &std::path::Path,
3950        depth: usize,
3951        file_count: &mut usize,
3952    ) -> std::io::Result<()> {
3953        if depth >= Self::MAX_BACKUP_DEPTH {
3954            return Err(std::io::Error::new(
3955                std::io::ErrorKind::InvalidInput,
3956                format!(
3957                    "Maximum backup depth {} exceeded at {:?}",
3958                    Self::MAX_BACKUP_DEPTH,
3959                    src
3960                ),
3961            ));
3962        }
3963
3964        std::fs::create_dir_all(dst)?;
3965
3966        for entry in std::fs::read_dir(src)? {
3967            if *file_count >= Self::MAX_BACKUP_FILES {
3968                return Err(std::io::Error::new(
3969                    std::io::ErrorKind::InvalidInput,
3970                    format!(
3971                        "Maximum backup file count {} exceeded",
3972                        Self::MAX_BACKUP_FILES
3973                    ),
3974                ));
3975            }
3976            *file_count += 1;
3977
3978            let entry = entry?;
3979            let metadata = entry.metadata()?;
3980
3981            // Skip symlinks to prevent loops and traversal attacks
3982            if metadata.file_type().is_symlink() {
3983                // Silently skip - logging would require tracing dependency
3984                continue;
3985            }
3986
3987            let dst_path = dst.join(entry.file_name());
3988            if metadata.is_dir() {
3989                Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
3990            } else {
3991                std::fs::copy(entry.path(), dst_path)?;
3992            }
3993        }
3994        Ok(())
3995    }
3996
3997    pub(crate) async fn execute_copy(
3998        &self,
3999        target: &str,
4000        source: &str,
4001        options: &HashMap<String, Value>,
4002        prop_manager: &PropertyManager,
4003    ) -> Result<Vec<HashMap<String, Value>>> {
4004        let format = options
4005            .get("format")
4006            .and_then(|v| v.as_str())
4007            .unwrap_or_else(|| {
4008                if source.ends_with(".parquet") {
4009                    "parquet"
4010                } else {
4011                    "csv"
4012                }
4013            });
4014
4015        match format.to_lowercase().as_str() {
4016            "csv" => self.execute_csv_import(target, source, options).await,
4017            "parquet" => {
4018                self.execute_parquet_import(target, source, options, prop_manager)
4019                    .await
4020            }
4021            _ => Err(anyhow!("Unsupported format: {}", format)),
4022        }
4023    }
4024
4025    pub(crate) async fn execute_csv_import(
4026        &self,
4027        target: &str,
4028        source: &str,
4029        options: &HashMap<String, Value>,
4030    ) -> Result<Vec<HashMap<String, Value>>> {
4031        // Validate source path against sandbox
4032        let validated_source = self.validate_path(source)?;
4033
4034        let writer_lock = self
4035            .writer
4036            .as_ref()
4037            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4038
4039        let schema = self.storage.schema_manager().schema();
4040
4041        // 1. Determine if target is Label or EdgeType
4042        let label_meta = schema.labels.get(target);
4043        let edge_meta = schema.edge_types.get(target);
4044
4045        if label_meta.is_none() && edge_meta.is_none() {
4046            return Err(anyhow!("Target '{}' not found in schema", target));
4047        }
4048
4049        // 2. Open CSV
4050        let delimiter_str = options
4051            .get("delimiter")
4052            .and_then(|v| v.as_str())
4053            .unwrap_or(",");
4054        let delimiter = if delimiter_str.is_empty() {
4055            b','
4056        } else {
4057            delimiter_str.as_bytes()[0]
4058        };
4059        let has_header = options
4060            .get("header")
4061            .and_then(|v| v.as_bool())
4062            .unwrap_or(true);
4063
4064        let mut rdr = csv::ReaderBuilder::new()
4065            .delimiter(delimiter)
4066            .has_headers(has_header)
4067            .from_path(&validated_source)?;
4068
4069        let headers = rdr.headers()?.clone();
4070        let mut count = 0;
4071
4072        let mut writer = writer_lock.write().await;
4073
4074        if label_meta.is_some() {
4075            let target_props = schema
4076                .properties
4077                .get(target)
4078                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4079
4080            for result in rdr.records() {
4081                let record = result?;
4082                let mut props = HashMap::new();
4083
4084                for (i, header) in headers.iter().enumerate() {
4085                    if let Some(val_str) = record.get(i)
4086                        && let Some(prop_meta) = target_props.get(header)
4087                    {
4088                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4089                        props.insert(header.to_string(), val);
4090                    }
4091                }
4092
4093                let vid = writer.next_vid().await?;
4094                writer
4095                    .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4096                    .await?;
4097                count += 1;
4098            }
4099        } else if let Some(meta) = edge_meta {
4100            let type_id = meta.id;
4101            let target_props = schema
4102                .properties
4103                .get(target)
4104                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4105
4106            // For edges, we need src and dst VIDs.
4107            // Expecting columns '_src' and '_dst' or as specified in options.
4108            let src_col = options
4109                .get("src_col")
4110                .and_then(|v| v.as_str())
4111                .unwrap_or("_src");
4112            let dst_col = options
4113                .get("dst_col")
4114                .and_then(|v| v.as_str())
4115                .unwrap_or("_dst");
4116
4117            for result in rdr.records() {
4118                let record = result?;
4119                let mut props = HashMap::new();
4120                let mut src_vid = None;
4121                let mut dst_vid = None;
4122
4123                for (i, header) in headers.iter().enumerate() {
4124                    if let Some(val_str) = record.get(i) {
4125                        if header == src_col {
4126                            src_vid =
4127                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4128                        } else if header == dst_col {
4129                            dst_vid =
4130                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4131                        } else if let Some(prop_meta) = target_props.get(header) {
4132                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4133                            props.insert(header.to_string(), val);
4134                        }
4135                    }
4136                }
4137
4138                let src =
4139                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4140                let dst = dst_vid
4141                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;
4142
4143                let eid = writer.next_eid(type_id).await?;
4144                writer
4145                    .insert_edge(
4146                        src,
4147                        dst,
4148                        type_id,
4149                        eid,
4150                        props,
4151                        Some(target.to_string()),
4152                        None,
4153                    )
4154                    .await?;
4155                count += 1;
4156            }
4157        }
4158
4159        let mut res = HashMap::new();
4160        res.insert("count".to_string(), Value::Int(count as i64));
4161        Ok(vec![res])
4162    }
4163
4164    /// Imports data from Parquet file to a label or edge type.
4165    ///
4166    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
4167    pub(crate) async fn execute_parquet_import(
4168        &self,
4169        target: &str,
4170        source: &str,
4171        options: &HashMap<String, Value>,
4172        _prop_manager: &PropertyManager,
4173    ) -> Result<Vec<HashMap<String, Value>>> {
4174        let writer_lock = self
4175            .writer
4176            .as_ref()
4177            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4178
4179        let schema = self.storage.schema_manager().schema();
4180
4181        // 1. Determine if target is Label or EdgeType
4182        let label_meta = schema.labels.get(target);
4183        let edge_meta = schema.edge_types.get(target);
4184
4185        if label_meta.is_none() && edge_meta.is_none() {
4186            return Err(anyhow!("Target '{}' not found in schema", target));
4187        }
4188
4189        // 2. Open Parquet - support both local and cloud URLs
4190        let reader = if is_cloud_url(source) {
4191            self.open_parquet_from_cloud(source).await?
4192        } else {
4193            // Validate local source path against sandbox
4194            let validated_source = self.validate_path(source)?;
4195            let file = std::fs::File::open(&validated_source)?;
4196            let builder =
4197                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
4198            builder.build()?
4199        };
4200        let mut reader = reader;
4201
4202        let mut count = 0;
4203        let mut writer = writer_lock.write().await;
4204
4205        if label_meta.is_some() {
4206            let target_props = schema
4207                .properties
4208                .get(target)
4209                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4210
4211            for batch in reader.by_ref() {
4212                let batch = batch?;
4213                for row in 0..batch.num_rows() {
4214                    let mut props = HashMap::new();
4215                    for field in batch.schema().fields() {
4216                        let name = field.name();
4217                        if target_props.contains_key(name) {
4218                            let col = batch.column_by_name(name).unwrap();
4219                            if !col.is_null(row) {
4220                                // Look up Uni DataType from schema for proper DateTime/Time decoding
4221                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
4222                                let val =
4223                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
4224                                props.insert(name.clone(), val);
4225                            }
4226                        }
4227                    }
4228                    let vid = writer.next_vid().await?;
4229                    writer
4230                        .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4231                        .await?;
4232                    count += 1;
4233                }
4234            }
4235        } else if let Some(meta) = edge_meta {
4236            let type_id = meta.id;
4237            let target_props = schema
4238                .properties
4239                .get(target)
4240                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4241
4242            let src_col = options
4243                .get("src_col")
4244                .and_then(|v| v.as_str())
4245                .unwrap_or("_src");
4246            let dst_col = options
4247                .get("dst_col")
4248                .and_then(|v| v.as_str())
4249                .unwrap_or("_dst");
4250
4251            for batch in reader {
4252                let batch = batch?;
4253                for row in 0..batch.num_rows() {
4254                    let mut props = HashMap::new();
4255                    let mut src_vid = None;
4256                    let mut dst_vid = None;
4257
4258                    for field in batch.schema().fields() {
4259                        let name = field.name();
4260                        let col = batch.column_by_name(name).unwrap();
4261                        if col.is_null(row) {
4262                            continue;
4263                        }
4264
4265                        if name == src_col {
4266                            let val = Self::arrow_to_value(col.as_ref(), row);
4267                            src_vid = Some(Self::vid_from_value(&val)?);
4268                        } else if name == dst_col {
4269                            let val = Self::arrow_to_value(col.as_ref(), row);
4270                            dst_vid = Some(Self::vid_from_value(&val)?);
4271                        } else if let Some(pm) = target_props.get(name) {
4272                            // Look up Uni DataType from schema for proper DateTime/Time decoding
4273                            let val =
4274                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
4275                            props.insert(name.clone(), val);
4276                        }
4277                    }
4278
4279                    let src = src_vid
4280                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4281                    let dst = dst_vid.ok_or_else(|| {
4282                        anyhow!("Missing destination VID in column '{}'", dst_col)
4283                    })?;
4284
4285                    let eid = writer.next_eid(type_id).await?;
4286                    writer
4287                        .insert_edge(
4288                            src,
4289                            dst,
4290                            type_id,
4291                            eid,
4292                            props,
4293                            Some(target.to_string()),
4294                            None,
4295                        )
4296                        .await?;
4297                    count += 1;
4298                }
4299            }
4300        }
4301
4302        let mut res = HashMap::new();
4303        res.insert("count".to_string(), Value::Int(count as i64));
4304        Ok(vec![res])
4305    }
4306
4307    /// Opens a Parquet file from a cloud URL.
4308    ///
4309    /// Downloads the file to memory and creates a Parquet reader.
4310    async fn open_parquet_from_cloud(
4311        &self,
4312        source_url: &str,
4313    ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4314        use object_store::ObjectStore;
4315
4316        let (store, path) = build_store_from_url(source_url)?;
4317
4318        // Download file contents
4319        let bytes = store.get(&path).await?.bytes().await?;
4320
4321        // Create a Parquet reader from the bytes
4322        let reader = bytes::Bytes::from(bytes.to_vec());
4323        let builder =
4324            parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4325        Ok(builder.build()?)
4326    }
4327
4328    pub(crate) async fn scan_edge_type(
4329        &self,
4330        edge_type: &str,
4331        ctx: Option<&QueryContext>,
4332    ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4333        let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4334
4335        // 1. Scan L2 (Base)
4336        self.scan_edge_type_l2(edge_type, &mut edges).await?;
4337
4338        // 2. Scan L1 (Delta)
4339        self.scan_edge_type_l1(edge_type, &mut edges).await?;
4340
4341        // 3. Scan L0 (Memory) and filter tombstoned vertices
4342        if let Some(ctx) = ctx {
4343            self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4344            self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4345        }
4346
4347        Ok(edges
4348            .into_iter()
4349            .map(|(eid, (src, dst))| (eid, src, dst))
4350            .collect())
4351    }
4352
4353    /// Scan L2 (base) storage for edges of a given type.
4354    ///
4355    /// Note: Edges are now stored exclusively in delta datasets (L1) via LanceDB.
4356    /// This L2 scan will typically find no data.
4357    pub(crate) async fn scan_edge_type_l2(
4358        &self,
4359        _edge_type: &str,
4360        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4361    ) -> Result<()> {
4362        // Edges are now stored in delta datasets (L1) via LanceDB.
4363        // Legacy L2 base edge storage is no longer used.
4364        Ok(())
4365    }
4366
4367    /// Scan L1 (delta) storage for edges of a given type.
4368    pub(crate) async fn scan_edge_type_l1(
4369        &self,
4370        edge_type: &str,
4371        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4372    ) -> Result<()> {
4373        if let Ok(Some(batch)) = self
4374            .storage
4375            .scan_delta_table(
4376                edge_type,
4377                "fwd",
4378                &["eid", "src_vid", "dst_vid", "op", "_version"],
4379                None,
4380            )
4381            .await
4382        {
4383            // Collect ops with versions: eid -> (version, op, src, dst)
4384            let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4385                HashMap::new();
4386
4387            self.process_delta_batch(&batch, &mut versioned_ops)?;
4388
4389            // Apply the winning ops
4390            for (eid, (_, op, src, dst)) in versioned_ops {
4391                if op == 0 {
4392                    edges.insert(eid, (src, dst));
4393                } else if op == 1 {
4394                    edges.remove(&eid);
4395                }
4396            }
4397        }
4398        Ok(())
4399    }
4400
4401    /// Process a delta batch, tracking versioned operations.
4402    pub(crate) fn process_delta_batch(
4403        &self,
4404        batch: &arrow_array::RecordBatch,
4405        versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4406    ) -> Result<()> {
4407        use arrow_array::UInt64Array;
4408        let eid_col = batch
4409            .column_by_name("eid")
4410            .ok_or(anyhow!("Missing eid"))?
4411            .as_any()
4412            .downcast_ref::<UInt64Array>()
4413            .ok_or(anyhow!("Invalid eid"))?;
4414        let src_col = batch
4415            .column_by_name("src_vid")
4416            .ok_or(anyhow!("Missing src_vid"))?
4417            .as_any()
4418            .downcast_ref::<UInt64Array>()
4419            .ok_or(anyhow!("Invalid src_vid"))?;
4420        let dst_col = batch
4421            .column_by_name("dst_vid")
4422            .ok_or(anyhow!("Missing dst_vid"))?
4423            .as_any()
4424            .downcast_ref::<UInt64Array>()
4425            .ok_or(anyhow!("Invalid dst_vid"))?;
4426        let op_col = batch
4427            .column_by_name("op")
4428            .ok_or(anyhow!("Missing op"))?
4429            .as_any()
4430            .downcast_ref::<arrow_array::UInt8Array>()
4431            .ok_or(anyhow!("Invalid op"))?;
4432        let version_col = batch
4433            .column_by_name("_version")
4434            .ok_or(anyhow!("Missing _version"))?
4435            .as_any()
4436            .downcast_ref::<UInt64Array>()
4437            .ok_or(anyhow!("Invalid _version"))?;
4438
4439        for i in 0..batch.num_rows() {
4440            let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4441            let version = version_col.value(i);
4442            let op = op_col.value(i);
4443            let src = Vid::from(src_col.value(i));
4444            let dst = Vid::from(dst_col.value(i));
4445
4446            match versioned_ops.entry(eid) {
4447                std::collections::hash_map::Entry::Vacant(e) => {
4448                    e.insert((version, op, src, dst));
4449                }
4450                std::collections::hash_map::Entry::Occupied(mut e) => {
4451                    if version > e.get().0 {
4452                        e.insert((version, op, src, dst));
4453                    }
4454                }
4455            }
4456        }
4457        Ok(())
4458    }
4459
4460    /// Scan L0 (memory) buffers for edges of a given type.
4461    pub(crate) fn scan_edge_type_l0(
4462        &self,
4463        edge_type: &str,
4464        ctx: &QueryContext,
4465        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4466    ) {
4467        let schema = self.storage.schema_manager().schema();
4468        let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4469
4470        if let Some(type_id) = type_id {
4471            // Main L0
4472            self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4473
4474            // Transaction L0
4475            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4476                self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4477            }
4478
4479            // Pending flush L0s
4480            for pending_l0_arc in &ctx.pending_flush_l0s {
4481                self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4482            }
4483        }
4484    }
4485
4486    /// Scan a single L0 buffer for edges and apply tombstones.
4487    pub(crate) fn scan_single_l0(
4488        &self,
4489        l0: &uni_store::runtime::L0Buffer,
4490        type_id: u32,
4491        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4492    ) {
4493        for edge_entry in l0.graph.edges() {
4494            if edge_entry.edge_type == type_id {
4495                edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4496            }
4497        }
4498        // Process Tombstones
4499        let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4500        for eid in eids_to_check {
4501            if l0.is_tombstoned(eid) {
4502                edges.remove(&eid);
4503            }
4504        }
4505    }
4506
4507    /// Filter out edges connected to tombstoned vertices.
4508    pub(crate) fn filter_tombstoned_vertex_edges(
4509        &self,
4510        ctx: &QueryContext,
4511        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4512    ) {
4513        let l0 = ctx.l0.read();
4514        let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4515
4516        // Include tx_l0 vertex tombstones if present
4517        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4518            let tx_l0 = tx_l0_arc.read();
4519            all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4520        }
4521
4522        // Include pending flush L0 vertex tombstones
4523        for pending_l0_arc in &ctx.pending_flush_l0s {
4524            let pending_l0 = pending_l0_arc.read();
4525            all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4526        }
4527
4528        edges.retain(|_, (src, dst)| {
4529            !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4530        });
4531    }
4532
4533    /// Execute a projection operation.
4534    pub(crate) async fn execute_project(
4535        &self,
4536        input_rows: Vec<HashMap<String, Value>>,
4537        projections: &[(Expr, Option<String>)],
4538        prop_manager: &PropertyManager,
4539        params: &HashMap<String, Value>,
4540        ctx: Option<&QueryContext>,
4541    ) -> Result<Vec<HashMap<String, Value>>> {
4542        let mut results = Vec::new();
4543        for m in input_rows {
4544            let mut row = HashMap::new();
4545            for (expr, alias) in projections {
4546                let val = self
4547                    .evaluate_expr(expr, &m, prop_manager, params, ctx)
4548                    .await?;
4549                let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4550                row.insert(name, val);
4551            }
4552            results.push(row);
4553        }
4554        Ok(results)
4555    }
4556
4557    /// Execute an UNWIND operation.
4558    pub(crate) async fn execute_unwind(
4559        &self,
4560        input_rows: Vec<HashMap<String, Value>>,
4561        expr: &Expr,
4562        variable: &str,
4563        prop_manager: &PropertyManager,
4564        params: &HashMap<String, Value>,
4565        ctx: Option<&QueryContext>,
4566    ) -> Result<Vec<HashMap<String, Value>>> {
4567        let mut results = Vec::new();
4568        for row in input_rows {
4569            let val = self
4570                .evaluate_expr(expr, &row, prop_manager, params, ctx)
4571                .await?;
4572            if let Value::List(items) = val {
4573                for item in items {
4574                    let mut new_row = row.clone();
4575                    new_row.insert(variable.to_string(), item);
4576                    results.push(new_row);
4577                }
4578            }
4579        }
4580        Ok(results)
4581    }
4582
4583    /// Execute an APPLY (correlated subquery) operation.
4584    pub(crate) async fn execute_apply(
4585        &self,
4586        input_rows: Vec<HashMap<String, Value>>,
4587        subquery: &LogicalPlan,
4588        input_filter: Option<&Expr>,
4589        prop_manager: &PropertyManager,
4590        params: &HashMap<String, Value>,
4591        ctx: Option<&QueryContext>,
4592    ) -> Result<Vec<HashMap<String, Value>>> {
4593        let mut filtered_rows = input_rows;
4594
4595        if let Some(filter) = input_filter {
4596            let mut filtered = Vec::new();
4597            for row in filtered_rows {
4598                let res = self
4599                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
4600                    .await?;
4601                if res.as_bool().unwrap_or(false) {
4602                    filtered.push(row);
4603                }
4604            }
4605            filtered_rows = filtered;
4606        }
4607
4608        // Handle empty input: execute subquery once with empty context
4609        // This is critical for standalone CALL statements at the beginning of a query
4610        if filtered_rows.is_empty() {
4611            let sub_rows = self
4612                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4613                .await?;
4614            return Ok(sub_rows);
4615        }
4616
4617        let mut results = Vec::new();
4618        for row in filtered_rows {
4619            let mut sub_params = params.clone();
4620            sub_params.extend(row.clone());
4621
4622            let sub_rows = self
4623                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4624                .await?;
4625
4626            for sub_row in sub_rows {
4627                let mut new_row = row.clone();
4628                new_row.extend(sub_row);
4629                results.push(new_row);
4630            }
4631        }
4632        Ok(results)
4633    }
4634
4635    /// Execute SHOW INDEXES command.
4636    pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4637        let schema = self.storage.schema_manager().schema();
4638        let mut rows = Vec::new();
4639        for idx in &schema.indexes {
4640            let (name, type_str, details) = match idx {
4641                uni_common::core::schema::IndexDefinition::Vector(c) => (
4642                    c.name.clone(),
4643                    "VECTOR",
4644                    format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4645                ),
4646                uni_common::core::schema::IndexDefinition::FullText(c) => (
4647                    c.name.clone(),
4648                    "FULLTEXT",
4649                    format!("on {}:{:?}", c.label, c.properties),
4650                ),
4651                uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4652                    cfg.name.clone(),
4653                    "SCALAR",
4654                    format!(":{}({:?})", cfg.label, cfg.properties),
4655                ),
4656                _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4657            };
4658
4659            if let Some(f) = filter
4660                && f != type_str
4661            {
4662                continue;
4663            }
4664
4665            let mut row = HashMap::new();
4666            row.insert("name".to_string(), Value::String(name));
4667            row.insert("type".to_string(), Value::String(type_str.to_string()));
4668            row.insert("details".to_string(), Value::String(details));
4669            rows.push(row);
4670        }
4671        rows
4672    }
4673
4674    pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4675        let mut row = HashMap::new();
4676        row.insert("name".to_string(), Value::String("uni".to_string()));
4677        // Could add storage path, etc.
4678        vec![row]
4679    }
4680
4681    pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4682        // Placeholder as we don't easy access to config struct from here
4683        vec![]
4684    }
4685
4686    pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4687        let snapshot = self
4688            .storage
4689            .snapshot_manager()
4690            .load_latest_snapshot()
4691            .await?;
4692        let mut results = Vec::new();
4693
4694        if let Some(snap) = snapshot {
4695            for (label, s) in &snap.vertices {
4696                let mut row = HashMap::new();
4697                row.insert("type".to_string(), Value::String("Label".to_string()));
4698                row.insert("name".to_string(), Value::String(label.clone()));
4699                row.insert("count".to_string(), Value::Int(s.count as i64));
4700                results.push(row);
4701            }
4702            for (edge, s) in &snap.edges {
4703                let mut row = HashMap::new();
4704                row.insert("type".to_string(), Value::String("Edge".to_string()));
4705                row.insert("name".to_string(), Value::String(edge.clone()));
4706                row.insert("count".to_string(), Value::Int(s.count as i64));
4707                results.push(row);
4708            }
4709        }
4710
4711        Ok(results)
4712    }
4713
4714    pub(crate) fn execute_show_constraints(
4715        &self,
4716        clause: ShowConstraints,
4717    ) -> Vec<HashMap<String, Value>> {
4718        let schema = self.storage.schema_manager().schema();
4719        let mut rows = Vec::new();
4720        for c in &schema.constraints {
4721            if let Some(target) = &clause.target {
4722                match (target, &c.target) {
4723                    (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4724                    (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4725                        if e1 == e2 => {}
4726                    _ => continue,
4727                }
4728            }
4729
4730            let mut row = HashMap::new();
4731            row.insert("name".to_string(), Value::String(c.name.clone()));
4732            let type_str = match c.constraint_type {
4733                ConstraintType::Unique { .. } => "UNIQUE",
4734                ConstraintType::Exists { .. } => "EXISTS",
4735                ConstraintType::Check { .. } => "CHECK",
4736                _ => "UNKNOWN",
4737            };
4738            row.insert("type".to_string(), Value::String(type_str.to_string()));
4739
4740            let target_str = match &c.target {
4741                ConstraintTarget::Label(l) => format!("(:{})", l),
4742                ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4743                _ => "UNKNOWN".to_string(),
4744            };
4745            row.insert("target".to_string(), Value::String(target_str));
4746
4747            rows.push(row);
4748        }
4749        rows
4750    }
4751
4752    /// Execute a MERGE operation.
4753    pub(crate) async fn execute_cross_join(
4754        &self,
4755        left: Box<LogicalPlan>,
4756        right: Box<LogicalPlan>,
4757        prop_manager: &PropertyManager,
4758        params: &HashMap<String, Value>,
4759        ctx: Option<&QueryContext>,
4760    ) -> Result<Vec<HashMap<String, Value>>> {
4761        let left_rows = self
4762            .execute_subplan(*left, prop_manager, params, ctx)
4763            .await?;
4764        let right_rows = self
4765            .execute_subplan(*right, prop_manager, params, ctx)
4766            .await?;
4767
4768        let mut results = Vec::new();
4769        for l in &left_rows {
4770            for r in &right_rows {
4771                let mut combined = l.clone();
4772                combined.extend(r.clone());
4773                results.push(combined);
4774            }
4775        }
4776        Ok(results)
4777    }
4778
4779    /// Execute a UNION operation with optional deduplication.
4780    pub(crate) async fn execute_union(
4781        &self,
4782        left: Box<LogicalPlan>,
4783        right: Box<LogicalPlan>,
4784        all: bool,
4785        prop_manager: &PropertyManager,
4786        params: &HashMap<String, Value>,
4787        ctx: Option<&QueryContext>,
4788    ) -> Result<Vec<HashMap<String, Value>>> {
4789        let mut left_rows = self
4790            .execute_subplan(*left, prop_manager, params, ctx)
4791            .await?;
4792        let mut right_rows = self
4793            .execute_subplan(*right, prop_manager, params, ctx)
4794            .await?;
4795
4796        left_rows.append(&mut right_rows);
4797
4798        if !all {
4799            let mut seen = HashSet::new();
4800            left_rows.retain(|row| {
4801                let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
4802                let key = format!("{sorted_row:?}");
4803                seen.insert(key)
4804            });
4805        }
4806        Ok(left_rows)
4807    }
4808
4809    /// Check if an index with the given name exists.
4810    pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
4811        let schema = self.storage.schema_manager().schema();
4812        schema.indexes.iter().any(|idx| match idx {
4813            uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
4814            uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
4815            uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
4816            _ => false,
4817        })
4818    }
4819
4820    pub(crate) async fn execute_export(
4821        &self,
4822        target: &str,
4823        source: &str,
4824        options: &HashMap<String, Value>,
4825        prop_manager: &PropertyManager,
4826        ctx: Option<&QueryContext>,
4827    ) -> Result<Vec<HashMap<String, Value>>> {
4828        let format = options
4829            .get("format")
4830            .and_then(|v| v.as_str())
4831            .unwrap_or("csv")
4832            .to_lowercase();
4833
4834        match format.as_str() {
4835            "csv" => {
4836                self.execute_csv_export(target, source, options, prop_manager, ctx)
4837                    .await
4838            }
4839            "parquet" => {
4840                self.execute_parquet_export(target, source, options, prop_manager, ctx)
4841                    .await
4842            }
4843            _ => Err(anyhow!("Unsupported export format: {}", format)),
4844        }
4845    }
4846
    /// Exports all vertices of a label, or all edges of an edge type, to a
    /// CSV file on the local filesystem.
    ///
    /// * `target` - label or edge-type name; must exist in the schema.
    /// * `source` - destination file path (sandbox-validated before use).
    /// * `options` - recognised keys: `delimiter` (first byte of the string,
    ///   default `,`) and `header` (bool, default `true`).
    ///
    /// Vertex rows are written as `_vid` followed by the label's properties in
    /// sorted name order; edge rows as `_eid,_src,_dst,_type` plus sorted
    /// properties. Returns a single result row `{"count": <rows written>}`.
    pub(crate) async fn execute_csv_export(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Validate destination path against sandbox
        // NOTE(review): the destination arrives via the `source` parameter —
        // the name is misleading; confirm the caller's argument order.
        let validated_dest = self.validate_path(source)?;

        let schema = self.storage.schema_manager().schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // The csv crate needs a single-byte delimiter; only the first byte of
        // a longer option string is used, and empty falls back to ','.
        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut wtr = csv::WriterBuilder::new()
            .delimiter(delimiter)
            .from_path(&validated_dest)?;

        let mut count = 0;
        // Empty properties map for labels/edge types without registered properties
        let empty_props = HashMap::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            // Sort property names so the column order is deterministic.
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            let mut headers = vec!["_vid".to_string()];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            // Unfiltered scan of every vertex carrying this label.
            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(vid.to_string());
                for p_name in &prop_names {
                    // Missing properties become Null, rendered as empty cells.
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            // Headers for Edge: _eid, _src, _dst, _type, ...props
            let mut headers = vec![
                "_eid".to_string(),
                "_src".to_string(),
                "_dst".to_string(),
                "_type".to_string(),
            ];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            let edges = self.scan_edge_type(target, ctx).await?;

            for (eid, src, dst) in edges {
                let props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(eid.to_string());
                row.push(src.to_string());
                row.push(dst.to_string());
                // `_type` column carries the numeric edge-type id, not its name.
                row.push(meta.id.to_string());

                for p_name in &prop_names {
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        }

        wtr.flush()?;
        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4966
    /// Exports data to Parquet format.
    ///
    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
    ///
    /// `target` must name a label or edge type; the Arrow schema comes from
    /// the corresponding Lance dataset. All rows are materialized in memory
    /// and written as one record batch, so very large exports are
    /// memory-bound. Returns one result row `{"count": <rows written>}`.
    pub(crate) async fn execute_parquet_export(
        &self,
        target: &str,
        destination: &str,
        _options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let schema_manager = self.storage.schema_manager();
        let schema = schema_manager.schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Pull the Arrow schema from the dataset so the output columns match
        // the on-disk layout, including the system columns filled in below.
        let arrow_schema = if label_meta.is_some() {
            let dataset = self.storage.vertex_dataset(target)?;
            dataset.get_arrow_schema(&schema)?
        } else {
            // Edge Schema
            let dataset = self.storage.edge_dataset(target, "", "")?;
            dataset.get_arrow_schema(&schema)?
        };

        let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let mut props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                // System columns expected by the vertex dataset schema.
                props.insert(
                    "_vid".to_string(),
                    uni_common::Value::Int(vid.as_u64() as i64),
                );
                if !props.contains_key("_uid") {
                    // Placeholder for the fixed-size `_uid` column — 32 zero
                    // ints; presumably matches the dataset's `_uid` width —
                    // TODO confirm against the vertex schema definition.
                    props.insert(
                        "_uid".to_string(),
                        uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
                    );
                }
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        } else if edge_meta.is_some() {
            let edges = self.scan_edge_type(target, ctx).await?;
            for (eid, src, dst) in edges {
                let mut props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                // NOTE(review): edge key names here (eid/src_vid/dst_vid) lack
                // the underscore prefix used by the CSV export — presumably
                // they mirror the edge dataset's Arrow column names; verify.
                props.insert(
                    "eid".to_string(),
                    uni_common::Value::Int(eid.as_u64() as i64),
                );
                props.insert(
                    "src_vid".to_string(),
                    uni_common::Value::Int(src.as_u64() as i64),
                );
                props.insert(
                    "dst_vid".to_string(),
                    uni_common::Value::Int(dst.as_u64() as i64),
                );
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        }

        // Write to cloud or local file
        if is_cloud_url(destination) {
            self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
                .await?;
        } else {
            // Validate local destination path against sandbox
            let validated_dest = self.validate_path(destination)?;
            let file = std::fs::File::create(&validated_dest)?;
            let mut writer =
                parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;

            // Write all in one batch for now (simplification)
            if !rows.is_empty() {
                let batch = self.rows_to_batch(&rows, &arrow_schema)?;
                writer.write(&batch)?;
            }

            writer.close()?;
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(rows.len() as i64));
        Ok(vec![res])
    }
5074
    /// Writes Parquet data to a cloud storage destination.
    ///
    /// Serializes `rows` against `arrow_schema` into an in-memory Parquet
    /// buffer, then uploads it with a single `put` to the object store
    /// resolved from `dest_url`. The entire file is buffered in memory, so
    /// this is suited to modest export sizes.
    async fn write_parquet_to_cloud(
        &self,
        dest_url: &str,
        rows: &[HashMap<String, uni_common::Value>],
        arrow_schema: &arrow_schema::Schema,
    ) -> Result<()> {
        use object_store::ObjectStore;

        let (store, path) = build_store_from_url(dest_url)?;

        // Write to an in-memory buffer
        // (scoped so the writer's mutable borrow of `buffer` ends — and the
        // Parquet footer is written by `close()` — before the upload below).
        let mut buffer = Vec::new();
        {
            let mut writer = parquet::arrow::ArrowWriter::try_new(
                &mut buffer,
                Arc::new(arrow_schema.clone()),
                None,
            )?;

            // An empty export still produces a valid (schema-only) file.
            if !rows.is_empty() {
                let batch = self.rows_to_batch(rows, arrow_schema)?;
                writer.write(&batch)?;
            }

            writer.close()?;
        }

        // Upload to cloud storage
        store.put(&path, bytes::Bytes::from(buffer).into()).await?;

        Ok(())
    }
5108
5109    pub(crate) fn rows_to_batch(
5110        &self,
5111        rows: &[HashMap<String, uni_common::Value>],
5112        schema: &arrow_schema::Schema,
5113    ) -> Result<RecordBatch> {
5114        let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5115
5116        for field in schema.fields() {
5117            let name = field.name();
5118            let dt = field.data_type();
5119
5120            let values: Vec<uni_common::Value> = rows
5121                .iter()
5122                .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5123                .collect();
5124            let array = self.values_to_array(&values, dt)?;
5125            columns.push(array);
5126        }
5127
5128        Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5129    }
5130
    /// Convert a slice of Values to an Arrow array.
    /// Delegates to the shared implementation in arrow_convert module.
    ///
    /// `dt` is the target Arrow data type for the resulting column; any
    /// conversion error is propagated from `arrow_convert::values_to_array`.
    pub(crate) fn values_to_array(
        &self,
        values: &[uni_common::Value],
        dt: &arrow_schema::DataType,
    ) -> Result<Arc<dyn Array>> {
        arrow_convert::values_to_array(values, dt)
    }
5140
5141    pub(crate) fn format_csv_value(&self, val: Value) -> String {
5142        match val {
5143            Value::Null => "".to_string(),
5144            Value::String(s) => s,
5145            Value::Int(i) => i.to_string(),
5146            Value::Float(f) => f.to_string(),
5147            Value::Bool(b) => b.to_string(),
5148            _ => format!("{val}"),
5149        }
5150    }
5151
5152    pub(crate) fn parse_csv_value(
5153        &self,
5154        s: &str,
5155        data_type: &uni_common::core::schema::DataType,
5156        prop_name: &str,
5157    ) -> Result<Value> {
5158        if s.is_empty() || s.to_lowercase() == "null" {
5159            return Ok(Value::Null);
5160        }
5161
5162        use uni_common::core::schema::DataType;
5163        match data_type {
5164            DataType::String => Ok(Value::String(s.to_string())),
5165            DataType::Int32 | DataType::Int64 => {
5166                let i = s.parse::<i64>().map_err(|_| {
5167                    anyhow!(
5168                        "Failed to parse integer for property '{}': {}",
5169                        prop_name,
5170                        s
5171                    )
5172                })?;
5173                Ok(Value::Int(i))
5174            }
5175            DataType::Float32 | DataType::Float64 => {
5176                let f = s.parse::<f64>().map_err(|_| {
5177                    anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5178                })?;
5179                Ok(Value::Float(f))
5180            }
5181            DataType::Bool => {
5182                let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5183                    anyhow!(
5184                        "Failed to parse boolean for property '{}': {}",
5185                        prop_name,
5186                        s
5187                    )
5188                })?;
5189                Ok(Value::Bool(b))
5190            }
5191            DataType::CypherValue => {
5192                let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5193                    anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5194                })?;
5195                Ok(Value::from(json_val))
5196            }
5197            DataType::Vector { .. } => {
5198                let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5199                    anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5200                })?;
5201                Ok(Value::Vector(v))
5202            }
5203            _ => Ok(Value::String(s.to_string())),
5204        }
5205    }
5206
5207    pub(crate) async fn detach_delete_vertex(
5208        &self,
5209        vid: Vid,
5210        writer: &mut Writer,
5211        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5212    ) -> Result<()> {
5213        let schema = self.storage.schema_manager().schema();
5214        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5215
5216        // 1. Find and delete all outgoing edges
5217        let out_graph = self
5218            .storage
5219            .load_subgraph_cached(
5220                &[vid],
5221                &edge_type_ids,
5222                1,
5223                uni_store::runtime::Direction::Outgoing,
5224                Some(writer.l0_manager.get_current()),
5225            )
5226            .await?;
5227
5228        for edge in out_graph.edges() {
5229            writer
5230                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5231                .await?;
5232        }
5233
5234        // 2. Find and delete all incoming edges
5235        let in_graph = self
5236            .storage
5237            .load_subgraph_cached(
5238                &[vid],
5239                &edge_type_ids,
5240                1,
5241                uni_store::runtime::Direction::Incoming,
5242                Some(writer.l0_manager.get_current()),
5243            )
5244            .await?;
5245
5246        for edge in in_graph.edges() {
5247            writer
5248                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5249                .await?;
5250        }
5251
5252        Ok(())
5253    }
5254
5255    /// Batch detach-delete: load subgraphs for all VIDs at once, then delete edges and vertices.
5256    pub(crate) async fn batch_detach_delete_vertices(
5257        &self,
5258        vids: &[Vid],
5259        labels_per_vid: Vec<Option<Vec<String>>>,
5260        writer: &mut Writer,
5261        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5262    ) -> Result<()> {
5263        let schema = self.storage.schema_manager().schema();
5264        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5265
5266        // Load outgoing subgraph for all VIDs in one call.
5267        let out_graph = self
5268            .storage
5269            .load_subgraph_cached(
5270                vids,
5271                &edge_type_ids,
5272                1,
5273                uni_store::runtime::Direction::Outgoing,
5274                Some(writer.l0_manager.get_current()),
5275            )
5276            .await?;
5277
5278        for edge in out_graph.edges() {
5279            writer
5280                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5281                .await?;
5282        }
5283
5284        // Load incoming subgraph for all VIDs in one call.
5285        let in_graph = self
5286            .storage
5287            .load_subgraph_cached(
5288                vids,
5289                &edge_type_ids,
5290                1,
5291                uni_store::runtime::Direction::Incoming,
5292                Some(writer.l0_manager.get_current()),
5293            )
5294            .await?;
5295
5296        for edge in in_graph.edges() {
5297            writer
5298                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5299                .await?;
5300        }
5301
5302        // Delete all vertices.
5303        for (vid, labels) in vids.iter().zip(labels_per_vid) {
5304            writer.delete_vertex(*vid, labels, tx_l0).await?;
5305        }
5306
5307        Ok(())
5308    }
5309}