Skip to main content

uni_query/query/executor/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7    eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14/// Convert a `Value` to `chrono::DateTime<Utc>`, handling both `Value::Temporal` and `Value::String`.
15fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16    match val {
17        Value::Temporal(tv) => {
18            use uni_common::TemporalValue;
19            match tv {
20                TemporalValue::DateTime {
21                    nanos_since_epoch, ..
22                }
23                | TemporalValue::LocalDateTime {
24                    nanos_since_epoch, ..
25                } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26                TemporalValue::Date { days_since_epoch } => {
27                    chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28                }
29                _ => None,
30            }
31        }
32        Value::String(s) => parse_datetime_utc(s).ok(),
33        _ => None,
34    }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46    BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47    ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54use uni_store::storage::index_manager::IndexManager;
55
56// DataFusion engine imports
57use crate::query::df_graph::L0Context;
58use crate::query::df_planner::HybridPhysicalPlanner;
59use datafusion::physical_plan::ExecutionPlanProperties;
60use datafusion::prelude::SessionContext;
61use parking_lot::RwLock as SyncRwLock;
62
63use arrow_array::{Array, RecordBatch};
64use csv;
65use parquet;
66
67use super::core::*;
68
/// Number of system fields on an edge map: `_eid`, `_src`, `_dst`, `_type`, `_type_name`.
///
/// `hydrate_entity_if_needed` treats an edge map with at most this many entries
/// as carrying no user properties and loads them at execution time.
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
/// Number of system fields on a vertex map: `_vid`, `_label`, `_uid`.
///
/// `hydrate_entity_if_needed` treats a vertex map with at most this many entries
/// as carrying no user properties and loads them at execution time.
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
73
74/// Collect VIDs from all L0 buffers visible to a query context.
75///
76/// Applies `extractor` to each L0 buffer (main, transaction, pending flush) and
77/// collects the results. Returns an empty vec when no query context is present.
78fn collect_l0_vids(
79    ctx: Option<&QueryContext>,
80    extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
81) -> Vec<Vid> {
82    let mut vids = Vec::new();
83    if let Some(ctx) = ctx {
84        vids.extend(extractor(&ctx.l0.read()));
85        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
86            vids.extend(extractor(&tx_l0_arc.read()));
87        }
88        for pending_l0_arc in &ctx.pending_flush_l0s {
89            vids.extend(extractor(&pending_l0_arc.read()));
90        }
91    }
92    vids
93}
94
95/// Hydrate an entity map (vertex or edge) with properties if not already loaded.
96///
97/// This is the fallback for pushdown hydration - if the entity only has system fields
98/// (indicating pushdown didn't load properties), we load all properties here.
99///
100/// System field counts:
101/// - Edge: 5 fields (_eid, _src, _dst, _type, _type_name)
102/// - Vertex: 3 fields (_vid, _label, _uid)
103async fn hydrate_entity_if_needed(
104    map: &mut HashMap<String, Value>,
105    prop_manager: &PropertyManager,
106    ctx: Option<&QueryContext>,
107) {
108    // Check for edge entity
109    if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
110        if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
111            tracing::debug!(
112                "Pushdown fallback: hydrating edge {} at execution time",
113                eid_u64
114            );
115            if let Ok(Some(props)) = prop_manager
116                .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
117                .await
118            {
119                for (key, value) in props {
120                    map.entry(key).or_insert(value);
121                }
122            }
123        } else {
124            tracing::trace!(
125                "Pushdown success: edge {} already has {} properties",
126                eid_u64,
127                map.len() - EDGE_SYSTEM_FIELD_COUNT
128            );
129        }
130        return;
131    }
132
133    // Check for vertex entity
134    if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
135        if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
136            tracing::debug!(
137                "Pushdown fallback: hydrating vertex {} at execution time",
138                vid_u64
139            );
140            if let Ok(Some(props)) = prop_manager
141                .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
142                .await
143            {
144                for (key, value) in props {
145                    map.entry(key).or_insert(value);
146                }
147            }
148        } else {
149            tracing::trace!(
150                "Pushdown success: vertex {} already has {} properties",
151                vid_u64,
152                map.len() - VERTEX_SYSTEM_FIELD_COUNT
153            );
154        }
155    }
156}
157
158impl Executor {
159    /// Helper to verify and filter candidates against an optional predicate.
160    ///
161    /// Deduplicates candidates, loads properties, and evaluates the filter expression.
162    /// Returns only VIDs that pass the filter (or are not deleted).
163    async fn verify_and_filter_candidates(
164        &self,
165        mut candidates: Vec<Vid>,
166        variable: &str,
167        filter: Option<&Expr>,
168        ctx: Option<&QueryContext>,
169        prop_manager: &PropertyManager,
170        params: &HashMap<String, Value>,
171    ) -> Result<Vec<Vid>> {
172        candidates.sort_unstable();
173        candidates.dedup();
174
175        let mut verified_vids = Vec::new();
176        for vid in candidates {
177            let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
178                continue; // Deleted
179            };
180
181            if let Some(expr) = filter {
182                let mut props_map: HashMap<String, Value> = props;
183                props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
184
185                let mut row = HashMap::new();
186                row.insert(variable.to_string(), Value::Map(props_map));
187
188                let res = self
189                    .evaluate_expr(expr, &row, prop_manager, params, ctx)
190                    .await?;
191                if res.as_bool().unwrap_or(false) {
192                    verified_vids.push(vid);
193                }
194            } else {
195                verified_vids.push(vid);
196            }
197        }
198
199        Ok(verified_vids)
200    }
201
202    pub(crate) async fn scan_storage_candidates(
203        &self,
204        label_id: u16,
205        variable: &str,
206        filter: Option<&Expr>,
207    ) -> Result<Vec<Vid>> {
208        let schema = self.storage.schema_manager().schema();
209        let label_name = schema
210            .label_name_by_id(label_id)
211            .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
212
213        let ds = self.storage.vertex_dataset(label_name)?;
214        let lancedb_store = self.storage.lancedb_store();
215
216        // Try LanceDB first (canonical storage)
217        match ds.open_lancedb(lancedb_store).await {
218            Ok(table) => {
219                use arrow_array::UInt64Array;
220                use futures::TryStreamExt;
221                use lancedb::query::{ExecutableQuery, QueryBase, Select};
222
223                let mut query = table.query();
224
225                // Apply filter if provided, with schema awareness
226                // to skip overflow properties that aren't physical Lance columns.
227                // For labels with no registered properties (schemaless), use an empty
228                // map so all non-system properties are recognized as overflow.
229                let empty_props = std::collections::HashMap::new();
230                let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
231                if let Some(expr) = filter
232                    && let Some(sql) = LanceFilterGenerator::generate(
233                        std::slice::from_ref(expr),
234                        variable,
235                        Some(label_props),
236                    )
237                {
238                    query = query.only_if(format!("_deleted = false AND ({})", sql));
239                } else {
240                    query = query.only_if("_deleted = false");
241                }
242
243                // Project to only _vid
244                let query = query.select(Select::columns(&["_vid"]));
245                let stream = query.execute().await?;
246                let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
247
248                let mut vids = Vec::new();
249                for batch in batches {
250                    let vid_col = batch
251                        .column_by_name("_vid")
252                        .ok_or(anyhow!("Missing _vid"))?
253                        .as_any()
254                        .downcast_ref::<UInt64Array>()
255                        .ok_or(anyhow!("Invalid _vid"))?;
256                    for i in 0..batch.num_rows() {
257                        vids.push(Vid::from(vid_col.value(i)));
258                    }
259                }
260                Ok(vids)
261            }
262            Err(e) => {
263                // Only treat "not found" / "does not exist" errors as empty results.
264                // Propagate all other errors (network, auth, corruption, etc.)
265                let err_msg = e.to_string().to_lowercase();
266                if err_msg.contains("not found")
267                    || err_msg.contains("does not exist")
268                    || err_msg.contains("no such file")
269                    || err_msg.contains("object not found")
270                {
271                    Ok(Vec::new())
272                } else {
273                    Err(e)
274                }
275            }
276        }
277    }
278
279    pub(crate) async fn scan_label_with_filter(
280        &self,
281        label_id: u16,
282        variable: &str,
283        filter: Option<&Expr>,
284        ctx: Option<&QueryContext>,
285        prop_manager: &PropertyManager,
286        params: &HashMap<String, Value>,
287    ) -> Result<Vec<Vid>> {
288        let mut candidates = self
289            .scan_storage_candidates(label_id, variable, filter)
290            .await?;
291
292        // Convert label_id to label_name for L0 lookup
293        let schema = self.storage.schema_manager().schema();
294        if let Some(label_name) = schema.label_name_by_id(label_id) {
295            candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
296        }
297
298        self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
299            .await
300    }
301
302    pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
303        // Handle Value::Node directly (has vid field)
304        if let Value::Node(node) = val {
305            return Ok(node.vid);
306        }
307        // Handle Object (node) containing _vid field
308        if let Value::Map(map) = val
309            && let Some(vid_val) = map.get("_vid")
310            && let Some(v) = vid_val.as_u64()
311        {
312            return Ok(Vid::from(v));
313        }
314        // Handle string format
315        if let Some(s) = val.as_str()
316            && let Ok(id) = s.parse::<u64>()
317        {
318            return Ok(Vid::new(id));
319        }
320        // Handle raw u64
321        if let Some(v) = val.as_u64() {
322            return Ok(Vid::from(v));
323        }
324        Err(anyhow!("Invalid Vid format: {:?}", val))
325    }
326
327    /// Find a node value in the row by VID.
328    ///
329    /// Scans all values in the row, looking for a node (Map or Node) whose VID
330    /// matches the target. Returns the full node value if found, or a minimal
331    /// Map with just `_vid` as fallback.
332    fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
333        for val in row.values() {
334            if let Ok(vid) = Self::vid_from_value(val)
335                && vid == target_vid
336            {
337                return val.clone();
338            }
339        }
340        // Fallback: return minimal node map
341        Value::Map(HashMap::from([(
342            "_vid".to_string(),
343            Value::Int(target_vid.as_u64() as i64),
344        )]))
345    }
346
347    /// Create L0 context, session, and planner for DataFusion execution.
348    ///
349    /// This is the shared setup for `execute_datafusion` and `execute_merge_read_plan`.
350    /// Returns `(session_ctx, planner, prop_manager_arc)`.
351    pub async fn create_datafusion_planner(
352        &self,
353        prop_manager: &PropertyManager,
354        params: &HashMap<String, Value>,
355    ) -> Result<(
356        Arc<SyncRwLock<SessionContext>>,
357        HybridPhysicalPlanner,
358        Arc<PropertyManager>,
359    )> {
360        let query_ctx = self.get_context().await;
361        let l0_context = match query_ctx {
362            Some(ref ctx) => L0Context::from_query_context(ctx),
363            None => L0Context::empty(),
364        };
365
366        let prop_manager_arc = Arc::new(PropertyManager::new(
367            self.storage.clone(),
368            self.storage.schema_manager_arc(),
369            prop_manager.cache_size(),
370        ));
371
372        let session = SessionContext::new();
373        crate::query::df_udfs::register_cypher_udfs(&session)?;
374        let session_ctx = Arc::new(SyncRwLock::new(session));
375
376        let mut planner = HybridPhysicalPlanner::with_l0_context(
377            session_ctx.clone(),
378            self.storage.clone(),
379            l0_context,
380            prop_manager_arc.clone(),
381            self.storage.schema_manager().schema(),
382            params.clone(),
383            HashMap::new(),
384        );
385
386        planner = planner.with_algo_registry(self.algo_registry.clone());
387        if let Some(ref registry) = self.procedure_registry {
388            planner = planner.with_procedure_registry(registry.clone());
389        }
390        if let Some(ref xervo_runtime) = self.xervo_runtime {
391            planner = planner.with_xervo_runtime(xervo_runtime.clone());
392        }
393
394        Ok((session_ctx, planner, prop_manager_arc))
395    }
396
397    /// Execute a DataFusion physical plan and collect all result batches.
398    pub fn collect_batches(
399        session_ctx: &Arc<SyncRwLock<SessionContext>>,
400        execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
401    ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
402        Box::pin(async move {
403            use futures::TryStreamExt;
404
405            let task_ctx = session_ctx.read().task_ctx();
406            let partition_count = execution_plan.output_partitioning().partition_count();
407            let mut all_batches = Vec::new();
408            for partition in 0..partition_count {
409                let stream = execution_plan.execute(partition, task_ctx.clone())?;
410                let batches: Vec<RecordBatch> = stream.try_collect().await?;
411                all_batches.extend(batches);
412            }
413            Ok(all_batches)
414        })
415    }
416
417    /// Executes a query using the DataFusion-based engine.
418    ///
419    /// Uses `HybridPhysicalPlanner` which produces DataFusion `ExecutionPlan`
420    /// trees with custom graph operators for graph-specific operations.
421    pub async fn execute_datafusion(
422        &self,
423        plan: LogicalPlan,
424        prop_manager: &PropertyManager,
425        params: &HashMap<String, Value>,
426    ) -> Result<Vec<RecordBatch>> {
427        let (session_ctx, mut planner, prop_manager_arc) =
428            self.create_datafusion_planner(prop_manager, params).await?;
429
430        // Build MutationContext when the plan contains write operations
431        if Self::contains_write_operations(&plan) {
432            let writer = self
433                .writer
434                .as_ref()
435                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
436                .clone();
437            let query_ctx = self.get_context().await;
438
439            debug_assert!(
440                query_ctx.is_some(),
441                "BUG: query_ctx is None for write operation"
442            );
443
444            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
445                executor: self.clone(),
446                writer,
447                prop_manager: prop_manager_arc,
448                params: params.clone(),
449                query_ctx,
450            });
451            planner = planner.with_mutation_context(mutation_ctx);
452            tracing::debug!(
453                plan_type = Self::get_plan_type(&plan),
454                "Mutation routed to DataFusion engine"
455            );
456        }
457
458        let execution_plan = planner.plan(&plan)?;
459        Self::collect_batches(&session_ctx, execution_plan).await
460    }
461
462    /// Execute a MERGE read sub-plan through the DataFusion engine.
463    ///
464    /// Plans and executes the MERGE pattern match using flat columnar output
465    /// (no structural projections), then groups dotted columns (`a._vid`,
466    /// `a._labels`, etc.) into per-variable Maps for downstream MERGE logic.
467    pub(crate) async fn execute_merge_read_plan(
468        &self,
469        plan: LogicalPlan,
470        prop_manager: &PropertyManager,
471        params: &HashMap<String, Value>,
472        merge_variables: Vec<String>,
473    ) -> Result<Vec<HashMap<String, Value>>> {
474        let (session_ctx, planner, _prop_manager_arc) =
475            self.create_datafusion_planner(prop_manager, params).await?;
476
477        // Plan with full property access ("*") for all merge variables so that
478        // ON MATCH SET / ON CREATE SET have complete entity Maps to work with.
479        let extra: HashMap<String, HashSet<String>> = merge_variables
480            .iter()
481            .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
482            .collect();
483        let execution_plan = planner.plan_with_properties(&plan, extra)?;
484        let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
485
486        // Convert to flat rows (dotted column names like "a._vid", "b._labels")
487        let flat_rows = self.record_batches_to_rows(all_batches)?;
488
489        // Group dotted columns into per-variable Maps for MERGE's match logic.
490        // E.g., {"a._vid": 0, "a._labels": ["A"]} → {"a": Map({"_vid": 0, "_labels": ["A"]})}
491        let rows = flat_rows
492            .into_iter()
493            .map(|mut row| {
494                for var in &merge_variables {
495                    // Skip if already materialized (e.g., by record_batches_to_rows)
496                    if row.contains_key(var) {
497                        continue;
498                    }
499                    let prefix = format!("{}.", var);
500                    let dotted_keys: Vec<String> = row
501                        .keys()
502                        .filter(|k| k.starts_with(&prefix))
503                        .cloned()
504                        .collect();
505                    if !dotted_keys.is_empty() {
506                        let mut map = HashMap::new();
507                        for key in dotted_keys {
508                            let prop_name = key[prefix.len()..].to_string();
509                            if let Some(val) = row.remove(&key) {
510                                map.insert(prop_name, val);
511                            }
512                        }
513                        row.insert(var.clone(), Value::Map(map));
514                    }
515                }
516                row
517            })
518            .collect();
519
520        Ok(rows)
521    }
522
523    /// Converts DataFusion RecordBatches to row-based HashMap format.
524    ///
525    /// Handles special metadata on fields:
526    /// - `cv_encoded=true`: Parse the string value as JSON to restore original type
527    ///
528    /// Also normalizes path structures to user-facing format (converts _vid to _id).
529    fn record_batches_to_rows(
530        &self,
531        batches: Vec<RecordBatch>,
532    ) -> Result<Vec<HashMap<String, Value>>> {
533        let mut rows = Vec::new();
534
535        for batch in batches {
536            let num_rows = batch.num_rows();
537            let schema = batch.schema();
538
539            for row_idx in 0..num_rows {
540                let mut row = HashMap::new();
541
542                for (col_idx, field) in schema.fields().iter().enumerate() {
543                    let column = batch.column(col_idx);
544                    // Infer Uni DataType from Arrow type for DateTime/Time struct decoding
545                    let data_type =
546                        if uni_common::core::schema::is_datetime_struct(field.data_type()) {
547                            Some(&uni_common::DataType::DateTime)
548                        } else if uni_common::core::schema::is_time_struct(field.data_type()) {
549                            Some(&uni_common::DataType::Time)
550                        } else {
551                            None
552                        };
553                    let mut value =
554                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);
555
556                    // Check if this field contains JSON-encoded values (e.g., from UNWIND)
557                    // Parse JSON string to restore the original type
558                    if field
559                        .metadata()
560                        .get("cv_encoded")
561                        .is_some_and(|v| v == "true")
562                        && let Value::String(s) = &value
563                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
564                    {
565                        value = Value::from(parsed);
566                    }
567
568                    // Normalize path structures to user-facing format
569                    value = Self::normalize_path_if_needed(value);
570
571                    row.insert(field.name().clone(), value);
572                }
573
574                // Merge system fields into bare variable maps.
575                // The projection step emits helper columns like "n._vid" and "n._labels"
576                // alongside the materialized "n" column (a Map of user properties).
577                // Here we merge those system fields into the map and remove the helpers.
578                //
579                // For search procedures (vector/FTS), the bare variable may be a VID
580                // string placeholder rather than a Map. In that case, promote it to a
581                // Map so we can merge system fields and properties into it.
582                let bare_vars: Vec<String> = row
583                    .keys()
584                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
585                    .cloned()
586                    .collect();
587
588                // Detect VID-placeholder variables that have system columns
589                // (e.g., "node" is String("1") but "node._vid" exists).
590                // Promote these to Maps so system fields can be merged in.
591                let vid_placeholder_vars: Vec<String> = row
592                    .keys()
593                    .filter(|k| {
594                        !k.contains('.')
595                            && matches!(row.get(*k), Some(Value::String(_)))
596                            && row.contains_key(&format!("{}._vid", k))
597                    })
598                    .cloned()
599                    .collect();
600
601                for var in &vid_placeholder_vars {
602                    // Build a Map from system and property columns
603                    let prefix = format!("{}.", var);
604                    let mut map = HashMap::new();
605
606                    let dotted_keys: Vec<String> = row
607                        .keys()
608                        .filter(|k| k.starts_with(&prefix))
609                        .cloned()
610                        .collect();
611
612                    for key in &dotted_keys {
613                        let prop_name = &key[prefix.len()..];
614                        if let Some(val) = row.remove(key) {
615                            map.insert(prop_name.to_string(), val);
616                        }
617                    }
618
619                    // Replace the VID-string placeholder with the constructed Map
620                    row.insert(var.clone(), Value::Map(map));
621                }
622
623                for var in &bare_vars {
624                    // Merge node system fields (_vid, _labels)
625                    let vid_key = format!("{}._vid", var);
626                    let labels_key = format!("{}._labels", var);
627
628                    let vid_val = row.remove(&vid_key);
629                    let labels_val = row.remove(&labels_key);
630
631                    if let Some(Value::Map(map)) = row.get_mut(var) {
632                        if let Some(v) = vid_val {
633                            map.insert("_vid".to_string(), v);
634                        }
635                        if let Some(v) = labels_val {
636                            map.insert("_labels".to_string(), v);
637                        }
638                    }
639
640                    // Merge edge system fields (_eid, _type, _src_vid, _dst_vid).
641                    // These are emitted as helper columns by the traverse exec.
642                    // The structural projection already includes them in the struct,
643                    // but we still need to remove the dotted helper columns.
644                    let eid_key = format!("{}._eid", var);
645                    let type_key = format!("{}._type", var);
646
647                    let eid_val = row.remove(&eid_key);
648                    let type_val = row.remove(&type_key);
649
650                    if (eid_val.is_some() || type_val.is_some())
651                        && let Some(Value::Map(map)) = row.get_mut(var)
652                    {
653                        if let Some(v) = eid_val {
654                            map.entry("_eid".to_string()).or_insert(v);
655                        }
656                        if let Some(v) = type_val {
657                            map.entry("_type".to_string()).or_insert(v);
658                        }
659                    }
660
661                    // Remove remaining dotted helper columns (e.g. _all_props, _src_vid, _dst_vid)
662                    let prefix = format!("{}.", var);
663                    let helper_keys: Vec<String> = row
664                        .keys()
665                        .filter(|k| k.starts_with(&prefix))
666                        .cloned()
667                        .collect();
668                    for key in helper_keys {
669                        row.remove(&key);
670                    }
671                }
672
673                rows.push(row);
674            }
675        }
676
677        Ok(rows)
678    }
679
680    /// Normalize a value if it's a path structure, converting internal format to user-facing format.
681    ///
682    /// This only normalizes path structures (objects with "nodes" and "relationships" arrays).
683    /// Other values are returned unchanged to avoid interfering with query execution.
684    fn normalize_path_if_needed(value: Value) -> Value {
685        match value {
686            Value::Map(map)
687                if map.contains_key("nodes")
688                    && (map.contains_key("relationships") || map.contains_key("edges")) =>
689            {
690                Self::normalize_path_map(map)
691            }
692            other => other,
693        }
694    }
695
696    /// Normalize a path map object.
697    fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
698        // Normalize nodes array
699        if let Some(Value::List(nodes)) = map.remove("nodes") {
700            let normalized_nodes: Vec<Value> = nodes
701                .into_iter()
702                .map(|n| {
703                    if let Value::Map(node_map) = n {
704                        Self::normalize_path_node_map(node_map)
705                    } else {
706                        n
707                    }
708                })
709                .collect();
710            map.insert("nodes".to_string(), Value::List(normalized_nodes));
711        }
712
713        // Normalize relationships array (may be called "relationships" or "edges")
714        let rels_key = if map.contains_key("relationships") {
715            "relationships"
716        } else {
717            "edges"
718        };
719        if let Some(Value::List(rels)) = map.remove(rels_key) {
720            let normalized_rels: Vec<Value> = rels
721                .into_iter()
722                .map(|r| {
723                    if let Value::Map(rel_map) = r {
724                        Self::normalize_path_edge_map(rel_map)
725                    } else {
726                        r
727                    }
728                })
729                .collect();
730            map.insert("relationships".to_string(), Value::List(normalized_rels));
731        }
732
733        Value::Map(map)
734    }
735
736    /// Convert a Value to its string representation for path normalization.
737    fn value_to_id_string(val: Value) -> String {
738        match val {
739            Value::Int(n) => n.to_string(),
740            Value::Float(n) => n.to_string(),
741            Value::String(s) => s,
742            other => other.to_string(),
743        }
744    }
745
746    /// Move a map entry from `src_key` to `dst_key`, converting the value to a string.
747    /// When `src_key == dst_key`, this simply stringifies the value in place.
748    fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
749        if let Some(val) = map.remove(src_key) {
750            map.insert(
751                dst_key.to_string(),
752                Value::String(Self::value_to_id_string(val)),
753            );
754        }
755    }
756
757    /// Ensure the "properties" field is a non-null map.
758    fn ensure_properties_map(map: &mut HashMap<String, Value>) {
759        match map.get("properties") {
760            Some(props) if !props.is_null() => {}
761            _ => {
762                map.insert("properties".to_string(), Value::Map(HashMap::new()));
763            }
764        }
765    }
766
767    /// Normalize a node within a path to user-facing format.
768    fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
769        Self::stringify_map_field(&mut map, "_vid", "_id");
770        Self::ensure_properties_map(&mut map);
771        Value::Map(map)
772    }
773
774    /// Normalize an edge within a path to user-facing format.
775    fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
776        Self::stringify_map_field(&mut map, "_eid", "_id");
777        Self::stringify_map_field(&mut map, "_src", "_src");
778        Self::stringify_map_field(&mut map, "_dst", "_dst");
779
780        if let Some(type_name) = map.remove("_type_name") {
781            map.insert("_type".to_string(), type_name);
782        }
783
784        Self::ensure_properties_map(&mut map);
785        Value::Map(map)
786    }
787
788    #[instrument(
789        skip(self, prop_manager, params),
790        fields(rows_returned, duration_ms),
791        level = "info"
792    )]
793    pub fn execute<'a>(
794        &'a self,
795        plan: LogicalPlan,
796        prop_manager: &'a PropertyManager,
797        params: &'a HashMap<String, Value>,
798    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
799        Box::pin(async move {
800            let query_type = Self::get_plan_type(&plan);
801            let ctx = self.get_context().await;
802            let start = Instant::now();
803
804            // Route DDL/Admin queries to the fallback executor.
805            // All other queries (reads, mutations, window functions) flow through DataFusion.
806            let res = if Self::is_ddl_or_admin(&plan) {
807                self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
808                    .await
809            } else {
810                let batches = self
811                    .execute_datafusion(plan.clone(), prop_manager, params)
812                    .await?;
813                self.record_batches_to_rows(batches)
814            };
815
816            let duration = start.elapsed();
817            metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
818                .record(duration.as_secs_f64());
819
820            tracing::Span::current().record("duration_ms", duration.as_millis());
821            match &res {
822                Ok(rows) => {
823                    tracing::Span::current().record("rows_returned", rows.len());
824                    metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
825                        .increment(rows.len() as u64);
826                }
827                Err(e) => {
828                    let error_type = if e.to_string().contains("timed out") {
829                        "timeout"
830                    } else if e.to_string().contains("syntax") {
831                        "syntax"
832                    } else {
833                        "execution"
834                    };
835                    metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
836                }
837            }
838
839            res
840        })
841    }
842
843    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
844        match plan {
845            LogicalPlan::Scan { .. } => "read_scan",
846            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
847            LogicalPlan::Traverse { .. } => "read_traverse",
848            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
849            LogicalPlan::ScanAll { .. } => "read_scan_all",
850            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
851            LogicalPlan::VectorKnn { .. } => "read_vector",
852            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
853            LogicalPlan::Merge { .. } => "write_merge",
854            LogicalPlan::Delete { .. } => "write_delete",
855            LogicalPlan::Set { .. } => "write_set",
856            LogicalPlan::Remove { .. } => "write_remove",
857            LogicalPlan::ProcedureCall { .. } => "call",
858            LogicalPlan::Copy { .. } => "copy",
859            LogicalPlan::Backup { .. } => "backup",
860            _ => "other",
861        }
862    }
863
864    /// Return all direct child plan references from a `LogicalPlan`.
865    ///
866    /// This centralizes the variant→children mapping so that recursive walkers
867    /// (e.g., `contains_write_operations`) can delegate the
868    /// "recurse into children" logic instead of duplicating the match arms.
869    ///
870    /// Note: `Foreach` returns only its `input`; the `body: Vec<LogicalPlan>`
871    /// is not included because it requires special iteration. Callers that
872    /// need to inspect the body should handle `Foreach` before falling through.
873    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
874        match plan {
875            // Single-input wrappers
876            LogicalPlan::Project { input, .. }
877            | LogicalPlan::Sort { input, .. }
878            | LogicalPlan::Limit { input, .. }
879            | LogicalPlan::Distinct { input }
880            | LogicalPlan::Aggregate { input, .. }
881            | LogicalPlan::Window { input, .. }
882            | LogicalPlan::Unwind { input, .. }
883            | LogicalPlan::Filter { input, .. }
884            | LogicalPlan::Create { input, .. }
885            | LogicalPlan::CreateBatch { input, .. }
886            | LogicalPlan::Set { input, .. }
887            | LogicalPlan::Remove { input, .. }
888            | LogicalPlan::Delete { input, .. }
889            | LogicalPlan::Merge { input, .. }
890            | LogicalPlan::Foreach { input, .. }
891            | LogicalPlan::Traverse { input, .. }
892            | LogicalPlan::TraverseMainByType { input, .. }
893            | LogicalPlan::BindZeroLengthPath { input, .. }
894            | LogicalPlan::BindPath { input, .. }
895            | LogicalPlan::ShortestPath { input, .. }
896            | LogicalPlan::AllShortestPaths { input, .. }
897            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],
898
899            // Two-input wrappers
900            LogicalPlan::Apply {
901                input, subquery, ..
902            }
903            | LogicalPlan::SubqueryCall { input, subquery } => {
904                vec![input.as_ref(), subquery.as_ref()]
905            }
906            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
907                vec![left.as_ref(), right.as_ref()]
908            }
909            LogicalPlan::RecursiveCTE {
910                initial, recursive, ..
911            } => vec![initial.as_ref(), recursive.as_ref()],
912            LogicalPlan::QuantifiedPattern {
913                input,
914                pattern_plan,
915                ..
916            } => vec![input.as_ref(), pattern_plan.as_ref()],
917
918            // Leaf nodes (scans, DDL, admin, etc.)
919            _ => vec![],
920        }
921    }
922
923    /// Check if a plan is a DDL or admin operation that should skip DataFusion.
924    ///
925    /// These operations don't produce data streams and aren't supported by the
926    /// DataFusion planner. Recurses through wrapper nodes (`Project`, `Sort`,
927    /// `Limit`, etc.) to detect DDL/admin operations nested inside read
928    /// wrappers (e.g. `CALL procedure(...) YIELD x RETURN x`).
929    fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
930        match plan {
931            // DDL / schema operations
932            LogicalPlan::CreateLabel(_)
933            | LogicalPlan::CreateEdgeType(_)
934            | LogicalPlan::AlterLabel(_)
935            | LogicalPlan::AlterEdgeType(_)
936            | LogicalPlan::DropLabel(_)
937            | LogicalPlan::DropEdgeType(_)
938            | LogicalPlan::CreateConstraint(_)
939            | LogicalPlan::DropConstraint(_)
940            | LogicalPlan::ShowConstraints(_) => true,
941
942            // Index operations
943            LogicalPlan::CreateVectorIndex { .. }
944            | LogicalPlan::CreateFullTextIndex { .. }
945            | LogicalPlan::CreateScalarIndex { .. }
946            | LogicalPlan::CreateJsonFtsIndex { .. }
947            | LogicalPlan::DropIndex { .. }
948            | LogicalPlan::ShowIndexes { .. } => true,
949
950            // Admin / utility operations
951            LogicalPlan::ShowDatabase
952            | LogicalPlan::ShowConfig
953            | LogicalPlan::ShowStatistics
954            | LogicalPlan::Vacuum
955            | LogicalPlan::Checkpoint
956            | LogicalPlan::Begin
957            | LogicalPlan::Commit
958            | LogicalPlan::Rollback
959            | LogicalPlan::Copy { .. }
960            | LogicalPlan::CopyTo { .. }
961            | LogicalPlan::CopyFrom { .. }
962            | LogicalPlan::Backup { .. }
963            | LogicalPlan::Explain { .. } => true,
964
965            // Procedure calls: DF-eligible procedures go through DataFusion,
966            // everything else (DDL, admin, unknown) stays on fallback.
967            LogicalPlan::ProcedureCall { procedure_name, .. } => {
968                !Self::is_df_eligible_procedure(procedure_name)
969            }
970
971            // Recurse through single-input read wrapper nodes
972            LogicalPlan::Project { input, .. }
973            | LogicalPlan::Sort { input, .. }
974            | LogicalPlan::Limit { input, .. }
975            | LogicalPlan::Distinct { input }
976            | LogicalPlan::Aggregate { input, .. }
977            | LogicalPlan::Window { input, .. }
978            | LogicalPlan::Unwind { input, .. }
979            | LogicalPlan::Filter { input, .. }
980            | LogicalPlan::BindZeroLengthPath { input, .. }
981            | LogicalPlan::BindPath { input, .. }
982            | LogicalPlan::ShortestPath { input, .. }
983            | LogicalPlan::AllShortestPaths { input, .. } => Self::is_ddl_or_admin(input),
984
985            // Recurse through two-input wrapper nodes
986            LogicalPlan::Apply {
987                input, subquery, ..
988            }
989            | LogicalPlan::SubqueryCall { input, subquery } => {
990                Self::is_ddl_or_admin(input) || Self::is_ddl_or_admin(subquery)
991            }
992            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
993                Self::is_ddl_or_admin(left) || Self::is_ddl_or_admin(right)
994            }
995            LogicalPlan::RecursiveCTE {
996                initial, recursive, ..
997            } => Self::is_ddl_or_admin(initial) || Self::is_ddl_or_admin(recursive),
998            LogicalPlan::QuantifiedPattern {
999                input,
1000                pattern_plan,
1001                ..
1002            } => Self::is_ddl_or_admin(input) || Self::is_ddl_or_admin(pattern_plan),
1003
1004            _ => false,
1005        }
1006    }
1007
1008    /// Returns `true` if the procedure is a read-only, data-producing procedure
1009    /// that can be executed through the DataFusion engine.
1010    ///
1011    /// This is a **positive allowlist** — unknown procedures default to the
1012    /// fallback executor (safe for TCK test procedures, future DDL, and admin).
1013    fn is_df_eligible_procedure(name: &str) -> bool {
1014        matches!(
1015            name,
1016            "uni.schema.labels"
1017                | "uni.schema.edgeTypes"
1018                | "uni.schema.relationshipTypes"
1019                | "uni.schema.indexes"
1020                | "uni.schema.constraints"
1021                | "uni.schema.labelInfo"
1022                | "uni.vector.query"
1023                | "uni.fts.query"
1024                | "uni.search"
1025        ) || name.starts_with("uni.algo.")
1026    }
1027
1028    /// Check if a plan contains write/mutation operations anywhere in the tree.
1029    ///
1030    /// Write operations (`CREATE`, `MERGE`, `DELETE`, `SET`, `REMOVE`, `FOREACH`)
1031    /// are used to determine when a MutationContext needs to be built for DataFusion.
1032    /// This recurses through read-only wrapper nodes to detect writes nested inside
1033    /// projections (e.g. `CREATE (n:Person) RETURN n` produces `Project { Create { ... } }`).
1034    fn contains_write_operations(plan: &LogicalPlan) -> bool {
1035        match plan {
1036            LogicalPlan::Create { .. }
1037            | LogicalPlan::CreateBatch { .. }
1038            | LogicalPlan::Merge { .. }
1039            | LogicalPlan::Delete { .. }
1040            | LogicalPlan::Set { .. }
1041            | LogicalPlan::Remove { .. }
1042            | LogicalPlan::Foreach { .. } => true,
1043            _ => Self::plan_children(plan)
1044                .iter()
1045                .any(|child| Self::contains_write_operations(child)),
1046        }
1047    }
1048
1049    /// Executes a query as a stream of result batches.
1050    ///
1051    /// Routes DDL/Admin through the fallback executor and everything else
1052    /// through DataFusion.
1053    pub fn execute_stream(
1054        self,
1055        plan: LogicalPlan,
1056        prop_manager: Arc<PropertyManager>,
1057        params: HashMap<String, Value>,
1058    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1059        let this = self;
1060        let this_for_ctx = this.clone();
1061
1062        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1063
1064        ctx_stream
1065            .flat_map(move |ctx| {
1066                let plan = plan.clone();
1067                let this = this.clone();
1068                let prop_manager = prop_manager.clone();
1069                let params = params.clone();
1070
1071                let fut = async move {
1072                    if Self::is_ddl_or_admin(&plan) {
1073                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
1074                            .await
1075                    } else {
1076                        let batches = this
1077                            .execute_datafusion(plan, &prop_manager, &params)
1078                            .await?;
1079                        this.record_batches_to_rows(batches)
1080                    }
1081                };
1082                stream::once(fut).boxed()
1083            })
1084            .boxed()
1085    }
1086
    /// Converts the element at index `row` of the Arrow array `col` to a `Value`.
    ///
    /// Thin wrapper that delegates to the shared implementation in the
    /// `arrow_convert` module.
    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
        // The third argument is always `None` from this entry point.
        // NOTE(review): presumably an optional type/schema hint — confirm against `arrow_convert`.
        arrow_convert::arrow_to_value(col, row, None)
    }
1092
1093    pub(crate) fn evaluate_expr<'a>(
1094        &'a self,
1095        expr: &'a Expr,
1096        row: &'a HashMap<String, Value>,
1097        prop_manager: &'a PropertyManager,
1098        params: &'a HashMap<String, Value>,
1099        ctx: Option<&'a QueryContext>,
1100    ) -> BoxFuture<'a, Result<Value>> {
1101        let this = self;
1102        Box::pin(async move {
1103            // First check if the expression itself is already pre-computed in the row
1104            let repr = expr.to_string_repr();
1105            if let Some(val) = row.get(&repr) {
1106                return Ok(val.clone());
1107            }
1108
1109            match expr {
1110                Expr::PatternComprehension { .. } => {
1111                    // Handled by DataFusion path via PatternComprehensionExecExpr
1112                    Err(anyhow::anyhow!(
1113                        "Pattern comprehensions are handled by DataFusion executor"
1114                    ))
1115                }
1116                Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1117                    "COLLECT subqueries not yet supported in executor"
1118                )),
1119                Expr::Variable(name) => {
1120                    if let Some(val) = row.get(name) {
1121                        Ok(val.clone())
1122                    } else {
1123                        Ok(params.get(name).cloned().unwrap_or(Value::Null))
1124                    }
1125                }
1126                Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1127                Expr::Property(var_expr, prop_name) => {
1128                    let base_val = this
1129                        .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1130                        .await?;
1131
1132                    // Handle system properties _vid and _id directly
1133                    if (prop_name == "_vid" || prop_name == "_id")
1134                        && let Ok(vid) = Self::vid_from_value(&base_val)
1135                    {
1136                        return Ok(Value::Int(vid.as_u64() as i64));
1137                    }
1138
1139                    // Handle Value::Node - access properties directly or via prop manager
1140                    if let Value::Node(node) = &base_val {
1141                        // Handle system properties
1142                        if prop_name == "_vid" || prop_name == "_id" {
1143                            return Ok(Value::Int(node.vid.as_u64() as i64));
1144                        }
1145                        if prop_name == "_labels" {
1146                            return Ok(Value::List(
1147                                node.labels
1148                                    .iter()
1149                                    .map(|l| Value::String(l.clone()))
1150                                    .collect(),
1151                            ));
1152                        }
1153                        // Check in-memory properties first
1154                        if let Some(val) = node.properties.get(prop_name.as_str()) {
1155                            return Ok(val.clone());
1156                        }
1157                        // Fallback to storage lookup
1158                        if let Ok(val) = prop_manager
1159                            .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1160                            .await
1161                        {
1162                            return Ok(val);
1163                        }
1164                        return Ok(Value::Null);
1165                    }
1166
1167                    // Handle Value::Edge - access properties directly or via prop manager
1168                    if let Value::Edge(edge) = &base_val {
1169                        // Handle system properties
1170                        if prop_name == "_eid" || prop_name == "_id" {
1171                            return Ok(Value::Int(edge.eid.as_u64() as i64));
1172                        }
1173                        if prop_name == "_type" {
1174                            return Ok(Value::String(edge.edge_type.clone()));
1175                        }
1176                        if prop_name == "_src" {
1177                            return Ok(Value::Int(edge.src.as_u64() as i64));
1178                        }
1179                        if prop_name == "_dst" {
1180                            return Ok(Value::Int(edge.dst.as_u64() as i64));
1181                        }
1182                        // Check in-memory properties first
1183                        if let Some(val) = edge.properties.get(prop_name.as_str()) {
1184                            return Ok(val.clone());
1185                        }
1186                        // Fallback to storage lookup
1187                        if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1188                        {
1189                            return Ok(val);
1190                        }
1191                        return Ok(Value::Null);
1192                    }
1193
1194                    // If base_val is an object (node/edge), check its properties first
1195                    // This handles properties from CREATE/MERGE that may not be persisted yet
1196                    if let Value::Map(map) = &base_val {
1197                        // First check top-level (for system properties like _id, _label, etc.)
1198                        if let Some(val) = map.get(prop_name.as_str()) {
1199                            return Ok(val.clone());
1200                        }
1201                        // Then check inside "properties" object (for user properties)
1202                        if let Some(Value::Map(props)) = map.get("properties")
1203                            && let Some(val) = props.get(prop_name.as_str())
1204                        {
1205                            return Ok(val.clone());
1206                        }
1207                        // Fallback to storage lookup using _vid or _id
1208                        let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1209                            map.get("_id")
1210                                .and_then(|v| v.as_str())
1211                                .and_then(|s| s.parse::<u64>().ok())
1212                        });
1213                        if let Some(id) = vid_opt {
1214                            let vid = Vid::from(id);
1215                            if let Ok(val) = prop_manager
1216                                .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1217                                .await
1218                            {
1219                                return Ok(val);
1220                            }
1221                        } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1222                            let eid = uni_common::core::id::Eid::from(id);
1223                            if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1224                                return Ok(val);
1225                            }
1226                        }
1227                        return Ok(Value::Null);
1228                    }
1229
1230                    // If base_val is just a VID, fetch from property manager
1231                    if let Ok(vid) = Self::vid_from_value(&base_val) {
1232                        return prop_manager
1233                            .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1234                            .await;
1235                    }
1236
1237                    if base_val.is_null() {
1238                        return Ok(Value::Null);
1239                    }
1240
1241                    // Check if base_val is a temporal value and prop_name is a temporal accessor
1242                    {
1243                        use crate::query::datetime::{
1244                            eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1245                            is_duration_string, is_temporal_accessor, is_temporal_string,
1246                        };
1247
1248                        // Handle Value::Temporal directly (no string parsing needed)
1249                        if let Value::Temporal(tv) = &base_val {
1250                            if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1251                                if is_duration_accessor(prop_name) {
1252                                    // Convert to string for the existing accessor logic
1253                                    return eval_duration_accessor(
1254                                        &base_val.to_string(),
1255                                        prop_name,
1256                                    );
1257                                }
1258                            } else if is_temporal_accessor(prop_name) {
1259                                return eval_temporal_accessor(&base_val.to_string(), prop_name);
1260                            }
1261                        }
1262
1263                        // Handle Value::String temporal (backward compat)
1264                        if let Value::String(s) = &base_val {
1265                            if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1266                                return eval_temporal_accessor(s, prop_name);
1267                            }
1268                            if is_duration_string(s) && is_duration_accessor(prop_name) {
1269                                return eval_duration_accessor(s, prop_name);
1270                            }
1271                        }
1272                    }
1273
1274                    Err(anyhow!(
1275                        "Cannot access property '{}' on {:?}",
1276                        prop_name,
1277                        base_val
1278                    ))
1279                }
1280                Expr::ArrayIndex {
1281                    array: arr_expr,
1282                    index: idx_expr,
1283                } => {
1284                    let arr_val = this
1285                        .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1286                        .await?;
1287                    let idx_val = this
1288                        .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1289                        .await?;
1290
1291                    if let Value::List(arr) = &arr_val {
1292                        // Handle signed indices (allow negative)
1293                        if let Some(i) = idx_val.as_i64() {
1294                            let idx = if i < 0 {
1295                                // Negative index: -1 = last element, -2 = second to last, etc.
1296                                let positive_idx = arr.len() as i64 + i;
1297                                if positive_idx < 0 {
1298                                    return Ok(Value::Null); // Out of bounds
1299                                }
1300                                positive_idx as usize
1301                            } else {
1302                                i as usize
1303                            };
1304                            if idx < arr.len() {
1305                                return Ok(arr[idx].clone());
1306                            }
1307                            return Ok(Value::Null);
1308                        } else if idx_val.is_null() {
1309                            return Ok(Value::Null);
1310                        } else {
1311                            return Err(anyhow::anyhow!(
1312                                "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1313                                idx_val
1314                            ));
1315                        }
1316                    }
1317                    if let Value::Map(map) = &arr_val {
1318                        if let Some(key) = idx_val.as_str() {
1319                            return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1320                        } else if !idx_val.is_null() {
1321                            return Err(anyhow::anyhow!(
1322                                "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1323                                idx_val
1324                            ));
1325                        }
1326                    }
1327                    // Handle bracket access on Node: n['name'] returns property
1328                    if let Value::Node(node) = &arr_val {
1329                        if let Some(key) = idx_val.as_str() {
1330                            // Check in-memory properties first
1331                            if let Some(val) = node.properties.get(key) {
1332                                return Ok(val.clone());
1333                            }
1334                            // Fallback to property manager
1335                            if let Ok(val) = prop_manager
1336                                .get_vertex_prop_with_ctx(node.vid, key, ctx)
1337                                .await
1338                            {
1339                                return Ok(val);
1340                            }
1341                            return Ok(Value::Null);
1342                        } else if !idx_val.is_null() {
1343                            return Err(anyhow::anyhow!(
1344                                "TypeError: Node index must be a string, got: {:?}",
1345                                idx_val
1346                            ));
1347                        }
1348                    }
1349                    // Handle bracket access on Edge: e['property'] returns property
1350                    if let Value::Edge(edge) = &arr_val {
1351                        if let Some(key) = idx_val.as_str() {
1352                            // Check in-memory properties first
1353                            if let Some(val) = edge.properties.get(key) {
1354                                return Ok(val.clone());
1355                            }
1356                            // Fallback to property manager
1357                            if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1358                                return Ok(val);
1359                            }
1360                            return Ok(Value::Null);
1361                        } else if !idx_val.is_null() {
1362                            return Err(anyhow::anyhow!(
1363                                "TypeError: Edge index must be a string, got: {:?}",
1364                                idx_val
1365                            ));
1366                        }
1367                    }
1368                    // Handle bracket access on VID (integer): n['name'] where n is a VID
1369                    if let Ok(vid) = Self::vid_from_value(&arr_val)
1370                        && let Some(key) = idx_val.as_str()
1371                    {
1372                        if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1373                        {
1374                            return Ok(val);
1375                        }
1376                        return Ok(Value::Null);
1377                    }
1378                    if arr_val.is_null() {
1379                        return Ok(Value::Null);
1380                    }
1381                    Err(anyhow!(
1382                        "TypeError: InvalidArgumentType - cannot index into {:?}",
1383                        arr_val
1384                    ))
1385                }
1386                Expr::ArraySlice { array, start, end } => {
1387                    let arr_val = this
1388                        .evaluate_expr(array, row, prop_manager, params, ctx)
1389                        .await?;
1390
1391                    if let Value::List(arr) = &arr_val {
1392                        let len = arr.len();
1393
1394                        // Evaluate start index (default to 0), null → null result
1395                        let start_idx = if let Some(s) = start {
1396                            let v = this
1397                                .evaluate_expr(s, row, prop_manager, params, ctx)
1398                                .await?;
1399                            if v.is_null() {
1400                                return Ok(Value::Null);
1401                            }
1402                            let raw = v.as_i64().unwrap_or(0);
1403                            if raw < 0 {
1404                                (len as i64 + raw).max(0) as usize
1405                            } else {
1406                                (raw as usize).min(len)
1407                            }
1408                        } else {
1409                            0
1410                        };
1411
1412                        // Evaluate end index (default to length), null → null result
1413                        let end_idx = if let Some(e) = end {
1414                            let v = this
1415                                .evaluate_expr(e, row, prop_manager, params, ctx)
1416                                .await?;
1417                            if v.is_null() {
1418                                return Ok(Value::Null);
1419                            }
1420                            let raw = v.as_i64().unwrap_or(len as i64);
1421                            if raw < 0 {
1422                                (len as i64 + raw).max(0) as usize
1423                            } else {
1424                                (raw as usize).min(len)
1425                            }
1426                        } else {
1427                            len
1428                        };
1429
1430                        // Return sliced array
1431                        if start_idx >= end_idx {
1432                            return Ok(Value::List(vec![]));
1433                        }
1434                        let end_idx = end_idx.min(len);
1435                        return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1436                    }
1437
1438                    if arr_val.is_null() {
1439                        return Ok(Value::Null);
1440                    }
1441                    Err(anyhow!("Cannot slice {:?}", arr_val))
1442                }
1443                Expr::Literal(lit) => Ok(lit.to_value()),
1444                Expr::List(items) => {
1445                    let mut vals = Vec::new();
1446                    for item in items {
1447                        vals.push(
1448                            this.evaluate_expr(item, row, prop_manager, params, ctx)
1449                                .await?,
1450                        );
1451                    }
1452                    Ok(Value::List(vals))
1453                }
1454                Expr::Map(items) => {
1455                    let mut map = HashMap::new();
1456                    for (key, value_expr) in items {
1457                        let val = this
1458                            .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1459                            .await?;
1460                        map.insert(key.clone(), val);
1461                    }
1462                    Ok(Value::Map(map))
1463                }
1464                Expr::Exists { query, .. } => {
1465                    // Plan and execute subquery; failures return false (pattern doesn't match)
1466                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1467                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1468
1469                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1470                        Ok(plan) => {
1471                            let mut sub_params = params.clone();
1472                            sub_params.extend(row.clone());
1473
1474                            match this.execute(plan, prop_manager, &sub_params).await {
1475                                Ok(results) => Ok(Value::Bool(!results.is_empty())),
1476                                Err(e) => {
1477                                    log::debug!("EXISTS subquery execution failed: {}", e);
1478                                    Ok(Value::Bool(false))
1479                                }
1480                            }
1481                        }
1482                        Err(e) => {
1483                            log::debug!("EXISTS subquery planning failed: {}", e);
1484                            Ok(Value::Bool(false))
1485                        }
1486                    }
1487                }
1488                Expr::CountSubquery(query) => {
1489                    // Similar to Exists but returns count
1490                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1491
1492                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1493
1494                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1495                        Ok(plan) => {
1496                            let mut sub_params = params.clone();
1497                            sub_params.extend(row.clone());
1498
1499                            match this.execute(plan, prop_manager, &sub_params).await {
1500                                Ok(results) => Ok(Value::from(results.len() as i64)),
1501                                Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1502                            }
1503                        }
1504                        Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1505                    }
1506                }
1507                Expr::Quantifier {
1508                    quantifier,
1509                    variable,
1510                    list,
1511                    predicate,
1512                } => {
1513                    // Quantifier expression evaluation (ALL/ANY/SINGLE/NONE)
1514                    //
1515                    // This is the primary execution path for quantifiers because DataFusion
1516                    // does not support lambda functions yet. Queries with quantifiers attempt
1517                    // DataFusion translation first, fail (see df_expr.rs:289), then fall back
1518                    // to this fallback executor path.
1519                    //
1520                    // This is intentional design - we get correct semantics with row-by-row
1521                    // evaluation until DataFusion adds lambda support.
1522                    //
1523                    // See: https://github.com/apache/datafusion/issues/14205
1524
1525                    // Evaluate the list expression
1526                    let list_val = this
1527                        .evaluate_expr(list, row, prop_manager, params, ctx)
1528                        .await?;
1529
1530                    // Handle null propagation
1531                    if list_val.is_null() {
1532                        return Ok(Value::Null);
1533                    }
1534
1535                    // Convert to array
1536                    let items = match list_val {
1537                        Value::List(arr) => arr,
1538                        _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1539                    };
1540
1541                    // Evaluate predicate for each item
1542                    let mut satisfied_count = 0;
1543                    for item in &items {
1544                        // Create new row with bound variable
1545                        let mut item_row = row.clone();
1546                        item_row.insert(variable.clone(), item.clone());
1547
1548                        // Evaluate predicate with bound variable
1549                        let pred_result = this
1550                            .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1551                            .await?;
1552
1553                        // Check if predicate is satisfied
1554                        if let Value::Bool(true) = pred_result {
1555                            satisfied_count += 1;
1556                        }
1557                    }
1558
1559                    // Return based on quantifier type
1560                    let result = match quantifier {
1561                        Quantifier::All => satisfied_count == items.len(),
1562                        Quantifier::Any => satisfied_count > 0,
1563                        Quantifier::Single => satisfied_count == 1,
1564                        Quantifier::None => satisfied_count == 0,
1565                    };
1566
1567                    Ok(Value::Bool(result))
1568                }
1569                Expr::ListComprehension {
1570                    variable,
1571                    list,
1572                    where_clause,
1573                    map_expr,
1574                } => {
1575                    // List comprehension evaluation: [x IN list WHERE pred | expr]
1576                    //
1577                    // Similar to quantifiers, this requires lambda-like evaluation
1578                    // which DataFusion doesn't support yet. This is the primary execution path.
1579
1580                    // Evaluate the list expression
1581                    let list_val = this
1582                        .evaluate_expr(list, row, prop_manager, params, ctx)
1583                        .await?;
1584
1585                    // Handle null propagation
1586                    if list_val.is_null() {
1587                        return Ok(Value::Null);
1588                    }
1589
1590                    // Convert to array
1591                    let items = match list_val {
1592                        Value::List(arr) => arr,
1593                        _ => {
1594                            return Err(anyhow!(
1595                                "List comprehension expects a list, got: {:?}",
1596                                list_val
1597                            ));
1598                        }
1599                    };
1600
1601                    // Collect mapped values
1602                    let mut results = Vec::new();
1603                    for item in &items {
1604                        // Create new row with bound variable
1605                        let mut item_row = row.clone();
1606                        item_row.insert(variable.clone(), item.clone());
1607
1608                        // Apply WHERE filter if present
1609                        if let Some(predicate) = where_clause {
1610                            let pred_result = this
1611                                .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1612                                .await?;
1613
1614                            // Skip items that don't match the filter
1615                            if !matches!(pred_result, Value::Bool(true)) {
1616                                continue;
1617                            }
1618                        }
1619
1620                        // Apply map expression
1621                        let mapped_val = this
1622                            .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1623                            .await?;
1624                        results.push(mapped_val);
1625                    }
1626
1627                    Ok(Value::List(results))
1628                }
1629                Expr::BinaryOp { left, op, right } => {
1630                    // Short-circuit evaluation for AND/OR
1631                    match op {
1632                        BinaryOp::And => {
1633                            let l_val = this
1634                                .evaluate_expr(left, row, prop_manager, params, ctx)
1635                                .await?;
1636                            // Short-circuit: if left is false, don't evaluate right
1637                            if let Some(false) = l_val.as_bool() {
1638                                return Ok(Value::Bool(false));
1639                            }
1640                            let r_val = this
1641                                .evaluate_expr(right, row, prop_manager, params, ctx)
1642                                .await?;
1643                            eval_binary_op(&l_val, op, &r_val)
1644                        }
1645                        BinaryOp::Or => {
1646                            let l_val = this
1647                                .evaluate_expr(left, row, prop_manager, params, ctx)
1648                                .await?;
1649                            // Short-circuit: if left is true, don't evaluate right
1650                            if let Some(true) = l_val.as_bool() {
1651                                return Ok(Value::Bool(true));
1652                            }
1653                            let r_val = this
1654                                .evaluate_expr(right, row, prop_manager, params, ctx)
1655                                .await?;
1656                            eval_binary_op(&l_val, op, &r_val)
1657                        }
1658                        _ => {
1659                            // For all other operators, evaluate both sides
1660                            let l_val = this
1661                                .evaluate_expr(left, row, prop_manager, params, ctx)
1662                                .await?;
1663                            let r_val = this
1664                                .evaluate_expr(right, row, prop_manager, params, ctx)
1665                                .await?;
1666                            eval_binary_op(&l_val, op, &r_val)
1667                        }
1668                    }
1669                }
1670                Expr::In { expr, list } => {
1671                    let l_val = this
1672                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1673                        .await?;
1674                    let r_val = this
1675                        .evaluate_expr(list, row, prop_manager, params, ctx)
1676                        .await?;
1677                    eval_in_op(&l_val, &r_val)
1678                }
1679                Expr::UnaryOp { op, expr } => {
1680                    let val = this
1681                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1682                        .await?;
1683                    match op {
1684                        UnaryOp::Not => {
1685                            // Three-valued logic: NOT null = null
1686                            match val.as_bool() {
1687                                Some(b) => Ok(Value::Bool(!b)),
1688                                None if val.is_null() => Ok(Value::Null),
1689                                None => Err(anyhow!(
1690                                    "InvalidArgumentType: NOT requires a boolean argument"
1691                                )),
1692                            }
1693                        }
1694                        UnaryOp::Neg => {
1695                            if let Some(i) = val.as_i64() {
1696                                Ok(Value::Int(-i))
1697                            } else if let Some(f) = val.as_f64() {
1698                                Ok(Value::Float(-f))
1699                            } else {
1700                                Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1701                            }
1702                        }
1703                    }
1704                }
1705                Expr::IsNull(expr) => {
1706                    let val = this
1707                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1708                        .await?;
1709                    Ok(Value::Bool(val.is_null()))
1710                }
1711                Expr::IsNotNull(expr) => {
1712                    let val = this
1713                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1714                        .await?;
1715                    Ok(Value::Bool(!val.is_null()))
1716                }
1717                Expr::IsUnique(_) => {
1718                    // IS UNIQUE is only valid in constraint definitions, not in query expressions
1719                    Err(anyhow!(
1720                        "IS UNIQUE can only be used in constraint definitions"
1721                    ))
1722                }
1723                Expr::Case {
1724                    expr,
1725                    when_then,
1726                    else_expr,
1727                } => {
1728                    if let Some(base_expr) = expr {
1729                        let base_val = this
1730                            .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1731                            .await?;
1732                        for (w, t) in when_then {
1733                            let w_val = this
1734                                .evaluate_expr(w, row, prop_manager, params, ctx)
1735                                .await?;
1736                            if base_val == w_val {
1737                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1738                            }
1739                        }
1740                    } else {
1741                        for (w, t) in when_then {
1742                            let w_val = this
1743                                .evaluate_expr(w, row, prop_manager, params, ctx)
1744                                .await?;
1745                            if w_val.as_bool() == Some(true) {
1746                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1747                            }
1748                        }
1749                    }
1750                    if let Some(e) = else_expr {
1751                        return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1752                    }
1753                    Ok(Value::Null)
1754                }
1755                Expr::Wildcard => Ok(Value::Null),
1756                Expr::FunctionCall { name, args, .. } => {
1757                    // Special case: id() returns VID for nodes and EID for relationships
1758                    if name.eq_ignore_ascii_case("ID") {
1759                        if args.len() != 1 {
1760                            return Err(anyhow!("id() requires exactly 1 argument"));
1761                        }
1762                        let val = this
1763                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1764                            .await?;
1765                        if let Value::Map(map) = &val {
1766                            // Check for _vid (vertex) first
1767                            if let Some(vid_val) = map.get("_vid") {
1768                                return Ok(vid_val.clone());
1769                            }
1770                            // Check for _eid (edge/relationship)
1771                            if let Some(eid_val) = map.get("_eid") {
1772                                return Ok(eid_val.clone());
1773                            }
1774                            // Check for _id (fallback)
1775                            if let Some(id_val) = map.get("_id") {
1776                                return Ok(id_val.clone());
1777                            }
1778                        }
1779                        return Ok(Value::Null);
1780                    }
1781
1782                    // Special case: elementId() returns string format "label_id:local_offset"
1783                    if name.eq_ignore_ascii_case("ELEMENTID") {
1784                        if args.len() != 1 {
1785                            return Err(anyhow!("elementId() requires exactly 1 argument"));
1786                        }
1787                        let val = this
1788                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1789                            .await?;
1790                        if let Value::Map(map) = &val {
1791                            // Check for _vid (vertex) first
1792                            // In new storage model, VIDs are pure auto-increment - return as simple ID string
1793                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1794                                return Ok(Value::String(vid_val.to_string()));
1795                            }
1796                            // Check for _eid (edge/relationship)
1797                            // In new storage model, EIDs are pure auto-increment - return as simple ID string
1798                            if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1799                                return Ok(Value::String(eid_val.to_string()));
1800                            }
1801                        }
1802                        return Ok(Value::Null);
1803                    }
1804
1805                    // Special case: type() returns the relationship type name
1806                    if name.eq_ignore_ascii_case("TYPE") {
1807                        if args.len() != 1 {
1808                            return Err(anyhow!("type() requires exactly 1 argument"));
1809                        }
1810                        let val = this
1811                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1812                            .await?;
1813                        if let Value::Map(map) = &val
1814                            && let Some(type_val) = map.get("_type")
1815                        {
1816                            // Numeric _type is an edge type ID; string _type is already a name
1817                            if let Some(type_id) =
1818                                type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1819                            {
1820                                if let Some(name) = this
1821                                    .storage
1822                                    .schema_manager()
1823                                    .edge_type_name_by_id_unified(type_id)
1824                                {
1825                                    return Ok(Value::String(name));
1826                                }
1827                            } else if let Some(name) = type_val.as_str() {
1828                                return Ok(Value::String(name.to_string()));
1829                            }
1830                        }
1831                        return Ok(Value::Null);
1832                    }
1833
1834                    // Special case: labels() returns the labels of a node
1835                    if name.eq_ignore_ascii_case("LABELS") {
1836                        if args.len() != 1 {
1837                            return Err(anyhow!("labels() requires exactly 1 argument"));
1838                        }
1839                        let val = this
1840                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1841                            .await?;
1842                        if let Value::Map(map) = &val
1843                            && let Some(labels_val) = map.get("_labels")
1844                        {
1845                            return Ok(labels_val.clone());
1846                        }
1847                        return Ok(Value::Null);
1848                    }
1849
1850                    // Special case: properties() returns the properties map of a node/edge
1851                    if name.eq_ignore_ascii_case("PROPERTIES") {
1852                        if args.len() != 1 {
1853                            return Err(anyhow!("properties() requires exactly 1 argument"));
1854                        }
1855                        let val = this
1856                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1857                            .await?;
1858                        if let Value::Map(map) = &val {
1859                            // Filter out internal properties (those starting with _)
1860                            let mut props = HashMap::new();
1861                            for (k, v) in map.iter() {
1862                                if !k.starts_with('_') {
1863                                    props.insert(k.clone(), v.clone());
1864                                }
1865                            }
1866                            return Ok(Value::Map(props));
1867                        }
1868                        return Ok(Value::Null);
1869                    }
1870
1871                    // Special case: startNode() returns the start node of a relationship
1872                    if name.eq_ignore_ascii_case("STARTNODE") {
1873                        if args.len() != 1 {
1874                            return Err(anyhow!("startNode() requires exactly 1 argument"));
1875                        }
1876                        let val = this
1877                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1878                            .await?;
1879                        if let Value::Edge(edge) = &val {
1880                            return Ok(Self::find_node_by_vid(row, edge.src));
1881                        }
1882                        if let Value::Map(map) = &val {
1883                            if let Some(start_node) = map.get("_startNode") {
1884                                return Ok(start_node.clone());
1885                            }
1886                            if let Some(src_vid) = map.get("_src_vid") {
1887                                return Ok(Value::Map(HashMap::from([(
1888                                    "_vid".to_string(),
1889                                    src_vid.clone(),
1890                                )])));
1891                            }
1892                            // Resolve _src VID by looking up node in row
1893                            if let Some(src_id) = map.get("_src")
1894                                && let Some(u) = src_id.as_u64()
1895                            {
1896                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1897                            }
1898                        }
1899                        return Ok(Value::Null);
1900                    }
1901
1902                    // Special case: endNode() returns the end node of a relationship
1903                    if name.eq_ignore_ascii_case("ENDNODE") {
1904                        if args.len() != 1 {
1905                            return Err(anyhow!("endNode() requires exactly 1 argument"));
1906                        }
1907                        let val = this
1908                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1909                            .await?;
1910                        if let Value::Edge(edge) = &val {
1911                            return Ok(Self::find_node_by_vid(row, edge.dst));
1912                        }
1913                        if let Value::Map(map) = &val {
1914                            if let Some(end_node) = map.get("_endNode") {
1915                                return Ok(end_node.clone());
1916                            }
1917                            if let Some(dst_vid) = map.get("_dst_vid") {
1918                                return Ok(Value::Map(HashMap::from([(
1919                                    "_vid".to_string(),
1920                                    dst_vid.clone(),
1921                                )])));
1922                            }
1923                            // Resolve _dst VID by looking up node in row
1924                            if let Some(dst_id) = map.get("_dst")
1925                                && let Some(u) = dst_id.as_u64()
1926                            {
1927                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1928                            }
1929                        }
1930                        return Ok(Value::Null);
1931                    }
1932
1933                    // Special case: hasLabel() checks if a node has a specific label
1934                    // Used for WHERE n:Label predicates
1935                    if name.eq_ignore_ascii_case("HASLABEL") {
1936                        if args.len() != 2 {
1937                            return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
1938                        }
1939                        let node_val = this
1940                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1941                            .await?;
1942                        let label_val = this
1943                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1944                            .await?;
1945
1946                        let label_to_check = label_val.as_str().ok_or_else(|| {
1947                            anyhow!("Second argument to hasLabel must be a string")
1948                        })?;
1949
1950                        let has_label = match &node_val {
1951                            // Handle proper Value::Node type (from result normalization)
1952                            Value::Map(map) if map.contains_key("_vid") => {
1953                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
1954                                    labels_arr
1955                                        .iter()
1956                                        .any(|l| l.as_str() == Some(label_to_check))
1957                                } else {
1958                                    false
1959                                }
1960                            }
1961                            // Also handle legacy Object format
1962                            Value::Map(map) => {
1963                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
1964                                    labels_arr
1965                                        .iter()
1966                                        .any(|l| l.as_str() == Some(label_to_check))
1967                                } else {
1968                                    false
1969                                }
1970                            }
1971                            _ => false,
1972                        };
1973                        return Ok(Value::Bool(has_label));
1974                    }
1975
1976                    // Quantifier functions (ANY/ALL/NONE/SINGLE) as function calls are not supported.
1977                    // These should be parsed as Expr::Quantifier instead.
1978                    if matches!(
1979                        name.to_uppercase().as_str(),
1980                        "ANY" | "ALL" | "NONE" | "SINGLE"
1981                    ) {
1982                        return Err(anyhow!(
1983                            "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
1984                            name.to_lowercase()
1985                        ));
1986                    }
1987
1988                    // Special case: COALESCE needs short-circuit evaluation
1989                    if name.eq_ignore_ascii_case("COALESCE") {
1990                        for arg in args {
1991                            let val = this
1992                                .evaluate_expr(arg, row, prop_manager, params, ctx)
1993                                .await?;
1994                            if !val.is_null() {
1995                                return Ok(val);
1996                            }
1997                        }
1998                        return Ok(Value::Null);
1999                    }
2000
2001                    // Special case: vector_similarity has dedicated implementation
2002                    if name.eq_ignore_ascii_case("vector_similarity") {
2003                        if args.len() != 2 {
2004                            return Err(anyhow!("vector_similarity takes 2 arguments"));
2005                        }
2006                        let v1 = this
2007                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2008                            .await?;
2009                        let v2 = this
2010                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2011                            .await?;
2012                        return eval_vector_similarity(&v1, &v2);
2013                    }
2014
2015                    // Special case: uni.validAt handles node fetching
2016                    if name.eq_ignore_ascii_case("uni.temporal.validAt")
2017                        || name.eq_ignore_ascii_case("uni.validAt")
2018                        || name.eq_ignore_ascii_case("validAt")
2019                    {
2020                        if args.len() != 4 {
2021                            return Err(anyhow!("validAt requires 4 arguments"));
2022                        }
2023                        let node_val = this
2024                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2025                            .await?;
2026                        let start_prop = this
2027                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2028                            .await?
2029                            .as_str()
2030                            .ok_or(anyhow!("start_prop must be string"))?
2031                            .to_string();
2032                        let end_prop = this
2033                            .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2034                            .await?
2035                            .as_str()
2036                            .ok_or(anyhow!("end_prop must be string"))?
2037                            .to_string();
2038                        let time_val = this
2039                            .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2040                            .await?;
2041
2042                        let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2043                            anyhow!("time argument must be a datetime value or string")
2044                        })?;
2045
2046                        // Fetch temporal property values - supports both vertices and edges
2047                        let valid_from_val: Option<Value> = if let Ok(vid) =
2048                            Self::vid_from_value(&node_val)
2049                        {
2050                            // Vertex case - VID string format
2051                            prop_manager
2052                                .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2053                                .await
2054                                .ok()
2055                        } else if let Value::Map(map) = &node_val {
2056                            // Check for embedded _vid or _eid in object
2057                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2058                                let vid = Vid::from(vid_val);
2059                                prop_manager
2060                                    .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2061                                    .await
2062                                    .ok()
2063                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2064                                // Edge case
2065                                let eid = uni_common::core::id::Eid::from(eid_val);
2066                                prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2067                            } else {
2068                                // Inline object - property embedded directly
2069                                map.get(&start_prop).cloned()
2070                            }
2071                        } else {
2072                            return Ok(Value::Bool(false));
2073                        };
2074
2075                        let valid_from = match valid_from_val {
2076                            Some(ref v) => match value_to_datetime_utc(v) {
2077                                Some(dt) => dt,
2078                                None if v.is_null() => return Ok(Value::Bool(false)),
2079                                None => {
2080                                    return Err(anyhow!(
2081                                        "Property {} must be a datetime value or string",
2082                                        start_prop
2083                                    ));
2084                                }
2085                            },
2086                            None => return Ok(Value::Bool(false)),
2087                        };
2088
2089                        let valid_to_val: Option<Value> = if let Ok(vid) =
2090                            Self::vid_from_value(&node_val)
2091                        {
2092                            // Vertex case - VID string format
2093                            prop_manager
2094                                .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2095                                .await
2096                                .ok()
2097                        } else if let Value::Map(map) = &node_val {
2098                            // Check for embedded _vid or _eid in object
2099                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2100                                let vid = Vid::from(vid_val);
2101                                prop_manager
2102                                    .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2103                                    .await
2104                                    .ok()
2105                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2106                                // Edge case
2107                                let eid = uni_common::core::id::Eid::from(eid_val);
2108                                prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2109                            } else {
2110                                // Inline object - property embedded directly
2111                                map.get(&end_prop).cloned()
2112                            }
2113                        } else {
2114                            return Ok(Value::Bool(false));
2115                        };
2116
2117                        let valid_to = match valid_to_val {
2118                            Some(ref v) => match value_to_datetime_utc(v) {
2119                                Some(dt) => Some(dt),
2120                                None if v.is_null() => None,
2121                                None => {
2122                                    return Err(anyhow!(
2123                                        "Property {} must be a datetime value or null",
2124                                        end_prop
2125                                    ));
2126                                }
2127                            },
2128                            None => None,
2129                        };
2130
2131                        let is_valid = valid_from <= query_time
2132                            && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2133                        return Ok(Value::Bool(is_valid));
2134                    }
2135
2136                    // For all other functions, evaluate arguments then call helper
2137                    let mut evaluated_args = Vec::with_capacity(args.len());
2138                    for arg in args {
2139                        let mut val = this
2140                            .evaluate_expr(arg, row, prop_manager, params, ctx)
2141                            .await?;
2142
2143                        // Eagerly hydrate edge/vertex maps if pushdown hydration didn't load properties.
2144                        // Functions like validAt() need access to properties like valid_from/valid_to.
2145                        if let Value::Map(ref mut map) = val {
2146                            hydrate_entity_if_needed(map, prop_manager, ctx).await;
2147                        }
2148
2149                        evaluated_args.push(val);
2150                    }
2151                    eval_scalar_function(name, &evaluated_args)
2152                }
2153                Expr::Reduce {
2154                    accumulator,
2155                    init,
2156                    variable,
2157                    list,
2158                    expr,
2159                } => {
2160                    let mut acc = self
2161                        .evaluate_expr(init, row, prop_manager, params, ctx)
2162                        .await?;
2163                    let list_val = self
2164                        .evaluate_expr(list, row, prop_manager, params, ctx)
2165                        .await?;
2166
2167                    if let Value::List(items) = list_val {
2168                        for item in items {
2169                            // Create a temporary scope/row with accumulator and variable
2170                            // For simplicity in fallback executor, we can construct a new row map
2171                            // merging current row + new variables.
2172                            let mut scope = row.clone();
2173                            scope.insert(accumulator.clone(), acc.clone());
2174                            scope.insert(variable.clone(), item);
2175
2176                            acc = self
2177                                .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2178                                .await?;
2179                        }
2180                    } else {
2181                        return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2182                    }
2183                    Ok(acc)
2184                }
2185                Expr::ValidAt { .. } => {
2186                    // VALID_AT should have been transformed to a function call in the planner
2187                    Err(anyhow!(
2188                        "VALID_AT expression should have been transformed to function call in planner"
2189                    ))
2190                }
2191
2192                Expr::LabelCheck { expr, labels } => {
2193                    let val = this
2194                        .evaluate_expr(expr, row, prop_manager, params, ctx)
2195                        .await?;
2196                    match &val {
2197                        Value::Null => Ok(Value::Null),
2198                        Value::Map(map) => {
2199                            // Check if this is an edge (has _eid) or node (has _vid)
2200                            let is_edge = map.contains_key("_eid")
2201                                || map.contains_key("_type_name")
2202                                || (map.contains_key("_type") && !map.contains_key("_vid"));
2203
2204                            if is_edge {
2205                                // Edges have a single type
2206                                if labels.len() > 1 {
2207                                    return Ok(Value::Bool(false));
2208                                }
2209                                let label_to_check = &labels[0];
2210                                let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2211                                {
2212                                    t == label_to_check
2213                                } else if let Some(Value::String(t)) = map.get("_type") {
2214                                    t == label_to_check
2215                                } else {
2216                                    false
2217                                };
2218                                Ok(Value::Bool(has_type))
2219                            } else {
2220                                // Node: check all labels
2221                                let has_all = labels.iter().all(|label_to_check| {
2222                                    if let Some(Value::List(labels_arr)) = map.get("_labels") {
2223                                        labels_arr
2224                                            .iter()
2225                                            .any(|l| l.as_str() == Some(label_to_check.as_str()))
2226                                    } else {
2227                                        false
2228                                    }
2229                                });
2230                                Ok(Value::Bool(has_all))
2231                            }
2232                        }
2233                        _ => Ok(Value::Bool(false)),
2234                    }
2235                }
2236
2237                Expr::MapProjection { base, items } => {
2238                    let base_value = this
2239                        .evaluate_expr(base, row, prop_manager, params, ctx)
2240                        .await?;
2241
2242                    // Extract properties from the base object
2243                    let properties = match &base_value {
2244                        Value::Map(map) => map,
2245                        _ => {
2246                            return Err(anyhow!(
2247                                "Map projection requires object, got {:?}",
2248                                base_value
2249                            ));
2250                        }
2251                    };
2252
2253                    let mut result_map = HashMap::new();
2254
2255                    for item in items {
2256                        match item {
2257                            MapProjectionItem::Property(prop) => {
2258                                if let Some(value) = properties.get(prop.as_str()) {
2259                                    result_map.insert(prop.clone(), value.clone());
2260                                }
2261                            }
2262                            MapProjectionItem::AllProperties => {
2263                                // Include all properties except internal fields (those starting with _)
2264                                for (key, value) in properties.iter() {
2265                                    if !key.starts_with('_') {
2266                                        result_map.insert(key.clone(), value.clone());
2267                                    }
2268                                }
2269                            }
2270                            MapProjectionItem::LiteralEntry(key, expr) => {
2271                                let value = this
2272                                    .evaluate_expr(expr, row, prop_manager, params, ctx)
2273                                    .await?;
2274                                result_map.insert(key.clone(), value);
2275                            }
2276                            MapProjectionItem::Variable(var_name) => {
2277                                // Variable selector: include the value of the variable in the result
2278                                // e.g., person{.name, friend} includes the value of 'friend' variable
2279                                if let Some(value) = row.get(var_name.as_str()) {
2280                                    result_map.insert(var_name.clone(), value.clone());
2281                                }
2282                            }
2283                        }
2284                    }
2285
2286                    Ok(Value::Map(result_map))
2287                }
2288            }
2289        })
2290    }
2291
2292    pub(crate) fn execute_subplan<'a>(
2293        &'a self,
2294        plan: LogicalPlan,
2295        prop_manager: &'a PropertyManager,
2296        params: &'a HashMap<String, Value>,
2297        ctx: Option<&'a QueryContext>,
2298    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2299        Box::pin(async move {
2300            if let Some(ctx) = ctx {
2301                ctx.check_timeout()?;
2302            }
2303            match plan {
2304                LogicalPlan::Union { left, right, all } => {
2305                    self.execute_union(left, right, all, prop_manager, params, ctx)
2306                        .await
2307                }
2308                LogicalPlan::CreateVectorIndex {
2309                    config,
2310                    if_not_exists,
2311                } => {
2312                    if if_not_exists && self.index_exists_by_name(&config.name) {
2313                        return Ok(vec![]);
2314                    }
2315                    let idx_mgr = IndexManager::new(
2316                        self.storage.base_path(),
2317                        self.storage.schema_manager_arc(),
2318                        self.storage.lancedb_store_arc(),
2319                    );
2320                    idx_mgr.create_vector_index(config).await?;
2321                    Ok(vec![])
2322                }
2323                LogicalPlan::CreateFullTextIndex {
2324                    config,
2325                    if_not_exists,
2326                } => {
2327                    if if_not_exists && self.index_exists_by_name(&config.name) {
2328                        return Ok(vec![]);
2329                    }
2330                    let idx_mgr = IndexManager::new(
2331                        self.storage.base_path(),
2332                        self.storage.schema_manager_arc(),
2333                        self.storage.lancedb_store_arc(),
2334                    );
2335                    idx_mgr.create_fts_index(config).await?;
2336                    Ok(vec![])
2337                }
2338                LogicalPlan::CreateScalarIndex {
2339                    mut config,
2340                    if_not_exists,
2341                } => {
2342                    if if_not_exists && self.index_exists_by_name(&config.name) {
2343                        return Ok(vec![]);
2344                    }
2345
2346                    // Check for expression indexes - create generated columns
2347                    let mut modified_properties = Vec::new();
2348
2349                    for prop in &config.properties {
2350                        // Heuristic: if contains '(' and ')', it's an expression
2351                        if prop.contains('(') && prop.contains(')') {
2352                            let gen_col = SchemaManager::generated_column_name(prop);
2353
2354                            // Add generated property to schema
2355                            let sm = self.storage.schema_manager_arc();
2356                            if let Err(e) = sm.add_generated_property(
2357                                &config.label,
2358                                &gen_col,
2359                                DataType::String, // Default type for expressions
2360                                prop.clone(),
2361                            ) {
2362                                log::warn!("Failed to add generated property (might exist): {}", e);
2363                            }
2364
2365                            modified_properties.push(gen_col);
2366                        } else {
2367                            // Simple property - use as-is
2368                            modified_properties.push(prop.clone());
2369                        }
2370                    }
2371
2372                    config.properties = modified_properties;
2373
2374                    let idx_mgr = IndexManager::new(
2375                        self.storage.base_path(),
2376                        self.storage.schema_manager_arc(),
2377                        self.storage.lancedb_store_arc(),
2378                    );
2379                    idx_mgr.create_scalar_index(config).await?;
2380                    Ok(vec![])
2381                }
2382                LogicalPlan::CreateJsonFtsIndex {
2383                    config,
2384                    if_not_exists,
2385                } => {
2386                    if if_not_exists && self.index_exists_by_name(&config.name) {
2387                        return Ok(vec![]);
2388                    }
2389                    let idx_mgr = IndexManager::new(
2390                        self.storage.base_path(),
2391                        self.storage.schema_manager_arc(),
2392                        self.storage.lancedb_store_arc(),
2393                    );
2394                    idx_mgr.create_json_fts_index(config).await?;
2395                    Ok(vec![])
2396                }
2397                LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2398                LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2399                LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2400                LogicalPlan::Vacuum => {
2401                    self.execute_vacuum().await?;
2402                    Ok(vec![])
2403                }
2404                LogicalPlan::Checkpoint => {
2405                    self.execute_checkpoint().await?;
2406                    Ok(vec![])
2407                }
2408                LogicalPlan::CopyTo {
2409                    label,
2410                    path,
2411                    format,
2412                    options,
2413                } => {
2414                    let count = self
2415                        .execute_copy_to(&label, &path, &format, &options)
2416                        .await?;
2417                    let mut result = HashMap::new();
2418                    result.insert("count".to_string(), Value::Int(count as i64));
2419                    Ok(vec![result])
2420                }
2421                LogicalPlan::CopyFrom {
2422                    label,
2423                    path,
2424                    format,
2425                    options,
2426                } => {
2427                    let count = self
2428                        .execute_copy_from(&label, &path, &format, &options)
2429                        .await?;
2430                    let mut result = HashMap::new();
2431                    result.insert("count".to_string(), Value::Int(count as i64));
2432                    Ok(vec![result])
2433                }
2434                LogicalPlan::CreateLabel(clause) => {
2435                    self.execute_create_label(clause).await?;
2436                    Ok(vec![])
2437                }
2438                LogicalPlan::CreateEdgeType(clause) => {
2439                    self.execute_create_edge_type(clause).await?;
2440                    Ok(vec![])
2441                }
2442                LogicalPlan::AlterLabel(clause) => {
2443                    self.execute_alter_label(clause).await?;
2444                    Ok(vec![])
2445                }
2446                LogicalPlan::AlterEdgeType(clause) => {
2447                    self.execute_alter_edge_type(clause).await?;
2448                    Ok(vec![])
2449                }
2450                LogicalPlan::DropLabel(clause) => {
2451                    self.execute_drop_label(clause).await?;
2452                    Ok(vec![])
2453                }
2454                LogicalPlan::DropEdgeType(clause) => {
2455                    self.execute_drop_edge_type(clause).await?;
2456                    Ok(vec![])
2457                }
2458                LogicalPlan::CreateConstraint(clause) => {
2459                    self.execute_create_constraint(clause).await?;
2460                    Ok(vec![])
2461                }
2462                LogicalPlan::DropConstraint(clause) => {
2463                    self.execute_drop_constraint(clause).await?;
2464                    Ok(vec![])
2465                }
2466                LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2467                LogicalPlan::DropIndex { name, if_exists } => {
2468                    let idx_mgr = IndexManager::new(
2469                        self.storage.base_path(),
2470                        self.storage.schema_manager_arc(),
2471                        self.storage.lancedb_store_arc(),
2472                    );
2473                    match idx_mgr.drop_index(&name).await {
2474                        Ok(_) => Ok(vec![]),
2475                        Err(e) => {
2476                            if if_exists && e.to_string().contains("not found") {
2477                                Ok(vec![])
2478                            } else {
2479                                Err(e)
2480                            }
2481                        }
2482                    }
2483                }
2484                LogicalPlan::ShowIndexes { filter } => {
2485                    Ok(self.execute_show_indexes(filter.as_deref()))
2486                }
2487                LogicalPlan::Scan { .. } => {
2488                    unreachable!("Scan is handled by DataFusion engine")
2489                }
2490                LogicalPlan::ExtIdLookup { .. } => {
2491                    unreachable!("ExtIdLookup is handled by DataFusion engine")
2492                }
2493                LogicalPlan::ScanAll { .. } => {
2494                    unreachable!("ScanAll is handled by DataFusion engine")
2495                }
2496                LogicalPlan::ScanMainByLabels { .. } => {
2497                    unreachable!("ScanMainByLabels is handled by DataFusion engine")
2498                }
2499                LogicalPlan::Traverse { .. } => {
2500                    unreachable!("Traverse is handled by DataFusion engine")
2501                }
2502                LogicalPlan::TraverseMainByType { .. } => {
2503                    unreachable!("TraverseMainByType is handled by DataFusion engine")
2504                }
2505                LogicalPlan::Filter {
2506                    input,
2507                    predicate,
2508                    optional_variables,
2509                } => {
2510                    let input_matches = self
2511                        .execute_subplan(*input, prop_manager, params, ctx)
2512                        .await?;
2513
2514                    tracing::debug!(
2515                        "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2516                        predicate,
2517                        input_matches.len(),
2518                        optional_variables
2519                    );
2520
2521                    // For OPTIONAL MATCH with WHERE: we need LEFT OUTER JOIN semantics.
2522                    // Group rows by non-optional variables, apply filter, and ensure
2523                    // at least one row per group (with NULLs if filter removes all).
2524                    if !optional_variables.is_empty() {
2525                        // Helper to check if a key belongs to an optional variable.
2526                        // Keys can be "var" or "var.field" (e.g., "m" or "m._vid").
2527                        let is_optional_key = |k: &str| -> bool {
2528                            optional_variables.contains(k)
2529                                || optional_variables
2530                                    .iter()
2531                                    .any(|var| k.starts_with(&format!("{}.", var)))
2532                        };
2533
2534                        // Helper to check if a key is internal (should not affect grouping)
2535                        let is_internal_key =
2536                            |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2537
2538                        // Compute the key (non-optional, non-internal variables) for grouping
2539                        let non_optional_vars: Vec<String> = input_matches
2540                            .first()
2541                            .map(|row| {
2542                                row.keys()
2543                                    .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2544                                    .cloned()
2545                                    .collect()
2546                            })
2547                            .unwrap_or_default();
2548
2549                        // Group rows by their non-optional variable values
2550                        let mut groups: std::collections::HashMap<
2551                            Vec<u8>,
2552                            Vec<HashMap<String, Value>>,
2553                        > = std::collections::HashMap::new();
2554
2555                        for row in &input_matches {
2556                            // Create a key from non-optional variable values
2557                            let key: Vec<u8> = non_optional_vars
2558                                .iter()
2559                                .map(|var| {
2560                                    row.get(var).map(|v| format!("{:?}", v)).unwrap_or_default()
2561                                })
2562                                .collect::<Vec<_>>()
2563                                .join("|")
2564                                .into_bytes();
2565
2566                            groups.entry(key).or_default().push(row.clone());
2567                        }
2568
2569                        let mut filtered = Vec::new();
2570                        for (_key, group_rows) in groups {
2571                            let mut group_passed = Vec::new();
2572
2573                            for row in &group_rows {
2574                                // If optional variables are already NULL, preserve the row
2575                                let has_null_optional = optional_variables.iter().any(|var| {
2576                                    // Check both "var" and "var._vid" style keys
2577                                    let direct_null =
2578                                        matches!(row.get(var), Some(Value::Null) | None);
2579                                    let prefixed_null = row
2580                                        .keys()
2581                                        .filter(|k| k.starts_with(&format!("{}.", var)))
2582                                        .any(|k| matches!(row.get(k), Some(Value::Null)));
2583                                    direct_null || prefixed_null
2584                                });
2585
2586                                if has_null_optional {
2587                                    group_passed.push(row.clone());
2588                                    continue;
2589                                }
2590
2591                                let res = self
2592                                    .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2593                                    .await?;
2594
2595                                if res.as_bool().unwrap_or(false) {
2596                                    group_passed.push(row.clone());
2597                                }
2598                            }
2599
2600                            if group_passed.is_empty() {
2601                                // No rows passed - emit one row with NULLs for optional variables
2602                                // Use the first row's non-optional values as a template
2603                                if let Some(template) = group_rows.first() {
2604                                    let mut null_row = HashMap::new();
2605                                    for (k, v) in template {
2606                                        if is_optional_key(k) {
2607                                            null_row.insert(k.clone(), Value::Null);
2608                                        } else {
2609                                            null_row.insert(k.clone(), v.clone());
2610                                        }
2611                                    }
2612                                    filtered.push(null_row);
2613                                }
2614                            } else {
2615                                filtered.extend(group_passed);
2616                            }
2617                        }
2618
2619                        tracing::debug!(
2620                            "Filter (OPTIONAL): {} input rows -> {} output rows",
2621                            input_matches.len(),
2622                            filtered.len()
2623                        );
2624
2625                        return Ok(filtered);
2626                    }
2627
2628                    // Standard filter for non-OPTIONAL MATCH
2629                    let mut filtered = Vec::new();
2630                    for row in input_matches.iter() {
2631                        let res = self
2632                            .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2633                            .await?;
2634
2635                        let passes = res.as_bool().unwrap_or(false);
2636
2637                        if passes {
2638                            filtered.push(row.clone());
2639                        }
2640                    }
2641
2642                    tracing::debug!(
2643                        "Filter: {} input rows -> {} output rows",
2644                        input_matches.len(),
2645                        filtered.len()
2646                    );
2647
2648                    Ok(filtered)
2649                }
2650                LogicalPlan::ProcedureCall {
2651                    procedure_name,
2652                    arguments,
2653                    yield_items,
2654                } => {
2655                    let yield_names: Vec<String> =
2656                        yield_items.iter().map(|(n, _)| n.clone()).collect();
2657                    let results = self
2658                        .execute_procedure(
2659                            &procedure_name,
2660                            &arguments,
2661                            &yield_names,
2662                            prop_manager,
2663                            params,
2664                            ctx,
2665                        )
2666                        .await?;
2667
2668                    // Handle aliasing: collect all original values first, then
2669                    // build the aliased row in one pass. This avoids issues when
2670                    // an alias matches another yield item's original name (e.g.,
2671                    // YIELD a AS b, b AS d — renaming "a" to "b" must not
2672                    // clobber the original "b" before it is renamed to "d").
2673                    let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2674                    if !has_aliases {
2675                        // No aliases (includes YIELD * which produces empty yield_items) —
2676                        // pass through the procedure output rows unchanged.
2677                        Ok(results)
2678                    } else {
2679                        let mut aliased_results = Vec::with_capacity(results.len());
2680                        for row in results {
2681                            let mut new_row = HashMap::new();
2682                            for (name, alias) in &yield_items {
2683                                let col_name = alias.as_ref().unwrap_or(name);
2684                                let val = row.get(name).cloned().unwrap_or(Value::Null);
2685                                new_row.insert(col_name.clone(), val);
2686                            }
2687                            aliased_results.push(new_row);
2688                        }
2689                        Ok(aliased_results)
2690                    }
2691                }
2692                LogicalPlan::VectorKnn { .. } => {
2693                    unreachable!("VectorKnn is handled by DataFusion engine")
2694                }
2695                LogicalPlan::InvertedIndexLookup { .. } => {
2696                    unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2697                }
2698                LogicalPlan::Sort { input, order_by } => {
2699                    let rows = self
2700                        .execute_subplan(*input, prop_manager, params, ctx)
2701                        .await?;
2702                    self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2703                        .await
2704                }
2705                LogicalPlan::Limit { input, skip, fetch } => {
2706                    let rows = self
2707                        .execute_subplan(*input, prop_manager, params, ctx)
2708                        .await?;
2709                    let skip = skip.unwrap_or(0);
2710                    let take = fetch.unwrap_or(usize::MAX);
2711                    Ok(rows.into_iter().skip(skip).take(take).collect())
2712                }
2713                LogicalPlan::Aggregate {
2714                    input,
2715                    group_by,
2716                    aggregates,
2717                } => {
2718                    let rows = self
2719                        .execute_subplan(*input, prop_manager, params, ctx)
2720                        .await?;
2721                    self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2722                        .await
2723                }
2724                LogicalPlan::Window {
2725                    input,
2726                    window_exprs,
2727                } => {
2728                    let rows = self
2729                        .execute_subplan(*input, prop_manager, params, ctx)
2730                        .await?;
2731                    self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2732                        .await
2733                }
2734                LogicalPlan::Project { input, projections } => {
2735                    let matches = self
2736                        .execute_subplan(*input, prop_manager, params, ctx)
2737                        .await?;
2738                    self.execute_project(matches, &projections, prop_manager, params, ctx)
2739                        .await
2740                }
2741                LogicalPlan::Distinct { input } => {
2742                    let rows = self
2743                        .execute_subplan(*input, prop_manager, params, ctx)
2744                        .await?;
2745                    let mut seen = std::collections::HashSet::new();
2746                    let mut result = Vec::new();
2747                    for row in rows {
2748                        let key = Self::canonical_row_key(&row);
2749                        if seen.insert(key) {
2750                            result.push(row);
2751                        }
2752                    }
2753                    Ok(result)
2754                }
2755                LogicalPlan::Unwind {
2756                    input,
2757                    expr,
2758                    variable,
2759                } => {
2760                    let input_rows = self
2761                        .execute_subplan(*input, prop_manager, params, ctx)
2762                        .await?;
2763                    self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2764                        .await
2765                }
2766                LogicalPlan::Apply {
2767                    input,
2768                    subquery,
2769                    input_filter,
2770                } => {
2771                    let input_rows = self
2772                        .execute_subplan(*input, prop_manager, params, ctx)
2773                        .await?;
2774                    self.execute_apply(
2775                        input_rows,
2776                        &subquery,
2777                        input_filter.as_ref(),
2778                        prop_manager,
2779                        params,
2780                        ctx,
2781                    )
2782                    .await
2783                }
2784                LogicalPlan::SubqueryCall { input, subquery } => {
2785                    let input_rows = self
2786                        .execute_subplan(*input, prop_manager, params, ctx)
2787                        .await?;
2788                    // Execute subquery for each input row (correlated)
2789                    // No input_filter for CALL { }
2790                    self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2791                        .await
2792                }
2793                LogicalPlan::RecursiveCTE {
2794                    cte_name,
2795                    initial,
2796                    recursive,
2797                } => {
2798                    self.execute_recursive_cte(
2799                        &cte_name,
2800                        *initial,
2801                        *recursive,
2802                        prop_manager,
2803                        params,
2804                        ctx,
2805                    )
2806                    .await
2807                }
2808                LogicalPlan::CrossJoin { left, right } => {
2809                    self.execute_cross_join(left, right, prop_manager, params, ctx)
2810                        .await
2811                }
2812                LogicalPlan::Set { .. }
2813                | LogicalPlan::Remove { .. }
2814                | LogicalPlan::Merge { .. }
2815                | LogicalPlan::Create { .. }
2816                | LogicalPlan::CreateBatch { .. } => {
2817                    unreachable!("mutations are handled by DataFusion engine")
2818                }
2819                LogicalPlan::Delete { .. } => {
2820                    unreachable!("mutations are handled by DataFusion engine")
2821                }
2822                LogicalPlan::Begin => {
2823                    if let Some(writer_lock) = &self.writer {
2824                        let mut writer = writer_lock.write().await;
2825                        writer.begin_transaction()?;
2826                    } else {
2827                        return Err(anyhow!("Transaction requires a Writer"));
2828                    }
2829                    Ok(vec![HashMap::new()])
2830                }
2831                LogicalPlan::Commit => {
2832                    if let Some(writer_lock) = &self.writer {
2833                        let mut writer = writer_lock.write().await;
2834                        writer.commit_transaction().await?;
2835                    } else {
2836                        return Err(anyhow!("Transaction requires a Writer"));
2837                    }
2838                    Ok(vec![HashMap::new()])
2839                }
2840                LogicalPlan::Rollback => {
2841                    if let Some(writer_lock) = &self.writer {
2842                        let mut writer = writer_lock.write().await;
2843                        writer.rollback_transaction()?;
2844                    } else {
2845                        return Err(anyhow!("Transaction requires a Writer"));
2846                    }
2847                    Ok(vec![HashMap::new()])
2848                }
2849                LogicalPlan::Copy {
2850                    target,
2851                    source,
2852                    is_export,
2853                    options,
2854                } => {
2855                    if is_export {
2856                        self.execute_export(&target, &source, &options, prop_manager, ctx)
2857                            .await
2858                    } else {
2859                        self.execute_copy(&target, &source, &options, prop_manager)
2860                            .await
2861                    }
2862                }
2863                LogicalPlan::Backup {
2864                    destination,
2865                    options,
2866                } => self.execute_backup(&destination, &options).await,
2867                LogicalPlan::Explain { plan } => {
2868                    let plan_str = format!("{:#?}", plan);
2869                    let mut row = HashMap::new();
2870                    row.insert("plan".to_string(), Value::String(plan_str));
2871                    Ok(vec![row])
2872                }
2873                LogicalPlan::ShortestPath { .. } => {
2874                    unreachable!("ShortestPath is handled by DataFusion engine")
2875                }
2876                LogicalPlan::AllShortestPaths { .. } => {
2877                    unreachable!("AllShortestPaths is handled by DataFusion engine")
2878                }
2879                LogicalPlan::Foreach { .. } => {
2880                    unreachable!("mutations are handled by DataFusion engine")
2881                }
2882                LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2883                LogicalPlan::BindZeroLengthPath { .. } => {
2884                    unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2885                }
2886                LogicalPlan::BindPath { .. } => {
2887                    unreachable!("BindPath is handled by DataFusion engine")
2888                }
2889                LogicalPlan::QuantifiedPattern { .. } => {
2890                    unreachable!("QuantifiedPattern is handled by DataFusion engine")
2891                }
2892                LogicalPlan::LocyProgram { .. }
2893                | LogicalPlan::LocyFold { .. }
2894                | LogicalPlan::LocyBestBy { .. }
2895                | LogicalPlan::LocyPriority { .. }
2896                | LogicalPlan::LocyDerivedScan { .. }
2897                | LogicalPlan::LocyProject { .. } => {
2898                    unreachable!("Locy operators are handled by DataFusion engine")
2899                }
2900            }
2901        })
2902    }
2903
2904    /// Execute a single plan from a FOREACH body with the given scope.
2905    ///
2906    /// Used by the DataFusion ForeachExec operator to delegate body clause
2907    /// execution back to the executor.
2908    pub(crate) async fn execute_foreach_body_plan(
2909        &self,
2910        plan: LogicalPlan,
2911        scope: &mut HashMap<String, Value>,
2912        writer: &mut uni_store::runtime::writer::Writer,
2913        prop_manager: &PropertyManager,
2914        params: &HashMap<String, Value>,
2915        ctx: Option<&QueryContext>,
2916    ) -> Result<()> {
2917        match plan {
2918            LogicalPlan::Set { items, .. } => {
2919                self.execute_set_items_locked(&items, scope, writer, prop_manager, params, ctx)
2920                    .await?;
2921            }
2922            LogicalPlan::Remove { items, .. } => {
2923                self.execute_remove_items_locked(&items, scope, writer, prop_manager, ctx)
2924                    .await?;
2925            }
2926            LogicalPlan::Delete { items, detach, .. } => {
2927                for expr in &items {
2928                    let val = self
2929                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
2930                        .await?;
2931                    self.execute_delete_item_locked(&val, detach, writer)
2932                        .await?;
2933                }
2934            }
2935            LogicalPlan::Create { pattern, .. } => {
2936                self.execute_create_pattern(&pattern, scope, writer, prop_manager, params, ctx)
2937                    .await?;
2938            }
2939            LogicalPlan::CreateBatch { patterns, .. } => {
2940                for pattern in &patterns {
2941                    self.execute_create_pattern(pattern, scope, writer, prop_manager, params, ctx)
2942                        .await?;
2943                }
2944            }
2945            LogicalPlan::Merge {
2946                pattern,
2947                on_match: _,
2948                on_create,
2949                ..
2950            } => {
2951                self.execute_create_pattern(&pattern, scope, writer, prop_manager, params, ctx)
2952                    .await?;
2953                if let Some(on_create_clause) = on_create {
2954                    self.execute_set_items_locked(
2955                        &on_create_clause.items,
2956                        scope,
2957                        writer,
2958                        prop_manager,
2959                        params,
2960                        ctx,
2961                    )
2962                    .await?;
2963                }
2964            }
2965            LogicalPlan::Foreach {
2966                variable,
2967                list,
2968                body,
2969                ..
2970            } => {
2971                let list_val = self
2972                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
2973                    .await?;
2974                let items = match list_val {
2975                    Value::List(arr) => arr,
2976                    Value::Null => return Ok(()),
2977                    _ => return Err(anyhow!("FOREACH requires a list")),
2978                };
2979                for item in items {
2980                    let mut nested_scope = scope.clone();
2981                    nested_scope.insert(variable.clone(), item);
2982                    for nested_plan in &body {
2983                        Box::pin(self.execute_foreach_body_plan(
2984                            nested_plan.clone(),
2985                            &mut nested_scope,
2986                            writer,
2987                            prop_manager,
2988                            params,
2989                            ctx,
2990                        ))
2991                        .await?;
2992                    }
2993                }
2994            }
2995            _ => {
2996                return Err(anyhow!(
2997                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
2998                ));
2999            }
3000        }
3001        Ok(())
3002    }
3003
3004    fn canonical_row_key(row: &HashMap<String, Value>) -> String {
3005        let mut pairs: Vec<_> = row.iter().collect();
3006        pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
3007
3008        pairs
3009            .into_iter()
3010            .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
3011            .collect::<Vec<_>>()
3012            .join("|")
3013    }
3014
3015    fn canonical_value_key(v: &Value) -> String {
3016        match v {
3017            Value::Null => "null".to_string(),
3018            Value::Bool(b) => format!("b:{b}"),
3019            Value::Int(i) => format!("n:{i}"),
3020            Value::Float(f) => {
3021                if f.is_nan() {
3022                    "nan".to_string()
3023                } else if f.is_infinite() {
3024                    if f.is_sign_positive() {
3025                        "inf:+".to_string()
3026                    } else {
3027                        "inf:-".to_string()
3028                    }
3029                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
3030                    format!("n:{}", *f as i64)
3031                } else {
3032                    format!("f:{f}")
3033                }
3034            }
3035            Value::String(s) => {
3036                if let Some(k) = Self::temporal_string_key(s) {
3037                    format!("temporal:{k}")
3038                } else {
3039                    format!("s:{s}")
3040                }
3041            }
3042            Value::Bytes(b) => format!("bytes:{:?}", b),
3043            Value::List(items) => format!(
3044                "list:[{}]",
3045                items
3046                    .iter()
3047                    .map(Self::canonical_value_key)
3048                    .collect::<Vec<_>>()
3049                    .join(",")
3050            ),
3051            Value::Map(map) => {
3052                let mut pairs: Vec<_> = map.iter().collect();
3053                pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
3054                format!(
3055                    "map:{{{}}}",
3056                    pairs
3057                        .into_iter()
3058                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
3059                        .collect::<Vec<_>>()
3060                        .join(",")
3061                )
3062            }
3063            Value::Node(n) => {
3064                let mut labels = n.labels.clone();
3065                labels.sort();
3066                format!(
3067                    "node:{}:{}:{}",
3068                    n.vid.as_u64(),
3069                    labels.join(":"),
3070                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
3071                )
3072            }
3073            Value::Edge(e) => format!(
3074                "edge:{}:{}:{}:{}:{}",
3075                e.eid.as_u64(),
3076                e.edge_type,
3077                e.src.as_u64(),
3078                e.dst.as_u64(),
3079                Self::canonical_value_key(&Value::Map(e.properties.clone()))
3080            ),
3081            Value::Path(p) => format!(
3082                "path:nodes=[{}];edges=[{}]",
3083                p.nodes
3084                    .iter()
3085                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
3086                    .collect::<Vec<_>>()
3087                    .join(","),
3088                p.edges
3089                    .iter()
3090                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
3091                    .collect::<Vec<_>>()
3092                    .join(",")
3093            ),
3094            Value::Vector(vs) => format!("vec:{:?}", vs),
3095            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
3096            _ => format!("{v:?}"),
3097        }
3098    }
3099
3100    fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3101        match t {
3102            uni_common::TemporalValue::Date { days_since_epoch } => {
3103                format!("date:{days_since_epoch}")
3104            }
3105            uni_common::TemporalValue::LocalTime {
3106                nanos_since_midnight,
3107            } => format!("localtime:{nanos_since_midnight}"),
3108            uni_common::TemporalValue::Time {
3109                nanos_since_midnight,
3110                offset_seconds,
3111            } => {
3112                let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3113                format!("time:{utc_nanos}")
3114            }
3115            uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3116                format!("localdatetime:{nanos_since_epoch}")
3117            }
3118            uni_common::TemporalValue::DateTime {
3119                nanos_since_epoch, ..
3120            } => format!("datetime:{nanos_since_epoch}"),
3121            uni_common::TemporalValue::Duration {
3122                months,
3123                days,
3124                nanos,
3125            } => format!("duration:{months}:{days}:{nanos}"),
3126        }
3127    }
3128
3129    fn temporal_string_key(s: &str) -> Option<String> {
3130        let fn_name = match classify_temporal(s)? {
3131            uni_common::TemporalType::Date => "DATE",
3132            uni_common::TemporalType::LocalTime => "LOCALTIME",
3133            uni_common::TemporalType::Time => "TIME",
3134            uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3135            uni_common::TemporalType::DateTime => "DATETIME",
3136            uni_common::TemporalType::Duration => "DURATION",
3137        };
3138        match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3139            Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3140            _ => None,
3141        }
3142    }
3143
3144    /// Execute aggregate operation: GROUP BY + aggregate functions.
3145    /// Interval for timeout checks in aggregate loops.
3146    pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3147
3148    pub(crate) async fn execute_aggregate(
3149        &self,
3150        rows: Vec<HashMap<String, Value>>,
3151        group_by: &[Expr],
3152        aggregates: &[Expr],
3153        prop_manager: &PropertyManager,
3154        params: &HashMap<String, Value>,
3155        ctx: Option<&QueryContext>,
3156    ) -> Result<Vec<HashMap<String, Value>>> {
3157        // CWE-400: Check timeout before aggregation
3158        if let Some(ctx) = ctx {
3159            ctx.check_timeout()?;
3160        }
3161
3162        let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3163
3164        // Cypher semantics: aggregation without grouping keys returns one row even
3165        // on empty input (e.g. `RETURN count(*)`, `RETURN avg(x)`).
3166        if rows.is_empty() {
3167            if group_by.is_empty() {
3168                let accs = Self::create_accumulators(aggregates);
3169                let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3170                return Ok(vec![row]);
3171            }
3172            return Ok(vec![]);
3173        }
3174
3175        for (idx, row) in rows.into_iter().enumerate() {
3176            // Periodic timeout check during aggregation
3177            if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3178                && let Some(ctx) = ctx
3179            {
3180                ctx.check_timeout()?;
3181            }
3182
3183            let key_vals = self
3184                .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3185                .await?;
3186            // Build a canonical key so grouping follows Cypher value semantics
3187            // (e.g. temporal equality by instant, numeric normalization where applicable).
3188            let key_str = format!(
3189                "[{}]",
3190                key_vals
3191                    .iter()
3192                    .map(Self::canonical_value_key)
3193                    .collect::<Vec<_>>()
3194                    .join(",")
3195            );
3196
3197            let entry = groups
3198                .entry(key_str)
3199                .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3200
3201            self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3202                .await?;
3203        }
3204
3205        let results = groups
3206            .values()
3207            .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3208            .collect();
3209
3210        Ok(results)
3211    }
3212
3213    pub(crate) async fn execute_window(
3214        &self,
3215        mut rows: Vec<HashMap<String, Value>>,
3216        window_exprs: &[Expr],
3217        _prop_manager: &PropertyManager,
3218        _params: &HashMap<String, Value>,
3219        ctx: Option<&QueryContext>,
3220    ) -> Result<Vec<HashMap<String, Value>>> {
3221        // CWE-400: Check timeout before window computation
3222        if let Some(ctx) = ctx {
3223            ctx.check_timeout()?;
3224        }
3225
3226        // If no rows or no window expressions, return as-is
3227        if rows.is_empty() || window_exprs.is_empty() {
3228            return Ok(rows);
3229        }
3230
3231        // Process each window function expression
3232        for window_expr in window_exprs {
3233            // Extract window function details
3234            let Expr::FunctionCall {
3235                name,
3236                args,
3237                window_spec: Some(window_spec),
3238                ..
3239            } = window_expr
3240            else {
3241                return Err(anyhow!(
3242                    "Window expression must be a FunctionCall with OVER clause: {:?}",
3243                    window_expr
3244                ));
3245            };
3246
3247            let name_upper = name.to_uppercase();
3248
3249            // Validate it's a supported window function
3250            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3251                return Err(anyhow!(
3252                    "Unsupported window function: {}. Supported functions: {}",
3253                    name,
3254                    WINDOW_FUNCTIONS.join(", ")
3255                ));
3256            }
3257
3258            // Build partition groups based on PARTITION BY clause
3259            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3260
3261            for (row_idx, row) in rows.iter().enumerate() {
3262                // Evaluate partition key
3263                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3264                    // No partitioning - all rows in one partition
3265                    vec![]
3266                } else {
3267                    window_spec
3268                        .partition_by
3269                        .iter()
3270                        .map(|expr| self.evaluate_simple_expr(expr, row))
3271                        .collect::<Result<Vec<_>>>()?
3272                };
3273
3274                partition_map
3275                    .entry(partition_key)
3276                    .or_default()
3277                    .push(row_idx);
3278            }
3279
3280            // Process each partition
3281            for (_partition_key, row_indices) in partition_map.iter_mut() {
3282                // Sort rows within partition by ORDER BY clause
3283                if !window_spec.order_by.is_empty() {
3284                    row_indices.sort_by(|&a, &b| {
3285                        for sort_item in &window_spec.order_by {
3286                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3287                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3288
3289                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3290                                let cmp = Executor::compare_values(&va, &vb);
3291                                let cmp = if sort_item.ascending {
3292                                    cmp
3293                                } else {
3294                                    cmp.reverse()
3295                                };
3296                                if cmp != std::cmp::Ordering::Equal {
3297                                    return cmp;
3298                                }
3299                            }
3300                        }
3301                        std::cmp::Ordering::Equal
3302                    });
3303                }
3304
3305                // Compute window function values for this partition
3306                for (position, &row_idx) in row_indices.iter().enumerate() {
3307                    let window_value = match name_upper.as_str() {
3308                        "ROW_NUMBER" => Value::from((position + 1) as i64),
3309                        "RANK" => {
3310                            // RANK: position (1-indexed) of first row in group of tied rows
3311                            let rank = if position == 0 {
3312                                1i64
3313                            } else {
3314                                let prev_row_idx = row_indices[position - 1];
3315                                let same_as_prev = self.rows_have_same_sort_keys(
3316                                    &window_spec.order_by,
3317                                    &rows,
3318                                    row_idx,
3319                                    prev_row_idx,
3320                                );
3321
3322                                if same_as_prev {
3323                                    // Walk backwards to find where this group started
3324                                    let mut group_start = position - 1;
3325                                    while group_start > 0 {
3326                                        let curr_idx = row_indices[group_start];
3327                                        let prev_idx = row_indices[group_start - 1];
3328                                        if !self.rows_have_same_sort_keys(
3329                                            &window_spec.order_by,
3330                                            &rows,
3331                                            curr_idx,
3332                                            prev_idx,
3333                                        ) {
3334                                            break;
3335                                        }
3336                                        group_start -= 1;
3337                                    }
3338                                    (group_start + 1) as i64
3339                                } else {
3340                                    (position + 1) as i64
3341                                }
3342                            };
3343                            Value::from(rank)
3344                        }
3345                        "DENSE_RANK" => {
3346                            // Dense rank: continuous ranking without gaps
3347                            let mut dense_rank = 1i64;
3348                            for i in 0..position {
3349                                let curr_idx = row_indices[i + 1];
3350                                let prev_idx = row_indices[i];
3351                                if !self.rows_have_same_sort_keys(
3352                                    &window_spec.order_by,
3353                                    &rows,
3354                                    curr_idx,
3355                                    prev_idx,
3356                                ) {
3357                                    dense_rank += 1;
3358                                }
3359                            }
3360                            Value::from(dense_rank)
3361                        }
3362                        "LAG" => {
3363                            let (value_expr, offset, default_value) =
3364                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3365
3366                            if position >= offset {
3367                                let target_idx = row_indices[position - offset];
3368                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3369                            } else {
3370                                default_value
3371                            }
3372                        }
3373                        "LEAD" => {
3374                            let (value_expr, offset, default_value) =
3375                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3376
3377                            if position + offset < row_indices.len() {
3378                                let target_idx = row_indices[position + offset];
3379                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3380                            } else {
3381                                default_value
3382                            }
3383                        }
3384                        "NTILE" => {
3385                            // Extract num_buckets argument: NTILE(num_buckets)
3386                            let num_buckets_expr = args.first().ok_or_else(|| {
3387                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3388                            })?;
3389                            let num_buckets_val =
3390                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3391                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3392                                anyhow!(
3393                                    "NTILE argument must be an integer, got: {:?}",
3394                                    num_buckets_val
3395                                )
3396                            })?;
3397
3398                            if num_buckets <= 0 {
3399                                return Err(anyhow!(
3400                                    "NTILE bucket count must be positive, got: {}",
3401                                    num_buckets
3402                                ));
3403                            }
3404
3405                            let num_buckets = num_buckets as usize;
3406                            let partition_size = row_indices.len();
3407
3408                            // Calculate bucket assignment using standard algorithm
3409                            // For N rows and B buckets:
3410                            // - Base size: N / B
3411                            // - Extra rows: N % B (go to first buckets)
3412                            let base_size = partition_size / num_buckets;
3413                            let extra_rows = partition_size % num_buckets;
3414
3415                            // Determine bucket for current row
3416                            let bucket = if position < extra_rows * (base_size + 1) {
3417                                // Row is in one of the larger buckets (first 'extra_rows' buckets)
3418                                position / (base_size + 1) + 1
3419                            } else {
3420                                // Row is in one of the normal-sized buckets
3421                                let adjusted_position = position - extra_rows * (base_size + 1);
3422                                extra_rows + (adjusted_position / base_size) + 1
3423                            };
3424
3425                            Value::from(bucket as i64)
3426                        }
3427                        "FIRST_VALUE" => {
3428                            // FIRST_VALUE returns the value of the expression from the first row in the window frame
3429                            let value_expr = args.first().ok_or_else(|| {
3430                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3431                            })?;
3432
3433                            // Get the first row in the partition (after ordering)
3434                            if row_indices.is_empty() {
3435                                Value::Null
3436                            } else {
3437                                let first_idx = row_indices[0];
3438                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3439                            }
3440                        }
3441                        "LAST_VALUE" => {
3442                            // LAST_VALUE returns the value of the expression from the last row in the window frame
3443                            let value_expr = args.first().ok_or_else(|| {
3444                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3445                            })?;
3446
3447                            // Get the last row in the partition (after ordering)
3448                            if row_indices.is_empty() {
3449                                Value::Null
3450                            } else {
3451                                let last_idx = row_indices[row_indices.len() - 1];
3452                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3453                            }
3454                        }
3455                        "NTH_VALUE" => {
3456                            // NTH_VALUE returns the value of the expression from the nth row in the window frame
3457                            if args.len() != 2 {
3458                                return Err(anyhow!(
3459                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3460                                ));
3461                            }
3462
3463                            let value_expr = &args[0];
3464                            let n_expr = &args[1];
3465
3466                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3467                            let n = n_val.as_i64().ok_or_else(|| {
3468                                anyhow!(
3469                                    "NTH_VALUE second argument must be an integer, got: {:?}",
3470                                    n_val
3471                                )
3472                            })?;
3473
3474                            if n <= 0 {
3475                                return Err(anyhow!(
3476                                    "NTH_VALUE position must be positive, got: {}",
3477                                    n
3478                                ));
3479                            }
3480
3481                            let nth_index = (n - 1) as usize; // Convert 1-based to 0-based
3482                            if nth_index < row_indices.len() {
3483                                let nth_idx = row_indices[nth_index];
3484                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3485                            } else {
3486                                Value::Null
3487                            }
3488                        }
3489                        _ => unreachable!("Window function {} already validated", name),
3490                    };
3491
3492                    // Add window function result to row
3493                    // Use the window expression's string representation as the column name
3494                    let col_name = window_expr.to_string_repr();
3495                    rows[row_idx].insert(col_name, window_value);
3496                }
3497            }
3498        }
3499
3500        Ok(rows)
3501    }
3502
3503    /// Helper to evaluate simple expressions for window function sorting/partitioning.
3504    ///
3505    /// Uses `&self` for consistency with other evaluation methods, though it only
3506    /// recurses for property access.
3507    fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3508        match expr {
3509            Expr::Variable(name) => row
3510                .get(name)
3511                .cloned()
3512                .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3513            Expr::Property(base, prop) => {
3514                let base_val = self.evaluate_simple_expr(base, row)?;
3515                if let Value::Map(map) = base_val {
3516                    map.get(prop)
3517                        .cloned()
3518                        .ok_or_else(|| anyhow!("Property not found: {}", prop))
3519                } else {
3520                    Err(anyhow!("Cannot access property on non-object"))
3521                }
3522            }
3523            Expr::Literal(lit) => Ok(lit.to_value()),
3524            _ => Err(anyhow!(
3525                "Unsupported expression in window function: {:?}",
3526                expr
3527            )),
3528        }
3529    }
3530
3531    /// Check if two rows have matching sort keys for ranking functions.
3532    fn rows_have_same_sort_keys(
3533        &self,
3534        order_by: &[uni_cypher::ast::SortItem],
3535        rows: &[HashMap<String, Value>],
3536        idx_a: usize,
3537        idx_b: usize,
3538    ) -> bool {
3539        order_by.iter().all(|sort_item| {
3540            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3541            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3542            matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3543        })
3544    }
3545
3546    /// Extract offset and default value for LAG/LEAD window functions.
3547    fn extract_lag_lead_params<'a>(
3548        &self,
3549        func_name: &str,
3550        args: &'a [Expr],
3551        row: &HashMap<String, Value>,
3552    ) -> Result<(&'a Expr, usize, Value)> {
3553        let value_expr = args.first().ok_or_else(|| {
3554            anyhow!(
3555                "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3556                func_name,
3557                func_name
3558            )
3559        })?;
3560
3561        let offset = if let Some(offset_expr) = args.get(1) {
3562            let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3563            offset_val.as_i64().ok_or_else(|| {
3564                anyhow!(
3565                    "{} offset must be an integer, got: {:?}",
3566                    func_name,
3567                    offset_val
3568                )
3569            })? as usize
3570        } else {
3571            1
3572        };
3573
3574        let default_value = if let Some(default_expr) = args.get(2) {
3575            self.evaluate_simple_expr(default_expr, row)?
3576        } else {
3577            Value::Null
3578        };
3579
3580        Ok((value_expr, offset, default_value))
3581    }
3582
3583    /// Evaluate group-by key expressions for a row.
3584    pub(crate) async fn evaluate_group_keys(
3585        &self,
3586        group_by: &[Expr],
3587        row: &HashMap<String, Value>,
3588        prop_manager: &PropertyManager,
3589        params: &HashMap<String, Value>,
3590        ctx: Option<&QueryContext>,
3591    ) -> Result<Vec<Value>> {
3592        let mut key_vals = Vec::new();
3593        for expr in group_by {
3594            key_vals.push(
3595                self.evaluate_expr(expr, row, prop_manager, params, ctx)
3596                    .await?,
3597            );
3598        }
3599        Ok(key_vals)
3600    }
3601
3602    /// Update accumulators with values from the current row.
3603    pub(crate) async fn update_accumulators(
3604        &self,
3605        accs: &mut [Accumulator],
3606        aggregates: &[Expr],
3607        row: &HashMap<String, Value>,
3608        prop_manager: &PropertyManager,
3609        params: &HashMap<String, Value>,
3610        ctx: Option<&QueryContext>,
3611    ) -> Result<()> {
3612        for (i, agg_expr) in aggregates.iter().enumerate() {
3613            if let Expr::FunctionCall { args, .. } = agg_expr {
3614                let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3615                let val = if is_wildcard {
3616                    Value::Null
3617                } else {
3618                    self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3619                        .await?
3620                };
3621                accs[i].update(&val, is_wildcard);
3622            }
3623        }
3624        Ok(())
3625    }
3626
3627    /// Execute sort operation with ORDER BY clauses.
3628    pub(crate) async fn execute_recursive_cte(
3629        &self,
3630        cte_name: &str,
3631        initial: LogicalPlan,
3632        recursive: LogicalPlan,
3633        prop_manager: &PropertyManager,
3634        params: &HashMap<String, Value>,
3635        ctx: Option<&QueryContext>,
3636    ) -> Result<Vec<HashMap<String, Value>>> {
3637        use std::collections::HashSet;
3638
3639        // Helper to create a stable key for cycle detection.
3640        // Uses sorted keys to ensure consistent ordering.
3641        pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3642            let mut pairs: Vec<_> = row.iter().collect();
3643            pairs.sort_by(|a, b| a.0.cmp(b.0));
3644            format!("{:?}", pairs)
3645        }
3646
3647        // 1. Execute Anchor
3648        let mut working_table = self
3649            .execute_subplan(initial, prop_manager, params, ctx)
3650            .await?;
3651        let mut result_table = working_table.clone();
3652
3653        // Track seen rows for cycle detection
3654        let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3655
3656        // 2. Loop
3657        // Safety: Max iterations to prevent infinite loop
3658        // TODO: expose this via UniConfig for user control
3659        let max_iterations = 1000;
3660        for _iteration in 0..max_iterations {
3661            // CWE-400: Check timeout at each iteration to prevent resource exhaustion
3662            if let Some(ctx) = ctx {
3663                ctx.check_timeout()?;
3664            }
3665
3666            if working_table.is_empty() {
3667                break;
3668            }
3669
3670            // Bind working table to CTE name in params
3671            let working_val = Value::List(
3672                working_table
3673                    .iter()
3674                    .map(|row| {
3675                        if row.len() == 1 {
3676                            row.values().next().unwrap().clone()
3677                        } else {
3678                            Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3679                        }
3680                    })
3681                    .collect(),
3682            );
3683
3684            let mut next_params = params.clone();
3685            next_params.insert(cte_name.to_string(), working_val);
3686
3687            // Execute recursive part
3688            let next_result = self
3689                .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3690                .await?;
3691
3692            if next_result.is_empty() {
3693                break;
3694            }
3695
3696            // Filter out already-seen rows (cycle detection)
3697            let new_rows: Vec<_> = next_result
3698                .into_iter()
3699                .filter(|row| {
3700                    let key = row_key(row);
3701                    seen.insert(key) // Returns false if already present
3702                })
3703                .collect();
3704
3705            if new_rows.is_empty() {
3706                // All results were cycles - terminate
3707                break;
3708            }
3709
3710            result_table.extend(new_rows.clone());
3711            working_table = new_rows;
3712        }
3713
3714        // Output accumulated results as a variable
3715        let final_list = Value::List(
3716            result_table
3717                .into_iter()
3718                .map(|row| {
3719                    // If the CTE returns a single column and we want to treat it as a list of values?
3720                    // E.g. WITH RECURSIVE r AS (RETURN 1 UNION RETURN 2) -> [1, 2] or [{expr:1}, {expr:2}]?
3721                    // Cypher LISTs usually contain values.
3722                    // If the row has 1 column, maybe unwrap?
3723                    // But SQL CTEs are tables.
3724                    // Let's stick to List<Map> for consistency with how we pass it in.
3725                    // UNLESS the user extracts it.
3726                    // My parser test `MATCH (n) WHERE n IN hierarchy` implies `hierarchy` contains Nodes.
3727                    // If `row` contains `root` (Node), then `hierarchy` should be `[Node, Node]`.
3728                    // If row has multiple cols, `[ {a:1, b:2}, ... ]`.
3729                    // If row has 1 col, users expect `[val, val]`.
3730                    if row.len() == 1 {
3731                        row.values().next().unwrap().clone()
3732                    } else {
3733                        Value::Map(row.into_iter().collect())
3734                    }
3735                })
3736                .collect(),
3737        );
3738
3739        let mut final_row = HashMap::new();
3740        final_row.insert(cte_name.to_string(), final_list);
3741        Ok(vec![final_row])
3742    }
3743
3744    /// Interval for timeout checks in sort loops.
3745    const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3746
3747    pub(crate) async fn execute_sort(
3748        &self,
3749        rows: Vec<HashMap<String, Value>>,
3750        order_by: &[uni_cypher::ast::SortItem],
3751        prop_manager: &PropertyManager,
3752        params: &HashMap<String, Value>,
3753        ctx: Option<&QueryContext>,
3754    ) -> Result<Vec<HashMap<String, Value>>> {
3755        // CWE-400: Check timeout before potentially expensive sort
3756        if let Some(ctx) = ctx {
3757            ctx.check_timeout()?;
3758        }
3759
3760        let mut rows_with_keys = Vec::with_capacity(rows.len());
3761        for (idx, row) in rows.into_iter().enumerate() {
3762            // Periodic timeout check during key extraction
3763            if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3764                && let Some(ctx) = ctx
3765            {
3766                ctx.check_timeout()?;
3767            }
3768
3769            let mut keys = Vec::new();
3770            for item in order_by {
3771                let val = row
3772                    .get(&item.expr.to_string_repr())
3773                    .cloned()
3774                    .unwrap_or(Value::Null);
3775                let val = if val.is_null() {
3776                    self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3777                        .await
3778                        .unwrap_or(Value::Null)
3779                } else {
3780                    val
3781                };
3782                keys.push(val);
3783            }
3784            rows_with_keys.push((row, keys));
3785        }
3786
3787        // Check timeout again before synchronous sort (can't be interrupted)
3788        if let Some(ctx) = ctx {
3789            ctx.check_timeout()?;
3790        }
3791
3792        rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3793
3794        Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3795    }
3796
3797    /// Create accumulators for aggregate expressions.
3798    pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3799        aggregates
3800            .iter()
3801            .map(|expr| {
3802                if let Expr::FunctionCall { name, distinct, .. } = expr {
3803                    Accumulator::new(name, *distinct)
3804                } else {
3805                    Accumulator::new("COUNT", false)
3806                }
3807            })
3808            .collect()
3809    }
3810
3811    /// Build result row from group-by keys and accumulators.
3812    pub(crate) fn build_aggregate_result(
3813        group_by: &[Expr],
3814        aggregates: &[Expr],
3815        key_vals: &[Value],
3816        accs: &[Accumulator],
3817    ) -> HashMap<String, Value> {
3818        let mut res_row = HashMap::new();
3819        for (i, expr) in group_by.iter().enumerate() {
3820            res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3821        }
3822        for (i, expr) in aggregates.iter().enumerate() {
3823            // Use aggregate_column_name to ensure consistency with planner
3824            let col_name = crate::query::planner::aggregate_column_name(expr);
3825            res_row.insert(col_name, accs[i].finish());
3826        }
3827        res_row
3828    }
3829
3830    /// Compare and return ordering for sort operation.
3831    pub(crate) fn compare_sort_keys(
3832        a_keys: &[Value],
3833        b_keys: &[Value],
3834        order_by: &[uni_cypher::ast::SortItem],
3835    ) -> std::cmp::Ordering {
3836        for (i, item) in order_by.iter().enumerate() {
3837            let order = Self::compare_values(&a_keys[i], &b_keys[i]);
3838            if order != std::cmp::Ordering::Equal {
3839                return if item.ascending {
3840                    order
3841                } else {
3842                    order.reverse()
3843                };
3844            }
3845        }
3846        std::cmp::Ordering::Equal
3847    }
3848
3849    /// Executes BACKUP command to local or cloud storage.
3850    ///
3851    /// Supports both local filesystem paths and cloud URLs (s3://, gs://, az://).
3852    pub(crate) async fn execute_backup(
3853        &self,
3854        destination: &str,
3855        _options: &HashMap<String, Value>,
3856    ) -> Result<Vec<HashMap<String, Value>>> {
3857        // 1. Flush L0
3858        if let Some(writer_arc) = &self.writer {
3859            let mut writer = writer_arc.write().await;
3860            writer.flush_to_l1(None).await?;
3861        }
3862
3863        // 2. Snapshot
3864        let snapshot_manager = self.storage.snapshot_manager();
3865        let snapshot = snapshot_manager
3866            .load_latest_snapshot()
3867            .await?
3868            .ok_or_else(|| anyhow!("No snapshot found"))?;
3869
3870        // 3. Copy files - cloud or local path
3871        if is_cloud_url(destination) {
3872            self.backup_to_cloud(destination, &snapshot.snapshot_id)
3873                .await?;
3874        } else {
3875            // Validate local destination path against sandbox
3876            let validated_dest = self.validate_path(destination)?;
3877            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
3878                .await?;
3879        }
3880
3881        let mut res = HashMap::new();
3882        res.insert(
3883            "status".to_string(),
3884            Value::String("Backup completed".to_string()),
3885        );
3886        res.insert(
3887            "snapshot_id".to_string(),
3888            Value::String(snapshot.snapshot_id),
3889        );
3890        Ok(vec![res])
3891    }
3892
3893    /// Backs up database to a local filesystem destination.
3894    async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
3895        let source_path = std::path::Path::new(self.storage.base_path());
3896
3897        if !dest_path.exists() {
3898            std::fs::create_dir_all(dest_path)?;
3899        }
3900
3901        // Recursive copy (local to local)
3902        if source_path.exists() {
3903            Self::copy_dir_all(source_path, dest_path)?;
3904        }
3905
3906        // Copy schema to destination/catalog/schema.json
3907        let schema_manager = self.storage.schema_manager();
3908        let dest_catalog = dest_path.join("catalog");
3909        if !dest_catalog.exists() {
3910            std::fs::create_dir_all(&dest_catalog)?;
3911        }
3912
3913        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3914        std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
3915
3916        Ok(())
3917    }
3918
3919    /// Backs up database to a cloud storage destination.
3920    ///
3921    /// Streams data from source to destination, supporting cross-cloud backups.
3922    async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
3923        use object_store::ObjectStore;
3924        use object_store::local::LocalFileSystem;
3925        use object_store::path::Path as ObjPath;
3926
3927        let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
3928        let source_path = std::path::Path::new(self.storage.base_path());
3929
3930        // Create local store for source, coerced to dyn ObjectStore
3931        let src_store: Arc<dyn ObjectStore> =
3932            Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
3933
3934        // Copy catalog/ directory
3935        let catalog_src = ObjPath::from("catalog");
3936        let catalog_dst = if dest_prefix.as_ref().is_empty() {
3937            ObjPath::from("catalog")
3938        } else {
3939            ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
3940        };
3941        copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
3942
3943        // Copy storage/ directory
3944        let storage_src = ObjPath::from("storage");
3945        let storage_dst = if dest_prefix.as_ref().is_empty() {
3946            ObjPath::from("storage")
3947        } else {
3948            ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
3949        };
3950        copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
3951
3952        // Ensure schema is present at canonical catalog location.
3953        let schema_manager = self.storage.schema_manager();
3954        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3955        let schema_path = if dest_prefix.as_ref().is_empty() {
3956            ObjPath::from("catalog/schema.json")
3957        } else {
3958            ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
3959        };
3960        dest_store
3961            .put(&schema_path, bytes::Bytes::from(schema_content).into())
3962            .await?;
3963
3964        Ok(())
3965    }
3966
3967    /// Maximum directory depth for backup operations.
3968    ///
3969    /// **CWE-674 (Uncontrolled Recursion)**: Prevents stack overflow from
3970    /// excessively deep directory structures.
3971    const MAX_BACKUP_DEPTH: usize = 100;
3972
3973    /// Maximum file count for backup operations.
3974    ///
3975    /// **CWE-400 (Resource Consumption)**: Prevents disk exhaustion and
3976    /// long-running operations from malicious or unexpectedly large directories.
3977    const MAX_BACKUP_FILES: usize = 100_000;
3978
3979    /// Recursively copies a directory with security limits.
3980    ///
3981    /// # Security
3982    ///
3983    /// - **CWE-674**: Depth limit prevents stack overflow
3984    /// - **CWE-400**: File count limit prevents resource exhaustion
3985    /// - **Symlink handling**: Symlinks are skipped to prevent loop attacks
3986    pub(crate) fn copy_dir_all(
3987        src: &std::path::Path,
3988        dst: &std::path::Path,
3989    ) -> std::io::Result<()> {
3990        let mut file_count = 0usize;
3991        Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
3992    }
3993
3994    /// Internal implementation with depth and file count tracking.
3995    pub(crate) fn copy_dir_all_impl(
3996        src: &std::path::Path,
3997        dst: &std::path::Path,
3998        depth: usize,
3999        file_count: &mut usize,
4000    ) -> std::io::Result<()> {
4001        if depth >= Self::MAX_BACKUP_DEPTH {
4002            return Err(std::io::Error::new(
4003                std::io::ErrorKind::InvalidInput,
4004                format!(
4005                    "Maximum backup depth {} exceeded at {:?}",
4006                    Self::MAX_BACKUP_DEPTH,
4007                    src
4008                ),
4009            ));
4010        }
4011
4012        std::fs::create_dir_all(dst)?;
4013
4014        for entry in std::fs::read_dir(src)? {
4015            if *file_count >= Self::MAX_BACKUP_FILES {
4016                return Err(std::io::Error::new(
4017                    std::io::ErrorKind::InvalidInput,
4018                    format!(
4019                        "Maximum backup file count {} exceeded",
4020                        Self::MAX_BACKUP_FILES
4021                    ),
4022                ));
4023            }
4024            *file_count += 1;
4025
4026            let entry = entry?;
4027            let metadata = entry.metadata()?;
4028
4029            // Skip symlinks to prevent loops and traversal attacks
4030            if metadata.file_type().is_symlink() {
4031                // Silently skip - logging would require tracing dependency
4032                continue;
4033            }
4034
4035            let dst_path = dst.join(entry.file_name());
4036            if metadata.is_dir() {
4037                Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4038            } else {
4039                std::fs::copy(entry.path(), dst_path)?;
4040            }
4041        }
4042        Ok(())
4043    }
4044
4045    pub(crate) async fn execute_copy(
4046        &self,
4047        target: &str,
4048        source: &str,
4049        options: &HashMap<String, Value>,
4050        prop_manager: &PropertyManager,
4051    ) -> Result<Vec<HashMap<String, Value>>> {
4052        let format = options
4053            .get("format")
4054            .and_then(|v| v.as_str())
4055            .unwrap_or_else(|| {
4056                if source.ends_with(".parquet") {
4057                    "parquet"
4058                } else {
4059                    "csv"
4060                }
4061            });
4062
4063        match format.to_lowercase().as_str() {
4064            "csv" => self.execute_csv_import(target, source, options).await,
4065            "parquet" => {
4066                self.execute_parquet_import(target, source, options, prop_manager)
4067                    .await
4068            }
4069            _ => Err(anyhow!("Unsupported format: {}", format)),
4070        }
4071    }
4072
4073    pub(crate) async fn execute_csv_import(
4074        &self,
4075        target: &str,
4076        source: &str,
4077        options: &HashMap<String, Value>,
4078    ) -> Result<Vec<HashMap<String, Value>>> {
4079        // Validate source path against sandbox
4080        let validated_source = self.validate_path(source)?;
4081
4082        let writer_lock = self
4083            .writer
4084            .as_ref()
4085            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4086
4087        let schema = self.storage.schema_manager().schema();
4088
4089        // 1. Determine if target is Label or EdgeType
4090        let label_meta = schema.labels.get(target);
4091        let edge_meta = schema.edge_types.get(target);
4092
4093        if label_meta.is_none() && edge_meta.is_none() {
4094            return Err(anyhow!("Target '{}' not found in schema", target));
4095        }
4096
4097        // 2. Open CSV
4098        let delimiter_str = options
4099            .get("delimiter")
4100            .and_then(|v| v.as_str())
4101            .unwrap_or(",");
4102        let delimiter = if delimiter_str.is_empty() {
4103            b','
4104        } else {
4105            delimiter_str.as_bytes()[0]
4106        };
4107        let has_header = options
4108            .get("header")
4109            .and_then(|v| v.as_bool())
4110            .unwrap_or(true);
4111
4112        let mut rdr = csv::ReaderBuilder::new()
4113            .delimiter(delimiter)
4114            .has_headers(has_header)
4115            .from_path(&validated_source)?;
4116
4117        let headers = rdr.headers()?.clone();
4118        let mut count = 0;
4119
4120        let mut writer = writer_lock.write().await;
4121
4122        if label_meta.is_some() {
4123            let target_props = schema
4124                .properties
4125                .get(target)
4126                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4127
4128            for result in rdr.records() {
4129                let record = result?;
4130                let mut props = HashMap::new();
4131
4132                for (i, header) in headers.iter().enumerate() {
4133                    if let Some(val_str) = record.get(i)
4134                        && let Some(prop_meta) = target_props.get(header)
4135                    {
4136                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4137                        props.insert(header.to_string(), val);
4138                    }
4139                }
4140
4141                let vid = writer.next_vid().await?;
4142                writer
4143                    .insert_vertex_with_labels(vid, props, &[target.to_string()])
4144                    .await?;
4145                count += 1;
4146            }
4147        } else if let Some(meta) = edge_meta {
4148            let type_id = meta.id;
4149            let target_props = schema
4150                .properties
4151                .get(target)
4152                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4153
4154            // For edges, we need src and dst VIDs.
4155            // Expecting columns '_src' and '_dst' or as specified in options.
4156            let src_col = options
4157                .get("src_col")
4158                .and_then(|v| v.as_str())
4159                .unwrap_or("_src");
4160            let dst_col = options
4161                .get("dst_col")
4162                .and_then(|v| v.as_str())
4163                .unwrap_or("_dst");
4164
4165            for result in rdr.records() {
4166                let record = result?;
4167                let mut props = HashMap::new();
4168                let mut src_vid = None;
4169                let mut dst_vid = None;
4170
4171                for (i, header) in headers.iter().enumerate() {
4172                    if let Some(val_str) = record.get(i) {
4173                        if header == src_col {
4174                            src_vid =
4175                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4176                        } else if header == dst_col {
4177                            dst_vid =
4178                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4179                        } else if let Some(prop_meta) = target_props.get(header) {
4180                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4181                            props.insert(header.to_string(), val);
4182                        }
4183                    }
4184                }
4185
4186                let src =
4187                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4188                let dst = dst_vid
4189                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;
4190
4191                let eid = writer.next_eid(type_id).await?;
4192                writer
4193                    .insert_edge(src, dst, type_id, eid, props, Some(target.to_string()))
4194                    .await?;
4195                count += 1;
4196            }
4197        }
4198
4199        let mut res = HashMap::new();
4200        res.insert("count".to_string(), Value::Int(count as i64));
4201        Ok(vec![res])
4202    }
4203
4204    /// Imports data from Parquet file to a label or edge type.
4205    ///
4206    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
4207    pub(crate) async fn execute_parquet_import(
4208        &self,
4209        target: &str,
4210        source: &str,
4211        options: &HashMap<String, Value>,
4212        _prop_manager: &PropertyManager,
4213    ) -> Result<Vec<HashMap<String, Value>>> {
4214        let writer_lock = self
4215            .writer
4216            .as_ref()
4217            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4218
4219        let schema = self.storage.schema_manager().schema();
4220
4221        // 1. Determine if target is Label or EdgeType
4222        let label_meta = schema.labels.get(target);
4223        let edge_meta = schema.edge_types.get(target);
4224
4225        if label_meta.is_none() && edge_meta.is_none() {
4226            return Err(anyhow!("Target '{}' not found in schema", target));
4227        }
4228
4229        // 2. Open Parquet - support both local and cloud URLs
4230        let reader = if is_cloud_url(source) {
4231            self.open_parquet_from_cloud(source).await?
4232        } else {
4233            // Validate local source path against sandbox
4234            let validated_source = self.validate_path(source)?;
4235            let file = std::fs::File::open(&validated_source)?;
4236            let builder =
4237                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
4238            builder.build()?
4239        };
4240        let mut reader = reader;
4241
4242        let mut count = 0;
4243        let mut writer = writer_lock.write().await;
4244
4245        if label_meta.is_some() {
4246            let target_props = schema
4247                .properties
4248                .get(target)
4249                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4250
4251            for batch in reader.by_ref() {
4252                let batch = batch?;
4253                for row in 0..batch.num_rows() {
4254                    let mut props = HashMap::new();
4255                    for field in batch.schema().fields() {
4256                        let name = field.name();
4257                        if target_props.contains_key(name) {
4258                            let col = batch.column_by_name(name).unwrap();
4259                            if !col.is_null(row) {
4260                                // Look up Uni DataType from schema for proper DateTime/Time decoding
4261                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
4262                                let val =
4263                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
4264                                props.insert(name.clone(), val);
4265                            }
4266                        }
4267                    }
4268                    let vid = writer.next_vid().await?;
4269                    writer
4270                        .insert_vertex_with_labels(vid, props, &[target.to_string()])
4271                        .await?;
4272                    count += 1;
4273                }
4274            }
4275        } else if let Some(meta) = edge_meta {
4276            let type_id = meta.id;
4277            let target_props = schema
4278                .properties
4279                .get(target)
4280                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4281
4282            let src_col = options
4283                .get("src_col")
4284                .and_then(|v| v.as_str())
4285                .unwrap_or("_src");
4286            let dst_col = options
4287                .get("dst_col")
4288                .and_then(|v| v.as_str())
4289                .unwrap_or("_dst");
4290
4291            for batch in reader {
4292                let batch = batch?;
4293                for row in 0..batch.num_rows() {
4294                    let mut props = HashMap::new();
4295                    let mut src_vid = None;
4296                    let mut dst_vid = None;
4297
4298                    for field in batch.schema().fields() {
4299                        let name = field.name();
4300                        let col = batch.column_by_name(name).unwrap();
4301                        if col.is_null(row) {
4302                            continue;
4303                        }
4304
4305                        if name == src_col {
4306                            let val = Self::arrow_to_value(col.as_ref(), row);
4307                            src_vid = Some(Self::vid_from_value(&val)?);
4308                        } else if name == dst_col {
4309                            let val = Self::arrow_to_value(col.as_ref(), row);
4310                            dst_vid = Some(Self::vid_from_value(&val)?);
4311                        } else if let Some(pm) = target_props.get(name) {
4312                            // Look up Uni DataType from schema for proper DateTime/Time decoding
4313                            let val =
4314                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
4315                            props.insert(name.clone(), val);
4316                        }
4317                    }
4318
4319                    let src = src_vid
4320                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4321                    let dst = dst_vid.ok_or_else(|| {
4322                        anyhow!("Missing destination VID in column '{}'", dst_col)
4323                    })?;
4324
4325                    let eid = writer.next_eid(type_id).await?;
4326                    writer
4327                        .insert_edge(src, dst, type_id, eid, props, Some(target.to_string()))
4328                        .await?;
4329                    count += 1;
4330                }
4331            }
4332        }
4333
4334        let mut res = HashMap::new();
4335        res.insert("count".to_string(), Value::Int(count as i64));
4336        Ok(vec![res])
4337    }
4338
4339    /// Opens a Parquet file from a cloud URL.
4340    ///
4341    /// Downloads the file to memory and creates a Parquet reader.
4342    async fn open_parquet_from_cloud(
4343        &self,
4344        source_url: &str,
4345    ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4346        use object_store::ObjectStore;
4347
4348        let (store, path) = build_store_from_url(source_url)?;
4349
4350        // Download file contents
4351        let bytes = store.get(&path).await?.bytes().await?;
4352
4353        // Create a Parquet reader from the bytes
4354        let reader = bytes::Bytes::from(bytes.to_vec());
4355        let builder =
4356            parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4357        Ok(builder.build()?)
4358    }
4359
4360    pub(crate) async fn scan_edge_type(
4361        &self,
4362        edge_type: &str,
4363        ctx: Option<&QueryContext>,
4364    ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4365        let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4366
4367        // 1. Scan L2 (Base)
4368        self.scan_edge_type_l2(edge_type, &mut edges).await?;
4369
4370        // 2. Scan L1 (Delta)
4371        self.scan_edge_type_l1(edge_type, &mut edges).await?;
4372
4373        // 3. Scan L0 (Memory) and filter tombstoned vertices
4374        if let Some(ctx) = ctx {
4375            self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4376            self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4377        }
4378
4379        Ok(edges
4380            .into_iter()
4381            .map(|(eid, (src, dst))| (eid, src, dst))
4382            .collect())
4383    }
4384
4385    /// Scan L2 (base) storage for edges of a given type.
4386    ///
4387    /// Note: Edges are now stored exclusively in delta datasets (L1) via LanceDB.
4388    /// This L2 scan will typically find no data.
4389    pub(crate) async fn scan_edge_type_l2(
4390        &self,
4391        _edge_type: &str,
4392        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4393    ) -> Result<()> {
4394        // Edges are now stored in delta datasets (L1) via LanceDB.
4395        // Legacy L2 base edge storage is no longer used.
4396        Ok(())
4397    }
4398
4399    /// Scan L1 (delta) storage for edges of a given type.
4400    pub(crate) async fn scan_edge_type_l1(
4401        &self,
4402        edge_type: &str,
4403        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4404    ) -> Result<()> {
4405        use futures::TryStreamExt;
4406        use lancedb::query::{ExecutableQuery, QueryBase, Select};
4407
4408        if let Ok(ds) = self.storage.delta_dataset(edge_type, "fwd") {
4409            let lancedb_store = self.storage.lancedb_store();
4410            if let Ok(table) = ds.open_lancedb(lancedb_store).await {
4411                let query = table.query().select(Select::Columns(vec![
4412                    "eid".into(),
4413                    "src_vid".into(),
4414                    "dst_vid".into(),
4415                    "op".into(),
4416                    "_version".into(),
4417                ]));
4418
4419                if let Ok(stream) = query.execute().await {
4420                    let batches: Vec<arrow_array::RecordBatch> =
4421                        stream.try_collect().await.unwrap_or_default();
4422
4423                    // Collect ops with versions: eid -> (version, op, src, dst)
4424                    let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4425                        HashMap::new();
4426
4427                    for batch in batches {
4428                        self.process_delta_batch(&batch, &mut versioned_ops)?;
4429                    }
4430
4431                    // Apply the winning ops
4432                    for (eid, (_, op, src, dst)) in versioned_ops {
4433                        if op == 0 {
4434                            edges.insert(eid, (src, dst));
4435                        } else if op == 1 {
4436                            edges.remove(&eid);
4437                        }
4438                    }
4439                }
4440            }
4441        }
4442        Ok(())
4443    }
4444
4445    /// Process a delta batch, tracking versioned operations.
4446    pub(crate) fn process_delta_batch(
4447        &self,
4448        batch: &arrow_array::RecordBatch,
4449        versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4450    ) -> Result<()> {
4451        use arrow_array::UInt64Array;
4452        let eid_col = batch
4453            .column_by_name("eid")
4454            .ok_or(anyhow!("Missing eid"))?
4455            .as_any()
4456            .downcast_ref::<UInt64Array>()
4457            .ok_or(anyhow!("Invalid eid"))?;
4458        let src_col = batch
4459            .column_by_name("src_vid")
4460            .ok_or(anyhow!("Missing src_vid"))?
4461            .as_any()
4462            .downcast_ref::<UInt64Array>()
4463            .ok_or(anyhow!("Invalid src_vid"))?;
4464        let dst_col = batch
4465            .column_by_name("dst_vid")
4466            .ok_or(anyhow!("Missing dst_vid"))?
4467            .as_any()
4468            .downcast_ref::<UInt64Array>()
4469            .ok_or(anyhow!("Invalid dst_vid"))?;
4470        let op_col = batch
4471            .column_by_name("op")
4472            .ok_or(anyhow!("Missing op"))?
4473            .as_any()
4474            .downcast_ref::<arrow_array::UInt8Array>()
4475            .ok_or(anyhow!("Invalid op"))?;
4476        let version_col = batch
4477            .column_by_name("_version")
4478            .ok_or(anyhow!("Missing _version"))?
4479            .as_any()
4480            .downcast_ref::<UInt64Array>()
4481            .ok_or(anyhow!("Invalid _version"))?;
4482
4483        for i in 0..batch.num_rows() {
4484            let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4485            let version = version_col.value(i);
4486            let op = op_col.value(i);
4487            let src = Vid::from(src_col.value(i));
4488            let dst = Vid::from(dst_col.value(i));
4489
4490            match versioned_ops.entry(eid) {
4491                std::collections::hash_map::Entry::Vacant(e) => {
4492                    e.insert((version, op, src, dst));
4493                }
4494                std::collections::hash_map::Entry::Occupied(mut e) => {
4495                    if version > e.get().0 {
4496                        e.insert((version, op, src, dst));
4497                    }
4498                }
4499            }
4500        }
4501        Ok(())
4502    }
4503
4504    /// Scan L0 (memory) buffers for edges of a given type.
4505    pub(crate) fn scan_edge_type_l0(
4506        &self,
4507        edge_type: &str,
4508        ctx: &QueryContext,
4509        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4510    ) {
4511        let schema = self.storage.schema_manager().schema();
4512        let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4513
4514        if let Some(type_id) = type_id {
4515            // Main L0
4516            self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4517
4518            // Transaction L0
4519            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4520                self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4521            }
4522
4523            // Pending flush L0s
4524            for pending_l0_arc in &ctx.pending_flush_l0s {
4525                self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4526            }
4527        }
4528    }
4529
4530    /// Scan a single L0 buffer for edges and apply tombstones.
4531    pub(crate) fn scan_single_l0(
4532        &self,
4533        l0: &uni_store::runtime::L0Buffer,
4534        type_id: u32,
4535        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4536    ) {
4537        for edge_entry in l0.graph.edges() {
4538            if edge_entry.edge_type == type_id {
4539                edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4540            }
4541        }
4542        // Process Tombstones
4543        let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4544        for eid in eids_to_check {
4545            if l0.is_tombstoned(eid) {
4546                edges.remove(&eid);
4547            }
4548        }
4549    }
4550
4551    /// Filter out edges connected to tombstoned vertices.
4552    pub(crate) fn filter_tombstoned_vertex_edges(
4553        &self,
4554        ctx: &QueryContext,
4555        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4556    ) {
4557        let l0 = ctx.l0.read();
4558        let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4559
4560        // Include tx_l0 vertex tombstones if present
4561        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4562            let tx_l0 = tx_l0_arc.read();
4563            all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4564        }
4565
4566        // Include pending flush L0 vertex tombstones
4567        for pending_l0_arc in &ctx.pending_flush_l0s {
4568            let pending_l0 = pending_l0_arc.read();
4569            all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4570        }
4571
4572        edges.retain(|_, (src, dst)| {
4573            !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4574        });
4575    }
4576
4577    /// Execute a projection operation.
4578    pub(crate) async fn execute_project(
4579        &self,
4580        input_rows: Vec<HashMap<String, Value>>,
4581        projections: &[(Expr, Option<String>)],
4582        prop_manager: &PropertyManager,
4583        params: &HashMap<String, Value>,
4584        ctx: Option<&QueryContext>,
4585    ) -> Result<Vec<HashMap<String, Value>>> {
4586        let mut results = Vec::new();
4587        for m in input_rows {
4588            let mut row = HashMap::new();
4589            for (expr, alias) in projections {
4590                let val = self
4591                    .evaluate_expr(expr, &m, prop_manager, params, ctx)
4592                    .await?;
4593                let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4594                row.insert(name, val);
4595            }
4596            results.push(row);
4597        }
4598        Ok(results)
4599    }
4600
4601    /// Execute an UNWIND operation.
4602    pub(crate) async fn execute_unwind(
4603        &self,
4604        input_rows: Vec<HashMap<String, Value>>,
4605        expr: &Expr,
4606        variable: &str,
4607        prop_manager: &PropertyManager,
4608        params: &HashMap<String, Value>,
4609        ctx: Option<&QueryContext>,
4610    ) -> Result<Vec<HashMap<String, Value>>> {
4611        let mut results = Vec::new();
4612        for row in input_rows {
4613            let val = self
4614                .evaluate_expr(expr, &row, prop_manager, params, ctx)
4615                .await?;
4616            if let Value::List(items) = val {
4617                for item in items {
4618                    let mut new_row = row.clone();
4619                    new_row.insert(variable.to_string(), item);
4620                    results.push(new_row);
4621                }
4622            }
4623        }
4624        Ok(results)
4625    }
4626
4627    /// Execute an APPLY (correlated subquery) operation.
4628    pub(crate) async fn execute_apply(
4629        &self,
4630        input_rows: Vec<HashMap<String, Value>>,
4631        subquery: &LogicalPlan,
4632        input_filter: Option<&Expr>,
4633        prop_manager: &PropertyManager,
4634        params: &HashMap<String, Value>,
4635        ctx: Option<&QueryContext>,
4636    ) -> Result<Vec<HashMap<String, Value>>> {
4637        let mut filtered_rows = input_rows;
4638
4639        if let Some(filter) = input_filter {
4640            let mut filtered = Vec::new();
4641            for row in filtered_rows {
4642                let res = self
4643                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
4644                    .await?;
4645                if res.as_bool().unwrap_or(false) {
4646                    filtered.push(row);
4647                }
4648            }
4649            filtered_rows = filtered;
4650        }
4651
4652        // Handle empty input: execute subquery once with empty context
4653        // This is critical for standalone CALL statements at the beginning of a query
4654        if filtered_rows.is_empty() {
4655            let sub_rows = self
4656                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4657                .await?;
4658            return Ok(sub_rows);
4659        }
4660
4661        let mut results = Vec::new();
4662        for row in filtered_rows {
4663            let mut sub_params = params.clone();
4664            sub_params.extend(row.clone());
4665
4666            let sub_rows = self
4667                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4668                .await?;
4669
4670            for sub_row in sub_rows {
4671                let mut new_row = row.clone();
4672                new_row.extend(sub_row);
4673                results.push(new_row);
4674            }
4675        }
4676        Ok(results)
4677    }
4678
4679    /// Execute SHOW INDEXES command.
4680    pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4681        let schema = self.storage.schema_manager().schema();
4682        let mut rows = Vec::new();
4683        for idx in &schema.indexes {
4684            let (name, type_str, details) = match idx {
4685                uni_common::core::schema::IndexDefinition::Vector(c) => (
4686                    c.name.clone(),
4687                    "VECTOR",
4688                    format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4689                ),
4690                uni_common::core::schema::IndexDefinition::FullText(c) => (
4691                    c.name.clone(),
4692                    "FULLTEXT",
4693                    format!("on {}:{:?}", c.label, c.properties),
4694                ),
4695                uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4696                    cfg.name.clone(),
4697                    "SCALAR",
4698                    format!(":{}({:?})", cfg.label, cfg.properties),
4699                ),
4700                _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4701            };
4702
4703            if let Some(f) = filter
4704                && f != type_str
4705            {
4706                continue;
4707            }
4708
4709            let mut row = HashMap::new();
4710            row.insert("name".to_string(), Value::String(name));
4711            row.insert("type".to_string(), Value::String(type_str.to_string()));
4712            row.insert("details".to_string(), Value::String(details));
4713            rows.push(row);
4714        }
4715        rows
4716    }
4717
4718    pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4719        let mut row = HashMap::new();
4720        row.insert("name".to_string(), Value::String("uni".to_string()));
4721        // Could add storage path, etc.
4722        vec![row]
4723    }
4724
4725    pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4726        // Placeholder as we don't easy access to config struct from here
4727        vec![]
4728    }
4729
4730    pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4731        let snapshot = self
4732            .storage
4733            .snapshot_manager()
4734            .load_latest_snapshot()
4735            .await?;
4736        let mut results = Vec::new();
4737
4738        if let Some(snap) = snapshot {
4739            for (label, s) in &snap.vertices {
4740                let mut row = HashMap::new();
4741                row.insert("type".to_string(), Value::String("Label".to_string()));
4742                row.insert("name".to_string(), Value::String(label.clone()));
4743                row.insert("count".to_string(), Value::Int(s.count as i64));
4744                results.push(row);
4745            }
4746            for (edge, s) in &snap.edges {
4747                let mut row = HashMap::new();
4748                row.insert("type".to_string(), Value::String("Edge".to_string()));
4749                row.insert("name".to_string(), Value::String(edge.clone()));
4750                row.insert("count".to_string(), Value::Int(s.count as i64));
4751                results.push(row);
4752            }
4753        }
4754
4755        Ok(results)
4756    }
4757
4758    pub(crate) fn execute_show_constraints(
4759        &self,
4760        clause: ShowConstraints,
4761    ) -> Vec<HashMap<String, Value>> {
4762        let schema = self.storage.schema_manager().schema();
4763        let mut rows = Vec::new();
4764        for c in &schema.constraints {
4765            if let Some(target) = &clause.target {
4766                match (target, &c.target) {
4767                    (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4768                    (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4769                        if e1 == e2 => {}
4770                    _ => continue,
4771                }
4772            }
4773
4774            let mut row = HashMap::new();
4775            row.insert("name".to_string(), Value::String(c.name.clone()));
4776            let type_str = match c.constraint_type {
4777                ConstraintType::Unique { .. } => "UNIQUE",
4778                ConstraintType::Exists { .. } => "EXISTS",
4779                ConstraintType::Check { .. } => "CHECK",
4780                _ => "UNKNOWN",
4781            };
4782            row.insert("type".to_string(), Value::String(type_str.to_string()));
4783
4784            let target_str = match &c.target {
4785                ConstraintTarget::Label(l) => format!("(:{})", l),
4786                ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4787                _ => "UNKNOWN".to_string(),
4788            };
4789            row.insert("target".to_string(), Value::String(target_str));
4790
4791            rows.push(row);
4792        }
4793        rows
4794    }
4795
4796    /// Execute a MERGE operation.
4797    pub(crate) async fn execute_cross_join(
4798        &self,
4799        left: Box<LogicalPlan>,
4800        right: Box<LogicalPlan>,
4801        prop_manager: &PropertyManager,
4802        params: &HashMap<String, Value>,
4803        ctx: Option<&QueryContext>,
4804    ) -> Result<Vec<HashMap<String, Value>>> {
4805        let left_rows = self
4806            .execute_subplan(*left, prop_manager, params, ctx)
4807            .await?;
4808        let right_rows = self
4809            .execute_subplan(*right, prop_manager, params, ctx)
4810            .await?;
4811
4812        let mut results = Vec::new();
4813        for l in &left_rows {
4814            for r in &right_rows {
4815                let mut combined = l.clone();
4816                combined.extend(r.clone());
4817                results.push(combined);
4818            }
4819        }
4820        Ok(results)
4821    }
4822
4823    /// Execute a UNION operation with optional deduplication.
4824    pub(crate) async fn execute_union(
4825        &self,
4826        left: Box<LogicalPlan>,
4827        right: Box<LogicalPlan>,
4828        all: bool,
4829        prop_manager: &PropertyManager,
4830        params: &HashMap<String, Value>,
4831        ctx: Option<&QueryContext>,
4832    ) -> Result<Vec<HashMap<String, Value>>> {
4833        let mut left_rows = self
4834            .execute_subplan(*left, prop_manager, params, ctx)
4835            .await?;
4836        let mut right_rows = self
4837            .execute_subplan(*right, prop_manager, params, ctx)
4838            .await?;
4839
4840        left_rows.append(&mut right_rows);
4841
4842        if !all {
4843            let mut seen = HashSet::new();
4844            left_rows.retain(|row| {
4845                let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
4846                let key = format!("{:?}", sorted_row);
4847                seen.insert(key)
4848            });
4849        }
4850        Ok(left_rows)
4851    }
4852
4853    /// Check if an index with the given name exists.
4854    pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
4855        let schema = self.storage.schema_manager().schema();
4856        schema.indexes.iter().any(|idx| match idx {
4857            uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
4858            uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
4859            uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
4860            _ => false,
4861        })
4862    }
4863
4864    pub(crate) async fn execute_export(
4865        &self,
4866        target: &str,
4867        source: &str,
4868        options: &HashMap<String, Value>,
4869        prop_manager: &PropertyManager,
4870        ctx: Option<&QueryContext>,
4871    ) -> Result<Vec<HashMap<String, Value>>> {
4872        let format = options
4873            .get("format")
4874            .and_then(|v| v.as_str())
4875            .unwrap_or("csv")
4876            .to_lowercase();
4877
4878        match format.as_str() {
4879            "csv" => {
4880                self.execute_csv_export(target, source, options, prop_manager, ctx)
4881                    .await
4882            }
4883            "parquet" => {
4884                self.execute_parquet_export(target, source, options, prop_manager, ctx)
4885                    .await
4886            }
4887            _ => Err(anyhow!("Unsupported export format: {}", format)),
4888        }
4889    }
4890
4891    pub(crate) async fn execute_csv_export(
4892        &self,
4893        target: &str,
4894        source: &str,
4895        options: &HashMap<String, Value>,
4896        prop_manager: &PropertyManager,
4897        ctx: Option<&QueryContext>,
4898    ) -> Result<Vec<HashMap<String, Value>>> {
4899        // Validate destination path against sandbox
4900        let validated_dest = self.validate_path(source)?;
4901
4902        let schema = self.storage.schema_manager().schema();
4903        let label_meta = schema.labels.get(target);
4904        let edge_meta = schema.edge_types.get(target);
4905
4906        if label_meta.is_none() && edge_meta.is_none() {
4907            return Err(anyhow!("Target '{}' not found in schema", target));
4908        }
4909
4910        let delimiter_str = options
4911            .get("delimiter")
4912            .and_then(|v| v.as_str())
4913            .unwrap_or(",");
4914        let delimiter = if delimiter_str.is_empty() {
4915            b','
4916        } else {
4917            delimiter_str.as_bytes()[0]
4918        };
4919        let has_header = options
4920            .get("header")
4921            .and_then(|v| v.as_bool())
4922            .unwrap_or(true);
4923
4924        let mut wtr = csv::WriterBuilder::new()
4925            .delimiter(delimiter)
4926            .from_path(&validated_dest)?;
4927
4928        let mut count = 0;
4929        // Empty properties map for labels/edge types without registered properties
4930        let empty_props = HashMap::new();
4931
4932        if let Some(meta) = label_meta {
4933            let label_id = meta.id;
4934            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
4935            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
4936            prop_names.sort();
4937
4938            let mut headers = vec!["_vid".to_string()];
4939            headers.extend(prop_names.clone());
4940
4941            if has_header {
4942                wtr.write_record(&headers)?;
4943            }
4944
4945            let vids = self
4946                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
4947                .await?;
4948
4949            for vid in vids {
4950                let props = prop_manager
4951                    .get_all_vertex_props_with_ctx(vid, ctx)
4952                    .await?
4953                    .unwrap_or_default();
4954
4955                let mut row = Vec::with_capacity(headers.len());
4956                row.push(vid.to_string());
4957                for p_name in &prop_names {
4958                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
4959                    row.push(self.format_csv_value(val));
4960                }
4961                wtr.write_record(&row)?;
4962                count += 1;
4963            }
4964        } else if let Some(meta) = edge_meta {
4965            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
4966            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
4967            prop_names.sort();
4968
4969            // Headers for Edge: _eid, _src, _dst, _type, ...props
4970            let mut headers = vec![
4971                "_eid".to_string(),
4972                "_src".to_string(),
4973                "_dst".to_string(),
4974                "_type".to_string(),
4975            ];
4976            headers.extend(prop_names.clone());
4977
4978            if has_header {
4979                wtr.write_record(&headers)?;
4980            }
4981
4982            let edges = self.scan_edge_type(target, ctx).await?;
4983
4984            for (eid, src, dst) in edges {
4985                let props = prop_manager
4986                    .get_all_edge_props_with_ctx(eid, ctx)
4987                    .await?
4988                    .unwrap_or_default();
4989
4990                let mut row = Vec::with_capacity(headers.len());
4991                row.push(eid.to_string());
4992                row.push(src.to_string());
4993                row.push(dst.to_string());
4994                row.push(meta.id.to_string());
4995
4996                for p_name in &prop_names {
4997                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
4998                    row.push(self.format_csv_value(val));
4999                }
5000                wtr.write_record(&row)?;
5001                count += 1;
5002            }
5003        }
5004
5005        wtr.flush()?;
5006        let mut res = HashMap::new();
5007        res.insert("count".to_string(), Value::Int(count as i64));
5008        Ok(vec![res])
5009    }
5010
5011    /// Exports data to Parquet format.
5012    ///
5013    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
5014    pub(crate) async fn execute_parquet_export(
5015        &self,
5016        target: &str,
5017        destination: &str,
5018        _options: &HashMap<String, Value>,
5019        prop_manager: &PropertyManager,
5020        ctx: Option<&QueryContext>,
5021    ) -> Result<Vec<HashMap<String, Value>>> {
5022        let schema_manager = self.storage.schema_manager();
5023        let schema = schema_manager.schema();
5024        let label_meta = schema.labels.get(target);
5025        let edge_meta = schema.edge_types.get(target);
5026
5027        if label_meta.is_none() && edge_meta.is_none() {
5028            return Err(anyhow!("Target '{}' not found in schema", target));
5029        }
5030
5031        let arrow_schema = if label_meta.is_some() {
5032            let dataset = self.storage.vertex_dataset(target)?;
5033            dataset.get_arrow_schema(&schema)?
5034        } else {
5035            // Edge Schema
5036            let dataset = self.storage.edge_dataset(target, "", "")?;
5037            dataset.get_arrow_schema(&schema)?
5038        };
5039
5040        let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();
5041
5042        if let Some(meta) = label_meta {
5043            let label_id = meta.id;
5044            let vids = self
5045                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5046                .await?;
5047
5048            for vid in vids {
5049                let mut props = prop_manager
5050                    .get_all_vertex_props_with_ctx(vid, ctx)
5051                    .await?
5052                    .unwrap_or_default();
5053
5054                props.insert(
5055                    "_vid".to_string(),
5056                    uni_common::Value::Int(vid.as_u64() as i64),
5057                );
5058                if !props.contains_key("_uid") {
5059                    props.insert(
5060                        "_uid".to_string(),
5061                        uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
5062                    );
5063                }
5064                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5065                props.insert("_version".to_string(), uni_common::Value::Int(1));
5066                rows.push(props);
5067            }
5068        } else if edge_meta.is_some() {
5069            let edges = self.scan_edge_type(target, ctx).await?;
5070            for (eid, src, dst) in edges {
5071                let mut props = prop_manager
5072                    .get_all_edge_props_with_ctx(eid, ctx)
5073                    .await?
5074                    .unwrap_or_default();
5075
5076                props.insert(
5077                    "eid".to_string(),
5078                    uni_common::Value::Int(eid.as_u64() as i64),
5079                );
5080                props.insert(
5081                    "src_vid".to_string(),
5082                    uni_common::Value::Int(src.as_u64() as i64),
5083                );
5084                props.insert(
5085                    "dst_vid".to_string(),
5086                    uni_common::Value::Int(dst.as_u64() as i64),
5087                );
5088                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5089                props.insert("_version".to_string(), uni_common::Value::Int(1));
5090                rows.push(props);
5091            }
5092        }
5093
5094        // Write to cloud or local file
5095        if is_cloud_url(destination) {
5096            self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
5097                .await?;
5098        } else {
5099            // Validate local destination path against sandbox
5100            let validated_dest = self.validate_path(destination)?;
5101            let file = std::fs::File::create(&validated_dest)?;
5102            let mut writer =
5103                parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;
5104
5105            // Write all in one batch for now (simplification)
5106            if !rows.is_empty() {
5107                let batch = self.rows_to_batch(&rows, &arrow_schema)?;
5108                writer.write(&batch)?;
5109            }
5110
5111            writer.close()?;
5112        }
5113
5114        let mut res = HashMap::new();
5115        res.insert("count".to_string(), Value::Int(rows.len() as i64));
5116        Ok(vec![res])
5117    }
5118
5119    /// Writes Parquet data to a cloud storage destination.
5120    async fn write_parquet_to_cloud(
5121        &self,
5122        dest_url: &str,
5123        rows: &[HashMap<String, uni_common::Value>],
5124        arrow_schema: &arrow_schema::Schema,
5125    ) -> Result<()> {
5126        use object_store::ObjectStore;
5127
5128        let (store, path) = build_store_from_url(dest_url)?;
5129
5130        // Write to an in-memory buffer
5131        let mut buffer = Vec::new();
5132        {
5133            let mut writer = parquet::arrow::ArrowWriter::try_new(
5134                &mut buffer,
5135                Arc::new(arrow_schema.clone()),
5136                None,
5137            )?;
5138
5139            if !rows.is_empty() {
5140                let batch = self.rows_to_batch(rows, arrow_schema)?;
5141                writer.write(&batch)?;
5142            }
5143
5144            writer.close()?;
5145        }
5146
5147        // Upload to cloud storage
5148        store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5149
5150        Ok(())
5151    }
5152
5153    pub(crate) fn rows_to_batch(
5154        &self,
5155        rows: &[HashMap<String, uni_common::Value>],
5156        schema: &arrow_schema::Schema,
5157    ) -> Result<RecordBatch> {
5158        let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5159
5160        for field in schema.fields() {
5161            let name = field.name();
5162            let dt = field.data_type();
5163
5164            let values: Vec<uni_common::Value> = rows
5165                .iter()
5166                .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5167                .collect();
5168            let array = self.values_to_array(&values, dt)?;
5169            columns.push(array);
5170        }
5171
5172        Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5173    }
5174
5175    /// Convert a slice of Values to an Arrow array.
5176    /// Delegates to the shared implementation in arrow_convert module.
5177    pub(crate) fn values_to_array(
5178        &self,
5179        values: &[uni_common::Value],
5180        dt: &arrow_schema::DataType,
5181    ) -> Result<Arc<dyn Array>> {
5182        arrow_convert::values_to_array(values, dt)
5183    }
5184
5185    pub(crate) fn format_csv_value(&self, val: Value) -> String {
5186        match val {
5187            Value::Null => "".to_string(),
5188            Value::String(s) => s,
5189            Value::Int(i) => i.to_string(),
5190            Value::Float(f) => f.to_string(),
5191            Value::Bool(b) => b.to_string(),
5192            _ => format!("{}", val),
5193        }
5194    }
5195
5196    pub(crate) fn parse_csv_value(
5197        &self,
5198        s: &str,
5199        data_type: &uni_common::core::schema::DataType,
5200        prop_name: &str,
5201    ) -> Result<Value> {
5202        if s.is_empty() || s.to_lowercase() == "null" {
5203            return Ok(Value::Null);
5204        }
5205
5206        use uni_common::core::schema::DataType;
5207        match data_type {
5208            DataType::String => Ok(Value::String(s.to_string())),
5209            DataType::Int32 | DataType::Int64 => {
5210                let i = s.parse::<i64>().map_err(|_| {
5211                    anyhow!(
5212                        "Failed to parse integer for property '{}': {}",
5213                        prop_name,
5214                        s
5215                    )
5216                })?;
5217                Ok(Value::Int(i))
5218            }
5219            DataType::Float32 | DataType::Float64 => {
5220                let f = s.parse::<f64>().map_err(|_| {
5221                    anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5222                })?;
5223                Ok(Value::Float(f))
5224            }
5225            DataType::Bool => {
5226                let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5227                    anyhow!(
5228                        "Failed to parse boolean for property '{}': {}",
5229                        prop_name,
5230                        s
5231                    )
5232                })?;
5233                Ok(Value::Bool(b))
5234            }
5235            DataType::CypherValue => {
5236                let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5237                    anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5238                })?;
5239                Ok(Value::from(json_val))
5240            }
5241            DataType::Vector { .. } => {
5242                let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5243                    anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5244                })?;
5245                Ok(Value::Vector(v))
5246            }
5247            _ => Ok(Value::String(s.to_string())),
5248        }
5249    }
5250
5251    pub(crate) async fn detach_delete_vertex(&self, vid: Vid, writer: &mut Writer) -> Result<()> {
5252        let schema = self.storage.schema_manager().schema();
5253        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5254
5255        // 1. Find and delete all outgoing edges
5256        let out_graph = self
5257            .storage
5258            .load_subgraph_cached(
5259                &[vid],
5260                &edge_type_ids,
5261                1,
5262                uni_store::runtime::Direction::Outgoing,
5263                Some(writer.l0_manager.get_current()),
5264            )
5265            .await?;
5266
5267        for edge in out_graph.edges() {
5268            writer
5269                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5270                .await?;
5271        }
5272
5273        // 2. Find and delete all incoming edges
5274        let in_graph = self
5275            .storage
5276            .load_subgraph_cached(
5277                &[vid],
5278                &edge_type_ids,
5279                1,
5280                uni_store::runtime::Direction::Incoming,
5281                Some(writer.l0_manager.get_current()),
5282            )
5283            .await?;
5284
5285        for edge in in_graph.edges() {
5286            writer
5287                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5288                .await?;
5289        }
5290
5291        Ok(())
5292    }
5293
5294    /// Batch detach-delete: load subgraphs for all VIDs at once, then delete edges and vertices.
5295    pub(crate) async fn batch_detach_delete_vertices(
5296        &self,
5297        vids: &[Vid],
5298        labels_per_vid: Vec<Option<Vec<String>>>,
5299        writer: &mut Writer,
5300    ) -> Result<()> {
5301        let schema = self.storage.schema_manager().schema();
5302        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5303
5304        // Load outgoing subgraph for all VIDs in one call.
5305        let out_graph = self
5306            .storage
5307            .load_subgraph_cached(
5308                vids,
5309                &edge_type_ids,
5310                1,
5311                uni_store::runtime::Direction::Outgoing,
5312                Some(writer.l0_manager.get_current()),
5313            )
5314            .await?;
5315
5316        for edge in out_graph.edges() {
5317            writer
5318                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5319                .await?;
5320        }
5321
5322        // Load incoming subgraph for all VIDs in one call.
5323        let in_graph = self
5324            .storage
5325            .load_subgraph_cached(
5326                vids,
5327                &edge_type_ids,
5328                1,
5329                uni_store::runtime::Direction::Incoming,
5330                Some(writer.l0_manager.get_current()),
5331            )
5332            .await?;
5333
5334        for edge in in_graph.edges() {
5335            writer
5336                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5337                .await?;
5338        }
5339
5340        // Delete all vertices.
5341        for (vid, labels) in vids.iter().zip(labels_per_vid) {
5342            writer.delete_vertex(*vid, labels).await?;
5343        }
5344
5345        Ok(())
5346    }
5347}