Skip to main content

uni_query/query/executor/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7    eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14/// Convert a `Value` to `chrono::DateTime<Utc>`, handling both `Value::Temporal` and `Value::String`.
15fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16    match val {
17        Value::Temporal(tv) => {
18            use uni_common::TemporalValue;
19            match tv {
20                TemporalValue::DateTime {
21                    nanos_since_epoch, ..
22                }
23                | TemporalValue::LocalDateTime {
24                    nanos_since_epoch, ..
25                } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26                TemporalValue::Date { days_since_epoch } => {
27                    chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28                }
29                _ => None,
30            }
31        }
32        Value::String(s) => parse_datetime_utc(s).ok(),
33        _ => None,
34    }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46    BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47    ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54use uni_store::storage::index_manager::IndexManager;
55
56// DataFusion engine imports
57use crate::query::df_graph::L0Context;
58use crate::query::df_planner::HybridPhysicalPlanner;
59use datafusion::physical_plan::ExecutionPlanProperties;
60use datafusion::prelude::SessionContext;
61use parking_lot::RwLock as SyncRwLock;
62
63use arrow_array::{Array, RecordBatch};
64use csv;
65use parquet;
66
67use super::core::*;
68
/// Number of system fields on an edge map: `_eid`, `_src`, `_dst`, `_type`, `_type_name`.
///
/// `hydrate_entity_if_needed` treats an edge map with at most this many
/// entries as not yet carrying user properties.
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
/// Number of system fields on a vertex map: `_vid`, `_label`, `_uid`.
///
/// `hydrate_entity_if_needed` treats a vertex map with at most this many
/// entries as not yet carrying user properties.
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
73
74/// Collect VIDs from all L0 buffers visible to a query context.
75///
76/// Applies `extractor` to each L0 buffer (main, transaction, pending flush) and
77/// collects the results. Returns an empty vec when no query context is present.
78fn collect_l0_vids(
79    ctx: Option<&QueryContext>,
80    extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
81) -> Vec<Vid> {
82    let mut vids = Vec::new();
83    if let Some(ctx) = ctx {
84        vids.extend(extractor(&ctx.l0.read()));
85        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
86            vids.extend(extractor(&tx_l0_arc.read()));
87        }
88        for pending_l0_arc in &ctx.pending_flush_l0s {
89            vids.extend(extractor(&pending_l0_arc.read()));
90        }
91    }
92    vids
93}
94
95/// Hydrate an entity map (vertex or edge) with properties if not already loaded.
96///
97/// This is the fallback for pushdown hydration - if the entity only has system fields
98/// (indicating pushdown didn't load properties), we load all properties here.
99///
100/// System field counts:
101/// - Edge: 5 fields (_eid, _src, _dst, _type, _type_name)
102/// - Vertex: 3 fields (_vid, _label, _uid)
103async fn hydrate_entity_if_needed(
104    map: &mut HashMap<String, Value>,
105    prop_manager: &PropertyManager,
106    ctx: Option<&QueryContext>,
107) {
108    // Check for edge entity
109    if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
110        if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
111            tracing::debug!(
112                "Pushdown fallback: hydrating edge {} at execution time",
113                eid_u64
114            );
115            if let Ok(Some(props)) = prop_manager
116                .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
117                .await
118            {
119                for (key, value) in props {
120                    map.entry(key).or_insert(value);
121                }
122            }
123        } else {
124            tracing::trace!(
125                "Pushdown success: edge {} already has {} properties",
126                eid_u64,
127                map.len() - EDGE_SYSTEM_FIELD_COUNT
128            );
129        }
130        return;
131    }
132
133    // Check for vertex entity
134    if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
135        if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
136            tracing::debug!(
137                "Pushdown fallback: hydrating vertex {} at execution time",
138                vid_u64
139            );
140            if let Ok(Some(props)) = prop_manager
141                .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
142                .await
143            {
144                for (key, value) in props {
145                    map.entry(key).or_insert(value);
146                }
147            }
148        } else {
149            tracing::trace!(
150                "Pushdown success: vertex {} already has {} properties",
151                vid_u64,
152                map.len() - VERTEX_SYSTEM_FIELD_COUNT
153            );
154        }
155    }
156}
157
158impl Executor {
159    /// Helper to verify and filter candidates against an optional predicate.
160    ///
161    /// Deduplicates candidates, loads properties, and evaluates the filter expression.
162    /// Returns only VIDs that pass the filter (or are not deleted).
163    async fn verify_and_filter_candidates(
164        &self,
165        mut candidates: Vec<Vid>,
166        variable: &str,
167        filter: Option<&Expr>,
168        ctx: Option<&QueryContext>,
169        prop_manager: &PropertyManager,
170        params: &HashMap<String, Value>,
171    ) -> Result<Vec<Vid>> {
172        candidates.sort_unstable();
173        candidates.dedup();
174
175        let mut verified_vids = Vec::new();
176        for vid in candidates {
177            let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
178                continue; // Deleted
179            };
180
181            if let Some(expr) = filter {
182                let mut props_map: HashMap<String, Value> = props;
183                props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
184
185                let mut row = HashMap::new();
186                row.insert(variable.to_string(), Value::Map(props_map));
187
188                let res = self
189                    .evaluate_expr(expr, &row, prop_manager, params, ctx)
190                    .await?;
191                if res.as_bool().unwrap_or(false) {
192                    verified_vids.push(vid);
193                }
194            } else {
195                verified_vids.push(vid);
196            }
197        }
198
199        Ok(verified_vids)
200    }
201
202    pub(crate) async fn scan_storage_candidates(
203        &self,
204        label_id: u16,
205        variable: &str,
206        filter: Option<&Expr>,
207    ) -> Result<Vec<Vid>> {
208        let schema = self.storage.schema_manager().schema();
209        let label_name = schema
210            .label_name_by_id(label_id)
211            .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
212
213        let ds = self.storage.vertex_dataset(label_name)?;
214        let lancedb_store = self.storage.lancedb_store();
215
216        // Try LanceDB first (canonical storage)
217        match ds.open_lancedb(lancedb_store).await {
218            Ok(table) => {
219                use arrow_array::UInt64Array;
220                use futures::TryStreamExt;
221                use lancedb::query::{ExecutableQuery, QueryBase, Select};
222
223                let mut query = table.query();
224
225                // Apply filter if provided, with schema awareness
226                // to skip overflow properties that aren't physical Lance columns.
227                // For labels with no registered properties (schemaless), use an empty
228                // map so all non-system properties are recognized as overflow.
229                let empty_props = std::collections::HashMap::new();
230                let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
231                if let Some(expr) = filter
232                    && let Some(sql) = LanceFilterGenerator::generate(
233                        std::slice::from_ref(expr),
234                        variable,
235                        Some(label_props),
236                    )
237                {
238                    query = query.only_if(format!("_deleted = false AND ({})", sql));
239                } else {
240                    query = query.only_if("_deleted = false");
241                }
242
243                // Project to only _vid
244                let query = query.select(Select::columns(&["_vid"]));
245                let stream = query.execute().await?;
246                let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
247
248                let mut vids = Vec::new();
249                for batch in batches {
250                    let vid_col = batch
251                        .column_by_name("_vid")
252                        .ok_or(anyhow!("Missing _vid"))?
253                        .as_any()
254                        .downcast_ref::<UInt64Array>()
255                        .ok_or(anyhow!("Invalid _vid"))?;
256                    for i in 0..batch.num_rows() {
257                        vids.push(Vid::from(vid_col.value(i)));
258                    }
259                }
260                Ok(vids)
261            }
262            Err(e) => {
263                // Only treat "not found" / "does not exist" errors as empty results.
264                // Propagate all other errors (network, auth, corruption, etc.)
265                let err_msg = e.to_string().to_lowercase();
266                if err_msg.contains("not found")
267                    || err_msg.contains("does not exist")
268                    || err_msg.contains("no such file")
269                    || err_msg.contains("object not found")
270                {
271                    Ok(Vec::new())
272                } else {
273                    Err(e)
274                }
275            }
276        }
277    }
278
279    pub(crate) async fn scan_label_with_filter(
280        &self,
281        label_id: u16,
282        variable: &str,
283        filter: Option<&Expr>,
284        ctx: Option<&QueryContext>,
285        prop_manager: &PropertyManager,
286        params: &HashMap<String, Value>,
287    ) -> Result<Vec<Vid>> {
288        let mut candidates = self
289            .scan_storage_candidates(label_id, variable, filter)
290            .await?;
291
292        // Convert label_id to label_name for L0 lookup
293        let schema = self.storage.schema_manager().schema();
294        if let Some(label_name) = schema.label_name_by_id(label_id) {
295            candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
296        }
297
298        self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
299            .await
300    }
301
302    pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
303        // Handle Value::Node directly (has vid field)
304        if let Value::Node(node) = val {
305            return Ok(node.vid);
306        }
307        // Handle Object (node) containing _vid field
308        if let Value::Map(map) = val
309            && let Some(vid_val) = map.get("_vid")
310            && let Some(v) = vid_val.as_u64()
311        {
312            return Ok(Vid::from(v));
313        }
314        // Handle string format
315        if let Some(s) = val.as_str()
316            && let Ok(id) = s.parse::<u64>()
317        {
318            return Ok(Vid::new(id));
319        }
320        // Handle raw u64
321        if let Some(v) = val.as_u64() {
322            return Ok(Vid::from(v));
323        }
324        Err(anyhow!("Invalid Vid format: {:?}", val))
325    }
326
327    /// Find a node value in the row by VID.
328    ///
329    /// Scans all values in the row, looking for a node (Map or Node) whose VID
330    /// matches the target. Returns the full node value if found, or a minimal
331    /// Map with just `_vid` as fallback.
332    fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
333        for val in row.values() {
334            if let Ok(vid) = Self::vid_from_value(val)
335                && vid == target_vid
336            {
337                return val.clone();
338            }
339        }
340        // Fallback: return minimal node map
341        Value::Map(HashMap::from([(
342            "_vid".to_string(),
343            Value::Int(target_vid.as_u64() as i64),
344        )]))
345    }
346
347    /// Create L0 context, session, and planner for DataFusion execution.
348    ///
349    /// This is the shared setup for `execute_datafusion` and `execute_merge_read_plan`.
350    /// Returns `(session_ctx, planner, prop_manager_arc)`.
351    pub async fn create_datafusion_planner(
352        &self,
353        prop_manager: &PropertyManager,
354        params: &HashMap<String, Value>,
355    ) -> Result<(
356        Arc<SyncRwLock<SessionContext>>,
357        HybridPhysicalPlanner,
358        Arc<PropertyManager>,
359    )> {
360        let query_ctx = self.get_context().await;
361        let l0_context = match query_ctx {
362            Some(ref ctx) => L0Context::from_query_context(ctx),
363            None => L0Context::empty(),
364        };
365
366        let prop_manager_arc = Arc::new(PropertyManager::new(
367            self.storage.clone(),
368            self.storage.schema_manager_arc(),
369            prop_manager.cache_size(),
370        ));
371
372        let session = SessionContext::new();
373        crate::query::df_udfs::register_cypher_udfs(&session)?;
374        let session_ctx = Arc::new(SyncRwLock::new(session));
375
376        let mut planner = HybridPhysicalPlanner::with_l0_context(
377            session_ctx.clone(),
378            self.storage.clone(),
379            l0_context,
380            prop_manager_arc.clone(),
381            self.storage.schema_manager().schema(),
382            params.clone(),
383            HashMap::new(),
384        );
385
386        planner = planner.with_algo_registry(self.algo_registry.clone());
387        if let Some(ref registry) = self.procedure_registry {
388            planner = planner.with_procedure_registry(registry.clone());
389        }
390        if let Some(ref xervo_runtime) = self.xervo_runtime {
391            planner = planner.with_xervo_runtime(xervo_runtime.clone());
392        }
393
394        Ok((session_ctx, planner, prop_manager_arc))
395    }
396
397    /// Execute a DataFusion physical plan and collect all result batches.
398    pub fn collect_batches(
399        session_ctx: &Arc<SyncRwLock<SessionContext>>,
400        execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
401    ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
402        Box::pin(async move {
403            use futures::TryStreamExt;
404
405            let task_ctx = session_ctx.read().task_ctx();
406            let partition_count = execution_plan.output_partitioning().partition_count();
407            let mut all_batches = Vec::new();
408            for partition in 0..partition_count {
409                let stream = execution_plan.execute(partition, task_ctx.clone())?;
410                let batches: Vec<RecordBatch> = stream.try_collect().await?;
411                all_batches.extend(batches);
412            }
413            Ok(all_batches)
414        })
415    }
416
417    /// Executes a query using the DataFusion-based engine.
418    ///
419    /// Uses `HybridPhysicalPlanner` which produces DataFusion `ExecutionPlan`
420    /// trees with custom graph operators for graph-specific operations.
421    pub async fn execute_datafusion(
422        &self,
423        plan: LogicalPlan,
424        prop_manager: &PropertyManager,
425        params: &HashMap<String, Value>,
426    ) -> Result<Vec<RecordBatch>> {
427        let (session_ctx, mut planner, prop_manager_arc) =
428            self.create_datafusion_planner(prop_manager, params).await?;
429
430        // Build MutationContext when the plan contains write operations
431        if Self::contains_write_operations(&plan) {
432            let writer = self
433                .writer
434                .as_ref()
435                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
436                .clone();
437            let query_ctx = self.get_context().await;
438
439            debug_assert!(
440                query_ctx.is_some(),
441                "BUG: query_ctx is None for write operation"
442            );
443
444            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
445                executor: self.clone(),
446                writer,
447                prop_manager: prop_manager_arc,
448                params: params.clone(),
449                query_ctx,
450            });
451            planner = planner.with_mutation_context(mutation_ctx);
452            tracing::debug!(
453                plan_type = Self::get_plan_type(&plan),
454                "Mutation routed to DataFusion engine"
455            );
456        }
457
458        let execution_plan = planner.plan(&plan)?;
459        Self::collect_batches(&session_ctx, execution_plan).await
460    }
461
462    /// Execute a MERGE read sub-plan through the DataFusion engine.
463    ///
464    /// Plans and executes the MERGE pattern match using flat columnar output
465    /// (no structural projections), then groups dotted columns (`a._vid`,
466    /// `a._labels`, etc.) into per-variable Maps for downstream MERGE logic.
467    pub(crate) async fn execute_merge_read_plan(
468        &self,
469        plan: LogicalPlan,
470        prop_manager: &PropertyManager,
471        params: &HashMap<String, Value>,
472        merge_variables: Vec<String>,
473    ) -> Result<Vec<HashMap<String, Value>>> {
474        let (session_ctx, planner, _prop_manager_arc) =
475            self.create_datafusion_planner(prop_manager, params).await?;
476
477        // Plan with full property access ("*") for all merge variables so that
478        // ON MATCH SET / ON CREATE SET have complete entity Maps to work with.
479        let extra: HashMap<String, HashSet<String>> = merge_variables
480            .iter()
481            .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
482            .collect();
483        let execution_plan = planner.plan_with_properties(&plan, extra)?;
484        let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
485
486        // Convert to flat rows (dotted column names like "a._vid", "b._labels")
487        let flat_rows = self.record_batches_to_rows(all_batches)?;
488
489        // Group dotted columns into per-variable Maps for MERGE's match logic.
490        // E.g., {"a._vid": 0, "a._labels": ["A"]} → {"a": Map({"_vid": 0, "_labels": ["A"]})}
491        let rows = flat_rows
492            .into_iter()
493            .map(|mut row| {
494                for var in &merge_variables {
495                    // Skip if already materialized (e.g., by record_batches_to_rows)
496                    if row.contains_key(var) {
497                        continue;
498                    }
499                    let prefix = format!("{}.", var);
500                    let dotted_keys: Vec<String> = row
501                        .keys()
502                        .filter(|k| k.starts_with(&prefix))
503                        .cloned()
504                        .collect();
505                    if !dotted_keys.is_empty() {
506                        let mut map = HashMap::new();
507                        for key in dotted_keys {
508                            let prop_name = key[prefix.len()..].to_string();
509                            if let Some(val) = row.remove(&key) {
510                                map.insert(prop_name, val);
511                            }
512                        }
513                        row.insert(var.clone(), Value::Map(map));
514                    }
515                }
516                row
517            })
518            .collect();
519
520        Ok(rows)
521    }
522
    /// Converts DataFusion RecordBatches to row-based HashMap format.
    ///
    /// Each output row maps column name to the decoded [`Value`] at that row
    /// index. Rows are emitted batch by batch, in row order within each batch.
    ///
    /// Handles special metadata on fields:
    /// - `cv_encoded=true`: Parse the string value as JSON to restore original type
    ///   (a string that fails to parse as JSON is kept as the string it was)
    ///
    /// Also normalizes path structures to user-facing format (converts _vid to _id).
    ///
    /// After the per-column decode, each row is post-processed:
    /// 1. Variables whose value is a string but which have a `<var>._vid`
    ///    companion column are replaced by a map built from all `<var>.*`
    ///    columns (those columns are removed from the row).
    /// 2. Variables whose value was already a map get `<var>._vid` /
    ///    `<var>._labels` merged in (overwriting), `<var>._eid` / `<var>._type`
    ///    merged in only when absent, and every remaining `<var>.*` helper
    ///    column dropped.
    ///
    /// As written, this function always returns `Ok`.
    fn record_batches_to_rows(
        &self,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut rows = Vec::new();

        for batch in batches {
            let num_rows = batch.num_rows();
            let schema = batch.schema();

            for row_idx in 0..num_rows {
                let mut row = HashMap::new();

                for (col_idx, field) in schema.fields().iter().enumerate() {
                    let column = batch.column(col_idx);
                    // Infer Uni DataType from Arrow type for DateTime/Time struct decoding
                    let data_type =
                        if uni_common::core::schema::is_datetime_struct(field.data_type()) {
                            Some(&uni_common::DataType::DateTime)
                        } else if uni_common::core::schema::is_time_struct(field.data_type()) {
                            Some(&uni_common::DataType::Time)
                        } else {
                            None
                        };
                    let mut value =
                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);

                    // Check if this field contains JSON-encoded values (e.g., from UNWIND)
                    // Parse JSON string to restore the original type
                    if field
                        .metadata()
                        .get("cv_encoded")
                        .is_some_and(|v| v == "true")
                        && let Value::String(s) = &value
                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
                    {
                        value = Value::from(parsed);
                    }

                    // Normalize path structures to user-facing format
                    value = Self::normalize_path_if_needed(value);

                    row.insert(field.name().clone(), value);
                }

                // Merge system fields into bare variable maps.
                // The projection step emits helper columns like "n._vid" and "n._labels"
                // alongside the materialized "n" column (a Map of user properties).
                // Here we merge those system fields into the map and remove the helpers.
                //
                // For search procedures (vector/FTS), the bare variable may be a VID
                // string placeholder rather than a Map. In that case, promote it to a
                // Map so we can merge system fields and properties into it.
                //
                // NOTE: this list is captured before the placeholder promotion
                // below, so promoted variables are not revisited by the
                // bare-variable loop.
                let bare_vars: Vec<String> = row
                    .keys()
                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
                    .cloned()
                    .collect();

                // Detect VID-placeholder variables that have system columns
                // (e.g., "node" is String("1") but "node._vid" exists).
                // Promote these to Maps so system fields can be merged in.
                let vid_placeholder_vars: Vec<String> = row
                    .keys()
                    .filter(|k| {
                        !k.contains('.')
                            && matches!(row.get(*k), Some(Value::String(_)))
                            && row.contains_key(&format!("{}._vid", k))
                    })
                    .cloned()
                    .collect();

                for var in &vid_placeholder_vars {
                    // Build a Map from system and property columns
                    let prefix = format!("{}.", var);
                    let mut map = HashMap::new();

                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();

                    for key in &dotted_keys {
                        // Slicing is safe: every key here starts with `prefix`.
                        let prop_name = &key[prefix.len()..];
                        if let Some(val) = row.remove(key) {
                            map.insert(prop_name.to_string(), val);
                        }
                    }

                    // Replace the VID-string placeholder with the constructed Map
                    row.insert(var.clone(), Value::Map(map));
                }

                for var in &bare_vars {
                    // Merge node system fields (_vid, _labels)
                    let vid_key = format!("{}._vid", var);
                    let labels_key = format!("{}._labels", var);

                    let vid_val = row.remove(&vid_key);
                    let labels_val = row.remove(&labels_key);

                    // Helper-column values overwrite any `_vid` / `_labels`
                    // already present in the map.
                    if let Some(Value::Map(map)) = row.get_mut(var) {
                        if let Some(v) = vid_val {
                            map.insert("_vid".to_string(), v);
                        }
                        if let Some(v) = labels_val {
                            map.insert("_labels".to_string(), v);
                        }
                    }

                    // Merge edge system fields (_eid, _type, _src_vid, _dst_vid).
                    // These are emitted as helper columns by the traverse exec.
                    // The structural projection already includes them in the struct,
                    // but we still need to remove the dotted helper columns.
                    let eid_key = format!("{}._eid", var);
                    let type_key = format!("{}._type", var);

                    let eid_val = row.remove(&eid_key);
                    let type_val = row.remove(&type_key);

                    // Unlike the node fields above, edge fields only fill gaps:
                    // values already in the map are kept.
                    if (eid_val.is_some() || type_val.is_some())
                        && let Some(Value::Map(map)) = row.get_mut(var)
                    {
                        if let Some(v) = eid_val {
                            map.entry("_eid".to_string()).or_insert(v);
                        }
                        if let Some(v) = type_val {
                            map.entry("_type".to_string()).or_insert(v);
                        }
                    }

                    // Remove remaining dotted helper columns (e.g. _all_props, _src_vid, _dst_vid)
                    let prefix = format!("{}.", var);
                    let helper_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    for key in helper_keys {
                        row.remove(&key);
                    }
                }

                rows.push(row);
            }
        }

        Ok(rows)
    }
679
680    /// Normalize a value if it's a path structure, converting internal format to user-facing format.
681    ///
682    /// This only normalizes path structures (objects with "nodes" and "relationships" arrays).
683    /// Other values are returned unchanged to avoid interfering with query execution.
684    fn normalize_path_if_needed(value: Value) -> Value {
685        match value {
686            Value::Map(map)
687                if map.contains_key("nodes")
688                    && (map.contains_key("relationships") || map.contains_key("edges")) =>
689            {
690                Self::normalize_path_map(map)
691            }
692            other => other,
693        }
694    }
695
696    /// Normalize a path map object.
697    fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
698        // Normalize nodes array
699        if let Some(Value::List(nodes)) = map.remove("nodes") {
700            let normalized_nodes: Vec<Value> = nodes
701                .into_iter()
702                .map(|n| {
703                    if let Value::Map(node_map) = n {
704                        Self::normalize_path_node_map(node_map)
705                    } else {
706                        n
707                    }
708                })
709                .collect();
710            map.insert("nodes".to_string(), Value::List(normalized_nodes));
711        }
712
713        // Normalize relationships array (may be called "relationships" or "edges")
714        let rels_key = if map.contains_key("relationships") {
715            "relationships"
716        } else {
717            "edges"
718        };
719        if let Some(Value::List(rels)) = map.remove(rels_key) {
720            let normalized_rels: Vec<Value> = rels
721                .into_iter()
722                .map(|r| {
723                    if let Value::Map(rel_map) = r {
724                        Self::normalize_path_edge_map(rel_map)
725                    } else {
726                        r
727                    }
728                })
729                .collect();
730            map.insert("relationships".to_string(), Value::List(normalized_rels));
731        }
732
733        Value::Map(map)
734    }
735
736    /// Convert a Value to its string representation for path normalization.
737    fn value_to_id_string(val: Value) -> String {
738        match val {
739            Value::Int(n) => n.to_string(),
740            Value::Float(n) => n.to_string(),
741            Value::String(s) => s,
742            other => other.to_string(),
743        }
744    }
745
746    /// Move a map entry from `src_key` to `dst_key`, converting the value to a string.
747    /// When `src_key == dst_key`, this simply stringifies the value in place.
748    fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
749        if let Some(val) = map.remove(src_key) {
750            map.insert(
751                dst_key.to_string(),
752                Value::String(Self::value_to_id_string(val)),
753            );
754        }
755    }
756
757    /// Ensure the "properties" field is a non-null map.
758    fn ensure_properties_map(map: &mut HashMap<String, Value>) {
759        match map.get("properties") {
760            Some(props) if !props.is_null() => {}
761            _ => {
762                map.insert("properties".to_string(), Value::Map(HashMap::new()));
763            }
764        }
765    }
766
767    /// Normalize a node within a path to user-facing format.
768    fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
769        Self::stringify_map_field(&mut map, "_vid", "_id");
770        Self::ensure_properties_map(&mut map);
771        Value::Map(map)
772    }
773
774    /// Normalize an edge within a path to user-facing format.
775    fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
776        Self::stringify_map_field(&mut map, "_eid", "_id");
777        Self::stringify_map_field(&mut map, "_src", "_src");
778        Self::stringify_map_field(&mut map, "_dst", "_dst");
779
780        if let Some(type_name) = map.remove("_type_name") {
781            map.insert("_type".to_string(), type_name);
782        }
783
784        Self::ensure_properties_map(&mut map);
785        Value::Map(map)
786    }
787
788    #[instrument(
789        skip(self, prop_manager, params),
790        fields(rows_returned, duration_ms),
791        level = "info"
792    )]
793    pub fn execute<'a>(
794        &'a self,
795        plan: LogicalPlan,
796        prop_manager: &'a PropertyManager,
797        params: &'a HashMap<String, Value>,
798    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
799        Box::pin(async move {
800            let query_type = Self::get_plan_type(&plan);
801            let ctx = self.get_context().await;
802            let start = Instant::now();
803
804            // Route DDL/Admin queries to the fallback executor.
805            // All other queries (including similar_to) flow through DataFusion.
806            let res = if Self::is_ddl_or_admin(&plan) {
807                self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
808                    .await
809            } else {
810                let batches = self
811                    .execute_datafusion(plan.clone(), prop_manager, params)
812                    .await?;
813                self.record_batches_to_rows(batches)
814            };
815
816            let duration = start.elapsed();
817            metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
818                .record(duration.as_secs_f64());
819
820            tracing::Span::current().record("duration_ms", duration.as_millis());
821            match &res {
822                Ok(rows) => {
823                    tracing::Span::current().record("rows_returned", rows.len());
824                    metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
825                        .increment(rows.len() as u64);
826                }
827                Err(e) => {
828                    let error_type = if e.to_string().contains("timed out") {
829                        "timeout"
830                    } else if e.to_string().contains("syntax") {
831                        "syntax"
832                    } else {
833                        "execution"
834                    };
835                    metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
836                }
837            }
838
839            res
840        })
841    }
842
843    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
844        match plan {
845            LogicalPlan::Scan { .. } => "read_scan",
846            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
847            LogicalPlan::Traverse { .. } => "read_traverse",
848            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
849            LogicalPlan::ScanAll { .. } => "read_scan_all",
850            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
851            LogicalPlan::VectorKnn { .. } => "read_vector",
852            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
853            LogicalPlan::Merge { .. } => "write_merge",
854            LogicalPlan::Delete { .. } => "write_delete",
855            LogicalPlan::Set { .. } => "write_set",
856            LogicalPlan::Remove { .. } => "write_remove",
857            LogicalPlan::ProcedureCall { .. } => "call",
858            LogicalPlan::Copy { .. } => "copy",
859            LogicalPlan::Backup { .. } => "backup",
860            _ => "other",
861        }
862    }
863
864    /// Return all direct child plan references from a `LogicalPlan`.
865    ///
866    /// This centralizes the variant→children mapping so that recursive walkers
867    /// (e.g., `contains_write_operations`) can delegate the
868    /// "recurse into children" logic instead of duplicating the match arms.
869    ///
870    /// Note: `Foreach` returns only its `input`; the `body: Vec<LogicalPlan>`
871    /// is not included because it requires special iteration. Callers that
872    /// need to inspect the body should handle `Foreach` before falling through.
873    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
874        match plan {
875            // Single-input wrappers
876            LogicalPlan::Project { input, .. }
877            | LogicalPlan::Sort { input, .. }
878            | LogicalPlan::Limit { input, .. }
879            | LogicalPlan::Distinct { input }
880            | LogicalPlan::Aggregate { input, .. }
881            | LogicalPlan::Window { input, .. }
882            | LogicalPlan::Unwind { input, .. }
883            | LogicalPlan::Filter { input, .. }
884            | LogicalPlan::Create { input, .. }
885            | LogicalPlan::CreateBatch { input, .. }
886            | LogicalPlan::Set { input, .. }
887            | LogicalPlan::Remove { input, .. }
888            | LogicalPlan::Delete { input, .. }
889            | LogicalPlan::Merge { input, .. }
890            | LogicalPlan::Foreach { input, .. }
891            | LogicalPlan::Traverse { input, .. }
892            | LogicalPlan::TraverseMainByType { input, .. }
893            | LogicalPlan::BindZeroLengthPath { input, .. }
894            | LogicalPlan::BindPath { input, .. }
895            | LogicalPlan::ShortestPath { input, .. }
896            | LogicalPlan::AllShortestPaths { input, .. }
897            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],
898
899            // Two-input wrappers
900            LogicalPlan::Apply {
901                input, subquery, ..
902            }
903            | LogicalPlan::SubqueryCall { input, subquery } => {
904                vec![input.as_ref(), subquery.as_ref()]
905            }
906            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
907                vec![left.as_ref(), right.as_ref()]
908            }
909            LogicalPlan::RecursiveCTE {
910                initial, recursive, ..
911            } => vec![initial.as_ref(), recursive.as_ref()],
912            LogicalPlan::QuantifiedPattern {
913                input,
914                pattern_plan,
915                ..
916            } => vec![input.as_ref(), pattern_plan.as_ref()],
917
918            // Leaf nodes (scans, DDL, admin, etc.)
919            _ => vec![],
920        }
921    }
922
923    /// Check if a plan is a DDL or admin operation that should skip DataFusion.
924    ///
925    /// These operations don't produce data streams and aren't supported by the
926    /// DataFusion planner. Recurses through wrapper nodes (`Project`, `Sort`,
927    /// `Limit`, etc.) to detect DDL/admin operations nested inside read
928    /// wrappers (e.g. `CALL procedure(...) YIELD x RETURN x`).
929    fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
930        match plan {
931            // DDL / schema operations
932            LogicalPlan::CreateLabel(_)
933            | LogicalPlan::CreateEdgeType(_)
934            | LogicalPlan::AlterLabel(_)
935            | LogicalPlan::AlterEdgeType(_)
936            | LogicalPlan::DropLabel(_)
937            | LogicalPlan::DropEdgeType(_)
938            | LogicalPlan::CreateConstraint(_)
939            | LogicalPlan::DropConstraint(_)
940            | LogicalPlan::ShowConstraints(_) => true,
941
942            // Index operations
943            LogicalPlan::CreateVectorIndex { .. }
944            | LogicalPlan::CreateFullTextIndex { .. }
945            | LogicalPlan::CreateScalarIndex { .. }
946            | LogicalPlan::CreateJsonFtsIndex { .. }
947            | LogicalPlan::DropIndex { .. }
948            | LogicalPlan::ShowIndexes { .. } => true,
949
950            // Admin / utility operations
951            LogicalPlan::ShowDatabase
952            | LogicalPlan::ShowConfig
953            | LogicalPlan::ShowStatistics
954            | LogicalPlan::Vacuum
955            | LogicalPlan::Checkpoint
956            | LogicalPlan::Begin
957            | LogicalPlan::Commit
958            | LogicalPlan::Rollback
959            | LogicalPlan::Copy { .. }
960            | LogicalPlan::CopyTo { .. }
961            | LogicalPlan::CopyFrom { .. }
962            | LogicalPlan::Backup { .. }
963            | LogicalPlan::Explain { .. } => true,
964
965            // Procedure calls: DF-eligible procedures go through DataFusion,
966            // everything else (DDL, admin, unknown) stays on fallback.
967            LogicalPlan::ProcedureCall { procedure_name, .. } => {
968                !Self::is_df_eligible_procedure(procedure_name)
969            }
970
971            // Recurse through children using plan_children
972            _ => Self::plan_children(plan)
973                .iter()
974                .any(|child| Self::is_ddl_or_admin(child)),
975        }
976    }
977
978    /// Returns `true` if the procedure is a read-only, data-producing procedure
979    /// that can be executed through the DataFusion engine.
980    ///
981    /// This is a **positive allowlist** — unknown procedures default to the
982    /// fallback executor (safe for TCK test procedures, future DDL, and admin).
983    fn is_df_eligible_procedure(name: &str) -> bool {
984        matches!(
985            name,
986            "uni.schema.labels"
987                | "uni.schema.edgeTypes"
988                | "uni.schema.relationshipTypes"
989                | "uni.schema.indexes"
990                | "uni.schema.constraints"
991                | "uni.schema.labelInfo"
992                | "uni.vector.query"
993                | "uni.fts.query"
994                | "uni.search"
995        ) || name.starts_with("uni.algo.")
996    }
997
998    /// Check if a plan contains write/mutation operations anywhere in the tree.
999    ///
1000    /// Write operations (`CREATE`, `MERGE`, `DELETE`, `SET`, `REMOVE`, `FOREACH`)
1001    /// are used to determine when a MutationContext needs to be built for DataFusion.
1002    /// This recurses through read-only wrapper nodes to detect writes nested inside
1003    /// projections (e.g. `CREATE (n:Person) RETURN n` produces `Project { Create { ... } }`).
1004    fn contains_write_operations(plan: &LogicalPlan) -> bool {
1005        match plan {
1006            LogicalPlan::Create { .. }
1007            | LogicalPlan::CreateBatch { .. }
1008            | LogicalPlan::Merge { .. }
1009            | LogicalPlan::Delete { .. }
1010            | LogicalPlan::Set { .. }
1011            | LogicalPlan::Remove { .. }
1012            | LogicalPlan::Foreach { .. } => true,
1013            _ => Self::plan_children(plan)
1014                .iter()
1015                .any(|child| Self::contains_write_operations(child)),
1016        }
1017    }
1018
1019    /// Executes a query as a stream of result batches.
1020    ///
1021    /// Routes DDL/Admin through the fallback executor and everything else
1022    /// through DataFusion.
1023    pub fn execute_stream(
1024        self,
1025        plan: LogicalPlan,
1026        prop_manager: Arc<PropertyManager>,
1027        params: HashMap<String, Value>,
1028    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1029        let this = self;
1030        let this_for_ctx = this.clone();
1031
1032        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1033
1034        ctx_stream
1035            .flat_map(move |ctx| {
1036                let plan = plan.clone();
1037                let this = this.clone();
1038                let prop_manager = prop_manager.clone();
1039                let params = params.clone();
1040
1041                let fut = async move {
1042                    if Self::is_ddl_or_admin(&plan) {
1043                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
1044                            .await
1045                    } else {
1046                        let batches = this
1047                            .execute_datafusion(plan, &prop_manager, &params)
1048                            .await?;
1049                        this.record_batches_to_rows(batches)
1050                    }
1051                };
1052                stream::once(fut).boxed()
1053            })
1054            .boxed()
1055    }
1056
    /// Converts an Arrow array element at a given row index to a Value.
    /// Delegates to the shared implementation in arrow_convert module.
    ///
    /// `col` is the Arrow array to read from and `row` is the element index
    /// passed through unchanged to the shared implementation.
    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
        // The third argument is always `None` here.
        // NOTE(review): its meaning is defined by `arrow_convert::arrow_to_value`,
        // which is not visible from this file section — confirm before changing it.
        arrow_convert::arrow_to_value(col, row, None)
    }
1062
1063    pub(crate) fn evaluate_expr<'a>(
1064        &'a self,
1065        expr: &'a Expr,
1066        row: &'a HashMap<String, Value>,
1067        prop_manager: &'a PropertyManager,
1068        params: &'a HashMap<String, Value>,
1069        ctx: Option<&'a QueryContext>,
1070    ) -> BoxFuture<'a, Result<Value>> {
1071        let this = self;
1072        Box::pin(async move {
1073            // First check if the expression itself is already pre-computed in the row
1074            let repr = expr.to_string_repr();
1075            if let Some(val) = row.get(&repr) {
1076                return Ok(val.clone());
1077            }
1078
1079            match expr {
1080                Expr::PatternComprehension { .. } => {
1081                    // Handled by DataFusion path via PatternComprehensionExecExpr
1082                    Err(anyhow::anyhow!(
1083                        "Pattern comprehensions are handled by DataFusion executor"
1084                    ))
1085                }
1086                Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1087                    "COLLECT subqueries not yet supported in executor"
1088                )),
1089                Expr::Variable(name) => {
1090                    if let Some(val) = row.get(name) {
1091                        Ok(val.clone())
1092                    } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1093                        // Fallback: scan results may have system columns like "d._vid"
1094                        // without a materialized "d" Map. Return the VID so Property
1095                        // evaluation can fetch properties from storage.
1096                        Ok(vid_val.clone())
1097                    } else {
1098                        Ok(params.get(name).cloned().unwrap_or(Value::Null))
1099                    }
1100                }
1101                Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1102                Expr::Property(var_expr, prop_name) => {
1103                    // Fast path: if the base is a Variable, try flat-key lookup first.
1104                    // DataFusion scan results use flat keys like "d.embedding" rather than
1105                    // nested maps, so "d.embedding" won't be found via Variable("d") -> Property.
1106                    if let Expr::Variable(var_name) = var_expr.as_ref() {
1107                        let flat_key = format!("{}.{}", var_name, prop_name);
1108                        if let Some(val) = row.get(flat_key.as_str()) {
1109                            return Ok(val.clone());
1110                        }
1111                    }
1112
1113                    let base_val = this
1114                        .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1115                        .await?;
1116
1117                    // Handle system properties _vid and _id directly
1118                    if (prop_name == "_vid" || prop_name == "_id")
1119                        && let Ok(vid) = Self::vid_from_value(&base_val)
1120                    {
1121                        return Ok(Value::Int(vid.as_u64() as i64));
1122                    }
1123
1124                    // Handle Value::Node - access properties directly or via prop manager
1125                    if let Value::Node(node) = &base_val {
1126                        // Handle system properties
1127                        if prop_name == "_vid" || prop_name == "_id" {
1128                            return Ok(Value::Int(node.vid.as_u64() as i64));
1129                        }
1130                        if prop_name == "_labels" {
1131                            return Ok(Value::List(
1132                                node.labels
1133                                    .iter()
1134                                    .map(|l| Value::String(l.clone()))
1135                                    .collect(),
1136                            ));
1137                        }
1138                        // Check in-memory properties first
1139                        if let Some(val) = node.properties.get(prop_name.as_str()) {
1140                            return Ok(val.clone());
1141                        }
1142                        // Fallback to storage lookup
1143                        if let Ok(val) = prop_manager
1144                            .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1145                            .await
1146                        {
1147                            return Ok(val);
1148                        }
1149                        return Ok(Value::Null);
1150                    }
1151
1152                    // Handle Value::Edge - access properties directly or via prop manager
1153                    if let Value::Edge(edge) = &base_val {
1154                        // Handle system properties
1155                        if prop_name == "_eid" || prop_name == "_id" {
1156                            return Ok(Value::Int(edge.eid.as_u64() as i64));
1157                        }
1158                        if prop_name == "_type" {
1159                            return Ok(Value::String(edge.edge_type.clone()));
1160                        }
1161                        if prop_name == "_src" {
1162                            return Ok(Value::Int(edge.src.as_u64() as i64));
1163                        }
1164                        if prop_name == "_dst" {
1165                            return Ok(Value::Int(edge.dst.as_u64() as i64));
1166                        }
1167                        // Check in-memory properties first
1168                        if let Some(val) = edge.properties.get(prop_name.as_str()) {
1169                            return Ok(val.clone());
1170                        }
1171                        // Fallback to storage lookup
1172                        if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1173                        {
1174                            return Ok(val);
1175                        }
1176                        return Ok(Value::Null);
1177                    }
1178
1179                    // If base_val is an object (node/edge), check its properties first
1180                    // This handles properties from CREATE/MERGE that may not be persisted yet
1181                    if let Value::Map(map) = &base_val {
1182                        // First check top-level (for system properties like _id, _label, etc.)
1183                        if let Some(val) = map.get(prop_name.as_str()) {
1184                            return Ok(val.clone());
1185                        }
1186                        // Then check inside "properties" object (for user properties)
1187                        if let Some(Value::Map(props)) = map.get("properties")
1188                            && let Some(val) = props.get(prop_name.as_str())
1189                        {
1190                            return Ok(val.clone());
1191                        }
1192                        // Fallback to storage lookup using _vid or _id
1193                        let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1194                            map.get("_id")
1195                                .and_then(|v| v.as_str())
1196                                .and_then(|s| s.parse::<u64>().ok())
1197                        });
1198                        if let Some(id) = vid_opt {
1199                            let vid = Vid::from(id);
1200                            if let Ok(val) = prop_manager
1201                                .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1202                                .await
1203                            {
1204                                return Ok(val);
1205                            }
1206                        } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1207                            let eid = uni_common::core::id::Eid::from(id);
1208                            if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1209                                return Ok(val);
1210                            }
1211                        }
1212                        return Ok(Value::Null);
1213                    }
1214
1215                    // If base_val is just a VID, fetch from property manager
1216                    if let Ok(vid) = Self::vid_from_value(&base_val) {
1217                        return prop_manager
1218                            .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1219                            .await;
1220                    }
1221
1222                    if base_val.is_null() {
1223                        return Ok(Value::Null);
1224                    }
1225
1226                    // Check if base_val is a temporal value and prop_name is a temporal accessor
1227                    {
1228                        use crate::query::datetime::{
1229                            eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1230                            is_duration_string, is_temporal_accessor, is_temporal_string,
1231                        };
1232
1233                        // Handle Value::Temporal directly (no string parsing needed)
1234                        if let Value::Temporal(tv) = &base_val {
1235                            if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1236                                if is_duration_accessor(prop_name) {
1237                                    // Convert to string for the existing accessor logic
1238                                    return eval_duration_accessor(
1239                                        &base_val.to_string(),
1240                                        prop_name,
1241                                    );
1242                                }
1243                            } else if is_temporal_accessor(prop_name) {
1244                                return eval_temporal_accessor(&base_val.to_string(), prop_name);
1245                            }
1246                        }
1247
1248                        // Handle Value::String temporal (backward compat)
1249                        if let Value::String(s) = &base_val {
1250                            if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1251                                return eval_temporal_accessor(s, prop_name);
1252                            }
1253                            if is_duration_string(s) && is_duration_accessor(prop_name) {
1254                                return eval_duration_accessor(s, prop_name);
1255                            }
1256                        }
1257                    }
1258
1259                    Err(anyhow!(
1260                        "Cannot access property '{}' on {:?}",
1261                        prop_name,
1262                        base_val
1263                    ))
1264                }
1265                Expr::ArrayIndex {
1266                    array: arr_expr,
1267                    index: idx_expr,
1268                } => {
1269                    let arr_val = this
1270                        .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1271                        .await?;
1272                    let idx_val = this
1273                        .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1274                        .await?;
1275
1276                    if let Value::List(arr) = &arr_val {
1277                        // Handle signed indices (allow negative)
1278                        if let Some(i) = idx_val.as_i64() {
1279                            let idx = if i < 0 {
1280                                // Negative index: -1 = last element, -2 = second to last, etc.
1281                                let positive_idx = arr.len() as i64 + i;
1282                                if positive_idx < 0 {
1283                                    return Ok(Value::Null); // Out of bounds
1284                                }
1285                                positive_idx as usize
1286                            } else {
1287                                i as usize
1288                            };
1289                            if idx < arr.len() {
1290                                return Ok(arr[idx].clone());
1291                            }
1292                            return Ok(Value::Null);
1293                        } else if idx_val.is_null() {
1294                            return Ok(Value::Null);
1295                        } else {
1296                            return Err(anyhow::anyhow!(
1297                                "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1298                                idx_val
1299                            ));
1300                        }
1301                    }
1302                    if let Value::Map(map) = &arr_val {
1303                        if let Some(key) = idx_val.as_str() {
1304                            return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1305                        } else if !idx_val.is_null() {
1306                            return Err(anyhow::anyhow!(
1307                                "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1308                                idx_val
1309                            ));
1310                        }
1311                    }
1312                    // Handle bracket access on Node: n['name'] returns property
1313                    if let Value::Node(node) = &arr_val {
1314                        if let Some(key) = idx_val.as_str() {
1315                            // Check in-memory properties first
1316                            if let Some(val) = node.properties.get(key) {
1317                                return Ok(val.clone());
1318                            }
1319                            // Fallback to property manager
1320                            if let Ok(val) = prop_manager
1321                                .get_vertex_prop_with_ctx(node.vid, key, ctx)
1322                                .await
1323                            {
1324                                return Ok(val);
1325                            }
1326                            return Ok(Value::Null);
1327                        } else if !idx_val.is_null() {
1328                            return Err(anyhow::anyhow!(
1329                                "TypeError: Node index must be a string, got: {:?}",
1330                                idx_val
1331                            ));
1332                        }
1333                    }
1334                    // Handle bracket access on Edge: e['property'] returns property
1335                    if let Value::Edge(edge) = &arr_val {
1336                        if let Some(key) = idx_val.as_str() {
1337                            // Check in-memory properties first
1338                            if let Some(val) = edge.properties.get(key) {
1339                                return Ok(val.clone());
1340                            }
1341                            // Fallback to property manager
1342                            if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1343                                return Ok(val);
1344                            }
1345                            return Ok(Value::Null);
1346                        } else if !idx_val.is_null() {
1347                            return Err(anyhow::anyhow!(
1348                                "TypeError: Edge index must be a string, got: {:?}",
1349                                idx_val
1350                            ));
1351                        }
1352                    }
1353                    // Handle bracket access on VID (integer): n['name'] where n is a VID
1354                    if let Ok(vid) = Self::vid_from_value(&arr_val)
1355                        && let Some(key) = idx_val.as_str()
1356                    {
1357                        if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1358                        {
1359                            return Ok(val);
1360                        }
1361                        return Ok(Value::Null);
1362                    }
1363                    if arr_val.is_null() {
1364                        return Ok(Value::Null);
1365                    }
1366                    Err(anyhow!(
1367                        "TypeError: InvalidArgumentType - cannot index into {:?}",
1368                        arr_val
1369                    ))
1370                }
1371                Expr::ArraySlice { array, start, end } => {
1372                    let arr_val = this
1373                        .evaluate_expr(array, row, prop_manager, params, ctx)
1374                        .await?;
1375
1376                    if let Value::List(arr) = &arr_val {
1377                        let len = arr.len();
1378
1379                        // Evaluate start index (default to 0), null → null result
1380                        let start_idx = if let Some(s) = start {
1381                            let v = this
1382                                .evaluate_expr(s, row, prop_manager, params, ctx)
1383                                .await?;
1384                            if v.is_null() {
1385                                return Ok(Value::Null);
1386                            }
1387                            let raw = v.as_i64().unwrap_or(0);
1388                            if raw < 0 {
1389                                (len as i64 + raw).max(0) as usize
1390                            } else {
1391                                (raw as usize).min(len)
1392                            }
1393                        } else {
1394                            0
1395                        };
1396
1397                        // Evaluate end index (default to length), null → null result
1398                        let end_idx = if let Some(e) = end {
1399                            let v = this
1400                                .evaluate_expr(e, row, prop_manager, params, ctx)
1401                                .await?;
1402                            if v.is_null() {
1403                                return Ok(Value::Null);
1404                            }
1405                            let raw = v.as_i64().unwrap_or(len as i64);
1406                            if raw < 0 {
1407                                (len as i64 + raw).max(0) as usize
1408                            } else {
1409                                (raw as usize).min(len)
1410                            }
1411                        } else {
1412                            len
1413                        };
1414
1415                        // Return sliced array
1416                        if start_idx >= end_idx {
1417                            return Ok(Value::List(vec![]));
1418                        }
1419                        let end_idx = end_idx.min(len);
1420                        return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1421                    }
1422
1423                    if arr_val.is_null() {
1424                        return Ok(Value::Null);
1425                    }
1426                    Err(anyhow!("Cannot slice {:?}", arr_val))
1427                }
1428                Expr::Literal(lit) => Ok(lit.to_value()),
1429                Expr::List(items) => {
1430                    let mut vals = Vec::new();
1431                    for item in items {
1432                        vals.push(
1433                            this.evaluate_expr(item, row, prop_manager, params, ctx)
1434                                .await?,
1435                        );
1436                    }
1437                    Ok(Value::List(vals))
1438                }
1439                Expr::Map(items) => {
1440                    let mut map = HashMap::new();
1441                    for (key, value_expr) in items {
1442                        let val = this
1443                            .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1444                            .await?;
1445                        map.insert(key.clone(), val);
1446                    }
1447                    Ok(Value::Map(map))
1448                }
1449                Expr::Exists { query, .. } => {
1450                    // Plan and execute subquery; failures return false (pattern doesn't match)
1451                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1452                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1453
1454                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1455                        Ok(plan) => {
1456                            let mut sub_params = params.clone();
1457                            sub_params.extend(row.clone());
1458
1459                            match this.execute(plan, prop_manager, &sub_params).await {
1460                                Ok(results) => Ok(Value::Bool(!results.is_empty())),
1461                                Err(e) => {
1462                                    log::debug!("EXISTS subquery execution failed: {}", e);
1463                                    Ok(Value::Bool(false))
1464                                }
1465                            }
1466                        }
1467                        Err(e) => {
1468                            log::debug!("EXISTS subquery planning failed: {}", e);
1469                            Ok(Value::Bool(false))
1470                        }
1471                    }
1472                }
1473                Expr::CountSubquery(query) => {
1474                    // Similar to Exists but returns count
1475                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1476
1477                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1478
1479                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1480                        Ok(plan) => {
1481                            let mut sub_params = params.clone();
1482                            sub_params.extend(row.clone());
1483
1484                            match this.execute(plan, prop_manager, &sub_params).await {
1485                                Ok(results) => Ok(Value::from(results.len() as i64)),
1486                                Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1487                            }
1488                        }
1489                        Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1490                    }
1491                }
1492                Expr::Quantifier {
1493                    quantifier,
1494                    variable,
1495                    list,
1496                    predicate,
1497                } => {
1498                    // Quantifier expression evaluation (ALL/ANY/SINGLE/NONE)
1499                    //
1500                    // This is the primary execution path for quantifiers because DataFusion
1501                    // does not support lambda functions yet. Queries with quantifiers attempt
1502                    // DataFusion translation first, fail (see df_expr.rs:289), then fall back
1503                    // to this fallback executor path.
1504                    //
1505                    // This is intentional design - we get correct semantics with row-by-row
1506                    // evaluation until DataFusion adds lambda support.
1507                    //
1508                    // See: https://github.com/apache/datafusion/issues/14205
1509
1510                    // Evaluate the list expression
1511                    let list_val = this
1512                        .evaluate_expr(list, row, prop_manager, params, ctx)
1513                        .await?;
1514
1515                    // Handle null propagation
1516                    if list_val.is_null() {
1517                        return Ok(Value::Null);
1518                    }
1519
1520                    // Convert to array
1521                    let items = match list_val {
1522                        Value::List(arr) => arr,
1523                        _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1524                    };
1525
1526                    // Evaluate predicate for each item
1527                    let mut satisfied_count = 0;
1528                    for item in &items {
1529                        // Create new row with bound variable
1530                        let mut item_row = row.clone();
1531                        item_row.insert(variable.clone(), item.clone());
1532
1533                        // Evaluate predicate with bound variable
1534                        let pred_result = this
1535                            .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1536                            .await?;
1537
1538                        // Check if predicate is satisfied
1539                        if let Value::Bool(true) = pred_result {
1540                            satisfied_count += 1;
1541                        }
1542                    }
1543
1544                    // Return based on quantifier type
1545                    let result = match quantifier {
1546                        Quantifier::All => satisfied_count == items.len(),
1547                        Quantifier::Any => satisfied_count > 0,
1548                        Quantifier::Single => satisfied_count == 1,
1549                        Quantifier::None => satisfied_count == 0,
1550                    };
1551
1552                    Ok(Value::Bool(result))
1553                }
1554                Expr::ListComprehension {
1555                    variable,
1556                    list,
1557                    where_clause,
1558                    map_expr,
1559                } => {
1560                    // List comprehension evaluation: [x IN list WHERE pred | expr]
1561                    //
1562                    // Similar to quantifiers, this requires lambda-like evaluation
1563                    // which DataFusion doesn't support yet. This is the primary execution path.
1564
1565                    // Evaluate the list expression
1566                    let list_val = this
1567                        .evaluate_expr(list, row, prop_manager, params, ctx)
1568                        .await?;
1569
1570                    // Handle null propagation
1571                    if list_val.is_null() {
1572                        return Ok(Value::Null);
1573                    }
1574
1575                    // Convert to array
1576                    let items = match list_val {
1577                        Value::List(arr) => arr,
1578                        _ => {
1579                            return Err(anyhow!(
1580                                "List comprehension expects a list, got: {:?}",
1581                                list_val
1582                            ));
1583                        }
1584                    };
1585
1586                    // Collect mapped values
1587                    let mut results = Vec::new();
1588                    for item in &items {
1589                        // Create new row with bound variable
1590                        let mut item_row = row.clone();
1591                        item_row.insert(variable.clone(), item.clone());
1592
1593                        // Apply WHERE filter if present
1594                        if let Some(predicate) = where_clause {
1595                            let pred_result = this
1596                                .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1597                                .await?;
1598
1599                            // Skip items that don't match the filter
1600                            if !matches!(pred_result, Value::Bool(true)) {
1601                                continue;
1602                            }
1603                        }
1604
1605                        // Apply map expression
1606                        let mapped_val = this
1607                            .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1608                            .await?;
1609                        results.push(mapped_val);
1610                    }
1611
1612                    Ok(Value::List(results))
1613                }
1614                Expr::BinaryOp { left, op, right } => {
1615                    // Short-circuit evaluation for AND/OR
1616                    match op {
1617                        BinaryOp::And => {
1618                            let l_val = this
1619                                .evaluate_expr(left, row, prop_manager, params, ctx)
1620                                .await?;
1621                            // Short-circuit: if left is false, don't evaluate right
1622                            if let Some(false) = l_val.as_bool() {
1623                                return Ok(Value::Bool(false));
1624                            }
1625                            let r_val = this
1626                                .evaluate_expr(right, row, prop_manager, params, ctx)
1627                                .await?;
1628                            eval_binary_op(&l_val, op, &r_val)
1629                        }
1630                        BinaryOp::Or => {
1631                            let l_val = this
1632                                .evaluate_expr(left, row, prop_manager, params, ctx)
1633                                .await?;
1634                            // Short-circuit: if left is true, don't evaluate right
1635                            if let Some(true) = l_val.as_bool() {
1636                                return Ok(Value::Bool(true));
1637                            }
1638                            let r_val = this
1639                                .evaluate_expr(right, row, prop_manager, params, ctx)
1640                                .await?;
1641                            eval_binary_op(&l_val, op, &r_val)
1642                        }
1643                        _ => {
1644                            // For all other operators, evaluate both sides
1645                            let l_val = this
1646                                .evaluate_expr(left, row, prop_manager, params, ctx)
1647                                .await?;
1648                            let r_val = this
1649                                .evaluate_expr(right, row, prop_manager, params, ctx)
1650                                .await?;
1651                            eval_binary_op(&l_val, op, &r_val)
1652                        }
1653                    }
1654                }
1655                Expr::In { expr, list } => {
1656                    let l_val = this
1657                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1658                        .await?;
1659                    let r_val = this
1660                        .evaluate_expr(list, row, prop_manager, params, ctx)
1661                        .await?;
1662                    eval_in_op(&l_val, &r_val)
1663                }
1664                Expr::UnaryOp { op, expr } => {
1665                    let val = this
1666                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1667                        .await?;
1668                    match op {
1669                        UnaryOp::Not => {
1670                            // Three-valued logic: NOT null = null
1671                            match val.as_bool() {
1672                                Some(b) => Ok(Value::Bool(!b)),
1673                                None if val.is_null() => Ok(Value::Null),
1674                                None => Err(anyhow!(
1675                                    "InvalidArgumentType: NOT requires a boolean argument"
1676                                )),
1677                            }
1678                        }
1679                        UnaryOp::Neg => {
1680                            if let Some(i) = val.as_i64() {
1681                                Ok(Value::Int(-i))
1682                            } else if let Some(f) = val.as_f64() {
1683                                Ok(Value::Float(-f))
1684                            } else {
1685                                Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1686                            }
1687                        }
1688                    }
1689                }
1690                Expr::IsNull(expr) => {
1691                    let val = this
1692                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1693                        .await?;
1694                    Ok(Value::Bool(val.is_null()))
1695                }
1696                Expr::IsNotNull(expr) => {
1697                    let val = this
1698                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1699                        .await?;
1700                    Ok(Value::Bool(!val.is_null()))
1701                }
1702                Expr::IsUnique(_) => {
1703                    // IS UNIQUE is only valid in constraint definitions, not in query expressions
1704                    Err(anyhow!(
1705                        "IS UNIQUE can only be used in constraint definitions"
1706                    ))
1707                }
1708                Expr::Case {
1709                    expr,
1710                    when_then,
1711                    else_expr,
1712                } => {
1713                    if let Some(base_expr) = expr {
1714                        let base_val = this
1715                            .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1716                            .await?;
1717                        for (w, t) in when_then {
1718                            let w_val = this
1719                                .evaluate_expr(w, row, prop_manager, params, ctx)
1720                                .await?;
1721                            if base_val == w_val {
1722                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1723                            }
1724                        }
1725                    } else {
1726                        for (w, t) in when_then {
1727                            let w_val = this
1728                                .evaluate_expr(w, row, prop_manager, params, ctx)
1729                                .await?;
1730                            if w_val.as_bool() == Some(true) {
1731                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1732                            }
1733                        }
1734                    }
1735                    if let Some(e) = else_expr {
1736                        return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1737                    }
1738                    Ok(Value::Null)
1739                }
1740                Expr::Wildcard => Ok(Value::Null),
1741                Expr::FunctionCall { name, args, .. } => {
1742                    // Special case: id() returns VID for nodes and EID for relationships
1743                    if name.eq_ignore_ascii_case("ID") {
1744                        if args.len() != 1 {
1745                            return Err(anyhow!("id() requires exactly 1 argument"));
1746                        }
1747                        let val = this
1748                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1749                            .await?;
1750                        if let Value::Map(map) = &val {
1751                            // Check for _vid (vertex) first
1752                            if let Some(vid_val) = map.get("_vid") {
1753                                return Ok(vid_val.clone());
1754                            }
1755                            // Check for _eid (edge/relationship)
1756                            if let Some(eid_val) = map.get("_eid") {
1757                                return Ok(eid_val.clone());
1758                            }
1759                            // Check for _id (fallback)
1760                            if let Some(id_val) = map.get("_id") {
1761                                return Ok(id_val.clone());
1762                            }
1763                        }
1764                        return Ok(Value::Null);
1765                    }
1766
1767                    // Special case: elementId() returns string format "label_id:local_offset"
1768                    if name.eq_ignore_ascii_case("ELEMENTID") {
1769                        if args.len() != 1 {
1770                            return Err(anyhow!("elementId() requires exactly 1 argument"));
1771                        }
1772                        let val = this
1773                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1774                            .await?;
1775                        if let Value::Map(map) = &val {
1776                            // Check for _vid (vertex) first
1777                            // In new storage model, VIDs are pure auto-increment - return as simple ID string
1778                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1779                                return Ok(Value::String(vid_val.to_string()));
1780                            }
1781                            // Check for _eid (edge/relationship)
1782                            // In new storage model, EIDs are pure auto-increment - return as simple ID string
1783                            if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1784                                return Ok(Value::String(eid_val.to_string()));
1785                            }
1786                        }
1787                        return Ok(Value::Null);
1788                    }
1789
1790                    // Special case: type() returns the relationship type name
1791                    if name.eq_ignore_ascii_case("TYPE") {
1792                        if args.len() != 1 {
1793                            return Err(anyhow!("type() requires exactly 1 argument"));
1794                        }
1795                        let val = this
1796                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1797                            .await?;
1798                        if let Value::Map(map) = &val
1799                            && let Some(type_val) = map.get("_type")
1800                        {
1801                            // Numeric _type is an edge type ID; string _type is already a name
1802                            if let Some(type_id) =
1803                                type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1804                            {
1805                                if let Some(name) = this
1806                                    .storage
1807                                    .schema_manager()
1808                                    .edge_type_name_by_id_unified(type_id)
1809                                {
1810                                    return Ok(Value::String(name));
1811                                }
1812                            } else if let Some(name) = type_val.as_str() {
1813                                return Ok(Value::String(name.to_string()));
1814                            }
1815                        }
1816                        return Ok(Value::Null);
1817                    }
1818
1819                    // Special case: labels() returns the labels of a node
1820                    if name.eq_ignore_ascii_case("LABELS") {
1821                        if args.len() != 1 {
1822                            return Err(anyhow!("labels() requires exactly 1 argument"));
1823                        }
1824                        let val = this
1825                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1826                            .await?;
1827                        if let Value::Map(map) = &val
1828                            && let Some(labels_val) = map.get("_labels")
1829                        {
1830                            return Ok(labels_val.clone());
1831                        }
1832                        return Ok(Value::Null);
1833                    }
1834
1835                    // Special case: properties() returns the properties map of a node/edge
1836                    if name.eq_ignore_ascii_case("PROPERTIES") {
1837                        if args.len() != 1 {
1838                            return Err(anyhow!("properties() requires exactly 1 argument"));
1839                        }
1840                        let val = this
1841                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1842                            .await?;
1843                        if let Value::Map(map) = &val {
1844                            // Filter out internal properties (those starting with _)
1845                            let mut props = HashMap::new();
1846                            for (k, v) in map.iter() {
1847                                if !k.starts_with('_') {
1848                                    props.insert(k.clone(), v.clone());
1849                                }
1850                            }
1851                            return Ok(Value::Map(props));
1852                        }
1853                        return Ok(Value::Null);
1854                    }
1855
1856                    // Special case: startNode() returns the start node of a relationship
1857                    if name.eq_ignore_ascii_case("STARTNODE") {
1858                        if args.len() != 1 {
1859                            return Err(anyhow!("startNode() requires exactly 1 argument"));
1860                        }
1861                        let val = this
1862                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1863                            .await?;
1864                        if let Value::Edge(edge) = &val {
1865                            return Ok(Self::find_node_by_vid(row, edge.src));
1866                        }
1867                        if let Value::Map(map) = &val {
1868                            if let Some(start_node) = map.get("_startNode") {
1869                                return Ok(start_node.clone());
1870                            }
1871                            if let Some(src_vid) = map.get("_src_vid") {
1872                                return Ok(Value::Map(HashMap::from([(
1873                                    "_vid".to_string(),
1874                                    src_vid.clone(),
1875                                )])));
1876                            }
1877                            // Resolve _src VID by looking up node in row
1878                            if let Some(src_id) = map.get("_src")
1879                                && let Some(u) = src_id.as_u64()
1880                            {
1881                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1882                            }
1883                        }
1884                        return Ok(Value::Null);
1885                    }
1886
1887                    // Special case: endNode() returns the end node of a relationship
1888                    if name.eq_ignore_ascii_case("ENDNODE") {
1889                        if args.len() != 1 {
1890                            return Err(anyhow!("endNode() requires exactly 1 argument"));
1891                        }
1892                        let val = this
1893                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1894                            .await?;
1895                        if let Value::Edge(edge) = &val {
1896                            return Ok(Self::find_node_by_vid(row, edge.dst));
1897                        }
1898                        if let Value::Map(map) = &val {
1899                            if let Some(end_node) = map.get("_endNode") {
1900                                return Ok(end_node.clone());
1901                            }
1902                            if let Some(dst_vid) = map.get("_dst_vid") {
1903                                return Ok(Value::Map(HashMap::from([(
1904                                    "_vid".to_string(),
1905                                    dst_vid.clone(),
1906                                )])));
1907                            }
1908                            // Resolve _dst VID by looking up node in row
1909                            if let Some(dst_id) = map.get("_dst")
1910                                && let Some(u) = dst_id.as_u64()
1911                            {
1912                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1913                            }
1914                        }
1915                        return Ok(Value::Null);
1916                    }
1917
1918                    // Special case: hasLabel() checks if a node has a specific label
1919                    // Used for WHERE n:Label predicates
1920                    if name.eq_ignore_ascii_case("HASLABEL") {
1921                        if args.len() != 2 {
1922                            return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
1923                        }
1924                        let node_val = this
1925                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1926                            .await?;
1927                        let label_val = this
1928                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1929                            .await?;
1930
1931                        let label_to_check = label_val.as_str().ok_or_else(|| {
1932                            anyhow!("Second argument to hasLabel must be a string")
1933                        })?;
1934
1935                        let has_label = match &node_val {
1936                            // Handle proper Value::Node type (from result normalization)
1937                            Value::Map(map) if map.contains_key("_vid") => {
1938                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
1939                                    labels_arr
1940                                        .iter()
1941                                        .any(|l| l.as_str() == Some(label_to_check))
1942                                } else {
1943                                    false
1944                                }
1945                            }
1946                            // Also handle legacy Object format
1947                            Value::Map(map) => {
1948                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
1949                                    labels_arr
1950                                        .iter()
1951                                        .any(|l| l.as_str() == Some(label_to_check))
1952                                } else {
1953                                    false
1954                                }
1955                            }
1956                            _ => false,
1957                        };
1958                        return Ok(Value::Bool(has_label));
1959                    }
1960
1961                    // Quantifier functions (ANY/ALL/NONE/SINGLE) as function calls are not supported.
1962                    // These should be parsed as Expr::Quantifier instead.
1963                    if matches!(
1964                        name.to_uppercase().as_str(),
1965                        "ANY" | "ALL" | "NONE" | "SINGLE"
1966                    ) {
1967                        return Err(anyhow!(
1968                            "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
1969                            name.to_lowercase()
1970                        ));
1971                    }
1972
1973                    // Special case: COALESCE needs short-circuit evaluation
1974                    if name.eq_ignore_ascii_case("COALESCE") {
1975                        for arg in args {
1976                            let val = this
1977                                .evaluate_expr(arg, row, prop_manager, params, ctx)
1978                                .await?;
1979                            if !val.is_null() {
1980                                return Ok(val);
1981                            }
1982                        }
1983                        return Ok(Value::Null);
1984                    }
1985
1986                    // Special case: vector_similarity has dedicated implementation
1987                    if name.eq_ignore_ascii_case("vector_similarity") {
1988                        if args.len() != 2 {
1989                            return Err(anyhow!("vector_similarity takes 2 arguments"));
1990                        }
1991                        let v1 = this
1992                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1993                            .await?;
1994                        let v2 = this
1995                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1996                            .await?;
1997                        return eval_vector_similarity(&v1, &v2);
1998                    }
1999
2000                    // Special case: uni.validAt handles node fetching
2001                    if name.eq_ignore_ascii_case("uni.temporal.validAt")
2002                        || name.eq_ignore_ascii_case("uni.validAt")
2003                        || name.eq_ignore_ascii_case("validAt")
2004                    {
2005                        if args.len() != 4 {
2006                            return Err(anyhow!("validAt requires 4 arguments"));
2007                        }
2008                        let node_val = this
2009                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2010                            .await?;
2011                        let start_prop = this
2012                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2013                            .await?
2014                            .as_str()
2015                            .ok_or(anyhow!("start_prop must be string"))?
2016                            .to_string();
2017                        let end_prop = this
2018                            .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2019                            .await?
2020                            .as_str()
2021                            .ok_or(anyhow!("end_prop must be string"))?
2022                            .to_string();
2023                        let time_val = this
2024                            .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2025                            .await?;
2026
2027                        let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2028                            anyhow!("time argument must be a datetime value or string")
2029                        })?;
2030
2031                        // Fetch temporal property values - supports both vertices and edges
2032                        let valid_from_val: Option<Value> = if let Ok(vid) =
2033                            Self::vid_from_value(&node_val)
2034                        {
2035                            // Vertex case - VID string format
2036                            prop_manager
2037                                .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2038                                .await
2039                                .ok()
2040                        } else if let Value::Map(map) = &node_val {
2041                            // Check for embedded _vid or _eid in object
2042                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2043                                let vid = Vid::from(vid_val);
2044                                prop_manager
2045                                    .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2046                                    .await
2047                                    .ok()
2048                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2049                                // Edge case
2050                                let eid = uni_common::core::id::Eid::from(eid_val);
2051                                prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2052                            } else {
2053                                // Inline object - property embedded directly
2054                                map.get(&start_prop).cloned()
2055                            }
2056                        } else {
2057                            return Ok(Value::Bool(false));
2058                        };
2059
2060                        let valid_from = match valid_from_val {
2061                            Some(ref v) => match value_to_datetime_utc(v) {
2062                                Some(dt) => dt,
2063                                None if v.is_null() => return Ok(Value::Bool(false)),
2064                                None => {
2065                                    return Err(anyhow!(
2066                                        "Property {} must be a datetime value or string",
2067                                        start_prop
2068                                    ));
2069                                }
2070                            },
2071                            None => return Ok(Value::Bool(false)),
2072                        };
2073
2074                        let valid_to_val: Option<Value> = if let Ok(vid) =
2075                            Self::vid_from_value(&node_val)
2076                        {
2077                            // Vertex case - VID string format
2078                            prop_manager
2079                                .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2080                                .await
2081                                .ok()
2082                        } else if let Value::Map(map) = &node_val {
2083                            // Check for embedded _vid or _eid in object
2084                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2085                                let vid = Vid::from(vid_val);
2086                                prop_manager
2087                                    .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2088                                    .await
2089                                    .ok()
2090                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2091                                // Edge case
2092                                let eid = uni_common::core::id::Eid::from(eid_val);
2093                                prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2094                            } else {
2095                                // Inline object - property embedded directly
2096                                map.get(&end_prop).cloned()
2097                            }
2098                        } else {
2099                            return Ok(Value::Bool(false));
2100                        };
2101
2102                        let valid_to = match valid_to_val {
2103                            Some(ref v) => match value_to_datetime_utc(v) {
2104                                Some(dt) => Some(dt),
2105                                None if v.is_null() => None,
2106                                None => {
2107                                    return Err(anyhow!(
2108                                        "Property {} must be a datetime value or null",
2109                                        end_prop
2110                                    ));
2111                                }
2112                            },
2113                            None => None,
2114                        };
2115
2116                        let is_valid = valid_from <= query_time
2117                            && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2118                        return Ok(Value::Bool(is_valid));
2119                    }
2120
2121                    // For all other functions, evaluate arguments then call helper
2122                    let mut evaluated_args = Vec::with_capacity(args.len());
2123                    for arg in args {
2124                        let mut val = this
2125                            .evaluate_expr(arg, row, prop_manager, params, ctx)
2126                            .await?;
2127
2128                        // Eagerly hydrate edge/vertex maps if pushdown hydration didn't load properties.
2129                        // Functions like validAt() need access to properties like valid_from/valid_to.
2130                        if let Value::Map(ref mut map) = val {
2131                            hydrate_entity_if_needed(map, prop_manager, ctx).await;
2132                        }
2133
2134                        evaluated_args.push(val);
2135                    }
2136                    eval_scalar_function(name, &evaluated_args)
2137                }
2138                Expr::Reduce {
2139                    accumulator,
2140                    init,
2141                    variable,
2142                    list,
2143                    expr,
2144                } => {
2145                    let mut acc = self
2146                        .evaluate_expr(init, row, prop_manager, params, ctx)
2147                        .await?;
2148                    let list_val = self
2149                        .evaluate_expr(list, row, prop_manager, params, ctx)
2150                        .await?;
2151
2152                    if let Value::List(items) = list_val {
2153                        for item in items {
2154                            // Create a temporary scope/row with accumulator and variable
2155                            // For simplicity in fallback executor, we can construct a new row map
2156                            // merging current row + new variables.
2157                            let mut scope = row.clone();
2158                            scope.insert(accumulator.clone(), acc.clone());
2159                            scope.insert(variable.clone(), item);
2160
2161                            acc = self
2162                                .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2163                                .await?;
2164                        }
2165                    } else {
2166                        return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2167                    }
2168                    Ok(acc)
2169                }
2170                Expr::ValidAt { .. } => {
2171                    // VALID_AT should have been transformed to a function call in the planner
2172                    Err(anyhow!(
2173                        "VALID_AT expression should have been transformed to function call in planner"
2174                    ))
2175                }
2176
2177                Expr::LabelCheck { expr, labels } => {
2178                    let val = this
2179                        .evaluate_expr(expr, row, prop_manager, params, ctx)
2180                        .await?;
2181                    match &val {
2182                        Value::Null => Ok(Value::Null),
2183                        Value::Map(map) => {
2184                            // Check if this is an edge (has _eid) or node (has _vid)
2185                            let is_edge = map.contains_key("_eid")
2186                                || map.contains_key("_type_name")
2187                                || (map.contains_key("_type") && !map.contains_key("_vid"));
2188
2189                            if is_edge {
2190                                // Edges have a single type
2191                                if labels.len() > 1 {
2192                                    return Ok(Value::Bool(false));
2193                                }
2194                                let label_to_check = &labels[0];
2195                                let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2196                                {
2197                                    t == label_to_check
2198                                } else if let Some(Value::String(t)) = map.get("_type") {
2199                                    t == label_to_check
2200                                } else {
2201                                    false
2202                                };
2203                                Ok(Value::Bool(has_type))
2204                            } else {
2205                                // Node: check all labels
2206                                let has_all = labels.iter().all(|label_to_check| {
2207                                    if let Some(Value::List(labels_arr)) = map.get("_labels") {
2208                                        labels_arr
2209                                            .iter()
2210                                            .any(|l| l.as_str() == Some(label_to_check.as_str()))
2211                                    } else {
2212                                        false
2213                                    }
2214                                });
2215                                Ok(Value::Bool(has_all))
2216                            }
2217                        }
2218                        _ => Ok(Value::Bool(false)),
2219                    }
2220                }
2221
2222                Expr::MapProjection { base, items } => {
2223                    let base_value = this
2224                        .evaluate_expr(base, row, prop_manager, params, ctx)
2225                        .await?;
2226
2227                    // Extract properties from the base object
2228                    let properties = match &base_value {
2229                        Value::Map(map) => map,
2230                        _ => {
2231                            return Err(anyhow!(
2232                                "Map projection requires object, got {:?}",
2233                                base_value
2234                            ));
2235                        }
2236                    };
2237
2238                    let mut result_map = HashMap::new();
2239
2240                    for item in items {
2241                        match item {
2242                            MapProjectionItem::Property(prop) => {
2243                                if let Some(value) = properties.get(prop.as_str()) {
2244                                    result_map.insert(prop.clone(), value.clone());
2245                                }
2246                            }
2247                            MapProjectionItem::AllProperties => {
2248                                // Include all properties except internal fields (those starting with _)
2249                                for (key, value) in properties.iter() {
2250                                    if !key.starts_with('_') {
2251                                        result_map.insert(key.clone(), value.clone());
2252                                    }
2253                                }
2254                            }
2255                            MapProjectionItem::LiteralEntry(key, expr) => {
2256                                let value = this
2257                                    .evaluate_expr(expr, row, prop_manager, params, ctx)
2258                                    .await?;
2259                                result_map.insert(key.clone(), value);
2260                            }
2261                            MapProjectionItem::Variable(var_name) => {
2262                                // Variable selector: include the value of the variable in the result
2263                                // e.g., person{.name, friend} includes the value of 'friend' variable
2264                                if let Some(value) = row.get(var_name.as_str()) {
2265                                    result_map.insert(var_name.clone(), value.clone());
2266                                }
2267                            }
2268                        }
2269                    }
2270
2271                    Ok(Value::Map(result_map))
2272                }
2273            }
2274        })
2275    }
2276
2277    pub(crate) fn execute_subplan<'a>(
2278        &'a self,
2279        plan: LogicalPlan,
2280        prop_manager: &'a PropertyManager,
2281        params: &'a HashMap<String, Value>,
2282        ctx: Option<&'a QueryContext>,
2283    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2284        Box::pin(async move {
2285            if let Some(ctx) = ctx {
2286                ctx.check_timeout()?;
2287            }
2288            match plan {
2289                LogicalPlan::Union { left, right, all } => {
2290                    self.execute_union(left, right, all, prop_manager, params, ctx)
2291                        .await
2292                }
2293                LogicalPlan::CreateVectorIndex {
2294                    config,
2295                    if_not_exists,
2296                } => {
2297                    if if_not_exists && self.index_exists_by_name(&config.name) {
2298                        return Ok(vec![]);
2299                    }
2300                    let idx_mgr = IndexManager::new(
2301                        self.storage.base_path(),
2302                        self.storage.schema_manager_arc(),
2303                        self.storage.lancedb_store_arc(),
2304                    );
2305                    idx_mgr.create_vector_index(config).await?;
2306                    Ok(vec![])
2307                }
2308                LogicalPlan::CreateFullTextIndex {
2309                    config,
2310                    if_not_exists,
2311                } => {
2312                    if if_not_exists && self.index_exists_by_name(&config.name) {
2313                        return Ok(vec![]);
2314                    }
2315                    let idx_mgr = IndexManager::new(
2316                        self.storage.base_path(),
2317                        self.storage.schema_manager_arc(),
2318                        self.storage.lancedb_store_arc(),
2319                    );
2320                    idx_mgr.create_fts_index(config).await?;
2321                    Ok(vec![])
2322                }
2323                LogicalPlan::CreateScalarIndex {
2324                    mut config,
2325                    if_not_exists,
2326                } => {
2327                    if if_not_exists && self.index_exists_by_name(&config.name) {
2328                        return Ok(vec![]);
2329                    }
2330
2331                    // Check for expression indexes - create generated columns
2332                    let mut modified_properties = Vec::new();
2333
2334                    for prop in &config.properties {
2335                        // Heuristic: if contains '(' and ')', it's an expression
2336                        if prop.contains('(') && prop.contains(')') {
2337                            let gen_col = SchemaManager::generated_column_name(prop);
2338
2339                            // Add generated property to schema
2340                            let sm = self.storage.schema_manager_arc();
2341                            if let Err(e) = sm.add_generated_property(
2342                                &config.label,
2343                                &gen_col,
2344                                DataType::String, // Default type for expressions
2345                                prop.clone(),
2346                            ) {
2347                                log::warn!("Failed to add generated property (might exist): {}", e);
2348                            }
2349
2350                            modified_properties.push(gen_col);
2351                        } else {
2352                            // Simple property - use as-is
2353                            modified_properties.push(prop.clone());
2354                        }
2355                    }
2356
2357                    config.properties = modified_properties;
2358
2359                    let idx_mgr = IndexManager::new(
2360                        self.storage.base_path(),
2361                        self.storage.schema_manager_arc(),
2362                        self.storage.lancedb_store_arc(),
2363                    );
2364                    idx_mgr.create_scalar_index(config).await?;
2365                    Ok(vec![])
2366                }
2367                LogicalPlan::CreateJsonFtsIndex {
2368                    config,
2369                    if_not_exists,
2370                } => {
2371                    if if_not_exists && self.index_exists_by_name(&config.name) {
2372                        return Ok(vec![]);
2373                    }
2374                    let idx_mgr = IndexManager::new(
2375                        self.storage.base_path(),
2376                        self.storage.schema_manager_arc(),
2377                        self.storage.lancedb_store_arc(),
2378                    );
2379                    idx_mgr.create_json_fts_index(config).await?;
2380                    Ok(vec![])
2381                }
2382                LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2383                LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2384                LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2385                LogicalPlan::Vacuum => {
2386                    self.execute_vacuum().await?;
2387                    Ok(vec![])
2388                }
2389                LogicalPlan::Checkpoint => {
2390                    self.execute_checkpoint().await?;
2391                    Ok(vec![])
2392                }
2393                LogicalPlan::CopyTo {
2394                    label,
2395                    path,
2396                    format,
2397                    options,
2398                } => {
2399                    let count = self
2400                        .execute_copy_to(&label, &path, &format, &options)
2401                        .await?;
2402                    let mut result = HashMap::new();
2403                    result.insert("count".to_string(), Value::Int(count as i64));
2404                    Ok(vec![result])
2405                }
2406                LogicalPlan::CopyFrom {
2407                    label,
2408                    path,
2409                    format,
2410                    options,
2411                } => {
2412                    let count = self
2413                        .execute_copy_from(&label, &path, &format, &options)
2414                        .await?;
2415                    let mut result = HashMap::new();
2416                    result.insert("count".to_string(), Value::Int(count as i64));
2417                    Ok(vec![result])
2418                }
2419                LogicalPlan::CreateLabel(clause) => {
2420                    self.execute_create_label(clause).await?;
2421                    Ok(vec![])
2422                }
2423                LogicalPlan::CreateEdgeType(clause) => {
2424                    self.execute_create_edge_type(clause).await?;
2425                    Ok(vec![])
2426                }
2427                LogicalPlan::AlterLabel(clause) => {
2428                    self.execute_alter_label(clause).await?;
2429                    Ok(vec![])
2430                }
2431                LogicalPlan::AlterEdgeType(clause) => {
2432                    self.execute_alter_edge_type(clause).await?;
2433                    Ok(vec![])
2434                }
2435                LogicalPlan::DropLabel(clause) => {
2436                    self.execute_drop_label(clause).await?;
2437                    Ok(vec![])
2438                }
2439                LogicalPlan::DropEdgeType(clause) => {
2440                    self.execute_drop_edge_type(clause).await?;
2441                    Ok(vec![])
2442                }
2443                LogicalPlan::CreateConstraint(clause) => {
2444                    self.execute_create_constraint(clause).await?;
2445                    Ok(vec![])
2446                }
2447                LogicalPlan::DropConstraint(clause) => {
2448                    self.execute_drop_constraint(clause).await?;
2449                    Ok(vec![])
2450                }
2451                LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2452                LogicalPlan::DropIndex { name, if_exists } => {
2453                    let idx_mgr = IndexManager::new(
2454                        self.storage.base_path(),
2455                        self.storage.schema_manager_arc(),
2456                        self.storage.lancedb_store_arc(),
2457                    );
2458                    match idx_mgr.drop_index(&name).await {
2459                        Ok(_) => Ok(vec![]),
2460                        Err(e) => {
2461                            if if_exists && e.to_string().contains("not found") {
2462                                Ok(vec![])
2463                            } else {
2464                                Err(e)
2465                            }
2466                        }
2467                    }
2468                }
2469                LogicalPlan::ShowIndexes { filter } => {
2470                    Ok(self.execute_show_indexes(filter.as_deref()))
2471                }
2472                // Scan/traverse nodes: delegate to DataFusion for data access,
2473                // then convert results to HashMaps for the fallback executor.
2474                LogicalPlan::Scan { .. }
2475                | LogicalPlan::ExtIdLookup { .. }
2476                | LogicalPlan::ScanAll { .. }
2477                | LogicalPlan::ScanMainByLabels { .. }
2478                | LogicalPlan::Traverse { .. }
2479                | LogicalPlan::TraverseMainByType { .. } => {
2480                    let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2481                    self.record_batches_to_rows(batches)
2482                }
2483                LogicalPlan::Filter {
2484                    input,
2485                    predicate,
2486                    optional_variables,
2487                } => {
2488                    let input_matches = self
2489                        .execute_subplan(*input, prop_manager, params, ctx)
2490                        .await?;
2491
2492                    tracing::debug!(
2493                        "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2494                        predicate,
2495                        input_matches.len(),
2496                        optional_variables
2497                    );
2498
2499                    // For OPTIONAL MATCH with WHERE: we need LEFT OUTER JOIN semantics.
2500                    // Group rows by non-optional variables, apply filter, and ensure
2501                    // at least one row per group (with NULLs if filter removes all).
2502                    if !optional_variables.is_empty() {
2503                        // Helper to check if a key belongs to an optional variable.
2504                        // Keys can be "var" or "var.field" (e.g., "m" or "m._vid").
2505                        let is_optional_key = |k: &str| -> bool {
2506                            optional_variables.contains(k)
2507                                || optional_variables
2508                                    .iter()
2509                                    .any(|var| k.starts_with(&format!("{}.", var)))
2510                        };
2511
2512                        // Helper to check if a key is internal (should not affect grouping)
2513                        let is_internal_key =
2514                            |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2515
2516                        // Compute the key (non-optional, non-internal variables) for grouping
2517                        let non_optional_vars: Vec<String> = input_matches
2518                            .first()
2519                            .map(|row| {
2520                                row.keys()
2521                                    .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2522                                    .cloned()
2523                                    .collect()
2524                            })
2525                            .unwrap_or_default();
2526
2527                        // Group rows by their non-optional variable values
2528                        let mut groups: std::collections::HashMap<
2529                            Vec<u8>,
2530                            Vec<HashMap<String, Value>>,
2531                        > = std::collections::HashMap::new();
2532
2533                        for row in &input_matches {
2534                            // Create a key from non-optional variable values
2535                            let key: Vec<u8> = non_optional_vars
2536                                .iter()
2537                                .map(|var| {
2538                                    row.get(var).map(|v| format!("{:?}", v)).unwrap_or_default()
2539                                })
2540                                .collect::<Vec<_>>()
2541                                .join("|")
2542                                .into_bytes();
2543
2544                            groups.entry(key).or_default().push(row.clone());
2545                        }
2546
2547                        let mut filtered = Vec::new();
2548                        for (_key, group_rows) in groups {
2549                            let mut group_passed = Vec::new();
2550
2551                            for row in &group_rows {
2552                                // If optional variables are already NULL, preserve the row
2553                                let has_null_optional = optional_variables.iter().any(|var| {
2554                                    // Check both "var" and "var._vid" style keys
2555                                    let direct_null =
2556                                        matches!(row.get(var), Some(Value::Null) | None);
2557                                    let prefixed_null = row
2558                                        .keys()
2559                                        .filter(|k| k.starts_with(&format!("{}.", var)))
2560                                        .any(|k| matches!(row.get(k), Some(Value::Null)));
2561                                    direct_null || prefixed_null
2562                                });
2563
2564                                if has_null_optional {
2565                                    group_passed.push(row.clone());
2566                                    continue;
2567                                }
2568
2569                                let res = self
2570                                    .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2571                                    .await?;
2572
2573                                if res.as_bool().unwrap_or(false) {
2574                                    group_passed.push(row.clone());
2575                                }
2576                            }
2577
2578                            if group_passed.is_empty() {
2579                                // No rows passed - emit one row with NULLs for optional variables
2580                                // Use the first row's non-optional values as a template
2581                                if let Some(template) = group_rows.first() {
2582                                    let mut null_row = HashMap::new();
2583                                    for (k, v) in template {
2584                                        if is_optional_key(k) {
2585                                            null_row.insert(k.clone(), Value::Null);
2586                                        } else {
2587                                            null_row.insert(k.clone(), v.clone());
2588                                        }
2589                                    }
2590                                    filtered.push(null_row);
2591                                }
2592                            } else {
2593                                filtered.extend(group_passed);
2594                            }
2595                        }
2596
2597                        tracing::debug!(
2598                            "Filter (OPTIONAL): {} input rows -> {} output rows",
2599                            input_matches.len(),
2600                            filtered.len()
2601                        );
2602
2603                        return Ok(filtered);
2604                    }
2605
2606                    // Standard filter for non-OPTIONAL MATCH
2607                    let mut filtered = Vec::new();
2608                    for row in input_matches.iter() {
2609                        let res = self
2610                            .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2611                            .await?;
2612
2613                        let passes = res.as_bool().unwrap_or(false);
2614
2615                        if passes {
2616                            filtered.push(row.clone());
2617                        }
2618                    }
2619
2620                    tracing::debug!(
2621                        "Filter: {} input rows -> {} output rows",
2622                        input_matches.len(),
2623                        filtered.len()
2624                    );
2625
2626                    Ok(filtered)
2627                }
2628                LogicalPlan::ProcedureCall {
2629                    procedure_name,
2630                    arguments,
2631                    yield_items,
2632                } => {
2633                    let yield_names: Vec<String> =
2634                        yield_items.iter().map(|(n, _)| n.clone()).collect();
2635                    let results = self
2636                        .execute_procedure(
2637                            &procedure_name,
2638                            &arguments,
2639                            &yield_names,
2640                            prop_manager,
2641                            params,
2642                            ctx,
2643                        )
2644                        .await?;
2645
2646                    // Handle aliasing: collect all original values first, then
2647                    // build the aliased row in one pass. This avoids issues when
2648                    // an alias matches another yield item's original name (e.g.,
2649                    // YIELD a AS b, b AS d — renaming "a" to "b" must not
2650                    // clobber the original "b" before it is renamed to "d").
2651                    let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2652                    if !has_aliases {
2653                        // No aliases (includes YIELD * which produces empty yield_items) —
2654                        // pass through the procedure output rows unchanged.
2655                        Ok(results)
2656                    } else {
2657                        let mut aliased_results = Vec::with_capacity(results.len());
2658                        for row in results {
2659                            let mut new_row = HashMap::new();
2660                            for (name, alias) in &yield_items {
2661                                let col_name = alias.as_ref().unwrap_or(name);
2662                                let val = row.get(name).cloned().unwrap_or(Value::Null);
2663                                new_row.insert(col_name.clone(), val);
2664                            }
2665                            aliased_results.push(new_row);
2666                        }
2667                        Ok(aliased_results)
2668                    }
2669                }
2670                LogicalPlan::VectorKnn { .. } => {
2671                    unreachable!("VectorKnn is handled by DataFusion engine")
2672                }
2673                LogicalPlan::InvertedIndexLookup { .. } => {
2674                    unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2675                }
2676                LogicalPlan::Sort { input, order_by } => {
2677                    let rows = self
2678                        .execute_subplan(*input, prop_manager, params, ctx)
2679                        .await?;
2680                    self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2681                        .await
2682                }
2683                LogicalPlan::Limit { input, skip, fetch } => {
2684                    let rows = self
2685                        .execute_subplan(*input, prop_manager, params, ctx)
2686                        .await?;
2687                    let skip = skip.unwrap_or(0);
2688                    let take = fetch.unwrap_or(usize::MAX);
2689                    Ok(rows.into_iter().skip(skip).take(take).collect())
2690                }
2691                LogicalPlan::Aggregate {
2692                    input,
2693                    group_by,
2694                    aggregates,
2695                } => {
2696                    let rows = self
2697                        .execute_subplan(*input, prop_manager, params, ctx)
2698                        .await?;
2699                    self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2700                        .await
2701                }
2702                LogicalPlan::Window {
2703                    input,
2704                    window_exprs,
2705                } => {
2706                    let rows = self
2707                        .execute_subplan(*input, prop_manager, params, ctx)
2708                        .await?;
2709                    self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2710                        .await
2711                }
2712                LogicalPlan::Project { input, projections } => {
2713                    let matches = self
2714                        .execute_subplan(*input, prop_manager, params, ctx)
2715                        .await?;
2716                    self.execute_project(matches, &projections, prop_manager, params, ctx)
2717                        .await
2718                }
2719                LogicalPlan::Distinct { input } => {
2720                    let rows = self
2721                        .execute_subplan(*input, prop_manager, params, ctx)
2722                        .await?;
2723                    let mut seen = std::collections::HashSet::new();
2724                    let mut result = Vec::new();
2725                    for row in rows {
2726                        let key = Self::canonical_row_key(&row);
2727                        if seen.insert(key) {
2728                            result.push(row);
2729                        }
2730                    }
2731                    Ok(result)
2732                }
2733                LogicalPlan::Unwind {
2734                    input,
2735                    expr,
2736                    variable,
2737                } => {
2738                    let input_rows = self
2739                        .execute_subplan(*input, prop_manager, params, ctx)
2740                        .await?;
2741                    self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2742                        .await
2743                }
2744                LogicalPlan::Apply {
2745                    input,
2746                    subquery,
2747                    input_filter,
2748                } => {
2749                    let input_rows = self
2750                        .execute_subplan(*input, prop_manager, params, ctx)
2751                        .await?;
2752                    self.execute_apply(
2753                        input_rows,
2754                        &subquery,
2755                        input_filter.as_ref(),
2756                        prop_manager,
2757                        params,
2758                        ctx,
2759                    )
2760                    .await
2761                }
2762                LogicalPlan::SubqueryCall { input, subquery } => {
2763                    let input_rows = self
2764                        .execute_subplan(*input, prop_manager, params, ctx)
2765                        .await?;
2766                    // Execute subquery for each input row (correlated)
2767                    // No input_filter for CALL { }
2768                    self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2769                        .await
2770                }
2771                LogicalPlan::RecursiveCTE {
2772                    cte_name,
2773                    initial,
2774                    recursive,
2775                } => {
2776                    self.execute_recursive_cte(
2777                        &cte_name,
2778                        *initial,
2779                        *recursive,
2780                        prop_manager,
2781                        params,
2782                        ctx,
2783                    )
2784                    .await
2785                }
2786                LogicalPlan::CrossJoin { left, right } => {
2787                    self.execute_cross_join(left, right, prop_manager, params, ctx)
2788                        .await
2789                }
2790                LogicalPlan::Set { .. }
2791                | LogicalPlan::Remove { .. }
2792                | LogicalPlan::Merge { .. }
2793                | LogicalPlan::Create { .. }
2794                | LogicalPlan::CreateBatch { .. } => {
2795                    unreachable!("mutations are handled by DataFusion engine")
2796                }
2797                LogicalPlan::Delete { .. } => {
2798                    unreachable!("mutations are handled by DataFusion engine")
2799                }
2800                LogicalPlan::Begin => {
2801                    if let Some(writer_lock) = &self.writer {
2802                        let mut writer = writer_lock.write().await;
2803                        writer.begin_transaction()?;
2804                    } else {
2805                        return Err(anyhow!("Transaction requires a Writer"));
2806                    }
2807                    Ok(vec![HashMap::new()])
2808                }
2809                LogicalPlan::Commit => {
2810                    if let Some(writer_lock) = &self.writer {
2811                        let mut writer = writer_lock.write().await;
2812                        writer.commit_transaction().await?;
2813                    } else {
2814                        return Err(anyhow!("Transaction requires a Writer"));
2815                    }
2816                    Ok(vec![HashMap::new()])
2817                }
2818                LogicalPlan::Rollback => {
2819                    if let Some(writer_lock) = &self.writer {
2820                        let mut writer = writer_lock.write().await;
2821                        writer.rollback_transaction()?;
2822                    } else {
2823                        return Err(anyhow!("Transaction requires a Writer"));
2824                    }
2825                    Ok(vec![HashMap::new()])
2826                }
2827                LogicalPlan::Copy {
2828                    target,
2829                    source,
2830                    is_export,
2831                    options,
2832                } => {
2833                    if is_export {
2834                        self.execute_export(&target, &source, &options, prop_manager, ctx)
2835                            .await
2836                    } else {
2837                        self.execute_copy(&target, &source, &options, prop_manager)
2838                            .await
2839                    }
2840                }
2841                LogicalPlan::Backup {
2842                    destination,
2843                    options,
2844                } => self.execute_backup(&destination, &options).await,
2845                LogicalPlan::Explain { plan } => {
2846                    let plan_str = format!("{:#?}", plan);
2847                    let mut row = HashMap::new();
2848                    row.insert("plan".to_string(), Value::String(plan_str));
2849                    Ok(vec![row])
2850                }
2851                LogicalPlan::ShortestPath { .. } => {
2852                    unreachable!("ShortestPath is handled by DataFusion engine")
2853                }
2854                LogicalPlan::AllShortestPaths { .. } => {
2855                    unreachable!("AllShortestPaths is handled by DataFusion engine")
2856                }
2857                LogicalPlan::Foreach { .. } => {
2858                    unreachable!("mutations are handled by DataFusion engine")
2859                }
2860                LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2861                LogicalPlan::BindZeroLengthPath { .. } => {
2862                    unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2863                }
2864                LogicalPlan::BindPath { .. } => {
2865                    unreachable!("BindPath is handled by DataFusion engine")
2866                }
2867                LogicalPlan::QuantifiedPattern { .. } => {
2868                    unreachable!("QuantifiedPattern is handled by DataFusion engine")
2869                }
2870                LogicalPlan::LocyProgram { .. }
2871                | LogicalPlan::LocyFold { .. }
2872                | LogicalPlan::LocyBestBy { .. }
2873                | LogicalPlan::LocyPriority { .. }
2874                | LogicalPlan::LocyDerivedScan { .. }
2875                | LogicalPlan::LocyProject { .. } => {
2876                    unreachable!("Locy operators are handled by DataFusion engine")
2877                }
2878            }
2879        })
2880    }
2881
2882    /// Execute a single plan from a FOREACH body with the given scope.
2883    ///
2884    /// Used by the DataFusion ForeachExec operator to delegate body clause
2885    /// execution back to the executor.
2886    pub(crate) async fn execute_foreach_body_plan(
2887        &self,
2888        plan: LogicalPlan,
2889        scope: &mut HashMap<String, Value>,
2890        writer: &mut uni_store::runtime::writer::Writer,
2891        prop_manager: &PropertyManager,
2892        params: &HashMap<String, Value>,
2893        ctx: Option<&QueryContext>,
2894    ) -> Result<()> {
2895        match plan {
2896            LogicalPlan::Set { items, .. } => {
2897                self.execute_set_items_locked(&items, scope, writer, prop_manager, params, ctx)
2898                    .await?;
2899            }
2900            LogicalPlan::Remove { items, .. } => {
2901                self.execute_remove_items_locked(&items, scope, writer, prop_manager, ctx)
2902                    .await?;
2903            }
2904            LogicalPlan::Delete { items, detach, .. } => {
2905                for expr in &items {
2906                    let val = self
2907                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
2908                        .await?;
2909                    self.execute_delete_item_locked(&val, detach, writer)
2910                        .await?;
2911                }
2912            }
2913            LogicalPlan::Create { pattern, .. } => {
2914                self.execute_create_pattern(&pattern, scope, writer, prop_manager, params, ctx)
2915                    .await?;
2916            }
2917            LogicalPlan::CreateBatch { patterns, .. } => {
2918                for pattern in &patterns {
2919                    self.execute_create_pattern(pattern, scope, writer, prop_manager, params, ctx)
2920                        .await?;
2921                }
2922            }
2923            LogicalPlan::Merge {
2924                pattern,
2925                on_match: _,
2926                on_create,
2927                ..
2928            } => {
2929                self.execute_create_pattern(&pattern, scope, writer, prop_manager, params, ctx)
2930                    .await?;
2931                if let Some(on_create_clause) = on_create {
2932                    self.execute_set_items_locked(
2933                        &on_create_clause.items,
2934                        scope,
2935                        writer,
2936                        prop_manager,
2937                        params,
2938                        ctx,
2939                    )
2940                    .await?;
2941                }
2942            }
2943            LogicalPlan::Foreach {
2944                variable,
2945                list,
2946                body,
2947                ..
2948            } => {
2949                let list_val = self
2950                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
2951                    .await?;
2952                let items = match list_val {
2953                    Value::List(arr) => arr,
2954                    Value::Null => return Ok(()),
2955                    _ => return Err(anyhow!("FOREACH requires a list")),
2956                };
2957                for item in items {
2958                    let mut nested_scope = scope.clone();
2959                    nested_scope.insert(variable.clone(), item);
2960                    for nested_plan in &body {
2961                        Box::pin(self.execute_foreach_body_plan(
2962                            nested_plan.clone(),
2963                            &mut nested_scope,
2964                            writer,
2965                            prop_manager,
2966                            params,
2967                            ctx,
2968                        ))
2969                        .await?;
2970                    }
2971                }
2972            }
2973            _ => {
2974                return Err(anyhow!(
2975                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
2976                ));
2977            }
2978        }
2979        Ok(())
2980    }
2981
2982    fn canonical_row_key(row: &HashMap<String, Value>) -> String {
2983        let mut pairs: Vec<_> = row.iter().collect();
2984        pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
2985
2986        pairs
2987            .into_iter()
2988            .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
2989            .collect::<Vec<_>>()
2990            .join("|")
2991    }
2992
2993    fn canonical_value_key(v: &Value) -> String {
2994        match v {
2995            Value::Null => "null".to_string(),
2996            Value::Bool(b) => format!("b:{b}"),
2997            Value::Int(i) => format!("n:{i}"),
2998            Value::Float(f) => {
2999                if f.is_nan() {
3000                    "nan".to_string()
3001                } else if f.is_infinite() {
3002                    if f.is_sign_positive() {
3003                        "inf:+".to_string()
3004                    } else {
3005                        "inf:-".to_string()
3006                    }
3007                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
3008                    format!("n:{}", *f as i64)
3009                } else {
3010                    format!("f:{f}")
3011                }
3012            }
3013            Value::String(s) => {
3014                if let Some(k) = Self::temporal_string_key(s) {
3015                    format!("temporal:{k}")
3016                } else {
3017                    format!("s:{s}")
3018                }
3019            }
3020            Value::Bytes(b) => format!("bytes:{:?}", b),
3021            Value::List(items) => format!(
3022                "list:[{}]",
3023                items
3024                    .iter()
3025                    .map(Self::canonical_value_key)
3026                    .collect::<Vec<_>>()
3027                    .join(",")
3028            ),
3029            Value::Map(map) => {
3030                let mut pairs: Vec<_> = map.iter().collect();
3031                pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
3032                format!(
3033                    "map:{{{}}}",
3034                    pairs
3035                        .into_iter()
3036                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
3037                        .collect::<Vec<_>>()
3038                        .join(",")
3039                )
3040            }
3041            Value::Node(n) => {
3042                let mut labels = n.labels.clone();
3043                labels.sort();
3044                format!(
3045                    "node:{}:{}:{}",
3046                    n.vid.as_u64(),
3047                    labels.join(":"),
3048                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
3049                )
3050            }
3051            Value::Edge(e) => format!(
3052                "edge:{}:{}:{}:{}:{}",
3053                e.eid.as_u64(),
3054                e.edge_type,
3055                e.src.as_u64(),
3056                e.dst.as_u64(),
3057                Self::canonical_value_key(&Value::Map(e.properties.clone()))
3058            ),
3059            Value::Path(p) => format!(
3060                "path:nodes=[{}];edges=[{}]",
3061                p.nodes
3062                    .iter()
3063                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
3064                    .collect::<Vec<_>>()
3065                    .join(","),
3066                p.edges
3067                    .iter()
3068                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
3069                    .collect::<Vec<_>>()
3070                    .join(",")
3071            ),
3072            Value::Vector(vs) => format!("vec:{:?}", vs),
3073            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
3074            _ => format!("{v:?}"),
3075        }
3076    }
3077
3078    fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3079        match t {
3080            uni_common::TemporalValue::Date { days_since_epoch } => {
3081                format!("date:{days_since_epoch}")
3082            }
3083            uni_common::TemporalValue::LocalTime {
3084                nanos_since_midnight,
3085            } => format!("localtime:{nanos_since_midnight}"),
3086            uni_common::TemporalValue::Time {
3087                nanos_since_midnight,
3088                offset_seconds,
3089            } => {
3090                let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3091                format!("time:{utc_nanos}")
3092            }
3093            uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3094                format!("localdatetime:{nanos_since_epoch}")
3095            }
3096            uni_common::TemporalValue::DateTime {
3097                nanos_since_epoch, ..
3098            } => format!("datetime:{nanos_since_epoch}"),
3099            uni_common::TemporalValue::Duration {
3100                months,
3101                days,
3102                nanos,
3103            } => format!("duration:{months}:{days}:{nanos}"),
3104        }
3105    }
3106
3107    fn temporal_string_key(s: &str) -> Option<String> {
3108        let fn_name = match classify_temporal(s)? {
3109            uni_common::TemporalType::Date => "DATE",
3110            uni_common::TemporalType::LocalTime => "LOCALTIME",
3111            uni_common::TemporalType::Time => "TIME",
3112            uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3113            uni_common::TemporalType::DateTime => "DATETIME",
3114            uni_common::TemporalType::Duration => "DURATION",
3115        };
3116        match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3117            Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3118            _ => None,
3119        }
3120    }
3121
    /// Number of input rows processed between timeout checks in aggregate loops.
    pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3125
3126    pub(crate) async fn execute_aggregate(
3127        &self,
3128        rows: Vec<HashMap<String, Value>>,
3129        group_by: &[Expr],
3130        aggregates: &[Expr],
3131        prop_manager: &PropertyManager,
3132        params: &HashMap<String, Value>,
3133        ctx: Option<&QueryContext>,
3134    ) -> Result<Vec<HashMap<String, Value>>> {
3135        // CWE-400: Check timeout before aggregation
3136        if let Some(ctx) = ctx {
3137            ctx.check_timeout()?;
3138        }
3139
3140        let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3141
3142        // Cypher semantics: aggregation without grouping keys returns one row even
3143        // on empty input (e.g. `RETURN count(*)`, `RETURN avg(x)`).
3144        if rows.is_empty() {
3145            if group_by.is_empty() {
3146                let accs = Self::create_accumulators(aggregates);
3147                let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3148                return Ok(vec![row]);
3149            }
3150            return Ok(vec![]);
3151        }
3152
3153        for (idx, row) in rows.into_iter().enumerate() {
3154            // Periodic timeout check during aggregation
3155            if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3156                && let Some(ctx) = ctx
3157            {
3158                ctx.check_timeout()?;
3159            }
3160
3161            let key_vals = self
3162                .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3163                .await?;
3164            // Build a canonical key so grouping follows Cypher value semantics
3165            // (e.g. temporal equality by instant, numeric normalization where applicable).
3166            let key_str = format!(
3167                "[{}]",
3168                key_vals
3169                    .iter()
3170                    .map(Self::canonical_value_key)
3171                    .collect::<Vec<_>>()
3172                    .join(",")
3173            );
3174
3175            let entry = groups
3176                .entry(key_str)
3177                .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3178
3179            self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3180                .await?;
3181        }
3182
3183        let results = groups
3184            .values()
3185            .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3186            .collect();
3187
3188        Ok(results)
3189    }
3190
3191    pub(crate) async fn execute_window(
3192        &self,
3193        mut rows: Vec<HashMap<String, Value>>,
3194        window_exprs: &[Expr],
3195        _prop_manager: &PropertyManager,
3196        _params: &HashMap<String, Value>,
3197        ctx: Option<&QueryContext>,
3198    ) -> Result<Vec<HashMap<String, Value>>> {
3199        // CWE-400: Check timeout before window computation
3200        if let Some(ctx) = ctx {
3201            ctx.check_timeout()?;
3202        }
3203
3204        // If no rows or no window expressions, return as-is
3205        if rows.is_empty() || window_exprs.is_empty() {
3206            return Ok(rows);
3207        }
3208
3209        // Process each window function expression
3210        for window_expr in window_exprs {
3211            // Extract window function details
3212            let Expr::FunctionCall {
3213                name,
3214                args,
3215                window_spec: Some(window_spec),
3216                ..
3217            } = window_expr
3218            else {
3219                return Err(anyhow!(
3220                    "Window expression must be a FunctionCall with OVER clause: {:?}",
3221                    window_expr
3222                ));
3223            };
3224
3225            let name_upper = name.to_uppercase();
3226
3227            // Validate it's a supported window function
3228            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3229                return Err(anyhow!(
3230                    "Unsupported window function: {}. Supported functions: {}",
3231                    name,
3232                    WINDOW_FUNCTIONS.join(", ")
3233                ));
3234            }
3235
3236            // Build partition groups based on PARTITION BY clause
3237            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3238
3239            for (row_idx, row) in rows.iter().enumerate() {
3240                // Evaluate partition key
3241                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3242                    // No partitioning - all rows in one partition
3243                    vec![]
3244                } else {
3245                    window_spec
3246                        .partition_by
3247                        .iter()
3248                        .map(|expr| self.evaluate_simple_expr(expr, row))
3249                        .collect::<Result<Vec<_>>>()?
3250                };
3251
3252                partition_map
3253                    .entry(partition_key)
3254                    .or_default()
3255                    .push(row_idx);
3256            }
3257
3258            // Process each partition
3259            for (_partition_key, row_indices) in partition_map.iter_mut() {
3260                // Sort rows within partition by ORDER BY clause
3261                if !window_spec.order_by.is_empty() {
3262                    row_indices.sort_by(|&a, &b| {
3263                        for sort_item in &window_spec.order_by {
3264                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3265                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3266
3267                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3268                                let cmp = Executor::compare_values(&va, &vb);
3269                                let cmp = if sort_item.ascending {
3270                                    cmp
3271                                } else {
3272                                    cmp.reverse()
3273                                };
3274                                if cmp != std::cmp::Ordering::Equal {
3275                                    return cmp;
3276                                }
3277                            }
3278                        }
3279                        std::cmp::Ordering::Equal
3280                    });
3281                }
3282
3283                // Compute window function values for this partition
3284                for (position, &row_idx) in row_indices.iter().enumerate() {
3285                    let window_value = match name_upper.as_str() {
3286                        "ROW_NUMBER" => Value::from((position + 1) as i64),
3287                        "RANK" => {
3288                            // RANK: position (1-indexed) of first row in group of tied rows
3289                            let rank = if position == 0 {
3290                                1i64
3291                            } else {
3292                                let prev_row_idx = row_indices[position - 1];
3293                                let same_as_prev = self.rows_have_same_sort_keys(
3294                                    &window_spec.order_by,
3295                                    &rows,
3296                                    row_idx,
3297                                    prev_row_idx,
3298                                );
3299
3300                                if same_as_prev {
3301                                    // Walk backwards to find where this group started
3302                                    let mut group_start = position - 1;
3303                                    while group_start > 0 {
3304                                        let curr_idx = row_indices[group_start];
3305                                        let prev_idx = row_indices[group_start - 1];
3306                                        if !self.rows_have_same_sort_keys(
3307                                            &window_spec.order_by,
3308                                            &rows,
3309                                            curr_idx,
3310                                            prev_idx,
3311                                        ) {
3312                                            break;
3313                                        }
3314                                        group_start -= 1;
3315                                    }
3316                                    (group_start + 1) as i64
3317                                } else {
3318                                    (position + 1) as i64
3319                                }
3320                            };
3321                            Value::from(rank)
3322                        }
3323                        "DENSE_RANK" => {
3324                            // Dense rank: continuous ranking without gaps
3325                            let mut dense_rank = 1i64;
3326                            for i in 0..position {
3327                                let curr_idx = row_indices[i + 1];
3328                                let prev_idx = row_indices[i];
3329                                if !self.rows_have_same_sort_keys(
3330                                    &window_spec.order_by,
3331                                    &rows,
3332                                    curr_idx,
3333                                    prev_idx,
3334                                ) {
3335                                    dense_rank += 1;
3336                                }
3337                            }
3338                            Value::from(dense_rank)
3339                        }
3340                        "LAG" => {
3341                            let (value_expr, offset, default_value) =
3342                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3343
3344                            if position >= offset {
3345                                let target_idx = row_indices[position - offset];
3346                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3347                            } else {
3348                                default_value
3349                            }
3350                        }
3351                        "LEAD" => {
3352                            let (value_expr, offset, default_value) =
3353                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3354
3355                            if position + offset < row_indices.len() {
3356                                let target_idx = row_indices[position + offset];
3357                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3358                            } else {
3359                                default_value
3360                            }
3361                        }
3362                        "NTILE" => {
3363                            // Extract num_buckets argument: NTILE(num_buckets)
3364                            let num_buckets_expr = args.first().ok_or_else(|| {
3365                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3366                            })?;
3367                            let num_buckets_val =
3368                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3369                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3370                                anyhow!(
3371                                    "NTILE argument must be an integer, got: {:?}",
3372                                    num_buckets_val
3373                                )
3374                            })?;
3375
3376                            if num_buckets <= 0 {
3377                                return Err(anyhow!(
3378                                    "NTILE bucket count must be positive, got: {}",
3379                                    num_buckets
3380                                ));
3381                            }
3382
3383                            let num_buckets = num_buckets as usize;
3384                            let partition_size = row_indices.len();
3385
3386                            // Calculate bucket assignment using standard algorithm
3387                            // For N rows and B buckets:
3388                            // - Base size: N / B
3389                            // - Extra rows: N % B (go to first buckets)
3390                            let base_size = partition_size / num_buckets;
3391                            let extra_rows = partition_size % num_buckets;
3392
3393                            // Determine bucket for current row
3394                            let bucket = if position < extra_rows * (base_size + 1) {
3395                                // Row is in one of the larger buckets (first 'extra_rows' buckets)
3396                                position / (base_size + 1) + 1
3397                            } else {
3398                                // Row is in one of the normal-sized buckets
3399                                let adjusted_position = position - extra_rows * (base_size + 1);
3400                                extra_rows + (adjusted_position / base_size) + 1
3401                            };
3402
3403                            Value::from(bucket as i64)
3404                        }
3405                        "FIRST_VALUE" => {
3406                            // FIRST_VALUE returns the value of the expression from the first row in the window frame
3407                            let value_expr = args.first().ok_or_else(|| {
3408                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3409                            })?;
3410
3411                            // Get the first row in the partition (after ordering)
3412                            if row_indices.is_empty() {
3413                                Value::Null
3414                            } else {
3415                                let first_idx = row_indices[0];
3416                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3417                            }
3418                        }
3419                        "LAST_VALUE" => {
3420                            // LAST_VALUE returns the value of the expression from the last row in the window frame
3421                            let value_expr = args.first().ok_or_else(|| {
3422                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3423                            })?;
3424
3425                            // Get the last row in the partition (after ordering)
3426                            if row_indices.is_empty() {
3427                                Value::Null
3428                            } else {
3429                                let last_idx = row_indices[row_indices.len() - 1];
3430                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3431                            }
3432                        }
3433                        "NTH_VALUE" => {
3434                            // NTH_VALUE returns the value of the expression from the nth row in the window frame
3435                            if args.len() != 2 {
3436                                return Err(anyhow!(
3437                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3438                                ));
3439                            }
3440
3441                            let value_expr = &args[0];
3442                            let n_expr = &args[1];
3443
3444                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3445                            let n = n_val.as_i64().ok_or_else(|| {
3446                                anyhow!(
3447                                    "NTH_VALUE second argument must be an integer, got: {:?}",
3448                                    n_val
3449                                )
3450                            })?;
3451
3452                            if n <= 0 {
3453                                return Err(anyhow!(
3454                                    "NTH_VALUE position must be positive, got: {}",
3455                                    n
3456                                ));
3457                            }
3458
3459                            let nth_index = (n - 1) as usize; // Convert 1-based to 0-based
3460                            if nth_index < row_indices.len() {
3461                                let nth_idx = row_indices[nth_index];
3462                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3463                            } else {
3464                                Value::Null
3465                            }
3466                        }
3467                        _ => unreachable!("Window function {} already validated", name),
3468                    };
3469
3470                    // Add window function result to row
3471                    // Use the window expression's string representation as the column name
3472                    let col_name = window_expr.to_string_repr();
3473                    rows[row_idx].insert(col_name, window_value);
3474                }
3475            }
3476        }
3477
3478        Ok(rows)
3479    }
3480
3481    /// Helper to evaluate simple expressions for window function sorting/partitioning.
3482    ///
3483    /// Uses `&self` for consistency with other evaluation methods, though it only
3484    /// recurses for property access.
3485    fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3486        match expr {
3487            Expr::Variable(name) => row
3488                .get(name)
3489                .cloned()
3490                .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3491            Expr::Property(base, prop) => {
3492                let base_val = self.evaluate_simple_expr(base, row)?;
3493                if let Value::Map(map) = base_val {
3494                    map.get(prop)
3495                        .cloned()
3496                        .ok_or_else(|| anyhow!("Property not found: {}", prop))
3497                } else {
3498                    Err(anyhow!("Cannot access property on non-object"))
3499                }
3500            }
3501            Expr::Literal(lit) => Ok(lit.to_value()),
3502            _ => Err(anyhow!(
3503                "Unsupported expression in window function: {:?}",
3504                expr
3505            )),
3506        }
3507    }
3508
3509    /// Check if two rows have matching sort keys for ranking functions.
3510    fn rows_have_same_sort_keys(
3511        &self,
3512        order_by: &[uni_cypher::ast::SortItem],
3513        rows: &[HashMap<String, Value>],
3514        idx_a: usize,
3515        idx_b: usize,
3516    ) -> bool {
3517        order_by.iter().all(|sort_item| {
3518            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3519            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3520            matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3521        })
3522    }
3523
3524    /// Extract offset and default value for LAG/LEAD window functions.
3525    fn extract_lag_lead_params<'a>(
3526        &self,
3527        func_name: &str,
3528        args: &'a [Expr],
3529        row: &HashMap<String, Value>,
3530    ) -> Result<(&'a Expr, usize, Value)> {
3531        let value_expr = args.first().ok_or_else(|| {
3532            anyhow!(
3533                "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3534                func_name,
3535                func_name
3536            )
3537        })?;
3538
3539        let offset = if let Some(offset_expr) = args.get(1) {
3540            let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3541            offset_val.as_i64().ok_or_else(|| {
3542                anyhow!(
3543                    "{} offset must be an integer, got: {:?}",
3544                    func_name,
3545                    offset_val
3546                )
3547            })? as usize
3548        } else {
3549            1
3550        };
3551
3552        let default_value = if let Some(default_expr) = args.get(2) {
3553            self.evaluate_simple_expr(default_expr, row)?
3554        } else {
3555            Value::Null
3556        };
3557
3558        Ok((value_expr, offset, default_value))
3559    }
3560
3561    /// Evaluate group-by key expressions for a row.
3562    pub(crate) async fn evaluate_group_keys(
3563        &self,
3564        group_by: &[Expr],
3565        row: &HashMap<String, Value>,
3566        prop_manager: &PropertyManager,
3567        params: &HashMap<String, Value>,
3568        ctx: Option<&QueryContext>,
3569    ) -> Result<Vec<Value>> {
3570        let mut key_vals = Vec::new();
3571        for expr in group_by {
3572            key_vals.push(
3573                self.evaluate_expr(expr, row, prop_manager, params, ctx)
3574                    .await?,
3575            );
3576        }
3577        Ok(key_vals)
3578    }
3579
3580    /// Update accumulators with values from the current row.
3581    pub(crate) async fn update_accumulators(
3582        &self,
3583        accs: &mut [Accumulator],
3584        aggregates: &[Expr],
3585        row: &HashMap<String, Value>,
3586        prop_manager: &PropertyManager,
3587        params: &HashMap<String, Value>,
3588        ctx: Option<&QueryContext>,
3589    ) -> Result<()> {
3590        for (i, agg_expr) in aggregates.iter().enumerate() {
3591            if let Expr::FunctionCall { args, .. } = agg_expr {
3592                let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3593                let val = if is_wildcard {
3594                    Value::Null
3595                } else {
3596                    self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3597                        .await?
3598                };
3599                accs[i].update(&val, is_wildcard);
3600            }
3601        }
3602        Ok(())
3603    }
3604
3605    /// Execute sort operation with ORDER BY clauses.
3606    pub(crate) async fn execute_recursive_cte(
3607        &self,
3608        cte_name: &str,
3609        initial: LogicalPlan,
3610        recursive: LogicalPlan,
3611        prop_manager: &PropertyManager,
3612        params: &HashMap<String, Value>,
3613        ctx: Option<&QueryContext>,
3614    ) -> Result<Vec<HashMap<String, Value>>> {
3615        use std::collections::HashSet;
3616
3617        // Helper to create a stable key for cycle detection.
3618        // Uses sorted keys to ensure consistent ordering.
3619        pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3620            let mut pairs: Vec<_> = row.iter().collect();
3621            pairs.sort_by(|a, b| a.0.cmp(b.0));
3622            format!("{:?}", pairs)
3623        }
3624
3625        // 1. Execute Anchor
3626        let mut working_table = self
3627            .execute_subplan(initial, prop_manager, params, ctx)
3628            .await?;
3629        let mut result_table = working_table.clone();
3630
3631        // Track seen rows for cycle detection
3632        let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3633
3634        // 2. Loop
3635        // Safety: Max iterations to prevent infinite loop
3636        // TODO: expose this via UniConfig for user control
3637        let max_iterations = 1000;
3638        for _iteration in 0..max_iterations {
3639            // CWE-400: Check timeout at each iteration to prevent resource exhaustion
3640            if let Some(ctx) = ctx {
3641                ctx.check_timeout()?;
3642            }
3643
3644            if working_table.is_empty() {
3645                break;
3646            }
3647
3648            // Bind working table to CTE name in params
3649            let working_val = Value::List(
3650                working_table
3651                    .iter()
3652                    .map(|row| {
3653                        if row.len() == 1 {
3654                            row.values().next().unwrap().clone()
3655                        } else {
3656                            Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3657                        }
3658                    })
3659                    .collect(),
3660            );
3661
3662            let mut next_params = params.clone();
3663            next_params.insert(cte_name.to_string(), working_val);
3664
3665            // Execute recursive part
3666            let next_result = self
3667                .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3668                .await?;
3669
3670            if next_result.is_empty() {
3671                break;
3672            }
3673
3674            // Filter out already-seen rows (cycle detection)
3675            let new_rows: Vec<_> = next_result
3676                .into_iter()
3677                .filter(|row| {
3678                    let key = row_key(row);
3679                    seen.insert(key) // Returns false if already present
3680                })
3681                .collect();
3682
3683            if new_rows.is_empty() {
3684                // All results were cycles - terminate
3685                break;
3686            }
3687
3688            result_table.extend(new_rows.clone());
3689            working_table = new_rows;
3690        }
3691
3692        // Output accumulated results as a variable
3693        let final_list = Value::List(
3694            result_table
3695                .into_iter()
3696                .map(|row| {
3697                    // If the CTE returns a single column and we want to treat it as a list of values?
3698                    // E.g. WITH RECURSIVE r AS (RETURN 1 UNION RETURN 2) -> [1, 2] or [{expr:1}, {expr:2}]?
3699                    // Cypher LISTs usually contain values.
3700                    // If the row has 1 column, maybe unwrap?
3701                    // But SQL CTEs are tables.
3702                    // Let's stick to List<Map> for consistency with how we pass it in.
3703                    // UNLESS the user extracts it.
3704                    // My parser test `MATCH (n) WHERE n IN hierarchy` implies `hierarchy` contains Nodes.
3705                    // If `row` contains `root` (Node), then `hierarchy` should be `[Node, Node]`.
3706                    // If row has multiple cols, `[ {a:1, b:2}, ... ]`.
3707                    // If row has 1 col, users expect `[val, val]`.
3708                    if row.len() == 1 {
3709                        row.values().next().unwrap().clone()
3710                    } else {
3711                        Value::Map(row.into_iter().collect())
3712                    }
3713                })
3714                .collect(),
3715        );
3716
3717        let mut final_row = HashMap::new();
3718        final_row.insert(cte_name.to_string(), final_list);
3719        Ok(vec![final_row])
3720    }
3721
3722    /// Interval for timeout checks in sort loops.
3723    const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3724
3725    pub(crate) async fn execute_sort(
3726        &self,
3727        rows: Vec<HashMap<String, Value>>,
3728        order_by: &[uni_cypher::ast::SortItem],
3729        prop_manager: &PropertyManager,
3730        params: &HashMap<String, Value>,
3731        ctx: Option<&QueryContext>,
3732    ) -> Result<Vec<HashMap<String, Value>>> {
3733        // CWE-400: Check timeout before potentially expensive sort
3734        if let Some(ctx) = ctx {
3735            ctx.check_timeout()?;
3736        }
3737
3738        let mut rows_with_keys = Vec::with_capacity(rows.len());
3739        for (idx, row) in rows.into_iter().enumerate() {
3740            // Periodic timeout check during key extraction
3741            if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3742                && let Some(ctx) = ctx
3743            {
3744                ctx.check_timeout()?;
3745            }
3746
3747            let mut keys = Vec::new();
3748            for item in order_by {
3749                let val = row
3750                    .get(&item.expr.to_string_repr())
3751                    .cloned()
3752                    .unwrap_or(Value::Null);
3753                let val = if val.is_null() {
3754                    self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3755                        .await
3756                        .unwrap_or(Value::Null)
3757                } else {
3758                    val
3759                };
3760                keys.push(val);
3761            }
3762            rows_with_keys.push((row, keys));
3763        }
3764
3765        // Check timeout again before synchronous sort (can't be interrupted)
3766        if let Some(ctx) = ctx {
3767            ctx.check_timeout()?;
3768        }
3769
3770        rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3771
3772        Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3773    }
3774
3775    /// Create accumulators for aggregate expressions.
3776    pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3777        aggregates
3778            .iter()
3779            .map(|expr| {
3780                if let Expr::FunctionCall { name, distinct, .. } = expr {
3781                    Accumulator::new(name, *distinct)
3782                } else {
3783                    Accumulator::new("COUNT", false)
3784                }
3785            })
3786            .collect()
3787    }
3788
3789    /// Build result row from group-by keys and accumulators.
3790    pub(crate) fn build_aggregate_result(
3791        group_by: &[Expr],
3792        aggregates: &[Expr],
3793        key_vals: &[Value],
3794        accs: &[Accumulator],
3795    ) -> HashMap<String, Value> {
3796        let mut res_row = HashMap::new();
3797        for (i, expr) in group_by.iter().enumerate() {
3798            res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3799        }
3800        for (i, expr) in aggregates.iter().enumerate() {
3801            // Use aggregate_column_name to ensure consistency with planner
3802            let col_name = crate::query::planner::aggregate_column_name(expr);
3803            res_row.insert(col_name, accs[i].finish());
3804        }
3805        res_row
3806    }
3807
3808    /// Compare and return ordering for sort operation.
3809    pub(crate) fn compare_sort_keys(
3810        a_keys: &[Value],
3811        b_keys: &[Value],
3812        order_by: &[uni_cypher::ast::SortItem],
3813    ) -> std::cmp::Ordering {
3814        for (i, item) in order_by.iter().enumerate() {
3815            let order = Self::compare_values(&a_keys[i], &b_keys[i]);
3816            if order != std::cmp::Ordering::Equal {
3817                return if item.ascending {
3818                    order
3819                } else {
3820                    order.reverse()
3821                };
3822            }
3823        }
3824        std::cmp::Ordering::Equal
3825    }
3826
3827    /// Executes BACKUP command to local or cloud storage.
3828    ///
3829    /// Supports both local filesystem paths and cloud URLs (s3://, gs://, az://).
3830    pub(crate) async fn execute_backup(
3831        &self,
3832        destination: &str,
3833        _options: &HashMap<String, Value>,
3834    ) -> Result<Vec<HashMap<String, Value>>> {
3835        // 1. Flush L0
3836        if let Some(writer_arc) = &self.writer {
3837            let mut writer = writer_arc.write().await;
3838            writer.flush_to_l1(None).await?;
3839        }
3840
3841        // 2. Snapshot
3842        let snapshot_manager = self.storage.snapshot_manager();
3843        let snapshot = snapshot_manager
3844            .load_latest_snapshot()
3845            .await?
3846            .ok_or_else(|| anyhow!("No snapshot found"))?;
3847
3848        // 3. Copy files - cloud or local path
3849        if is_cloud_url(destination) {
3850            self.backup_to_cloud(destination, &snapshot.snapshot_id)
3851                .await?;
3852        } else {
3853            // Validate local destination path against sandbox
3854            let validated_dest = self.validate_path(destination)?;
3855            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
3856                .await?;
3857        }
3858
3859        let mut res = HashMap::new();
3860        res.insert(
3861            "status".to_string(),
3862            Value::String("Backup completed".to_string()),
3863        );
3864        res.insert(
3865            "snapshot_id".to_string(),
3866            Value::String(snapshot.snapshot_id),
3867        );
3868        Ok(vec![res])
3869    }
3870
3871    /// Backs up database to a local filesystem destination.
3872    async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
3873        let source_path = std::path::Path::new(self.storage.base_path());
3874
3875        if !dest_path.exists() {
3876            std::fs::create_dir_all(dest_path)?;
3877        }
3878
3879        // Recursive copy (local to local)
3880        if source_path.exists() {
3881            Self::copy_dir_all(source_path, dest_path)?;
3882        }
3883
3884        // Copy schema to destination/catalog/schema.json
3885        let schema_manager = self.storage.schema_manager();
3886        let dest_catalog = dest_path.join("catalog");
3887        if !dest_catalog.exists() {
3888            std::fs::create_dir_all(&dest_catalog)?;
3889        }
3890
3891        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3892        std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
3893
3894        Ok(())
3895    }
3896
3897    /// Backs up database to a cloud storage destination.
3898    ///
3899    /// Streams data from source to destination, supporting cross-cloud backups.
3900    async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
3901        use object_store::ObjectStore;
3902        use object_store::local::LocalFileSystem;
3903        use object_store::path::Path as ObjPath;
3904
3905        let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
3906        let source_path = std::path::Path::new(self.storage.base_path());
3907
3908        // Create local store for source, coerced to dyn ObjectStore
3909        let src_store: Arc<dyn ObjectStore> =
3910            Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
3911
3912        // Copy catalog/ directory
3913        let catalog_src = ObjPath::from("catalog");
3914        let catalog_dst = if dest_prefix.as_ref().is_empty() {
3915            ObjPath::from("catalog")
3916        } else {
3917            ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
3918        };
3919        copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
3920
3921        // Copy storage/ directory
3922        let storage_src = ObjPath::from("storage");
3923        let storage_dst = if dest_prefix.as_ref().is_empty() {
3924            ObjPath::from("storage")
3925        } else {
3926            ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
3927        };
3928        copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
3929
3930        // Ensure schema is present at canonical catalog location.
3931        let schema_manager = self.storage.schema_manager();
3932        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3933        let schema_path = if dest_prefix.as_ref().is_empty() {
3934            ObjPath::from("catalog/schema.json")
3935        } else {
3936            ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
3937        };
3938        dest_store
3939            .put(&schema_path, bytes::Bytes::from(schema_content).into())
3940            .await?;
3941
3942        Ok(())
3943    }
3944
3945    /// Maximum directory depth for backup operations.
3946    ///
3947    /// **CWE-674 (Uncontrolled Recursion)**: Prevents stack overflow from
3948    /// excessively deep directory structures.
3949    const MAX_BACKUP_DEPTH: usize = 100;
3950
3951    /// Maximum file count for backup operations.
3952    ///
3953    /// **CWE-400 (Resource Consumption)**: Prevents disk exhaustion and
3954    /// long-running operations from malicious or unexpectedly large directories.
3955    const MAX_BACKUP_FILES: usize = 100_000;
3956
3957    /// Recursively copies a directory with security limits.
3958    ///
3959    /// # Security
3960    ///
3961    /// - **CWE-674**: Depth limit prevents stack overflow
3962    /// - **CWE-400**: File count limit prevents resource exhaustion
3963    /// - **Symlink handling**: Symlinks are skipped to prevent loop attacks
3964    pub(crate) fn copy_dir_all(
3965        src: &std::path::Path,
3966        dst: &std::path::Path,
3967    ) -> std::io::Result<()> {
3968        let mut file_count = 0usize;
3969        Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
3970    }
3971
3972    /// Internal implementation with depth and file count tracking.
3973    pub(crate) fn copy_dir_all_impl(
3974        src: &std::path::Path,
3975        dst: &std::path::Path,
3976        depth: usize,
3977        file_count: &mut usize,
3978    ) -> std::io::Result<()> {
3979        if depth >= Self::MAX_BACKUP_DEPTH {
3980            return Err(std::io::Error::new(
3981                std::io::ErrorKind::InvalidInput,
3982                format!(
3983                    "Maximum backup depth {} exceeded at {:?}",
3984                    Self::MAX_BACKUP_DEPTH,
3985                    src
3986                ),
3987            ));
3988        }
3989
3990        std::fs::create_dir_all(dst)?;
3991
3992        for entry in std::fs::read_dir(src)? {
3993            if *file_count >= Self::MAX_BACKUP_FILES {
3994                return Err(std::io::Error::new(
3995                    std::io::ErrorKind::InvalidInput,
3996                    format!(
3997                        "Maximum backup file count {} exceeded",
3998                        Self::MAX_BACKUP_FILES
3999                    ),
4000                ));
4001            }
4002            *file_count += 1;
4003
4004            let entry = entry?;
4005            let metadata = entry.metadata()?;
4006
4007            // Skip symlinks to prevent loops and traversal attacks
4008            if metadata.file_type().is_symlink() {
4009                // Silently skip - logging would require tracing dependency
4010                continue;
4011            }
4012
4013            let dst_path = dst.join(entry.file_name());
4014            if metadata.is_dir() {
4015                Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4016            } else {
4017                std::fs::copy(entry.path(), dst_path)?;
4018            }
4019        }
4020        Ok(())
4021    }
4022
4023    pub(crate) async fn execute_copy(
4024        &self,
4025        target: &str,
4026        source: &str,
4027        options: &HashMap<String, Value>,
4028        prop_manager: &PropertyManager,
4029    ) -> Result<Vec<HashMap<String, Value>>> {
4030        let format = options
4031            .get("format")
4032            .and_then(|v| v.as_str())
4033            .unwrap_or_else(|| {
4034                if source.ends_with(".parquet") {
4035                    "parquet"
4036                } else {
4037                    "csv"
4038                }
4039            });
4040
4041        match format.to_lowercase().as_str() {
4042            "csv" => self.execute_csv_import(target, source, options).await,
4043            "parquet" => {
4044                self.execute_parquet_import(target, source, options, prop_manager)
4045                    .await
4046            }
4047            _ => Err(anyhow!("Unsupported format: {}", format)),
4048        }
4049    }
4050
4051    pub(crate) async fn execute_csv_import(
4052        &self,
4053        target: &str,
4054        source: &str,
4055        options: &HashMap<String, Value>,
4056    ) -> Result<Vec<HashMap<String, Value>>> {
4057        // Validate source path against sandbox
4058        let validated_source = self.validate_path(source)?;
4059
4060        let writer_lock = self
4061            .writer
4062            .as_ref()
4063            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4064
4065        let schema = self.storage.schema_manager().schema();
4066
4067        // 1. Determine if target is Label or EdgeType
4068        let label_meta = schema.labels.get(target);
4069        let edge_meta = schema.edge_types.get(target);
4070
4071        if label_meta.is_none() && edge_meta.is_none() {
4072            return Err(anyhow!("Target '{}' not found in schema", target));
4073        }
4074
4075        // 2. Open CSV
4076        let delimiter_str = options
4077            .get("delimiter")
4078            .and_then(|v| v.as_str())
4079            .unwrap_or(",");
4080        let delimiter = if delimiter_str.is_empty() {
4081            b','
4082        } else {
4083            delimiter_str.as_bytes()[0]
4084        };
4085        let has_header = options
4086            .get("header")
4087            .and_then(|v| v.as_bool())
4088            .unwrap_or(true);
4089
4090        let mut rdr = csv::ReaderBuilder::new()
4091            .delimiter(delimiter)
4092            .has_headers(has_header)
4093            .from_path(&validated_source)?;
4094
4095        let headers = rdr.headers()?.clone();
4096        let mut count = 0;
4097
4098        let mut writer = writer_lock.write().await;
4099
4100        if label_meta.is_some() {
4101            let target_props = schema
4102                .properties
4103                .get(target)
4104                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4105
4106            for result in rdr.records() {
4107                let record = result?;
4108                let mut props = HashMap::new();
4109
4110                for (i, header) in headers.iter().enumerate() {
4111                    if let Some(val_str) = record.get(i)
4112                        && let Some(prop_meta) = target_props.get(header)
4113                    {
4114                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4115                        props.insert(header.to_string(), val);
4116                    }
4117                }
4118
4119                let vid = writer.next_vid().await?;
4120                writer
4121                    .insert_vertex_with_labels(vid, props, &[target.to_string()])
4122                    .await?;
4123                count += 1;
4124            }
4125        } else if let Some(meta) = edge_meta {
4126            let type_id = meta.id;
4127            let target_props = schema
4128                .properties
4129                .get(target)
4130                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4131
4132            // For edges, we need src and dst VIDs.
4133            // Expecting columns '_src' and '_dst' or as specified in options.
4134            let src_col = options
4135                .get("src_col")
4136                .and_then(|v| v.as_str())
4137                .unwrap_or("_src");
4138            let dst_col = options
4139                .get("dst_col")
4140                .and_then(|v| v.as_str())
4141                .unwrap_or("_dst");
4142
4143            for result in rdr.records() {
4144                let record = result?;
4145                let mut props = HashMap::new();
4146                let mut src_vid = None;
4147                let mut dst_vid = None;
4148
4149                for (i, header) in headers.iter().enumerate() {
4150                    if let Some(val_str) = record.get(i) {
4151                        if header == src_col {
4152                            src_vid =
4153                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4154                        } else if header == dst_col {
4155                            dst_vid =
4156                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4157                        } else if let Some(prop_meta) = target_props.get(header) {
4158                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4159                            props.insert(header.to_string(), val);
4160                        }
4161                    }
4162                }
4163
4164                let src =
4165                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4166                let dst = dst_vid
4167                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;
4168
4169                let eid = writer.next_eid(type_id).await?;
4170                writer
4171                    .insert_edge(src, dst, type_id, eid, props, Some(target.to_string()))
4172                    .await?;
4173                count += 1;
4174            }
4175        }
4176
4177        let mut res = HashMap::new();
4178        res.insert("count".to_string(), Value::Int(count as i64));
4179        Ok(vec![res])
4180    }
4181
4182    /// Imports data from Parquet file to a label or edge type.
4183    ///
4184    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
4185    pub(crate) async fn execute_parquet_import(
4186        &self,
4187        target: &str,
4188        source: &str,
4189        options: &HashMap<String, Value>,
4190        _prop_manager: &PropertyManager,
4191    ) -> Result<Vec<HashMap<String, Value>>> {
4192        let writer_lock = self
4193            .writer
4194            .as_ref()
4195            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4196
4197        let schema = self.storage.schema_manager().schema();
4198
4199        // 1. Determine if target is Label or EdgeType
4200        let label_meta = schema.labels.get(target);
4201        let edge_meta = schema.edge_types.get(target);
4202
4203        if label_meta.is_none() && edge_meta.is_none() {
4204            return Err(anyhow!("Target '{}' not found in schema", target));
4205        }
4206
4207        // 2. Open Parquet - support both local and cloud URLs
4208        let reader = if is_cloud_url(source) {
4209            self.open_parquet_from_cloud(source).await?
4210        } else {
4211            // Validate local source path against sandbox
4212            let validated_source = self.validate_path(source)?;
4213            let file = std::fs::File::open(&validated_source)?;
4214            let builder =
4215                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
4216            builder.build()?
4217        };
4218        let mut reader = reader;
4219
4220        let mut count = 0;
4221        let mut writer = writer_lock.write().await;
4222
4223        if label_meta.is_some() {
4224            let target_props = schema
4225                .properties
4226                .get(target)
4227                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4228
4229            for batch in reader.by_ref() {
4230                let batch = batch?;
4231                for row in 0..batch.num_rows() {
4232                    let mut props = HashMap::new();
4233                    for field in batch.schema().fields() {
4234                        let name = field.name();
4235                        if target_props.contains_key(name) {
4236                            let col = batch.column_by_name(name).unwrap();
4237                            if !col.is_null(row) {
4238                                // Look up Uni DataType from schema for proper DateTime/Time decoding
4239                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
4240                                let val =
4241                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
4242                                props.insert(name.clone(), val);
4243                            }
4244                        }
4245                    }
4246                    let vid = writer.next_vid().await?;
4247                    writer
4248                        .insert_vertex_with_labels(vid, props, &[target.to_string()])
4249                        .await?;
4250                    count += 1;
4251                }
4252            }
4253        } else if let Some(meta) = edge_meta {
4254            let type_id = meta.id;
4255            let target_props = schema
4256                .properties
4257                .get(target)
4258                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4259
4260            let src_col = options
4261                .get("src_col")
4262                .and_then(|v| v.as_str())
4263                .unwrap_or("_src");
4264            let dst_col = options
4265                .get("dst_col")
4266                .and_then(|v| v.as_str())
4267                .unwrap_or("_dst");
4268
4269            for batch in reader {
4270                let batch = batch?;
4271                for row in 0..batch.num_rows() {
4272                    let mut props = HashMap::new();
4273                    let mut src_vid = None;
4274                    let mut dst_vid = None;
4275
4276                    for field in batch.schema().fields() {
4277                        let name = field.name();
4278                        let col = batch.column_by_name(name).unwrap();
4279                        if col.is_null(row) {
4280                            continue;
4281                        }
4282
4283                        if name == src_col {
4284                            let val = Self::arrow_to_value(col.as_ref(), row);
4285                            src_vid = Some(Self::vid_from_value(&val)?);
4286                        } else if name == dst_col {
4287                            let val = Self::arrow_to_value(col.as_ref(), row);
4288                            dst_vid = Some(Self::vid_from_value(&val)?);
4289                        } else if let Some(pm) = target_props.get(name) {
4290                            // Look up Uni DataType from schema for proper DateTime/Time decoding
4291                            let val =
4292                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
4293                            props.insert(name.clone(), val);
4294                        }
4295                    }
4296
4297                    let src = src_vid
4298                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4299                    let dst = dst_vid.ok_or_else(|| {
4300                        anyhow!("Missing destination VID in column '{}'", dst_col)
4301                    })?;
4302
4303                    let eid = writer.next_eid(type_id).await?;
4304                    writer
4305                        .insert_edge(src, dst, type_id, eid, props, Some(target.to_string()))
4306                        .await?;
4307                    count += 1;
4308                }
4309            }
4310        }
4311
4312        let mut res = HashMap::new();
4313        res.insert("count".to_string(), Value::Int(count as i64));
4314        Ok(vec![res])
4315    }
4316
4317    /// Opens a Parquet file from a cloud URL.
4318    ///
4319    /// Downloads the file to memory and creates a Parquet reader.
4320    async fn open_parquet_from_cloud(
4321        &self,
4322        source_url: &str,
4323    ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4324        use object_store::ObjectStore;
4325
4326        let (store, path) = build_store_from_url(source_url)?;
4327
4328        // Download file contents
4329        let bytes = store.get(&path).await?.bytes().await?;
4330
4331        // Create a Parquet reader from the bytes
4332        let reader = bytes::Bytes::from(bytes.to_vec());
4333        let builder =
4334            parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4335        Ok(builder.build()?)
4336    }
4337
4338    pub(crate) async fn scan_edge_type(
4339        &self,
4340        edge_type: &str,
4341        ctx: Option<&QueryContext>,
4342    ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4343        let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4344
4345        // 1. Scan L2 (Base)
4346        self.scan_edge_type_l2(edge_type, &mut edges).await?;
4347
4348        // 2. Scan L1 (Delta)
4349        self.scan_edge_type_l1(edge_type, &mut edges).await?;
4350
4351        // 3. Scan L0 (Memory) and filter tombstoned vertices
4352        if let Some(ctx) = ctx {
4353            self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4354            self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4355        }
4356
4357        Ok(edges
4358            .into_iter()
4359            .map(|(eid, (src, dst))| (eid, src, dst))
4360            .collect())
4361    }
4362
4363    /// Scan L2 (base) storage for edges of a given type.
4364    ///
4365    /// Note: Edges are now stored exclusively in delta datasets (L1) via LanceDB.
4366    /// This L2 scan will typically find no data.
4367    pub(crate) async fn scan_edge_type_l2(
4368        &self,
4369        _edge_type: &str,
4370        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4371    ) -> Result<()> {
4372        // Edges are now stored in delta datasets (L1) via LanceDB.
4373        // Legacy L2 base edge storage is no longer used.
4374        Ok(())
4375    }
4376
4377    /// Scan L1 (delta) storage for edges of a given type.
4378    pub(crate) async fn scan_edge_type_l1(
4379        &self,
4380        edge_type: &str,
4381        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4382    ) -> Result<()> {
4383        use futures::TryStreamExt;
4384        use lancedb::query::{ExecutableQuery, QueryBase, Select};
4385
4386        if let Ok(ds) = self.storage.delta_dataset(edge_type, "fwd") {
4387            let lancedb_store = self.storage.lancedb_store();
4388            if let Ok(table) = ds.open_lancedb(lancedb_store).await {
4389                let query = table.query().select(Select::Columns(vec![
4390                    "eid".into(),
4391                    "src_vid".into(),
4392                    "dst_vid".into(),
4393                    "op".into(),
4394                    "_version".into(),
4395                ]));
4396
4397                if let Ok(stream) = query.execute().await {
4398                    let batches: Vec<arrow_array::RecordBatch> =
4399                        stream.try_collect().await.unwrap_or_default();
4400
4401                    // Collect ops with versions: eid -> (version, op, src, dst)
4402                    let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4403                        HashMap::new();
4404
4405                    for batch in batches {
4406                        self.process_delta_batch(&batch, &mut versioned_ops)?;
4407                    }
4408
4409                    // Apply the winning ops
4410                    for (eid, (_, op, src, dst)) in versioned_ops {
4411                        if op == 0 {
4412                            edges.insert(eid, (src, dst));
4413                        } else if op == 1 {
4414                            edges.remove(&eid);
4415                        }
4416                    }
4417                }
4418            }
4419        }
4420        Ok(())
4421    }
4422
4423    /// Process a delta batch, tracking versioned operations.
4424    pub(crate) fn process_delta_batch(
4425        &self,
4426        batch: &arrow_array::RecordBatch,
4427        versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4428    ) -> Result<()> {
4429        use arrow_array::UInt64Array;
4430        let eid_col = batch
4431            .column_by_name("eid")
4432            .ok_or(anyhow!("Missing eid"))?
4433            .as_any()
4434            .downcast_ref::<UInt64Array>()
4435            .ok_or(anyhow!("Invalid eid"))?;
4436        let src_col = batch
4437            .column_by_name("src_vid")
4438            .ok_or(anyhow!("Missing src_vid"))?
4439            .as_any()
4440            .downcast_ref::<UInt64Array>()
4441            .ok_or(anyhow!("Invalid src_vid"))?;
4442        let dst_col = batch
4443            .column_by_name("dst_vid")
4444            .ok_or(anyhow!("Missing dst_vid"))?
4445            .as_any()
4446            .downcast_ref::<UInt64Array>()
4447            .ok_or(anyhow!("Invalid dst_vid"))?;
4448        let op_col = batch
4449            .column_by_name("op")
4450            .ok_or(anyhow!("Missing op"))?
4451            .as_any()
4452            .downcast_ref::<arrow_array::UInt8Array>()
4453            .ok_or(anyhow!("Invalid op"))?;
4454        let version_col = batch
4455            .column_by_name("_version")
4456            .ok_or(anyhow!("Missing _version"))?
4457            .as_any()
4458            .downcast_ref::<UInt64Array>()
4459            .ok_or(anyhow!("Invalid _version"))?;
4460
4461        for i in 0..batch.num_rows() {
4462            let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4463            let version = version_col.value(i);
4464            let op = op_col.value(i);
4465            let src = Vid::from(src_col.value(i));
4466            let dst = Vid::from(dst_col.value(i));
4467
4468            match versioned_ops.entry(eid) {
4469                std::collections::hash_map::Entry::Vacant(e) => {
4470                    e.insert((version, op, src, dst));
4471                }
4472                std::collections::hash_map::Entry::Occupied(mut e) => {
4473                    if version > e.get().0 {
4474                        e.insert((version, op, src, dst));
4475                    }
4476                }
4477            }
4478        }
4479        Ok(())
4480    }
4481
4482    /// Scan L0 (memory) buffers for edges of a given type.
4483    pub(crate) fn scan_edge_type_l0(
4484        &self,
4485        edge_type: &str,
4486        ctx: &QueryContext,
4487        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4488    ) {
4489        let schema = self.storage.schema_manager().schema();
4490        let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4491
4492        if let Some(type_id) = type_id {
4493            // Main L0
4494            self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4495
4496            // Transaction L0
4497            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4498                self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4499            }
4500
4501            // Pending flush L0s
4502            for pending_l0_arc in &ctx.pending_flush_l0s {
4503                self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4504            }
4505        }
4506    }
4507
4508    /// Scan a single L0 buffer for edges and apply tombstones.
4509    pub(crate) fn scan_single_l0(
4510        &self,
4511        l0: &uni_store::runtime::L0Buffer,
4512        type_id: u32,
4513        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4514    ) {
4515        for edge_entry in l0.graph.edges() {
4516            if edge_entry.edge_type == type_id {
4517                edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4518            }
4519        }
4520        // Process Tombstones
4521        let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4522        for eid in eids_to_check {
4523            if l0.is_tombstoned(eid) {
4524                edges.remove(&eid);
4525            }
4526        }
4527    }
4528
4529    /// Filter out edges connected to tombstoned vertices.
4530    pub(crate) fn filter_tombstoned_vertex_edges(
4531        &self,
4532        ctx: &QueryContext,
4533        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4534    ) {
4535        let l0 = ctx.l0.read();
4536        let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4537
4538        // Include tx_l0 vertex tombstones if present
4539        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4540            let tx_l0 = tx_l0_arc.read();
4541            all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4542        }
4543
4544        // Include pending flush L0 vertex tombstones
4545        for pending_l0_arc in &ctx.pending_flush_l0s {
4546            let pending_l0 = pending_l0_arc.read();
4547            all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4548        }
4549
4550        edges.retain(|_, (src, dst)| {
4551            !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4552        });
4553    }
4554
4555    /// Execute a projection operation.
4556    pub(crate) async fn execute_project(
4557        &self,
4558        input_rows: Vec<HashMap<String, Value>>,
4559        projections: &[(Expr, Option<String>)],
4560        prop_manager: &PropertyManager,
4561        params: &HashMap<String, Value>,
4562        ctx: Option<&QueryContext>,
4563    ) -> Result<Vec<HashMap<String, Value>>> {
4564        let mut results = Vec::new();
4565        for m in input_rows {
4566            let mut row = HashMap::new();
4567            for (expr, alias) in projections {
4568                let val = self
4569                    .evaluate_expr(expr, &m, prop_manager, params, ctx)
4570                    .await?;
4571                let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4572                row.insert(name, val);
4573            }
4574            results.push(row);
4575        }
4576        Ok(results)
4577    }
4578
4579    /// Execute an UNWIND operation.
4580    pub(crate) async fn execute_unwind(
4581        &self,
4582        input_rows: Vec<HashMap<String, Value>>,
4583        expr: &Expr,
4584        variable: &str,
4585        prop_manager: &PropertyManager,
4586        params: &HashMap<String, Value>,
4587        ctx: Option<&QueryContext>,
4588    ) -> Result<Vec<HashMap<String, Value>>> {
4589        let mut results = Vec::new();
4590        for row in input_rows {
4591            let val = self
4592                .evaluate_expr(expr, &row, prop_manager, params, ctx)
4593                .await?;
4594            if let Value::List(items) = val {
4595                for item in items {
4596                    let mut new_row = row.clone();
4597                    new_row.insert(variable.to_string(), item);
4598                    results.push(new_row);
4599                }
4600            }
4601        }
4602        Ok(results)
4603    }
4604
4605    /// Execute an APPLY (correlated subquery) operation.
4606    pub(crate) async fn execute_apply(
4607        &self,
4608        input_rows: Vec<HashMap<String, Value>>,
4609        subquery: &LogicalPlan,
4610        input_filter: Option<&Expr>,
4611        prop_manager: &PropertyManager,
4612        params: &HashMap<String, Value>,
4613        ctx: Option<&QueryContext>,
4614    ) -> Result<Vec<HashMap<String, Value>>> {
4615        let mut filtered_rows = input_rows;
4616
4617        if let Some(filter) = input_filter {
4618            let mut filtered = Vec::new();
4619            for row in filtered_rows {
4620                let res = self
4621                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
4622                    .await?;
4623                if res.as_bool().unwrap_or(false) {
4624                    filtered.push(row);
4625                }
4626            }
4627            filtered_rows = filtered;
4628        }
4629
4630        // Handle empty input: execute subquery once with empty context
4631        // This is critical for standalone CALL statements at the beginning of a query
4632        if filtered_rows.is_empty() {
4633            let sub_rows = self
4634                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4635                .await?;
4636            return Ok(sub_rows);
4637        }
4638
4639        let mut results = Vec::new();
4640        for row in filtered_rows {
4641            let mut sub_params = params.clone();
4642            sub_params.extend(row.clone());
4643
4644            let sub_rows = self
4645                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4646                .await?;
4647
4648            for sub_row in sub_rows {
4649                let mut new_row = row.clone();
4650                new_row.extend(sub_row);
4651                results.push(new_row);
4652            }
4653        }
4654        Ok(results)
4655    }
4656
4657    /// Execute SHOW INDEXES command.
4658    pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4659        let schema = self.storage.schema_manager().schema();
4660        let mut rows = Vec::new();
4661        for idx in &schema.indexes {
4662            let (name, type_str, details) = match idx {
4663                uni_common::core::schema::IndexDefinition::Vector(c) => (
4664                    c.name.clone(),
4665                    "VECTOR",
4666                    format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4667                ),
4668                uni_common::core::schema::IndexDefinition::FullText(c) => (
4669                    c.name.clone(),
4670                    "FULLTEXT",
4671                    format!("on {}:{:?}", c.label, c.properties),
4672                ),
4673                uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4674                    cfg.name.clone(),
4675                    "SCALAR",
4676                    format!(":{}({:?})", cfg.label, cfg.properties),
4677                ),
4678                _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4679            };
4680
4681            if let Some(f) = filter
4682                && f != type_str
4683            {
4684                continue;
4685            }
4686
4687            let mut row = HashMap::new();
4688            row.insert("name".to_string(), Value::String(name));
4689            row.insert("type".to_string(), Value::String(type_str.to_string()));
4690            row.insert("details".to_string(), Value::String(details));
4691            rows.push(row);
4692        }
4693        rows
4694    }
4695
4696    pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4697        let mut row = HashMap::new();
4698        row.insert("name".to_string(), Value::String("uni".to_string()));
4699        // Could add storage path, etc.
4700        vec![row]
4701    }
4702
4703    pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4704        // Placeholder as we don't easy access to config struct from here
4705        vec![]
4706    }
4707
4708    pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4709        let snapshot = self
4710            .storage
4711            .snapshot_manager()
4712            .load_latest_snapshot()
4713            .await?;
4714        let mut results = Vec::new();
4715
4716        if let Some(snap) = snapshot {
4717            for (label, s) in &snap.vertices {
4718                let mut row = HashMap::new();
4719                row.insert("type".to_string(), Value::String("Label".to_string()));
4720                row.insert("name".to_string(), Value::String(label.clone()));
4721                row.insert("count".to_string(), Value::Int(s.count as i64));
4722                results.push(row);
4723            }
4724            for (edge, s) in &snap.edges {
4725                let mut row = HashMap::new();
4726                row.insert("type".to_string(), Value::String("Edge".to_string()));
4727                row.insert("name".to_string(), Value::String(edge.clone()));
4728                row.insert("count".to_string(), Value::Int(s.count as i64));
4729                results.push(row);
4730            }
4731        }
4732
4733        Ok(results)
4734    }
4735
4736    pub(crate) fn execute_show_constraints(
4737        &self,
4738        clause: ShowConstraints,
4739    ) -> Vec<HashMap<String, Value>> {
4740        let schema = self.storage.schema_manager().schema();
4741        let mut rows = Vec::new();
4742        for c in &schema.constraints {
4743            if let Some(target) = &clause.target {
4744                match (target, &c.target) {
4745                    (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4746                    (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4747                        if e1 == e2 => {}
4748                    _ => continue,
4749                }
4750            }
4751
4752            let mut row = HashMap::new();
4753            row.insert("name".to_string(), Value::String(c.name.clone()));
4754            let type_str = match c.constraint_type {
4755                ConstraintType::Unique { .. } => "UNIQUE",
4756                ConstraintType::Exists { .. } => "EXISTS",
4757                ConstraintType::Check { .. } => "CHECK",
4758                _ => "UNKNOWN",
4759            };
4760            row.insert("type".to_string(), Value::String(type_str.to_string()));
4761
4762            let target_str = match &c.target {
4763                ConstraintTarget::Label(l) => format!("(:{})", l),
4764                ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4765                _ => "UNKNOWN".to_string(),
4766            };
4767            row.insert("target".to_string(), Value::String(target_str));
4768
4769            rows.push(row);
4770        }
4771        rows
4772    }
4773
4774    /// Execute a MERGE operation.
4775    pub(crate) async fn execute_cross_join(
4776        &self,
4777        left: Box<LogicalPlan>,
4778        right: Box<LogicalPlan>,
4779        prop_manager: &PropertyManager,
4780        params: &HashMap<String, Value>,
4781        ctx: Option<&QueryContext>,
4782    ) -> Result<Vec<HashMap<String, Value>>> {
4783        let left_rows = self
4784            .execute_subplan(*left, prop_manager, params, ctx)
4785            .await?;
4786        let right_rows = self
4787            .execute_subplan(*right, prop_manager, params, ctx)
4788            .await?;
4789
4790        let mut results = Vec::new();
4791        for l in &left_rows {
4792            for r in &right_rows {
4793                let mut combined = l.clone();
4794                combined.extend(r.clone());
4795                results.push(combined);
4796            }
4797        }
4798        Ok(results)
4799    }
4800
4801    /// Execute a UNION operation with optional deduplication.
4802    pub(crate) async fn execute_union(
4803        &self,
4804        left: Box<LogicalPlan>,
4805        right: Box<LogicalPlan>,
4806        all: bool,
4807        prop_manager: &PropertyManager,
4808        params: &HashMap<String, Value>,
4809        ctx: Option<&QueryContext>,
4810    ) -> Result<Vec<HashMap<String, Value>>> {
4811        let mut left_rows = self
4812            .execute_subplan(*left, prop_manager, params, ctx)
4813            .await?;
4814        let mut right_rows = self
4815            .execute_subplan(*right, prop_manager, params, ctx)
4816            .await?;
4817
4818        left_rows.append(&mut right_rows);
4819
4820        if !all {
4821            let mut seen = HashSet::new();
4822            left_rows.retain(|row| {
4823                let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
4824                let key = format!("{:?}", sorted_row);
4825                seen.insert(key)
4826            });
4827        }
4828        Ok(left_rows)
4829    }
4830
4831    /// Check if an index with the given name exists.
4832    pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
4833        let schema = self.storage.schema_manager().schema();
4834        schema.indexes.iter().any(|idx| match idx {
4835            uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
4836            uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
4837            uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
4838            _ => false,
4839        })
4840    }
4841
4842    pub(crate) async fn execute_export(
4843        &self,
4844        target: &str,
4845        source: &str,
4846        options: &HashMap<String, Value>,
4847        prop_manager: &PropertyManager,
4848        ctx: Option<&QueryContext>,
4849    ) -> Result<Vec<HashMap<String, Value>>> {
4850        let format = options
4851            .get("format")
4852            .and_then(|v| v.as_str())
4853            .unwrap_or("csv")
4854            .to_lowercase();
4855
4856        match format.as_str() {
4857            "csv" => {
4858                self.execute_csv_export(target, source, options, prop_manager, ctx)
4859                    .await
4860            }
4861            "parquet" => {
4862                self.execute_parquet_export(target, source, options, prop_manager, ctx)
4863                    .await
4864            }
4865            _ => Err(anyhow!("Unsupported export format: {}", format)),
4866        }
4867    }
4868
4869    pub(crate) async fn execute_csv_export(
4870        &self,
4871        target: &str,
4872        source: &str,
4873        options: &HashMap<String, Value>,
4874        prop_manager: &PropertyManager,
4875        ctx: Option<&QueryContext>,
4876    ) -> Result<Vec<HashMap<String, Value>>> {
4877        // Validate destination path against sandbox
4878        let validated_dest = self.validate_path(source)?;
4879
4880        let schema = self.storage.schema_manager().schema();
4881        let label_meta = schema.labels.get(target);
4882        let edge_meta = schema.edge_types.get(target);
4883
4884        if label_meta.is_none() && edge_meta.is_none() {
4885            return Err(anyhow!("Target '{}' not found in schema", target));
4886        }
4887
4888        let delimiter_str = options
4889            .get("delimiter")
4890            .and_then(|v| v.as_str())
4891            .unwrap_or(",");
4892        let delimiter = if delimiter_str.is_empty() {
4893            b','
4894        } else {
4895            delimiter_str.as_bytes()[0]
4896        };
4897        let has_header = options
4898            .get("header")
4899            .and_then(|v| v.as_bool())
4900            .unwrap_or(true);
4901
4902        let mut wtr = csv::WriterBuilder::new()
4903            .delimiter(delimiter)
4904            .from_path(&validated_dest)?;
4905
4906        let mut count = 0;
4907        // Empty properties map for labels/edge types without registered properties
4908        let empty_props = HashMap::new();
4909
4910        if let Some(meta) = label_meta {
4911            let label_id = meta.id;
4912            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
4913            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
4914            prop_names.sort();
4915
4916            let mut headers = vec!["_vid".to_string()];
4917            headers.extend(prop_names.clone());
4918
4919            if has_header {
4920                wtr.write_record(&headers)?;
4921            }
4922
4923            let vids = self
4924                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
4925                .await?;
4926
4927            for vid in vids {
4928                let props = prop_manager
4929                    .get_all_vertex_props_with_ctx(vid, ctx)
4930                    .await?
4931                    .unwrap_or_default();
4932
4933                let mut row = Vec::with_capacity(headers.len());
4934                row.push(vid.to_string());
4935                for p_name in &prop_names {
4936                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
4937                    row.push(self.format_csv_value(val));
4938                }
4939                wtr.write_record(&row)?;
4940                count += 1;
4941            }
4942        } else if let Some(meta) = edge_meta {
4943            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
4944            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
4945            prop_names.sort();
4946
4947            // Headers for Edge: _eid, _src, _dst, _type, ...props
4948            let mut headers = vec![
4949                "_eid".to_string(),
4950                "_src".to_string(),
4951                "_dst".to_string(),
4952                "_type".to_string(),
4953            ];
4954            headers.extend(prop_names.clone());
4955
4956            if has_header {
4957                wtr.write_record(&headers)?;
4958            }
4959
4960            let edges = self.scan_edge_type(target, ctx).await?;
4961
4962            for (eid, src, dst) in edges {
4963                let props = prop_manager
4964                    .get_all_edge_props_with_ctx(eid, ctx)
4965                    .await?
4966                    .unwrap_or_default();
4967
4968                let mut row = Vec::with_capacity(headers.len());
4969                row.push(eid.to_string());
4970                row.push(src.to_string());
4971                row.push(dst.to_string());
4972                row.push(meta.id.to_string());
4973
4974                for p_name in &prop_names {
4975                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
4976                    row.push(self.format_csv_value(val));
4977                }
4978                wtr.write_record(&row)?;
4979                count += 1;
4980            }
4981        }
4982
4983        wtr.flush()?;
4984        let mut res = HashMap::new();
4985        res.insert("count".to_string(), Value::Int(count as i64));
4986        Ok(vec![res])
4987    }
4988
4989    /// Exports data to Parquet format.
4990    ///
4991    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
4992    pub(crate) async fn execute_parquet_export(
4993        &self,
4994        target: &str,
4995        destination: &str,
4996        _options: &HashMap<String, Value>,
4997        prop_manager: &PropertyManager,
4998        ctx: Option<&QueryContext>,
4999    ) -> Result<Vec<HashMap<String, Value>>> {
5000        let schema_manager = self.storage.schema_manager();
5001        let schema = schema_manager.schema();
5002        let label_meta = schema.labels.get(target);
5003        let edge_meta = schema.edge_types.get(target);
5004
5005        if label_meta.is_none() && edge_meta.is_none() {
5006            return Err(anyhow!("Target '{}' not found in schema", target));
5007        }
5008
5009        let arrow_schema = if label_meta.is_some() {
5010            let dataset = self.storage.vertex_dataset(target)?;
5011            dataset.get_arrow_schema(&schema)?
5012        } else {
5013            // Edge Schema
5014            let dataset = self.storage.edge_dataset(target, "", "")?;
5015            dataset.get_arrow_schema(&schema)?
5016        };
5017
5018        let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();
5019
5020        if let Some(meta) = label_meta {
5021            let label_id = meta.id;
5022            let vids = self
5023                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5024                .await?;
5025
5026            for vid in vids {
5027                let mut props = prop_manager
5028                    .get_all_vertex_props_with_ctx(vid, ctx)
5029                    .await?
5030                    .unwrap_or_default();
5031
5032                props.insert(
5033                    "_vid".to_string(),
5034                    uni_common::Value::Int(vid.as_u64() as i64),
5035                );
5036                if !props.contains_key("_uid") {
5037                    props.insert(
5038                        "_uid".to_string(),
5039                        uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
5040                    );
5041                }
5042                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5043                props.insert("_version".to_string(), uni_common::Value::Int(1));
5044                rows.push(props);
5045            }
5046        } else if edge_meta.is_some() {
5047            let edges = self.scan_edge_type(target, ctx).await?;
5048            for (eid, src, dst) in edges {
5049                let mut props = prop_manager
5050                    .get_all_edge_props_with_ctx(eid, ctx)
5051                    .await?
5052                    .unwrap_or_default();
5053
5054                props.insert(
5055                    "eid".to_string(),
5056                    uni_common::Value::Int(eid.as_u64() as i64),
5057                );
5058                props.insert(
5059                    "src_vid".to_string(),
5060                    uni_common::Value::Int(src.as_u64() as i64),
5061                );
5062                props.insert(
5063                    "dst_vid".to_string(),
5064                    uni_common::Value::Int(dst.as_u64() as i64),
5065                );
5066                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5067                props.insert("_version".to_string(), uni_common::Value::Int(1));
5068                rows.push(props);
5069            }
5070        }
5071
5072        // Write to cloud or local file
5073        if is_cloud_url(destination) {
5074            self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
5075                .await?;
5076        } else {
5077            // Validate local destination path against sandbox
5078            let validated_dest = self.validate_path(destination)?;
5079            let file = std::fs::File::create(&validated_dest)?;
5080            let mut writer =
5081                parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;
5082
5083            // Write all in one batch for now (simplification)
5084            if !rows.is_empty() {
5085                let batch = self.rows_to_batch(&rows, &arrow_schema)?;
5086                writer.write(&batch)?;
5087            }
5088
5089            writer.close()?;
5090        }
5091
5092        let mut res = HashMap::new();
5093        res.insert("count".to_string(), Value::Int(rows.len() as i64));
5094        Ok(vec![res])
5095    }
5096
5097    /// Writes Parquet data to a cloud storage destination.
5098    async fn write_parquet_to_cloud(
5099        &self,
5100        dest_url: &str,
5101        rows: &[HashMap<String, uni_common::Value>],
5102        arrow_schema: &arrow_schema::Schema,
5103    ) -> Result<()> {
5104        use object_store::ObjectStore;
5105
5106        let (store, path) = build_store_from_url(dest_url)?;
5107
5108        // Write to an in-memory buffer
5109        let mut buffer = Vec::new();
5110        {
5111            let mut writer = parquet::arrow::ArrowWriter::try_new(
5112                &mut buffer,
5113                Arc::new(arrow_schema.clone()),
5114                None,
5115            )?;
5116
5117            if !rows.is_empty() {
5118                let batch = self.rows_to_batch(rows, arrow_schema)?;
5119                writer.write(&batch)?;
5120            }
5121
5122            writer.close()?;
5123        }
5124
5125        // Upload to cloud storage
5126        store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5127
5128        Ok(())
5129    }
5130
5131    pub(crate) fn rows_to_batch(
5132        &self,
5133        rows: &[HashMap<String, uni_common::Value>],
5134        schema: &arrow_schema::Schema,
5135    ) -> Result<RecordBatch> {
5136        let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5137
5138        for field in schema.fields() {
5139            let name = field.name();
5140            let dt = field.data_type();
5141
5142            let values: Vec<uni_common::Value> = rows
5143                .iter()
5144                .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5145                .collect();
5146            let array = self.values_to_array(&values, dt)?;
5147            columns.push(array);
5148        }
5149
5150        Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5151    }
5152
5153    /// Convert a slice of Values to an Arrow array.
5154    /// Delegates to the shared implementation in arrow_convert module.
5155    pub(crate) fn values_to_array(
5156        &self,
5157        values: &[uni_common::Value],
5158        dt: &arrow_schema::DataType,
5159    ) -> Result<Arc<dyn Array>> {
5160        arrow_convert::values_to_array(values, dt)
5161    }
5162
5163    pub(crate) fn format_csv_value(&self, val: Value) -> String {
5164        match val {
5165            Value::Null => "".to_string(),
5166            Value::String(s) => s,
5167            Value::Int(i) => i.to_string(),
5168            Value::Float(f) => f.to_string(),
5169            Value::Bool(b) => b.to_string(),
5170            _ => format!("{}", val),
5171        }
5172    }
5173
5174    pub(crate) fn parse_csv_value(
5175        &self,
5176        s: &str,
5177        data_type: &uni_common::core::schema::DataType,
5178        prop_name: &str,
5179    ) -> Result<Value> {
5180        if s.is_empty() || s.to_lowercase() == "null" {
5181            return Ok(Value::Null);
5182        }
5183
5184        use uni_common::core::schema::DataType;
5185        match data_type {
5186            DataType::String => Ok(Value::String(s.to_string())),
5187            DataType::Int32 | DataType::Int64 => {
5188                let i = s.parse::<i64>().map_err(|_| {
5189                    anyhow!(
5190                        "Failed to parse integer for property '{}': {}",
5191                        prop_name,
5192                        s
5193                    )
5194                })?;
5195                Ok(Value::Int(i))
5196            }
5197            DataType::Float32 | DataType::Float64 => {
5198                let f = s.parse::<f64>().map_err(|_| {
5199                    anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5200                })?;
5201                Ok(Value::Float(f))
5202            }
5203            DataType::Bool => {
5204                let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5205                    anyhow!(
5206                        "Failed to parse boolean for property '{}': {}",
5207                        prop_name,
5208                        s
5209                    )
5210                })?;
5211                Ok(Value::Bool(b))
5212            }
5213            DataType::CypherValue => {
5214                let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5215                    anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5216                })?;
5217                Ok(Value::from(json_val))
5218            }
5219            DataType::Vector { .. } => {
5220                let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5221                    anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5222                })?;
5223                Ok(Value::Vector(v))
5224            }
5225            _ => Ok(Value::String(s.to_string())),
5226        }
5227    }
5228
5229    pub(crate) async fn detach_delete_vertex(&self, vid: Vid, writer: &mut Writer) -> Result<()> {
5230        let schema = self.storage.schema_manager().schema();
5231        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5232
5233        // 1. Find and delete all outgoing edges
5234        let out_graph = self
5235            .storage
5236            .load_subgraph_cached(
5237                &[vid],
5238                &edge_type_ids,
5239                1,
5240                uni_store::runtime::Direction::Outgoing,
5241                Some(writer.l0_manager.get_current()),
5242            )
5243            .await?;
5244
5245        for edge in out_graph.edges() {
5246            writer
5247                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5248                .await?;
5249        }
5250
5251        // 2. Find and delete all incoming edges
5252        let in_graph = self
5253            .storage
5254            .load_subgraph_cached(
5255                &[vid],
5256                &edge_type_ids,
5257                1,
5258                uni_store::runtime::Direction::Incoming,
5259                Some(writer.l0_manager.get_current()),
5260            )
5261            .await?;
5262
5263        for edge in in_graph.edges() {
5264            writer
5265                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5266                .await?;
5267        }
5268
5269        Ok(())
5270    }
5271
5272    /// Batch detach-delete: load subgraphs for all VIDs at once, then delete edges and vertices.
5273    pub(crate) async fn batch_detach_delete_vertices(
5274        &self,
5275        vids: &[Vid],
5276        labels_per_vid: Vec<Option<Vec<String>>>,
5277        writer: &mut Writer,
5278    ) -> Result<()> {
5279        let schema = self.storage.schema_manager().schema();
5280        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5281
5282        // Load outgoing subgraph for all VIDs in one call.
5283        let out_graph = self
5284            .storage
5285            .load_subgraph_cached(
5286                vids,
5287                &edge_type_ids,
5288                1,
5289                uni_store::runtime::Direction::Outgoing,
5290                Some(writer.l0_manager.get_current()),
5291            )
5292            .await?;
5293
5294        for edge in out_graph.edges() {
5295            writer
5296                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5297                .await?;
5298        }
5299
5300        // Load incoming subgraph for all VIDs in one call.
5301        let in_graph = self
5302            .storage
5303            .load_subgraph_cached(
5304                vids,
5305                &edge_type_ids,
5306                1,
5307                uni_store::runtime::Direction::Incoming,
5308                Some(writer.l0_manager.get_current()),
5309            )
5310            .await?;
5311
5312        for edge in in_graph.edges() {
5313            writer
5314                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5315                .await?;
5316        }
5317
5318        // Delete all vertices.
5319        for (vid, labels) in vids.iter().zip(labels_per_vid) {
5320            writer.delete_vertex(*vid, labels).await?;
5321        }
5322
5323        Ok(())
5324    }
5325}