uni_query/query/executor/read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7    eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14/// Convert a `Value` to `chrono::DateTime<Utc>`, handling both `Value::Temporal` and `Value::String`.
15fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16    match val {
17        Value::Temporal(tv) => {
18            use uni_common::TemporalValue;
19            match tv {
20                TemporalValue::DateTime {
21                    nanos_since_epoch, ..
22                }
23                | TemporalValue::LocalDateTime {
24                    nanos_since_epoch, ..
25                } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26                TemporalValue::Date { days_since_epoch } => {
27                    chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28                }
29                _ => None,
30            }
31        }
32        Value::String(s) => parse_datetime_utc(s).ok(),
33        _ => None,
34    }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46    BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47    ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54
55// DataFusion engine imports
56use crate::query::df_graph::L0Context;
57use crate::query::df_planner::HybridPhysicalPlanner;
58use datafusion::physical_plan::ExecutionPlanProperties;
59use datafusion::prelude::SessionContext;
60use parking_lot::RwLock as SyncRwLock;
61
62use arrow_array::{Array, RecordBatch};
63use csv;
64use parquet;
65
66use super::core::*;
67
/// Number of system fields on an edge map: `_eid`, `_src`, `_dst`, `_type`, `_type_name`.
///
/// A map with no more than this many entries is assumed to have no user
/// properties loaded yet (pushdown hydration fallback trigger).
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
/// Number of system fields on a vertex map: `_vid`, `_label`, `_uid`.
///
/// Same role as `EDGE_SYSTEM_FIELD_COUNT`, but for vertex maps.
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
72
73/// Collect VIDs from all L0 buffers visible to a query context.
74///
75/// Applies `extractor` to each L0 buffer (main, transaction, pending flush) and
76/// collects the results. Returns an empty vec when no query context is present.
77fn collect_l0_vids(
78    ctx: Option<&QueryContext>,
79    extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
80) -> Vec<Vid> {
81    let mut vids = Vec::new();
82    if let Some(ctx) = ctx {
83        vids.extend(extractor(&ctx.l0.read()));
84        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
85            vids.extend(extractor(&tx_l0_arc.read()));
86        }
87        for pending_l0_arc in &ctx.pending_flush_l0s {
88            vids.extend(extractor(&pending_l0_arc.read()));
89        }
90    }
91    vids
92}
93
94/// Hydrate an entity map (vertex or edge) with properties if not already loaded.
95///
96/// This is the fallback for pushdown hydration - if the entity only has system fields
97/// (indicating pushdown didn't load properties), we load all properties here.
98///
99/// System field counts:
100/// - Edge: 5 fields (_eid, _src, _dst, _type, _type_name)
101/// - Vertex: 3 fields (_vid, _label, _uid)
102async fn hydrate_entity_if_needed(
103    map: &mut HashMap<String, Value>,
104    prop_manager: &PropertyManager,
105    ctx: Option<&QueryContext>,
106) {
107    // Check for edge entity
108    if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
109        if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
110            tracing::debug!(
111                "Pushdown fallback: hydrating edge {} at execution time",
112                eid_u64
113            );
114            if let Ok(Some(props)) = prop_manager
115                .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
116                .await
117            {
118                for (key, value) in props {
119                    map.entry(key).or_insert(value);
120                }
121            }
122        } else {
123            tracing::trace!(
124                "Pushdown success: edge {} already has {} properties",
125                eid_u64,
126                map.len() - EDGE_SYSTEM_FIELD_COUNT
127            );
128        }
129        return;
130    }
131
132    // Check for vertex entity
133    if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
134        if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
135            tracing::debug!(
136                "Pushdown fallback: hydrating vertex {} at execution time",
137                vid_u64
138            );
139            if let Ok(Some(props)) = prop_manager
140                .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
141                .await
142            {
143                for (key, value) in props {
144                    map.entry(key).or_insert(value);
145                }
146            }
147        } else {
148            tracing::trace!(
149                "Pushdown success: vertex {} already has {} properties",
150                vid_u64,
151                map.len() - VERTEX_SYSTEM_FIELD_COUNT
152            );
153        }
154    }
155}
156
157impl Executor {
158    /// Helper to verify and filter candidates against an optional predicate.
159    ///
160    /// Deduplicates candidates, loads properties, and evaluates the filter expression.
161    /// Returns only VIDs that pass the filter (or are not deleted).
162    async fn verify_and_filter_candidates(
163        &self,
164        mut candidates: Vec<Vid>,
165        variable: &str,
166        filter: Option<&Expr>,
167        ctx: Option<&QueryContext>,
168        prop_manager: &PropertyManager,
169        params: &HashMap<String, Value>,
170    ) -> Result<Vec<Vid>> {
171        candidates.sort_unstable();
172        candidates.dedup();
173
174        let mut verified_vids = Vec::new();
175        for vid in candidates {
176            let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
177                continue; // Deleted
178            };
179
180            if let Some(expr) = filter {
181                let mut props_map: HashMap<String, Value> = props;
182                props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
183
184                let mut row = HashMap::new();
185                row.insert(variable.to_string(), Value::Map(props_map));
186
187                let res = self
188                    .evaluate_expr(expr, &row, prop_manager, params, ctx)
189                    .await?;
190                if res.as_bool().unwrap_or(false) {
191                    verified_vids.push(vid);
192                }
193            } else {
194                verified_vids.push(vid);
195            }
196        }
197
198        Ok(verified_vids)
199    }
200
201    pub(crate) async fn scan_storage_candidates(
202        &self,
203        label_id: u16,
204        variable: &str,
205        filter: Option<&Expr>,
206    ) -> Result<Vec<Vid>> {
207        let schema = self.storage.schema_manager().schema();
208        let label_name = schema
209            .label_name_by_id(label_id)
210            .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
211
212        // Generate filter SQL from expression
213        let empty_props = std::collections::HashMap::new();
214        let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
215        let filter_sql = filter.and_then(|expr| {
216            LanceFilterGenerator::generate(std::slice::from_ref(expr), variable, Some(label_props))
217        });
218
219        self.storage
220            .scan_vertex_candidates(label_name, filter_sql.as_deref())
221            .await
222    }
223
224    pub(crate) async fn scan_label_with_filter(
225        &self,
226        label_id: u16,
227        variable: &str,
228        filter: Option<&Expr>,
229        ctx: Option<&QueryContext>,
230        prop_manager: &PropertyManager,
231        params: &HashMap<String, Value>,
232    ) -> Result<Vec<Vid>> {
233        let mut candidates = self
234            .scan_storage_candidates(label_id, variable, filter)
235            .await?;
236
237        // Convert label_id to label_name for L0 lookup
238        let schema = self.storage.schema_manager().schema();
239        if let Some(label_name) = schema.label_name_by_id(label_id) {
240            candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
241        }
242
243        self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
244            .await
245    }
246
247    pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
248        // Handle Value::Node directly (has vid field)
249        if let Value::Node(node) = val {
250            return Ok(node.vid);
251        }
252        // Handle Object (node) containing _vid field
253        if let Value::Map(map) = val
254            && let Some(vid_val) = map.get("_vid")
255            && let Some(v) = vid_val.as_u64()
256        {
257            return Ok(Vid::from(v));
258        }
259        // Handle string format
260        if let Some(s) = val.as_str()
261            && let Ok(id) = s.parse::<u64>()
262        {
263            return Ok(Vid::new(id));
264        }
265        // Handle raw u64
266        if let Some(v) = val.as_u64() {
267            return Ok(Vid::from(v));
268        }
269        Err(anyhow!("Invalid Vid format: {:?}", val))
270    }
271
272    /// Find a node value in the row by VID.
273    ///
274    /// Scans all values in the row, looking for a node (Map or Node) whose VID
275    /// matches the target. Returns the full node value if found, or a minimal
276    /// Map with just `_vid` as fallback.
277    fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
278        for val in row.values() {
279            if let Ok(vid) = Self::vid_from_value(val)
280                && vid == target_vid
281            {
282                return val.clone();
283            }
284        }
285        // Fallback: return minimal node map
286        Value::Map(HashMap::from([(
287            "_vid".to_string(),
288            Value::Int(target_vid.as_u64() as i64),
289        )]))
290    }
291
292    /// Create L0 context, session, and planner for DataFusion execution.
293    ///
294    /// This is the shared setup for `execute_datafusion` and `execute_merge_read_plan`.
295    /// Returns `(session_ctx, planner, prop_manager_arc)`.
296    pub async fn create_datafusion_planner(
297        &self,
298        prop_manager: &PropertyManager,
299        params: &HashMap<String, Value>,
300    ) -> Result<(
301        Arc<SyncRwLock<SessionContext>>,
302        HybridPhysicalPlanner,
303        Arc<PropertyManager>,
304    )> {
305        let query_ctx = self.get_context().await;
306        let l0_context = match query_ctx {
307            Some(ref ctx) => L0Context::from_query_context(ctx),
308            None => L0Context::empty(),
309        };
310
311        let prop_manager_arc = Arc::new(PropertyManager::new(
312            self.storage.clone(),
313            self.storage.schema_manager_arc(),
314            prop_manager.cache_size(),
315        ));
316
317        let session = SessionContext::new();
318        crate::query::df_udfs::register_cypher_udfs(&session)?;
319        if let Some(ref registry) = self.custom_function_registry {
320            crate::query::df_udfs::register_custom_udfs(&session, registry)?;
321        }
322        let session_ctx = Arc::new(SyncRwLock::new(session));
323
324        let mut planner = HybridPhysicalPlanner::with_l0_context(
325            session_ctx.clone(),
326            self.storage.clone(),
327            l0_context,
328            prop_manager_arc.clone(),
329            self.storage.schema_manager().schema(),
330            params.clone(),
331            HashMap::new(),
332        );
333
334        planner = planner.with_algo_registry(self.algo_registry.clone());
335        if let Some(ref registry) = self.procedure_registry {
336            planner = planner.with_procedure_registry(registry.clone());
337        }
338        if let Some(ref xervo_runtime) = self.xervo_runtime {
339            planner = planner.with_xervo_runtime(xervo_runtime.clone());
340        }
341
342        Ok((session_ctx, planner, prop_manager_arc))
343    }
344
345    /// Execute a DataFusion physical plan and collect all result batches.
346    pub fn collect_batches(
347        session_ctx: &Arc<SyncRwLock<SessionContext>>,
348        execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
349    ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
350        Box::pin(async move {
351            use futures::TryStreamExt;
352
353            let task_ctx = session_ctx.read().task_ctx();
354            let partition_count = execution_plan.output_partitioning().partition_count();
355            let mut all_batches = Vec::new();
356            for partition in 0..partition_count {
357                let stream = execution_plan.execute(partition, task_ctx.clone())?;
358                let batches: Vec<RecordBatch> = stream.try_collect().await?;
359                all_batches.extend(batches);
360            }
361            Ok(all_batches)
362        })
363    }
364
365    /// Executes a query using the DataFusion-based engine.
366    ///
367    /// Uses `HybridPhysicalPlanner` which produces DataFusion `ExecutionPlan`
368    /// trees with custom graph operators for graph-specific operations.
369    pub async fn execute_datafusion(
370        &self,
371        plan: LogicalPlan,
372        prop_manager: &PropertyManager,
373        params: &HashMap<String, Value>,
374    ) -> Result<Vec<RecordBatch>> {
375        let (batches, _plan) = self
376            .execute_datafusion_with_plan(plan, prop_manager, params)
377            .await?;
378        Ok(batches)
379    }
380
    /// Executes a query using the DataFusion-based engine, returning both
    /// result batches and the physical execution plan.
    ///
    /// The returned `Arc<dyn ExecutionPlan>` can be walked to extract per-operator
    /// metrics (e.g., `output_rows`, `elapsed_compute`) that DataFusion's
    /// `BaselineMetrics` recorded during execution.
    ///
    /// When the logical plan contains write operations, a `MutationContext`
    /// (writer + property manager + query context) is attached to the planner
    /// before physical planning; the call fails if no `Writer` is configured.
    pub async fn execute_datafusion_with_plan(
        &self,
        plan: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<(
        Vec<RecordBatch>,
        Arc<dyn datafusion::physical_plan::ExecutionPlan>,
    )> {
        let (session_ctx, mut planner, prop_manager_arc) =
            self.create_datafusion_planner(prop_manager, params).await?;

        // Build MutationContext when the plan contains write operations
        if Self::contains_write_operations(&plan) {
            // Fail fast before planning: writes are impossible without a Writer.
            let writer = self
                .writer
                .as_ref()
                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
                .clone();
            let query_ctx = self.get_context().await;

            debug_assert!(
                query_ctx.is_some(),
                "BUG: query_ctx is None for write operation"
            );

            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
                executor: self.clone(),
                writer,
                prop_manager: prop_manager_arc,
                params: params.clone(),
                query_ctx,
                tx_l0_override: self.transaction_l0_override.clone(),
            });
            planner = planner.with_mutation_context(mutation_ctx);
            tracing::debug!(
                plan_type = Self::get_plan_type(&plan),
                "Mutation routed to DataFusion engine"
            );
        }

        let execution_plan = planner.plan(&plan)?;
        // Keep a second handle so callers can inspect the plan's metrics even
        // though `collect_batches` consumes the other Arc.
        let plan_clone = Arc::clone(&execution_plan);
        let result = Self::collect_batches(&session_ctx, execution_plan).await;

        // Harvest warnings from the graph execution context after query completion.
        // Done before `result` is propagated so warnings survive even when the
        // execution itself errored.
        let graph_warnings = planner.graph_ctx().take_warnings();
        if !graph_warnings.is_empty()
            && let Ok(mut w) = self.warnings.lock()
        {
            w.extend(graph_warnings);
        }

        result.map(|batches| (batches, plan_clone))
    }
442
443    /// Execute a MERGE read sub-plan through the DataFusion engine.
444    ///
445    /// Plans and executes the MERGE pattern match using flat columnar output
446    /// (no structural projections), then groups dotted columns (`a._vid`,
447    /// `a._labels`, etc.) into per-variable Maps for downstream MERGE logic.
448    pub(crate) async fn execute_merge_read_plan(
449        &self,
450        plan: LogicalPlan,
451        prop_manager: &PropertyManager,
452        params: &HashMap<String, Value>,
453        merge_variables: Vec<String>,
454    ) -> Result<Vec<HashMap<String, Value>>> {
455        let (session_ctx, planner, _prop_manager_arc) =
456            self.create_datafusion_planner(prop_manager, params).await?;
457
458        // Plan with full property access ("*") for all merge variables so that
459        // ON MATCH SET / ON CREATE SET have complete entity Maps to work with.
460        let extra: HashMap<String, HashSet<String>> = merge_variables
461            .iter()
462            .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
463            .collect();
464        let execution_plan = planner.plan_with_properties(&plan, extra)?;
465        let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
466
467        // Convert to flat rows (dotted column names like "a._vid", "b._labels")
468        let flat_rows = self.record_batches_to_rows(all_batches)?;
469
470        // Group dotted columns into per-variable Maps for MERGE's match logic.
471        // E.g., {"a._vid": 0, "a._labels": ["A"]} → {"a": Map({"_vid": 0, "_labels": ["A"]})}
472        let rows = flat_rows
473            .into_iter()
474            .map(|mut row| {
475                for var in &merge_variables {
476                    // Skip if already materialized (e.g., by record_batches_to_rows)
477                    if row.contains_key(var) {
478                        continue;
479                    }
480                    let prefix = format!("{}.", var);
481                    let dotted_keys: Vec<String> = row
482                        .keys()
483                        .filter(|k| k.starts_with(&prefix))
484                        .cloned()
485                        .collect();
486                    if !dotted_keys.is_empty() {
487                        let mut map = HashMap::new();
488                        for key in dotted_keys {
489                            let prop_name = key[prefix.len()..].to_string();
490                            if let Some(val) = row.remove(&key) {
491                                map.insert(prop_name, val);
492                            }
493                        }
494                        row.insert(var.clone(), Value::Map(map));
495                    }
496                }
497                row
498            })
499            .collect();
500
501        Ok(rows)
502    }
503
    /// Converts DataFusion RecordBatches to row-based HashMap format.
    ///
    /// Handles special metadata on fields:
    /// - `cv_encoded=true`: Parse the string value as JSON to restore original type
    ///
    /// Also normalizes path structures to user-facing format (converts _vid to _id).
    ///
    /// Per-row post-processing, in order:
    /// 1. Decode each column (DateTime/Time struct-aware) and undo `cv_encoded`
    ///    JSON wrapping; normalize path values.
    /// 2. Promote VID-string placeholder variables (from vector/FTS search)
    ///    into Maps built from their dotted helper columns.
    /// 3. Merge dotted system columns (`n._vid`, `n._labels`, `n._eid`,
    ///    `n._type`) into bare variable Maps and strip remaining helpers.
    pub(crate) fn record_batches_to_rows(
        &self,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut rows = Vec::new();

        for batch in batches {
            let num_rows = batch.num_rows();
            let schema = batch.schema();

            for row_idx in 0..num_rows {
                let mut row = HashMap::new();

                for (col_idx, field) in schema.fields().iter().enumerate() {
                    let column = batch.column(col_idx);
                    // Infer Uni DataType from Arrow type for DateTime/Time struct decoding
                    let data_type =
                        if uni_common::core::schema::is_datetime_struct(field.data_type()) {
                            Some(&uni_common::DataType::DateTime)
                        } else if uni_common::core::schema::is_time_struct(field.data_type()) {
                            Some(&uni_common::DataType::Time)
                        } else {
                            None
                        };
                    let mut value =
                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);

                    // Check if this field contains JSON-encoded values (e.g., from UNWIND)
                    // Parse JSON string to restore the original type
                    if field
                        .metadata()
                        .get("cv_encoded")
                        .is_some_and(|v| v == "true")
                        && let Value::String(s) = &value
                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
                    {
                        value = Value::from(parsed);
                    }

                    // Normalize path structures to user-facing format
                    value = Self::normalize_path_if_needed(value);

                    row.insert(field.name().clone(), value);
                }

                // Merge system fields into bare variable maps.
                // The projection step emits helper columns like "n._vid" and "n._labels"
                // alongside the materialized "n" column (a Map of user properties).
                // Here we merge those system fields into the map and remove the helpers.
                //
                // For search procedures (vector/FTS), the bare variable may be a VID
                // string placeholder rather than a Map. In that case, promote it to a
                // Map so we can merge system fields and properties into it.
                //
                // NOTE: `bare_vars` is intentionally snapshotted BEFORE the
                // placeholder promotion below — promoted variables already get
                // all their dotted columns folded in during promotion, so they
                // must not be processed by the bare-variable merge as well.
                let bare_vars: Vec<String> = row
                    .keys()
                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
                    .cloned()
                    .collect();

                // Detect VID-placeholder variables that have system columns
                // (e.g., "node" is String("1") but "node._vid" exists).
                // Promote these to Maps so system fields can be merged in.
                let vid_placeholder_vars: Vec<String> = row
                    .keys()
                    .filter(|k| {
                        !k.contains('.')
                            && matches!(row.get(*k), Some(Value::String(_)))
                            && row.contains_key(&format!("{}._vid", k))
                    })
                    .cloned()
                    .collect();

                for var in &vid_placeholder_vars {
                    // Build a Map from system and property columns
                    let prefix = format!("{}.", var);
                    let mut map = HashMap::new();

                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();

                    for key in &dotted_keys {
                        // Strip the "var." prefix to recover the plain field name.
                        let prop_name = &key[prefix.len()..];
                        if let Some(val) = row.remove(key) {
                            map.insert(prop_name.to_string(), val);
                        }
                    }

                    // Replace the VID-string placeholder with the constructed Map
                    row.insert(var.clone(), Value::Map(map));
                }

                for var in &bare_vars {
                    // Merge node system fields (_vid, _labels)
                    let vid_key = format!("{}._vid", var);
                    let labels_key = format!("{}._labels", var);

                    let vid_val = row.remove(&vid_key);
                    let labels_val = row.remove(&labels_key);

                    if let Some(Value::Map(map)) = row.get_mut(var) {
                        if let Some(v) = vid_val {
                            map.insert("_vid".to_string(), v);
                        }
                        if let Some(v) = labels_val {
                            map.insert("_labels".to_string(), v);
                        }
                    }

                    // Merge edge system fields (_eid, _type, _src_vid, _dst_vid).
                    // These are emitted as helper columns by the traverse exec.
                    // The structural projection already includes them in the struct,
                    // but we still need to remove the dotted helper columns.
                    let eid_key = format!("{}._eid", var);
                    let type_key = format!("{}._type", var);

                    let eid_val = row.remove(&eid_key);
                    let type_val = row.remove(&type_key);

                    // `or_insert` (not `insert`): values already present in the
                    // struct take precedence over the helper columns.
                    if (eid_val.is_some() || type_val.is_some())
                        && let Some(Value::Map(map)) = row.get_mut(var)
                    {
                        if let Some(v) = eid_val {
                            map.entry("_eid".to_string()).or_insert(v);
                        }
                        if let Some(v) = type_val {
                            map.entry("_type".to_string()).or_insert(v);
                        }
                    }

                    // Remove remaining dotted helper columns (e.g. _all_props, _src_vid, _dst_vid)
                    let prefix = format!("{}.", var);
                    let helper_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    for key in helper_keys {
                        row.remove(&key);
                    }
                }

                rows.push(row);
            }
        }

        Ok(rows)
    }
660
661    /// Normalize a value if it's a path structure, converting internal format to user-facing format.
662    ///
663    /// This only normalizes path structures (objects with "nodes" and "relationships" arrays).
664    /// Other values are returned unchanged to avoid interfering with query execution.
665    fn normalize_path_if_needed(value: Value) -> Value {
666        match value {
667            Value::Map(map)
668                if map.contains_key("nodes")
669                    && (map.contains_key("relationships") || map.contains_key("edges")) =>
670            {
671                Self::normalize_path_map(map)
672            }
673            other => other,
674        }
675    }
676
677    /// Normalize a path map object.
678    fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
679        // Normalize nodes array
680        if let Some(Value::List(nodes)) = map.remove("nodes") {
681            let normalized_nodes: Vec<Value> = nodes
682                .into_iter()
683                .map(|n| {
684                    if let Value::Map(node_map) = n {
685                        Self::normalize_path_node_map(node_map)
686                    } else {
687                        n
688                    }
689                })
690                .collect();
691            map.insert("nodes".to_string(), Value::List(normalized_nodes));
692        }
693
694        // Normalize relationships array (may be called "relationships" or "edges")
695        let rels_key = if map.contains_key("relationships") {
696            "relationships"
697        } else {
698            "edges"
699        };
700        if let Some(Value::List(rels)) = map.remove(rels_key) {
701            let normalized_rels: Vec<Value> = rels
702                .into_iter()
703                .map(|r| {
704                    if let Value::Map(rel_map) = r {
705                        Self::normalize_path_edge_map(rel_map)
706                    } else {
707                        r
708                    }
709                })
710                .collect();
711            map.insert("relationships".to_string(), Value::List(normalized_rels));
712        }
713
714        Value::Map(map)
715    }
716
717    /// Convert a Value to its string representation for path normalization.
718    fn value_to_id_string(val: Value) -> String {
719        match val {
720            Value::Int(n) => n.to_string(),
721            Value::Float(n) => n.to_string(),
722            Value::String(s) => s,
723            other => other.to_string(),
724        }
725    }
726
727    /// Move a map entry from `src_key` to `dst_key`, converting the value to a string.
728    /// When `src_key == dst_key`, this simply stringifies the value in place.
729    fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
730        if let Some(val) = map.remove(src_key) {
731            map.insert(
732                dst_key.to_string(),
733                Value::String(Self::value_to_id_string(val)),
734            );
735        }
736    }
737
738    /// Ensure the "properties" field is a non-null map.
739    fn ensure_properties_map(map: &mut HashMap<String, Value>) {
740        match map.get("properties") {
741            Some(props) if !props.is_null() => {}
742            _ => {
743                map.insert("properties".to_string(), Value::Map(HashMap::new()));
744            }
745        }
746    }
747
748    /// Normalize a node within a path to user-facing format.
749    fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
750        Self::stringify_map_field(&mut map, "_vid", "_id");
751        Self::ensure_properties_map(&mut map);
752        Value::Map(map)
753    }
754
    /// Normalize an edge within a path to user-facing format.
    ///
    /// Renames `_eid` to the public `_id` key, stringifies `_src`/`_dst` in
    /// place, exposes `_type_name` under `_type`, and guarantees a non-null
    /// `properties` map.
    fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
        Self::stringify_map_field(&mut map, "_eid", "_id");
        Self::stringify_map_field(&mut map, "_src", "_src");
        Self::stringify_map_field(&mut map, "_dst", "_dst");

        // The edge type is renamed to `_type` but its value is moved as-is
        // (not stringified, unlike the id fields above).
        if let Some(type_name) = map.remove("_type_name") {
            map.insert("_type".to_string(), type_name);
        }

        Self::ensure_properties_map(&mut map);
        Value::Map(map)
    }
768
769    #[instrument(
770        skip(self, prop_manager, params),
771        fields(rows_returned, duration_ms),
772        level = "info"
773    )]
774    pub fn execute<'a>(
775        &'a self,
776        plan: LogicalPlan,
777        prop_manager: &'a PropertyManager,
778        params: &'a HashMap<String, Value>,
779    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
780        Box::pin(async move {
781            let query_type = Self::get_plan_type(&plan);
782            let ctx = self.get_context().await;
783            let start = Instant::now();
784
785            // Route DDL/Admin queries to the fallback executor.
786            // All other queries (including similar_to) flow through DataFusion.
787            let res = if Self::is_ddl_or_admin(&plan) {
788                self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
789                    .await
790            } else {
791                let batches = self
792                    .execute_datafusion(plan.clone(), prop_manager, params)
793                    .await?;
794                self.record_batches_to_rows(batches)
795            };
796
797            let duration = start.elapsed();
798            metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
799                .record(duration.as_secs_f64());
800
801            tracing::Span::current().record("duration_ms", duration.as_millis());
802            match &res {
803                Ok(rows) => {
804                    tracing::Span::current().record("rows_returned", rows.len());
805                    metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
806                        .increment(rows.len() as u64);
807                }
808                Err(e) => {
809                    let error_type = if e.to_string().contains("timed out") {
810                        "timeout"
811                    } else if e.to_string().contains("syntax") {
812                        "syntax"
813                    } else {
814                        "execution"
815                    };
816                    metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
817                }
818            }
819
820            res
821        })
822    }
823
    /// Map a `LogicalPlan` root variant to a static label used as the
    /// `query_type` dimension on metrics.
    ///
    /// Only the root node is inspected — a nested plan reports under its
    /// outermost variant — and any unlisted variant falls back to "other".
    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
        match plan {
            LogicalPlan::Scan { .. } => "read_scan",
            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
            LogicalPlan::Traverse { .. } => "read_traverse",
            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
            LogicalPlan::ScanAll { .. } => "read_scan_all",
            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
            LogicalPlan::VectorKnn { .. } => "read_vector",
            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
            LogicalPlan::Merge { .. } => "write_merge",
            LogicalPlan::Delete { .. } => "write_delete",
            LogicalPlan::Set { .. } => "write_set",
            LogicalPlan::Remove { .. } => "write_remove",
            LogicalPlan::ProcedureCall { .. } => "call",
            LogicalPlan::Copy { .. } => "copy",
            LogicalPlan::Backup { .. } => "backup",
            _ => "other",
        }
    }
844
    /// Return all direct child plan references from a `LogicalPlan`.
    ///
    /// This centralizes the variant→children mapping so that recursive walkers
    /// (e.g., `contains_write_operations`) can delegate the
    /// "recurse into children" logic instead of duplicating the match arms.
    ///
    /// Note: `Foreach` returns only its `input`; the `body: Vec<LogicalPlan>`
    /// is not included because it requires special iteration. Callers that
    /// need to inspect the body should handle `Foreach` before falling through.
    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
        match plan {
            // Single-input wrappers
            LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Distinct { input }
            | LogicalPlan::Aggregate { input, .. }
            | LogicalPlan::Window { input, .. }
            | LogicalPlan::Unwind { input, .. }
            | LogicalPlan::Filter { input, .. }
            | LogicalPlan::Create { input, .. }
            | LogicalPlan::CreateBatch { input, .. }
            | LogicalPlan::Set { input, .. }
            | LogicalPlan::Remove { input, .. }
            | LogicalPlan::Delete { input, .. }
            | LogicalPlan::Merge { input, .. }
            | LogicalPlan::Foreach { input, .. }
            | LogicalPlan::Traverse { input, .. }
            | LogicalPlan::TraverseMainByType { input, .. }
            | LogicalPlan::BindZeroLengthPath { input, .. }
            | LogicalPlan::BindPath { input, .. }
            | LogicalPlan::ShortestPath { input, .. }
            | LogicalPlan::AllShortestPaths { input, .. }
            // Explain's child field is named `plan`; rebind it to `input` so
            // it shares this arm with the other single-input wrappers.
            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],

            // Two-input wrappers
            LogicalPlan::Apply {
                input, subquery, ..
            }
            | LogicalPlan::SubqueryCall { input, subquery } => {
                vec![input.as_ref(), subquery.as_ref()]
            }
            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
                vec![left.as_ref(), right.as_ref()]
            }
            LogicalPlan::RecursiveCTE {
                initial, recursive, ..
            } => vec![initial.as_ref(), recursive.as_ref()],
            LogicalPlan::QuantifiedPattern {
                input,
                pattern_plan,
                ..
            } => vec![input.as_ref(), pattern_plan.as_ref()],

            // Leaf nodes (scans, DDL, admin, etc.)
            _ => vec![],
        }
    }
903
904    /// Check if a plan is a DDL or admin operation that should skip DataFusion.
905    ///
906    /// These operations don't produce data streams and aren't supported by the
907    /// DataFusion planner. Recurses through wrapper nodes (`Project`, `Sort`,
908    /// `Limit`, etc.) to detect DDL/admin operations nested inside read
909    /// wrappers (e.g. `CALL procedure(...) YIELD x RETURN x`).
910    pub(crate) fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
911        match plan {
912            // DDL / schema operations
913            LogicalPlan::CreateLabel(_)
914            | LogicalPlan::CreateEdgeType(_)
915            | LogicalPlan::AlterLabel(_)
916            | LogicalPlan::AlterEdgeType(_)
917            | LogicalPlan::DropLabel(_)
918            | LogicalPlan::DropEdgeType(_)
919            | LogicalPlan::CreateConstraint(_)
920            | LogicalPlan::DropConstraint(_)
921            | LogicalPlan::ShowConstraints(_) => true,
922
923            // Index operations
924            LogicalPlan::CreateVectorIndex { .. }
925            | LogicalPlan::CreateFullTextIndex { .. }
926            | LogicalPlan::CreateScalarIndex { .. }
927            | LogicalPlan::CreateJsonFtsIndex { .. }
928            | LogicalPlan::DropIndex { .. }
929            | LogicalPlan::ShowIndexes { .. } => true,
930
931            // Admin / utility operations
932            LogicalPlan::ShowDatabase
933            | LogicalPlan::ShowConfig
934            | LogicalPlan::ShowStatistics
935            | LogicalPlan::Vacuum
936            | LogicalPlan::Checkpoint
937            | LogicalPlan::Copy { .. }
938            | LogicalPlan::CopyTo { .. }
939            | LogicalPlan::CopyFrom { .. }
940            | LogicalPlan::Backup { .. }
941            | LogicalPlan::Explain { .. } => true,
942
943            // Procedure calls: DF-eligible procedures go through DataFusion,
944            // everything else (DDL, admin, unknown) stays on fallback.
945            LogicalPlan::ProcedureCall { procedure_name, .. } => {
946                !Self::is_df_eligible_procedure(procedure_name)
947            }
948
949            // Recurse through children using plan_children
950            _ => Self::plan_children(plan)
951                .iter()
952                .any(|child| Self::is_ddl_or_admin(child)),
953        }
954    }
955
956    /// Returns `true` if the procedure is a read-only, data-producing procedure
957    /// that can be executed through the DataFusion engine.
958    ///
959    /// This is a **positive allowlist** — unknown procedures default to the
960    /// fallback executor (safe for TCK test procedures, future DDL, and admin).
961    fn is_df_eligible_procedure(name: &str) -> bool {
962        matches!(
963            name,
964            "uni.schema.labels"
965                | "uni.schema.edgeTypes"
966                | "uni.schema.relationshipTypes"
967                | "uni.schema.indexes"
968                | "uni.schema.constraints"
969                | "uni.schema.labelInfo"
970                | "uni.vector.query"
971                | "uni.fts.query"
972                | "uni.search"
973        ) || name.starts_with("uni.algo.")
974    }
975
976    /// Check if a plan contains write/mutation operations anywhere in the tree.
977    ///
978    /// Write operations (`CREATE`, `MERGE`, `DELETE`, `SET`, `REMOVE`, `FOREACH`)
979    /// are used to determine when a MutationContext needs to be built for DataFusion.
980    /// This recurses through read-only wrapper nodes to detect writes nested inside
981    /// projections (e.g. `CREATE (n:Person) RETURN n` produces `Project { Create { ... } }`).
982    fn contains_write_operations(plan: &LogicalPlan) -> bool {
983        match plan {
984            LogicalPlan::Create { .. }
985            | LogicalPlan::CreateBatch { .. }
986            | LogicalPlan::Merge { .. }
987            | LogicalPlan::Delete { .. }
988            | LogicalPlan::Set { .. }
989            | LogicalPlan::Remove { .. }
990            | LogicalPlan::Foreach { .. } => true,
991            _ => Self::plan_children(plan)
992                .iter()
993                .any(|child| Self::contains_write_operations(child)),
994        }
995    }
996
997    /// Executes a query as a stream of result batches.
998    ///
999    /// Routes DDL/Admin through the fallback executor and everything else
1000    /// through DataFusion.
1001    pub fn execute_stream(
1002        self,
1003        plan: LogicalPlan,
1004        prop_manager: Arc<PropertyManager>,
1005        params: HashMap<String, Value>,
1006    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1007        let this = self;
1008        let this_for_ctx = this.clone();
1009
1010        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1011
1012        ctx_stream
1013            .flat_map(move |ctx| {
1014                let plan = plan.clone();
1015                let this = this.clone();
1016                let prop_manager = prop_manager.clone();
1017                let params = params.clone();
1018
1019                let fut = async move {
1020                    if Self::is_ddl_or_admin(&plan) {
1021                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
1022                            .await
1023                    } else {
1024                        let batches = this
1025                            .execute_datafusion(plan, &prop_manager, &params)
1026                            .await?;
1027                        this.record_batches_to_rows(batches)
1028                    }
1029                };
1030                stream::once(fut).boxed()
1031            })
1032            .boxed()
1033    }
1034
    /// Converts an Arrow array element at a given row index to a Value.
    /// Delegates to the shared implementation in arrow_convert module.
    /// The trailing `None` is the third argument of the shared helper —
    /// presumably an optional type/schema hint; confirm in `arrow_convert`.
    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
        arrow_convert::arrow_to_value(col, row, None)
    }
1040
1041    pub(crate) fn evaluate_expr<'a>(
1042        &'a self,
1043        expr: &'a Expr,
1044        row: &'a HashMap<String, Value>,
1045        prop_manager: &'a PropertyManager,
1046        params: &'a HashMap<String, Value>,
1047        ctx: Option<&'a QueryContext>,
1048    ) -> BoxFuture<'a, Result<Value>> {
1049        let this = self;
1050        Box::pin(async move {
1051            // First check if the expression itself is already pre-computed in the row
1052            let repr = expr.to_string_repr();
1053            if let Some(val) = row.get(&repr) {
1054                return Ok(val.clone());
1055            }
1056
1057            match expr {
1058                Expr::PatternComprehension { .. } => {
1059                    // Handled by DataFusion path via PatternComprehensionExecExpr
1060                    Err(anyhow::anyhow!(
1061                        "Pattern comprehensions are handled by DataFusion executor"
1062                    ))
1063                }
1064                Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1065                    "COLLECT subqueries not yet supported in executor"
1066                )),
1067                Expr::Variable(name) => {
1068                    if let Some(val) = row.get(name) {
1069                        Ok(val.clone())
1070                    } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1071                        // Fallback: scan results may have system columns like "d._vid"
1072                        // without a materialized "d" Map. Return the VID so Property
1073                        // evaluation can fetch properties from storage.
1074                        Ok(vid_val.clone())
1075                    } else {
1076                        Ok(params.get(name).cloned().unwrap_or(Value::Null))
1077                    }
1078                }
1079                Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1080                Expr::Property(var_expr, prop_name) => {
1081                    // Fast path: if the base is a Variable, try flat-key lookup first.
1082                    // DataFusion scan results use flat keys like "d.embedding" rather than
1083                    // nested maps, so "d.embedding" won't be found via Variable("d") -> Property.
1084                    if let Expr::Variable(var_name) = var_expr.as_ref() {
1085                        let flat_key = format!("{}.{}", var_name, prop_name);
1086                        if let Some(val) = row.get(flat_key.as_str()) {
1087                            return Ok(val.clone());
1088                        }
1089                    }
1090
1091                    let base_val = this
1092                        .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1093                        .await?;
1094
1095                    // Handle system properties _vid and _id directly
1096                    if (prop_name == "_vid" || prop_name == "_id")
1097                        && let Ok(vid) = Self::vid_from_value(&base_val)
1098                    {
1099                        return Ok(Value::Int(vid.as_u64() as i64));
1100                    }
1101
1102                    // Handle Value::Node - access properties directly or via prop manager
1103                    if let Value::Node(node) = &base_val {
1104                        // Handle system properties
1105                        if prop_name == "_vid" || prop_name == "_id" {
1106                            return Ok(Value::Int(node.vid.as_u64() as i64));
1107                        }
1108                        if prop_name == "_labels" {
1109                            return Ok(Value::List(
1110                                node.labels
1111                                    .iter()
1112                                    .map(|l| Value::String(l.clone()))
1113                                    .collect(),
1114                            ));
1115                        }
1116                        // Check in-memory properties first
1117                        if let Some(val) = node.properties.get(prop_name.as_str()) {
1118                            return Ok(val.clone());
1119                        }
1120                        // Fallback to storage lookup
1121                        if let Ok(val) = prop_manager
1122                            .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1123                            .await
1124                        {
1125                            return Ok(val);
1126                        }
1127                        return Ok(Value::Null);
1128                    }
1129
1130                    // Handle Value::Edge - access properties directly or via prop manager
1131                    if let Value::Edge(edge) = &base_val {
1132                        // Handle system properties
1133                        if prop_name == "_eid" || prop_name == "_id" {
1134                            return Ok(Value::Int(edge.eid.as_u64() as i64));
1135                        }
1136                        if prop_name == "_type" {
1137                            return Ok(Value::String(edge.edge_type.clone()));
1138                        }
1139                        if prop_name == "_src" {
1140                            return Ok(Value::Int(edge.src.as_u64() as i64));
1141                        }
1142                        if prop_name == "_dst" {
1143                            return Ok(Value::Int(edge.dst.as_u64() as i64));
1144                        }
1145                        // Check in-memory properties first
1146                        if let Some(val) = edge.properties.get(prop_name.as_str()) {
1147                            return Ok(val.clone());
1148                        }
1149                        // Fallback to storage lookup
1150                        if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1151                        {
1152                            return Ok(val);
1153                        }
1154                        return Ok(Value::Null);
1155                    }
1156
1157                    // If base_val is an object (node/edge), check its properties first
1158                    // This handles properties from CREATE/MERGE that may not be persisted yet
1159                    if let Value::Map(map) = &base_val {
1160                        // First check top-level (for system properties like _id, _label, etc.)
1161                        if let Some(val) = map.get(prop_name.as_str()) {
1162                            return Ok(val.clone());
1163                        }
1164                        // Then check inside "properties" object (for user properties)
1165                        if let Some(Value::Map(props)) = map.get("properties")
1166                            && let Some(val) = props.get(prop_name.as_str())
1167                        {
1168                            return Ok(val.clone());
1169                        }
1170                        // Fallback to storage lookup using _vid or _id
1171                        let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1172                            map.get("_id")
1173                                .and_then(|v| v.as_str())
1174                                .and_then(|s| s.parse::<u64>().ok())
1175                        });
1176                        if let Some(id) = vid_opt {
1177                            let vid = Vid::from(id);
1178                            if let Ok(val) = prop_manager
1179                                .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1180                                .await
1181                            {
1182                                return Ok(val);
1183                            }
1184                        } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1185                            let eid = uni_common::core::id::Eid::from(id);
1186                            if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1187                                return Ok(val);
1188                            }
1189                        }
1190                        return Ok(Value::Null);
1191                    }
1192
1193                    // If base_val is just a VID, fetch from property manager
1194                    if let Ok(vid) = Self::vid_from_value(&base_val) {
1195                        return prop_manager
1196                            .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1197                            .await;
1198                    }
1199
1200                    if base_val.is_null() {
1201                        return Ok(Value::Null);
1202                    }
1203
1204                    // Check if base_val is a temporal value and prop_name is a temporal accessor
1205                    {
1206                        use crate::query::datetime::{
1207                            eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1208                            is_duration_string, is_temporal_accessor, is_temporal_string,
1209                        };
1210
1211                        // Handle Value::Temporal directly (no string parsing needed)
1212                        if let Value::Temporal(tv) = &base_val {
1213                            if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1214                                if is_duration_accessor(prop_name) {
1215                                    // Convert to string for the existing accessor logic
1216                                    return eval_duration_accessor(
1217                                        &base_val.to_string(),
1218                                        prop_name,
1219                                    );
1220                                }
1221                            } else if is_temporal_accessor(prop_name) {
1222                                return eval_temporal_accessor(&base_val.to_string(), prop_name);
1223                            }
1224                        }
1225
1226                        // Handle Value::String temporal (backward compat)
1227                        if let Value::String(s) = &base_val {
1228                            if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1229                                return eval_temporal_accessor(s, prop_name);
1230                            }
1231                            if is_duration_string(s) && is_duration_accessor(prop_name) {
1232                                return eval_duration_accessor(s, prop_name);
1233                            }
1234                        }
1235                    }
1236
1237                    Err(anyhow!(
1238                        "Cannot access property '{}' on {:?}",
1239                        prop_name,
1240                        base_val
1241                    ))
1242                }
1243                Expr::ArrayIndex {
1244                    array: arr_expr,
1245                    index: idx_expr,
1246                } => {
1247                    let arr_val = this
1248                        .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1249                        .await?;
1250                    let idx_val = this
1251                        .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1252                        .await?;
1253
1254                    if let Value::List(arr) = &arr_val {
1255                        // Handle signed indices (allow negative)
1256                        if let Some(i) = idx_val.as_i64() {
1257                            let idx = if i < 0 {
1258                                // Negative index: -1 = last element, -2 = second to last, etc.
1259                                let positive_idx = arr.len() as i64 + i;
1260                                if positive_idx < 0 {
1261                                    return Ok(Value::Null); // Out of bounds
1262                                }
1263                                positive_idx as usize
1264                            } else {
1265                                i as usize
1266                            };
1267                            if idx < arr.len() {
1268                                return Ok(arr[idx].clone());
1269                            }
1270                            return Ok(Value::Null);
1271                        } else if idx_val.is_null() {
1272                            return Ok(Value::Null);
1273                        } else {
1274                            return Err(anyhow::anyhow!(
1275                                "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1276                                idx_val
1277                            ));
1278                        }
1279                    }
1280                    if let Value::Map(map) = &arr_val {
1281                        if let Some(key) = idx_val.as_str() {
1282                            return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1283                        } else if !idx_val.is_null() {
1284                            return Err(anyhow::anyhow!(
1285                                "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1286                                idx_val
1287                            ));
1288                        }
1289                    }
1290                    // Handle bracket access on Node: n['name'] returns property
1291                    if let Value::Node(node) = &arr_val {
1292                        if let Some(key) = idx_val.as_str() {
1293                            // Check in-memory properties first
1294                            if let Some(val) = node.properties.get(key) {
1295                                return Ok(val.clone());
1296                            }
1297                            // Fallback to property manager
1298                            if let Ok(val) = prop_manager
1299                                .get_vertex_prop_with_ctx(node.vid, key, ctx)
1300                                .await
1301                            {
1302                                return Ok(val);
1303                            }
1304                            return Ok(Value::Null);
1305                        } else if !idx_val.is_null() {
1306                            return Err(anyhow::anyhow!(
1307                                "TypeError: Node index must be a string, got: {:?}",
1308                                idx_val
1309                            ));
1310                        }
1311                    }
1312                    // Handle bracket access on Edge: e['property'] returns property
1313                    if let Value::Edge(edge) = &arr_val {
1314                        if let Some(key) = idx_val.as_str() {
1315                            // Check in-memory properties first
1316                            if let Some(val) = edge.properties.get(key) {
1317                                return Ok(val.clone());
1318                            }
1319                            // Fallback to property manager
1320                            if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1321                                return Ok(val);
1322                            }
1323                            return Ok(Value::Null);
1324                        } else if !idx_val.is_null() {
1325                            return Err(anyhow::anyhow!(
1326                                "TypeError: Edge index must be a string, got: {:?}",
1327                                idx_val
1328                            ));
1329                        }
1330                    }
1331                    // Handle bracket access on VID (integer): n['name'] where n is a VID
1332                    if let Ok(vid) = Self::vid_from_value(&arr_val)
1333                        && let Some(key) = idx_val.as_str()
1334                    {
1335                        if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1336                        {
1337                            return Ok(val);
1338                        }
1339                        return Ok(Value::Null);
1340                    }
1341                    if arr_val.is_null() {
1342                        return Ok(Value::Null);
1343                    }
1344                    Err(anyhow!(
1345                        "TypeError: InvalidArgumentType - cannot index into {:?}",
1346                        arr_val
1347                    ))
1348                }
1349                Expr::ArraySlice { array, start, end } => {
1350                    let arr_val = this
1351                        .evaluate_expr(array, row, prop_manager, params, ctx)
1352                        .await?;
1353
1354                    if let Value::List(arr) = &arr_val {
1355                        let len = arr.len();
1356
1357                        // Evaluate start index (default to 0), null → null result
1358                        let start_idx = if let Some(s) = start {
1359                            let v = this
1360                                .evaluate_expr(s, row, prop_manager, params, ctx)
1361                                .await?;
1362                            if v.is_null() {
1363                                return Ok(Value::Null);
1364                            }
1365                            let raw = v.as_i64().unwrap_or(0);
1366                            if raw < 0 {
1367                                (len as i64 + raw).max(0) as usize
1368                            } else {
1369                                (raw as usize).min(len)
1370                            }
1371                        } else {
1372                            0
1373                        };
1374
1375                        // Evaluate end index (default to length), null → null result
1376                        let end_idx = if let Some(e) = end {
1377                            let v = this
1378                                .evaluate_expr(e, row, prop_manager, params, ctx)
1379                                .await?;
1380                            if v.is_null() {
1381                                return Ok(Value::Null);
1382                            }
1383                            let raw = v.as_i64().unwrap_or(len as i64);
1384                            if raw < 0 {
1385                                (len as i64 + raw).max(0) as usize
1386                            } else {
1387                                (raw as usize).min(len)
1388                            }
1389                        } else {
1390                            len
1391                        };
1392
1393                        // Return sliced array
1394                        if start_idx >= end_idx {
1395                            return Ok(Value::List(vec![]));
1396                        }
1397                        let end_idx = end_idx.min(len);
1398                        return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1399                    }
1400
1401                    if arr_val.is_null() {
1402                        return Ok(Value::Null);
1403                    }
1404                    Err(anyhow!("Cannot slice {:?}", arr_val))
1405                }
                // Literal constants convert directly to a runtime Value.
                Expr::Literal(lit) => Ok(lit.to_value()),
                // List constructor: evaluate each element in order; any error or
                // propagated `?` failure aborts the whole list.
                Expr::List(items) => {
                    let mut vals = Vec::new();
                    for item in items {
                        vals.push(
                            this.evaluate_expr(item, row, prop_manager, params, ctx)
                                .await?,
                        );
                    }
                    Ok(Value::List(vals))
                }
                // Map constructor: evaluate each value expression; later duplicate
                // keys overwrite earlier ones (HashMap::insert semantics).
                Expr::Map(items) => {
                    let mut map = HashMap::new();
                    for (key, value_expr) in items {
                        let val = this
                            .evaluate_expr(value_expr, row, prop_manager, params, ctx)
                            .await?;
                        map.insert(key.clone(), val);
                    }
                    Ok(Value::Map(map))
                }
                // EXISTS { … } subquery: true iff the subquery yields at least one row.
                Expr::Exists { query, .. } => {
                    // Plan and execute subquery; failures return false (pattern doesn't match)
                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
                    // Outer variables are made visible to the subquery planner as scope.
                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();

                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
                        Ok(plan) => {
                            // Current row bindings are passed down as extra parameters
                            // so the subquery can reference outer variables by name.
                            let mut sub_params = params.clone();
                            sub_params.extend(row.clone());

                            match this.execute(plan, prop_manager, &sub_params).await {
                                Ok(results) => Ok(Value::Bool(!results.is_empty())),
                                Err(e) => {
                                    // Deliberate: EXISTS treats execution failure as
                                    // "pattern did not match" rather than a query error.
                                    log::debug!("EXISTS subquery execution failed: {}", e);
                                    Ok(Value::Bool(false))
                                }
                            }
                        }
                        Err(e) => {
                            log::debug!("EXISTS subquery planning failed: {}", e);
                            Ok(Value::Bool(false))
                        }
                    }
                }
                // COUNT { … } subquery: returns the number of rows the subquery yields.
                Expr::CountSubquery(query) => {
                    // Similar to Exists but returns count
                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());

                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();

                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
                        Ok(plan) => {
                            let mut sub_params = params.clone();
                            sub_params.extend(row.clone());

                            match this.execute(plan, prop_manager, &sub_params).await {
                                Ok(results) => Ok(Value::from(results.len() as i64)),
                                // Unlike EXISTS above, COUNT propagates failures as errors.
                                Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
                            }
                        }
                        Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
                    }
                }
                // ALL/ANY/SINGLE/NONE (var IN list WHERE pred) quantifier predicates.
                Expr::Quantifier {
                    quantifier,
                    variable,
                    list,
                    predicate,
                } => {
                    // Quantifier expression evaluation (ALL/ANY/SINGLE/NONE)
                    //
                    // This is the primary execution path for quantifiers because DataFusion
                    // does not support lambda functions yet. Queries with quantifiers attempt
                    // DataFusion translation first, fail (see df_expr.rs:289), then fall back
                    // to this fallback executor path.
                    //
                    // This is intentional design - we get correct semantics with row-by-row
                    // evaluation until DataFusion adds lambda support.
                    //
                    // See: https://github.com/apache/datafusion/issues/14205

                    // Evaluate the list expression
                    let list_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;

                    // Handle null propagation
                    if list_val.is_null() {
                        return Ok(Value::Null);
                    }

                    // Convert to array
                    let items = match list_val {
                        Value::List(arr) => arr,
                        _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
                    };

                    // Evaluate predicate for each item
                    let mut satisfied_count = 0;
                    for item in &items {
                        // Create new row with bound variable
                        let mut item_row = row.clone();
                        item_row.insert(variable.clone(), item.clone());

                        // Evaluate predicate with bound variable
                        let pred_result = this
                            .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
                            .await?;

                        // Check if predicate is satisfied.
                        // Anything other than Bool(true) — including null — counts
                        // as not satisfied.
                        if let Value::Bool(true) = pred_result {
                            satisfied_count += 1;
                        }
                    }

                    // Return based on quantifier type
                    let result = match quantifier {
                        Quantifier::All => satisfied_count == items.len(),
                        Quantifier::Any => satisfied_count > 0,
                        Quantifier::Single => satisfied_count == 1,
                        Quantifier::None => satisfied_count == 0,
                    };

                    Ok(Value::Bool(result))
                }
                // List comprehension: [var IN list WHERE pred | map_expr].
                Expr::ListComprehension {
                    variable,
                    list,
                    where_clause,
                    map_expr,
                } => {
                    // List comprehension evaluation: [x IN list WHERE pred | expr]
                    //
                    // Similar to quantifiers, this requires lambda-like evaluation
                    // which DataFusion doesn't support yet. This is the primary execution path.

                    // Evaluate the list expression
                    let list_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;

                    // Handle null propagation
                    if list_val.is_null() {
                        return Ok(Value::Null);
                    }

                    // Convert to array
                    let items = match list_val {
                        Value::List(arr) => arr,
                        _ => {
                            return Err(anyhow!(
                                "List comprehension expects a list, got: {:?}",
                                list_val
                            ));
                        }
                    };

                    // Collect mapped values
                    let mut results = Vec::new();
                    for item in &items {
                        // Create new row with bound variable
                        let mut item_row = row.clone();
                        item_row.insert(variable.clone(), item.clone());

                        // Apply WHERE filter if present
                        if let Some(predicate) = where_clause {
                            let pred_result = this
                                .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
                                .await?;

                            // Skip items that don't match the filter
                            // (null or non-boolean predicate results also skip).
                            if !matches!(pred_result, Value::Bool(true)) {
                                continue;
                            }
                        }

                        // Apply map expression
                        let mapped_val = this
                            .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
                            .await?;
                        results.push(mapped_val);
                    }

                    Ok(Value::List(results))
                }
                // Binary operators, with short-circuiting for AND/OR.
                Expr::BinaryOp { left, op, right } => {
                    // Short-circuit evaluation for AND/OR
                    match op {
                        BinaryOp::And => {
                            let l_val = this
                                .evaluate_expr(left, row, prop_manager, params, ctx)
                                .await?;
                            // Short-circuit: if left is false, don't evaluate right
                            if let Some(false) = l_val.as_bool() {
                                return Ok(Value::Bool(false));
                            }
                            let r_val = this
                                .evaluate_expr(right, row, prop_manager, params, ctx)
                                .await?;
                            // Non-short-circuit cases (incl. null operands) are
                            // delegated to eval_binary_op.
                            eval_binary_op(&l_val, op, &r_val)
                        }
                        BinaryOp::Or => {
                            let l_val = this
                                .evaluate_expr(left, row, prop_manager, params, ctx)
                                .await?;
                            // Short-circuit: if left is true, don't evaluate right
                            if let Some(true) = l_val.as_bool() {
                                return Ok(Value::Bool(true));
                            }
                            let r_val = this
                                .evaluate_expr(right, row, prop_manager, params, ctx)
                                .await?;
                            eval_binary_op(&l_val, op, &r_val)
                        }
                        _ => {
                            // For all other operators, evaluate both sides
                            let l_val = this
                                .evaluate_expr(left, row, prop_manager, params, ctx)
                                .await?;
                            let r_val = this
                                .evaluate_expr(right, row, prop_manager, params, ctx)
                                .await?;
                            eval_binary_op(&l_val, op, &r_val)
                        }
                    }
                }
                // IN membership test: both operands are evaluated eagerly, then
                // delegated to eval_in_op (which owns the null/list semantics).
                Expr::In { expr, list } => {
                    let l_val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    let r_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;
                    eval_in_op(&l_val, &r_val)
                }
                // Unary operators: logical NOT and arithmetic negation.
                Expr::UnaryOp { op, expr } => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    match op {
                        UnaryOp::Not => {
                            // Three-valued logic: NOT null = null
                            match val.as_bool() {
                                Some(b) => Ok(Value::Bool(!b)),
                                None if val.is_null() => Ok(Value::Null),
                                // Non-boolean, non-null operand is a type error.
                                None => Err(anyhow!(
                                    "InvalidArgumentType: NOT requires a boolean argument"
                                )),
                            }
                        }
                        UnaryOp::Neg => {
                            // Integer negation first; otherwise fall back to float.
                            if let Some(i) = val.as_i64() {
                                Ok(Value::Int(-i))
                            } else if let Some(f) = val.as_f64() {
                                Ok(Value::Float(-f))
                            } else {
                                Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
                            }
                        }
                    }
                }
                // IS NULL: evaluates operand, returns a plain (never-null) boolean.
                Expr::IsNull(expr) => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    Ok(Value::Bool(val.is_null()))
                }
                // IS NOT NULL: complement of IS NULL.
                Expr::IsNotNull(expr) => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    Ok(Value::Bool(!val.is_null()))
                }
                Expr::IsUnique(_) => {
                    // IS UNIQUE is only valid in constraint definitions, not in query expressions
                    Err(anyhow!(
                        "IS UNIQUE can only be used in constraint definitions"
                    ))
                }
                // CASE expression: simple form (CASE expr WHEN v THEN …) when `expr`
                // is Some, searched form (CASE WHEN cond THEN …) when it is None.
                Expr::Case {
                    expr,
                    when_then,
                    else_expr,
                } => {
                    if let Some(base_expr) = expr {
                        // Simple CASE: compare base value against each WHEN value
                        // using Value's == (PartialEq).
                        let base_val = this
                            .evaluate_expr(base_expr, row, prop_manager, params, ctx)
                            .await?;
                        for (w, t) in when_then {
                            let w_val = this
                                .evaluate_expr(w, row, prop_manager, params, ctx)
                                .await?;
                            if base_val == w_val {
                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
                            }
                        }
                    } else {
                        // Searched CASE: first WHEN whose condition is strictly true wins;
                        // null/non-boolean conditions are skipped.
                        for (w, t) in when_then {
                            let w_val = this
                                .evaluate_expr(w, row, prop_manager, params, ctx)
                                .await?;
                            if w_val.as_bool() == Some(true) {
                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
                            }
                        }
                    }
                    // No WHEN matched: ELSE if present, otherwise null.
                    if let Some(e) = else_expr {
                        return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
                    }
                    Ok(Value::Null)
                }
                // `*` has no value as a scalar expression; evaluates to null here.
                Expr::Wildcard => Ok(Value::Null),
                // Function calls: a series of special-cased built-ins handled inline,
                // before falling through to the generic scalar-function dispatcher
                // (below the visible special cases).
                Expr::FunctionCall { name, args, .. } => {
                    // Special case: id() returns VID for nodes and EID for relationships
                    if name.eq_ignore_ascii_case("ID") {
                        if args.len() != 1 {
                            return Err(anyhow!("id() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        if let Value::Map(map) = &val {
                            // Check for _vid (vertex) first
                            if let Some(vid_val) = map.get("_vid") {
                                return Ok(vid_val.clone());
                            }
                            // Check for _eid (edge/relationship)
                            if let Some(eid_val) = map.get("_eid") {
                                return Ok(eid_val.clone());
                            }
                            // Check for _id (fallback)
                            if let Some(id_val) = map.get("_id") {
                                return Ok(id_val.clone());
                            }
                        }
                        // Non-map argument or no id key: null (not an error).
                        return Ok(Value::Null);
                    }

                    // Special case: elementId() returns string format "label_id:local_offset"
                    // NOTE(review): the comment above describes the legacy format; the
                    // code below actually returns the plain numeric id as a string.
                    if name.eq_ignore_ascii_case("ELEMENTID") {
                        if args.len() != 1 {
                            return Err(anyhow!("elementId() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        if let Value::Map(map) = &val {
                            // Check for _vid (vertex) first
                            // In new storage model, VIDs are pure auto-increment - return as simple ID string
                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
                                return Ok(Value::String(vid_val.to_string()));
                            }
                            // Check for _eid (edge/relationship)
                            // In new storage model, EIDs are pure auto-increment - return as simple ID string
                            if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
                                return Ok(Value::String(eid_val.to_string()));
                            }
                        }
                        return Ok(Value::Null);
                    }

                    // Special case: type() returns the relationship type name
                    if name.eq_ignore_ascii_case("TYPE") {
                        if args.len() != 1 {
                            return Err(anyhow!("type() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        if let Value::Map(map) = &val
                            && let Some(type_val) = map.get("_type")
                        {
                            // Numeric _type is an edge type ID; string _type is already a name
                            if let Some(type_id) =
                                type_val.as_u64().and_then(|v| u32::try_from(v).ok())
                            {
                                // Resolve the numeric type id via the schema manager.
                                if let Some(name) = this
                                    .storage
                                    .schema_manager()
                                    .edge_type_name_by_id_unified(type_id)
                                {
                                    return Ok(Value::String(name));
                                }
                            } else if let Some(name) = type_val.as_str() {
                                return Ok(Value::String(name.to_string()));
                            }
                        }
                        return Ok(Value::Null);
                    }
                    // Special case: labels() returns the labels of a node
                    if name.eq_ignore_ascii_case("LABELS") {
                        if args.len() != 1 {
                            return Err(anyhow!("labels() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        if let Value::Map(map) = &val
                            && let Some(labels_val) = map.get("_labels")
                        {
                            // Returned as-is; presumably a Value::List of strings —
                            // the shape is set by whoever builds the node map.
                            return Ok(labels_val.clone());
                        }
                        return Ok(Value::Null);
                    }

                    // Special case: properties() returns the properties map of a node/edge
                    if name.eq_ignore_ascii_case("PROPERTIES") {
                        if args.len() != 1 {
                            return Err(anyhow!("properties() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        if let Value::Map(map) = &val {
                            // Filter out internal properties (those starting with _)
                            let mut props = HashMap::new();
                            for (k, v) in map.iter() {
                                if !k.starts_with('_') {
                                    props.insert(k.clone(), v.clone());
                                }
                            }
                            return Ok(Value::Map(props));
                        }
                        return Ok(Value::Null);
                    }

                    // Special case: startNode() returns the start node of a relationship
                    if name.eq_ignore_ascii_case("STARTNODE") {
                        if args.len() != 1 {
                            return Err(anyhow!("startNode() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        // Typed edge value: resolve the source node from the row.
                        if let Value::Edge(edge) = &val {
                            return Ok(Self::find_node_by_vid(row, edge.src));
                        }
                        // Map-shaped edge: try progressively weaker encodings.
                        if let Value::Map(map) = &val {
                            if let Some(start_node) = map.get("_startNode") {
                                return Ok(start_node.clone());
                            }
                            if let Some(src_vid) = map.get("_src_vid") {
                                // Only the VID is known: synthesize a minimal node map.
                                return Ok(Value::Map(HashMap::from([(
                                    "_vid".to_string(),
                                    src_vid.clone(),
                                )])));
                            }
                            // Resolve _src VID by looking up node in row
                            if let Some(src_id) = map.get("_src")
                                && let Some(u) = src_id.as_u64()
                            {
                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
                            }
                        }
                        return Ok(Value::Null);
                    }

                    // Special case: endNode() returns the end node of a relationship
                    // (mirror image of startNode() above, using dst/_dst keys).
                    if name.eq_ignore_ascii_case("ENDNODE") {
                        if args.len() != 1 {
                            return Err(anyhow!("endNode() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        if let Value::Edge(edge) = &val {
                            return Ok(Self::find_node_by_vid(row, edge.dst));
                        }
                        if let Value::Map(map) = &val {
                            if let Some(end_node) = map.get("_endNode") {
                                return Ok(end_node.clone());
                            }
                            if let Some(dst_vid) = map.get("_dst_vid") {
                                return Ok(Value::Map(HashMap::from([(
                                    "_vid".to_string(),
                                    dst_vid.clone(),
                                )])));
                            }
                            // Resolve _dst VID by looking up node in row
                            if let Some(dst_id) = map.get("_dst")
                                && let Some(u) = dst_id.as_u64()
                            {
                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
                            }
                        }
                        return Ok(Value::Null);
                    }
                    // Special case: hasLabel() checks if a node has a specific label
                    // Used for WHERE n:Label predicates
                    if name.eq_ignore_ascii_case("HASLABEL") {
                        if args.len() != 2 {
                            return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
                        }
                        let node_val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        let label_val = this
                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
                            .await?;

                        let label_to_check = label_val.as_str().ok_or_else(|| {
                            anyhow!("Second argument to hasLabel must be a string")
                        })?;

                        // NOTE(review): the two Map arms below are byte-identical —
                        // the `contains_key("_vid")` guard adds nothing; they could be
                        // collapsed into a single `Value::Map(map)` arm.
                        let has_label = match &node_val {
                            // Handle proper Value::Node type (from result normalization)
                            Value::Map(map) if map.contains_key("_vid") => {
                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
                                    labels_arr
                                        .iter()
                                        .any(|l| l.as_str() == Some(label_to_check))
                                } else {
                                    false
                                }
                            }
                            // Also handle legacy Object format
                            Value::Map(map) => {
                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
                                    labels_arr
                                        .iter()
                                        .any(|l| l.as_str() == Some(label_to_check))
                                } else {
                                    false
                                }
                            }
                            _ => false,
                        };
                        return Ok(Value::Bool(has_label));
                    }

                    // Quantifier functions (ANY/ALL/NONE/SINGLE) as function calls are not supported.
                    // These should be parsed as Expr::Quantifier instead.
                    if matches!(
                        name.to_uppercase().as_str(),
                        "ANY" | "ALL" | "NONE" | "SINGLE"
                    ) {
                        return Err(anyhow!(
                            "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
                            name.to_lowercase()
                        ));
                    }

                    // Special case: COALESCE needs short-circuit evaluation
                    // (stops at — and does not evaluate past — the first non-null arg).
                    if name.eq_ignore_ascii_case("COALESCE") {
                        for arg in args {
                            let val = this
                                .evaluate_expr(arg, row, prop_manager, params, ctx)
                                .await?;
                            if !val.is_null() {
                                return Ok(val);
                            }
                        }
                        return Ok(Value::Null);
                    }

                    // Special case: vector_similarity has dedicated implementation
                    if name.eq_ignore_ascii_case("vector_similarity") {
                        if args.len() != 2 {
                            return Err(anyhow!("vector_similarity takes 2 arguments"));
                        }
                        let v1 = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        let v2 = this
                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
                            .await?;
                        return eval_vector_similarity(&v1, &v2);
                    }
1977
                    // Special case: uni.validAt / validAt evaluates temporal validity.
                    // It needs the raw entity argument (not a pre-evaluated scalar),
                    // so it is handled here before the generic argument-evaluation path.
                    if name.eq_ignore_ascii_case("uni.temporal.validAt")
                        || name.eq_ignore_ascii_case("uni.validAt")
                        || name.eq_ignore_ascii_case("validAt")
                    {
                        // Signature: validAt(entity, start_prop, end_prop, time).
                        if args.len() != 4 {
                            return Err(anyhow!("validAt requires 4 arguments"));
                        }
                        // The entity (vertex/edge/inline map) whose temporal bounds are read.
                        let node_val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        // Name of the property holding the validity-interval start.
                        let start_prop = this
                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
                            .await?
                            .as_str()
                            .ok_or(anyhow!("start_prop must be string"))?
                            .to_string();
                        // Name of the property holding the validity-interval end.
                        let end_prop = this
                            .evaluate_expr(&args[2], row, prop_manager, params, ctx)
                            .await?
                            .as_str()
                            .ok_or(anyhow!("end_prop must be string"))?
                            .to_string();
                        // The point in time being tested against the interval.
                        let time_val = this
                            .evaluate_expr(&args[3], row, prop_manager, params, ctx)
                            .await?;

                        let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
                            anyhow!("time argument must be a datetime value or string")
                        })?;

                        // Fetch temporal property values - supports both vertices and edges.
                        // Resolution order: VID-shaped value -> map with embedded _vid or
                        // _eid -> inline map carrying the property directly.
                        let valid_from_val: Option<Value> = if let Ok(vid) =
                            Self::vid_from_value(&node_val)
                        {
                            // Vertex case - VID string format
                            prop_manager
                                .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
                                .await
                                .ok()
                        } else if let Value::Map(map) = &node_val {
                            // Check for embedded _vid or _eid in object
                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
                                let vid = Vid::from(vid_val);
                                prop_manager
                                    .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
                                    .await
                                    .ok()
                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
                                // Edge case
                                let eid = uni_common::core::id::Eid::from(eid_val);
                                prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
                            } else {
                                // Inline object - property embedded directly
                                map.get(&start_prop).cloned()
                            }
                        } else {
                            // Not an entity at all: treat as "not valid" rather than erroring.
                            return Ok(Value::Bool(false));
                        };

                        // A missing or null start bound means the entity is never valid;
                        // any other non-datetime value is a type error.
                        let valid_from = match valid_from_val {
                            Some(ref v) => match value_to_datetime_utc(v) {
                                Some(dt) => dt,
                                None if v.is_null() => return Ok(Value::Bool(false)),
                                None => {
                                    return Err(anyhow!(
                                        "Property {} must be a datetime value or string",
                                        start_prop
                                    ));
                                }
                            },
                            None => return Ok(Value::Bool(false)),
                        };

                        // Same entity resolution as for valid_from above, but for the end bound.
                        let valid_to_val: Option<Value> = if let Ok(vid) =
                            Self::vid_from_value(&node_val)
                        {
                            // Vertex case - VID string format
                            prop_manager
                                .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
                                .await
                                .ok()
                        } else if let Value::Map(map) = &node_val {
                            // Check for embedded _vid or _eid in object
                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
                                let vid = Vid::from(vid_val);
                                prop_manager
                                    .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
                                    .await
                                    .ok()
                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
                                // Edge case
                                let eid = uni_common::core::id::Eid::from(eid_val);
                                prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
                            } else {
                                // Inline object - property embedded directly
                                map.get(&end_prop).cloned()
                            }
                        } else {
                            return Ok(Value::Bool(false));
                        };

                        // Unlike the start bound, a missing/null end bound means the
                        // validity interval is open-ended (None => still valid).
                        let valid_to = match valid_to_val {
                            Some(ref v) => match value_to_datetime_utc(v) {
                                Some(dt) => Some(dt),
                                None if v.is_null() => None,
                                None => {
                                    return Err(anyhow!(
                                        "Property {} must be a datetime value or null",
                                        end_prop
                                    ));
                                }
                            },
                            None => None,
                        };

                        // Half-open interval test: valid_from <= t < valid_to.
                        let is_valid = valid_from <= query_time
                            && valid_to.map(|vt| query_time < vt).unwrap_or(true);
                        return Ok(Value::Bool(is_valid));
                    }
2098
                    // For all other functions, evaluate arguments eagerly, then
                    // dispatch to the shared scalar-function evaluator.
                    let mut evaluated_args = Vec::with_capacity(args.len());
                    for arg in args {
                        let mut val = this
                            .evaluate_expr(arg, row, prop_manager, params, ctx)
                            .await?;

                        // Eagerly hydrate edge/vertex maps if pushdown hydration didn't load properties.
                        // Functions like validAt() need access to properties like valid_from/valid_to.
                        if let Value::Map(ref mut map) = val {
                            hydrate_entity_if_needed(map, prop_manager, ctx).await;
                        }

                        evaluated_args.push(val);
                    }
                    // The custom registry (if configured) lets user-registered
                    // functions participate alongside the built-ins.
                    eval_scalar_function(
                        name,
                        &evaluated_args,
                        self.custom_function_registry.as_deref(),
                    )
2119                }
2120                Expr::Reduce {
2121                    accumulator,
2122                    init,
2123                    variable,
2124                    list,
2125                    expr,
2126                } => {
2127                    let mut acc = self
2128                        .evaluate_expr(init, row, prop_manager, params, ctx)
2129                        .await?;
2130                    let list_val = self
2131                        .evaluate_expr(list, row, prop_manager, params, ctx)
2132                        .await?;
2133
2134                    if let Value::List(items) = list_val {
2135                        for item in items {
2136                            // Create a temporary scope/row with accumulator and variable
2137                            // For simplicity in fallback executor, we can construct a new row map
2138                            // merging current row + new variables.
2139                            let mut scope = row.clone();
2140                            scope.insert(accumulator.clone(), acc.clone());
2141                            scope.insert(variable.clone(), item);
2142
2143                            acc = self
2144                                .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2145                                .await?;
2146                        }
2147                    } else {
2148                        return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2149                    }
2150                    Ok(acc)
2151                }
                Expr::ValidAt { .. } => {
                    // VALID_AT should have been transformed to a function call
                    // (see the validAt handling above) by the planner; reaching
                    // this arm indicates a planner bug, not a user error.
                    Err(anyhow!(
                        "VALID_AT expression should have been transformed to function call in planner"
                    ))
                }
2158
2159                Expr::LabelCheck { expr, labels } => {
2160                    let val = this
2161                        .evaluate_expr(expr, row, prop_manager, params, ctx)
2162                        .await?;
2163                    match &val {
2164                        Value::Null => Ok(Value::Null),
2165                        Value::Map(map) => {
2166                            // Check if this is an edge (has _eid) or node (has _vid)
2167                            let is_edge = map.contains_key("_eid")
2168                                || map.contains_key("_type_name")
2169                                || (map.contains_key("_type") && !map.contains_key("_vid"));
2170
2171                            if is_edge {
2172                                // Edges have a single type
2173                                if labels.len() > 1 {
2174                                    return Ok(Value::Bool(false));
2175                                }
2176                                let label_to_check = &labels[0];
2177                                let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2178                                {
2179                                    t == label_to_check
2180                                } else if let Some(Value::String(t)) = map.get("_type") {
2181                                    t == label_to_check
2182                                } else {
2183                                    false
2184                                };
2185                                Ok(Value::Bool(has_type))
2186                            } else {
2187                                // Node: check all labels
2188                                let has_all = labels.iter().all(|label_to_check| {
2189                                    if let Some(Value::List(labels_arr)) = map.get("_labels") {
2190                                        labels_arr
2191                                            .iter()
2192                                            .any(|l| l.as_str() == Some(label_to_check.as_str()))
2193                                    } else {
2194                                        false
2195                                    }
2196                                });
2197                                Ok(Value::Bool(has_all))
2198                            }
2199                        }
2200                        _ => Ok(Value::Bool(false)),
2201                    }
2202                }
2203
2204                Expr::MapProjection { base, items } => {
2205                    let base_value = this
2206                        .evaluate_expr(base, row, prop_manager, params, ctx)
2207                        .await?;
2208
2209                    // Extract properties from the base object
2210                    let properties = match &base_value {
2211                        Value::Map(map) => map,
2212                        _ => {
2213                            return Err(anyhow!(
2214                                "Map projection requires object, got {:?}",
2215                                base_value
2216                            ));
2217                        }
2218                    };
2219
2220                    let mut result_map = HashMap::new();
2221
2222                    for item in items {
2223                        match item {
2224                            MapProjectionItem::Property(prop) => {
2225                                if let Some(value) = properties.get(prop.as_str()) {
2226                                    result_map.insert(prop.clone(), value.clone());
2227                                }
2228                            }
2229                            MapProjectionItem::AllProperties => {
2230                                // Include all properties except internal fields (those starting with _)
2231                                for (key, value) in properties.iter() {
2232                                    if !key.starts_with('_') {
2233                                        result_map.insert(key.clone(), value.clone());
2234                                    }
2235                                }
2236                            }
2237                            MapProjectionItem::LiteralEntry(key, expr) => {
2238                                let value = this
2239                                    .evaluate_expr(expr, row, prop_manager, params, ctx)
2240                                    .await?;
2241                                result_map.insert(key.clone(), value);
2242                            }
2243                            MapProjectionItem::Variable(var_name) => {
2244                                // Variable selector: include the value of the variable in the result
2245                                // e.g., person{.name, friend} includes the value of 'friend' variable
2246                                if let Some(value) = row.get(var_name.as_str()) {
2247                                    result_map.insert(var_name.clone(), value.clone());
2248                                }
2249                            }
2250                        }
2251                    }
2252
2253                    Ok(Value::Map(result_map))
2254                }
2255            }
2256        })
2257    }
2258
    /// Execute a `LogicalPlan` with the fallback row-based executor, returning
    /// each result row as a `HashMap` of column name -> `Value`.
    ///
    /// Boxed and recursive: plan nodes such as `Filter`, `Sort`, and `Limit`
    /// re-enter this function for their inputs.
    pub(crate) fn execute_subplan<'a>(
        &'a self,
        plan: LogicalPlan,
        prop_manager: &'a PropertyManager,
        params: &'a HashMap<String, Value>,
        ctx: Option<&'a QueryContext>,
    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
        Box::pin(async move {
            // Abort early if the query's deadline has already passed.
            if let Some(ctx) = ctx {
                ctx.check_timeout()?;
            }
            match plan {
                LogicalPlan::Union { left, right, all } => {
                    self.execute_union(left, right, all, prop_manager, params, ctx)
                        .await
                }
                // Index DDL: each variant is a no-op when IF NOT EXISTS is set
                // and an index with the same name already exists.
                LogicalPlan::CreateVectorIndex {
                    config,
                    if_not_exists,
                } => {
                    if if_not_exists && self.index_exists_by_name(&config.name) {
                        return Ok(vec![]);
                    }
                    let idx_mgr = self.storage.index_manager();
                    idx_mgr.create_vector_index(config).await?;
                    Ok(vec![])
                }
                LogicalPlan::CreateFullTextIndex {
                    config,
                    if_not_exists,
                } => {
                    if if_not_exists && self.index_exists_by_name(&config.name) {
                        return Ok(vec![]);
                    }
                    let idx_mgr = self.storage.index_manager();
                    idx_mgr.create_fts_index(config).await?;
                    Ok(vec![])
                }
                LogicalPlan::CreateScalarIndex {
                    mut config,
                    if_not_exists,
                } => {
                    if if_not_exists && self.index_exists_by_name(&config.name) {
                        return Ok(vec![]);
                    }

                    // Check for expression indexes - create generated columns
                    let mut modified_properties = Vec::new();

                    for prop in &config.properties {
                        // Heuristic: if contains '(' and ')', it's an expression
                        if prop.contains('(') && prop.contains(')') {
                            let gen_col = SchemaManager::generated_column_name(prop);

                            // Add generated property to schema
                            let sm = self.storage.schema_manager_arc();
                            if let Err(e) = sm.add_generated_property(
                                &config.label,
                                &gen_col,
                                DataType::String, // Default type for expressions
                                prop.clone(),
                            ) {
                                // Best-effort: the generated column may already
                                // exist from an earlier index on the same expression.
                                log::warn!("Failed to add generated property (might exist): {}", e);
                            }

                            modified_properties.push(gen_col);
                        } else {
                            // Simple property - use as-is
                            modified_properties.push(prop.clone());
                        }
                    }

                    // Index the (possibly rewritten) generated-column names.
                    config.properties = modified_properties;

                    let idx_mgr = self.storage.index_manager();
                    idx_mgr.create_scalar_index(config).await?;
                    Ok(vec![])
                }
                LogicalPlan::CreateJsonFtsIndex {
                    config,
                    if_not_exists,
                } => {
                    if if_not_exists && self.index_exists_by_name(&config.name) {
                        return Ok(vec![]);
                    }
                    let idx_mgr = self.storage.index_manager();
                    idx_mgr.create_json_fts_index(config).await?;
                    Ok(vec![])
                }
                // Introspection / maintenance commands.
                LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
                LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
                LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
                LogicalPlan::Vacuum => {
                    self.execute_vacuum().await?;
                    Ok(vec![])
                }
                LogicalPlan::Checkpoint => {
                    self.execute_checkpoint().await?;
                    Ok(vec![])
                }
                // Bulk export/import; both report the affected row count as a
                // single-row result under the "count" column.
                LogicalPlan::CopyTo {
                    label,
                    path,
                    format,
                    options,
                } => {
                    let count = self
                        .execute_copy_to(&label, &path, &format, &options)
                        .await?;
                    let mut result = HashMap::new();
                    result.insert("count".to_string(), Value::Int(count as i64));
                    Ok(vec![result])
                }
                LogicalPlan::CopyFrom {
                    label,
                    path,
                    format,
                    options,
                } => {
                    let count = self
                        .execute_copy_from(&label, &path, &format, &options)
                        .await?;
                    let mut result = HashMap::new();
                    result.insert("count".to_string(), Value::Int(count as i64));
                    Ok(vec![result])
                }
                // Schema DDL: delegate to dedicated executors; none produce rows.
                LogicalPlan::CreateLabel(clause) => {
                    self.execute_create_label(clause).await?;
                    Ok(vec![])
                }
                LogicalPlan::CreateEdgeType(clause) => {
                    self.execute_create_edge_type(clause).await?;
                    Ok(vec![])
                }
                LogicalPlan::AlterLabel(clause) => {
                    self.execute_alter_label(clause).await?;
                    Ok(vec![])
                }
                LogicalPlan::AlterEdgeType(clause) => {
                    self.execute_alter_edge_type(clause).await?;
                    Ok(vec![])
                }
                LogicalPlan::DropLabel(clause) => {
                    self.execute_drop_label(clause).await?;
                    Ok(vec![])
                }
                LogicalPlan::DropEdgeType(clause) => {
                    self.execute_drop_edge_type(clause).await?;
                    Ok(vec![])
                }
                LogicalPlan::CreateConstraint(clause) => {
                    self.execute_create_constraint(clause).await?;
                    Ok(vec![])
                }
                LogicalPlan::DropConstraint(clause) => {
                    self.execute_drop_constraint(clause).await?;
                    Ok(vec![])
                }
                LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
                LogicalPlan::DropIndex { name, if_exists } => {
                    let idx_mgr = self.storage.index_manager();
                    match idx_mgr.drop_index(&name).await {
                        Ok(_) => Ok(vec![]),
                        Err(e) => {
                            // IF EXISTS swallows only "not found" errors; anything
                            // else still propagates.
                            // NOTE(review): matching on the error's Display text is
                            // fragile — a typed error kind would be safer; confirm
                            // the index manager's error format before changing.
                            if if_exists && e.to_string().contains("not found") {
                                Ok(vec![])
                            } else {
                                Err(e)
                            }
                        }
                    }
                }
                LogicalPlan::ShowIndexes { filter } => {
                    Ok(self.execute_show_indexes(filter.as_deref()))
                }
                // Scan/traverse nodes: delegate to DataFusion for data access,
                // then convert results to HashMaps for the fallback executor.
                LogicalPlan::Scan { .. }
                | LogicalPlan::ExtIdLookup { .. }
                | LogicalPlan::ScanAll { .. }
                | LogicalPlan::ScanMainByLabels { .. }
                | LogicalPlan::Traverse { .. }
                | LogicalPlan::TraverseMainByType { .. } => {
                    let batches = self.execute_datafusion(plan, prop_manager, params).await?;
                    self.record_batches_to_rows(batches)
                }
                LogicalPlan::Filter {
                    input,
                    predicate,
                    optional_variables,
                } => {
                    // Materialize the input rows, then filter them in-memory.
                    let input_matches = self
                        .execute_subplan(*input, prop_manager, params, ctx)
                        .await?;

                    tracing::debug!(
                        "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
                        predicate,
                        input_matches.len(),
                        optional_variables
                    );

                    // For OPTIONAL MATCH with WHERE: we need LEFT OUTER JOIN semantics.
                    // Group rows by non-optional variables, apply filter, and ensure
                    // at least one row per group (with NULLs if filter removes all).
                    if !optional_variables.is_empty() {
                        // Helper to check if a key belongs to an optional variable.
                        // Keys can be "var" or "var.field" (e.g., "m" or "m._vid").
                        let is_optional_key = |k: &str| -> bool {
                            optional_variables.contains(k)
                                || optional_variables
                                    .iter()
                                    .any(|var| k.starts_with(&format!("{}.", var)))
                        };

                        // Helper to check if a key is internal (should not affect grouping)
                        let is_internal_key =
                            |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };

                        // Compute the key (non-optional, non-internal variables) for grouping.
                        // NOTE(review): column names are taken from the FIRST row only —
                        // assumes every row shares the same column set; verify upstream.
                        let non_optional_vars: Vec<String> = input_matches
                            .first()
                            .map(|row| {
                                row.keys()
                                    .filter(|k| !is_optional_key(k) && !is_internal_key(k))
                                    .cloned()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Group rows by their non-optional variable values
                        let mut groups: std::collections::HashMap<
                            Vec<u8>,
                            Vec<HashMap<String, Value>>,
                        > = std::collections::HashMap::new();

                        for row in &input_matches {
                            // Create a key from non-optional variable values
                            // (Debug-formatted values joined with '|' as a cheap
                            // composite grouping key).
                            let key: Vec<u8> = non_optional_vars
                                .iter()
                                .map(|var| {
                                    row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
                                })
                                .collect::<Vec<_>>()
                                .join("|")
                                .into_bytes();

                            groups.entry(key).or_default().push(row.clone());
                        }

                        let mut filtered = Vec::new();
                        for (_key, group_rows) in groups {
                            let mut group_passed = Vec::new();

                            for row in &group_rows {
                                // If optional variables are already NULL, preserve the row:
                                // the optional side produced no match, so the predicate is
                                // not applied to the all-NULL binding.
                                let has_null_optional = optional_variables.iter().any(|var| {
                                    // Check both "var" and "var._vid" style keys
                                    let direct_null =
                                        matches!(row.get(var), Some(Value::Null) | None);
                                    let prefixed_null = row
                                        .keys()
                                        .filter(|k| k.starts_with(&format!("{}.", var)))
                                        .any(|k| matches!(row.get(k), Some(Value::Null)));
                                    direct_null || prefixed_null
                                });

                                if has_null_optional {
                                    group_passed.push(row.clone());
                                    continue;
                                }

                                let res = self
                                    .evaluate_expr(&predicate, row, prop_manager, params, ctx)
                                    .await?;

                                // NULL / non-boolean predicate results filter the row out.
                                if res.as_bool().unwrap_or(false) {
                                    group_passed.push(row.clone());
                                }
                            }

                            if group_passed.is_empty() {
                                // No rows passed - emit one row with NULLs for optional variables
                                // Use the first row's non-optional values as a template
                                if let Some(template) = group_rows.first() {
                                    let mut null_row = HashMap::new();
                                    for (k, v) in template {
                                        if is_optional_key(k) {
                                            null_row.insert(k.clone(), Value::Null);
                                        } else {
                                            null_row.insert(k.clone(), v.clone());
                                        }
                                    }
                                    filtered.push(null_row);
                                }
                            } else {
                                filtered.extend(group_passed);
                            }
                        }

                        tracing::debug!(
                            "Filter (OPTIONAL): {} input rows -> {} output rows",
                            input_matches.len(),
                            filtered.len()
                        );

                        return Ok(filtered);
                    }

                    // Standard filter for non-OPTIONAL MATCH
                    let mut filtered = Vec::new();
                    for row in input_matches.iter() {
                        let res = self
                            .evaluate_expr(&predicate, row, prop_manager, params, ctx)
                            .await?;

                        // As above: only an explicit true keeps the row.
                        let passes = res.as_bool().unwrap_or(false);

                        if passes {
                            filtered.push(row.clone());
                        }
                    }

                    tracing::debug!(
                        "Filter: {} input rows -> {} output rows",
                        input_matches.len(),
                        filtered.len()
                    );

                    Ok(filtered)
                }
2590                LogicalPlan::ProcedureCall {
2591                    procedure_name,
2592                    arguments,
2593                    yield_items,
2594                } => {
2595                    let yield_names: Vec<String> =
2596                        yield_items.iter().map(|(n, _)| n.clone()).collect();
2597                    let results = self
2598                        .execute_procedure(
2599                            &procedure_name,
2600                            &arguments,
2601                            &yield_names,
2602                            prop_manager,
2603                            params,
2604                            ctx,
2605                        )
2606                        .await?;
2607
2608                    // Handle aliasing: collect all original values first, then
2609                    // build the aliased row in one pass. This avoids issues when
2610                    // an alias matches another yield item's original name (e.g.,
2611                    // YIELD a AS b, b AS d — renaming "a" to "b" must not
2612                    // clobber the original "b" before it is renamed to "d").
2613                    let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2614                    if !has_aliases {
2615                        // No aliases (includes YIELD * which produces empty yield_items) —
2616                        // pass through the procedure output rows unchanged.
2617                        Ok(results)
2618                    } else {
2619                        let mut aliased_results = Vec::with_capacity(results.len());
2620                        for row in results {
2621                            let mut new_row = HashMap::new();
2622                            for (name, alias) in &yield_items {
2623                                let col_name = alias.as_ref().unwrap_or(name);
2624                                let val = row.get(name).cloned().unwrap_or(Value::Null);
2625                                new_row.insert(col_name.clone(), val);
2626                            }
2627                            aliased_results.push(new_row);
2628                        }
2629                        Ok(aliased_results)
2630                    }
2631                }
2632                LogicalPlan::VectorKnn { .. } => {
2633                    unreachable!("VectorKnn is handled by DataFusion engine")
2634                }
2635                LogicalPlan::InvertedIndexLookup { .. } => {
2636                    unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2637                }
2638                LogicalPlan::Sort { input, order_by } => {
2639                    let rows = self
2640                        .execute_subplan(*input, prop_manager, params, ctx)
2641                        .await?;
2642                    self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2643                        .await
2644                }
2645                LogicalPlan::Limit { input, skip, fetch } => {
2646                    let rows = self
2647                        .execute_subplan(*input, prop_manager, params, ctx)
2648                        .await?;
2649                    let skip = skip.unwrap_or(0);
2650                    let take = fetch.unwrap_or(usize::MAX);
2651                    Ok(rows.into_iter().skip(skip).take(take).collect())
2652                }
2653                LogicalPlan::Aggregate {
2654                    input,
2655                    group_by,
2656                    aggregates,
2657                } => {
2658                    let rows = self
2659                        .execute_subplan(*input, prop_manager, params, ctx)
2660                        .await?;
2661                    self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2662                        .await
2663                }
2664                LogicalPlan::Window {
2665                    input,
2666                    window_exprs,
2667                } => {
2668                    let rows = self
2669                        .execute_subplan(*input, prop_manager, params, ctx)
2670                        .await?;
2671                    self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2672                        .await
2673                }
2674                LogicalPlan::Project { input, projections } => {
2675                    let matches = self
2676                        .execute_subplan(*input, prop_manager, params, ctx)
2677                        .await?;
2678                    self.execute_project(matches, &projections, prop_manager, params, ctx)
2679                        .await
2680                }
2681                LogicalPlan::Distinct { input } => {
2682                    let rows = self
2683                        .execute_subplan(*input, prop_manager, params, ctx)
2684                        .await?;
2685                    let mut seen = std::collections::HashSet::new();
2686                    let mut result = Vec::new();
2687                    for row in rows {
2688                        let key = Self::canonical_row_key(&row);
2689                        if seen.insert(key) {
2690                            result.push(row);
2691                        }
2692                    }
2693                    Ok(result)
2694                }
2695                LogicalPlan::Unwind {
2696                    input,
2697                    expr,
2698                    variable,
2699                } => {
2700                    let input_rows = self
2701                        .execute_subplan(*input, prop_manager, params, ctx)
2702                        .await?;
2703                    self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2704                        .await
2705                }
2706                LogicalPlan::Apply {
2707                    input,
2708                    subquery,
2709                    input_filter,
2710                } => {
2711                    let input_rows = self
2712                        .execute_subplan(*input, prop_manager, params, ctx)
2713                        .await?;
2714                    self.execute_apply(
2715                        input_rows,
2716                        &subquery,
2717                        input_filter.as_ref(),
2718                        prop_manager,
2719                        params,
2720                        ctx,
2721                    )
2722                    .await
2723                }
2724                LogicalPlan::SubqueryCall { input, subquery } => {
2725                    let input_rows = self
2726                        .execute_subplan(*input, prop_manager, params, ctx)
2727                        .await?;
2728                    // Execute subquery for each input row (correlated)
2729                    // No input_filter for CALL { }
2730                    self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2731                        .await
2732                }
2733                LogicalPlan::RecursiveCTE {
2734                    cte_name,
2735                    initial,
2736                    recursive,
2737                } => {
2738                    self.execute_recursive_cte(
2739                        &cte_name,
2740                        *initial,
2741                        *recursive,
2742                        prop_manager,
2743                        params,
2744                        ctx,
2745                    )
2746                    .await
2747                }
2748                LogicalPlan::CrossJoin { left, right } => {
2749                    self.execute_cross_join(left, right, prop_manager, params, ctx)
2750                        .await
2751                }
2752                LogicalPlan::Set { .. }
2753                | LogicalPlan::Remove { .. }
2754                | LogicalPlan::Merge { .. }
2755                | LogicalPlan::Create { .. }
2756                | LogicalPlan::CreateBatch { .. } => {
2757                    unreachable!("mutations are handled by DataFusion engine")
2758                }
2759                LogicalPlan::Delete { .. } => {
2760                    unreachable!("mutations are handled by DataFusion engine")
2761                }
2762                LogicalPlan::Copy {
2763                    target,
2764                    source,
2765                    is_export,
2766                    options,
2767                } => {
2768                    if is_export {
2769                        self.execute_export(&target, &source, &options, prop_manager, ctx)
2770                            .await
2771                    } else {
2772                        self.execute_copy(&target, &source, &options, prop_manager)
2773                            .await
2774                    }
2775                }
2776                LogicalPlan::Backup {
2777                    destination,
2778                    options,
2779                } => self.execute_backup(&destination, &options).await,
2780                LogicalPlan::Explain { plan } => {
2781                    let plan_str = format!("{:#?}", plan);
2782                    let mut row = HashMap::new();
2783                    row.insert("plan".to_string(), Value::String(plan_str));
2784                    Ok(vec![row])
2785                }
2786                LogicalPlan::ShortestPath { .. } => {
2787                    unreachable!("ShortestPath is handled by DataFusion engine")
2788                }
2789                LogicalPlan::AllShortestPaths { .. } => {
2790                    unreachable!("AllShortestPaths is handled by DataFusion engine")
2791                }
2792                LogicalPlan::Foreach { .. } => {
2793                    unreachable!("mutations are handled by DataFusion engine")
2794                }
2795                LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2796                LogicalPlan::BindZeroLengthPath { .. } => {
2797                    unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2798                }
2799                LogicalPlan::BindPath { .. } => {
2800                    unreachable!("BindPath is handled by DataFusion engine")
2801                }
2802                LogicalPlan::QuantifiedPattern { .. } => {
2803                    unreachable!("QuantifiedPattern is handled by DataFusion engine")
2804                }
2805                LogicalPlan::LocyProgram { .. }
2806                | LogicalPlan::LocyFold { .. }
2807                | LogicalPlan::LocyBestBy { .. }
2808                | LogicalPlan::LocyPriority { .. }
2809                | LogicalPlan::LocyDerivedScan { .. }
2810                | LogicalPlan::LocyProject { .. } => {
2811                    unreachable!("Locy operators are handled by DataFusion engine")
2812                }
2813            }
2814        })
2815    }
2816
    /// Execute a single plan from a FOREACH body with the given scope.
    ///
    /// Used by the DataFusion ForeachExec operator to delegate body clause
    /// execution back to the executor.
    ///
    /// Only mutation plans are accepted here: `Set`, `Remove`, `Delete`,
    /// `Create`, `CreateBatch`, `Merge`, and nested `Foreach`; any other plan
    /// variant is rejected with an error. All writes go through the
    /// caller-held `writer` and, when present, the transaction-scoped L0
    /// buffer `tx_l0`.
    #[expect(clippy::too_many_arguments)]
    pub(crate) async fn execute_foreach_body_plan(
        &self,
        plan: LogicalPlan,
        // Variable bindings for the current FOREACH iteration; mutated in place.
        scope: &mut HashMap<String, Value>,
        writer: &mut uni_store::runtime::writer::Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    ) -> Result<()> {
        match plan {
            LogicalPlan::Set { items, .. } => {
                self.execute_set_items_locked(
                    &items,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
            }
            LogicalPlan::Remove { items, .. } => {
                self.execute_remove_items_locked(&items, scope, writer, prop_manager, ctx, tx_l0)
                    .await?;
            }
            LogicalPlan::Delete { items, detach, .. } => {
                // Each DELETE item is an expression (e.g. a bound node/edge
                // variable); evaluate it against the current scope, then delete.
                for expr in &items {
                    let val = self
                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
                        .await?;
                    self.execute_delete_item_locked(&val, detach, writer, tx_l0)
                        .await?;
                }
            }
            LogicalPlan::Create { pattern, .. } => {
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
            }
            LogicalPlan::CreateBatch { patterns, .. } => {
                for pattern in &patterns {
                    self.execute_create_pattern(
                        pattern,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                    )
                    .await?;
                }
            }
            LogicalPlan::Merge {
                pattern,
                on_match: _,
                on_create,
                ..
            } => {
                // NOTE(review): MERGE inside FOREACH unconditionally creates the
                // pattern and then runs ON CREATE; `on_match` is discarded and no
                // match attempt is made. Confirm this create-only behavior is the
                // intended MERGE semantics in FOREACH bodies.
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
                if let Some(on_create_clause) = on_create {
                    self.execute_set_items_locked(
                        &on_create_clause.items,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                    )
                    .await?;
                }
            }
            LogicalPlan::Foreach {
                variable,
                list,
                body,
                ..
            } => {
                let list_val = self
                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
                    .await?;
                let items = match list_val {
                    Value::List(arr) => arr,
                    // A null list makes the whole FOREACH a no-op.
                    Value::Null => return Ok(()),
                    _ => return Err(anyhow!("FOREACH requires a list")),
                };
                for item in items {
                    // Clone the scope per element so bindings introduced by one
                    // iteration never leak into the next.
                    let mut nested_scope = scope.clone();
                    nested_scope.insert(variable.clone(), item);
                    for nested_plan in &body {
                        // Box::pin breaks the infinitely-sized future type that
                        // direct recursion in an async fn would otherwise create.
                        Box::pin(self.execute_foreach_body_plan(
                            nested_plan.clone(),
                            &mut nested_scope,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0,
                        ))
                        .await?;
                    }
                }
            }
            _ => {
                return Err(anyhow!(
                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
                ));
            }
        }
        Ok(())
    }
2952
2953    fn canonical_row_key(row: &HashMap<String, Value>) -> String {
2954        let mut pairs: Vec<_> = row.iter().collect();
2955        pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
2956
2957        pairs
2958            .into_iter()
2959            .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
2960            .collect::<Vec<_>>()
2961            .join("|")
2962    }
2963
    /// Render a `Value` as a canonical string key implementing value equality
    /// for grouping/deduplication: integral floats collapse onto the integer
    /// key space, map entries and node labels are sorted, and temporal values
    /// (including strings that parse as temporals) are keyed by instant.
    fn canonical_value_key(v: &Value) -> String {
        match v {
            Value::Null => "null".to_string(),
            Value::Bool(b) => format!("b:{b}"),
            Value::Int(i) => format!("n:{i}"),
            Value::Float(f) => {
                if f.is_nan() {
                    // All NaN payloads share one key, so NaNs dedupe together.
                    "nan".to_string()
                } else if f.is_infinite() {
                    if f.is_sign_positive() {
                        "inf:+".to_string()
                    } else {
                        "inf:-".to_string()
                    }
                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
                    // Integral floats share the "n:" key space with Ints so
                    // e.g. 2 and 2.0 produce the same key.
                    format!("n:{}", *f as i64)
                } else {
                    format!("f:{f}")
                }
            }
            Value::String(s) => {
                // Strings that parse as temporal literals are keyed like the
                // corresponding temporal value, so they compare by instant.
                if let Some(k) = Self::temporal_string_key(s) {
                    format!("temporal:{k}")
                } else {
                    format!("s:{s}")
                }
            }
            Value::Bytes(b) => format!("bytes:{:?}", b),
            Value::List(items) => format!(
                "list:[{}]",
                items
                    .iter()
                    .map(Self::canonical_value_key)
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Map(map) => {
                // Sort entries so logically equal maps yield identical keys
                // regardless of HashMap iteration order.
                let mut pairs: Vec<_> = map.iter().collect();
                pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
                format!(
                    "map:{{{}}}",
                    pairs
                        .into_iter()
                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
                        .collect::<Vec<_>>()
                        .join(",")
                )
            }
            Value::Node(n) => {
                // Labels are an unordered set; sort for a deterministic key.
                let mut labels = n.labels.clone();
                labels.sort();
                format!(
                    "node:{}:{}:{}",
                    n.vid.as_u64(),
                    labels.join(":"),
                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
                )
            }
            Value::Edge(e) => format!(
                "edge:{}:{}:{}:{}:{}",
                e.eid.as_u64(),
                e.edge_type,
                e.src.as_u64(),
                e.dst.as_u64(),
                Self::canonical_value_key(&Value::Map(e.properties.clone()))
            ),
            Value::Path(p) => format!(
                "path:nodes=[{}];edges=[{}]",
                p.nodes
                    .iter()
                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
                    .collect::<Vec<_>>()
                    .join(","),
                p.edges
                    .iter()
                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Vector(vs) => format!("vec:{:?}", vs),
            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
            // Fallback for variants without a bespoke canonical form; relies on
            // Debug output being stable for a given value.
            _ => format!("{v:?}"),
        }
    }
3048
3049    fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3050        match t {
3051            uni_common::TemporalValue::Date { days_since_epoch } => {
3052                format!("date:{days_since_epoch}")
3053            }
3054            uni_common::TemporalValue::LocalTime {
3055                nanos_since_midnight,
3056            } => format!("localtime:{nanos_since_midnight}"),
3057            uni_common::TemporalValue::Time {
3058                nanos_since_midnight,
3059                offset_seconds,
3060            } => {
3061                let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3062                format!("time:{utc_nanos}")
3063            }
3064            uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3065                format!("localdatetime:{nanos_since_epoch}")
3066            }
3067            uni_common::TemporalValue::DateTime {
3068                nanos_since_epoch, ..
3069            } => format!("datetime:{nanos_since_epoch}"),
3070            uni_common::TemporalValue::Duration {
3071                months,
3072                days,
3073                nanos,
3074            } => format!("duration:{months}:{days}:{nanos}"),
3075        }
3076    }
3077
3078    fn temporal_string_key(s: &str) -> Option<String> {
3079        let fn_name = match classify_temporal(s)? {
3080            uni_common::TemporalType::Date => "DATE",
3081            uni_common::TemporalType::LocalTime => "LOCALTIME",
3082            uni_common::TemporalType::Time => "TIME",
3083            uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3084            uni_common::TemporalType::DateTime => "DATETIME",
3085            uni_common::TemporalType::Duration => "DURATION",
3086        };
3087        match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3088            Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3089            _ => None,
3090        }
3091    }
3092
3093    /// Execute aggregate operation: GROUP BY + aggregate functions.
3094    /// Interval for timeout checks in aggregate loops.
3095    pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3096
    /// Execute aggregate operation: GROUP BY + aggregate functions.
    ///
    /// Groups `rows` by the canonical string form of the evaluated `group_by`
    /// expressions, folds every row into per-group accumulators, and emits one
    /// output row per group. Output group order is unspecified (HashMap-backed).
    pub(crate) async fn execute_aggregate(
        &self,
        rows: Vec<HashMap<String, Value>>,
        group_by: &[Expr],
        aggregates: &[Expr],
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // CWE-400: Check timeout before aggregation
        if let Some(ctx) = ctx {
            ctx.check_timeout()?;
        }

        // canonical key string -> (evaluated group-key values, one accumulator per aggregate)
        let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();

        // Cypher semantics: aggregation without grouping keys returns one row even
        // on empty input (e.g. `RETURN count(*)`, `RETURN avg(x)`).
        if rows.is_empty() {
            if group_by.is_empty() {
                let accs = Self::create_accumulators(aggregates);
                let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
                return Ok(vec![row]);
            }
            return Ok(vec![]);
        }

        for (idx, row) in rows.into_iter().enumerate() {
            // Periodic timeout check during aggregation
            if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
                && let Some(ctx) = ctx
            {
                ctx.check_timeout()?;
            }

            let key_vals = self
                .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
                .await?;
            // Build a canonical key so grouping follows Cypher value semantics
            // (e.g. temporal equality by instant, numeric normalization where applicable).
            let key_str = format!(
                "[{}]",
                key_vals
                    .iter()
                    .map(Self::canonical_value_key)
                    .collect::<Vec<_>>()
                    .join(",")
            );

            // First row of a group captures its key values and fresh accumulators.
            let entry = groups
                .entry(key_str)
                .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));

            self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
                .await?;
        }

        let results = groups
            .values()
            .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
            .collect();

        Ok(results)
    }
3161
3162    pub(crate) async fn execute_window(
3163        &self,
3164        mut rows: Vec<HashMap<String, Value>>,
3165        window_exprs: &[Expr],
3166        _prop_manager: &PropertyManager,
3167        _params: &HashMap<String, Value>,
3168        ctx: Option<&QueryContext>,
3169    ) -> Result<Vec<HashMap<String, Value>>> {
3170        // CWE-400: Check timeout before window computation
3171        if let Some(ctx) = ctx {
3172            ctx.check_timeout()?;
3173        }
3174
3175        // If no rows or no window expressions, return as-is
3176        if rows.is_empty() || window_exprs.is_empty() {
3177            return Ok(rows);
3178        }
3179
3180        // Process each window function expression
3181        for window_expr in window_exprs {
3182            // Extract window function details
3183            let Expr::FunctionCall {
3184                name,
3185                args,
3186                window_spec: Some(window_spec),
3187                ..
3188            } = window_expr
3189            else {
3190                return Err(anyhow!(
3191                    "Window expression must be a FunctionCall with OVER clause: {:?}",
3192                    window_expr
3193                ));
3194            };
3195
3196            let name_upper = name.to_uppercase();
3197
3198            // Validate it's a supported window function
3199            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3200                return Err(anyhow!(
3201                    "Unsupported window function: {}. Supported functions: {}",
3202                    name,
3203                    WINDOW_FUNCTIONS.join(", ")
3204                ));
3205            }
3206
3207            // Build partition groups based on PARTITION BY clause
3208            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3209
3210            for (row_idx, row) in rows.iter().enumerate() {
3211                // Evaluate partition key
3212                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3213                    // No partitioning - all rows in one partition
3214                    vec![]
3215                } else {
3216                    window_spec
3217                        .partition_by
3218                        .iter()
3219                        .map(|expr| self.evaluate_simple_expr(expr, row))
3220                        .collect::<Result<Vec<_>>>()?
3221                };
3222
3223                partition_map
3224                    .entry(partition_key)
3225                    .or_default()
3226                    .push(row_idx);
3227            }
3228
3229            // Process each partition
3230            for (_partition_key, row_indices) in partition_map.iter_mut() {
3231                // Sort rows within partition by ORDER BY clause
3232                if !window_spec.order_by.is_empty() {
3233                    row_indices.sort_by(|&a, &b| {
3234                        for sort_item in &window_spec.order_by {
3235                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3236                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3237
3238                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3239                                let cmp = Executor::compare_values(&va, &vb);
3240                                let cmp = if sort_item.ascending {
3241                                    cmp
3242                                } else {
3243                                    cmp.reverse()
3244                                };
3245                                if cmp != std::cmp::Ordering::Equal {
3246                                    return cmp;
3247                                }
3248                            }
3249                        }
3250                        std::cmp::Ordering::Equal
3251                    });
3252                }
3253
3254                // Compute window function values for this partition
3255                for (position, &row_idx) in row_indices.iter().enumerate() {
3256                    let window_value = match name_upper.as_str() {
3257                        "ROW_NUMBER" => Value::from((position + 1) as i64),
3258                        "RANK" => {
3259                            // RANK: position (1-indexed) of first row in group of tied rows
3260                            let rank = if position == 0 {
3261                                1i64
3262                            } else {
3263                                let prev_row_idx = row_indices[position - 1];
3264                                let same_as_prev = self.rows_have_same_sort_keys(
3265                                    &window_spec.order_by,
3266                                    &rows,
3267                                    row_idx,
3268                                    prev_row_idx,
3269                                );
3270
3271                                if same_as_prev {
3272                                    // Walk backwards to find where this group started
3273                                    let mut group_start = position - 1;
3274                                    while group_start > 0 {
3275                                        let curr_idx = row_indices[group_start];
3276                                        let prev_idx = row_indices[group_start - 1];
3277                                        if !self.rows_have_same_sort_keys(
3278                                            &window_spec.order_by,
3279                                            &rows,
3280                                            curr_idx,
3281                                            prev_idx,
3282                                        ) {
3283                                            break;
3284                                        }
3285                                        group_start -= 1;
3286                                    }
3287                                    (group_start + 1) as i64
3288                                } else {
3289                                    (position + 1) as i64
3290                                }
3291                            };
3292                            Value::from(rank)
3293                        }
3294                        "DENSE_RANK" => {
3295                            // Dense rank: continuous ranking without gaps
3296                            let mut dense_rank = 1i64;
3297                            for i in 0..position {
3298                                let curr_idx = row_indices[i + 1];
3299                                let prev_idx = row_indices[i];
3300                                if !self.rows_have_same_sort_keys(
3301                                    &window_spec.order_by,
3302                                    &rows,
3303                                    curr_idx,
3304                                    prev_idx,
3305                                ) {
3306                                    dense_rank += 1;
3307                                }
3308                            }
3309                            Value::from(dense_rank)
3310                        }
3311                        "LAG" => {
3312                            let (value_expr, offset, default_value) =
3313                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3314
3315                            if position >= offset {
3316                                let target_idx = row_indices[position - offset];
3317                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3318                            } else {
3319                                default_value
3320                            }
3321                        }
3322                        "LEAD" => {
3323                            let (value_expr, offset, default_value) =
3324                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3325
3326                            if position + offset < row_indices.len() {
3327                                let target_idx = row_indices[position + offset];
3328                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3329                            } else {
3330                                default_value
3331                            }
3332                        }
3333                        "NTILE" => {
3334                            // Extract num_buckets argument: NTILE(num_buckets)
3335                            let num_buckets_expr = args.first().ok_or_else(|| {
3336                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3337                            })?;
3338                            let num_buckets_val =
3339                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3340                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3341                                anyhow!(
3342                                    "NTILE argument must be an integer, got: {:?}",
3343                                    num_buckets_val
3344                                )
3345                            })?;
3346
3347                            if num_buckets <= 0 {
3348                                return Err(anyhow!(
3349                                    "NTILE bucket count must be positive, got: {}",
3350                                    num_buckets
3351                                ));
3352                            }
3353
3354                            let num_buckets = num_buckets as usize;
3355                            let partition_size = row_indices.len();
3356
3357                            // Calculate bucket assignment using standard algorithm
3358                            // For N rows and B buckets:
3359                            // - Base size: N / B
3360                            // - Extra rows: N % B (go to first buckets)
3361                            let base_size = partition_size / num_buckets;
3362                            let extra_rows = partition_size % num_buckets;
3363
3364                            // Determine bucket for current row
3365                            let bucket = if position < extra_rows * (base_size + 1) {
3366                                // Row is in one of the larger buckets (first 'extra_rows' buckets)
3367                                position / (base_size + 1) + 1
3368                            } else {
3369                                // Row is in one of the normal-sized buckets
3370                                let adjusted_position = position - extra_rows * (base_size + 1);
3371                                extra_rows + (adjusted_position / base_size) + 1
3372                            };
3373
3374                            Value::from(bucket as i64)
3375                        }
3376                        "FIRST_VALUE" => {
3377                            // FIRST_VALUE returns the value of the expression from the first row in the window frame
3378                            let value_expr = args.first().ok_or_else(|| {
3379                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3380                            })?;
3381
3382                            // Get the first row in the partition (after ordering)
3383                            if row_indices.is_empty() {
3384                                Value::Null
3385                            } else {
3386                                let first_idx = row_indices[0];
3387                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3388                            }
3389                        }
3390                        "LAST_VALUE" => {
3391                            // LAST_VALUE returns the value of the expression from the last row in the window frame
3392                            let value_expr = args.first().ok_or_else(|| {
3393                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3394                            })?;
3395
3396                            // Get the last row in the partition (after ordering)
3397                            if row_indices.is_empty() {
3398                                Value::Null
3399                            } else {
3400                                let last_idx = row_indices[row_indices.len() - 1];
3401                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3402                            }
3403                        }
3404                        "NTH_VALUE" => {
3405                            // NTH_VALUE returns the value of the expression from the nth row in the window frame
3406                            if args.len() != 2 {
3407                                return Err(anyhow!(
3408                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3409                                ));
3410                            }
3411
3412                            let value_expr = &args[0];
3413                            let n_expr = &args[1];
3414
3415                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3416                            let n = n_val.as_i64().ok_or_else(|| {
3417                                anyhow!(
3418                                    "NTH_VALUE second argument must be an integer, got: {:?}",
3419                                    n_val
3420                                )
3421                            })?;
3422
3423                            if n <= 0 {
3424                                return Err(anyhow!(
3425                                    "NTH_VALUE position must be positive, got: {}",
3426                                    n
3427                                ));
3428                            }
3429
3430                            let nth_index = (n - 1) as usize; // Convert 1-based to 0-based
3431                            if nth_index < row_indices.len() {
3432                                let nth_idx = row_indices[nth_index];
3433                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3434                            } else {
3435                                Value::Null
3436                            }
3437                        }
3438                        _ => unreachable!("Window function {} already validated", name),
3439                    };
3440
3441                    // Add window function result to row
3442                    // Use the window expression's string representation as the column name
3443                    let col_name = window_expr.to_string_repr();
3444                    rows[row_idx].insert(col_name, window_value);
3445                }
3446            }
3447        }
3448
3449        Ok(rows)
3450    }
3451
3452    /// Helper to evaluate simple expressions for window function sorting/partitioning.
3453    ///
3454    /// Uses `&self` for consistency with other evaluation methods, though it only
3455    /// recurses for property access.
3456    fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3457        match expr {
3458            Expr::Variable(name) => row
3459                .get(name)
3460                .cloned()
3461                .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3462            Expr::Property(base, prop) => {
3463                let base_val = self.evaluate_simple_expr(base, row)?;
3464                if let Value::Map(map) = base_val {
3465                    map.get(prop)
3466                        .cloned()
3467                        .ok_or_else(|| anyhow!("Property not found: {}", prop))
3468                } else {
3469                    Err(anyhow!("Cannot access property on non-object"))
3470                }
3471            }
3472            Expr::Literal(lit) => Ok(lit.to_value()),
3473            _ => Err(anyhow!(
3474                "Unsupported expression in window function: {:?}",
3475                expr
3476            )),
3477        }
3478    }
3479
3480    /// Check if two rows have matching sort keys for ranking functions.
3481    fn rows_have_same_sort_keys(
3482        &self,
3483        order_by: &[uni_cypher::ast::SortItem],
3484        rows: &[HashMap<String, Value>],
3485        idx_a: usize,
3486        idx_b: usize,
3487    ) -> bool {
3488        order_by.iter().all(|sort_item| {
3489            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3490            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3491            matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3492        })
3493    }
3494
3495    /// Extract offset and default value for LAG/LEAD window functions.
3496    fn extract_lag_lead_params<'a>(
3497        &self,
3498        func_name: &str,
3499        args: &'a [Expr],
3500        row: &HashMap<String, Value>,
3501    ) -> Result<(&'a Expr, usize, Value)> {
3502        let value_expr = args.first().ok_or_else(|| {
3503            anyhow!(
3504                "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3505                func_name,
3506                func_name
3507            )
3508        })?;
3509
3510        let offset = if let Some(offset_expr) = args.get(1) {
3511            let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3512            offset_val.as_i64().ok_or_else(|| {
3513                anyhow!(
3514                    "{} offset must be an integer, got: {:?}",
3515                    func_name,
3516                    offset_val
3517                )
3518            })? as usize
3519        } else {
3520            1
3521        };
3522
3523        let default_value = if let Some(default_expr) = args.get(2) {
3524            self.evaluate_simple_expr(default_expr, row)?
3525        } else {
3526            Value::Null
3527        };
3528
3529        Ok((value_expr, offset, default_value))
3530    }
3531
3532    /// Evaluate group-by key expressions for a row.
3533    pub(crate) async fn evaluate_group_keys(
3534        &self,
3535        group_by: &[Expr],
3536        row: &HashMap<String, Value>,
3537        prop_manager: &PropertyManager,
3538        params: &HashMap<String, Value>,
3539        ctx: Option<&QueryContext>,
3540    ) -> Result<Vec<Value>> {
3541        let mut key_vals = Vec::new();
3542        for expr in group_by {
3543            key_vals.push(
3544                self.evaluate_expr(expr, row, prop_manager, params, ctx)
3545                    .await?,
3546            );
3547        }
3548        Ok(key_vals)
3549    }
3550
3551    /// Update accumulators with values from the current row.
3552    pub(crate) async fn update_accumulators(
3553        &self,
3554        accs: &mut [Accumulator],
3555        aggregates: &[Expr],
3556        row: &HashMap<String, Value>,
3557        prop_manager: &PropertyManager,
3558        params: &HashMap<String, Value>,
3559        ctx: Option<&QueryContext>,
3560    ) -> Result<()> {
3561        for (i, agg_expr) in aggregates.iter().enumerate() {
3562            if let Expr::FunctionCall { args, .. } = agg_expr {
3563                let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3564                let val = if is_wildcard {
3565                    Value::Null
3566                } else {
3567                    self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3568                        .await?
3569                };
3570                accs[i].update(&val, is_wildcard);
3571            }
3572        }
3573        Ok(())
3574    }
3575
    /// Execute a recursive CTE: run the anchor plan once, then iterate the
    /// recursive plan to a fixed point.
    ///
    /// Each iteration binds the previous iteration's rows to `cte_name` in the
    /// parameter map, executes the recursive plan, and keeps only rows not
    /// seen before (cycle detection via a stable Debug-format key). The loop
    /// stops when an iteration yields no rows, yields only already-seen rows,
    /// or `config.max_recursive_cte_iterations` is reached. The accumulated
    /// rows are returned as a single result row mapping `cte_name` to a
    /// `Value::List`, with single-column rows unwrapped to their bare value
    /// and multi-column rows turned into `Value::Map`s.
    ///
    /// NOTE(review): the previous doc comment ("Execute sort operation with
    /// ORDER BY clauses") described a different function and was corrected.
    pub(crate) async fn execute_recursive_cte(
        &self,
        cte_name: &str,
        initial: LogicalPlan,
        recursive: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        use std::collections::HashSet;

        // Helper to create a stable key for cycle detection.
        // Uses sorted keys to ensure consistent ordering regardless of
        // HashMap iteration order.
        // (NOTE: `pub(crate)` on a function nested inside a body has no
        // effect; the item is only visible within this function.)
        pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
            let mut pairs: Vec<_> = row.iter().collect();
            pairs.sort_by(|a, b| a.0.cmp(b.0));
            format!("{pairs:?}")
        }

        // 1. Execute Anchor
        let mut working_table = self
            .execute_subplan(initial, prop_manager, params, ctx)
            .await?;
        let mut result_table = working_table.clone();

        // Track seen rows for cycle detection (anchor rows count as seen)
        let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();

        // 2. Loop
        // Safety: Max iterations to prevent infinite loop
        let max_iterations = self.config.max_recursive_cte_iterations;
        for _iteration in 0..max_iterations {
            // CWE-400: Check timeout at each iteration to prevent resource exhaustion
            if let Some(ctx) = ctx {
                ctx.check_timeout()?;
            }

            if working_table.is_empty() {
                break;
            }

            // Bind working table to CTE name in params. Single-column rows are
            // passed as bare values, multi-column rows as maps — mirroring the
            // unwrapping applied to the final result below.
            let working_val = Value::List(
                working_table
                    .iter()
                    .map(|row| {
                        if row.len() == 1 {
                            row.values().next().unwrap().clone()
                        } else {
                            Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
                        }
                    })
                    .collect(),
            );

            let mut next_params = params.clone();
            next_params.insert(cte_name.to_string(), working_val);

            // Execute recursive part
            let next_result = self
                .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
                .await?;

            if next_result.is_empty() {
                break;
            }

            // Filter out already-seen rows (cycle detection)
            let new_rows: Vec<_> = next_result
                .into_iter()
                .filter(|row| {
                    let key = row_key(row);
                    seen.insert(key) // Returns false if already present
                })
                .collect();

            if new_rows.is_empty() {
                // All results were cycles - terminate
                break;
            }

            result_table.extend(new_rows.clone());
            working_table = new_rows;
        }

        // Output accumulated results as a single list bound to the CTE name.
        // Single-column rows are unwrapped to their bare value (so e.g.
        // `MATCH (n) WHERE n IN hierarchy` sees a list of nodes); multi-column
        // rows become maps, keeping consistency with how the working table was
        // bound during iteration.
        let final_list = Value::List(
            result_table
                .into_iter()
                .map(|row| {
                    if row.len() == 1 {
                        row.values().next().unwrap().clone()
                    } else {
                        Value::Map(row.into_iter().collect())
                    }
                })
                .collect(),
        );

        let mut final_row = HashMap::new();
        final_row.insert(cte_name.to_string(), final_list);
        Ok(vec![final_row])
    }
3691
3692    /// Interval for timeout checks in sort loops.
3693    const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3694
3695    pub(crate) async fn execute_sort(
3696        &self,
3697        rows: Vec<HashMap<String, Value>>,
3698        order_by: &[uni_cypher::ast::SortItem],
3699        prop_manager: &PropertyManager,
3700        params: &HashMap<String, Value>,
3701        ctx: Option<&QueryContext>,
3702    ) -> Result<Vec<HashMap<String, Value>>> {
3703        // CWE-400: Check timeout before potentially expensive sort
3704        if let Some(ctx) = ctx {
3705            ctx.check_timeout()?;
3706        }
3707
3708        let mut rows_with_keys = Vec::with_capacity(rows.len());
3709        for (idx, row) in rows.into_iter().enumerate() {
3710            // Periodic timeout check during key extraction
3711            if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3712                && let Some(ctx) = ctx
3713            {
3714                ctx.check_timeout()?;
3715            }
3716
3717            let mut keys = Vec::new();
3718            for item in order_by {
3719                let val = row
3720                    .get(&item.expr.to_string_repr())
3721                    .cloned()
3722                    .unwrap_or(Value::Null);
3723                let val = if val.is_null() {
3724                    self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3725                        .await
3726                        .unwrap_or(Value::Null)
3727                } else {
3728                    val
3729                };
3730                keys.push(val);
3731            }
3732            rows_with_keys.push((row, keys));
3733        }
3734
3735        // Check timeout again before synchronous sort (can't be interrupted)
3736        if let Some(ctx) = ctx {
3737            ctx.check_timeout()?;
3738        }
3739
3740        rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3741
3742        Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3743    }
3744
3745    /// Create accumulators for aggregate expressions.
3746    pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3747        aggregates
3748            .iter()
3749            .map(|expr| {
3750                if let Expr::FunctionCall { name, distinct, .. } = expr {
3751                    Accumulator::new(name, *distinct)
3752                } else {
3753                    Accumulator::new("COUNT", false)
3754                }
3755            })
3756            .collect()
3757    }
3758
3759    /// Build result row from group-by keys and accumulators.
3760    pub(crate) fn build_aggregate_result(
3761        group_by: &[Expr],
3762        aggregates: &[Expr],
3763        key_vals: &[Value],
3764        accs: &[Accumulator],
3765    ) -> HashMap<String, Value> {
3766        let mut res_row = HashMap::new();
3767        for (i, expr) in group_by.iter().enumerate() {
3768            res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3769        }
3770        for (i, expr) in aggregates.iter().enumerate() {
3771            // Use aggregate_column_name to ensure consistency with planner
3772            let col_name = crate::query::planner::aggregate_column_name(expr);
3773            res_row.insert(col_name, accs[i].finish());
3774        }
3775        res_row
3776    }
3777
3778    /// Compare and return ordering for sort operation.
3779    pub(crate) fn compare_sort_keys(
3780        a_keys: &[Value],
3781        b_keys: &[Value],
3782        order_by: &[uni_cypher::ast::SortItem],
3783    ) -> std::cmp::Ordering {
3784        for (i, item) in order_by.iter().enumerate() {
3785            let order = Self::compare_values(&a_keys[i], &b_keys[i]);
3786            if order != std::cmp::Ordering::Equal {
3787                return if item.ascending {
3788                    order
3789                } else {
3790                    order.reverse()
3791                };
3792            }
3793        }
3794        std::cmp::Ordering::Equal
3795    }
3796
3797    /// Executes BACKUP command to local or cloud storage.
3798    ///
3799    /// Supports both local filesystem paths and cloud URLs (s3://, gs://, az://).
3800    pub(crate) async fn execute_backup(
3801        &self,
3802        destination: &str,
3803        _options: &HashMap<String, Value>,
3804    ) -> Result<Vec<HashMap<String, Value>>> {
3805        // 1. Flush L0
3806        if let Some(writer_arc) = &self.writer {
3807            let mut writer = writer_arc.write().await;
3808            writer.flush_to_l1(None).await?;
3809        }
3810
3811        // 2. Snapshot
3812        let snapshot_manager = self.storage.snapshot_manager();
3813        let snapshot = snapshot_manager
3814            .load_latest_snapshot()
3815            .await?
3816            .ok_or_else(|| anyhow!("No snapshot found"))?;
3817
3818        // 3. Copy files - cloud or local path
3819        if is_cloud_url(destination) {
3820            self.backup_to_cloud(destination, &snapshot.snapshot_id)
3821                .await?;
3822        } else {
3823            // Validate local destination path against sandbox
3824            let validated_dest = self.validate_path(destination)?;
3825            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
3826                .await?;
3827        }
3828
3829        let mut res = HashMap::new();
3830        res.insert(
3831            "status".to_string(),
3832            Value::String("Backup completed".to_string()),
3833        );
3834        res.insert(
3835            "snapshot_id".to_string(),
3836            Value::String(snapshot.snapshot_id),
3837        );
3838        Ok(vec![res])
3839    }
3840
3841    /// Backs up database to a local filesystem destination.
3842    async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
3843        let source_path = std::path::Path::new(self.storage.base_path());
3844
3845        if !dest_path.exists() {
3846            std::fs::create_dir_all(dest_path)?;
3847        }
3848
3849        // Recursive copy (local to local)
3850        if source_path.exists() {
3851            Self::copy_dir_all(source_path, dest_path)?;
3852        }
3853
3854        // Copy schema to destination/catalog/schema.json
3855        let schema_manager = self.storage.schema_manager();
3856        let dest_catalog = dest_path.join("catalog");
3857        if !dest_catalog.exists() {
3858            std::fs::create_dir_all(&dest_catalog)?;
3859        }
3860
3861        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3862        std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
3863
3864        Ok(())
3865    }
3866
3867    /// Backs up database to a cloud storage destination.
3868    ///
3869    /// Streams data from source to destination, supporting cross-cloud backups.
3870    async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
3871        use object_store::ObjectStore;
3872        use object_store::local::LocalFileSystem;
3873        use object_store::path::Path as ObjPath;
3874
3875        let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
3876        let source_path = std::path::Path::new(self.storage.base_path());
3877
3878        // Create local store for source, coerced to dyn ObjectStore
3879        let src_store: Arc<dyn ObjectStore> =
3880            Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
3881
3882        // Copy catalog/ directory
3883        let catalog_src = ObjPath::from("catalog");
3884        let catalog_dst = if dest_prefix.as_ref().is_empty() {
3885            ObjPath::from("catalog")
3886        } else {
3887            ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
3888        };
3889        copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
3890
3891        // Copy storage/ directory
3892        let storage_src = ObjPath::from("storage");
3893        let storage_dst = if dest_prefix.as_ref().is_empty() {
3894            ObjPath::from("storage")
3895        } else {
3896            ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
3897        };
3898        copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
3899
3900        // Ensure schema is present at canonical catalog location.
3901        let schema_manager = self.storage.schema_manager();
3902        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3903        let schema_path = if dest_prefix.as_ref().is_empty() {
3904            ObjPath::from("catalog/schema.json")
3905        } else {
3906            ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
3907        };
3908        dest_store
3909            .put(&schema_path, bytes::Bytes::from(schema_content).into())
3910            .await?;
3911
3912        Ok(())
3913    }
3914
3915    /// Maximum directory depth for backup operations.
3916    ///
3917    /// **CWE-674 (Uncontrolled Recursion)**: Prevents stack overflow from
3918    /// excessively deep directory structures.
3919    const MAX_BACKUP_DEPTH: usize = 100;
3920
3921    /// Maximum file count for backup operations.
3922    ///
3923    /// **CWE-400 (Resource Consumption)**: Prevents disk exhaustion and
3924    /// long-running operations from malicious or unexpectedly large directories.
3925    const MAX_BACKUP_FILES: usize = 100_000;
3926
3927    /// Recursively copies a directory with security limits.
3928    ///
3929    /// # Security
3930    ///
3931    /// - **CWE-674**: Depth limit prevents stack overflow
3932    /// - **CWE-400**: File count limit prevents resource exhaustion
3933    /// - **Symlink handling**: Symlinks are skipped to prevent loop attacks
3934    pub(crate) fn copy_dir_all(
3935        src: &std::path::Path,
3936        dst: &std::path::Path,
3937    ) -> std::io::Result<()> {
3938        let mut file_count = 0usize;
3939        Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
3940    }
3941
3942    /// Internal implementation with depth and file count tracking.
3943    pub(crate) fn copy_dir_all_impl(
3944        src: &std::path::Path,
3945        dst: &std::path::Path,
3946        depth: usize,
3947        file_count: &mut usize,
3948    ) -> std::io::Result<()> {
3949        if depth >= Self::MAX_BACKUP_DEPTH {
3950            return Err(std::io::Error::new(
3951                std::io::ErrorKind::InvalidInput,
3952                format!(
3953                    "Maximum backup depth {} exceeded at {:?}",
3954                    Self::MAX_BACKUP_DEPTH,
3955                    src
3956                ),
3957            ));
3958        }
3959
3960        std::fs::create_dir_all(dst)?;
3961
3962        for entry in std::fs::read_dir(src)? {
3963            if *file_count >= Self::MAX_BACKUP_FILES {
3964                return Err(std::io::Error::new(
3965                    std::io::ErrorKind::InvalidInput,
3966                    format!(
3967                        "Maximum backup file count {} exceeded",
3968                        Self::MAX_BACKUP_FILES
3969                    ),
3970                ));
3971            }
3972            *file_count += 1;
3973
3974            let entry = entry?;
3975            let metadata = entry.metadata()?;
3976
3977            // Skip symlinks to prevent loops and traversal attacks
3978            if metadata.file_type().is_symlink() {
3979                // Silently skip - logging would require tracing dependency
3980                continue;
3981            }
3982
3983            let dst_path = dst.join(entry.file_name());
3984            if metadata.is_dir() {
3985                Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
3986            } else {
3987                std::fs::copy(entry.path(), dst_path)?;
3988            }
3989        }
3990        Ok(())
3991    }
3992
3993    pub(crate) async fn execute_copy(
3994        &self,
3995        target: &str,
3996        source: &str,
3997        options: &HashMap<String, Value>,
3998        prop_manager: &PropertyManager,
3999    ) -> Result<Vec<HashMap<String, Value>>> {
4000        let format = options
4001            .get("format")
4002            .and_then(|v| v.as_str())
4003            .unwrap_or_else(|| {
4004                if source.ends_with(".parquet") {
4005                    "parquet"
4006                } else {
4007                    "csv"
4008                }
4009            });
4010
4011        match format.to_lowercase().as_str() {
4012            "csv" => self.execute_csv_import(target, source, options).await,
4013            "parquet" => {
4014                self.execute_parquet_import(target, source, options, prop_manager)
4015                    .await
4016            }
4017            _ => Err(anyhow!("Unsupported format: {}", format)),
4018        }
4019    }
4020
4021    pub(crate) async fn execute_csv_import(
4022        &self,
4023        target: &str,
4024        source: &str,
4025        options: &HashMap<String, Value>,
4026    ) -> Result<Vec<HashMap<String, Value>>> {
4027        // Validate source path against sandbox
4028        let validated_source = self.validate_path(source)?;
4029
4030        let writer_lock = self
4031            .writer
4032            .as_ref()
4033            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4034
4035        let schema = self.storage.schema_manager().schema();
4036
4037        // 1. Determine if target is Label or EdgeType
4038        let label_meta = schema.labels.get(target);
4039        let edge_meta = schema.edge_types.get(target);
4040
4041        if label_meta.is_none() && edge_meta.is_none() {
4042            return Err(anyhow!("Target '{}' not found in schema", target));
4043        }
4044
4045        // 2. Open CSV
4046        let delimiter_str = options
4047            .get("delimiter")
4048            .and_then(|v| v.as_str())
4049            .unwrap_or(",");
4050        let delimiter = if delimiter_str.is_empty() {
4051            b','
4052        } else {
4053            delimiter_str.as_bytes()[0]
4054        };
4055        let has_header = options
4056            .get("header")
4057            .and_then(|v| v.as_bool())
4058            .unwrap_or(true);
4059
4060        let mut rdr = csv::ReaderBuilder::new()
4061            .delimiter(delimiter)
4062            .has_headers(has_header)
4063            .from_path(&validated_source)?;
4064
4065        let headers = rdr.headers()?.clone();
4066        let mut count = 0;
4067
4068        let mut writer = writer_lock.write().await;
4069
4070        if label_meta.is_some() {
4071            let target_props = schema
4072                .properties
4073                .get(target)
4074                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4075
4076            for result in rdr.records() {
4077                let record = result?;
4078                let mut props = HashMap::new();
4079
4080                for (i, header) in headers.iter().enumerate() {
4081                    if let Some(val_str) = record.get(i)
4082                        && let Some(prop_meta) = target_props.get(header)
4083                    {
4084                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4085                        props.insert(header.to_string(), val);
4086                    }
4087                }
4088
4089                let vid = writer.next_vid().await?;
4090                writer
4091                    .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4092                    .await?;
4093                count += 1;
4094            }
4095        } else if let Some(meta) = edge_meta {
4096            let type_id = meta.id;
4097            let target_props = schema
4098                .properties
4099                .get(target)
4100                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4101
4102            // For edges, we need src and dst VIDs.
4103            // Expecting columns '_src' and '_dst' or as specified in options.
4104            let src_col = options
4105                .get("src_col")
4106                .and_then(|v| v.as_str())
4107                .unwrap_or("_src");
4108            let dst_col = options
4109                .get("dst_col")
4110                .and_then(|v| v.as_str())
4111                .unwrap_or("_dst");
4112
4113            for result in rdr.records() {
4114                let record = result?;
4115                let mut props = HashMap::new();
4116                let mut src_vid = None;
4117                let mut dst_vid = None;
4118
4119                for (i, header) in headers.iter().enumerate() {
4120                    if let Some(val_str) = record.get(i) {
4121                        if header == src_col {
4122                            src_vid =
4123                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4124                        } else if header == dst_col {
4125                            dst_vid =
4126                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4127                        } else if let Some(prop_meta) = target_props.get(header) {
4128                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4129                            props.insert(header.to_string(), val);
4130                        }
4131                    }
4132                }
4133
4134                let src =
4135                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4136                let dst = dst_vid
4137                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;
4138
4139                let eid = writer.next_eid(type_id).await?;
4140                writer
4141                    .insert_edge(
4142                        src,
4143                        dst,
4144                        type_id,
4145                        eid,
4146                        props,
4147                        Some(target.to_string()),
4148                        None,
4149                    )
4150                    .await?;
4151                count += 1;
4152            }
4153        }
4154
4155        let mut res = HashMap::new();
4156        res.insert("count".to_string(), Value::Int(count as i64));
4157        Ok(vec![res])
4158    }
4159
4160    /// Imports data from Parquet file to a label or edge type.
4161    ///
4162    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
4163    pub(crate) async fn execute_parquet_import(
4164        &self,
4165        target: &str,
4166        source: &str,
4167        options: &HashMap<String, Value>,
4168        _prop_manager: &PropertyManager,
4169    ) -> Result<Vec<HashMap<String, Value>>> {
4170        let writer_lock = self
4171            .writer
4172            .as_ref()
4173            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4174
4175        let schema = self.storage.schema_manager().schema();
4176
4177        // 1. Determine if target is Label or EdgeType
4178        let label_meta = schema.labels.get(target);
4179        let edge_meta = schema.edge_types.get(target);
4180
4181        if label_meta.is_none() && edge_meta.is_none() {
4182            return Err(anyhow!("Target '{}' not found in schema", target));
4183        }
4184
4185        // 2. Open Parquet - support both local and cloud URLs
4186        let reader = if is_cloud_url(source) {
4187            self.open_parquet_from_cloud(source).await?
4188        } else {
4189            // Validate local source path against sandbox
4190            let validated_source = self.validate_path(source)?;
4191            let file = std::fs::File::open(&validated_source)?;
4192            let builder =
4193                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
4194            builder.build()?
4195        };
4196        let mut reader = reader;
4197
4198        let mut count = 0;
4199        let mut writer = writer_lock.write().await;
4200
4201        if label_meta.is_some() {
4202            let target_props = schema
4203                .properties
4204                .get(target)
4205                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4206
4207            for batch in reader.by_ref() {
4208                let batch = batch?;
4209                for row in 0..batch.num_rows() {
4210                    let mut props = HashMap::new();
4211                    for field in batch.schema().fields() {
4212                        let name = field.name();
4213                        if target_props.contains_key(name) {
4214                            let col = batch.column_by_name(name).unwrap();
4215                            if !col.is_null(row) {
4216                                // Look up Uni DataType from schema for proper DateTime/Time decoding
4217                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
4218                                let val =
4219                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
4220                                props.insert(name.clone(), val);
4221                            }
4222                        }
4223                    }
4224                    let vid = writer.next_vid().await?;
4225                    writer
4226                        .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4227                        .await?;
4228                    count += 1;
4229                }
4230            }
4231        } else if let Some(meta) = edge_meta {
4232            let type_id = meta.id;
4233            let target_props = schema
4234                .properties
4235                .get(target)
4236                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4237
4238            let src_col = options
4239                .get("src_col")
4240                .and_then(|v| v.as_str())
4241                .unwrap_or("_src");
4242            let dst_col = options
4243                .get("dst_col")
4244                .and_then(|v| v.as_str())
4245                .unwrap_or("_dst");
4246
4247            for batch in reader {
4248                let batch = batch?;
4249                for row in 0..batch.num_rows() {
4250                    let mut props = HashMap::new();
4251                    let mut src_vid = None;
4252                    let mut dst_vid = None;
4253
4254                    for field in batch.schema().fields() {
4255                        let name = field.name();
4256                        let col = batch.column_by_name(name).unwrap();
4257                        if col.is_null(row) {
4258                            continue;
4259                        }
4260
4261                        if name == src_col {
4262                            let val = Self::arrow_to_value(col.as_ref(), row);
4263                            src_vid = Some(Self::vid_from_value(&val)?);
4264                        } else if name == dst_col {
4265                            let val = Self::arrow_to_value(col.as_ref(), row);
4266                            dst_vid = Some(Self::vid_from_value(&val)?);
4267                        } else if let Some(pm) = target_props.get(name) {
4268                            // Look up Uni DataType from schema for proper DateTime/Time decoding
4269                            let val =
4270                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
4271                            props.insert(name.clone(), val);
4272                        }
4273                    }
4274
4275                    let src = src_vid
4276                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4277                    let dst = dst_vid.ok_or_else(|| {
4278                        anyhow!("Missing destination VID in column '{}'", dst_col)
4279                    })?;
4280
4281                    let eid = writer.next_eid(type_id).await?;
4282                    writer
4283                        .insert_edge(
4284                            src,
4285                            dst,
4286                            type_id,
4287                            eid,
4288                            props,
4289                            Some(target.to_string()),
4290                            None,
4291                        )
4292                        .await?;
4293                    count += 1;
4294                }
4295            }
4296        }
4297
4298        let mut res = HashMap::new();
4299        res.insert("count".to_string(), Value::Int(count as i64));
4300        Ok(vec![res])
4301    }
4302
4303    /// Opens a Parquet file from a cloud URL.
4304    ///
4305    /// Downloads the file to memory and creates a Parquet reader.
4306    async fn open_parquet_from_cloud(
4307        &self,
4308        source_url: &str,
4309    ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4310        use object_store::ObjectStore;
4311
4312        let (store, path) = build_store_from_url(source_url)?;
4313
4314        // Download file contents
4315        let bytes = store.get(&path).await?.bytes().await?;
4316
4317        // Create a Parquet reader from the bytes
4318        let reader = bytes::Bytes::from(bytes.to_vec());
4319        let builder =
4320            parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4321        Ok(builder.build()?)
4322    }
4323
4324    pub(crate) async fn scan_edge_type(
4325        &self,
4326        edge_type: &str,
4327        ctx: Option<&QueryContext>,
4328    ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4329        let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4330
4331        // 1. Scan L2 (Base)
4332        self.scan_edge_type_l2(edge_type, &mut edges).await?;
4333
4334        // 2. Scan L1 (Delta)
4335        self.scan_edge_type_l1(edge_type, &mut edges).await?;
4336
4337        // 3. Scan L0 (Memory) and filter tombstoned vertices
4338        if let Some(ctx) = ctx {
4339            self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4340            self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4341        }
4342
4343        Ok(edges
4344            .into_iter()
4345            .map(|(eid, (src, dst))| (eid, src, dst))
4346            .collect())
4347    }
4348
    /// Scan L2 (base) storage for edges of a given type.
    ///
    /// Note: Edges are now stored exclusively in delta datasets (L1) via LanceDB.
    /// This L2 scan will typically find no data.
    ///
    /// Kept as a no-op so `scan_edge_type` can treat all three layers
    /// uniformly; it never modifies `edges` and always succeeds.
    pub(crate) async fn scan_edge_type_l2(
        &self,
        _edge_type: &str,
        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
    ) -> Result<()> {
        // Edges are now stored in delta datasets (L1) via LanceDB.
        // Legacy L2 base edge storage is no longer used.
        Ok(())
    }
4362
4363    /// Scan L1 (delta) storage for edges of a given type.
4364    pub(crate) async fn scan_edge_type_l1(
4365        &self,
4366        edge_type: &str,
4367        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4368    ) -> Result<()> {
4369        if let Ok(Some(batch)) = self
4370            .storage
4371            .scan_delta_table(
4372                edge_type,
4373                "fwd",
4374                &["eid", "src_vid", "dst_vid", "op", "_version"],
4375                None,
4376            )
4377            .await
4378        {
4379            // Collect ops with versions: eid -> (version, op, src, dst)
4380            let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4381                HashMap::new();
4382
4383            self.process_delta_batch(&batch, &mut versioned_ops)?;
4384
4385            // Apply the winning ops
4386            for (eid, (_, op, src, dst)) in versioned_ops {
4387                if op == 0 {
4388                    edges.insert(eid, (src, dst));
4389                } else if op == 1 {
4390                    edges.remove(&eid);
4391                }
4392            }
4393        }
4394        Ok(())
4395    }
4396
4397    /// Process a delta batch, tracking versioned operations.
4398    pub(crate) fn process_delta_batch(
4399        &self,
4400        batch: &arrow_array::RecordBatch,
4401        versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4402    ) -> Result<()> {
4403        use arrow_array::UInt64Array;
4404        let eid_col = batch
4405            .column_by_name("eid")
4406            .ok_or(anyhow!("Missing eid"))?
4407            .as_any()
4408            .downcast_ref::<UInt64Array>()
4409            .ok_or(anyhow!("Invalid eid"))?;
4410        let src_col = batch
4411            .column_by_name("src_vid")
4412            .ok_or(anyhow!("Missing src_vid"))?
4413            .as_any()
4414            .downcast_ref::<UInt64Array>()
4415            .ok_or(anyhow!("Invalid src_vid"))?;
4416        let dst_col = batch
4417            .column_by_name("dst_vid")
4418            .ok_or(anyhow!("Missing dst_vid"))?
4419            .as_any()
4420            .downcast_ref::<UInt64Array>()
4421            .ok_or(anyhow!("Invalid dst_vid"))?;
4422        let op_col = batch
4423            .column_by_name("op")
4424            .ok_or(anyhow!("Missing op"))?
4425            .as_any()
4426            .downcast_ref::<arrow_array::UInt8Array>()
4427            .ok_or(anyhow!("Invalid op"))?;
4428        let version_col = batch
4429            .column_by_name("_version")
4430            .ok_or(anyhow!("Missing _version"))?
4431            .as_any()
4432            .downcast_ref::<UInt64Array>()
4433            .ok_or(anyhow!("Invalid _version"))?;
4434
4435        for i in 0..batch.num_rows() {
4436            let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4437            let version = version_col.value(i);
4438            let op = op_col.value(i);
4439            let src = Vid::from(src_col.value(i));
4440            let dst = Vid::from(dst_col.value(i));
4441
4442            match versioned_ops.entry(eid) {
4443                std::collections::hash_map::Entry::Vacant(e) => {
4444                    e.insert((version, op, src, dst));
4445                }
4446                std::collections::hash_map::Entry::Occupied(mut e) => {
4447                    if version > e.get().0 {
4448                        e.insert((version, op, src, dst));
4449                    }
4450                }
4451            }
4452        }
4453        Ok(())
4454    }
4455
4456    /// Scan L0 (memory) buffers for edges of a given type.
4457    pub(crate) fn scan_edge_type_l0(
4458        &self,
4459        edge_type: &str,
4460        ctx: &QueryContext,
4461        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4462    ) {
4463        let schema = self.storage.schema_manager().schema();
4464        let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4465
4466        if let Some(type_id) = type_id {
4467            // Main L0
4468            self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4469
4470            // Transaction L0
4471            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4472                self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4473            }
4474
4475            // Pending flush L0s
4476            for pending_l0_arc in &ctx.pending_flush_l0s {
4477                self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4478            }
4479        }
4480    }
4481
4482    /// Scan a single L0 buffer for edges and apply tombstones.
4483    pub(crate) fn scan_single_l0(
4484        &self,
4485        l0: &uni_store::runtime::L0Buffer,
4486        type_id: u32,
4487        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4488    ) {
4489        for edge_entry in l0.graph.edges() {
4490            if edge_entry.edge_type == type_id {
4491                edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4492            }
4493        }
4494        // Process Tombstones
4495        let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4496        for eid in eids_to_check {
4497            if l0.is_tombstoned(eid) {
4498                edges.remove(&eid);
4499            }
4500        }
4501    }
4502
4503    /// Filter out edges connected to tombstoned vertices.
4504    pub(crate) fn filter_tombstoned_vertex_edges(
4505        &self,
4506        ctx: &QueryContext,
4507        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4508    ) {
4509        let l0 = ctx.l0.read();
4510        let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4511
4512        // Include tx_l0 vertex tombstones if present
4513        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4514            let tx_l0 = tx_l0_arc.read();
4515            all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4516        }
4517
4518        // Include pending flush L0 vertex tombstones
4519        for pending_l0_arc in &ctx.pending_flush_l0s {
4520            let pending_l0 = pending_l0_arc.read();
4521            all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4522        }
4523
4524        edges.retain(|_, (src, dst)| {
4525            !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4526        });
4527    }
4528
4529    /// Execute a projection operation.
4530    pub(crate) async fn execute_project(
4531        &self,
4532        input_rows: Vec<HashMap<String, Value>>,
4533        projections: &[(Expr, Option<String>)],
4534        prop_manager: &PropertyManager,
4535        params: &HashMap<String, Value>,
4536        ctx: Option<&QueryContext>,
4537    ) -> Result<Vec<HashMap<String, Value>>> {
4538        let mut results = Vec::new();
4539        for m in input_rows {
4540            let mut row = HashMap::new();
4541            for (expr, alias) in projections {
4542                let val = self
4543                    .evaluate_expr(expr, &m, prop_manager, params, ctx)
4544                    .await?;
4545                let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4546                row.insert(name, val);
4547            }
4548            results.push(row);
4549        }
4550        Ok(results)
4551    }
4552
4553    /// Execute an UNWIND operation.
4554    pub(crate) async fn execute_unwind(
4555        &self,
4556        input_rows: Vec<HashMap<String, Value>>,
4557        expr: &Expr,
4558        variable: &str,
4559        prop_manager: &PropertyManager,
4560        params: &HashMap<String, Value>,
4561        ctx: Option<&QueryContext>,
4562    ) -> Result<Vec<HashMap<String, Value>>> {
4563        let mut results = Vec::new();
4564        for row in input_rows {
4565            let val = self
4566                .evaluate_expr(expr, &row, prop_manager, params, ctx)
4567                .await?;
4568            if let Value::List(items) = val {
4569                for item in items {
4570                    let mut new_row = row.clone();
4571                    new_row.insert(variable.to_string(), item);
4572                    results.push(new_row);
4573                }
4574            }
4575        }
4576        Ok(results)
4577    }
4578
4579    /// Execute an APPLY (correlated subquery) operation.
4580    pub(crate) async fn execute_apply(
4581        &self,
4582        input_rows: Vec<HashMap<String, Value>>,
4583        subquery: &LogicalPlan,
4584        input_filter: Option<&Expr>,
4585        prop_manager: &PropertyManager,
4586        params: &HashMap<String, Value>,
4587        ctx: Option<&QueryContext>,
4588    ) -> Result<Vec<HashMap<String, Value>>> {
4589        let mut filtered_rows = input_rows;
4590
4591        if let Some(filter) = input_filter {
4592            let mut filtered = Vec::new();
4593            for row in filtered_rows {
4594                let res = self
4595                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
4596                    .await?;
4597                if res.as_bool().unwrap_or(false) {
4598                    filtered.push(row);
4599                }
4600            }
4601            filtered_rows = filtered;
4602        }
4603
4604        // Handle empty input: execute subquery once with empty context
4605        // This is critical for standalone CALL statements at the beginning of a query
4606        if filtered_rows.is_empty() {
4607            let sub_rows = self
4608                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4609                .await?;
4610            return Ok(sub_rows);
4611        }
4612
4613        let mut results = Vec::new();
4614        for row in filtered_rows {
4615            let mut sub_params = params.clone();
4616            sub_params.extend(row.clone());
4617
4618            let sub_rows = self
4619                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4620                .await?;
4621
4622            for sub_row in sub_rows {
4623                let mut new_row = row.clone();
4624                new_row.extend(sub_row);
4625                results.push(new_row);
4626            }
4627        }
4628        Ok(results)
4629    }
4630
4631    /// Execute SHOW INDEXES command.
4632    pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4633        let schema = self.storage.schema_manager().schema();
4634        let mut rows = Vec::new();
4635        for idx in &schema.indexes {
4636            let (name, type_str, details) = match idx {
4637                uni_common::core::schema::IndexDefinition::Vector(c) => (
4638                    c.name.clone(),
4639                    "VECTOR",
4640                    format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4641                ),
4642                uni_common::core::schema::IndexDefinition::FullText(c) => (
4643                    c.name.clone(),
4644                    "FULLTEXT",
4645                    format!("on {}:{:?}", c.label, c.properties),
4646                ),
4647                uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4648                    cfg.name.clone(),
4649                    "SCALAR",
4650                    format!(":{}({:?})", cfg.label, cfg.properties),
4651                ),
4652                _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4653            };
4654
4655            if let Some(f) = filter
4656                && f != type_str
4657            {
4658                continue;
4659            }
4660
4661            let mut row = HashMap::new();
4662            row.insert("name".to_string(), Value::String(name));
4663            row.insert("type".to_string(), Value::String(type_str.to_string()));
4664            row.insert("details".to_string(), Value::String(details));
4665            rows.push(row);
4666        }
4667        rows
4668    }
4669
4670    pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4671        let mut row = HashMap::new();
4672        row.insert("name".to_string(), Value::String("uni".to_string()));
4673        // Could add storage path, etc.
4674        vec![row]
4675    }
4676
4677    pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4678        // Placeholder as we don't easy access to config struct from here
4679        vec![]
4680    }
4681
4682    pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4683        let snapshot = self
4684            .storage
4685            .snapshot_manager()
4686            .load_latest_snapshot()
4687            .await?;
4688        let mut results = Vec::new();
4689
4690        if let Some(snap) = snapshot {
4691            for (label, s) in &snap.vertices {
4692                let mut row = HashMap::new();
4693                row.insert("type".to_string(), Value::String("Label".to_string()));
4694                row.insert("name".to_string(), Value::String(label.clone()));
4695                row.insert("count".to_string(), Value::Int(s.count as i64));
4696                results.push(row);
4697            }
4698            for (edge, s) in &snap.edges {
4699                let mut row = HashMap::new();
4700                row.insert("type".to_string(), Value::String("Edge".to_string()));
4701                row.insert("name".to_string(), Value::String(edge.clone()));
4702                row.insert("count".to_string(), Value::Int(s.count as i64));
4703                results.push(row);
4704            }
4705        }
4706
4707        Ok(results)
4708    }
4709
4710    pub(crate) fn execute_show_constraints(
4711        &self,
4712        clause: ShowConstraints,
4713    ) -> Vec<HashMap<String, Value>> {
4714        let schema = self.storage.schema_manager().schema();
4715        let mut rows = Vec::new();
4716        for c in &schema.constraints {
4717            if let Some(target) = &clause.target {
4718                match (target, &c.target) {
4719                    (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4720                    (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4721                        if e1 == e2 => {}
4722                    _ => continue,
4723                }
4724            }
4725
4726            let mut row = HashMap::new();
4727            row.insert("name".to_string(), Value::String(c.name.clone()));
4728            let type_str = match c.constraint_type {
4729                ConstraintType::Unique { .. } => "UNIQUE",
4730                ConstraintType::Exists { .. } => "EXISTS",
4731                ConstraintType::Check { .. } => "CHECK",
4732                _ => "UNKNOWN",
4733            };
4734            row.insert("type".to_string(), Value::String(type_str.to_string()));
4735
4736            let target_str = match &c.target {
4737                ConstraintTarget::Label(l) => format!("(:{})", l),
4738                ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4739                _ => "UNKNOWN".to_string(),
4740            };
4741            row.insert("target".to_string(), Value::String(target_str));
4742
4743            rows.push(row);
4744        }
4745        rows
4746    }
4747
4748    /// Execute a MERGE operation.
4749    pub(crate) async fn execute_cross_join(
4750        &self,
4751        left: Box<LogicalPlan>,
4752        right: Box<LogicalPlan>,
4753        prop_manager: &PropertyManager,
4754        params: &HashMap<String, Value>,
4755        ctx: Option<&QueryContext>,
4756    ) -> Result<Vec<HashMap<String, Value>>> {
4757        let left_rows = self
4758            .execute_subplan(*left, prop_manager, params, ctx)
4759            .await?;
4760        let right_rows = self
4761            .execute_subplan(*right, prop_manager, params, ctx)
4762            .await?;
4763
4764        let mut results = Vec::new();
4765        for l in &left_rows {
4766            for r in &right_rows {
4767                let mut combined = l.clone();
4768                combined.extend(r.clone());
4769                results.push(combined);
4770            }
4771        }
4772        Ok(results)
4773    }
4774
4775    /// Execute a UNION operation with optional deduplication.
4776    pub(crate) async fn execute_union(
4777        &self,
4778        left: Box<LogicalPlan>,
4779        right: Box<LogicalPlan>,
4780        all: bool,
4781        prop_manager: &PropertyManager,
4782        params: &HashMap<String, Value>,
4783        ctx: Option<&QueryContext>,
4784    ) -> Result<Vec<HashMap<String, Value>>> {
4785        let mut left_rows = self
4786            .execute_subplan(*left, prop_manager, params, ctx)
4787            .await?;
4788        let mut right_rows = self
4789            .execute_subplan(*right, prop_manager, params, ctx)
4790            .await?;
4791
4792        left_rows.append(&mut right_rows);
4793
4794        if !all {
4795            let mut seen = HashSet::new();
4796            left_rows.retain(|row| {
4797                let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
4798                let key = format!("{sorted_row:?}");
4799                seen.insert(key)
4800            });
4801        }
4802        Ok(left_rows)
4803    }
4804
4805    /// Check if an index with the given name exists.
4806    pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
4807        let schema = self.storage.schema_manager().schema();
4808        schema.indexes.iter().any(|idx| match idx {
4809            uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
4810            uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
4811            uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
4812            _ => false,
4813        })
4814    }
4815
4816    pub(crate) async fn execute_export(
4817        &self,
4818        target: &str,
4819        source: &str,
4820        options: &HashMap<String, Value>,
4821        prop_manager: &PropertyManager,
4822        ctx: Option<&QueryContext>,
4823    ) -> Result<Vec<HashMap<String, Value>>> {
4824        let format = options
4825            .get("format")
4826            .and_then(|v| v.as_str())
4827            .unwrap_or("csv")
4828            .to_lowercase();
4829
4830        match format.as_str() {
4831            "csv" => {
4832                self.execute_csv_export(target, source, options, prop_manager, ctx)
4833                    .await
4834            }
4835            "parquet" => {
4836                self.execute_parquet_export(target, source, options, prop_manager, ctx)
4837                    .await
4838            }
4839            _ => Err(anyhow!("Unsupported export format: {}", format)),
4840        }
4841    }
4842
4843    pub(crate) async fn execute_csv_export(
4844        &self,
4845        target: &str,
4846        source: &str,
4847        options: &HashMap<String, Value>,
4848        prop_manager: &PropertyManager,
4849        ctx: Option<&QueryContext>,
4850    ) -> Result<Vec<HashMap<String, Value>>> {
4851        // Validate destination path against sandbox
4852        let validated_dest = self.validate_path(source)?;
4853
4854        let schema = self.storage.schema_manager().schema();
4855        let label_meta = schema.labels.get(target);
4856        let edge_meta = schema.edge_types.get(target);
4857
4858        if label_meta.is_none() && edge_meta.is_none() {
4859            return Err(anyhow!("Target '{}' not found in schema", target));
4860        }
4861
4862        let delimiter_str = options
4863            .get("delimiter")
4864            .and_then(|v| v.as_str())
4865            .unwrap_or(",");
4866        let delimiter = if delimiter_str.is_empty() {
4867            b','
4868        } else {
4869            delimiter_str.as_bytes()[0]
4870        };
4871        let has_header = options
4872            .get("header")
4873            .and_then(|v| v.as_bool())
4874            .unwrap_or(true);
4875
4876        let mut wtr = csv::WriterBuilder::new()
4877            .delimiter(delimiter)
4878            .from_path(&validated_dest)?;
4879
4880        let mut count = 0;
4881        // Empty properties map for labels/edge types without registered properties
4882        let empty_props = HashMap::new();
4883
4884        if let Some(meta) = label_meta {
4885            let label_id = meta.id;
4886            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
4887            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
4888            prop_names.sort();
4889
4890            let mut headers = vec!["_vid".to_string()];
4891            headers.extend(prop_names.clone());
4892
4893            if has_header {
4894                wtr.write_record(&headers)?;
4895            }
4896
4897            let vids = self
4898                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
4899                .await?;
4900
4901            for vid in vids {
4902                let props = prop_manager
4903                    .get_all_vertex_props_with_ctx(vid, ctx)
4904                    .await?
4905                    .unwrap_or_default();
4906
4907                let mut row = Vec::with_capacity(headers.len());
4908                row.push(vid.to_string());
4909                for p_name in &prop_names {
4910                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
4911                    row.push(self.format_csv_value(val));
4912                }
4913                wtr.write_record(&row)?;
4914                count += 1;
4915            }
4916        } else if let Some(meta) = edge_meta {
4917            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
4918            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
4919            prop_names.sort();
4920
4921            // Headers for Edge: _eid, _src, _dst, _type, ...props
4922            let mut headers = vec![
4923                "_eid".to_string(),
4924                "_src".to_string(),
4925                "_dst".to_string(),
4926                "_type".to_string(),
4927            ];
4928            headers.extend(prop_names.clone());
4929
4930            if has_header {
4931                wtr.write_record(&headers)?;
4932            }
4933
4934            let edges = self.scan_edge_type(target, ctx).await?;
4935
4936            for (eid, src, dst) in edges {
4937                let props = prop_manager
4938                    .get_all_edge_props_with_ctx(eid, ctx)
4939                    .await?
4940                    .unwrap_or_default();
4941
4942                let mut row = Vec::with_capacity(headers.len());
4943                row.push(eid.to_string());
4944                row.push(src.to_string());
4945                row.push(dst.to_string());
4946                row.push(meta.id.to_string());
4947
4948                for p_name in &prop_names {
4949                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
4950                    row.push(self.format_csv_value(val));
4951                }
4952                wtr.write_record(&row)?;
4953                count += 1;
4954            }
4955        }
4956
4957        wtr.flush()?;
4958        let mut res = HashMap::new();
4959        res.insert("count".to_string(), Value::Int(count as i64));
4960        Ok(vec![res])
4961    }
4962
    /// Exports data to Parquet format.
    ///
    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
    ///
    /// `target` names a label or edge type; `destination` is a local path
    /// (sandbox-validated) or a cloud URL. `_options` is currently unused.
    /// All rows are materialized in memory and written as a single batch.
    /// Returns a single-row result `{"count": <rows exported>}`.
    pub(crate) async fn execute_parquet_export(
        &self,
        target: &str,
        destination: &str,
        _options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let schema_manager = self.storage.schema_manager();
        let schema = schema_manager.schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Resolve the Arrow schema from the backing dataset for the target.
        let arrow_schema = if label_meta.is_some() {
            let dataset = self.storage.vertex_dataset(target)?;
            dataset.get_arrow_schema(&schema)?
        } else {
            // Edge Schema
            // NOTE(review): empty src/dst label arguments — presumably "any
            // label"; confirm against `edge_dataset`'s contract.
            let dataset = self.storage.edge_dataset(target, "", "")?;
            dataset.get_arrow_schema(&schema)?
        };

        let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let mut props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                // Add the system columns expected by the vertex dataset schema.
                props.insert(
                    "_vid".to_string(),
                    uni_common::Value::Int(vid.as_u64() as i64),
                );
                if !props.contains_key("_uid") {
                    // Zeroed 32-element placeholder when no _uid is stored —
                    // presumably matching a fixed-size uid column; confirm.
                    props.insert(
                        "_uid".to_string(),
                        uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
                    );
                }
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        } else if edge_meta.is_some() {
            let edges = self.scan_edge_type(target, ctx).await?;
            for (eid, src, dst) in edges {
                let mut props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                // NOTE(review): edge keys (eid/src_vid/dst_vid) lack the
                // leading underscore used by the CSV export — presumably
                // matching the edge dataset's column names; confirm.
                props.insert(
                    "eid".to_string(),
                    uni_common::Value::Int(eid.as_u64() as i64),
                );
                props.insert(
                    "src_vid".to_string(),
                    uni_common::Value::Int(src.as_u64() as i64),
                );
                props.insert(
                    "dst_vid".to_string(),
                    uni_common::Value::Int(dst.as_u64() as i64),
                );
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        }

        // Write to cloud or local file
        if is_cloud_url(destination) {
            self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
                .await?;
        } else {
            // Validate local destination path against sandbox
            let validated_dest = self.validate_path(destination)?;
            let file = std::fs::File::create(&validated_dest)?;
            let mut writer =
                parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;

            // Write all in one batch for now (simplification)
            if !rows.is_empty() {
                let batch = self.rows_to_batch(&rows, &arrow_schema)?;
                writer.write(&batch)?;
            }

            writer.close()?;
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(rows.len() as i64));
        Ok(vec![res])
    }
5070
5071    /// Writes Parquet data to a cloud storage destination.
5072    async fn write_parquet_to_cloud(
5073        &self,
5074        dest_url: &str,
5075        rows: &[HashMap<String, uni_common::Value>],
5076        arrow_schema: &arrow_schema::Schema,
5077    ) -> Result<()> {
5078        use object_store::ObjectStore;
5079
5080        let (store, path) = build_store_from_url(dest_url)?;
5081
5082        // Write to an in-memory buffer
5083        let mut buffer = Vec::new();
5084        {
5085            let mut writer = parquet::arrow::ArrowWriter::try_new(
5086                &mut buffer,
5087                Arc::new(arrow_schema.clone()),
5088                None,
5089            )?;
5090
5091            if !rows.is_empty() {
5092                let batch = self.rows_to_batch(rows, arrow_schema)?;
5093                writer.write(&batch)?;
5094            }
5095
5096            writer.close()?;
5097        }
5098
5099        // Upload to cloud storage
5100        store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5101
5102        Ok(())
5103    }
5104
5105    pub(crate) fn rows_to_batch(
5106        &self,
5107        rows: &[HashMap<String, uni_common::Value>],
5108        schema: &arrow_schema::Schema,
5109    ) -> Result<RecordBatch> {
5110        let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5111
5112        for field in schema.fields() {
5113            let name = field.name();
5114            let dt = field.data_type();
5115
5116            let values: Vec<uni_common::Value> = rows
5117                .iter()
5118                .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5119                .collect();
5120            let array = self.values_to_array(&values, dt)?;
5121            columns.push(array);
5122        }
5123
5124        Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5125    }
5126
    /// Convert a slice of Values to an Arrow array.
    /// Delegates to the shared implementation in arrow_convert module.
    ///
    /// # Errors
    /// Propagates any conversion failure from `arrow_convert::values_to_array`
    /// (e.g. a value incompatible with the requested Arrow `DataType`).
    pub(crate) fn values_to_array(
        &self,
        values: &[uni_common::Value],
        dt: &arrow_schema::DataType,
    ) -> Result<Arc<dyn Array>> {
        arrow_convert::values_to_array(values, dt)
    }
5136
5137    pub(crate) fn format_csv_value(&self, val: Value) -> String {
5138        match val {
5139            Value::Null => "".to_string(),
5140            Value::String(s) => s,
5141            Value::Int(i) => i.to_string(),
5142            Value::Float(f) => f.to_string(),
5143            Value::Bool(b) => b.to_string(),
5144            _ => format!("{val}"),
5145        }
5146    }
5147
5148    pub(crate) fn parse_csv_value(
5149        &self,
5150        s: &str,
5151        data_type: &uni_common::core::schema::DataType,
5152        prop_name: &str,
5153    ) -> Result<Value> {
5154        if s.is_empty() || s.to_lowercase() == "null" {
5155            return Ok(Value::Null);
5156        }
5157
5158        use uni_common::core::schema::DataType;
5159        match data_type {
5160            DataType::String => Ok(Value::String(s.to_string())),
5161            DataType::Int32 | DataType::Int64 => {
5162                let i = s.parse::<i64>().map_err(|_| {
5163                    anyhow!(
5164                        "Failed to parse integer for property '{}': {}",
5165                        prop_name,
5166                        s
5167                    )
5168                })?;
5169                Ok(Value::Int(i))
5170            }
5171            DataType::Float32 | DataType::Float64 => {
5172                let f = s.parse::<f64>().map_err(|_| {
5173                    anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5174                })?;
5175                Ok(Value::Float(f))
5176            }
5177            DataType::Bool => {
5178                let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5179                    anyhow!(
5180                        "Failed to parse boolean for property '{}': {}",
5181                        prop_name,
5182                        s
5183                    )
5184                })?;
5185                Ok(Value::Bool(b))
5186            }
5187            DataType::CypherValue => {
5188                let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5189                    anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5190                })?;
5191                Ok(Value::from(json_val))
5192            }
5193            DataType::Vector { .. } => {
5194                let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5195                    anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5196                })?;
5197                Ok(Value::Vector(v))
5198            }
5199            _ => Ok(Value::String(s.to_string())),
5200        }
5201    }
5202
5203    pub(crate) async fn detach_delete_vertex(
5204        &self,
5205        vid: Vid,
5206        writer: &mut Writer,
5207        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5208    ) -> Result<()> {
5209        let schema = self.storage.schema_manager().schema();
5210        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5211
5212        // 1. Find and delete all outgoing edges
5213        let out_graph = self
5214            .storage
5215            .load_subgraph_cached(
5216                &[vid],
5217                &edge_type_ids,
5218                1,
5219                uni_store::runtime::Direction::Outgoing,
5220                Some(writer.l0_manager.get_current()),
5221            )
5222            .await?;
5223
5224        for edge in out_graph.edges() {
5225            writer
5226                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5227                .await?;
5228        }
5229
5230        // 2. Find and delete all incoming edges
5231        let in_graph = self
5232            .storage
5233            .load_subgraph_cached(
5234                &[vid],
5235                &edge_type_ids,
5236                1,
5237                uni_store::runtime::Direction::Incoming,
5238                Some(writer.l0_manager.get_current()),
5239            )
5240            .await?;
5241
5242        for edge in in_graph.edges() {
5243            writer
5244                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5245                .await?;
5246        }
5247
5248        Ok(())
5249    }
5250
5251    /// Batch detach-delete: load subgraphs for all VIDs at once, then delete edges and vertices.
5252    pub(crate) async fn batch_detach_delete_vertices(
5253        &self,
5254        vids: &[Vid],
5255        labels_per_vid: Vec<Option<Vec<String>>>,
5256        writer: &mut Writer,
5257        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5258    ) -> Result<()> {
5259        let schema = self.storage.schema_manager().schema();
5260        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5261
5262        // Load outgoing subgraph for all VIDs in one call.
5263        let out_graph = self
5264            .storage
5265            .load_subgraph_cached(
5266                vids,
5267                &edge_type_ids,
5268                1,
5269                uni_store::runtime::Direction::Outgoing,
5270                Some(writer.l0_manager.get_current()),
5271            )
5272            .await?;
5273
5274        for edge in out_graph.edges() {
5275            writer
5276                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5277                .await?;
5278        }
5279
5280        // Load incoming subgraph for all VIDs in one call.
5281        let in_graph = self
5282            .storage
5283            .load_subgraph_cached(
5284                vids,
5285                &edge_type_ids,
5286                1,
5287                uni_store::runtime::Direction::Incoming,
5288                Some(writer.l0_manager.get_current()),
5289            )
5290            .await?;
5291
5292        for edge in in_graph.edges() {
5293            writer
5294                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5295                .await?;
5296        }
5297
5298        // Delete all vertices.
5299        for (vid, labels) in vids.iter().zip(labels_per_vid) {
5300            writer.delete_vertex(*vid, labels, tx_l0).await?;
5301        }
5302
5303        Ok(())
5304    }
5305}