Skip to main content

uni_query/query/executor/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7    eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14/// Convert a `Value` to `chrono::DateTime<Utc>`, handling both `Value::Temporal` and `Value::String`.
15fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16    match val {
17        Value::Temporal(tv) => {
18            use uni_common::TemporalValue;
19            match tv {
20                TemporalValue::DateTime {
21                    nanos_since_epoch, ..
22                }
23                | TemporalValue::LocalDateTime {
24                    nanos_since_epoch, ..
25                } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26                TemporalValue::Date { days_since_epoch } => {
27                    chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28                }
29                _ => None,
30            }
31        }
32        Value::String(s) => parse_datetime_utc(s).ok(),
33        _ => None,
34    }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46    BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47    ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54use uni_store::storage::index_manager::IndexManager;
55
56// DataFusion engine imports
57use crate::query::df_graph::L0Context;
58use crate::query::df_planner::HybridPhysicalPlanner;
59use datafusion::physical_plan::ExecutionPlanProperties;
60use datafusion::prelude::SessionContext;
61use parking_lot::RwLock as SyncRwLock;
62
63use arrow_array::{Array, RecordBatch};
64use csv;
65use parquet;
66
67use super::core::*;
68
69/// Number of system fields on an edge map: `_eid`, `_src`, `_dst`, `_type`, `_type_name`.
70const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
71/// Number of system fields on a vertex map: `_vid`, `_label`, `_uid`.
72const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
73
74/// Collect VIDs from all L0 buffers visible to a query context.
75///
76/// Applies `extractor` to each L0 buffer (main, transaction, pending flush) and
77/// collects the results. Returns an empty vec when no query context is present.
78fn collect_l0_vids(
79    ctx: Option<&QueryContext>,
80    extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
81) -> Vec<Vid> {
82    let mut vids = Vec::new();
83    if let Some(ctx) = ctx {
84        vids.extend(extractor(&ctx.l0.read()));
85        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
86            vids.extend(extractor(&tx_l0_arc.read()));
87        }
88        for pending_l0_arc in &ctx.pending_flush_l0s {
89            vids.extend(extractor(&pending_l0_arc.read()));
90        }
91    }
92    vids
93}
94
95/// Hydrate an entity map (vertex or edge) with properties if not already loaded.
96///
97/// This is the fallback for pushdown hydration - if the entity only has system fields
98/// (indicating pushdown didn't load properties), we load all properties here.
99///
100/// System field counts:
101/// - Edge: 5 fields (_eid, _src, _dst, _type, _type_name)
102/// - Vertex: 3 fields (_vid, _label, _uid)
103async fn hydrate_entity_if_needed(
104    map: &mut HashMap<String, Value>,
105    prop_manager: &PropertyManager,
106    ctx: Option<&QueryContext>,
107) {
108    // Check for edge entity
109    if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
110        if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
111            tracing::debug!(
112                "Pushdown fallback: hydrating edge {} at execution time",
113                eid_u64
114            );
115            if let Ok(Some(props)) = prop_manager
116                .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
117                .await
118            {
119                for (key, value) in props {
120                    map.entry(key).or_insert(value);
121                }
122            }
123        } else {
124            tracing::trace!(
125                "Pushdown success: edge {} already has {} properties",
126                eid_u64,
127                map.len() - EDGE_SYSTEM_FIELD_COUNT
128            );
129        }
130        return;
131    }
132
133    // Check for vertex entity
134    if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
135        if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
136            tracing::debug!(
137                "Pushdown fallback: hydrating vertex {} at execution time",
138                vid_u64
139            );
140            if let Ok(Some(props)) = prop_manager
141                .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
142                .await
143            {
144                for (key, value) in props {
145                    map.entry(key).or_insert(value);
146                }
147            }
148        } else {
149            tracing::trace!(
150                "Pushdown success: vertex {} already has {} properties",
151                vid_u64,
152                map.len() - VERTEX_SYSTEM_FIELD_COUNT
153            );
154        }
155    }
156}
157
158impl Executor {
159    /// Helper to verify and filter candidates against an optional predicate.
160    ///
161    /// Deduplicates candidates, loads properties, and evaluates the filter expression.
162    /// Returns only VIDs that pass the filter (or are not deleted).
163    async fn verify_and_filter_candidates(
164        &self,
165        mut candidates: Vec<Vid>,
166        variable: &str,
167        filter: Option<&Expr>,
168        ctx: Option<&QueryContext>,
169        prop_manager: &PropertyManager,
170        params: &HashMap<String, Value>,
171    ) -> Result<Vec<Vid>> {
172        candidates.sort_unstable();
173        candidates.dedup();
174
175        let mut verified_vids = Vec::new();
176        for vid in candidates {
177            let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
178                continue; // Deleted
179            };
180
181            if let Some(expr) = filter {
182                let mut props_map: HashMap<String, Value> = props;
183                props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
184
185                let mut row = HashMap::new();
186                row.insert(variable.to_string(), Value::Map(props_map));
187
188                let res = self
189                    .evaluate_expr(expr, &row, prop_manager, params, ctx)
190                    .await?;
191                if res.as_bool().unwrap_or(false) {
192                    verified_vids.push(vid);
193                }
194            } else {
195                verified_vids.push(vid);
196            }
197        }
198
199        Ok(verified_vids)
200    }
201
202    pub(crate) async fn scan_storage_candidates(
203        &self,
204        label_id: u16,
205        variable: &str,
206        filter: Option<&Expr>,
207    ) -> Result<Vec<Vid>> {
208        let schema = self.storage.schema_manager().schema();
209        let label_name = schema
210            .label_name_by_id(label_id)
211            .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
212
213        let ds = self.storage.vertex_dataset(label_name)?;
214        let lancedb_store = self.storage.lancedb_store();
215
216        // Try LanceDB first (canonical storage)
217        match ds.open_lancedb(lancedb_store).await {
218            Ok(table) => {
219                use arrow_array::UInt64Array;
220                use futures::TryStreamExt;
221                use lancedb::query::{ExecutableQuery, QueryBase, Select};
222
223                let mut query = table.query();
224
225                // Apply filter if provided, with schema awareness
226                // to skip overflow properties that aren't physical Lance columns.
227                // For labels with no registered properties (schemaless), use an empty
228                // map so all non-system properties are recognized as overflow.
229                let empty_props = std::collections::HashMap::new();
230                let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
231                if let Some(expr) = filter
232                    && let Some(sql) = LanceFilterGenerator::generate(
233                        std::slice::from_ref(expr),
234                        variable,
235                        Some(label_props),
236                    )
237                {
238                    query = query.only_if(format!("_deleted = false AND ({})", sql));
239                } else {
240                    query = query.only_if("_deleted = false");
241                }
242
243                // Project to only _vid
244                let query = query.select(Select::columns(&["_vid"]));
245                let stream = query.execute().await?;
246                let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
247
248                let mut vids = Vec::new();
249                for batch in batches {
250                    let vid_col = batch
251                        .column_by_name("_vid")
252                        .ok_or(anyhow!("Missing _vid"))?
253                        .as_any()
254                        .downcast_ref::<UInt64Array>()
255                        .ok_or(anyhow!("Invalid _vid"))?;
256                    for i in 0..batch.num_rows() {
257                        vids.push(Vid::from(vid_col.value(i)));
258                    }
259                }
260                Ok(vids)
261            }
262            Err(e) => {
263                // Only treat "not found" / "does not exist" errors as empty results.
264                // Propagate all other errors (network, auth, corruption, etc.)
265                let err_msg = e.to_string().to_lowercase();
266                if err_msg.contains("not found")
267                    || err_msg.contains("does not exist")
268                    || err_msg.contains("no such file")
269                    || err_msg.contains("object not found")
270                {
271                    Ok(Vec::new())
272                } else {
273                    Err(e)
274                }
275            }
276        }
277    }
278
279    pub(crate) async fn scan_label_with_filter(
280        &self,
281        label_id: u16,
282        variable: &str,
283        filter: Option<&Expr>,
284        ctx: Option<&QueryContext>,
285        prop_manager: &PropertyManager,
286        params: &HashMap<String, Value>,
287    ) -> Result<Vec<Vid>> {
288        let mut candidates = self
289            .scan_storage_candidates(label_id, variable, filter)
290            .await?;
291
292        // Convert label_id to label_name for L0 lookup
293        let schema = self.storage.schema_manager().schema();
294        if let Some(label_name) = schema.label_name_by_id(label_id) {
295            candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
296        }
297
298        self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
299            .await
300    }
301
302    pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
303        // Handle Value::Node directly (has vid field)
304        if let Value::Node(node) = val {
305            return Ok(node.vid);
306        }
307        // Handle Object (node) containing _vid field
308        if let Value::Map(map) = val
309            && let Some(vid_val) = map.get("_vid")
310            && let Some(v) = vid_val.as_u64()
311        {
312            return Ok(Vid::from(v));
313        }
314        // Handle string format
315        if let Some(s) = val.as_str()
316            && let Ok(id) = s.parse::<u64>()
317        {
318            return Ok(Vid::new(id));
319        }
320        // Handle raw u64
321        if let Some(v) = val.as_u64() {
322            return Ok(Vid::from(v));
323        }
324        Err(anyhow!("Invalid Vid format: {:?}", val))
325    }
326
327    /// Find a node value in the row by VID.
328    ///
329    /// Scans all values in the row, looking for a node (Map or Node) whose VID
330    /// matches the target. Returns the full node value if found, or a minimal
331    /// Map with just `_vid` as fallback.
332    fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
333        for val in row.values() {
334            if let Ok(vid) = Self::vid_from_value(val)
335                && vid == target_vid
336            {
337                return val.clone();
338            }
339        }
340        // Fallback: return minimal node map
341        Value::Map(HashMap::from([(
342            "_vid".to_string(),
343            Value::Int(target_vid.as_u64() as i64),
344        )]))
345    }
346
347    /// Create L0 context, session, and planner for DataFusion execution.
348    ///
349    /// This is the shared setup for `execute_datafusion` and `execute_merge_read_plan`.
350    /// Returns `(session_ctx, planner, prop_manager_arc)`.
351    pub async fn create_datafusion_planner(
352        &self,
353        prop_manager: &PropertyManager,
354        params: &HashMap<String, Value>,
355    ) -> Result<(
356        Arc<SyncRwLock<SessionContext>>,
357        HybridPhysicalPlanner,
358        Arc<PropertyManager>,
359    )> {
360        let query_ctx = self.get_context().await;
361        let l0_context = match query_ctx {
362            Some(ref ctx) => L0Context::from_query_context(ctx),
363            None => L0Context::empty(),
364        };
365
366        let prop_manager_arc = Arc::new(PropertyManager::new(
367            self.storage.clone(),
368            self.storage.schema_manager_arc(),
369            prop_manager.cache_size(),
370        ));
371
372        let session = SessionContext::new();
373        crate::query::df_udfs::register_cypher_udfs(&session)?;
374        let session_ctx = Arc::new(SyncRwLock::new(session));
375
376        let mut planner = HybridPhysicalPlanner::with_l0_context(
377            session_ctx.clone(),
378            self.storage.clone(),
379            l0_context,
380            prop_manager_arc.clone(),
381            self.storage.schema_manager().schema(),
382            params.clone(),
383            HashMap::new(),
384        );
385
386        planner = planner.with_algo_registry(self.algo_registry.clone());
387        if let Some(ref registry) = self.procedure_registry {
388            planner = planner.with_procedure_registry(registry.clone());
389        }
390        if let Some(ref xervo_runtime) = self.xervo_runtime {
391            planner = planner.with_xervo_runtime(xervo_runtime.clone());
392        }
393
394        Ok((session_ctx, planner, prop_manager_arc))
395    }
396
397    /// Execute a DataFusion physical plan and collect all result batches.
398    pub fn collect_batches(
399        session_ctx: &Arc<SyncRwLock<SessionContext>>,
400        execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
401    ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
402        Box::pin(async move {
403            use futures::TryStreamExt;
404
405            let task_ctx = session_ctx.read().task_ctx();
406            let partition_count = execution_plan.output_partitioning().partition_count();
407            let mut all_batches = Vec::new();
408            for partition in 0..partition_count {
409                let stream = execution_plan.execute(partition, task_ctx.clone())?;
410                let batches: Vec<RecordBatch> = stream.try_collect().await?;
411                all_batches.extend(batches);
412            }
413            Ok(all_batches)
414        })
415    }
416
    /// Executes a query using the DataFusion-based engine.
    ///
    /// Uses `HybridPhysicalPlanner` which produces DataFusion `ExecutionPlan`
    /// trees with custom graph operators for graph-specific operations.
    ///
    /// When the plan contains write operations, a `MutationContext` (carrying
    /// the `Writer`, property manager, parameters, and query context) is
    /// attached to the planner before planning. Warnings accumulated in the
    /// graph execution context are harvested into `self.warnings` after batch
    /// collection — even when collection itself returned an error.
    pub async fn execute_datafusion(
        &self,
        plan: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<Vec<RecordBatch>> {
        let (session_ctx, mut planner, prop_manager_arc) =
            self.create_datafusion_planner(prop_manager, params).await?;

        // Build MutationContext when the plan contains write operations
        if Self::contains_write_operations(&plan) {
            // Mutations are impossible without a configured Writer.
            let writer = self
                .writer
                .as_ref()
                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
                .clone();
            let query_ctx = self.get_context().await;

            // Writes are expected to always execute inside a query context.
            debug_assert!(
                query_ctx.is_some(),
                "BUG: query_ctx is None for write operation"
            );

            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
                executor: self.clone(),
                writer,
                prop_manager: prop_manager_arc,
                params: params.clone(),
                query_ctx,
            });
            planner = planner.with_mutation_context(mutation_ctx);
            tracing::debug!(
                plan_type = Self::get_plan_type(&plan),
                "Mutation routed to DataFusion engine"
            );
        }

        let execution_plan = planner.plan(&plan)?;
        // Collect first and keep the Result: warnings below are harvested
        // regardless of whether execution succeeded.
        let result = Self::collect_batches(&session_ctx, execution_plan).await;

        // Harvest warnings from the graph execution context after query completion.
        let graph_warnings = planner.graph_ctx().take_warnings();
        if !graph_warnings.is_empty()
            && let Ok(mut w) = self.warnings.lock()
        {
            w.extend(graph_warnings);
        }

        result
    }
471
    /// Execute a MERGE read sub-plan through the DataFusion engine.
    ///
    /// Plans and executes the MERGE pattern match using flat columnar output
    /// (no structural projections), then groups dotted columns (`a._vid`,
    /// `a._labels`, etc.) into per-variable Maps for downstream MERGE logic.
    ///
    /// Every merge variable is planned with full property access ("*") so
    /// downstream `ON MATCH SET` / `ON CREATE SET` clauses see complete maps.
    pub(crate) async fn execute_merge_read_plan(
        &self,
        plan: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        merge_variables: Vec<String>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let (session_ctx, planner, _prop_manager_arc) =
            self.create_datafusion_planner(prop_manager, params).await?;

        // Plan with full property access ("*") for all merge variables so that
        // ON MATCH SET / ON CREATE SET have complete entity Maps to work with.
        let extra: HashMap<String, HashSet<String>> = merge_variables
            .iter()
            .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
            .collect();
        let execution_plan = planner.plan_with_properties(&plan, extra)?;
        let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;

        // Convert to flat rows (dotted column names like "a._vid", "b._labels")
        let flat_rows = self.record_batches_to_rows(all_batches)?;

        // Group dotted columns into per-variable Maps for MERGE's match logic.
        // E.g., {"a._vid": 0, "a._labels": ["A"]} → {"a": Map({"_vid": 0, "_labels": ["A"]})}
        let rows = flat_rows
            .into_iter()
            .map(|mut row| {
                for var in &merge_variables {
                    // Skip if already materialized (e.g., by record_batches_to_rows)
                    if row.contains_key(var) {
                        continue;
                    }
                    let prefix = format!("{}.", var);
                    // Collect matching keys up front: entries cannot be removed
                    // from `row` while its key iterator is borrowed.
                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    if !dotted_keys.is_empty() {
                        let mut map = HashMap::new();
                        for key in dotted_keys {
                            // Strip the "var." prefix to recover the bare property name.
                            let prop_name = key[prefix.len()..].to_string();
                            if let Some(val) = row.remove(&key) {
                                map.insert(prop_name, val);
                            }
                        }
                        row.insert(var.clone(), Value::Map(map));
                    }
                }
                row
            })
            .collect();

        Ok(rows)
    }
532
    /// Converts DataFusion RecordBatches to row-based HashMap format.
    ///
    /// Handles special metadata on fields:
    /// - `cv_encoded=true`: Parse the string value as JSON to restore original type
    ///
    /// Also normalizes path structures to user-facing format (converts _vid to _id),
    /// merges dotted helper columns (e.g. `n._vid`) into their bare variable maps,
    /// and promotes VID-string placeholders (from search procedures) to Maps.
    fn record_batches_to_rows(
        &self,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut rows = Vec::new();

        for batch in batches {
            let num_rows = batch.num_rows();
            let schema = batch.schema();

            for row_idx in 0..num_rows {
                let mut row = HashMap::new();

                for (col_idx, field) in schema.fields().iter().enumerate() {
                    let column = batch.column(col_idx);
                    // Infer Uni DataType from Arrow type for DateTime/Time struct decoding
                    let data_type =
                        if uni_common::core::schema::is_datetime_struct(field.data_type()) {
                            Some(&uni_common::DataType::DateTime)
                        } else if uni_common::core::schema::is_time_struct(field.data_type()) {
                            Some(&uni_common::DataType::Time)
                        } else {
                            None
                        };
                    let mut value =
                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);

                    // Check if this field contains JSON-encoded values (e.g., from UNWIND)
                    // Parse JSON string to restore the original type
                    if field
                        .metadata()
                        .get("cv_encoded")
                        .is_some_and(|v| v == "true")
                        && let Value::String(s) = &value
                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
                    {
                        value = Value::from(parsed);
                    }

                    // Normalize path structures to user-facing format
                    value = Self::normalize_path_if_needed(value);

                    row.insert(field.name().clone(), value);
                }

                // Merge system fields into bare variable maps.
                // The projection step emits helper columns like "n._vid" and "n._labels"
                // alongside the materialized "n" column (a Map of user properties).
                // Here we merge those system fields into the map and remove the helpers.
                //
                // For search procedures (vector/FTS), the bare variable may be a VID
                // string placeholder rather than a Map. In that case, promote it to a
                // Map so we can merge system fields and properties into it.
                let bare_vars: Vec<String> = row
                    .keys()
                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
                    .cloned()
                    .collect();

                // Detect VID-placeholder variables that have system columns
                // (e.g., "node" is String("1") but "node._vid" exists).
                // Promote these to Maps so system fields can be merged in.
                let vid_placeholder_vars: Vec<String> = row
                    .keys()
                    .filter(|k| {
                        !k.contains('.')
                            && matches!(row.get(*k), Some(Value::String(_)))
                            && row.contains_key(&format!("{}._vid", k))
                    })
                    .cloned()
                    .collect();

                for var in &vid_placeholder_vars {
                    // Build a Map from system and property columns
                    let prefix = format!("{}.", var);
                    let mut map = HashMap::new();

                    // Keys are collected first: `row` cannot be mutated while
                    // its key iterator is borrowed.
                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();

                    for key in &dotted_keys {
                        let prop_name = &key[prefix.len()..];
                        if let Some(val) = row.remove(key) {
                            map.insert(prop_name.to_string(), val);
                        }
                    }

                    // Replace the VID-string placeholder with the constructed Map
                    row.insert(var.clone(), Value::Map(map));
                }

                for var in &bare_vars {
                    // Merge node system fields (_vid, _labels)
                    let vid_key = format!("{}._vid", var);
                    let labels_key = format!("{}._labels", var);

                    let vid_val = row.remove(&vid_key);
                    let labels_val = row.remove(&labels_key);

                    if let Some(Value::Map(map)) = row.get_mut(var) {
                        if let Some(v) = vid_val {
                            map.insert("_vid".to_string(), v);
                        }
                        if let Some(v) = labels_val {
                            map.insert("_labels".to_string(), v);
                        }
                    }

                    // Merge edge system fields (_eid, _type, _src_vid, _dst_vid).
                    // These are emitted as helper columns by the traverse exec.
                    // The structural projection already includes them in the struct,
                    // but we still need to remove the dotted helper columns.
                    let eid_key = format!("{}._eid", var);
                    let type_key = format!("{}._type", var);

                    let eid_val = row.remove(&eid_key);
                    let type_val = row.remove(&type_key);

                    // `or_insert` below deliberately keeps values the structural
                    // projection already placed in the map.
                    if (eid_val.is_some() || type_val.is_some())
                        && let Some(Value::Map(map)) = row.get_mut(var)
                    {
                        if let Some(v) = eid_val {
                            map.entry("_eid".to_string()).or_insert(v);
                        }
                        if let Some(v) = type_val {
                            map.entry("_type".to_string()).or_insert(v);
                        }
                    }

                    // Remove remaining dotted helper columns (e.g. _all_props, _src_vid, _dst_vid)
                    let prefix = format!("{}.", var);
                    let helper_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    for key in helper_keys {
                        row.remove(&key);
                    }
                }

                rows.push(row);
            }
        }

        Ok(rows)
    }
689
690    /// Normalize a value if it's a path structure, converting internal format to user-facing format.
691    ///
692    /// This only normalizes path structures (objects with "nodes" and "relationships" arrays).
693    /// Other values are returned unchanged to avoid interfering with query execution.
694    fn normalize_path_if_needed(value: Value) -> Value {
695        match value {
696            Value::Map(map)
697                if map.contains_key("nodes")
698                    && (map.contains_key("relationships") || map.contains_key("edges")) =>
699            {
700                Self::normalize_path_map(map)
701            }
702            other => other,
703        }
704    }
705
706    /// Normalize a path map object.
707    fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
708        // Normalize nodes array
709        if let Some(Value::List(nodes)) = map.remove("nodes") {
710            let normalized_nodes: Vec<Value> = nodes
711                .into_iter()
712                .map(|n| {
713                    if let Value::Map(node_map) = n {
714                        Self::normalize_path_node_map(node_map)
715                    } else {
716                        n
717                    }
718                })
719                .collect();
720            map.insert("nodes".to_string(), Value::List(normalized_nodes));
721        }
722
723        // Normalize relationships array (may be called "relationships" or "edges")
724        let rels_key = if map.contains_key("relationships") {
725            "relationships"
726        } else {
727            "edges"
728        };
729        if let Some(Value::List(rels)) = map.remove(rels_key) {
730            let normalized_rels: Vec<Value> = rels
731                .into_iter()
732                .map(|r| {
733                    if let Value::Map(rel_map) = r {
734                        Self::normalize_path_edge_map(rel_map)
735                    } else {
736                        r
737                    }
738                })
739                .collect();
740            map.insert("relationships".to_string(), Value::List(normalized_rels));
741        }
742
743        Value::Map(map)
744    }
745
746    /// Convert a Value to its string representation for path normalization.
747    fn value_to_id_string(val: Value) -> String {
748        match val {
749            Value::Int(n) => n.to_string(),
750            Value::Float(n) => n.to_string(),
751            Value::String(s) => s,
752            other => other.to_string(),
753        }
754    }
755
756    /// Move a map entry from `src_key` to `dst_key`, converting the value to a string.
757    /// When `src_key == dst_key`, this simply stringifies the value in place.
758    fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
759        if let Some(val) = map.remove(src_key) {
760            map.insert(
761                dst_key.to_string(),
762                Value::String(Self::value_to_id_string(val)),
763            );
764        }
765    }
766
767    /// Ensure the "properties" field is a non-null map.
768    fn ensure_properties_map(map: &mut HashMap<String, Value>) {
769        match map.get("properties") {
770            Some(props) if !props.is_null() => {}
771            _ => {
772                map.insert("properties".to_string(), Value::Map(HashMap::new()));
773            }
774        }
775    }
776
777    /// Normalize a node within a path to user-facing format.
778    fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
779        Self::stringify_map_field(&mut map, "_vid", "_id");
780        Self::ensure_properties_map(&mut map);
781        Value::Map(map)
782    }
783
784    /// Normalize an edge within a path to user-facing format.
785    fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
786        Self::stringify_map_field(&mut map, "_eid", "_id");
787        Self::stringify_map_field(&mut map, "_src", "_src");
788        Self::stringify_map_field(&mut map, "_dst", "_dst");
789
790        if let Some(type_name) = map.remove("_type_name") {
791            map.insert("_type".to_string(), type_name);
792        }
793
794        Self::ensure_properties_map(&mut map);
795        Value::Map(map)
796    }
797
798    #[instrument(
799        skip(self, prop_manager, params),
800        fields(rows_returned, duration_ms),
801        level = "info"
802    )]
803    pub fn execute<'a>(
804        &'a self,
805        plan: LogicalPlan,
806        prop_manager: &'a PropertyManager,
807        params: &'a HashMap<String, Value>,
808    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
809        Box::pin(async move {
810            let query_type = Self::get_plan_type(&plan);
811            let ctx = self.get_context().await;
812            let start = Instant::now();
813
814            // Route DDL/Admin queries to the fallback executor.
815            // All other queries (including similar_to) flow through DataFusion.
816            let res = if Self::is_ddl_or_admin(&plan) {
817                self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
818                    .await
819            } else {
820                let batches = self
821                    .execute_datafusion(plan.clone(), prop_manager, params)
822                    .await?;
823                self.record_batches_to_rows(batches)
824            };
825
826            let duration = start.elapsed();
827            metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
828                .record(duration.as_secs_f64());
829
830            tracing::Span::current().record("duration_ms", duration.as_millis());
831            match &res {
832                Ok(rows) => {
833                    tracing::Span::current().record("rows_returned", rows.len());
834                    metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
835                        .increment(rows.len() as u64);
836                }
837                Err(e) => {
838                    let error_type = if e.to_string().contains("timed out") {
839                        "timeout"
840                    } else if e.to_string().contains("syntax") {
841                        "syntax"
842                    } else {
843                        "execution"
844                    };
845                    metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
846                }
847            }
848
849            res
850        })
851    }
852
853    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
854        match plan {
855            LogicalPlan::Scan { .. } => "read_scan",
856            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
857            LogicalPlan::Traverse { .. } => "read_traverse",
858            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
859            LogicalPlan::ScanAll { .. } => "read_scan_all",
860            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
861            LogicalPlan::VectorKnn { .. } => "read_vector",
862            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
863            LogicalPlan::Merge { .. } => "write_merge",
864            LogicalPlan::Delete { .. } => "write_delete",
865            LogicalPlan::Set { .. } => "write_set",
866            LogicalPlan::Remove { .. } => "write_remove",
867            LogicalPlan::ProcedureCall { .. } => "call",
868            LogicalPlan::Copy { .. } => "copy",
869            LogicalPlan::Backup { .. } => "backup",
870            _ => "other",
871        }
872    }
873
874    /// Return all direct child plan references from a `LogicalPlan`.
875    ///
876    /// This centralizes the variant→children mapping so that recursive walkers
877    /// (e.g., `contains_write_operations`) can delegate the
878    /// "recurse into children" logic instead of duplicating the match arms.
879    ///
880    /// Note: `Foreach` returns only its `input`; the `body: Vec<LogicalPlan>`
881    /// is not included because it requires special iteration. Callers that
882    /// need to inspect the body should handle `Foreach` before falling through.
883    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
884        match plan {
885            // Single-input wrappers
886            LogicalPlan::Project { input, .. }
887            | LogicalPlan::Sort { input, .. }
888            | LogicalPlan::Limit { input, .. }
889            | LogicalPlan::Distinct { input }
890            | LogicalPlan::Aggregate { input, .. }
891            | LogicalPlan::Window { input, .. }
892            | LogicalPlan::Unwind { input, .. }
893            | LogicalPlan::Filter { input, .. }
894            | LogicalPlan::Create { input, .. }
895            | LogicalPlan::CreateBatch { input, .. }
896            | LogicalPlan::Set { input, .. }
897            | LogicalPlan::Remove { input, .. }
898            | LogicalPlan::Delete { input, .. }
899            | LogicalPlan::Merge { input, .. }
900            | LogicalPlan::Foreach { input, .. }
901            | LogicalPlan::Traverse { input, .. }
902            | LogicalPlan::TraverseMainByType { input, .. }
903            | LogicalPlan::BindZeroLengthPath { input, .. }
904            | LogicalPlan::BindPath { input, .. }
905            | LogicalPlan::ShortestPath { input, .. }
906            | LogicalPlan::AllShortestPaths { input, .. }
907            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],
908
909            // Two-input wrappers
910            LogicalPlan::Apply {
911                input, subquery, ..
912            }
913            | LogicalPlan::SubqueryCall { input, subquery } => {
914                vec![input.as_ref(), subquery.as_ref()]
915            }
916            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
917                vec![left.as_ref(), right.as_ref()]
918            }
919            LogicalPlan::RecursiveCTE {
920                initial, recursive, ..
921            } => vec![initial.as_ref(), recursive.as_ref()],
922            LogicalPlan::QuantifiedPattern {
923                input,
924                pattern_plan,
925                ..
926            } => vec![input.as_ref(), pattern_plan.as_ref()],
927
928            // Leaf nodes (scans, DDL, admin, etc.)
929            _ => vec![],
930        }
931    }
932
933    /// Check if a plan is a DDL or admin operation that should skip DataFusion.
934    ///
935    /// These operations don't produce data streams and aren't supported by the
936    /// DataFusion planner. Recurses through wrapper nodes (`Project`, `Sort`,
937    /// `Limit`, etc.) to detect DDL/admin operations nested inside read
938    /// wrappers (e.g. `CALL procedure(...) YIELD x RETURN x`).
939    fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
940        match plan {
941            // DDL / schema operations
942            LogicalPlan::CreateLabel(_)
943            | LogicalPlan::CreateEdgeType(_)
944            | LogicalPlan::AlterLabel(_)
945            | LogicalPlan::AlterEdgeType(_)
946            | LogicalPlan::DropLabel(_)
947            | LogicalPlan::DropEdgeType(_)
948            | LogicalPlan::CreateConstraint(_)
949            | LogicalPlan::DropConstraint(_)
950            | LogicalPlan::ShowConstraints(_) => true,
951
952            // Index operations
953            LogicalPlan::CreateVectorIndex { .. }
954            | LogicalPlan::CreateFullTextIndex { .. }
955            | LogicalPlan::CreateScalarIndex { .. }
956            | LogicalPlan::CreateJsonFtsIndex { .. }
957            | LogicalPlan::DropIndex { .. }
958            | LogicalPlan::ShowIndexes { .. } => true,
959
960            // Admin / utility operations
961            LogicalPlan::ShowDatabase
962            | LogicalPlan::ShowConfig
963            | LogicalPlan::ShowStatistics
964            | LogicalPlan::Vacuum
965            | LogicalPlan::Checkpoint
966            | LogicalPlan::Begin
967            | LogicalPlan::Commit
968            | LogicalPlan::Rollback
969            | LogicalPlan::Copy { .. }
970            | LogicalPlan::CopyTo { .. }
971            | LogicalPlan::CopyFrom { .. }
972            | LogicalPlan::Backup { .. }
973            | LogicalPlan::Explain { .. } => true,
974
975            // Procedure calls: DF-eligible procedures go through DataFusion,
976            // everything else (DDL, admin, unknown) stays on fallback.
977            LogicalPlan::ProcedureCall { procedure_name, .. } => {
978                !Self::is_df_eligible_procedure(procedure_name)
979            }
980
981            // Recurse through children using plan_children
982            _ => Self::plan_children(plan)
983                .iter()
984                .any(|child| Self::is_ddl_or_admin(child)),
985        }
986    }
987
988    /// Returns `true` if the procedure is a read-only, data-producing procedure
989    /// that can be executed through the DataFusion engine.
990    ///
991    /// This is a **positive allowlist** — unknown procedures default to the
992    /// fallback executor (safe for TCK test procedures, future DDL, and admin).
993    fn is_df_eligible_procedure(name: &str) -> bool {
994        matches!(
995            name,
996            "uni.schema.labels"
997                | "uni.schema.edgeTypes"
998                | "uni.schema.relationshipTypes"
999                | "uni.schema.indexes"
1000                | "uni.schema.constraints"
1001                | "uni.schema.labelInfo"
1002                | "uni.vector.query"
1003                | "uni.fts.query"
1004                | "uni.search"
1005        ) || name.starts_with("uni.algo.")
1006    }
1007
1008    /// Check if a plan contains write/mutation operations anywhere in the tree.
1009    ///
1010    /// Write operations (`CREATE`, `MERGE`, `DELETE`, `SET`, `REMOVE`, `FOREACH`)
1011    /// are used to determine when a MutationContext needs to be built for DataFusion.
1012    /// This recurses through read-only wrapper nodes to detect writes nested inside
1013    /// projections (e.g. `CREATE (n:Person) RETURN n` produces `Project { Create { ... } }`).
1014    fn contains_write_operations(plan: &LogicalPlan) -> bool {
1015        match plan {
1016            LogicalPlan::Create { .. }
1017            | LogicalPlan::CreateBatch { .. }
1018            | LogicalPlan::Merge { .. }
1019            | LogicalPlan::Delete { .. }
1020            | LogicalPlan::Set { .. }
1021            | LogicalPlan::Remove { .. }
1022            | LogicalPlan::Foreach { .. } => true,
1023            _ => Self::plan_children(plan)
1024                .iter()
1025                .any(|child| Self::contains_write_operations(child)),
1026        }
1027    }
1028
1029    /// Executes a query as a stream of result batches.
1030    ///
1031    /// Routes DDL/Admin through the fallback executor and everything else
1032    /// through DataFusion.
1033    pub fn execute_stream(
1034        self,
1035        plan: LogicalPlan,
1036        prop_manager: Arc<PropertyManager>,
1037        params: HashMap<String, Value>,
1038    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1039        let this = self;
1040        let this_for_ctx = this.clone();
1041
1042        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1043
1044        ctx_stream
1045            .flat_map(move |ctx| {
1046                let plan = plan.clone();
1047                let this = this.clone();
1048                let prop_manager = prop_manager.clone();
1049                let params = params.clone();
1050
1051                let fut = async move {
1052                    if Self::is_ddl_or_admin(&plan) {
1053                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
1054                            .await
1055                    } else {
1056                        let batches = this
1057                            .execute_datafusion(plan, &prop_manager, &params)
1058                            .await?;
1059                        this.record_batches_to_rows(batches)
1060                    }
1061                };
1062                stream::once(fut).boxed()
1063            })
1064            .boxed()
1065    }
1066
    /// Converts an Arrow array element at a given row index to a Value.
    /// Delegates to the shared implementation in arrow_convert module.
    // NOTE(review): the third argument is always `None` here — presumably an
    // optional conversion hint; confirm against `arrow_convert::arrow_to_value`.
    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
        arrow_convert::arrow_to_value(col, row, None)
    }
1072
1073    pub(crate) fn evaluate_expr<'a>(
1074        &'a self,
1075        expr: &'a Expr,
1076        row: &'a HashMap<String, Value>,
1077        prop_manager: &'a PropertyManager,
1078        params: &'a HashMap<String, Value>,
1079        ctx: Option<&'a QueryContext>,
1080    ) -> BoxFuture<'a, Result<Value>> {
1081        let this = self;
1082        Box::pin(async move {
1083            // First check if the expression itself is already pre-computed in the row
1084            let repr = expr.to_string_repr();
1085            if let Some(val) = row.get(&repr) {
1086                return Ok(val.clone());
1087            }
1088
1089            match expr {
1090                Expr::PatternComprehension { .. } => {
1091                    // Handled by DataFusion path via PatternComprehensionExecExpr
1092                    Err(anyhow::anyhow!(
1093                        "Pattern comprehensions are handled by DataFusion executor"
1094                    ))
1095                }
1096                Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1097                    "COLLECT subqueries not yet supported in executor"
1098                )),
1099                Expr::Variable(name) => {
1100                    if let Some(val) = row.get(name) {
1101                        Ok(val.clone())
1102                    } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1103                        // Fallback: scan results may have system columns like "d._vid"
1104                        // without a materialized "d" Map. Return the VID so Property
1105                        // evaluation can fetch properties from storage.
1106                        Ok(vid_val.clone())
1107                    } else {
1108                        Ok(params.get(name).cloned().unwrap_or(Value::Null))
1109                    }
1110                }
1111                Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1112                Expr::Property(var_expr, prop_name) => {
1113                    // Fast path: if the base is a Variable, try flat-key lookup first.
1114                    // DataFusion scan results use flat keys like "d.embedding" rather than
1115                    // nested maps, so "d.embedding" won't be found via Variable("d") -> Property.
1116                    if let Expr::Variable(var_name) = var_expr.as_ref() {
1117                        let flat_key = format!("{}.{}", var_name, prop_name);
1118                        if let Some(val) = row.get(flat_key.as_str()) {
1119                            return Ok(val.clone());
1120                        }
1121                    }
1122
1123                    let base_val = this
1124                        .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1125                        .await?;
1126
1127                    // Handle system properties _vid and _id directly
1128                    if (prop_name == "_vid" || prop_name == "_id")
1129                        && let Ok(vid) = Self::vid_from_value(&base_val)
1130                    {
1131                        return Ok(Value::Int(vid.as_u64() as i64));
1132                    }
1133
1134                    // Handle Value::Node - access properties directly or via prop manager
1135                    if let Value::Node(node) = &base_val {
1136                        // Handle system properties
1137                        if prop_name == "_vid" || prop_name == "_id" {
1138                            return Ok(Value::Int(node.vid.as_u64() as i64));
1139                        }
1140                        if prop_name == "_labels" {
1141                            return Ok(Value::List(
1142                                node.labels
1143                                    .iter()
1144                                    .map(|l| Value::String(l.clone()))
1145                                    .collect(),
1146                            ));
1147                        }
1148                        // Check in-memory properties first
1149                        if let Some(val) = node.properties.get(prop_name.as_str()) {
1150                            return Ok(val.clone());
1151                        }
1152                        // Fallback to storage lookup
1153                        if let Ok(val) = prop_manager
1154                            .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1155                            .await
1156                        {
1157                            return Ok(val);
1158                        }
1159                        return Ok(Value::Null);
1160                    }
1161
1162                    // Handle Value::Edge - access properties directly or via prop manager
1163                    if let Value::Edge(edge) = &base_val {
1164                        // Handle system properties
1165                        if prop_name == "_eid" || prop_name == "_id" {
1166                            return Ok(Value::Int(edge.eid.as_u64() as i64));
1167                        }
1168                        if prop_name == "_type" {
1169                            return Ok(Value::String(edge.edge_type.clone()));
1170                        }
1171                        if prop_name == "_src" {
1172                            return Ok(Value::Int(edge.src.as_u64() as i64));
1173                        }
1174                        if prop_name == "_dst" {
1175                            return Ok(Value::Int(edge.dst.as_u64() as i64));
1176                        }
1177                        // Check in-memory properties first
1178                        if let Some(val) = edge.properties.get(prop_name.as_str()) {
1179                            return Ok(val.clone());
1180                        }
1181                        // Fallback to storage lookup
1182                        if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1183                        {
1184                            return Ok(val);
1185                        }
1186                        return Ok(Value::Null);
1187                    }
1188
1189                    // If base_val is an object (node/edge), check its properties first
1190                    // This handles properties from CREATE/MERGE that may not be persisted yet
1191                    if let Value::Map(map) = &base_val {
1192                        // First check top-level (for system properties like _id, _label, etc.)
1193                        if let Some(val) = map.get(prop_name.as_str()) {
1194                            return Ok(val.clone());
1195                        }
1196                        // Then check inside "properties" object (for user properties)
1197                        if let Some(Value::Map(props)) = map.get("properties")
1198                            && let Some(val) = props.get(prop_name.as_str())
1199                        {
1200                            return Ok(val.clone());
1201                        }
1202                        // Fallback to storage lookup using _vid or _id
1203                        let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1204                            map.get("_id")
1205                                .and_then(|v| v.as_str())
1206                                .and_then(|s| s.parse::<u64>().ok())
1207                        });
1208                        if let Some(id) = vid_opt {
1209                            let vid = Vid::from(id);
1210                            if let Ok(val) = prop_manager
1211                                .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1212                                .await
1213                            {
1214                                return Ok(val);
1215                            }
1216                        } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1217                            let eid = uni_common::core::id::Eid::from(id);
1218                            if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1219                                return Ok(val);
1220                            }
1221                        }
1222                        return Ok(Value::Null);
1223                    }
1224
1225                    // If base_val is just a VID, fetch from property manager
1226                    if let Ok(vid) = Self::vid_from_value(&base_val) {
1227                        return prop_manager
1228                            .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1229                            .await;
1230                    }
1231
1232                    if base_val.is_null() {
1233                        return Ok(Value::Null);
1234                    }
1235
1236                    // Check if base_val is a temporal value and prop_name is a temporal accessor
1237                    {
1238                        use crate::query::datetime::{
1239                            eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1240                            is_duration_string, is_temporal_accessor, is_temporal_string,
1241                        };
1242
1243                        // Handle Value::Temporal directly (no string parsing needed)
1244                        if let Value::Temporal(tv) = &base_val {
1245                            if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1246                                if is_duration_accessor(prop_name) {
1247                                    // Convert to string for the existing accessor logic
1248                                    return eval_duration_accessor(
1249                                        &base_val.to_string(),
1250                                        prop_name,
1251                                    );
1252                                }
1253                            } else if is_temporal_accessor(prop_name) {
1254                                return eval_temporal_accessor(&base_val.to_string(), prop_name);
1255                            }
1256                        }
1257
1258                        // Handle Value::String temporal (backward compat)
1259                        if let Value::String(s) = &base_val {
1260                            if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1261                                return eval_temporal_accessor(s, prop_name);
1262                            }
1263                            if is_duration_string(s) && is_duration_accessor(prop_name) {
1264                                return eval_duration_accessor(s, prop_name);
1265                            }
1266                        }
1267                    }
1268
1269                    Err(anyhow!(
1270                        "Cannot access property '{}' on {:?}",
1271                        prop_name,
1272                        base_val
1273                    ))
1274                }
1275                Expr::ArrayIndex {
1276                    array: arr_expr,
1277                    index: idx_expr,
1278                } => {
1279                    let arr_val = this
1280                        .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1281                        .await?;
1282                    let idx_val = this
1283                        .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1284                        .await?;
1285
1286                    if let Value::List(arr) = &arr_val {
1287                        // Handle signed indices (allow negative)
1288                        if let Some(i) = idx_val.as_i64() {
1289                            let idx = if i < 0 {
1290                                // Negative index: -1 = last element, -2 = second to last, etc.
1291                                let positive_idx = arr.len() as i64 + i;
1292                                if positive_idx < 0 {
1293                                    return Ok(Value::Null); // Out of bounds
1294                                }
1295                                positive_idx as usize
1296                            } else {
1297                                i as usize
1298                            };
1299                            if idx < arr.len() {
1300                                return Ok(arr[idx].clone());
1301                            }
1302                            return Ok(Value::Null);
1303                        } else if idx_val.is_null() {
1304                            return Ok(Value::Null);
1305                        } else {
1306                            return Err(anyhow::anyhow!(
1307                                "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1308                                idx_val
1309                            ));
1310                        }
1311                    }
1312                    if let Value::Map(map) = &arr_val {
1313                        if let Some(key) = idx_val.as_str() {
1314                            return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1315                        } else if !idx_val.is_null() {
1316                            return Err(anyhow::anyhow!(
1317                                "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1318                                idx_val
1319                            ));
1320                        }
1321                    }
1322                    // Handle bracket access on Node: n['name'] returns property
1323                    if let Value::Node(node) = &arr_val {
1324                        if let Some(key) = idx_val.as_str() {
1325                            // Check in-memory properties first
1326                            if let Some(val) = node.properties.get(key) {
1327                                return Ok(val.clone());
1328                            }
1329                            // Fallback to property manager
1330                            if let Ok(val) = prop_manager
1331                                .get_vertex_prop_with_ctx(node.vid, key, ctx)
1332                                .await
1333                            {
1334                                return Ok(val);
1335                            }
1336                            return Ok(Value::Null);
1337                        } else if !idx_val.is_null() {
1338                            return Err(anyhow::anyhow!(
1339                                "TypeError: Node index must be a string, got: {:?}",
1340                                idx_val
1341                            ));
1342                        }
1343                    }
1344                    // Handle bracket access on Edge: e['property'] returns property
1345                    if let Value::Edge(edge) = &arr_val {
1346                        if let Some(key) = idx_val.as_str() {
1347                            // Check in-memory properties first
1348                            if let Some(val) = edge.properties.get(key) {
1349                                return Ok(val.clone());
1350                            }
1351                            // Fallback to property manager
1352                            if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1353                                return Ok(val);
1354                            }
1355                            return Ok(Value::Null);
1356                        } else if !idx_val.is_null() {
1357                            return Err(anyhow::anyhow!(
1358                                "TypeError: Edge index must be a string, got: {:?}",
1359                                idx_val
1360                            ));
1361                        }
1362                    }
1363                    // Handle bracket access on VID (integer): n['name'] where n is a VID
1364                    if let Ok(vid) = Self::vid_from_value(&arr_val)
1365                        && let Some(key) = idx_val.as_str()
1366                    {
1367                        if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1368                        {
1369                            return Ok(val);
1370                        }
1371                        return Ok(Value::Null);
1372                    }
1373                    if arr_val.is_null() {
1374                        return Ok(Value::Null);
1375                    }
1376                    Err(anyhow!(
1377                        "TypeError: InvalidArgumentType - cannot index into {:?}",
1378                        arr_val
1379                    ))
1380                }
                Expr::ArraySlice { array, start, end } => {
                    // List slice `list[start..end]`:
                    //   - a null list, start, or end yields null;
                    //   - negative indices count back from the end of the list;
                    //   - out-of-range indices are clamped into [0, len];
                    //   - start >= end yields an empty list.
                    let arr_val = this
                        .evaluate_expr(array, row, prop_manager, params, ctx)
                        .await?;

                    if let Value::List(arr) = &arr_val {
                        let len = arr.len();

                        // Evaluate start index (default to 0), null → null result
                        let start_idx = if let Some(s) = start {
                            let v = this
                                .evaluate_expr(s, row, prop_manager, params, ctx)
                                .await?;
                            if v.is_null() {
                                return Ok(Value::Null);
                            }
                            // Non-integer start silently treated as 0.
                            let raw = v.as_i64().unwrap_or(0);
                            if raw < 0 {
                                // Negative index: offset from the end, clamped at 0.
                                (len as i64 + raw).max(0) as usize
                            } else {
                                (raw as usize).min(len)
                            }
                        } else {
                            0
                        };

                        // Evaluate end index (default to length), null → null result
                        let end_idx = if let Some(e) = end {
                            let v = this
                                .evaluate_expr(e, row, prop_manager, params, ctx)
                                .await?;
                            if v.is_null() {
                                return Ok(Value::Null);
                            }
                            // Non-integer end silently treated as the full length.
                            let raw = v.as_i64().unwrap_or(len as i64);
                            if raw < 0 {
                                (len as i64 + raw).max(0) as usize
                            } else {
                                (raw as usize).min(len)
                            }
                        } else {
                            len
                        };

                        // Return sliced array
                        if start_idx >= end_idx {
                            return Ok(Value::List(vec![]));
                        }
                        // Defensive re-clamp; end_idx is already <= len on every path above.
                        let end_idx = end_idx.min(len);
                        return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
                    }

                    // Null-list check happens only after the List fast path:
                    // slicing null yields null, anything else is a type error.
                    if arr_val.is_null() {
                        return Ok(Value::Null);
                    }
                    Err(anyhow!("Cannot slice {:?}", arr_val))
                }
1438                Expr::Literal(lit) => Ok(lit.to_value()),
1439                Expr::List(items) => {
1440                    let mut vals = Vec::new();
1441                    for item in items {
1442                        vals.push(
1443                            this.evaluate_expr(item, row, prop_manager, params, ctx)
1444                                .await?,
1445                        );
1446                    }
1447                    Ok(Value::List(vals))
1448                }
1449                Expr::Map(items) => {
1450                    let mut map = HashMap::new();
1451                    for (key, value_expr) in items {
1452                        let val = this
1453                            .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1454                            .await?;
1455                        map.insert(key.clone(), val);
1456                    }
1457                    Ok(Value::Map(map))
1458                }
1459                Expr::Exists { query, .. } => {
1460                    // Plan and execute subquery; failures return false (pattern doesn't match)
1461                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1462                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1463
1464                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1465                        Ok(plan) => {
1466                            let mut sub_params = params.clone();
1467                            sub_params.extend(row.clone());
1468
1469                            match this.execute(plan, prop_manager, &sub_params).await {
1470                                Ok(results) => Ok(Value::Bool(!results.is_empty())),
1471                                Err(e) => {
1472                                    log::debug!("EXISTS subquery execution failed: {}", e);
1473                                    Ok(Value::Bool(false))
1474                                }
1475                            }
1476                        }
1477                        Err(e) => {
1478                            log::debug!("EXISTS subquery planning failed: {}", e);
1479                            Ok(Value::Bool(false))
1480                        }
1481                    }
1482                }
1483                Expr::CountSubquery(query) => {
1484                    // Similar to Exists but returns count
1485                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1486
1487                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1488
1489                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1490                        Ok(plan) => {
1491                            let mut sub_params = params.clone();
1492                            sub_params.extend(row.clone());
1493
1494                            match this.execute(plan, prop_manager, &sub_params).await {
1495                                Ok(results) => Ok(Value::from(results.len() as i64)),
1496                                Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1497                            }
1498                        }
1499                        Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1500                    }
1501                }
1502                Expr::Quantifier {
1503                    quantifier,
1504                    variable,
1505                    list,
1506                    predicate,
1507                } => {
1508                    // Quantifier expression evaluation (ALL/ANY/SINGLE/NONE)
1509                    //
1510                    // This is the primary execution path for quantifiers because DataFusion
1511                    // does not support lambda functions yet. Queries with quantifiers attempt
1512                    // DataFusion translation first, fail (see df_expr.rs:289), then fall back
1513                    // to this fallback executor path.
1514                    //
1515                    // This is intentional design - we get correct semantics with row-by-row
1516                    // evaluation until DataFusion adds lambda support.
1517                    //
1518                    // See: https://github.com/apache/datafusion/issues/14205
1519
1520                    // Evaluate the list expression
1521                    let list_val = this
1522                        .evaluate_expr(list, row, prop_manager, params, ctx)
1523                        .await?;
1524
1525                    // Handle null propagation
1526                    if list_val.is_null() {
1527                        return Ok(Value::Null);
1528                    }
1529
1530                    // Convert to array
1531                    let items = match list_val {
1532                        Value::List(arr) => arr,
1533                        _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1534                    };
1535
1536                    // Evaluate predicate for each item
1537                    let mut satisfied_count = 0;
1538                    for item in &items {
1539                        // Create new row with bound variable
1540                        let mut item_row = row.clone();
1541                        item_row.insert(variable.clone(), item.clone());
1542
1543                        // Evaluate predicate with bound variable
1544                        let pred_result = this
1545                            .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1546                            .await?;
1547
1548                        // Check if predicate is satisfied
1549                        if let Value::Bool(true) = pred_result {
1550                            satisfied_count += 1;
1551                        }
1552                    }
1553
1554                    // Return based on quantifier type
1555                    let result = match quantifier {
1556                        Quantifier::All => satisfied_count == items.len(),
1557                        Quantifier::Any => satisfied_count > 0,
1558                        Quantifier::Single => satisfied_count == 1,
1559                        Quantifier::None => satisfied_count == 0,
1560                    };
1561
1562                    Ok(Value::Bool(result))
1563                }
1564                Expr::ListComprehension {
1565                    variable,
1566                    list,
1567                    where_clause,
1568                    map_expr,
1569                } => {
1570                    // List comprehension evaluation: [x IN list WHERE pred | expr]
1571                    //
1572                    // Similar to quantifiers, this requires lambda-like evaluation
1573                    // which DataFusion doesn't support yet. This is the primary execution path.
1574
1575                    // Evaluate the list expression
1576                    let list_val = this
1577                        .evaluate_expr(list, row, prop_manager, params, ctx)
1578                        .await?;
1579
1580                    // Handle null propagation
1581                    if list_val.is_null() {
1582                        return Ok(Value::Null);
1583                    }
1584
1585                    // Convert to array
1586                    let items = match list_val {
1587                        Value::List(arr) => arr,
1588                        _ => {
1589                            return Err(anyhow!(
1590                                "List comprehension expects a list, got: {:?}",
1591                                list_val
1592                            ));
1593                        }
1594                    };
1595
1596                    // Collect mapped values
1597                    let mut results = Vec::new();
1598                    for item in &items {
1599                        // Create new row with bound variable
1600                        let mut item_row = row.clone();
1601                        item_row.insert(variable.clone(), item.clone());
1602
1603                        // Apply WHERE filter if present
1604                        if let Some(predicate) = where_clause {
1605                            let pred_result = this
1606                                .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1607                                .await?;
1608
1609                            // Skip items that don't match the filter
1610                            if !matches!(pred_result, Value::Bool(true)) {
1611                                continue;
1612                            }
1613                        }
1614
1615                        // Apply map expression
1616                        let mapped_val = this
1617                            .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1618                            .await?;
1619                        results.push(mapped_val);
1620                    }
1621
1622                    Ok(Value::List(results))
1623                }
1624                Expr::BinaryOp { left, op, right } => {
1625                    // Short-circuit evaluation for AND/OR
1626                    match op {
1627                        BinaryOp::And => {
1628                            let l_val = this
1629                                .evaluate_expr(left, row, prop_manager, params, ctx)
1630                                .await?;
1631                            // Short-circuit: if left is false, don't evaluate right
1632                            if let Some(false) = l_val.as_bool() {
1633                                return Ok(Value::Bool(false));
1634                            }
1635                            let r_val = this
1636                                .evaluate_expr(right, row, prop_manager, params, ctx)
1637                                .await?;
1638                            eval_binary_op(&l_val, op, &r_val)
1639                        }
1640                        BinaryOp::Or => {
1641                            let l_val = this
1642                                .evaluate_expr(left, row, prop_manager, params, ctx)
1643                                .await?;
1644                            // Short-circuit: if left is true, don't evaluate right
1645                            if let Some(true) = l_val.as_bool() {
1646                                return Ok(Value::Bool(true));
1647                            }
1648                            let r_val = this
1649                                .evaluate_expr(right, row, prop_manager, params, ctx)
1650                                .await?;
1651                            eval_binary_op(&l_val, op, &r_val)
1652                        }
1653                        _ => {
1654                            // For all other operators, evaluate both sides
1655                            let l_val = this
1656                                .evaluate_expr(left, row, prop_manager, params, ctx)
1657                                .await?;
1658                            let r_val = this
1659                                .evaluate_expr(right, row, prop_manager, params, ctx)
1660                                .await?;
1661                            eval_binary_op(&l_val, op, &r_val)
1662                        }
1663                    }
1664                }
1665                Expr::In { expr, list } => {
1666                    let l_val = this
1667                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1668                        .await?;
1669                    let r_val = this
1670                        .evaluate_expr(list, row, prop_manager, params, ctx)
1671                        .await?;
1672                    eval_in_op(&l_val, &r_val)
1673                }
1674                Expr::UnaryOp { op, expr } => {
1675                    let val = this
1676                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1677                        .await?;
1678                    match op {
1679                        UnaryOp::Not => {
1680                            // Three-valued logic: NOT null = null
1681                            match val.as_bool() {
1682                                Some(b) => Ok(Value::Bool(!b)),
1683                                None if val.is_null() => Ok(Value::Null),
1684                                None => Err(anyhow!(
1685                                    "InvalidArgumentType: NOT requires a boolean argument"
1686                                )),
1687                            }
1688                        }
1689                        UnaryOp::Neg => {
1690                            if let Some(i) = val.as_i64() {
1691                                Ok(Value::Int(-i))
1692                            } else if let Some(f) = val.as_f64() {
1693                                Ok(Value::Float(-f))
1694                            } else {
1695                                Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1696                            }
1697                        }
1698                    }
1699                }
1700                Expr::IsNull(expr) => {
1701                    let val = this
1702                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1703                        .await?;
1704                    Ok(Value::Bool(val.is_null()))
1705                }
1706                Expr::IsNotNull(expr) => {
1707                    let val = this
1708                        .evaluate_expr(expr, row, prop_manager, params, ctx)
1709                        .await?;
1710                    Ok(Value::Bool(!val.is_null()))
1711                }
                Expr::IsUnique(_) => {
                    // IS UNIQUE is only valid in constraint definitions (DDL), not in
                    // query expressions; reaching it here is always a user error.
                    Err(anyhow!(
                        "IS UNIQUE can only be used in constraint definitions"
                    ))
                }
                Expr::Case {
                    expr,
                    when_then,
                    else_expr,
                } => {
                    // CASE expression, two forms:
                    //   simple:   CASE <base> WHEN <v> THEN <t> ... [ELSE <e>] END
                    //   searched: CASE WHEN <cond> THEN <t> ...   [ELSE <e>] END
                    // WHEN branches are evaluated in order; the first match wins,
                    // and later WHEN/THEN expressions are never evaluated.
                    if let Some(base_expr) = expr {
                        let base_val = this
                            .evaluate_expr(base_expr, row, prop_manager, params, ctx)
                            .await?;
                        for (w, t) in when_then {
                            let w_val = this
                                .evaluate_expr(w, row, prop_manager, params, ctx)
                                .await?;
                            // NOTE(review): `==` here uses Value's PartialEq, so
                            // `CASE null WHEN null` may match — openCypher null
                            // comparison semantics would not. Confirm intended.
                            if base_val == w_val {
                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
                            }
                        }
                    } else {
                        for (w, t) in when_then {
                            let w_val = this
                                .evaluate_expr(w, row, prop_manager, params, ctx)
                                .await?;
                            // Only strict boolean true selects the branch
                            // (null/false/non-boolean fall through).
                            if w_val.as_bool() == Some(true) {
                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
                            }
                        }
                    }
                    // No WHEN matched: ELSE if present, otherwise null.
                    if let Some(e) = else_expr {
                        return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
                    }
                    Ok(Value::Null)
                }
                // A bare `*` has no standalone value in expression position.
                Expr::Wildcard => Ok(Value::Null),
1751                Expr::FunctionCall { name, args, .. } => {
1752                    // Special case: id() returns VID for nodes and EID for relationships
1753                    if name.eq_ignore_ascii_case("ID") {
1754                        if args.len() != 1 {
1755                            return Err(anyhow!("id() requires exactly 1 argument"));
1756                        }
1757                        let val = this
1758                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1759                            .await?;
1760                        if let Value::Map(map) = &val {
1761                            // Check for _vid (vertex) first
1762                            if let Some(vid_val) = map.get("_vid") {
1763                                return Ok(vid_val.clone());
1764                            }
1765                            // Check for _eid (edge/relationship)
1766                            if let Some(eid_val) = map.get("_eid") {
1767                                return Ok(eid_val.clone());
1768                            }
1769                            // Check for _id (fallback)
1770                            if let Some(id_val) = map.get("_id") {
1771                                return Ok(id_val.clone());
1772                            }
1773                        }
1774                        return Ok(Value::Null);
1775                    }
1776
1777                    // Special case: elementId() returns string format "label_id:local_offset"
1778                    if name.eq_ignore_ascii_case("ELEMENTID") {
1779                        if args.len() != 1 {
1780                            return Err(anyhow!("elementId() requires exactly 1 argument"));
1781                        }
1782                        let val = this
1783                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1784                            .await?;
1785                        if let Value::Map(map) = &val {
1786                            // Check for _vid (vertex) first
1787                            // In new storage model, VIDs are pure auto-increment - return as simple ID string
1788                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1789                                return Ok(Value::String(vid_val.to_string()));
1790                            }
1791                            // Check for _eid (edge/relationship)
1792                            // In new storage model, EIDs are pure auto-increment - return as simple ID string
1793                            if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1794                                return Ok(Value::String(eid_val.to_string()));
1795                            }
1796                        }
1797                        return Ok(Value::Null);
1798                    }
1799
1800                    // Special case: type() returns the relationship type name
1801                    if name.eq_ignore_ascii_case("TYPE") {
1802                        if args.len() != 1 {
1803                            return Err(anyhow!("type() requires exactly 1 argument"));
1804                        }
1805                        let val = this
1806                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1807                            .await?;
1808                        if let Value::Map(map) = &val
1809                            && let Some(type_val) = map.get("_type")
1810                        {
1811                            // Numeric _type is an edge type ID; string _type is already a name
1812                            if let Some(type_id) =
1813                                type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1814                            {
1815                                if let Some(name) = this
1816                                    .storage
1817                                    .schema_manager()
1818                                    .edge_type_name_by_id_unified(type_id)
1819                                {
1820                                    return Ok(Value::String(name));
1821                                }
1822                            } else if let Some(name) = type_val.as_str() {
1823                                return Ok(Value::String(name.to_string()));
1824                            }
1825                        }
1826                        return Ok(Value::Null);
1827                    }
1828
1829                    // Special case: labels() returns the labels of a node
1830                    if name.eq_ignore_ascii_case("LABELS") {
1831                        if args.len() != 1 {
1832                            return Err(anyhow!("labels() requires exactly 1 argument"));
1833                        }
1834                        let val = this
1835                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1836                            .await?;
1837                        if let Value::Map(map) = &val
1838                            && let Some(labels_val) = map.get("_labels")
1839                        {
1840                            return Ok(labels_val.clone());
1841                        }
1842                        return Ok(Value::Null);
1843                    }
1844
1845                    // Special case: properties() returns the properties map of a node/edge
1846                    if name.eq_ignore_ascii_case("PROPERTIES") {
1847                        if args.len() != 1 {
1848                            return Err(anyhow!("properties() requires exactly 1 argument"));
1849                        }
1850                        let val = this
1851                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1852                            .await?;
1853                        if let Value::Map(map) = &val {
1854                            // Filter out internal properties (those starting with _)
1855                            let mut props = HashMap::new();
1856                            for (k, v) in map.iter() {
1857                                if !k.starts_with('_') {
1858                                    props.insert(k.clone(), v.clone());
1859                                }
1860                            }
1861                            return Ok(Value::Map(props));
1862                        }
1863                        return Ok(Value::Null);
1864                    }
1865
1866                    // Special case: startNode() returns the start node of a relationship
1867                    if name.eq_ignore_ascii_case("STARTNODE") {
1868                        if args.len() != 1 {
1869                            return Err(anyhow!("startNode() requires exactly 1 argument"));
1870                        }
1871                        let val = this
1872                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1873                            .await?;
1874                        if let Value::Edge(edge) = &val {
1875                            return Ok(Self::find_node_by_vid(row, edge.src));
1876                        }
1877                        if let Value::Map(map) = &val {
1878                            if let Some(start_node) = map.get("_startNode") {
1879                                return Ok(start_node.clone());
1880                            }
1881                            if let Some(src_vid) = map.get("_src_vid") {
1882                                return Ok(Value::Map(HashMap::from([(
1883                                    "_vid".to_string(),
1884                                    src_vid.clone(),
1885                                )])));
1886                            }
1887                            // Resolve _src VID by looking up node in row
1888                            if let Some(src_id) = map.get("_src")
1889                                && let Some(u) = src_id.as_u64()
1890                            {
1891                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1892                            }
1893                        }
1894                        return Ok(Value::Null);
1895                    }
1896
1897                    // Special case: endNode() returns the end node of a relationship
1898                    if name.eq_ignore_ascii_case("ENDNODE") {
1899                        if args.len() != 1 {
1900                            return Err(anyhow!("endNode() requires exactly 1 argument"));
1901                        }
1902                        let val = this
1903                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1904                            .await?;
1905                        if let Value::Edge(edge) = &val {
1906                            return Ok(Self::find_node_by_vid(row, edge.dst));
1907                        }
1908                        if let Value::Map(map) = &val {
1909                            if let Some(end_node) = map.get("_endNode") {
1910                                return Ok(end_node.clone());
1911                            }
1912                            if let Some(dst_vid) = map.get("_dst_vid") {
1913                                return Ok(Value::Map(HashMap::from([(
1914                                    "_vid".to_string(),
1915                                    dst_vid.clone(),
1916                                )])));
1917                            }
1918                            // Resolve _dst VID by looking up node in row
1919                            if let Some(dst_id) = map.get("_dst")
1920                                && let Some(u) = dst_id.as_u64()
1921                            {
1922                                return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1923                            }
1924                        }
1925                        return Ok(Value::Null);
1926                    }
1927
1928                    // Special case: hasLabel() checks if a node has a specific label
1929                    // Used for WHERE n:Label predicates
1930                    if name.eq_ignore_ascii_case("HASLABEL") {
1931                        if args.len() != 2 {
1932                            return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
1933                        }
1934                        let node_val = this
1935                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1936                            .await?;
1937                        let label_val = this
1938                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1939                            .await?;
1940
1941                        let label_to_check = label_val.as_str().ok_or_else(|| {
1942                            anyhow!("Second argument to hasLabel must be a string")
1943                        })?;
1944
1945                        let has_label = match &node_val {
1946                            // Handle proper Value::Node type (from result normalization)
1947                            Value::Map(map) if map.contains_key("_vid") => {
1948                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
1949                                    labels_arr
1950                                        .iter()
1951                                        .any(|l| l.as_str() == Some(label_to_check))
1952                                } else {
1953                                    false
1954                                }
1955                            }
1956                            // Also handle legacy Object format
1957                            Value::Map(map) => {
1958                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
1959                                    labels_arr
1960                                        .iter()
1961                                        .any(|l| l.as_str() == Some(label_to_check))
1962                                } else {
1963                                    false
1964                                }
1965                            }
1966                            _ => false,
1967                        };
1968                        return Ok(Value::Bool(has_label));
1969                    }
1970
1971                    // Quantifier functions (ANY/ALL/NONE/SINGLE) as function calls are not supported.
1972                    // These should be parsed as Expr::Quantifier instead.
1973                    if matches!(
1974                        name.to_uppercase().as_str(),
1975                        "ANY" | "ALL" | "NONE" | "SINGLE"
1976                    ) {
1977                        return Err(anyhow!(
1978                            "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
1979                            name.to_lowercase()
1980                        ));
1981                    }
1982
1983                    // Special case: COALESCE needs short-circuit evaluation
1984                    if name.eq_ignore_ascii_case("COALESCE") {
1985                        for arg in args {
1986                            let val = this
1987                                .evaluate_expr(arg, row, prop_manager, params, ctx)
1988                                .await?;
1989                            if !val.is_null() {
1990                                return Ok(val);
1991                            }
1992                        }
1993                        return Ok(Value::Null);
1994                    }
1995
1996                    // Special case: vector_similarity has dedicated implementation
1997                    if name.eq_ignore_ascii_case("vector_similarity") {
1998                        if args.len() != 2 {
1999                            return Err(anyhow!("vector_similarity takes 2 arguments"));
2000                        }
2001                        let v1 = this
2002                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2003                            .await?;
2004                        let v2 = this
2005                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2006                            .await?;
2007                        return eval_vector_similarity(&v1, &v2);
2008                    }
2009
2010                    // Special case: uni.validAt handles node fetching
2011                    if name.eq_ignore_ascii_case("uni.temporal.validAt")
2012                        || name.eq_ignore_ascii_case("uni.validAt")
2013                        || name.eq_ignore_ascii_case("validAt")
2014                    {
2015                        if args.len() != 4 {
2016                            return Err(anyhow!("validAt requires 4 arguments"));
2017                        }
2018                        let node_val = this
2019                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2020                            .await?;
2021                        let start_prop = this
2022                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2023                            .await?
2024                            .as_str()
2025                            .ok_or(anyhow!("start_prop must be string"))?
2026                            .to_string();
2027                        let end_prop = this
2028                            .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2029                            .await?
2030                            .as_str()
2031                            .ok_or(anyhow!("end_prop must be string"))?
2032                            .to_string();
2033                        let time_val = this
2034                            .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2035                            .await?;
2036
2037                        let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2038                            anyhow!("time argument must be a datetime value or string")
2039                        })?;
2040
2041                        // Fetch temporal property values - supports both vertices and edges
2042                        let valid_from_val: Option<Value> = if let Ok(vid) =
2043                            Self::vid_from_value(&node_val)
2044                        {
2045                            // Vertex case - VID string format
2046                            prop_manager
2047                                .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2048                                .await
2049                                .ok()
2050                        } else if let Value::Map(map) = &node_val {
2051                            // Check for embedded _vid or _eid in object
2052                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2053                                let vid = Vid::from(vid_val);
2054                                prop_manager
2055                                    .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2056                                    .await
2057                                    .ok()
2058                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2059                                // Edge case
2060                                let eid = uni_common::core::id::Eid::from(eid_val);
2061                                prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2062                            } else {
2063                                // Inline object - property embedded directly
2064                                map.get(&start_prop).cloned()
2065                            }
2066                        } else {
2067                            return Ok(Value::Bool(false));
2068                        };
2069
2070                        let valid_from = match valid_from_val {
2071                            Some(ref v) => match value_to_datetime_utc(v) {
2072                                Some(dt) => dt,
2073                                None if v.is_null() => return Ok(Value::Bool(false)),
2074                                None => {
2075                                    return Err(anyhow!(
2076                                        "Property {} must be a datetime value or string",
2077                                        start_prop
2078                                    ));
2079                                }
2080                            },
2081                            None => return Ok(Value::Bool(false)),
2082                        };
2083
2084                        let valid_to_val: Option<Value> = if let Ok(vid) =
2085                            Self::vid_from_value(&node_val)
2086                        {
2087                            // Vertex case - VID string format
2088                            prop_manager
2089                                .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2090                                .await
2091                                .ok()
2092                        } else if let Value::Map(map) = &node_val {
2093                            // Check for embedded _vid or _eid in object
2094                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2095                                let vid = Vid::from(vid_val);
2096                                prop_manager
2097                                    .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2098                                    .await
2099                                    .ok()
2100                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2101                                // Edge case
2102                                let eid = uni_common::core::id::Eid::from(eid_val);
2103                                prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2104                            } else {
2105                                // Inline object - property embedded directly
2106                                map.get(&end_prop).cloned()
2107                            }
2108                        } else {
2109                            return Ok(Value::Bool(false));
2110                        };
2111
2112                        let valid_to = match valid_to_val {
2113                            Some(ref v) => match value_to_datetime_utc(v) {
2114                                Some(dt) => Some(dt),
2115                                None if v.is_null() => None,
2116                                None => {
2117                                    return Err(anyhow!(
2118                                        "Property {} must be a datetime value or null",
2119                                        end_prop
2120                                    ));
2121                                }
2122                            },
2123                            None => None,
2124                        };
2125
2126                        let is_valid = valid_from <= query_time
2127                            && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2128                        return Ok(Value::Bool(is_valid));
2129                    }
2130
2131                    // For all other functions, evaluate arguments then call helper
2132                    let mut evaluated_args = Vec::with_capacity(args.len());
2133                    for arg in args {
2134                        let mut val = this
2135                            .evaluate_expr(arg, row, prop_manager, params, ctx)
2136                            .await?;
2137
2138                        // Eagerly hydrate edge/vertex maps if pushdown hydration didn't load properties.
2139                        // Functions like validAt() need access to properties like valid_from/valid_to.
2140                        if let Value::Map(ref mut map) = val {
2141                            hydrate_entity_if_needed(map, prop_manager, ctx).await;
2142                        }
2143
2144                        evaluated_args.push(val);
2145                    }
2146                    eval_scalar_function(name, &evaluated_args)
2147                }
2148                Expr::Reduce {
2149                    accumulator,
2150                    init,
2151                    variable,
2152                    list,
2153                    expr,
2154                } => {
2155                    let mut acc = self
2156                        .evaluate_expr(init, row, prop_manager, params, ctx)
2157                        .await?;
2158                    let list_val = self
2159                        .evaluate_expr(list, row, prop_manager, params, ctx)
2160                        .await?;
2161
2162                    if let Value::List(items) = list_val {
2163                        for item in items {
2164                            // Create a temporary scope/row with accumulator and variable
2165                            // For simplicity in fallback executor, we can construct a new row map
2166                            // merging current row + new variables.
2167                            let mut scope = row.clone();
2168                            scope.insert(accumulator.clone(), acc.clone());
2169                            scope.insert(variable.clone(), item);
2170
2171                            acc = self
2172                                .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2173                                .await?;
2174                        }
2175                    } else {
2176                        return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2177                    }
2178                    Ok(acc)
2179                }
2180                Expr::ValidAt { .. } => {
2181                    // VALID_AT should have been transformed to a function call in the planner
2182                    Err(anyhow!(
2183                        "VALID_AT expression should have been transformed to function call in planner"
2184                    ))
2185                }
2186
2187                Expr::LabelCheck { expr, labels } => {
2188                    let val = this
2189                        .evaluate_expr(expr, row, prop_manager, params, ctx)
2190                        .await?;
2191                    match &val {
2192                        Value::Null => Ok(Value::Null),
2193                        Value::Map(map) => {
2194                            // Check if this is an edge (has _eid) or node (has _vid)
2195                            let is_edge = map.contains_key("_eid")
2196                                || map.contains_key("_type_name")
2197                                || (map.contains_key("_type") && !map.contains_key("_vid"));
2198
2199                            if is_edge {
2200                                // Edges have a single type
2201                                if labels.len() > 1 {
2202                                    return Ok(Value::Bool(false));
2203                                }
2204                                let label_to_check = &labels[0];
2205                                let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2206                                {
2207                                    t == label_to_check
2208                                } else if let Some(Value::String(t)) = map.get("_type") {
2209                                    t == label_to_check
2210                                } else {
2211                                    false
2212                                };
2213                                Ok(Value::Bool(has_type))
2214                            } else {
2215                                // Node: check all labels
2216                                let has_all = labels.iter().all(|label_to_check| {
2217                                    if let Some(Value::List(labels_arr)) = map.get("_labels") {
2218                                        labels_arr
2219                                            .iter()
2220                                            .any(|l| l.as_str() == Some(label_to_check.as_str()))
2221                                    } else {
2222                                        false
2223                                    }
2224                                });
2225                                Ok(Value::Bool(has_all))
2226                            }
2227                        }
2228                        _ => Ok(Value::Bool(false)),
2229                    }
2230                }
2231
2232                Expr::MapProjection { base, items } => {
2233                    let base_value = this
2234                        .evaluate_expr(base, row, prop_manager, params, ctx)
2235                        .await?;
2236
2237                    // Extract properties from the base object
2238                    let properties = match &base_value {
2239                        Value::Map(map) => map,
2240                        _ => {
2241                            return Err(anyhow!(
2242                                "Map projection requires object, got {:?}",
2243                                base_value
2244                            ));
2245                        }
2246                    };
2247
2248                    let mut result_map = HashMap::new();
2249
2250                    for item in items {
2251                        match item {
2252                            MapProjectionItem::Property(prop) => {
2253                                if let Some(value) = properties.get(prop.as_str()) {
2254                                    result_map.insert(prop.clone(), value.clone());
2255                                }
2256                            }
2257                            MapProjectionItem::AllProperties => {
2258                                // Include all properties except internal fields (those starting with _)
2259                                for (key, value) in properties.iter() {
2260                                    if !key.starts_with('_') {
2261                                        result_map.insert(key.clone(), value.clone());
2262                                    }
2263                                }
2264                            }
2265                            MapProjectionItem::LiteralEntry(key, expr) => {
2266                                let value = this
2267                                    .evaluate_expr(expr, row, prop_manager, params, ctx)
2268                                    .await?;
2269                                result_map.insert(key.clone(), value);
2270                            }
2271                            MapProjectionItem::Variable(var_name) => {
2272                                // Variable selector: include the value of the variable in the result
2273                                // e.g., person{.name, friend} includes the value of 'friend' variable
2274                                if let Some(value) = row.get(var_name.as_str()) {
2275                                    result_map.insert(var_name.clone(), value.clone());
2276                                }
2277                            }
2278                        }
2279                    }
2280
2281                    Ok(Value::Map(result_map))
2282                }
2283            }
2284        })
2285    }
2286
2287    pub(crate) fn execute_subplan<'a>(
2288        &'a self,
2289        plan: LogicalPlan,
2290        prop_manager: &'a PropertyManager,
2291        params: &'a HashMap<String, Value>,
2292        ctx: Option<&'a QueryContext>,
2293    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2294        Box::pin(async move {
2295            if let Some(ctx) = ctx {
2296                ctx.check_timeout()?;
2297            }
2298            match plan {
2299                LogicalPlan::Union { left, right, all } => {
2300                    self.execute_union(left, right, all, prop_manager, params, ctx)
2301                        .await
2302                }
2303                LogicalPlan::CreateVectorIndex {
2304                    config,
2305                    if_not_exists,
2306                } => {
2307                    if if_not_exists && self.index_exists_by_name(&config.name) {
2308                        return Ok(vec![]);
2309                    }
2310                    let idx_mgr = IndexManager::new(
2311                        self.storage.base_path(),
2312                        self.storage.schema_manager_arc(),
2313                        self.storage.lancedb_store_arc(),
2314                    );
2315                    idx_mgr.create_vector_index(config).await?;
2316                    Ok(vec![])
2317                }
2318                LogicalPlan::CreateFullTextIndex {
2319                    config,
2320                    if_not_exists,
2321                } => {
2322                    if if_not_exists && self.index_exists_by_name(&config.name) {
2323                        return Ok(vec![]);
2324                    }
2325                    let idx_mgr = IndexManager::new(
2326                        self.storage.base_path(),
2327                        self.storage.schema_manager_arc(),
2328                        self.storage.lancedb_store_arc(),
2329                    );
2330                    idx_mgr.create_fts_index(config).await?;
2331                    Ok(vec![])
2332                }
2333                LogicalPlan::CreateScalarIndex {
2334                    mut config,
2335                    if_not_exists,
2336                } => {
2337                    if if_not_exists && self.index_exists_by_name(&config.name) {
2338                        return Ok(vec![]);
2339                    }
2340
2341                    // Check for expression indexes - create generated columns
2342                    let mut modified_properties = Vec::new();
2343
2344                    for prop in &config.properties {
2345                        // Heuristic: if contains '(' and ')', it's an expression
2346                        if prop.contains('(') && prop.contains(')') {
2347                            let gen_col = SchemaManager::generated_column_name(prop);
2348
2349                            // Add generated property to schema
2350                            let sm = self.storage.schema_manager_arc();
2351                            if let Err(e) = sm.add_generated_property(
2352                                &config.label,
2353                                &gen_col,
2354                                DataType::String, // Default type for expressions
2355                                prop.clone(),
2356                            ) {
2357                                log::warn!("Failed to add generated property (might exist): {}", e);
2358                            }
2359
2360                            modified_properties.push(gen_col);
2361                        } else {
2362                            // Simple property - use as-is
2363                            modified_properties.push(prop.clone());
2364                        }
2365                    }
2366
2367                    config.properties = modified_properties;
2368
2369                    let idx_mgr = IndexManager::new(
2370                        self.storage.base_path(),
2371                        self.storage.schema_manager_arc(),
2372                        self.storage.lancedb_store_arc(),
2373                    );
2374                    idx_mgr.create_scalar_index(config).await?;
2375                    Ok(vec![])
2376                }
2377                LogicalPlan::CreateJsonFtsIndex {
2378                    config,
2379                    if_not_exists,
2380                } => {
2381                    if if_not_exists && self.index_exists_by_name(&config.name) {
2382                        return Ok(vec![]);
2383                    }
2384                    let idx_mgr = IndexManager::new(
2385                        self.storage.base_path(),
2386                        self.storage.schema_manager_arc(),
2387                        self.storage.lancedb_store_arc(),
2388                    );
2389                    idx_mgr.create_json_fts_index(config).await?;
2390                    Ok(vec![])
2391                }
2392                LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2393                LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2394                LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2395                LogicalPlan::Vacuum => {
2396                    self.execute_vacuum().await?;
2397                    Ok(vec![])
2398                }
2399                LogicalPlan::Checkpoint => {
2400                    self.execute_checkpoint().await?;
2401                    Ok(vec![])
2402                }
2403                LogicalPlan::CopyTo {
2404                    label,
2405                    path,
2406                    format,
2407                    options,
2408                } => {
2409                    let count = self
2410                        .execute_copy_to(&label, &path, &format, &options)
2411                        .await?;
2412                    let mut result = HashMap::new();
2413                    result.insert("count".to_string(), Value::Int(count as i64));
2414                    Ok(vec![result])
2415                }
2416                LogicalPlan::CopyFrom {
2417                    label,
2418                    path,
2419                    format,
2420                    options,
2421                } => {
2422                    let count = self
2423                        .execute_copy_from(&label, &path, &format, &options)
2424                        .await?;
2425                    let mut result = HashMap::new();
2426                    result.insert("count".to_string(), Value::Int(count as i64));
2427                    Ok(vec![result])
2428                }
2429                LogicalPlan::CreateLabel(clause) => {
2430                    self.execute_create_label(clause).await?;
2431                    Ok(vec![])
2432                }
2433                LogicalPlan::CreateEdgeType(clause) => {
2434                    self.execute_create_edge_type(clause).await?;
2435                    Ok(vec![])
2436                }
2437                LogicalPlan::AlterLabel(clause) => {
2438                    self.execute_alter_label(clause).await?;
2439                    Ok(vec![])
2440                }
2441                LogicalPlan::AlterEdgeType(clause) => {
2442                    self.execute_alter_edge_type(clause).await?;
2443                    Ok(vec![])
2444                }
2445                LogicalPlan::DropLabel(clause) => {
2446                    self.execute_drop_label(clause).await?;
2447                    Ok(vec![])
2448                }
2449                LogicalPlan::DropEdgeType(clause) => {
2450                    self.execute_drop_edge_type(clause).await?;
2451                    Ok(vec![])
2452                }
2453                LogicalPlan::CreateConstraint(clause) => {
2454                    self.execute_create_constraint(clause).await?;
2455                    Ok(vec![])
2456                }
2457                LogicalPlan::DropConstraint(clause) => {
2458                    self.execute_drop_constraint(clause).await?;
2459                    Ok(vec![])
2460                }
2461                LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2462                LogicalPlan::DropIndex { name, if_exists } => {
2463                    let idx_mgr = IndexManager::new(
2464                        self.storage.base_path(),
2465                        self.storage.schema_manager_arc(),
2466                        self.storage.lancedb_store_arc(),
2467                    );
2468                    match idx_mgr.drop_index(&name).await {
2469                        Ok(_) => Ok(vec![]),
2470                        Err(e) => {
2471                            if if_exists && e.to_string().contains("not found") {
2472                                Ok(vec![])
2473                            } else {
2474                                Err(e)
2475                            }
2476                        }
2477                    }
2478                }
2479                LogicalPlan::ShowIndexes { filter } => {
2480                    Ok(self.execute_show_indexes(filter.as_deref()))
2481                }
2482                // Scan/traverse nodes: delegate to DataFusion for data access,
2483                // then convert results to HashMaps for the fallback executor.
2484                LogicalPlan::Scan { .. }
2485                | LogicalPlan::ExtIdLookup { .. }
2486                | LogicalPlan::ScanAll { .. }
2487                | LogicalPlan::ScanMainByLabels { .. }
2488                | LogicalPlan::Traverse { .. }
2489                | LogicalPlan::TraverseMainByType { .. } => {
2490                    let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2491                    self.record_batches_to_rows(batches)
2492                }
2493                LogicalPlan::Filter {
2494                    input,
2495                    predicate,
2496                    optional_variables,
2497                } => {
2498                    let input_matches = self
2499                        .execute_subplan(*input, prop_manager, params, ctx)
2500                        .await?;
2501
2502                    tracing::debug!(
2503                        "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2504                        predicate,
2505                        input_matches.len(),
2506                        optional_variables
2507                    );
2508
2509                    // For OPTIONAL MATCH with WHERE: we need LEFT OUTER JOIN semantics.
2510                    // Group rows by non-optional variables, apply filter, and ensure
2511                    // at least one row per group (with NULLs if filter removes all).
2512                    if !optional_variables.is_empty() {
2513                        // Helper to check if a key belongs to an optional variable.
2514                        // Keys can be "var" or "var.field" (e.g., "m" or "m._vid").
2515                        let is_optional_key = |k: &str| -> bool {
2516                            optional_variables.contains(k)
2517                                || optional_variables
2518                                    .iter()
2519                                    .any(|var| k.starts_with(&format!("{}.", var)))
2520                        };
2521
2522                        // Helper to check if a key is internal (should not affect grouping)
2523                        let is_internal_key =
2524                            |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2525
2526                        // Compute the key (non-optional, non-internal variables) for grouping
2527                        let non_optional_vars: Vec<String> = input_matches
2528                            .first()
2529                            .map(|row| {
2530                                row.keys()
2531                                    .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2532                                    .cloned()
2533                                    .collect()
2534                            })
2535                            .unwrap_or_default();
2536
2537                        // Group rows by their non-optional variable values
2538                        let mut groups: std::collections::HashMap<
2539                            Vec<u8>,
2540                            Vec<HashMap<String, Value>>,
2541                        > = std::collections::HashMap::new();
2542
2543                        for row in &input_matches {
2544                            // Create a key from non-optional variable values
2545                            let key: Vec<u8> = non_optional_vars
2546                                .iter()
2547                                .map(|var| {
2548                                    row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
2549                                })
2550                                .collect::<Vec<_>>()
2551                                .join("|")
2552                                .into_bytes();
2553
2554                            groups.entry(key).or_default().push(row.clone());
2555                        }
2556
2557                        let mut filtered = Vec::new();
2558                        for (_key, group_rows) in groups {
2559                            let mut group_passed = Vec::new();
2560
2561                            for row in &group_rows {
2562                                // If optional variables are already NULL, preserve the row
2563                                let has_null_optional = optional_variables.iter().any(|var| {
2564                                    // Check both "var" and "var._vid" style keys
2565                                    let direct_null =
2566                                        matches!(row.get(var), Some(Value::Null) | None);
2567                                    let prefixed_null = row
2568                                        .keys()
2569                                        .filter(|k| k.starts_with(&format!("{}.", var)))
2570                                        .any(|k| matches!(row.get(k), Some(Value::Null)));
2571                                    direct_null || prefixed_null
2572                                });
2573
2574                                if has_null_optional {
2575                                    group_passed.push(row.clone());
2576                                    continue;
2577                                }
2578
2579                                let res = self
2580                                    .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2581                                    .await?;
2582
2583                                if res.as_bool().unwrap_or(false) {
2584                                    group_passed.push(row.clone());
2585                                }
2586                            }
2587
2588                            if group_passed.is_empty() {
2589                                // No rows passed - emit one row with NULLs for optional variables
2590                                // Use the first row's non-optional values as a template
2591                                if let Some(template) = group_rows.first() {
2592                                    let mut null_row = HashMap::new();
2593                                    for (k, v) in template {
2594                                        if is_optional_key(k) {
2595                                            null_row.insert(k.clone(), Value::Null);
2596                                        } else {
2597                                            null_row.insert(k.clone(), v.clone());
2598                                        }
2599                                    }
2600                                    filtered.push(null_row);
2601                                }
2602                            } else {
2603                                filtered.extend(group_passed);
2604                            }
2605                        }
2606
2607                        tracing::debug!(
2608                            "Filter (OPTIONAL): {} input rows -> {} output rows",
2609                            input_matches.len(),
2610                            filtered.len()
2611                        );
2612
2613                        return Ok(filtered);
2614                    }
2615
2616                    // Standard filter for non-OPTIONAL MATCH
2617                    let mut filtered = Vec::new();
2618                    for row in input_matches.iter() {
2619                        let res = self
2620                            .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2621                            .await?;
2622
2623                        let passes = res.as_bool().unwrap_or(false);
2624
2625                        if passes {
2626                            filtered.push(row.clone());
2627                        }
2628                    }
2629
2630                    tracing::debug!(
2631                        "Filter: {} input rows -> {} output rows",
2632                        input_matches.len(),
2633                        filtered.len()
2634                    );
2635
2636                    Ok(filtered)
2637                }
2638                LogicalPlan::ProcedureCall {
2639                    procedure_name,
2640                    arguments,
2641                    yield_items,
2642                } => {
2643                    let yield_names: Vec<String> =
2644                        yield_items.iter().map(|(n, _)| n.clone()).collect();
2645                    let results = self
2646                        .execute_procedure(
2647                            &procedure_name,
2648                            &arguments,
2649                            &yield_names,
2650                            prop_manager,
2651                            params,
2652                            ctx,
2653                        )
2654                        .await?;
2655
2656                    // Handle aliasing: collect all original values first, then
2657                    // build the aliased row in one pass. This avoids issues when
2658                    // an alias matches another yield item's original name (e.g.,
2659                    // YIELD a AS b, b AS d — renaming "a" to "b" must not
2660                    // clobber the original "b" before it is renamed to "d").
2661                    let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2662                    if !has_aliases {
2663                        // No aliases (includes YIELD * which produces empty yield_items) —
2664                        // pass through the procedure output rows unchanged.
2665                        Ok(results)
2666                    } else {
2667                        let mut aliased_results = Vec::with_capacity(results.len());
2668                        for row in results {
2669                            let mut new_row = HashMap::new();
2670                            for (name, alias) in &yield_items {
2671                                let col_name = alias.as_ref().unwrap_or(name);
2672                                let val = row.get(name).cloned().unwrap_or(Value::Null);
2673                                new_row.insert(col_name.clone(), val);
2674                            }
2675                            aliased_results.push(new_row);
2676                        }
2677                        Ok(aliased_results)
2678                    }
2679                }
2680                LogicalPlan::VectorKnn { .. } => {
2681                    unreachable!("VectorKnn is handled by DataFusion engine")
2682                }
2683                LogicalPlan::InvertedIndexLookup { .. } => {
2684                    unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2685                }
2686                LogicalPlan::Sort { input, order_by } => {
2687                    let rows = self
2688                        .execute_subplan(*input, prop_manager, params, ctx)
2689                        .await?;
2690                    self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2691                        .await
2692                }
2693                LogicalPlan::Limit { input, skip, fetch } => {
2694                    let rows = self
2695                        .execute_subplan(*input, prop_manager, params, ctx)
2696                        .await?;
2697                    let skip = skip.unwrap_or(0);
2698                    let take = fetch.unwrap_or(usize::MAX);
2699                    Ok(rows.into_iter().skip(skip).take(take).collect())
2700                }
2701                LogicalPlan::Aggregate {
2702                    input,
2703                    group_by,
2704                    aggregates,
2705                } => {
2706                    let rows = self
2707                        .execute_subplan(*input, prop_manager, params, ctx)
2708                        .await?;
2709                    self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2710                        .await
2711                }
2712                LogicalPlan::Window {
2713                    input,
2714                    window_exprs,
2715                } => {
2716                    let rows = self
2717                        .execute_subplan(*input, prop_manager, params, ctx)
2718                        .await?;
2719                    self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2720                        .await
2721                }
2722                LogicalPlan::Project { input, projections } => {
2723                    let matches = self
2724                        .execute_subplan(*input, prop_manager, params, ctx)
2725                        .await?;
2726                    self.execute_project(matches, &projections, prop_manager, params, ctx)
2727                        .await
2728                }
2729                LogicalPlan::Distinct { input } => {
2730                    let rows = self
2731                        .execute_subplan(*input, prop_manager, params, ctx)
2732                        .await?;
2733                    let mut seen = std::collections::HashSet::new();
2734                    let mut result = Vec::new();
2735                    for row in rows {
2736                        let key = Self::canonical_row_key(&row);
2737                        if seen.insert(key) {
2738                            result.push(row);
2739                        }
2740                    }
2741                    Ok(result)
2742                }
2743                LogicalPlan::Unwind {
2744                    input,
2745                    expr,
2746                    variable,
2747                } => {
2748                    let input_rows = self
2749                        .execute_subplan(*input, prop_manager, params, ctx)
2750                        .await?;
2751                    self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2752                        .await
2753                }
2754                LogicalPlan::Apply {
2755                    input,
2756                    subquery,
2757                    input_filter,
2758                } => {
2759                    let input_rows = self
2760                        .execute_subplan(*input, prop_manager, params, ctx)
2761                        .await?;
2762                    self.execute_apply(
2763                        input_rows,
2764                        &subquery,
2765                        input_filter.as_ref(),
2766                        prop_manager,
2767                        params,
2768                        ctx,
2769                    )
2770                    .await
2771                }
2772                LogicalPlan::SubqueryCall { input, subquery } => {
2773                    let input_rows = self
2774                        .execute_subplan(*input, prop_manager, params, ctx)
2775                        .await?;
2776                    // Execute subquery for each input row (correlated)
2777                    // No input_filter for CALL { }
2778                    self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2779                        .await
2780                }
2781                LogicalPlan::RecursiveCTE {
2782                    cte_name,
2783                    initial,
2784                    recursive,
2785                } => {
2786                    self.execute_recursive_cte(
2787                        &cte_name,
2788                        *initial,
2789                        *recursive,
2790                        prop_manager,
2791                        params,
2792                        ctx,
2793                    )
2794                    .await
2795                }
2796                LogicalPlan::CrossJoin { left, right } => {
2797                    self.execute_cross_join(left, right, prop_manager, params, ctx)
2798                        .await
2799                }
2800                LogicalPlan::Set { .. }
2801                | LogicalPlan::Remove { .. }
2802                | LogicalPlan::Merge { .. }
2803                | LogicalPlan::Create { .. }
2804                | LogicalPlan::CreateBatch { .. } => {
2805                    unreachable!("mutations are handled by DataFusion engine")
2806                }
2807                LogicalPlan::Delete { .. } => {
2808                    unreachable!("mutations are handled by DataFusion engine")
2809                }
2810                LogicalPlan::Begin => {
2811                    if let Some(writer_lock) = &self.writer {
2812                        let mut writer = writer_lock.write().await;
2813                        writer.begin_transaction()?;
2814                    } else {
2815                        return Err(anyhow!("Transaction requires a Writer"));
2816                    }
2817                    Ok(vec![HashMap::new()])
2818                }
2819                LogicalPlan::Commit => {
2820                    if let Some(writer_lock) = &self.writer {
2821                        let mut writer = writer_lock.write().await;
2822                        writer.commit_transaction().await?;
2823                    } else {
2824                        return Err(anyhow!("Transaction requires a Writer"));
2825                    }
2826                    Ok(vec![HashMap::new()])
2827                }
2828                LogicalPlan::Rollback => {
2829                    if let Some(writer_lock) = &self.writer {
2830                        let mut writer = writer_lock.write().await;
2831                        writer.rollback_transaction()?;
2832                    } else {
2833                        return Err(anyhow!("Transaction requires a Writer"));
2834                    }
2835                    Ok(vec![HashMap::new()])
2836                }
2837                LogicalPlan::Copy {
2838                    target,
2839                    source,
2840                    is_export,
2841                    options,
2842                } => {
2843                    if is_export {
2844                        self.execute_export(&target, &source, &options, prop_manager, ctx)
2845                            .await
2846                    } else {
2847                        self.execute_copy(&target, &source, &options, prop_manager)
2848                            .await
2849                    }
2850                }
2851                LogicalPlan::Backup {
2852                    destination,
2853                    options,
2854                } => self.execute_backup(&destination, &options).await,
2855                LogicalPlan::Explain { plan } => {
2856                    let plan_str = format!("{:#?}", plan);
2857                    let mut row = HashMap::new();
2858                    row.insert("plan".to_string(), Value::String(plan_str));
2859                    Ok(vec![row])
2860                }
2861                LogicalPlan::ShortestPath { .. } => {
2862                    unreachable!("ShortestPath is handled by DataFusion engine")
2863                }
2864                LogicalPlan::AllShortestPaths { .. } => {
2865                    unreachable!("AllShortestPaths is handled by DataFusion engine")
2866                }
2867                LogicalPlan::Foreach { .. } => {
2868                    unreachable!("mutations are handled by DataFusion engine")
2869                }
2870                LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2871                LogicalPlan::BindZeroLengthPath { .. } => {
2872                    unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2873                }
2874                LogicalPlan::BindPath { .. } => {
2875                    unreachable!("BindPath is handled by DataFusion engine")
2876                }
2877                LogicalPlan::QuantifiedPattern { .. } => {
2878                    unreachable!("QuantifiedPattern is handled by DataFusion engine")
2879                }
2880                LogicalPlan::LocyProgram { .. }
2881                | LogicalPlan::LocyFold { .. }
2882                | LogicalPlan::LocyBestBy { .. }
2883                | LogicalPlan::LocyPriority { .. }
2884                | LogicalPlan::LocyDerivedScan { .. }
2885                | LogicalPlan::LocyProject { .. } => {
2886                    unreachable!("Locy operators are handled by DataFusion engine")
2887                }
2888            }
2889        })
2890    }
2891
    /// Execute a single plan from a FOREACH body with the given scope.
    ///
    /// Used by the DataFusion ForeachExec operator to delegate body clause
    /// execution back to the executor.
    ///
    /// # Arguments
    /// * `plan` - one clause of the FOREACH body; only SET, REMOVE, DELETE,
    ///   CREATE, MERGE, and nested FOREACH plans are accepted.
    /// * `scope` - variable bindings visible to the clause; mutated in place
    ///   by SET-style operations via the `*_locked` helpers.
    /// * `writer` - already-locked store writer; all mutations go through it.
    /// * `prop_manager` - property metadata used by expression evaluation.
    /// * `params` - query parameters referenced by expressions.
    /// * `ctx` - optional query context (timeouts etc.), passed through.
    ///
    /// # Errors
    /// Returns an error if expression evaluation or any mutation fails, or
    /// if the plan variant is not one of the supported FOREACH body clauses.
    pub(crate) async fn execute_foreach_body_plan(
        &self,
        plan: LogicalPlan,
        scope: &mut HashMap<String, Value>,
        writer: &mut uni_store::runtime::writer::Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<()> {
        match plan {
            LogicalPlan::Set { items, .. } => {
                self.execute_set_items_locked(&items, scope, writer, prop_manager, params, ctx)
                    .await?;
            }
            LogicalPlan::Remove { items, .. } => {
                self.execute_remove_items_locked(&items, scope, writer, prop_manager, ctx)
                    .await?;
            }
            LogicalPlan::Delete { items, detach, .. } => {
                // Each DELETE item is an expression (e.g. a bound node/edge);
                // evaluate it against the current scope before deleting.
                for expr in &items {
                    let val = self
                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
                        .await?;
                    self.execute_delete_item_locked(&val, detach, writer)
                        .await?;
                }
            }
            LogicalPlan::Create { pattern, .. } => {
                self.execute_create_pattern(&pattern, scope, writer, prop_manager, params, ctx)
                    .await?;
            }
            LogicalPlan::CreateBatch { patterns, .. } => {
                for pattern in &patterns {
                    self.execute_create_pattern(pattern, scope, writer, prop_manager, params, ctx)
                        .await?;
                }
            }
            LogicalPlan::Merge {
                pattern,
                on_match: _,
                on_create,
                ..
            } => {
                // NOTE(review): `on_match` is discarded and the pattern is
                // always created, so MERGE inside FOREACH behaves as
                // CREATE + ON CREATE (no match attempt) — confirm intended.
                self.execute_create_pattern(&pattern, scope, writer, prop_manager, params, ctx)
                    .await?;
                if let Some(on_create_clause) = on_create {
                    self.execute_set_items_locked(
                        &on_create_clause.items,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
            }
            LogicalPlan::Foreach {
                variable,
                list,
                body,
                ..
            } => {
                let list_val = self
                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
                    .await?;
                // NULL lists are a no-op; anything else non-list is an error.
                let items = match list_val {
                    Value::List(arr) => arr,
                    Value::Null => return Ok(()),
                    _ => return Err(anyhow!("FOREACH requires a list")),
                };
                for item in items {
                    // Clone per item: the loop variable (and any bindings the
                    // nested body introduces) must not leak into the outer scope.
                    let mut nested_scope = scope.clone();
                    nested_scope.insert(variable.clone(), item);
                    for nested_plan in &body {
                        // Box::pin breaks the infinitely-sized future created
                        // by the async recursion into nested FOREACH bodies.
                        Box::pin(self.execute_foreach_body_plan(
                            nested_plan.clone(),
                            &mut nested_scope,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                        ))
                        .await?;
                    }
                }
            }
            _ => {
                return Err(anyhow!(
                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
                ));
            }
        }
        Ok(())
    }
2991
2992    fn canonical_row_key(row: &HashMap<String, Value>) -> String {
2993        let mut pairs: Vec<_> = row.iter().collect();
2994        pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
2995
2996        pairs
2997            .into_iter()
2998            .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
2999            .collect::<Vec<_>>()
3000            .join("|")
3001    }
3002
    /// Build a deterministic, type-tagged string key for a single `Value`.
    ///
    /// Used by DISTINCT and GROUP BY so that values which are equal under
    /// Cypher semantics (e.g. `1` and `1.0`, or equal temporal instants)
    /// collapse to the same key. Each variant is prefixed with a tag
    /// (`n:`, `s:`, `list:`, ...) so values of different kinds never collide.
    fn canonical_value_key(v: &Value) -> String {
        match v {
            Value::Null => "null".to_string(),
            Value::Bool(b) => format!("b:{b}"),
            // Ints share the "n:" tag with whole-valued floats (below) so
            // 1 and 1.0 group together.
            Value::Int(i) => format!("n:{i}"),
            Value::Float(f) => {
                if f.is_nan() {
                    // All NaNs collapse to one key (sign/payload ignored).
                    "nan".to_string()
                } else if f.is_infinite() {
                    if f.is_sign_positive() {
                        "inf:+".to_string()
                    } else {
                        "inf:-".to_string()
                    }
                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
                    // Whole-valued floats in i64 range normalize to the
                    // integer key so 2.0 == 2 for grouping purposes.
                    // (The `as` cast saturates at the i64 bounds.)
                    format!("n:{}", *f as i64)
                } else {
                    format!("f:{f}")
                }
            }
            Value::String(s) => {
                // Strings that parse as temporal literals key by their
                // temporal instant so they group with Value::Temporal.
                if let Some(k) = Self::temporal_string_key(s) {
                    format!("temporal:{k}")
                } else {
                    format!("s:{s}")
                }
            }
            Value::Bytes(b) => format!("bytes:{:?}", b),
            // Containers key recursively on their elements.
            Value::List(items) => format!(
                "list:[{}]",
                items
                    .iter()
                    .map(Self::canonical_value_key)
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Map(map) => {
                // Sort map entries by key for order-independence.
                let mut pairs: Vec<_> = map.iter().collect();
                pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
                format!(
                    "map:{{{}}}",
                    pairs
                        .into_iter()
                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
                        .collect::<Vec<_>>()
                        .join(",")
                )
            }
            Value::Node(n) => {
                // Labels are sorted so label order does not affect the key.
                let mut labels = n.labels.clone();
                labels.sort();
                format!(
                    "node:{}:{}:{}",
                    n.vid.as_u64(),
                    labels.join(":"),
                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
                )
            }
            Value::Edge(e) => format!(
                "edge:{}:{}:{}:{}:{}",
                e.eid.as_u64(),
                e.edge_type,
                e.src.as_u64(),
                e.dst.as_u64(),
                Self::canonical_value_key(&Value::Map(e.properties.clone()))
            ),
            // Paths key on the full node and edge sequences.
            Value::Path(p) => format!(
                "path:nodes=[{}];edges=[{}]",
                p.nodes
                    .iter()
                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
                    .collect::<Vec<_>>()
                    .join(","),
                p.edges
                    .iter()
                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Vector(vs) => format!("vec:{:?}", vs),
            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
            // Fallback for any remaining variants: Debug representation.
            _ => format!("{v:?}"),
        }
    }
3087
3088    fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3089        match t {
3090            uni_common::TemporalValue::Date { days_since_epoch } => {
3091                format!("date:{days_since_epoch}")
3092            }
3093            uni_common::TemporalValue::LocalTime {
3094                nanos_since_midnight,
3095            } => format!("localtime:{nanos_since_midnight}"),
3096            uni_common::TemporalValue::Time {
3097                nanos_since_midnight,
3098                offset_seconds,
3099            } => {
3100                let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3101                format!("time:{utc_nanos}")
3102            }
3103            uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3104                format!("localdatetime:{nanos_since_epoch}")
3105            }
3106            uni_common::TemporalValue::DateTime {
3107                nanos_since_epoch, ..
3108            } => format!("datetime:{nanos_since_epoch}"),
3109            uni_common::TemporalValue::Duration {
3110                months,
3111                days,
3112                nanos,
3113            } => format!("duration:{months}:{days}:{nanos}"),
3114        }
3115    }
3116
3117    fn temporal_string_key(s: &str) -> Option<String> {
3118        let fn_name = match classify_temporal(s)? {
3119            uni_common::TemporalType::Date => "DATE",
3120            uni_common::TemporalType::LocalTime => "LOCALTIME",
3121            uni_common::TemporalType::Time => "TIME",
3122            uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3123            uni_common::TemporalType::DateTime => "DATETIME",
3124            uni_common::TemporalType::Duration => "DURATION",
3125        };
3126        match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3127            Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3128            _ => None,
3129        }
3130    }
3131
3132    /// Execute aggregate operation: GROUP BY + aggregate functions.
3133    /// Interval for timeout checks in aggregate loops.
3134    pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3135
3136    pub(crate) async fn execute_aggregate(
3137        &self,
3138        rows: Vec<HashMap<String, Value>>,
3139        group_by: &[Expr],
3140        aggregates: &[Expr],
3141        prop_manager: &PropertyManager,
3142        params: &HashMap<String, Value>,
3143        ctx: Option<&QueryContext>,
3144    ) -> Result<Vec<HashMap<String, Value>>> {
3145        // CWE-400: Check timeout before aggregation
3146        if let Some(ctx) = ctx {
3147            ctx.check_timeout()?;
3148        }
3149
3150        let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3151
3152        // Cypher semantics: aggregation without grouping keys returns one row even
3153        // on empty input (e.g. `RETURN count(*)`, `RETURN avg(x)`).
3154        if rows.is_empty() {
3155            if group_by.is_empty() {
3156                let accs = Self::create_accumulators(aggregates);
3157                let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3158                return Ok(vec![row]);
3159            }
3160            return Ok(vec![]);
3161        }
3162
3163        for (idx, row) in rows.into_iter().enumerate() {
3164            // Periodic timeout check during aggregation
3165            if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3166                && let Some(ctx) = ctx
3167            {
3168                ctx.check_timeout()?;
3169            }
3170
3171            let key_vals = self
3172                .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3173                .await?;
3174            // Build a canonical key so grouping follows Cypher value semantics
3175            // (e.g. temporal equality by instant, numeric normalization where applicable).
3176            let key_str = format!(
3177                "[{}]",
3178                key_vals
3179                    .iter()
3180                    .map(Self::canonical_value_key)
3181                    .collect::<Vec<_>>()
3182                    .join(",")
3183            );
3184
3185            let entry = groups
3186                .entry(key_str)
3187                .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3188
3189            self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3190                .await?;
3191        }
3192
3193        let results = groups
3194            .values()
3195            .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3196            .collect();
3197
3198        Ok(results)
3199    }
3200
3201    pub(crate) async fn execute_window(
3202        &self,
3203        mut rows: Vec<HashMap<String, Value>>,
3204        window_exprs: &[Expr],
3205        _prop_manager: &PropertyManager,
3206        _params: &HashMap<String, Value>,
3207        ctx: Option<&QueryContext>,
3208    ) -> Result<Vec<HashMap<String, Value>>> {
3209        // CWE-400: Check timeout before window computation
3210        if let Some(ctx) = ctx {
3211            ctx.check_timeout()?;
3212        }
3213
3214        // If no rows or no window expressions, return as-is
3215        if rows.is_empty() || window_exprs.is_empty() {
3216            return Ok(rows);
3217        }
3218
3219        // Process each window function expression
3220        for window_expr in window_exprs {
3221            // Extract window function details
3222            let Expr::FunctionCall {
3223                name,
3224                args,
3225                window_spec: Some(window_spec),
3226                ..
3227            } = window_expr
3228            else {
3229                return Err(anyhow!(
3230                    "Window expression must be a FunctionCall with OVER clause: {:?}",
3231                    window_expr
3232                ));
3233            };
3234
3235            let name_upper = name.to_uppercase();
3236
3237            // Validate it's a supported window function
3238            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3239                return Err(anyhow!(
3240                    "Unsupported window function: {}. Supported functions: {}",
3241                    name,
3242                    WINDOW_FUNCTIONS.join(", ")
3243                ));
3244            }
3245
3246            // Build partition groups based on PARTITION BY clause
3247            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3248
3249            for (row_idx, row) in rows.iter().enumerate() {
3250                // Evaluate partition key
3251                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3252                    // No partitioning - all rows in one partition
3253                    vec![]
3254                } else {
3255                    window_spec
3256                        .partition_by
3257                        .iter()
3258                        .map(|expr| self.evaluate_simple_expr(expr, row))
3259                        .collect::<Result<Vec<_>>>()?
3260                };
3261
3262                partition_map
3263                    .entry(partition_key)
3264                    .or_default()
3265                    .push(row_idx);
3266            }
3267
3268            // Process each partition
3269            for (_partition_key, row_indices) in partition_map.iter_mut() {
3270                // Sort rows within partition by ORDER BY clause
3271                if !window_spec.order_by.is_empty() {
3272                    row_indices.sort_by(|&a, &b| {
3273                        for sort_item in &window_spec.order_by {
3274                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3275                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3276
3277                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3278                                let cmp = Executor::compare_values(&va, &vb);
3279                                let cmp = if sort_item.ascending {
3280                                    cmp
3281                                } else {
3282                                    cmp.reverse()
3283                                };
3284                                if cmp != std::cmp::Ordering::Equal {
3285                                    return cmp;
3286                                }
3287                            }
3288                        }
3289                        std::cmp::Ordering::Equal
3290                    });
3291                }
3292
3293                // Compute window function values for this partition
3294                for (position, &row_idx) in row_indices.iter().enumerate() {
3295                    let window_value = match name_upper.as_str() {
3296                        "ROW_NUMBER" => Value::from((position + 1) as i64),
3297                        "RANK" => {
3298                            // RANK: position (1-indexed) of first row in group of tied rows
3299                            let rank = if position == 0 {
3300                                1i64
3301                            } else {
3302                                let prev_row_idx = row_indices[position - 1];
3303                                let same_as_prev = self.rows_have_same_sort_keys(
3304                                    &window_spec.order_by,
3305                                    &rows,
3306                                    row_idx,
3307                                    prev_row_idx,
3308                                );
3309
3310                                if same_as_prev {
3311                                    // Walk backwards to find where this group started
3312                                    let mut group_start = position - 1;
3313                                    while group_start > 0 {
3314                                        let curr_idx = row_indices[group_start];
3315                                        let prev_idx = row_indices[group_start - 1];
3316                                        if !self.rows_have_same_sort_keys(
3317                                            &window_spec.order_by,
3318                                            &rows,
3319                                            curr_idx,
3320                                            prev_idx,
3321                                        ) {
3322                                            break;
3323                                        }
3324                                        group_start -= 1;
3325                                    }
3326                                    (group_start + 1) as i64
3327                                } else {
3328                                    (position + 1) as i64
3329                                }
3330                            };
3331                            Value::from(rank)
3332                        }
3333                        "DENSE_RANK" => {
3334                            // Dense rank: continuous ranking without gaps
3335                            let mut dense_rank = 1i64;
3336                            for i in 0..position {
3337                                let curr_idx = row_indices[i + 1];
3338                                let prev_idx = row_indices[i];
3339                                if !self.rows_have_same_sort_keys(
3340                                    &window_spec.order_by,
3341                                    &rows,
3342                                    curr_idx,
3343                                    prev_idx,
3344                                ) {
3345                                    dense_rank += 1;
3346                                }
3347                            }
3348                            Value::from(dense_rank)
3349                        }
3350                        "LAG" => {
3351                            let (value_expr, offset, default_value) =
3352                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3353
3354                            if position >= offset {
3355                                let target_idx = row_indices[position - offset];
3356                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3357                            } else {
3358                                default_value
3359                            }
3360                        }
3361                        "LEAD" => {
3362                            let (value_expr, offset, default_value) =
3363                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3364
3365                            if position + offset < row_indices.len() {
3366                                let target_idx = row_indices[position + offset];
3367                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3368                            } else {
3369                                default_value
3370                            }
3371                        }
3372                        "NTILE" => {
3373                            // Extract num_buckets argument: NTILE(num_buckets)
3374                            let num_buckets_expr = args.first().ok_or_else(|| {
3375                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3376                            })?;
3377                            let num_buckets_val =
3378                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3379                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3380                                anyhow!(
3381                                    "NTILE argument must be an integer, got: {:?}",
3382                                    num_buckets_val
3383                                )
3384                            })?;
3385
3386                            if num_buckets <= 0 {
3387                                return Err(anyhow!(
3388                                    "NTILE bucket count must be positive, got: {}",
3389                                    num_buckets
3390                                ));
3391                            }
3392
3393                            let num_buckets = num_buckets as usize;
3394                            let partition_size = row_indices.len();
3395
3396                            // Calculate bucket assignment using standard algorithm
3397                            // For N rows and B buckets:
3398                            // - Base size: N / B
3399                            // - Extra rows: N % B (go to first buckets)
3400                            let base_size = partition_size / num_buckets;
3401                            let extra_rows = partition_size % num_buckets;
3402
3403                            // Determine bucket for current row
3404                            let bucket = if position < extra_rows * (base_size + 1) {
3405                                // Row is in one of the larger buckets (first 'extra_rows' buckets)
3406                                position / (base_size + 1) + 1
3407                            } else {
3408                                // Row is in one of the normal-sized buckets
3409                                let adjusted_position = position - extra_rows * (base_size + 1);
3410                                extra_rows + (adjusted_position / base_size) + 1
3411                            };
3412
3413                            Value::from(bucket as i64)
3414                        }
3415                        "FIRST_VALUE" => {
3416                            // FIRST_VALUE returns the value of the expression from the first row in the window frame
3417                            let value_expr = args.first().ok_or_else(|| {
3418                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3419                            })?;
3420
3421                            // Get the first row in the partition (after ordering)
3422                            if row_indices.is_empty() {
3423                                Value::Null
3424                            } else {
3425                                let first_idx = row_indices[0];
3426                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3427                            }
3428                        }
3429                        "LAST_VALUE" => {
3430                            // LAST_VALUE returns the value of the expression from the last row in the window frame
3431                            let value_expr = args.first().ok_or_else(|| {
3432                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3433                            })?;
3434
3435                            // Get the last row in the partition (after ordering)
3436                            if row_indices.is_empty() {
3437                                Value::Null
3438                            } else {
3439                                let last_idx = row_indices[row_indices.len() - 1];
3440                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3441                            }
3442                        }
3443                        "NTH_VALUE" => {
3444                            // NTH_VALUE returns the value of the expression from the nth row in the window frame
3445                            if args.len() != 2 {
3446                                return Err(anyhow!(
3447                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3448                                ));
3449                            }
3450
3451                            let value_expr = &args[0];
3452                            let n_expr = &args[1];
3453
3454                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3455                            let n = n_val.as_i64().ok_or_else(|| {
3456                                anyhow!(
3457                                    "NTH_VALUE second argument must be an integer, got: {:?}",
3458                                    n_val
3459                                )
3460                            })?;
3461
3462                            if n <= 0 {
3463                                return Err(anyhow!(
3464                                    "NTH_VALUE position must be positive, got: {}",
3465                                    n
3466                                ));
3467                            }
3468
3469                            let nth_index = (n - 1) as usize; // Convert 1-based to 0-based
3470                            if nth_index < row_indices.len() {
3471                                let nth_idx = row_indices[nth_index];
3472                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3473                            } else {
3474                                Value::Null
3475                            }
3476                        }
3477                        _ => unreachable!("Window function {} already validated", name),
3478                    };
3479
3480                    // Add window function result to row
3481                    // Use the window expression's string representation as the column name
3482                    let col_name = window_expr.to_string_repr();
3483                    rows[row_idx].insert(col_name, window_value);
3484                }
3485            }
3486        }
3487
3488        Ok(rows)
3489    }
3490
3491    /// Helper to evaluate simple expressions for window function sorting/partitioning.
3492    ///
3493    /// Uses `&self` for consistency with other evaluation methods, though it only
3494    /// recurses for property access.
3495    fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3496        match expr {
3497            Expr::Variable(name) => row
3498                .get(name)
3499                .cloned()
3500                .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3501            Expr::Property(base, prop) => {
3502                let base_val = self.evaluate_simple_expr(base, row)?;
3503                if let Value::Map(map) = base_val {
3504                    map.get(prop)
3505                        .cloned()
3506                        .ok_or_else(|| anyhow!("Property not found: {}", prop))
3507                } else {
3508                    Err(anyhow!("Cannot access property on non-object"))
3509                }
3510            }
3511            Expr::Literal(lit) => Ok(lit.to_value()),
3512            _ => Err(anyhow!(
3513                "Unsupported expression in window function: {:?}",
3514                expr
3515            )),
3516        }
3517    }
3518
3519    /// Check if two rows have matching sort keys for ranking functions.
3520    fn rows_have_same_sort_keys(
3521        &self,
3522        order_by: &[uni_cypher::ast::SortItem],
3523        rows: &[HashMap<String, Value>],
3524        idx_a: usize,
3525        idx_b: usize,
3526    ) -> bool {
3527        order_by.iter().all(|sort_item| {
3528            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3529            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3530            matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3531        })
3532    }
3533
3534    /// Extract offset and default value for LAG/LEAD window functions.
3535    fn extract_lag_lead_params<'a>(
3536        &self,
3537        func_name: &str,
3538        args: &'a [Expr],
3539        row: &HashMap<String, Value>,
3540    ) -> Result<(&'a Expr, usize, Value)> {
3541        let value_expr = args.first().ok_or_else(|| {
3542            anyhow!(
3543                "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3544                func_name,
3545                func_name
3546            )
3547        })?;
3548
3549        let offset = if let Some(offset_expr) = args.get(1) {
3550            let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3551            offset_val.as_i64().ok_or_else(|| {
3552                anyhow!(
3553                    "{} offset must be an integer, got: {:?}",
3554                    func_name,
3555                    offset_val
3556                )
3557            })? as usize
3558        } else {
3559            1
3560        };
3561
3562        let default_value = if let Some(default_expr) = args.get(2) {
3563            self.evaluate_simple_expr(default_expr, row)?
3564        } else {
3565            Value::Null
3566        };
3567
3568        Ok((value_expr, offset, default_value))
3569    }
3570
3571    /// Evaluate group-by key expressions for a row.
3572    pub(crate) async fn evaluate_group_keys(
3573        &self,
3574        group_by: &[Expr],
3575        row: &HashMap<String, Value>,
3576        prop_manager: &PropertyManager,
3577        params: &HashMap<String, Value>,
3578        ctx: Option<&QueryContext>,
3579    ) -> Result<Vec<Value>> {
3580        let mut key_vals = Vec::new();
3581        for expr in group_by {
3582            key_vals.push(
3583                self.evaluate_expr(expr, row, prop_manager, params, ctx)
3584                    .await?,
3585            );
3586        }
3587        Ok(key_vals)
3588    }
3589
3590    /// Update accumulators with values from the current row.
3591    pub(crate) async fn update_accumulators(
3592        &self,
3593        accs: &mut [Accumulator],
3594        aggregates: &[Expr],
3595        row: &HashMap<String, Value>,
3596        prop_manager: &PropertyManager,
3597        params: &HashMap<String, Value>,
3598        ctx: Option<&QueryContext>,
3599    ) -> Result<()> {
3600        for (i, agg_expr) in aggregates.iter().enumerate() {
3601            if let Expr::FunctionCall { args, .. } = agg_expr {
3602                let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3603                let val = if is_wildcard {
3604                    Value::Null
3605                } else {
3606                    self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3607                        .await?
3608                };
3609                accs[i].update(&val, is_wildcard);
3610            }
3611        }
3612        Ok(())
3613    }
3614
3615    /// Execute sort operation with ORDER BY clauses.
3616    pub(crate) async fn execute_recursive_cte(
3617        &self,
3618        cte_name: &str,
3619        initial: LogicalPlan,
3620        recursive: LogicalPlan,
3621        prop_manager: &PropertyManager,
3622        params: &HashMap<String, Value>,
3623        ctx: Option<&QueryContext>,
3624    ) -> Result<Vec<HashMap<String, Value>>> {
3625        use std::collections::HashSet;
3626
3627        // Helper to create a stable key for cycle detection.
3628        // Uses sorted keys to ensure consistent ordering.
3629        pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3630            let mut pairs: Vec<_> = row.iter().collect();
3631            pairs.sort_by(|a, b| a.0.cmp(b.0));
3632            format!("{pairs:?}")
3633        }
3634
3635        // 1. Execute Anchor
3636        let mut working_table = self
3637            .execute_subplan(initial, prop_manager, params, ctx)
3638            .await?;
3639        let mut result_table = working_table.clone();
3640
3641        // Track seen rows for cycle detection
3642        let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3643
3644        // 2. Loop
3645        // Safety: Max iterations to prevent infinite loop
3646        // TODO: expose this via UniConfig for user control
3647        let max_iterations = 1000;
3648        for _iteration in 0..max_iterations {
3649            // CWE-400: Check timeout at each iteration to prevent resource exhaustion
3650            if let Some(ctx) = ctx {
3651                ctx.check_timeout()?;
3652            }
3653
3654            if working_table.is_empty() {
3655                break;
3656            }
3657
3658            // Bind working table to CTE name in params
3659            let working_val = Value::List(
3660                working_table
3661                    .iter()
3662                    .map(|row| {
3663                        if row.len() == 1 {
3664                            row.values().next().unwrap().clone()
3665                        } else {
3666                            Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3667                        }
3668                    })
3669                    .collect(),
3670            );
3671
3672            let mut next_params = params.clone();
3673            next_params.insert(cte_name.to_string(), working_val);
3674
3675            // Execute recursive part
3676            let next_result = self
3677                .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3678                .await?;
3679
3680            if next_result.is_empty() {
3681                break;
3682            }
3683
3684            // Filter out already-seen rows (cycle detection)
3685            let new_rows: Vec<_> = next_result
3686                .into_iter()
3687                .filter(|row| {
3688                    let key = row_key(row);
3689                    seen.insert(key) // Returns false if already present
3690                })
3691                .collect();
3692
3693            if new_rows.is_empty() {
3694                // All results were cycles - terminate
3695                break;
3696            }
3697
3698            result_table.extend(new_rows.clone());
3699            working_table = new_rows;
3700        }
3701
3702        // Output accumulated results as a variable
3703        let final_list = Value::List(
3704            result_table
3705                .into_iter()
3706                .map(|row| {
3707                    // If the CTE returns a single column and we want to treat it as a list of values?
3708                    // E.g. WITH RECURSIVE r AS (RETURN 1 UNION RETURN 2) -> [1, 2] or [{expr:1}, {expr:2}]?
3709                    // Cypher LISTs usually contain values.
3710                    // If the row has 1 column, maybe unwrap?
3711                    // But SQL CTEs are tables.
3712                    // Let's stick to List<Map> for consistency with how we pass it in.
3713                    // UNLESS the user extracts it.
3714                    // My parser test `MATCH (n) WHERE n IN hierarchy` implies `hierarchy` contains Nodes.
3715                    // If `row` contains `root` (Node), then `hierarchy` should be `[Node, Node]`.
3716                    // If row has multiple cols, `[ {a:1, b:2}, ... ]`.
3717                    // If row has 1 col, users expect `[val, val]`.
3718                    if row.len() == 1 {
3719                        row.values().next().unwrap().clone()
3720                    } else {
3721                        Value::Map(row.into_iter().collect())
3722                    }
3723                })
3724                .collect(),
3725        );
3726
3727        let mut final_row = HashMap::new();
3728        final_row.insert(cte_name.to_string(), final_list);
3729        Ok(vec![final_row])
3730    }
3731
    /// Interval, in rows, between cooperative timeout checks inside sort
    /// loops (CWE-400: keeps long sorts responsive to query cancellation).
    const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3734
3735    pub(crate) async fn execute_sort(
3736        &self,
3737        rows: Vec<HashMap<String, Value>>,
3738        order_by: &[uni_cypher::ast::SortItem],
3739        prop_manager: &PropertyManager,
3740        params: &HashMap<String, Value>,
3741        ctx: Option<&QueryContext>,
3742    ) -> Result<Vec<HashMap<String, Value>>> {
3743        // CWE-400: Check timeout before potentially expensive sort
3744        if let Some(ctx) = ctx {
3745            ctx.check_timeout()?;
3746        }
3747
3748        let mut rows_with_keys = Vec::with_capacity(rows.len());
3749        for (idx, row) in rows.into_iter().enumerate() {
3750            // Periodic timeout check during key extraction
3751            if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3752                && let Some(ctx) = ctx
3753            {
3754                ctx.check_timeout()?;
3755            }
3756
3757            let mut keys = Vec::new();
3758            for item in order_by {
3759                let val = row
3760                    .get(&item.expr.to_string_repr())
3761                    .cloned()
3762                    .unwrap_or(Value::Null);
3763                let val = if val.is_null() {
3764                    self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3765                        .await
3766                        .unwrap_or(Value::Null)
3767                } else {
3768                    val
3769                };
3770                keys.push(val);
3771            }
3772            rows_with_keys.push((row, keys));
3773        }
3774
3775        // Check timeout again before synchronous sort (can't be interrupted)
3776        if let Some(ctx) = ctx {
3777            ctx.check_timeout()?;
3778        }
3779
3780        rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3781
3782        Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3783    }
3784
3785    /// Create accumulators for aggregate expressions.
3786    pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3787        aggregates
3788            .iter()
3789            .map(|expr| {
3790                if let Expr::FunctionCall { name, distinct, .. } = expr {
3791                    Accumulator::new(name, *distinct)
3792                } else {
3793                    Accumulator::new("COUNT", false)
3794                }
3795            })
3796            .collect()
3797    }
3798
3799    /// Build result row from group-by keys and accumulators.
3800    pub(crate) fn build_aggregate_result(
3801        group_by: &[Expr],
3802        aggregates: &[Expr],
3803        key_vals: &[Value],
3804        accs: &[Accumulator],
3805    ) -> HashMap<String, Value> {
3806        let mut res_row = HashMap::new();
3807        for (i, expr) in group_by.iter().enumerate() {
3808            res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3809        }
3810        for (i, expr) in aggregates.iter().enumerate() {
3811            // Use aggregate_column_name to ensure consistency with planner
3812            let col_name = crate::query::planner::aggregate_column_name(expr);
3813            res_row.insert(col_name, accs[i].finish());
3814        }
3815        res_row
3816    }
3817
3818    /// Compare and return ordering for sort operation.
3819    pub(crate) fn compare_sort_keys(
3820        a_keys: &[Value],
3821        b_keys: &[Value],
3822        order_by: &[uni_cypher::ast::SortItem],
3823    ) -> std::cmp::Ordering {
3824        for (i, item) in order_by.iter().enumerate() {
3825            let order = Self::compare_values(&a_keys[i], &b_keys[i]);
3826            if order != std::cmp::Ordering::Equal {
3827                return if item.ascending {
3828                    order
3829                } else {
3830                    order.reverse()
3831                };
3832            }
3833        }
3834        std::cmp::Ordering::Equal
3835    }
3836
3837    /// Executes BACKUP command to local or cloud storage.
3838    ///
3839    /// Supports both local filesystem paths and cloud URLs (s3://, gs://, az://).
3840    pub(crate) async fn execute_backup(
3841        &self,
3842        destination: &str,
3843        _options: &HashMap<String, Value>,
3844    ) -> Result<Vec<HashMap<String, Value>>> {
3845        // 1. Flush L0
3846        if let Some(writer_arc) = &self.writer {
3847            let mut writer = writer_arc.write().await;
3848            writer.flush_to_l1(None).await?;
3849        }
3850
3851        // 2. Snapshot
3852        let snapshot_manager = self.storage.snapshot_manager();
3853        let snapshot = snapshot_manager
3854            .load_latest_snapshot()
3855            .await?
3856            .ok_or_else(|| anyhow!("No snapshot found"))?;
3857
3858        // 3. Copy files - cloud or local path
3859        if is_cloud_url(destination) {
3860            self.backup_to_cloud(destination, &snapshot.snapshot_id)
3861                .await?;
3862        } else {
3863            // Validate local destination path against sandbox
3864            let validated_dest = self.validate_path(destination)?;
3865            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
3866                .await?;
3867        }
3868
3869        let mut res = HashMap::new();
3870        res.insert(
3871            "status".to_string(),
3872            Value::String("Backup completed".to_string()),
3873        );
3874        res.insert(
3875            "snapshot_id".to_string(),
3876            Value::String(snapshot.snapshot_id),
3877        );
3878        Ok(vec![res])
3879    }
3880
3881    /// Backs up database to a local filesystem destination.
3882    async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
3883        let source_path = std::path::Path::new(self.storage.base_path());
3884
3885        if !dest_path.exists() {
3886            std::fs::create_dir_all(dest_path)?;
3887        }
3888
3889        // Recursive copy (local to local)
3890        if source_path.exists() {
3891            Self::copy_dir_all(source_path, dest_path)?;
3892        }
3893
3894        // Copy schema to destination/catalog/schema.json
3895        let schema_manager = self.storage.schema_manager();
3896        let dest_catalog = dest_path.join("catalog");
3897        if !dest_catalog.exists() {
3898            std::fs::create_dir_all(&dest_catalog)?;
3899        }
3900
3901        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3902        std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
3903
3904        Ok(())
3905    }
3906
3907    /// Backs up database to a cloud storage destination.
3908    ///
3909    /// Streams data from source to destination, supporting cross-cloud backups.
3910    async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
3911        use object_store::ObjectStore;
3912        use object_store::local::LocalFileSystem;
3913        use object_store::path::Path as ObjPath;
3914
3915        let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
3916        let source_path = std::path::Path::new(self.storage.base_path());
3917
3918        // Create local store for source, coerced to dyn ObjectStore
3919        let src_store: Arc<dyn ObjectStore> =
3920            Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
3921
3922        // Copy catalog/ directory
3923        let catalog_src = ObjPath::from("catalog");
3924        let catalog_dst = if dest_prefix.as_ref().is_empty() {
3925            ObjPath::from("catalog")
3926        } else {
3927            ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
3928        };
3929        copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
3930
3931        // Copy storage/ directory
3932        let storage_src = ObjPath::from("storage");
3933        let storage_dst = if dest_prefix.as_ref().is_empty() {
3934            ObjPath::from("storage")
3935        } else {
3936            ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
3937        };
3938        copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
3939
3940        // Ensure schema is present at canonical catalog location.
3941        let schema_manager = self.storage.schema_manager();
3942        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3943        let schema_path = if dest_prefix.as_ref().is_empty() {
3944            ObjPath::from("catalog/schema.json")
3945        } else {
3946            ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
3947        };
3948        dest_store
3949            .put(&schema_path, bytes::Bytes::from(schema_content).into())
3950            .await?;
3951
3952        Ok(())
3953    }
3954
    /// Maximum directory depth for backup operations.
    ///
    /// **CWE-674 (Uncontrolled Recursion)**: Prevents stack overflow from
    /// excessively deep directory structures.
    const MAX_BACKUP_DEPTH: usize = 100;

    /// Maximum file count for backup operations.
    ///
    /// **CWE-400 (Resource Consumption)**: Prevents disk exhaustion and
    /// long-running operations from malicious or unexpectedly large directories.
    /// The limit applies per `copy_dir_all` invocation.
    const MAX_BACKUP_FILES: usize = 100_000;
3966
3967    /// Recursively copies a directory with security limits.
3968    ///
3969    /// # Security
3970    ///
3971    /// - **CWE-674**: Depth limit prevents stack overflow
3972    /// - **CWE-400**: File count limit prevents resource exhaustion
3973    /// - **Symlink handling**: Symlinks are skipped to prevent loop attacks
3974    pub(crate) fn copy_dir_all(
3975        src: &std::path::Path,
3976        dst: &std::path::Path,
3977    ) -> std::io::Result<()> {
3978        let mut file_count = 0usize;
3979        Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
3980    }
3981
3982    /// Internal implementation with depth and file count tracking.
3983    pub(crate) fn copy_dir_all_impl(
3984        src: &std::path::Path,
3985        dst: &std::path::Path,
3986        depth: usize,
3987        file_count: &mut usize,
3988    ) -> std::io::Result<()> {
3989        if depth >= Self::MAX_BACKUP_DEPTH {
3990            return Err(std::io::Error::new(
3991                std::io::ErrorKind::InvalidInput,
3992                format!(
3993                    "Maximum backup depth {} exceeded at {:?}",
3994                    Self::MAX_BACKUP_DEPTH,
3995                    src
3996                ),
3997            ));
3998        }
3999
4000        std::fs::create_dir_all(dst)?;
4001
4002        for entry in std::fs::read_dir(src)? {
4003            if *file_count >= Self::MAX_BACKUP_FILES {
4004                return Err(std::io::Error::new(
4005                    std::io::ErrorKind::InvalidInput,
4006                    format!(
4007                        "Maximum backup file count {} exceeded",
4008                        Self::MAX_BACKUP_FILES
4009                    ),
4010                ));
4011            }
4012            *file_count += 1;
4013
4014            let entry = entry?;
4015            let metadata = entry.metadata()?;
4016
4017            // Skip symlinks to prevent loops and traversal attacks
4018            if metadata.file_type().is_symlink() {
4019                // Silently skip - logging would require tracing dependency
4020                continue;
4021            }
4022
4023            let dst_path = dst.join(entry.file_name());
4024            if metadata.is_dir() {
4025                Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4026            } else {
4027                std::fs::copy(entry.path(), dst_path)?;
4028            }
4029        }
4030        Ok(())
4031    }
4032
4033    pub(crate) async fn execute_copy(
4034        &self,
4035        target: &str,
4036        source: &str,
4037        options: &HashMap<String, Value>,
4038        prop_manager: &PropertyManager,
4039    ) -> Result<Vec<HashMap<String, Value>>> {
4040        let format = options
4041            .get("format")
4042            .and_then(|v| v.as_str())
4043            .unwrap_or_else(|| {
4044                if source.ends_with(".parquet") {
4045                    "parquet"
4046                } else {
4047                    "csv"
4048                }
4049            });
4050
4051        match format.to_lowercase().as_str() {
4052            "csv" => self.execute_csv_import(target, source, options).await,
4053            "parquet" => {
4054                self.execute_parquet_import(target, source, options, prop_manager)
4055                    .await
4056            }
4057            _ => Err(anyhow!("Unsupported format: {}", format)),
4058        }
4059    }
4060
    /// Imports rows from a CSV file into a vertex label or an edge type.
    ///
    /// `target` must name a label or an edge type in the current schema.
    /// Column headers are matched against the target's declared properties;
    /// unmatched columns are silently ignored. For edge targets, the source
    /// and destination VIDs are read from the `_src`/`_dst` columns
    /// (overridable via the `src_col`/`dst_col` options).
    ///
    /// Options: `delimiter` (only its first byte is used; defaults to ','),
    /// `header` (bool, default true), `src_col`, `dst_col`.
    ///
    /// Returns a single row `{ "count": <number of imported records> }`.
    ///
    /// # Errors
    ///
    /// Fails when the path escapes the sandbox, no writer is configured, the
    /// target is unknown, the CSV cannot be read, a value fails to parse, or
    /// an edge row lacks its source/destination VID.
    ///
    /// NOTE(review): with `header = false`, `rdr.headers()` still reports the
    /// first record, so data values would be used as column names — confirm
    /// whether header-less import is actually supported here.
    pub(crate) async fn execute_csv_import(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Validate source path against sandbox
        let validated_source = self.validate_path(source)?;

        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;

        let schema = self.storage.schema_manager().schema();

        // 1. Determine if target is Label or EdgeType
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // 2. Open CSV
        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        // csv delimiters are single bytes: only the first byte of the option
        // is used, so a multi-byte delimiter string is silently truncated.
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut rdr = csv::ReaderBuilder::new()
            .delimiter(delimiter)
            .has_headers(has_header)
            .from_path(&validated_source)?;

        let headers = rdr.headers()?.clone();
        let mut count = 0;

        // Exclusive writer access for the duration of the import.
        let mut writer = writer_lock.write().await;

        if label_meta.is_some() {
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;

            for result in rdr.records() {
                let record = result?;
                let mut props = HashMap::new();

                // Keep only columns matching declared properties of the label.
                for (i, header) in headers.iter().enumerate() {
                    if let Some(val_str) = record.get(i)
                        && let Some(prop_meta) = target_props.get(header)
                    {
                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
                        props.insert(header.to_string(), val);
                    }
                }

                let vid = writer.next_vid().await?;
                writer
                    .insert_vertex_with_labels(vid, props, &[target.to_string()])
                    .await?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            let type_id = meta.id;
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;

            // For edges, we need src and dst VIDs.
            // Expecting columns '_src' and '_dst' or as specified in options.
            let src_col = options
                .get("src_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_src");
            let dst_col = options
                .get("dst_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_dst");

            for result in rdr.records() {
                let record = result?;
                let mut props = HashMap::new();
                let mut src_vid = None;
                let mut dst_vid = None;

                // Endpoint columns map to VIDs; the rest to declared properties.
                for (i, header) in headers.iter().enumerate() {
                    if let Some(val_str) = record.get(i) {
                        if header == src_col {
                            src_vid =
                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
                        } else if header == dst_col {
                            dst_vid =
                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
                        } else if let Some(prop_meta) = target_props.get(header) {
                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
                            props.insert(header.to_string(), val);
                        }
                    }
                }

                // Both endpoints are mandatory for an edge row.
                let src =
                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
                let dst = dst_vid
                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;

                let eid = writer.next_eid(type_id).await?;
                writer
                    .insert_edge(src, dst, type_id, eid, props, Some(target.to_string()))
                    .await?;
                count += 1;
            }
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4191
    /// Imports data from a Parquet file into a vertex label or an edge type.
    ///
    /// Supports local filesystem paths and cloud URLs (s3://, gs://, az://).
    /// Each Parquet field whose name matches a declared property of `target`
    /// is decoded (using the schema's Uni `DataType` for proper DateTime/Time
    /// handling); other fields are ignored and null cells are skipped. For
    /// edge targets, source/destination VIDs come from the `_src`/`_dst`
    /// columns (overridable via the `src_col`/`dst_col` options).
    ///
    /// Returns a single row `{ "count": <number of imported rows> }`.
    ///
    /// # Errors
    ///
    /// Fails when no writer is configured, the target is unknown, the file
    /// cannot be opened or decoded, or an edge row lacks its endpoint VIDs.
    pub(crate) async fn execute_parquet_import(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
        _prop_manager: &PropertyManager,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;

        let schema = self.storage.schema_manager().schema();

        // 1. Determine if target is Label or EdgeType
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // 2. Open Parquet - support both local and cloud URLs
        let reader = if is_cloud_url(source) {
            self.open_parquet_from_cloud(source).await?
        } else {
            // Validate local source path against sandbox
            let validated_source = self.validate_path(source)?;
            let file = std::fs::File::open(&validated_source)?;
            let builder =
                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
            builder.build()?
        };
        let mut reader = reader;

        let mut count = 0;
        // Exclusive writer access for the duration of the import.
        let mut writer = writer_lock.write().await;

        if label_meta.is_some() {
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;

            for batch in reader.by_ref() {
                let batch = batch?;
                for row in 0..batch.num_rows() {
                    let mut props = HashMap::new();
                    for field in batch.schema().fields() {
                        let name = field.name();
                        if target_props.contains_key(name) {
                            let col = batch.column_by_name(name).unwrap();
                            if !col.is_null(row) {
                                // Look up Uni DataType from schema for proper DateTime/Time decoding
                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
                                let val =
                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
                                props.insert(name.clone(), val);
                            }
                        }
                    }
                    let vid = writer.next_vid().await?;
                    writer
                        .insert_vertex_with_labels(vid, props, &[target.to_string()])
                        .await?;
                    count += 1;
                }
            }
        } else if let Some(meta) = edge_meta {
            let type_id = meta.id;
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;

            let src_col = options
                .get("src_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_src");
            let dst_col = options
                .get("dst_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_dst");

            for batch in reader {
                let batch = batch?;
                for row in 0..batch.num_rows() {
                    let mut props = HashMap::new();
                    let mut src_vid = None;
                    let mut dst_vid = None;

                    for field in batch.schema().fields() {
                        let name = field.name();
                        let col = batch.column_by_name(name).unwrap();
                        // Null cells contribute neither endpoints nor props.
                        if col.is_null(row) {
                            continue;
                        }

                        if name == src_col {
                            let val = Self::arrow_to_value(col.as_ref(), row);
                            src_vid = Some(Self::vid_from_value(&val)?);
                        } else if name == dst_col {
                            let val = Self::arrow_to_value(col.as_ref(), row);
                            dst_vid = Some(Self::vid_from_value(&val)?);
                        } else if let Some(pm) = target_props.get(name) {
                            // Look up Uni DataType from schema for proper DateTime/Time decoding
                            let val =
                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
                            props.insert(name.clone(), val);
                        }
                    }

                    // Both endpoints are mandatory for an edge row.
                    let src = src_vid
                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
                    let dst = dst_vid.ok_or_else(|| {
                        anyhow!("Missing destination VID in column '{}'", dst_col)
                    })?;

                    let eid = writer.next_eid(type_id).await?;
                    writer
                        .insert_edge(src, dst, type_id, eid, props, Some(target.to_string()))
                        .await?;
                    count += 1;
                }
            }
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4326
4327    /// Opens a Parquet file from a cloud URL.
4328    ///
4329    /// Downloads the file to memory and creates a Parquet reader.
4330    async fn open_parquet_from_cloud(
4331        &self,
4332        source_url: &str,
4333    ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4334        use object_store::ObjectStore;
4335
4336        let (store, path) = build_store_from_url(source_url)?;
4337
4338        // Download file contents
4339        let bytes = store.get(&path).await?.bytes().await?;
4340
4341        // Create a Parquet reader from the bytes
4342        let reader = bytes::Bytes::from(bytes.to_vec());
4343        let builder =
4344            parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4345        Ok(builder.build()?)
4346    }
4347
4348    pub(crate) async fn scan_edge_type(
4349        &self,
4350        edge_type: &str,
4351        ctx: Option<&QueryContext>,
4352    ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4353        let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4354
4355        // 1. Scan L2 (Base)
4356        self.scan_edge_type_l2(edge_type, &mut edges).await?;
4357
4358        // 2. Scan L1 (Delta)
4359        self.scan_edge_type_l1(edge_type, &mut edges).await?;
4360
4361        // 3. Scan L0 (Memory) and filter tombstoned vertices
4362        if let Some(ctx) = ctx {
4363            self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4364            self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4365        }
4366
4367        Ok(edges
4368            .into_iter()
4369            .map(|(eid, (src, dst))| (eid, src, dst))
4370            .collect())
4371    }
4372
4373    /// Scan L2 (base) storage for edges of a given type.
4374    ///
4375    /// Note: Edges are now stored exclusively in delta datasets (L1) via LanceDB.
4376    /// This L2 scan will typically find no data.
4377    pub(crate) async fn scan_edge_type_l2(
4378        &self,
4379        _edge_type: &str,
4380        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4381    ) -> Result<()> {
4382        // Edges are now stored in delta datasets (L1) via LanceDB.
4383        // Legacy L2 base edge storage is no longer used.
4384        Ok(())
4385    }
4386
    /// Scan L1 (delta) storage for edges of a given type.
    ///
    /// Opens the forward ("fwd") delta dataset for `edge_type` via LanceDB,
    /// reads the `eid`/`src_vid`/`dst_vid`/`op`/`_version` columns, resolves
    /// each edge id to its highest-version operation, and applies the winners
    /// to `edges`: op 0 inserts/overwrites, op 1 removes.
    ///
    /// NOTE(review): failures to open the dataset/table, run the query, or
    /// drain the stream are silently swallowed (`if let Ok`,
    /// `unwrap_or_default`), degrading to "no delta edges" — confirm this
    /// best-effort behavior is intentional.
    pub(crate) async fn scan_edge_type_l1(
        &self,
        edge_type: &str,
        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
    ) -> Result<()> {
        use futures::TryStreamExt;
        use lancedb::query::{ExecutableQuery, QueryBase, Select};

        if let Ok(ds) = self.storage.delta_dataset(edge_type, "fwd") {
            let lancedb_store = self.storage.lancedb_store();
            if let Ok(table) = ds.open_lancedb(lancedb_store).await {
                // Fetch only the columns needed for edge resolution.
                let query = table.query().select(Select::Columns(vec![
                    "eid".into(),
                    "src_vid".into(),
                    "dst_vid".into(),
                    "op".into(),
                    "_version".into(),
                ]));

                if let Ok(stream) = query.execute().await {
                    let batches: Vec<arrow_array::RecordBatch> =
                        stream.try_collect().await.unwrap_or_default();

                    // Collect ops with versions: eid -> (version, op, src, dst)
                    let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
                        HashMap::new();

                    for batch in batches {
                        self.process_delta_batch(&batch, &mut versioned_ops)?;
                    }

                    // Apply the winning ops
                    for (eid, (_, op, src, dst)) in versioned_ops {
                        if op == 0 {
                            edges.insert(eid, (src, dst));
                        } else if op == 1 {
                            edges.remove(&eid);
                        }
                    }
                }
            }
        }
        Ok(())
    }
4432
4433    /// Process a delta batch, tracking versioned operations.
4434    pub(crate) fn process_delta_batch(
4435        &self,
4436        batch: &arrow_array::RecordBatch,
4437        versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4438    ) -> Result<()> {
4439        use arrow_array::UInt64Array;
4440        let eid_col = batch
4441            .column_by_name("eid")
4442            .ok_or(anyhow!("Missing eid"))?
4443            .as_any()
4444            .downcast_ref::<UInt64Array>()
4445            .ok_or(anyhow!("Invalid eid"))?;
4446        let src_col = batch
4447            .column_by_name("src_vid")
4448            .ok_or(anyhow!("Missing src_vid"))?
4449            .as_any()
4450            .downcast_ref::<UInt64Array>()
4451            .ok_or(anyhow!("Invalid src_vid"))?;
4452        let dst_col = batch
4453            .column_by_name("dst_vid")
4454            .ok_or(anyhow!("Missing dst_vid"))?
4455            .as_any()
4456            .downcast_ref::<UInt64Array>()
4457            .ok_or(anyhow!("Invalid dst_vid"))?;
4458        let op_col = batch
4459            .column_by_name("op")
4460            .ok_or(anyhow!("Missing op"))?
4461            .as_any()
4462            .downcast_ref::<arrow_array::UInt8Array>()
4463            .ok_or(anyhow!("Invalid op"))?;
4464        let version_col = batch
4465            .column_by_name("_version")
4466            .ok_or(anyhow!("Missing _version"))?
4467            .as_any()
4468            .downcast_ref::<UInt64Array>()
4469            .ok_or(anyhow!("Invalid _version"))?;
4470
4471        for i in 0..batch.num_rows() {
4472            let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4473            let version = version_col.value(i);
4474            let op = op_col.value(i);
4475            let src = Vid::from(src_col.value(i));
4476            let dst = Vid::from(dst_col.value(i));
4477
4478            match versioned_ops.entry(eid) {
4479                std::collections::hash_map::Entry::Vacant(e) => {
4480                    e.insert((version, op, src, dst));
4481                }
4482                std::collections::hash_map::Entry::Occupied(mut e) => {
4483                    if version > e.get().0 {
4484                        e.insert((version, op, src, dst));
4485                    }
4486                }
4487            }
4488        }
4489        Ok(())
4490    }
4491
4492    /// Scan L0 (memory) buffers for edges of a given type.
4493    pub(crate) fn scan_edge_type_l0(
4494        &self,
4495        edge_type: &str,
4496        ctx: &QueryContext,
4497        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4498    ) {
4499        let schema = self.storage.schema_manager().schema();
4500        let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4501
4502        if let Some(type_id) = type_id {
4503            // Main L0
4504            self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4505
4506            // Transaction L0
4507            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4508                self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4509            }
4510
4511            // Pending flush L0s
4512            for pending_l0_arc in &ctx.pending_flush_l0s {
4513                self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4514            }
4515        }
4516    }
4517
4518    /// Scan a single L0 buffer for edges and apply tombstones.
4519    pub(crate) fn scan_single_l0(
4520        &self,
4521        l0: &uni_store::runtime::L0Buffer,
4522        type_id: u32,
4523        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4524    ) {
4525        for edge_entry in l0.graph.edges() {
4526            if edge_entry.edge_type == type_id {
4527                edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4528            }
4529        }
4530        // Process Tombstones
4531        let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4532        for eid in eids_to_check {
4533            if l0.is_tombstoned(eid) {
4534                edges.remove(&eid);
4535            }
4536        }
4537    }
4538
4539    /// Filter out edges connected to tombstoned vertices.
4540    pub(crate) fn filter_tombstoned_vertex_edges(
4541        &self,
4542        ctx: &QueryContext,
4543        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4544    ) {
4545        let l0 = ctx.l0.read();
4546        let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4547
4548        // Include tx_l0 vertex tombstones if present
4549        if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4550            let tx_l0 = tx_l0_arc.read();
4551            all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4552        }
4553
4554        // Include pending flush L0 vertex tombstones
4555        for pending_l0_arc in &ctx.pending_flush_l0s {
4556            let pending_l0 = pending_l0_arc.read();
4557            all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4558        }
4559
4560        edges.retain(|_, (src, dst)| {
4561            !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4562        });
4563    }
4564
4565    /// Execute a projection operation.
4566    pub(crate) async fn execute_project(
4567        &self,
4568        input_rows: Vec<HashMap<String, Value>>,
4569        projections: &[(Expr, Option<String>)],
4570        prop_manager: &PropertyManager,
4571        params: &HashMap<String, Value>,
4572        ctx: Option<&QueryContext>,
4573    ) -> Result<Vec<HashMap<String, Value>>> {
4574        let mut results = Vec::new();
4575        for m in input_rows {
4576            let mut row = HashMap::new();
4577            for (expr, alias) in projections {
4578                let val = self
4579                    .evaluate_expr(expr, &m, prop_manager, params, ctx)
4580                    .await?;
4581                let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4582                row.insert(name, val);
4583            }
4584            results.push(row);
4585        }
4586        Ok(results)
4587    }
4588
4589    /// Execute an UNWIND operation.
4590    pub(crate) async fn execute_unwind(
4591        &self,
4592        input_rows: Vec<HashMap<String, Value>>,
4593        expr: &Expr,
4594        variable: &str,
4595        prop_manager: &PropertyManager,
4596        params: &HashMap<String, Value>,
4597        ctx: Option<&QueryContext>,
4598    ) -> Result<Vec<HashMap<String, Value>>> {
4599        let mut results = Vec::new();
4600        for row in input_rows {
4601            let val = self
4602                .evaluate_expr(expr, &row, prop_manager, params, ctx)
4603                .await?;
4604            if let Value::List(items) = val {
4605                for item in items {
4606                    let mut new_row = row.clone();
4607                    new_row.insert(variable.to_string(), item);
4608                    results.push(new_row);
4609                }
4610            }
4611        }
4612        Ok(results)
4613    }
4614
4615    /// Execute an APPLY (correlated subquery) operation.
4616    pub(crate) async fn execute_apply(
4617        &self,
4618        input_rows: Vec<HashMap<String, Value>>,
4619        subquery: &LogicalPlan,
4620        input_filter: Option<&Expr>,
4621        prop_manager: &PropertyManager,
4622        params: &HashMap<String, Value>,
4623        ctx: Option<&QueryContext>,
4624    ) -> Result<Vec<HashMap<String, Value>>> {
4625        let mut filtered_rows = input_rows;
4626
4627        if let Some(filter) = input_filter {
4628            let mut filtered = Vec::new();
4629            for row in filtered_rows {
4630                let res = self
4631                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
4632                    .await?;
4633                if res.as_bool().unwrap_or(false) {
4634                    filtered.push(row);
4635                }
4636            }
4637            filtered_rows = filtered;
4638        }
4639
4640        // Handle empty input: execute subquery once with empty context
4641        // This is critical for standalone CALL statements at the beginning of a query
4642        if filtered_rows.is_empty() {
4643            let sub_rows = self
4644                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4645                .await?;
4646            return Ok(sub_rows);
4647        }
4648
4649        let mut results = Vec::new();
4650        for row in filtered_rows {
4651            let mut sub_params = params.clone();
4652            sub_params.extend(row.clone());
4653
4654            let sub_rows = self
4655                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4656                .await?;
4657
4658            for sub_row in sub_rows {
4659                let mut new_row = row.clone();
4660                new_row.extend(sub_row);
4661                results.push(new_row);
4662            }
4663        }
4664        Ok(results)
4665    }
4666
4667    /// Execute SHOW INDEXES command.
4668    pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4669        let schema = self.storage.schema_manager().schema();
4670        let mut rows = Vec::new();
4671        for idx in &schema.indexes {
4672            let (name, type_str, details) = match idx {
4673                uni_common::core::schema::IndexDefinition::Vector(c) => (
4674                    c.name.clone(),
4675                    "VECTOR",
4676                    format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4677                ),
4678                uni_common::core::schema::IndexDefinition::FullText(c) => (
4679                    c.name.clone(),
4680                    "FULLTEXT",
4681                    format!("on {}:{:?}", c.label, c.properties),
4682                ),
4683                uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4684                    cfg.name.clone(),
4685                    "SCALAR",
4686                    format!(":{}({:?})", cfg.label, cfg.properties),
4687                ),
4688                _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4689            };
4690
4691            if let Some(f) = filter
4692                && f != type_str
4693            {
4694                continue;
4695            }
4696
4697            let mut row = HashMap::new();
4698            row.insert("name".to_string(), Value::String(name));
4699            row.insert("type".to_string(), Value::String(type_str.to_string()));
4700            row.insert("details".to_string(), Value::String(details));
4701            rows.push(row);
4702        }
4703        rows
4704    }
4705
4706    pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4707        let mut row = HashMap::new();
4708        row.insert("name".to_string(), Value::String("uni".to_string()));
4709        // Could add storage path, etc.
4710        vec![row]
4711    }
4712
4713    pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4714        // Placeholder as we don't easy access to config struct from here
4715        vec![]
4716    }
4717
4718    pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4719        let snapshot = self
4720            .storage
4721            .snapshot_manager()
4722            .load_latest_snapshot()
4723            .await?;
4724        let mut results = Vec::new();
4725
4726        if let Some(snap) = snapshot {
4727            for (label, s) in &snap.vertices {
4728                let mut row = HashMap::new();
4729                row.insert("type".to_string(), Value::String("Label".to_string()));
4730                row.insert("name".to_string(), Value::String(label.clone()));
4731                row.insert("count".to_string(), Value::Int(s.count as i64));
4732                results.push(row);
4733            }
4734            for (edge, s) in &snap.edges {
4735                let mut row = HashMap::new();
4736                row.insert("type".to_string(), Value::String("Edge".to_string()));
4737                row.insert("name".to_string(), Value::String(edge.clone()));
4738                row.insert("count".to_string(), Value::Int(s.count as i64));
4739                results.push(row);
4740            }
4741        }
4742
4743        Ok(results)
4744    }
4745
4746    pub(crate) fn execute_show_constraints(
4747        &self,
4748        clause: ShowConstraints,
4749    ) -> Vec<HashMap<String, Value>> {
4750        let schema = self.storage.schema_manager().schema();
4751        let mut rows = Vec::new();
4752        for c in &schema.constraints {
4753            if let Some(target) = &clause.target {
4754                match (target, &c.target) {
4755                    (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4756                    (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4757                        if e1 == e2 => {}
4758                    _ => continue,
4759                }
4760            }
4761
4762            let mut row = HashMap::new();
4763            row.insert("name".to_string(), Value::String(c.name.clone()));
4764            let type_str = match c.constraint_type {
4765                ConstraintType::Unique { .. } => "UNIQUE",
4766                ConstraintType::Exists { .. } => "EXISTS",
4767                ConstraintType::Check { .. } => "CHECK",
4768                _ => "UNKNOWN",
4769            };
4770            row.insert("type".to_string(), Value::String(type_str.to_string()));
4771
4772            let target_str = match &c.target {
4773                ConstraintTarget::Label(l) => format!("(:{})", l),
4774                ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4775                _ => "UNKNOWN".to_string(),
4776            };
4777            row.insert("target".to_string(), Value::String(target_str));
4778
4779            rows.push(row);
4780        }
4781        rows
4782    }
4783
4784    /// Execute a MERGE operation.
4785    pub(crate) async fn execute_cross_join(
4786        &self,
4787        left: Box<LogicalPlan>,
4788        right: Box<LogicalPlan>,
4789        prop_manager: &PropertyManager,
4790        params: &HashMap<String, Value>,
4791        ctx: Option<&QueryContext>,
4792    ) -> Result<Vec<HashMap<String, Value>>> {
4793        let left_rows = self
4794            .execute_subplan(*left, prop_manager, params, ctx)
4795            .await?;
4796        let right_rows = self
4797            .execute_subplan(*right, prop_manager, params, ctx)
4798            .await?;
4799
4800        let mut results = Vec::new();
4801        for l in &left_rows {
4802            for r in &right_rows {
4803                let mut combined = l.clone();
4804                combined.extend(r.clone());
4805                results.push(combined);
4806            }
4807        }
4808        Ok(results)
4809    }
4810
4811    /// Execute a UNION operation with optional deduplication.
4812    pub(crate) async fn execute_union(
4813        &self,
4814        left: Box<LogicalPlan>,
4815        right: Box<LogicalPlan>,
4816        all: bool,
4817        prop_manager: &PropertyManager,
4818        params: &HashMap<String, Value>,
4819        ctx: Option<&QueryContext>,
4820    ) -> Result<Vec<HashMap<String, Value>>> {
4821        let mut left_rows = self
4822            .execute_subplan(*left, prop_manager, params, ctx)
4823            .await?;
4824        let mut right_rows = self
4825            .execute_subplan(*right, prop_manager, params, ctx)
4826            .await?;
4827
4828        left_rows.append(&mut right_rows);
4829
4830        if !all {
4831            let mut seen = HashSet::new();
4832            left_rows.retain(|row| {
4833                let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
4834                let key = format!("{sorted_row:?}");
4835                seen.insert(key)
4836            });
4837        }
4838        Ok(left_rows)
4839    }
4840
4841    /// Check if an index with the given name exists.
4842    pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
4843        let schema = self.storage.schema_manager().schema();
4844        schema.indexes.iter().any(|idx| match idx {
4845            uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
4846            uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
4847            uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
4848            _ => false,
4849        })
4850    }
4851
4852    pub(crate) async fn execute_export(
4853        &self,
4854        target: &str,
4855        source: &str,
4856        options: &HashMap<String, Value>,
4857        prop_manager: &PropertyManager,
4858        ctx: Option<&QueryContext>,
4859    ) -> Result<Vec<HashMap<String, Value>>> {
4860        let format = options
4861            .get("format")
4862            .and_then(|v| v.as_str())
4863            .unwrap_or("csv")
4864            .to_lowercase();
4865
4866        match format.as_str() {
4867            "csv" => {
4868                self.execute_csv_export(target, source, options, prop_manager, ctx)
4869                    .await
4870            }
4871            "parquet" => {
4872                self.execute_parquet_export(target, source, options, prop_manager, ctx)
4873                    .await
4874            }
4875            _ => Err(anyhow!("Unsupported export format: {}", format)),
4876        }
4877    }
4878
    /// Export all vertices of a label, or all edges of an edge type, named
    /// `target` to a CSV file.
    ///
    /// Options: `delimiter` (defaults to `,`) and `header` (bool, default
    /// `true`). Returns a single row `{ "count": <number of records written> }`.
    ///
    /// Vertex files get columns `_vid` + sorted property names; edge files
    /// get `_eid`, `_src`, `_dst`, `_type` + sorted property names.
    pub(crate) async fn execute_csv_export(
        &self,
        target: &str,
        // NOTE(review): despite the name, `source` is used below as the
        // OUTPUT file path — consider renaming to `destination`.
        source: &str,
        options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Validate destination path against sandbox
        let validated_dest = self.validate_path(source)?;

        // `target` may name either a vertex label or an edge type; the two
        // branches below handle each case.
        let schema = self.storage.schema_manager().schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // The csv crate takes a single-byte delimiter; only the FIRST byte
        // of the option string is used, so a multi-byte (UTF-8) delimiter
        // is silently truncated. Empty string falls back to ','.
        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut wtr = csv::WriterBuilder::new()
            .delimiter(delimiter)
            .from_path(&validated_dest)?;

        let mut count = 0;
        // Empty properties map for labels/edge types without registered properties
        let empty_props = HashMap::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            // Sort property names so the column order is deterministic.
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            let mut headers = vec!["_vid".to_string()];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            // Unfiltered scan of every vertex carrying this label.
            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            // One property fetch per vertex; missing properties become
            // empty cells (Value::Null formats to "").
            for vid in vids {
                let props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(vid.to_string());
                for p_name in &prop_names {
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            // Headers for Edge: _eid, _src, _dst, _type, ...props
            let mut headers = vec![
                "_eid".to_string(),
                "_src".to_string(),
                "_dst".to_string(),
                "_type".to_string(),
            ];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            let edges = self.scan_edge_type(target, ctx).await?;

            for (eid, src, dst) in edges {
                let props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(eid.to_string());
                row.push(src.to_string());
                row.push(dst.to_string());
                // The `_type` column holds the NUMERIC edge-type id, not
                // the type name.
                row.push(meta.id.to_string());

                for p_name in &prop_names {
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        }

        // Flush explicitly so buffered-write errors surface here rather
        // than being swallowed on drop.
        wtr.flush()?;
        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4998
    /// Exports data to Parquet format.
    ///
    /// Supports local filesystem and cloud URLs (s3://, gs://, az://).
    ///
    /// `target` may name either a vertex label or an edge type. Returns a
    /// single row `{ "count": <number of rows exported> }`.
    ///
    /// NOTE(review): all rows are materialized in memory and written as a
    /// single record batch — large datasets may need chunked writes.
    pub(crate) async fn execute_parquet_export(
        &self,
        target: &str,
        destination: &str,
        _options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let schema_manager = self.storage.schema_manager();
        let schema = schema_manager.schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Derive the Arrow schema from the matching dataset so the output
        // file's columns line up with the stored layout.
        let arrow_schema = if label_meta.is_some() {
            let dataset = self.storage.vertex_dataset(target)?;
            dataset.get_arrow_schema(&schema)?
        } else {
            // Edge Schema
            // NOTE(review): src/dst label arguments are passed as empty
            // strings here — presumably edge_dataset treats that as
            // "any"; confirm against its implementation.
            let dataset = self.storage.edge_dataset(target, "", "")?;
            dataset.get_arrow_schema(&schema)?
        };

        let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            // Unfiltered scan of every vertex carrying this label.
            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let mut props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                // Synthesize the system columns the vertex dataset schema
                // expects: _vid, _uid, _deleted, _version.
                props.insert(
                    "_vid".to_string(),
                    uni_common::Value::Int(vid.as_u64() as i64),
                );
                if !props.contains_key("_uid") {
                    // Placeholder UID: 32 zero ints — presumably matching
                    // the fixed-size _uid column; TODO confirm.
                    props.insert(
                        "_uid".to_string(),
                        uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
                    );
                }
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        } else if edge_meta.is_some() {
            let edges = self.scan_edge_type(target, ctx).await?;
            for (eid, src, dst) in edges {
                let mut props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                // System columns for the edge dataset schema. Note these
                // are named without a leading underscore (eid/src_vid/
                // dst_vid), unlike the vertex-side _vid.
                props.insert(
                    "eid".to_string(),
                    uni_common::Value::Int(eid.as_u64() as i64),
                );
                props.insert(
                    "src_vid".to_string(),
                    uni_common::Value::Int(src.as_u64() as i64),
                );
                props.insert(
                    "dst_vid".to_string(),
                    uni_common::Value::Int(dst.as_u64() as i64),
                );
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        }

        // Write to cloud or local file
        if is_cloud_url(destination) {
            self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
                .await?;
        } else {
            // Validate local destination path against sandbox
            let validated_dest = self.validate_path(destination)?;
            let file = std::fs::File::create(&validated_dest)?;
            let mut writer =
                parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;

            // Write all in one batch for now (simplification)
            if !rows.is_empty() {
                let batch = self.rows_to_batch(&rows, &arrow_schema)?;
                writer.write(&batch)?;
            }

            // close() finalizes the Parquet footer; without it the file
            // would be unreadable.
            writer.close()?;
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(rows.len() as i64));
        Ok(vec![res])
    }
5106
5107    /// Writes Parquet data to a cloud storage destination.
5108    async fn write_parquet_to_cloud(
5109        &self,
5110        dest_url: &str,
5111        rows: &[HashMap<String, uni_common::Value>],
5112        arrow_schema: &arrow_schema::Schema,
5113    ) -> Result<()> {
5114        use object_store::ObjectStore;
5115
5116        let (store, path) = build_store_from_url(dest_url)?;
5117
5118        // Write to an in-memory buffer
5119        let mut buffer = Vec::new();
5120        {
5121            let mut writer = parquet::arrow::ArrowWriter::try_new(
5122                &mut buffer,
5123                Arc::new(arrow_schema.clone()),
5124                None,
5125            )?;
5126
5127            if !rows.is_empty() {
5128                let batch = self.rows_to_batch(rows, arrow_schema)?;
5129                writer.write(&batch)?;
5130            }
5131
5132            writer.close()?;
5133        }
5134
5135        // Upload to cloud storage
5136        store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5137
5138        Ok(())
5139    }
5140
5141    pub(crate) fn rows_to_batch(
5142        &self,
5143        rows: &[HashMap<String, uni_common::Value>],
5144        schema: &arrow_schema::Schema,
5145    ) -> Result<RecordBatch> {
5146        let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5147
5148        for field in schema.fields() {
5149            let name = field.name();
5150            let dt = field.data_type();
5151
5152            let values: Vec<uni_common::Value> = rows
5153                .iter()
5154                .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5155                .collect();
5156            let array = self.values_to_array(&values, dt)?;
5157            columns.push(array);
5158        }
5159
5160        Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5161    }
5162
    /// Convert a slice of Values to an Arrow array.
    /// Delegates to the shared implementation in arrow_convert module.
    ///
    /// `dt` is the target Arrow data type; the conversion rules (including
    /// how `Value::Null` and mismatched variants are handled) live in
    /// `arrow_convert::values_to_array`.
    pub(crate) fn values_to_array(
        &self,
        values: &[uni_common::Value],
        dt: &arrow_schema::DataType,
    ) -> Result<Arc<dyn Array>> {
        arrow_convert::values_to_array(values, dt)
    }
5172
5173    pub(crate) fn format_csv_value(&self, val: Value) -> String {
5174        match val {
5175            Value::Null => "".to_string(),
5176            Value::String(s) => s,
5177            Value::Int(i) => i.to_string(),
5178            Value::Float(f) => f.to_string(),
5179            Value::Bool(b) => b.to_string(),
5180            _ => format!("{val}"),
5181        }
5182    }
5183
5184    pub(crate) fn parse_csv_value(
5185        &self,
5186        s: &str,
5187        data_type: &uni_common::core::schema::DataType,
5188        prop_name: &str,
5189    ) -> Result<Value> {
5190        if s.is_empty() || s.to_lowercase() == "null" {
5191            return Ok(Value::Null);
5192        }
5193
5194        use uni_common::core::schema::DataType;
5195        match data_type {
5196            DataType::String => Ok(Value::String(s.to_string())),
5197            DataType::Int32 | DataType::Int64 => {
5198                let i = s.parse::<i64>().map_err(|_| {
5199                    anyhow!(
5200                        "Failed to parse integer for property '{}': {}",
5201                        prop_name,
5202                        s
5203                    )
5204                })?;
5205                Ok(Value::Int(i))
5206            }
5207            DataType::Float32 | DataType::Float64 => {
5208                let f = s.parse::<f64>().map_err(|_| {
5209                    anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5210                })?;
5211                Ok(Value::Float(f))
5212            }
5213            DataType::Bool => {
5214                let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5215                    anyhow!(
5216                        "Failed to parse boolean for property '{}': {}",
5217                        prop_name,
5218                        s
5219                    )
5220                })?;
5221                Ok(Value::Bool(b))
5222            }
5223            DataType::CypherValue => {
5224                let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5225                    anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5226                })?;
5227                Ok(Value::from(json_val))
5228            }
5229            DataType::Vector { .. } => {
5230                let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5231                    anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5232                })?;
5233                Ok(Value::Vector(v))
5234            }
5235            _ => Ok(Value::String(s.to_string())),
5236        }
5237    }
5238
5239    pub(crate) async fn detach_delete_vertex(&self, vid: Vid, writer: &mut Writer) -> Result<()> {
5240        let schema = self.storage.schema_manager().schema();
5241        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5242
5243        // 1. Find and delete all outgoing edges
5244        let out_graph = self
5245            .storage
5246            .load_subgraph_cached(
5247                &[vid],
5248                &edge_type_ids,
5249                1,
5250                uni_store::runtime::Direction::Outgoing,
5251                Some(writer.l0_manager.get_current()),
5252            )
5253            .await?;
5254
5255        for edge in out_graph.edges() {
5256            writer
5257                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5258                .await?;
5259        }
5260
5261        // 2. Find and delete all incoming edges
5262        let in_graph = self
5263            .storage
5264            .load_subgraph_cached(
5265                &[vid],
5266                &edge_type_ids,
5267                1,
5268                uni_store::runtime::Direction::Incoming,
5269                Some(writer.l0_manager.get_current()),
5270            )
5271            .await?;
5272
5273        for edge in in_graph.edges() {
5274            writer
5275                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5276                .await?;
5277        }
5278
5279        Ok(())
5280    }
5281
5282    /// Batch detach-delete: load subgraphs for all VIDs at once, then delete edges and vertices.
5283    pub(crate) async fn batch_detach_delete_vertices(
5284        &self,
5285        vids: &[Vid],
5286        labels_per_vid: Vec<Option<Vec<String>>>,
5287        writer: &mut Writer,
5288    ) -> Result<()> {
5289        let schema = self.storage.schema_manager().schema();
5290        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5291
5292        // Load outgoing subgraph for all VIDs in one call.
5293        let out_graph = self
5294            .storage
5295            .load_subgraph_cached(
5296                vids,
5297                &edge_type_ids,
5298                1,
5299                uni_store::runtime::Direction::Outgoing,
5300                Some(writer.l0_manager.get_current()),
5301            )
5302            .await?;
5303
5304        for edge in out_graph.edges() {
5305            writer
5306                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5307                .await?;
5308        }
5309
5310        // Load incoming subgraph for all VIDs in one call.
5311        let in_graph = self
5312            .storage
5313            .load_subgraph_cached(
5314                vids,
5315                &edge_type_ids,
5316                1,
5317                uni_store::runtime::Direction::Incoming,
5318                Some(writer.l0_manager.get_current()),
5319            )
5320            .await?;
5321
5322        for edge in in_graph.edges() {
5323            writer
5324                .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5325                .await?;
5326        }
5327
5328        // Delete all vertices.
5329        for (vid, labels) in vids.iter().zip(labels_per_vid) {
5330            writer.delete_vertex(*vid, labels).await?;
5331        }
5332
5333        Ok(())
5334    }
5335}