Skip to main content

uni_query/query/executor/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use crate::query::planner::LogicalPlan;
6use anyhow::{Result, anyhow};
7use std::collections::HashMap;
8use std::sync::Arc;
9use uni_common::DataType;
10use uni_common::core::id::{Eid, Vid};
11use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
12use uni_common::{Path, Value};
13use uni_cypher::ast::{
14    AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
15    CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
16    DropEdgeType, DropLabel, Expr, Pattern, PatternElement, RemoveItem, SetClause, SetItem,
17};
18use uni_store::QueryContext;
19use uni_store::runtime::property_manager::PropertyManager;
20use uni_store::runtime::writer::Writer;
21
22/// Identity fields extracted from a map-encoded edge.
23struct EdgeIdentity {
24    eid: Eid,
25    src: Vid,
26    dst: Vid,
27    edge_type_id: u32,
28}
29
30impl Executor {
31    /// Extracts labels from a node value.
32    ///
33    /// Handles both `Value::Map` (with a `_labels` list field) and
34    /// `Value::Node` (with a `labels` vec field).
35    ///
36    /// Returns `None` when the value is not a node or has no labels.
37    pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
38        match node_val {
39            Value::Map(map) => {
40                // Map-encoded node: look for _labels array
41                if let Some(Value::List(labels_arr)) = map.get("_labels") {
42                    let labels: Vec<String> = labels_arr
43                        .iter()
44                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
45                        .collect();
46                    if !labels.is_empty() {
47                        return Some(labels);
48                    }
49                }
50                None
51            }
52            Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
53            _ => None,
54        }
55    }
56
57    /// Extracts user-visible properties from a value that represents a node or edge.
58    ///
59    /// Strips internal bookkeeping keys (those prefixed with `_` or named
60    /// `ext_id`) from map-encoded entities and returns only the user-facing
61    /// property key-value pairs.
62    ///
63    /// Returns `None` when `val` is not a map, node, or edge.
64    pub(crate) fn extract_user_properties_from_value(
65        val: &Value,
66    ) -> Option<HashMap<String, Value>> {
67        match val {
68            Value::Map(map) => {
69                // Distinguish entity-encoded maps from plain map literals.
70                // A node map has both `_vid` and `_labels`.
71                // An edge map has `_eid`, `_src`, and `_dst`.
72                let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
73                let is_edge_map = map.contains_key("_eid")
74                    && map.contains_key("_src")
75                    && map.contains_key("_dst");
76
77                if is_node_map || is_edge_map {
78                    // Filter out internal bookkeeping keys
79                    let user_props: HashMap<String, Value> = map
80                        .iter()
81                        .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
82                        .map(|(k, v)| (k.clone(), v.clone()))
83                        .collect();
84                    // When mutation output omits dotted property columns, user
85                    // properties live inside `_all_props` rather than at the
86                    // top level of the entity map.
87                    if user_props.is_empty()
88                        && let Some(Value::Map(all_props)) = map.get("_all_props")
89                    {
90                        return Some(all_props.clone());
91                    }
92                    Some(user_props)
93                } else {
94                    // Plain map literal — return as-is
95                    Some(map.clone())
96                }
97            }
98            Value::Node(node) => Some(node.properties.clone()),
99            Value::Edge(edge) => Some(edge.properties.clone()),
100            _ => None,
101        }
102    }
103
104    /// Applies a property map to a vertex or edge entity bound to `variable` in `row`.
105    ///
106    /// When `replace` is `true` the entity's property set is replaced: keys absent
107    /// from `new_props` are tombstoned (written as `Value::Null`) so the storage
108    /// layer removes them.  When `replace` is `false` the map is merged: keys in
109    /// `new_props` are upserted, while keys absent from `new_props` are unchanged.
110    /// A `Value::Null` entry in `new_props` acts as an explicit tombstone in both
111    /// modes.
112    ///
113    /// Labels are never altered — the spec states that `SET n = map` replaces
114    /// properties only.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if the entity cannot be found in the storage layer, or
119    /// if the writer fails to persist the updated properties.
120    #[expect(clippy::too_many_arguments)]
121    async fn apply_properties_to_entity(
122        &self,
123        variable: &str,
124        new_props: HashMap<String, Value>,
125        replace: bool,
126        row: &mut HashMap<String, Value>,
127        writer: &mut Writer,
128        prop_manager: &PropertyManager,
129        params: &HashMap<String, Value>,
130        ctx: Option<&QueryContext>,
131        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
132    ) -> Result<()> {
133        // Clone the target so we can hold &row references elsewhere.
134        let target = row.get(variable).cloned();
135
136        match target {
137            Some(Value::Node(ref node)) => {
138                let vid = node.vid;
139                let labels = node.labels.clone();
140                let current = prop_manager
141                    .get_all_vertex_props_with_ctx(vid, ctx)
142                    .await?
143                    .unwrap_or_default();
144                let write_props = Self::merge_props(current, new_props, replace);
145                let mut enriched = write_props.clone();
146                for label_name in &labels {
147                    self.enrich_properties_with_generated_columns(
148                        label_name,
149                        &mut enriched,
150                        prop_manager,
151                        params,
152                        ctx,
153                    )
154                    .await?;
155                }
156                let _ = writer
157                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
158                    .await?;
159                // Update the in-memory row binding
160                if let Some(Value::Node(n)) = row.get_mut(variable) {
161                    n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
162                }
163            }
164            Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
165                let vid = Self::vid_from_value(node_val)?;
166                let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
167                let current = prop_manager
168                    .get_all_vertex_props_with_ctx(vid, ctx)
169                    .await?
170                    .unwrap_or_default();
171                let write_props = Self::merge_props(current, new_props, replace);
172                let mut enriched = write_props.clone();
173                for label_name in &labels {
174                    self.enrich_properties_with_generated_columns(
175                        label_name,
176                        &mut enriched,
177                        prop_manager,
178                        params,
179                        ctx,
180                    )
181                    .await?;
182                }
183                let _ = writer
184                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
185                    .await?;
186                // Update the in-memory map-encoded node binding
187                if let Some(Value::Map(node_map)) = row.get_mut(variable) {
188                    // Remove old user property keys, keep internal fields
189                    node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
190                    // Build effective (non-null) properties
191                    let effective: HashMap<String, Value> =
192                        enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
193                    for (k, v) in &effective {
194                        node_map.insert(k.clone(), v.clone());
195                    }
196                    // Replace _all_props to reflect the complete property set
197                    node_map.insert("_all_props".to_string(), Value::Map(effective));
198                }
199            }
200            Some(Value::Edge(ref edge)) => {
201                let eid = edge.eid;
202                let src = edge.src;
203                let dst = edge.dst;
204                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
205                let current = prop_manager
206                    .get_all_edge_props_with_ctx(eid, ctx)
207                    .await?
208                    .unwrap_or_default();
209                let write_props = Self::merge_props(current, new_props, replace);
210                writer
211                    .insert_edge(
212                        src,
213                        dst,
214                        etype,
215                        eid,
216                        write_props.clone(),
217                        Some(edge.edge_type.clone()),
218                        tx_l0,
219                    )
220                    .await?;
221                // Update the in-memory row binding
222                if let Some(Value::Edge(e)) = row.get_mut(variable) {
223                    e.properties = write_props
224                        .into_iter()
225                        .filter(|(_, v)| !v.is_null())
226                        .collect();
227                }
228            }
229            Some(Value::Map(ref map))
230                if map.contains_key("_eid")
231                    && map.contains_key("_src")
232                    && map.contains_key("_dst") =>
233            {
234                let ei = self.extract_edge_identity(map)?;
235                let current = prop_manager
236                    .get_all_edge_props_with_ctx(ei.eid, ctx)
237                    .await?
238                    .unwrap_or_default();
239                let write_props = Self::merge_props(current, new_props, replace);
240                let edge_type_name = map
241                    .get("_type")
242                    .and_then(|v| v.as_str())
243                    .map(|s| s.to_string())
244                    .or_else(|| {
245                        self.storage
246                            .schema_manager()
247                            .edge_type_name_by_id_unified(ei.edge_type_id)
248                    });
249                writer
250                    .insert_edge(
251                        ei.src,
252                        ei.dst,
253                        ei.edge_type_id,
254                        ei.eid,
255                        write_props.clone(),
256                        edge_type_name,
257                        tx_l0,
258                    )
259                    .await?;
260                // Update the in-memory map-encoded edge binding
261                if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
262                    edge_map.retain(|k, _| k.starts_with('_'));
263                    let effective: HashMap<String, Value> = write_props
264                        .into_iter()
265                        .filter(|(_, v)| !v.is_null())
266                        .collect();
267                    for (k, v) in &effective {
268                        edge_map.insert(k.clone(), v.clone());
269                    }
270                    // Replace _all_props to reflect the complete property set
271                    edge_map.insert("_all_props".to_string(), Value::Map(effective));
272                }
273            }
274            _ => {
275                // No matching entity — nothing to do (caller already guarded against Null)
276            }
277        }
278        Ok(())
279    }
280
281    /// Computes the property map to write given current storage state and the
282    /// incoming change map.
283    ///
284    /// When `replace` is `true`, keys present in `current` but absent from
285    /// `incoming` are tombstoned with `Value::Null`.  Null values inside
286    /// `incoming` are always preserved as explicit tombstones.
287    ///
288    /// When `replace` is `false`, `current` is the base and `incoming` is
289    /// merged on top: each key in `incoming` overwrites or tombstones the
290    /// corresponding entry in `current`.
291    fn merge_props(
292        current: HashMap<String, Value>,
293        incoming: HashMap<String, Value>,
294        replace: bool,
295    ) -> HashMap<String, Value> {
296        if replace {
297            // Start from the non-null incoming entries only.
298            let mut result: HashMap<String, Value> = incoming
299                .iter()
300                .filter(|(_, v)| !v.is_null())
301                .map(|(k, v)| (k.clone(), v.clone()))
302                .collect();
303            // Tombstone every current key that is absent from incoming OR explicitly
304            // set to null in incoming (both mean "delete this property").
305            for k in current.keys() {
306                if incoming.get(k).is_none_or(|v| v.is_null()) {
307                    result.insert(k.clone(), Value::Null);
308                }
309            }
310            result
311        } else {
312            // Merge: start from current and apply incoming on top
313            let mut result = current;
314            result.extend(incoming);
315            result
316        }
317    }
318
319    /// Extract edge identity fields (`_eid`, `_src`, `_dst`, `_type`) from a map.
320    fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
321        let eid = Eid::from(
322            map.get("_eid")
323                .and_then(|v| v.as_u64())
324                .ok_or_else(|| anyhow!("Invalid _eid"))?,
325        );
326        let src = Vid::from(
327            map.get("_src")
328                .and_then(|v| v.as_u64())
329                .ok_or_else(|| anyhow!("Invalid _src"))?,
330        );
331        let dst = Vid::from(
332            map.get("_dst")
333                .and_then(|v| v.as_u64())
334                .ok_or_else(|| anyhow!("Invalid _dst"))?,
335        );
336        let edge_type_id = self.resolve_edge_type_id(
337            map.get("_type")
338                .ok_or_else(|| anyhow!("Missing _type on edge map"))?,
339        )?;
340        Ok(EdgeIdentity {
341            eid,
342            src,
343            dst,
344            edge_type_id,
345        })
346    }
347
348    /// Resolve edge type ID from a Value, supporting both Int and String representations.
349    /// DataFusion traverse stores _type as String("KNOWS"), while write operations need u32 ID.
350    ///
351    /// For String values, uses get_or_assign_edge_type_id to support schemaless edge types
352    /// (assigns new ID if not found). This is critical for MERGE ... ON CREATE SET scenarios
353    /// where the edge type was just created and may not be in the read-only lookup yet.
354    fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
355        match type_val {
356            Value::Int(i) => Ok(*i as u32),
357            Value::String(name) => {
358                // Use get_or_assign to support schemaless edge types
359                // (will create new ID if not found in schema or registry)
360                Ok(self
361                    .storage
362                    .schema_manager()
363                    .get_or_assign_edge_type_id(name))
364            }
365            _ => Err(anyhow!(
366                "Invalid _type value: expected Int or String, got {:?}",
367                type_val
368            )),
369        }
370    }
371
372    pub(crate) async fn execute_vacuum(&self) -> Result<()> {
373        if let Some(writer_arc) = &self.writer {
374            // Flush first while holding the lock
375            {
376                let mut writer = writer_arc.write().await;
377                writer.flush_to_l1(None).await?;
378            } // Drop lock before compacting to avoid blocking reads/writes
379
380            // Compaction can run without holding the writer lock
381            let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
382            let compaction_results = compactor.compact_all().await?;
383
384            // Re-warm adjacency manager for compacted edge types to sync in-memory CSR with new L2 storage
385            let am = self.storage.adjacency_manager();
386            let schema = self.storage.schema_manager().schema();
387            for info in compaction_results {
388                // Convert string direction to Direction enum
389                let direction = match info.direction.as_str() {
390                    "fwd" => uni_store::storage::direction::Direction::Outgoing,
391                    "bwd" => uni_store::storage::direction::Direction::Incoming,
392                    _ => continue,
393                };
394
395                // Get edge_type_id
396                if let Some(edge_type_id) =
397                    schema.edge_type_id_unified_case_insensitive(&info.edge_type)
398                {
399                    // Re-warm from storage (clears old CSR, loads new L2 + L1 delta)
400                    let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
401                }
402            }
403        }
404        Ok(())
405    }
406
407    pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
408        if let Some(writer_arc) = &self.writer {
409            let mut writer = writer_arc.write().await;
410            writer.flush_to_l1(Some("checkpoint".to_string())).await?;
411        }
412        Ok(())
413    }
414
415    pub(crate) async fn execute_copy_to(
416        &self,
417        identifier: &str,
418        path: &str,
419        format: &str,
420        options: &HashMap<String, Value>,
421    ) -> Result<usize> {
422        // Check schema to determine if identifier is an edge type or vertex label
423        let schema = self.storage.schema_manager().schema();
424
425        // Try as edge type first
426        if schema.get_edge_type_case_insensitive(identifier).is_some() {
427            return self
428                .export_edge_type_in_format(identifier, path, format)
429                .await;
430        }
431
432        // Try as vertex label
433        if schema.get_label_case_insensitive(identifier).is_some() {
434            return self
435                .export_vertex_label_in_format(identifier, path, format, options)
436                .await;
437        }
438
439        // Neither edge type nor vertex label found
440        Err(anyhow!("Unknown label or edge type: '{}'", identifier))
441    }
442
443    async fn export_vertex_label_in_format(
444        &self,
445        label: &str,
446        path: &str,
447        format: &str,
448        _options: &HashMap<String, Value>,
449    ) -> Result<usize> {
450        match format {
451            "parquet" => self.export_vertex_label(label, path).await,
452            "csv" => {
453                let mut stream = self
454                    .storage
455                    .scan_vertex_table_stream(label)
456                    .await?
457                    .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
458
459                // Collect all batches
460                let mut all_rows = Vec::new();
461                let mut column_names = Vec::new();
462
463                // Iterate stream using StreamExt
464                use futures::StreamExt;
465                while let Some(batch_result) = stream.next().await {
466                    let batch = batch_result?;
467
468                    // Get column names from first batch
469                    if column_names.is_empty() {
470                        column_names = batch
471                            .schema()
472                            .fields()
473                            .iter()
474                            .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
475                            .map(|f| f.name().clone())
476                            .collect();
477                    }
478
479                    // Convert batch to rows
480                    for row_idx in 0..batch.num_rows() {
481                        let mut row = Vec::new();
482                        for field in batch.schema().fields() {
483                            if field.name().starts_with('_') || field.name() == "ext_id" {
484                                continue;
485                            }
486
487                            let col_idx = batch.schema().index_of(field.name())?;
488                            let column = batch.column(col_idx);
489                            let value = self.arrow_value_to_json(column, row_idx)?;
490
491                            // Convert value to CSV string
492                            let csv_value = match value {
493                                Value::Null => String::new(),
494                                Value::Bool(b) => b.to_string(),
495                                Value::Int(i) => i.to_string(),
496                                Value::Float(f) => f.to_string(),
497                                Value::String(s) => s,
498                                _ => format!("{value}"),
499                            };
500                            row.push(csv_value);
501                        }
502                        all_rows.push(row);
503                    }
504                }
505
506                // Write CSV
507                let file = std::fs::File::create(path)?;
508                let mut wtr = csv::Writer::from_writer(file);
509
510                // Write headers
511                log::debug!("CSV export headers: {:?}", column_names);
512                wtr.write_record(&column_names)?;
513
514                // Write rows
515                for (i, row) in all_rows.iter().enumerate() {
516                    log::debug!("CSV export row {}: {:?}", i, row);
517                    wtr.write_record(row)?;
518                }
519
520                wtr.flush()?;
521                Ok(all_rows.len())
522            }
523            _ => Err(anyhow!(
524                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
525                format
526            )),
527        }
528    }
529
530    async fn export_edge_type_in_format(
531        &self,
532        edge_type: &str,
533        path: &str,
534        format: &str,
535    ) -> Result<usize> {
536        match format {
537            "parquet" => self.export_edge_type(edge_type, path).await,
538            "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
539            _ => Err(anyhow!(
540                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
541                format
542            )),
543        }
544    }
545
546    /// Write a stream of record batches to a Parquet file.
547    /// Returns the total number of rows written, or 0 if the stream is empty.
548    async fn write_batches_to_parquet(
549        mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
550        path: &str,
551        entity_description: &str,
552    ) -> Result<usize> {
553        use futures::TryStreamExt;
554
555        // Get first batch to determine schema and create writer
556        let first_batch = match stream.try_next().await? {
557            Some(batch) => batch,
558            None => {
559                log::info!("No data to export from {}", entity_description);
560                return Ok(0);
561            }
562        };
563
564        // Create Parquet writer using schema from first batch
565        let file = std::fs::File::create(path)?;
566        let arrow_schema = first_batch.schema();
567        let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
568
569        // Write first batch
570        let mut count = first_batch.num_rows();
571        writer.write(&first_batch)?;
572
573        // Write remaining batches
574        while let Some(batch) = stream.try_next().await? {
575            count += batch.num_rows();
576            writer.write(&batch)?;
577        }
578
579        writer.close()?;
580
581        log::info!(
582            "Exported {} rows from {} to '{}'",
583            count,
584            entity_description,
585            path
586        );
587        Ok(count)
588    }
589
590    /// Export vertices of a specific label to Parquet
591    async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
592        let stream = self
593            .storage
594            .scan_vertex_table_stream(label)
595            .await?
596            .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
597
598        Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
599    }
600
601    /// Export edges of a specific type to Parquet
602    async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
603        let schema = self.storage.schema_manager().schema();
604        if !schema.edge_types.contains_key(edge_type) {
605            return Err(anyhow!("Edge type '{}' not found", edge_type));
606        }
607
608        let filter = format!("type = '{}'", edge_type);
609        let stream = self
610            .storage
611            .scan_main_edge_table_stream(Some(&filter))
612            .await?
613            .ok_or_else(|| anyhow!("No edge data found"))?;
614
615        Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
616    }
617
618    pub(crate) async fn execute_copy_from(
619        &self,
620        label: &str,
621        path: &str,
622        format: &str,
623        options: &HashMap<String, Value>,
624    ) -> Result<usize> {
625        // Read data from file
626        let batches = match format {
627            "parquet" => self.read_parquet_file(path)?,
628            "csv" => self.read_csv_file(path, label, options)?,
629            _ => {
630                return Err(anyhow!(
631                    "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
632                    format
633                ));
634            }
635        };
636
637        // Get writer
638        let writer_arc = self
639            .writer
640            .as_ref()
641            .ok_or_else(|| anyhow!("No writer available"))?;
642
643        let db_schema = self.storage.schema_manager().schema();
644
645        // Check if this is a label (vertex) or edge type
646        let is_edge = db_schema.edge_type_id_by_name(label).is_some();
647
648        if is_edge {
649            // Import edges
650            let edge_type_id = db_schema
651                .edge_type_id_by_name(label)
652                .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
653
654            // Get src and dst column names from options
655            let src_col = options
656                .get("src_col")
657                .and_then(|v| v.as_str())
658                .unwrap_or("src");
659            let dst_col = options
660                .get("dst_col")
661                .and_then(|v| v.as_str())
662                .unwrap_or("dst");
663
664            let mut total_rows = 0;
665            for batch in batches {
666                let num_rows = batch.num_rows();
667
668                for row_idx in 0..num_rows {
669                    let mut properties = HashMap::new();
670                    let mut src_vid: Option<Vid> = None;
671                    let mut dst_vid: Option<Vid> = None;
672
673                    // Extract properties and VIDs from each column
674                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
675                        let col_name = field.name();
676                        let column = batch.column(col_idx);
677                        let value = self.arrow_value_to_json(column, row_idx)?;
678
679                        if col_name == src_col {
680                            let raw = value.as_u64().unwrap_or_else(|| {
681                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
682                            });
683                            src_vid = Some(Vid::new(raw));
684                        } else if col_name == dst_col {
685                            let raw = value.as_u64().unwrap_or_else(|| {
686                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
687                            });
688                            dst_vid = Some(Vid::new(raw));
689                        } else if !col_name.starts_with('_') && !value.is_null() {
690                            properties.insert(col_name.clone(), value);
691                        }
692                    }
693
694                    let src = src_vid
695                        .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
696                    let dst = dst_vid
697                        .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
698
699                    // Generate EID and insert edge
700                    let mut writer = writer_arc.write().await;
701                    let eid = writer.next_eid(edge_type_id).await?;
702                    writer
703                        .insert_edge(
704                            src,
705                            dst,
706                            edge_type_id,
707                            eid,
708                            properties,
709                            Some(label.to_string()),
710                            None,
711                        )
712                        .await?;
713
714                    total_rows += 1;
715                }
716            }
717
718            log::info!(
719                "Imported {} edge rows from '{}' into edge type '{}'",
720                total_rows,
721                path,
722                label
723            );
724
725            // Flush to persist edges
726            if total_rows > 0 {
727                let mut writer = writer_arc.write().await;
728                writer.flush_to_l1(None).await?;
729            }
730
731            Ok(total_rows)
732        } else {
733            // Import vertices
734            // Validate the label exists in schema
735            db_schema
736                .label_id_by_name_case_insensitive(label)
737                .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
738
739            let mut total_rows = 0;
740            for batch in batches {
741                let num_rows = batch.num_rows();
742
743                // Convert Arrow batch to rows
744                for row_idx in 0..num_rows {
745                    let mut properties = HashMap::new();
746
747                    // Extract properties from each column
748                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
749                        let col_name = field.name();
750
751                        // Skip internal columns
752                        if col_name.starts_with('_') {
753                            continue;
754                        }
755
756                        let column = batch.column(col_idx);
757                        let value = self.arrow_value_to_json(column, row_idx)?;
758
759                        if !value.is_null() {
760                            properties.insert(col_name.clone(), value);
761                        }
762                    }
763
764                    // Generate VID and insert
765                    let mut writer = writer_arc.write().await;
766                    let vid = writer.next_vid().await?;
767                    let _ = writer
768                        .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
769                        .await?;
770
771                    total_rows += 1;
772                }
773            }
774
775            log::info!(
776                "Imported {} rows from '{}' into label '{}'",
777                total_rows,
778                path,
779                label
780            );
781
782            // Flush to persist vertices
783            if total_rows > 0 {
784                let mut writer = writer_arc.write().await;
785                writer.flush_to_l1(None).await?;
786            }
787
788            Ok(total_rows)
789        }
790    }
791
792    fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
793        use arrow_array::Array;
794        use arrow_schema::DataType as ArrowDataType;
795
796        if column.is_null(row_idx) {
797            return Ok(Value::Null);
798        }
799
800        match column.data_type() {
801            ArrowDataType::Utf8 => {
802                let array = column
803                    .as_any()
804                    .downcast_ref::<arrow_array::StringArray>()
805                    .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
806                Ok(Value::String(array.value(row_idx).to_string()))
807            }
808            ArrowDataType::Int32 => {
809                let array = column
810                    .as_any()
811                    .downcast_ref::<arrow_array::Int32Array>()
812                    .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
813                Ok(Value::Int(array.value(row_idx) as i64))
814            }
815            ArrowDataType::Int64 => {
816                let array = column
817                    .as_any()
818                    .downcast_ref::<arrow_array::Int64Array>()
819                    .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
820                Ok(Value::Int(array.value(row_idx)))
821            }
822            ArrowDataType::Float32 => {
823                let array = column
824                    .as_any()
825                    .downcast_ref::<arrow_array::Float32Array>()
826                    .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
827                Ok(Value::Float(array.value(row_idx) as f64))
828            }
829            ArrowDataType::Float64 => {
830                let array = column
831                    .as_any()
832                    .downcast_ref::<arrow_array::Float64Array>()
833                    .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
834                Ok(Value::Float(array.value(row_idx)))
835            }
836            ArrowDataType::Boolean => {
837                let array = column
838                    .as_any()
839                    .downcast_ref::<arrow_array::BooleanArray>()
840                    .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
841                Ok(Value::Bool(array.value(row_idx)))
842            }
843            ArrowDataType::UInt64 => {
844                let array = column
845                    .as_any()
846                    .downcast_ref::<arrow_array::UInt64Array>()
847                    .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
848                Ok(Value::Int(array.value(row_idx) as i64))
849            }
850            _ => {
851                // For other types, try to convert to string
852                let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
853                if let Some(arr) = array {
854                    Ok(Value::String(arr.value(row_idx).to_string()))
855                } else {
856                    Ok(Value::Null)
857                }
858            }
859        }
860    }
861
862    fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
863        let file = std::fs::File::open(path)?;
864        let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
865            .build()?;
866        reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
867    }
868
869    fn read_csv_file(
870        &self,
871        path: &str,
872        label: &str,
873        options: &HashMap<String, Value>,
874    ) -> Result<Vec<arrow_array::RecordBatch>> {
875        use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
876        use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
877        use std::sync::Arc;
878
879        // Parse CSV options
880        let has_headers = options
881            .get("headers")
882            .and_then(|v| v.as_bool())
883            .unwrap_or(true);
884
885        // Read CSV file
886        let file = std::fs::File::open(path)?;
887        let mut rdr = csv::ReaderBuilder::new()
888            .has_headers(has_headers)
889            .from_reader(file);
890
891        // Get schema for type conversion
892        let db_schema = self.storage.schema_manager().schema();
893        let properties = db_schema.properties.get(label);
894
895        // Collect all rows first to determine schema
896        let mut rows: Vec<Vec<String>> = Vec::new();
897        let headers: Vec<String> = if has_headers {
898            rdr.headers()?.iter().map(|s| s.to_string()).collect()
899        } else {
900            Vec::new()
901        };
902
903        for result in rdr.records() {
904            let record = result?;
905            rows.push(record.iter().map(|s| s.to_string()).collect());
906        }
907
908        if rows.is_empty() {
909            return Ok(Vec::new());
910        }
911
912        // Build Arrow schema with proper types based on DB schema
913        let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
914        let col_names: Vec<String> = if has_headers {
915            headers
916        } else {
917            (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
918        };
919
920        for name in &col_names {
921            let arrow_type = if let Some(props) = properties {
922                if let Some(prop_meta) = props.get(name) {
923                    match prop_meta.r#type {
924                        DataType::Int32 => ArrowDataType::Int32,
925                        DataType::Int64 => ArrowDataType::Int64,
926                        DataType::Float32 => ArrowDataType::Float32,
927                        DataType::Float64 => ArrowDataType::Float64,
928                        DataType::Bool => ArrowDataType::Boolean,
929                        _ => ArrowDataType::Utf8,
930                    }
931                } else {
932                    ArrowDataType::Utf8
933                }
934            } else {
935                ArrowDataType::Utf8
936            };
937            arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
938        }
939
940        let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
941
942        // Convert rows to Arrow arrays with proper types
943        let mut columns: Vec<ArrayRef> = Vec::new();
944        for (col_idx, field) in arrow_fields.iter().enumerate() {
945            match field.data_type() {
946                ArrowDataType::Int32 => {
947                    let values: Vec<Option<i32>> = rows
948                        .iter()
949                        .map(|row| {
950                            if col_idx < row.len() {
951                                row[col_idx].parse().ok()
952                            } else {
953                                None
954                            }
955                        })
956                        .collect();
957                    columns.push(Arc::new(Int32Array::from(values)));
958                }
959                _ => {
960                    // Default to string
961                    let values: Vec<Option<String>> = rows
962                        .iter()
963                        .map(|row| {
964                            if col_idx < row.len() {
965                                Some(row[col_idx].clone())
966                            } else {
967                                None
968                            }
969                        })
970                        .collect();
971                    columns.push(Arc::new(StringArray::from(values)));
972                }
973            }
974        }
975
976        let batch = RecordBatch::try_new(arrow_schema, columns)?;
977        Ok(vec![batch])
978    }
979
980    fn parse_data_type(type_str: &str) -> Result<DataType> {
981        use uni_common::core::schema::{CrdtType, PointType};
982        let type_str = type_str.to_lowercase();
983        let type_str = type_str.trim();
984        match type_str {
985            "string" | "text" | "varchar" => Ok(DataType::String),
986            "int" | "integer" | "int32" => Ok(DataType::Int32),
987            "long" | "int64" | "bigint" => Ok(DataType::Int64),
988            "float" | "float32" | "real" => Ok(DataType::Float32),
989            "double" | "float64" => Ok(DataType::Float64),
990            "bool" | "boolean" => Ok(DataType::Bool),
991            "timestamp" => Ok(DataType::Timestamp),
992            "date" => Ok(DataType::Date),
993            "time" => Ok(DataType::Time),
994            "datetime" => Ok(DataType::DateTime),
995            "duration" => Ok(DataType::Duration),
996            "btic" => Ok(DataType::Btic),
997            "json" | "jsonb" => Ok(DataType::CypherValue),
998            "point" => Ok(DataType::Point(PointType::Cartesian2D)),
999            "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
1000            "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1001            s if s.starts_with("vector(") && s.ends_with(')') => {
1002                let dims_str = &s[7..s.len() - 1];
1003                let dimensions = dims_str
1004                    .parse::<usize>()
1005                    .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1006                Ok(DataType::Vector { dimensions })
1007            }
1008            s if s.starts_with("list<") && s.ends_with('>') => {
1009                let inner_type_str = &s[5..s.len() - 1];
1010                let inner_type = Self::parse_data_type(inner_type_str)?;
1011                Ok(DataType::List(Box::new(inner_type)))
1012            }
1013            "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1014            "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1015            _ => Err(anyhow!("Unknown data type: {}", type_str)),
1016        }
1017    }
1018
1019    pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1020        let sm = self.storage.schema_manager_arc();
1021        if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1022            return Ok(());
1023        }
1024        sm.add_label(&clause.name)?;
1025        for prop in clause.properties {
1026            let dt = Self::parse_data_type(&prop.data_type)?;
1027            sm.add_property(&clause.name, &prop.name, dt, prop.nullable)?;
1028            if prop.unique {
1029                let constraint = Constraint {
1030                    name: format!("{}_{}_unique", clause.name, prop.name),
1031                    constraint_type: ConstraintType::Unique {
1032                        properties: vec![prop.name],
1033                    },
1034                    target: ConstraintTarget::Label(clause.name.clone()),
1035                    enabled: true,
1036                };
1037                sm.add_constraint(constraint)?;
1038            }
1039        }
1040        sm.save().await?;
1041        Ok(())
1042    }
1043
1044    pub(crate) async fn enrich_properties_with_generated_columns(
1045        &self,
1046        label_name: &str,
1047        properties: &mut HashMap<String, Value>,
1048        prop_manager: &PropertyManager,
1049        params: &HashMap<String, Value>,
1050        ctx: Option<&QueryContext>,
1051    ) -> Result<()> {
1052        let schema = self.storage.schema_manager().schema();
1053
1054        if let Some(props_meta) = schema.properties.get(label_name) {
1055            let mut generators = Vec::new();
1056            for (prop_name, meta) in props_meta {
1057                if let Some(expr_str) = &meta.generation_expression {
1058                    generators.push((prop_name.clone(), expr_str.clone()));
1059                }
1060            }
1061
1062            for (prop_name, expr_str) in generators {
1063                let cache_key = (label_name.to_string(), prop_name.clone());
1064                let expr = {
1065                    let cache = self.gen_expr_cache.read().await;
1066                    cache.get(&cache_key).cloned()
1067                };
1068
1069                let expr = match expr {
1070                    Some(e) => e,
1071                    None => {
1072                        let parsed = uni_cypher::parse_expression(&expr_str)
1073                            .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1074                        let mut cache = self.gen_expr_cache.write().await;
1075                        cache.insert(cache_key, parsed.clone());
1076                        parsed
1077                    }
1078                };
1079
1080                let mut scope = HashMap::new();
1081
1082                // If expression has an explicit variable, use it as an object
1083                if let Some(var) = expr.extract_variable() {
1084                    scope.insert(var, Value::Map(properties.clone()));
1085                } else {
1086                    // No explicit variable - add properties directly to scope for bare references
1087                    // e.g., "lower(email)" can reference "email" directly
1088                    for (k, v) in properties.iter() {
1089                        scope.insert(k.clone(), v.clone());
1090                    }
1091                }
1092
1093                let val = self
1094                    .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1095                    .await?;
1096                properties.insert(prop_name, val);
1097            }
1098        }
1099        Ok(())
1100    }
1101
1102    pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1103        let sm = self.storage.schema_manager_arc();
1104        if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1105            return Ok(());
1106        }
1107        sm.add_edge_type(&clause.name, clause.src_labels, clause.dst_labels)?;
1108        for prop in clause.properties {
1109            let dt = Self::parse_data_type(&prop.data_type)?;
1110            sm.add_property(&clause.name, &prop.name, dt, prop.nullable)?;
1111        }
1112        sm.save().await?;
1113        Ok(())
1114    }
1115
1116    /// Executes an ALTER action on a schema entity.
1117    ///
1118    /// This is a shared helper for both `execute_alter_label` and
1119    /// `execute_alter_edge_type` since they have identical logic.
1120    pub(crate) async fn execute_alter_entity(
1121        sm: &Arc<SchemaManager>,
1122        entity_name: &str,
1123        action: AlterAction,
1124    ) -> Result<()> {
1125        match action {
1126            AlterAction::AddProperty(prop) => {
1127                let dt = Self::parse_data_type(&prop.data_type)?;
1128                sm.add_property(entity_name, &prop.name, dt, prop.nullable)?;
1129            }
1130            AlterAction::DropProperty(prop_name) => {
1131                sm.drop_property(entity_name, &prop_name)?;
1132            }
1133            AlterAction::RenameProperty { old_name, new_name } => {
1134                sm.rename_property(entity_name, &old_name, &new_name)?;
1135            }
1136        }
1137        sm.save().await?;
1138        Ok(())
1139    }
1140
1141    pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1142        Self::execute_alter_entity(
1143            &self.storage.schema_manager_arc(),
1144            &clause.name,
1145            clause.action,
1146        )
1147        .await
1148    }
1149
1150    pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1151        Self::execute_alter_entity(
1152            &self.storage.schema_manager_arc(),
1153            &clause.name,
1154            clause.action,
1155        )
1156        .await
1157    }
1158
1159    pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1160        let sm = self.storage.schema_manager_arc();
1161        sm.drop_label(&clause.name, clause.if_exists)?;
1162        sm.save().await?;
1163        Ok(())
1164    }
1165
1166    pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1167        let sm = self.storage.schema_manager_arc();
1168        sm.drop_edge_type(&clause.name, clause.if_exists)?;
1169        sm.save().await?;
1170        Ok(())
1171    }
1172
1173    pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1174        let sm = self.storage.schema_manager_arc();
1175        let target = ConstraintTarget::Label(clause.label);
1176        let c_type = match clause.constraint_type {
1177            AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1178                properties: clause.properties,
1179            },
1180            AstConstraintType::Exists => {
1181                let property = clause
1182                    .properties
1183                    .into_iter()
1184                    .next()
1185                    .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1186                ConstraintType::Exists { property }
1187            }
1188            AstConstraintType::Check => {
1189                let expression = clause
1190                    .expression
1191                    .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1192                ConstraintType::Check {
1193                    expression: expression.to_string_repr(),
1194                }
1195            }
1196        };
1197
1198        let constraint = Constraint {
1199            name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1200            constraint_type: c_type,
1201            target,
1202            enabled: true,
1203        };
1204
1205        sm.add_constraint(constraint)?;
1206        sm.save().await?;
1207        Ok(())
1208    }
1209
1210    pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1211        let sm = self.storage.schema_manager_arc();
1212        sm.drop_constraint(&clause.name, false)?;
1213        sm.save().await?;
1214        Ok(())
1215    }
1216
1217    fn get_composite_constraint(&self, label: &str) -> Option<Constraint> {
1218        let schema = self.storage.schema_manager().schema();
1219        schema
1220            .constraints
1221            .iter()
1222            .find(|c| {
1223                if !c.enabled {
1224                    return false;
1225                }
1226                match &c.target {
1227                    ConstraintTarget::Label(l) if l == label => {
1228                        matches!(c.constraint_type, ConstraintType::Unique { .. })
1229                    }
1230                    _ => false,
1231                }
1232            })
1233            .cloned()
1234    }
1235
1236    #[expect(clippy::too_many_arguments)]
1237    pub(crate) async fn execute_merge(
1238        &self,
1239        rows: Vec<HashMap<String, Value>>,
1240        pattern: &Pattern,
1241        on_match: Option<&SetClause>,
1242        on_create: Option<&SetClause>,
1243        prop_manager: &PropertyManager,
1244        params: &HashMap<String, Value>,
1245        ctx: Option<&QueryContext>,
1246        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1247    ) -> Result<Vec<HashMap<String, Value>>> {
1248        let writer_lock = self
1249            .writer
1250            .as_ref()
1251            .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;
1252
1253        // Prepare pattern for path variable binding: assign temp edge variable
1254        // names to unnamed relationships in paths that have path variables.
1255        let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);
1256
1257        let mut results = Vec::new();
1258        for mut row in rows {
1259            // Optimization: Check for single node pattern with unique constraint
1260            let mut optimized_vid = None;
1261            if pattern.paths.len() == 1 {
1262                let path = &pattern.paths[0];
1263                if path.elements.len() == 1
1264                    && let PatternElement::Node(n) = &path.elements[0]
1265                    && n.labels.len() == 1
1266                    && let Some(constraint) = self.get_composite_constraint(&n.labels[0])
1267                    && let ConstraintType::Unique { properties } = constraint.constraint_type
1268                {
1269                    let label = &n.labels[0];
1270                    // Evaluate pattern properties
1271                    let mut pattern_props = HashMap::new();
1272                    if let Some(props_expr) = &n.properties {
1273                        let val = self
1274                            .evaluate_expr(props_expr, &row, prop_manager, params, ctx)
1275                            .await?;
1276                        if let Value::Map(map) = val {
1277                            for (k, v) in map {
1278                                pattern_props.insert(k, v);
1279                            }
1280                        }
1281                    }
1282
1283                    // Check if all constraint properties are present
1284                    let has_all_keys = properties.iter().all(|p| pattern_props.contains_key(p));
1285                    if has_all_keys {
1286                        // Extract key properties and convert to serde_json::Value for index lookup
1287                        let key_props: HashMap<String, serde_json::Value> = properties
1288                            .iter()
1289                            .filter_map(|p| {
1290                                pattern_props.get(p).map(|v| (p.clone(), v.clone().into()))
1291                            })
1292                            .collect();
1293
1294                        // Use optimized lookup
1295                        if let Ok(Some(vid)) = self
1296                            .storage
1297                            .index_manager()
1298                            .composite_lookup(label, &key_props)
1299                            .await
1300                        {
1301                            optimized_vid = Some((vid, pattern_props));
1302                        }
1303                    }
1304                }
1305            }
1306
1307            if let Some((vid, _pattern_props)) = optimized_vid {
1308                // Optimized Path: Node found via index
1309                let mut writer = writer_lock.write().await;
1310
1311                let mut match_row = row.clone();
1312                if let PatternElement::Node(n) = &pattern.paths[0].elements[0]
1313                    && let Some(var) = &n.variable
1314                {
1315                    match_row.insert(var.clone(), Value::Int(vid.as_u64() as i64));
1316                }
1317
1318                let result = if let Some(set) = on_match {
1319                    self.execute_set_items_locked(
1320                        &set.items,
1321                        &mut match_row,
1322                        &mut writer,
1323                        prop_manager,
1324                        params,
1325                        ctx,
1326                        tx_l0_override,
1327                    )
1328                    .await
1329                } else {
1330                    Ok(())
1331                };
1332
1333                drop(writer);
1334                result?;
1335
1336                Self::bind_path_variables(&path_pattern, &mut match_row, &temp_vars);
1337                results.push(match_row);
1338            } else {
1339                // Fallback to standard execution
1340                let matches = self
1341                    .execute_merge_match(pattern, &row, prop_manager, params, ctx)
1342                    .await?;
1343                let mut writer = writer_lock.write().await;
1344
1345                let result: Result<Vec<HashMap<String, Value>>> = async {
1346                    let mut batch = Vec::new();
1347                    if !matches.is_empty() {
1348                        for mut m in matches {
1349                            if let Some(set) = on_match {
1350                                self.execute_set_items_locked(
1351                                    &set.items,
1352                                    &mut m,
1353                                    &mut writer,
1354                                    prop_manager,
1355                                    params,
1356                                    ctx,
1357                                    tx_l0_override,
1358                                )
1359                                .await?;
1360                            }
1361                            Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
1362                            batch.push(m);
1363                        }
1364                    } else {
1365                        self.execute_create_pattern(
1366                            &path_pattern,
1367                            &mut row,
1368                            &mut writer,
1369                            prop_manager,
1370                            params,
1371                            ctx,
1372                            tx_l0_override,
1373                        )
1374                        .await?;
1375                        if let Some(set) = on_create {
1376                            self.execute_set_items_locked(
1377                                &set.items,
1378                                &mut row,
1379                                &mut writer,
1380                                prop_manager,
1381                                params,
1382                                ctx,
1383                                tx_l0_override,
1384                            )
1385                            .await?;
1386                        }
1387                        Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
1388                        batch.push(row);
1389                    }
1390                    Ok(batch)
1391                }
1392                .await;
1393
1394                drop(writer);
1395                results.extend(result?);
1396            }
1397        }
1398        Ok(results)
1399    }
1400
1401    /// Execute a CREATE pattern, inserting new vertices and edges into the graph.
1402    #[expect(clippy::too_many_arguments)]
1403    pub(crate) async fn execute_create_pattern(
1404        &self,
1405        pattern: &Pattern,
1406        row: &mut HashMap<String, Value>,
1407        writer: &mut Writer,
1408        prop_manager: &PropertyManager,
1409        params: &HashMap<String, Value>,
1410        ctx: Option<&QueryContext>,
1411        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1412    ) -> Result<()> {
1413        for path in &pattern.paths {
1414            let mut prev_vid: Option<Vid> = None;
1415            // (rel_var, type_id, type_name, props_expr, direction)
1416            type PendingRel = (String, u32, String, Option<Expr>, Direction);
1417            let mut rel_pending: Option<PendingRel> = None;
1418
1419            for element in &path.elements {
1420                match element {
1421                    PatternElement::Node(n) => {
1422                        let mut vid = None;
1423
1424                        // Check if node variable already bound in row
1425                        if let Some(var) = &n.variable
1426                            && let Some(val) = row.get(var)
1427                            && let Ok(existing_vid) = Self::vid_from_value(val)
1428                        {
1429                            vid = Some(existing_vid);
1430                        }
1431
1432                        // If not bound, create it
1433                        if vid.is_none() {
1434                            let mut props = HashMap::new();
1435                            if let Some(props_expr) = &n.properties {
1436                                let props_val = self
1437                                    .evaluate_expr(props_expr, row, prop_manager, params, ctx)
1438                                    .await?;
1439                                if let Value::Map(map) = props_val {
1440                                    for (k, v) in map {
1441                                        props.insert(k, v);
1442                                    }
1443                                } else {
1444                                    return Err(anyhow!("Properties must evaluate to a map"));
1445                                }
1446                            }
1447
1448                            // Support unlabeled nodes and unknown labels (schemaless)
1449                            let schema = self.storage.schema_manager().schema();
1450
1451                            // VID generation is label-independent
1452                            let new_vid = writer.next_vid().await?;
1453
1454                            // Enrich with generated columns only for known labels
1455                            for label_name in &n.labels {
1456                                if schema.get_label_case_insensitive(label_name).is_some() {
1457                                    self.enrich_properties_with_generated_columns(
1458                                        label_name,
1459                                        &mut props,
1460                                        prop_manager,
1461                                        params,
1462                                        ctx,
1463                                    )
1464                                    .await?;
1465                                }
1466                            }
1467
1468                            // Insert vertex and get back final properties (includes auto-generated embeddings)
1469                            let final_props = writer
1470                                .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
1471                                .await?;
1472
1473                            // Build node object with final properties (includes embeddings)
1474                            if let Some(var) = &n.variable {
1475                                let mut obj = HashMap::new();
1476                                obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
1477                                let labels_list: Vec<Value> =
1478                                    n.labels.iter().map(|l| Value::String(l.clone())).collect();
1479                                obj.insert("_labels".to_string(), Value::List(labels_list));
1480                                for (k, v) in &final_props {
1481                                    obj.insert(k.clone(), v.clone());
1482                                }
1483                                // Store node as a Map with _vid, matching MATCH behavior
1484                                row.insert(var.clone(), Value::Map(obj));
1485                            }
1486                            vid = Some(new_vid);
1487                        }
1488
1489                        let current_vid = vid.unwrap();
1490
1491                        if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
1492                            rel_pending.take()
1493                            && let Some(src) = prev_vid
1494                        {
1495                            let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
1496
1497                            if !is_rel_bound {
1498                                let mut rel_props = HashMap::new();
1499                                if let Some(expr) = rel_props_expr {
1500                                    let val = self
1501                                        .evaluate_expr(&expr, row, prop_manager, params, ctx)
1502                                        .await?;
1503                                    if let Value::Map(map) = val {
1504                                        rel_props.extend(map);
1505                                    }
1506                                }
1507                                let eid = writer.next_eid(type_id).await?;
1508
1509                                // For incoming edges like (a)<-[:R]-(b), swap so the edge points b -> a
1510                                let (edge_src, edge_dst) = match dir {
1511                                    Direction::Incoming => (current_vid, src),
1512                                    _ => (src, current_vid),
1513                                };
1514
1515                                let store_props = !rel_var.is_empty();
1516                                let user_props = if store_props {
1517                                    rel_props.clone()
1518                                } else {
1519                                    HashMap::new()
1520                                };
1521
1522                                writer
1523                                    .insert_edge(
1524                                        edge_src,
1525                                        edge_dst,
1526                                        type_id,
1527                                        eid,
1528                                        rel_props,
1529                                        Some(type_name.clone()),
1530                                        tx_l0,
1531                                    )
1532                                    .await?;
1533
1534                                // Edge type name is now stored by insert_edge
1535
1536                                if store_props {
1537                                    let mut edge_map = HashMap::new();
1538                                    edge_map.insert(
1539                                        "_eid".to_string(),
1540                                        Value::Int(eid.as_u64() as i64),
1541                                    );
1542                                    edge_map.insert(
1543                                        "_src".to_string(),
1544                                        Value::Int(edge_src.as_u64() as i64),
1545                                    );
1546                                    edge_map.insert(
1547                                        "_dst".to_string(),
1548                                        Value::Int(edge_dst.as_u64() as i64),
1549                                    );
1550                                    edge_map
1551                                        .insert("_type".to_string(), Value::Int(type_id as i64));
1552                                    // Include user properties so downstream RETURN sees them
1553                                    for (k, v) in user_props {
1554                                        edge_map.insert(k, v);
1555                                    }
1556                                    row.insert(rel_var, Value::Map(edge_map));
1557                                }
1558                            }
1559                        }
1560                        prev_vid = Some(current_vid);
1561                    }
1562                    PatternElement::Relationship(r) => {
1563                        if r.types.len() != 1 {
1564                            return Err(anyhow!(
1565                                "CREATE relationship must specify exactly one type"
1566                            ));
1567                        }
1568                        let type_name = &r.types[0];
1569                        // Get or assign edge type ID (schemaless types get bit 31 = 1)
1570                        let type_id = self
1571                            .storage
1572                            .schema_manager()
1573                            .get_or_assign_edge_type_id(type_name);
1574
1575                        rel_pending = Some((
1576                            r.variable.clone().unwrap_or_default(),
1577                            type_id,
1578                            type_name.clone(),
1579                            r.properties.clone(),
1580                            r.direction.clone(),
1581                        ));
1582                    }
1583                    PatternElement::Parenthesized { .. } => {
1584                        return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
1585                    }
1586                }
1587            }
1588        }
1589        Ok(())
1590    }
1591
1592    /// Validates that a value is a valid property type per OpenCypher.
1593    /// Rejects maps, nodes, edges, paths, and lists containing those types or nested lists.
1594    /// Skips validation for CypherValue-typed properties which accept any value.
1595    fn validate_property_value(
1596        prop_name: &str,
1597        val: &Value,
1598        schema: &uni_common::core::schema::Schema,
1599        labels: &[String],
1600    ) -> Result<()> {
1601        // CypherValue-typed properties accept any value (including Maps)
1602        for label in labels {
1603            if let Some(props) = schema.properties.get(label)
1604                && let Some(prop_meta) = props.get(prop_name)
1605                && prop_meta.r#type == uni_common::core::schema::DataType::CypherValue
1606            {
1607                return Ok(());
1608            }
1609        }
1610
1611        match val {
1612            Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
1613                anyhow::bail!(
1614                    "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
1615                    prop_name
1616                );
1617            }
1618            Value::List(items) => {
1619                for item in items {
1620                    match item {
1621                        Value::Map(_)
1622                        | Value::Node(_)
1623                        | Value::Edge(_)
1624                        | Value::Path(_)
1625                        | Value::List(_) => {
1626                            anyhow::bail!(
1627                                "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
1628                                prop_name
1629                            );
1630                        }
1631                        _ => {}
1632                    }
1633                }
1634            }
1635            _ => {}
1636        }
1637        Ok(())
1638    }
1639
1640    #[expect(clippy::too_many_arguments)]
1641    pub(crate) async fn execute_set_items_locked(
1642        &self,
1643        items: &[SetItem],
1644        row: &mut HashMap<String, Value>,
1645        writer: &mut Writer,
1646        prop_manager: &PropertyManager,
1647        params: &HashMap<String, Value>,
1648        ctx: Option<&QueryContext>,
1649        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1650    ) -> Result<()> {
1651        for item in items {
1652            match item {
1653                SetItem::Property { expr, value } => {
1654                    if let Expr::Property(var_expr, prop_name) = expr
1655                        && let Expr::Variable(var_name) = &**var_expr
1656                        && let Some(node_val) = row.get(var_name)
1657                    {
1658                        if let Ok(vid) = Self::vid_from_value(node_val) {
1659                            let labels =
1660                                Self::extract_labels_from_node(node_val).unwrap_or_default();
1661                            let schema = self.storage.schema_manager().schema().clone();
1662                            let mut props = prop_manager
1663                                .get_all_vertex_props_with_ctx(vid, ctx)
1664                                .await?
1665                                .unwrap_or_default();
1666                            let val = self
1667                                .evaluate_expr(value, row, prop_manager, params, ctx)
1668                                .await?;
1669                            Self::validate_property_value(prop_name, &val, &schema, &labels)?;
1670                            props.insert(prop_name.clone(), val.clone());
1671
1672                            // Enrich with generated columns
1673                            for label_name in &labels {
1674                                self.enrich_properties_with_generated_columns(
1675                                    label_name,
1676                                    &mut props,
1677                                    prop_manager,
1678                                    params,
1679                                    ctx,
1680                                )
1681                                .await?;
1682                            }
1683
1684                            let _ = writer
1685                                .insert_vertex_with_labels(vid, props, &labels, tx_l0)
1686                                .await?;
1687
1688                            // Update the row object so subsequent RETURN sees the new value
1689                            if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
1690                                node_map.insert(prop_name.clone(), val);
1691                            } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
1692                                node.properties.insert(prop_name.clone(), val);
1693                            }
1694                        } else if let Value::Map(map) = node_val
1695                            && map.get("_eid").is_some_and(|v| !v.is_null())
1696                            && map.get("_src").is_some_and(|v| !v.is_null())
1697                            && map.get("_dst").is_some_and(|v| !v.is_null())
1698                            && map.get("_type").is_some_and(|v| !v.is_null())
1699                        {
1700                            let ei = self.extract_edge_identity(map)?;
1701                            let schema = self.storage.schema_manager().schema().clone();
1702                            // Handle _type as either String or Int (Int from CREATE, String from queries)
1703                            let edge_type_name = match map.get("_type") {
1704                                Some(Value::String(s)) => s.clone(),
1705                                Some(Value::Int(id)) => schema
1706                                    .edge_type_name_by_id_unified(*id as u32)
1707                                    .unwrap_or_else(|| format!("EdgeType{}", id)),
1708                                _ => String::new(),
1709                            };
1710
1711                            let mut props = prop_manager
1712                                .get_all_edge_props_with_ctx(ei.eid, ctx)
1713                                .await?
1714                                .unwrap_or_default();
1715                            let val = self
1716                                .evaluate_expr(value, row, prop_manager, params, ctx)
1717                                .await?;
1718                            Self::validate_property_value(
1719                                prop_name,
1720                                &val,
1721                                &schema,
1722                                std::slice::from_ref(&edge_type_name),
1723                            )?;
1724                            props.insert(prop_name.clone(), val.clone());
1725                            writer
1726                                .insert_edge(
1727                                    ei.src,
1728                                    ei.dst,
1729                                    ei.edge_type_id,
1730                                    ei.eid,
1731                                    props,
1732                                    Some(edge_type_name.clone()),
1733                                    tx_l0,
1734                                )
1735                                .await?;
1736
1737                            // Update the row object so subsequent RETURN sees the new value
1738                            if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
1739                                edge_map.insert(prop_name.clone(), val);
1740                            } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1741                                edge.properties.insert(prop_name.clone(), val);
1742                            }
1743                        } else if let Value::Edge(edge) = node_val {
1744                            // Handle Value::Edge directly (when traverse returns Edge objects)
1745                            let eid = edge.eid;
1746                            let src = edge.src;
1747                            let dst = edge.dst;
1748                            let edge_type_name = edge.edge_type.clone();
1749                            let etype =
1750                                self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
1751                            let schema = self.storage.schema_manager().schema().clone();
1752
1753                            let mut props = prop_manager
1754                                .get_all_edge_props_with_ctx(eid, ctx)
1755                                .await?
1756                                .unwrap_or_default();
1757                            let val = self
1758                                .evaluate_expr(value, row, prop_manager, params, ctx)
1759                                .await?;
1760                            Self::validate_property_value(
1761                                prop_name,
1762                                &val,
1763                                &schema,
1764                                std::slice::from_ref(&edge_type_name),
1765                            )?;
1766                            props.insert(prop_name.clone(), val.clone());
1767                            writer
1768                                .insert_edge(
1769                                    src,
1770                                    dst,
1771                                    etype,
1772                                    eid,
1773                                    props,
1774                                    Some(edge_type_name.clone()),
1775                                    tx_l0,
1776                                )
1777                                .await?;
1778
1779                            // Update the row object so subsequent RETURN sees the new value
1780                            if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1781                                edge.properties.insert(prop_name.clone(), val);
1782                            }
1783                        }
1784                    }
1785                }
1786                SetItem::Labels { variable, labels } => {
1787                    if let Some(node_val) = row.get(variable)
1788                        && let Ok(vid) = Self::vid_from_value(node_val)
1789                    {
1790                        // Get current labels from node value
1791                        let current_labels =
1792                            Self::extract_labels_from_node(node_val).unwrap_or_default();
1793
1794                        // Determine new labels to add (skip duplicates)
1795                        let labels_to_add: Vec<_> = labels
1796                            .iter()
1797                            .filter(|l| !current_labels.contains(l))
1798                            .cloned()
1799                            .collect();
1800
1801                        if !labels_to_add.is_empty() {
1802                            // Add labels via L0Buffer (schemaless: accept any label name,
1803                            // matching CREATE behavior)
1804                            if let Some(ctx) = ctx {
1805                                ctx.l0.write().add_vertex_labels(vid, &labels_to_add);
1806                            }
1807
1808                            // Update the node value in the row with new labels
1809                            if let Some(Value::Map(obj)) = row.get_mut(variable) {
1810                                let mut updated_labels = current_labels;
1811                                updated_labels.extend(labels_to_add);
1812                                let labels_list =
1813                                    updated_labels.into_iter().map(Value::String).collect();
1814                                obj.insert("_labels".to_string(), Value::List(labels_list));
1815                            }
1816                        }
1817                    }
1818                }
1819                SetItem::Variable { variable, value }
1820                | SetItem::VariablePlus { variable, value } => {
1821                    let replace = matches!(item, SetItem::Variable { .. });
1822                    let op_str = if replace { "=" } else { "+=" };
1823
1824                    // SET n = expr / SET n += expr — null target from OPTIONAL MATCH is a silent no-op
1825                    if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
1826                        continue;
1827                    }
1828                    let rhs = self
1829                        .evaluate_expr(value, row, prop_manager, params, ctx)
1830                        .await?;
1831                    let new_props =
1832                        Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
1833                            anyhow!(
1834                                "SET {} {} expr: right-hand side must evaluate to a map, \
1835                                 node, or relationship",
1836                                variable,
1837                                op_str
1838                            )
1839                        })?;
1840                    self.apply_properties_to_entity(
1841                        variable,
1842                        new_props,
1843                        replace,
1844                        row,
1845                        writer,
1846                        prop_manager,
1847                        params,
1848                        ctx,
1849                        tx_l0,
1850                    )
1851                    .await?;
1852                }
1853            }
1854        }
1855        Ok(())
1856    }
1857
1858    /// Execute REMOVE clause items (property removal or label removal).
1859    ///
1860    /// Property removals are batched per variable to avoid stale reads: when
1861    /// multiple properties of the same entity are removed in one REMOVE clause,
1862    /// we read from storage once, null all specified properties, and write back
1863    /// once. This prevents the second removal from reading stale data that
1864    /// doesn't reflect the first removal's L0 write.
1865    pub(crate) async fn execute_remove_items_locked(
1866        &self,
1867        items: &[RemoveItem],
1868        row: &mut HashMap<String, Value>,
1869        writer: &mut Writer,
1870        prop_manager: &PropertyManager,
1871        ctx: Option<&QueryContext>,
1872        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1873    ) -> Result<()> {
1874        // Collect property names to remove, grouped by variable.
1875        // Use Vec<(String, Vec<String>)> to preserve insertion order.
1876        let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
1877
1878        for item in items {
1879            match item {
1880                RemoveItem::Property(expr) => {
1881                    if let Expr::Property(var_expr, prop_name) = expr
1882                        && let Expr::Variable(var_name) = &**var_expr
1883                    {
1884                        if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
1885                            entry.1.push(prop_name.clone());
1886                        } else {
1887                            prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
1888                        }
1889                    }
1890                }
1891                RemoveItem::Labels { variable, labels } => {
1892                    self.execute_remove_labels(variable, labels, row, ctx)?;
1893                }
1894            }
1895        }
1896
1897        // Execute batched property removals per variable.
1898        for (var_name, prop_names) in &prop_removals {
1899            let Some(node_val) = row.get(var_name) else {
1900                continue;
1901            };
1902
1903            if let Ok(vid) = Self::vid_from_value(node_val) {
1904                // Vertex property removal
1905                let mut props = prop_manager
1906                    .get_all_vertex_props_with_ctx(vid, ctx)
1907                    .await?
1908                    .unwrap_or_default();
1909
1910                // Only write back if at least one property actually exists
1911                let removed_count = prop_names
1912                    .iter()
1913                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
1914                    .count();
1915                let any_exist = removed_count > 0;
1916                if any_exist {
1917                    writer.track_properties_removed(removed_count, tx_l0);
1918                    for prop_name in prop_names {
1919                        props.insert(prop_name.clone(), Value::Null);
1920                    }
1921                }
1922                // Compute effective properties (post-removal) for _all_props
1923                let effective: HashMap<String, Value> = props
1924                    .iter()
1925                    .filter(|(_, v)| !v.is_null())
1926                    .map(|(k, v)| (k.clone(), v.clone()))
1927                    .collect();
1928                if any_exist {
1929                    let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
1930                    let _ = writer
1931                        .insert_vertex_with_labels(vid, props, &labels, tx_l0)
1932                        .await?;
1933                }
1934
1935                // Update the row map: set removed props to Null
1936                if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
1937                    for prop_name in prop_names {
1938                        node_map.insert(prop_name.clone(), Value::Null);
1939                    }
1940                    // Set _all_props to the complete effective property set
1941                    node_map.insert("_all_props".to_string(), Value::Map(effective));
1942                }
1943            } else if let Value::Map(map) = node_val {
1944                // Edge property removal (map-encoded)
1945                // Check for non-null _eid to skip OPTIONAL MATCH null edges
1946                let mut edge_effective: Option<HashMap<String, Value>> = None;
1947                if map.get("_eid").is_some_and(|v| !v.is_null()) {
1948                    let ei = self.extract_edge_identity(map)?;
1949                    let mut props = prop_manager
1950                        .get_all_edge_props_with_ctx(ei.eid, ctx)
1951                        .await?
1952                        .unwrap_or_default();
1953
1954                    let removed_count = prop_names
1955                        .iter()
1956                        .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
1957                        .count();
1958                    let any_exist = removed_count > 0;
1959                    if any_exist {
1960                        writer.track_properties_removed(removed_count, tx_l0);
1961                        for prop_name in prop_names {
1962                            props.insert(prop_name.to_string(), Value::Null);
1963                        }
1964                    }
1965                    // Compute effective properties (post-removal) for _all_props
1966                    edge_effective = Some(
1967                        props
1968                            .iter()
1969                            .filter(|(_, v)| !v.is_null())
1970                            .map(|(k, v)| (k.clone(), v.clone()))
1971                            .collect(),
1972                    );
1973                    if any_exist {
1974                        let edge_type_name = map
1975                            .get("_type")
1976                            .and_then(|v| v.as_str())
1977                            .map(|s| s.to_string())
1978                            .or_else(|| {
1979                                self.storage
1980                                    .schema_manager()
1981                                    .edge_type_name_by_id_unified(ei.edge_type_id)
1982                            });
1983                        writer
1984                            .insert_edge(
1985                                ei.src,
1986                                ei.dst,
1987                                ei.edge_type_id,
1988                                ei.eid,
1989                                props,
1990                                edge_type_name,
1991                                tx_l0,
1992                            )
1993                            .await?;
1994                    }
1995                }
1996
1997                if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
1998                    for prop_name in prop_names {
1999                        edge_map.insert(prop_name.clone(), Value::Null);
2000                    }
2001                    if let Some(effective) = edge_effective {
2002                        edge_map.insert("_all_props".to_string(), Value::Map(effective));
2003                    }
2004                }
2005            } else if let Value::Edge(edge) = node_val {
2006                // Edge property removal (Value::Edge)
2007                let eid = edge.eid;
2008                let src = edge.src;
2009                let dst = edge.dst;
2010                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
2011
2012                let mut props = prop_manager
2013                    .get_all_edge_props_with_ctx(eid, ctx)
2014                    .await?
2015                    .unwrap_or_default();
2016
2017                let removed_count = prop_names
2018                    .iter()
2019                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
2020                    .count();
2021                if removed_count > 0 {
2022                    writer.track_properties_removed(removed_count, tx_l0);
2023                    for prop_name in prop_names {
2024                        props.insert(prop_name.to_string(), Value::Null);
2025                    }
2026                    writer
2027                        .insert_edge(
2028                            src,
2029                            dst,
2030                            etype,
2031                            eid,
2032                            props,
2033                            Some(edge.edge_type.clone()),
2034                            tx_l0,
2035                        )
2036                        .await?;
2037                }
2038
2039                if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
2040                    for prop_name in prop_names {
2041                        edge.properties.insert(prop_name.to_string(), Value::Null);
2042                    }
2043                }
2044            }
2045        }
2046
2047        Ok(())
2048    }
2049
2050    /// Execute label removal.
2051    pub(crate) fn execute_remove_labels(
2052        &self,
2053        variable: &str,
2054        labels: &[String],
2055        row: &mut HashMap<String, Value>,
2056        ctx: Option<&QueryContext>,
2057    ) -> Result<()> {
2058        if let Some(node_val) = row.get(variable)
2059            && let Ok(vid) = Self::vid_from_value(node_val)
2060        {
2061            // Get current labels from node value
2062            let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
2063
2064            // Determine which labels to actually remove (only those currently present)
2065            let labels_to_remove: Vec<_> = labels
2066                .iter()
2067                .filter(|l| current_labels.contains(l))
2068                .collect();
2069
2070            if !labels_to_remove.is_empty() {
2071                // Remove labels via L0Buffer
2072                if let Some(ctx) = ctx {
2073                    let mut l0 = ctx.l0.write();
2074                    for label in &labels_to_remove {
2075                        l0.remove_vertex_label(vid, label);
2076                    }
2077                }
2078
2079                // Update the node value in the row with remaining labels
2080                if let Some(Value::Map(obj)) = row.get_mut(variable) {
2081                    let remaining_labels: Vec<_> = current_labels
2082                        .iter()
2083                        .filter(|l| !labels_to_remove.contains(l))
2084                        .cloned()
2085                        .collect();
2086                    let labels_list = remaining_labels.into_iter().map(Value::String).collect();
2087                    obj.insert("_labels".to_string(), Value::List(labels_list));
2088                }
2089            }
2090        }
2091        Ok(())
2092    }
2093
2094    /// Resolve edge type ID for a Value::Edge, handling empty edge_type strings
2095    /// by looking up the type from the L0 buffer's edge endpoints.
2096    fn resolve_edge_type_id_for_edge(
2097        &self,
2098        edge: &crate::types::Edge,
2099        writer: &Writer,
2100        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2101    ) -> Result<u32> {
2102        if !edge.edge_type.is_empty() {
2103            return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
2104        }
2105        // Edge type name is empty (e.g., from anonymous MATCH patterns).
2106        // Look up the edge type ID from the L0 buffer's edge endpoints.
2107        if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
2108            return Ok(etype);
2109        }
2110        Err(anyhow!(
2111            "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
2112            edge.eid
2113        ))
2114    }
2115
2116    /// Execute DELETE clause for a single item (vertex, edge, path, or null).
2117    pub(crate) async fn execute_delete_item_locked(
2118        &self,
2119        val: &Value,
2120        detach: bool,
2121        writer: &mut Writer,
2122        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2123    ) -> Result<()> {
2124        match val {
2125            Value::Null => {
2126                // DELETE null is a no-op per OpenCypher spec
2127            }
2128            Value::Path(path) => {
2129                // Delete path edges first, then nodes
2130                for edge in &path.edges {
2131                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
2132                    writer
2133                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
2134                        .await?;
2135                }
2136                for node in &path.nodes {
2137                    self.execute_delete_vertex(
2138                        node.vid,
2139                        detach,
2140                        Some(node.labels.clone()),
2141                        writer,
2142                        tx_l0,
2143                    )
2144                    .await?;
2145                }
2146            }
2147            _ => {
2148                // Try Path reconstruction from Map first (Arrow loses Path type)
2149                if let Ok(path) = Path::try_from(val) {
2150                    for edge in &path.edges {
2151                        let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
2152                        writer
2153                            .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
2154                            .await?;
2155                    }
2156                    for node in &path.nodes {
2157                        self.execute_delete_vertex(
2158                            node.vid,
2159                            detach,
2160                            Some(node.labels.clone()),
2161                            writer,
2162                            tx_l0,
2163                        )
2164                        .await?;
2165                    }
2166                } else if let Ok(vid) = Self::vid_from_value(val) {
2167                    let labels = Self::extract_labels_from_node(val);
2168                    self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
2169                        .await?;
2170                } else if let Value::Map(map) = val {
2171                    self.execute_delete_edge_from_map(map, writer, tx_l0)
2172                        .await?;
2173                } else if let Value::Edge(edge) = val {
2174                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
2175                    writer
2176                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
2177                        .await?;
2178                }
2179            }
2180        }
2181        Ok(())
2182    }
2183
2184    /// Execute vertex deletion with optional detach.
2185    pub(crate) async fn execute_delete_vertex(
2186        &self,
2187        vid: Vid,
2188        detach: bool,
2189        labels: Option<Vec<String>>,
2190        writer: &mut Writer,
2191        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2192    ) -> Result<()> {
2193        if detach {
2194            self.detach_delete_vertex(vid, writer, tx_l0).await?;
2195        } else {
2196            self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
2197        }
2198        writer.delete_vertex(vid, labels, tx_l0).await?;
2199        Ok(())
2200    }
2201
2202    /// Check that a vertex has no edges (required for non-DETACH DELETE).
2203    ///
2204    /// Loads the subgraph from storage, then excludes edges that have been
2205    /// tombstoned in the writer's L0 or the transaction's L0. This ensures
2206    /// edges deleted earlier in the same DELETE clause are properly excluded.
2207    pub(crate) async fn check_vertex_has_no_edges(
2208        &self,
2209        vid: Vid,
2210        writer: &Writer,
2211        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2212    ) -> Result<()> {
2213        let schema = self.storage.schema_manager().schema();
2214        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
2215
2216        // Collect tombstoned edge IDs from both the writer L0 and tx L0.
2217        let mut tombstoned_eids = std::collections::HashSet::new();
2218        {
2219            let writer_l0 = writer.l0_manager.get_current();
2220            let guard = writer_l0.read();
2221            for &eid in guard.tombstones.keys() {
2222                tombstoned_eids.insert(eid);
2223            }
2224        }
2225        if let Some(tx) = tx_l0 {
2226            let guard = tx.read();
2227            for &eid in guard.tombstones.keys() {
2228                tombstoned_eids.insert(eid);
2229            }
2230        }
2231
2232        let out_graph = self
2233            .storage
2234            .load_subgraph_cached(
2235                &[vid],
2236                &edge_type_ids,
2237                1,
2238                uni_store::runtime::Direction::Outgoing,
2239                Some(writer.l0_manager.get_current()),
2240            )
2241            .await?;
2242        let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
2243
2244        let in_graph = self
2245            .storage
2246            .load_subgraph_cached(
2247                &[vid],
2248                &edge_type_ids,
2249                1,
2250                uni_store::runtime::Direction::Incoming,
2251                Some(writer.l0_manager.get_current()),
2252            )
2253            .await?;
2254        let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
2255
2256        if has_out || has_in {
2257            return Err(anyhow!(
2258                "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
2259                vid
2260            ));
2261        }
2262        Ok(())
2263    }
2264
2265    /// Execute edge deletion from a map representation.
2266    pub(crate) async fn execute_delete_edge_from_map(
2267        &self,
2268        map: &HashMap<String, Value>,
2269        writer: &mut Writer,
2270        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2271    ) -> Result<()> {
2272        // Check for non-null _eid to skip OPTIONAL MATCH null edges
2273        if map.get("_eid").is_some_and(|v| !v.is_null()) {
2274            let ei = self.extract_edge_identity(map)?;
2275            writer
2276                .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
2277                .await?;
2278        }
2279        Ok(())
2280    }
2281
2282    /// Build a scan plan node.
2283    ///
2284    /// - `label_id > 0`: schema label → `Scan` (fast, label-specific storage)
2285    /// - `label_id == 0` with labels: schemaless → `ScanMainByLabels` (main table + L0, filtered by label name)
2286    /// - `label_id == 0` without labels: unlabeled → `ScanAll`
2287    fn make_scan_plan(
2288        label_id: u16,
2289        labels: Vec<String>,
2290        variable: String,
2291        filter: Option<Expr>,
2292    ) -> LogicalPlan {
2293        if label_id > 0 {
2294            LogicalPlan::Scan {
2295                label_id,
2296                labels,
2297                variable,
2298                filter,
2299                optional: false,
2300            }
2301        } else if !labels.is_empty() {
2302            // Schemaless label: use ScanMainByLabels to filter by label name
2303            LogicalPlan::ScanMainByLabels {
2304                labels,
2305                variable,
2306                filter,
2307                optional: false,
2308            }
2309        } else {
2310            LogicalPlan::ScanAll {
2311                variable,
2312                filter,
2313                optional: false,
2314            }
2315        }
2316    }
2317
2318    /// Attach a new scan node to the running plan, using `CrossJoin` when the plan
2319    /// already contains prior operators.
2320    fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
2321        if matches!(plan, LogicalPlan::Empty) {
2322            scan
2323        } else {
2324            LogicalPlan::CrossJoin {
2325                left: Box::new(plan),
2326                right: Box::new(scan),
2327            }
2328        }
2329    }
2330
2331    /// Resolve MERGE property map expressions against the current row context.
2332    ///
2333    /// MERGE patterns like `MERGE (city:City {name: person.bornIn})` contain
2334    /// property expressions that reference bound variables. These need to be
2335    /// evaluated to concrete literal values before being converted to filter
2336    /// expressions by `properties_to_expr()`.
2337    async fn resolve_merge_properties(
2338        &self,
2339        properties: &Option<Expr>,
2340        row: &HashMap<String, Value>,
2341        prop_manager: &PropertyManager,
2342        params: &HashMap<String, Value>,
2343        ctx: Option<&QueryContext>,
2344    ) -> Result<Option<Expr>> {
2345        let entries = match properties {
2346            Some(Expr::Map(entries)) => entries,
2347            other => return Ok(other.clone()),
2348        };
2349        let mut resolved = Vec::new();
2350        for (key, val_expr) in entries {
2351            if matches!(val_expr, Expr::Literal(_)) {
2352                resolved.push((key.clone(), val_expr.clone()));
2353            } else {
2354                let value = self
2355                    .evaluate_expr(val_expr, row, prop_manager, params, ctx)
2356                    .await?;
2357                resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
2358            }
2359        }
2360        Ok(Some(Expr::Map(resolved)))
2361    }
2362
2363    /// Convert a runtime Value back to an AST literal expression.
2364    fn value_to_literal_expr(value: &Value) -> Expr {
2365        match value {
2366            Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
2367            Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
2368            Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
2369            Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
2370            Value::Null => Expr::Literal(CypherLiteral::Null),
2371            Value::List(items) => {
2372                Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
2373            }
2374            Value::Map(entries) => Expr::Map(
2375                entries
2376                    .iter()
2377                    .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
2378                    .collect(),
2379            ),
2380            _ => Expr::Literal(CypherLiteral::Null),
2381        }
2382    }
2383
2384    pub(crate) async fn execute_merge_match(
2385        &self,
2386        pattern: &Pattern,
2387        row: &HashMap<String, Value>,
2388        prop_manager: &PropertyManager,
2389        params: &HashMap<String, Value>,
2390        ctx: Option<&QueryContext>,
2391    ) -> Result<Vec<HashMap<String, Value>>> {
2392        // Construct a LogicalPlan for the MATCH part of MERGE
2393        let planner =
2394            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
2395
2396        // We need to construct a CypherQuery to use the planner's plan() method,
2397        // or we can manually construct the LogicalPlan.
2398        // Manual construction is safer as we don't have to round-trip through AST.
2399
2400        let mut plan = LogicalPlan::Empty;
2401        let mut vars_in_scope = Vec::new();
2402
2403        // Add existing bound variables from row to scope
2404        for key in row.keys() {
2405            vars_in_scope.push(key.clone());
2406        }
2407
2408        // Reconstruct Match logic from Planner (simplified for MERGE pattern)
2409        for path in &pattern.paths {
2410            let elements = &path.elements;
2411            let mut i = 0;
2412            while i < elements.len() {
2413                let part = &elements[i];
2414                match part {
2415                    PatternElement::Node(n) => {
2416                        let variable = n.variable.clone().unwrap_or_default();
2417
2418                        // If variable is already bound in the input row, we filter
2419                        let is_bound = !variable.is_empty() && row.contains_key(&variable);
2420
2421                        if is_bound {
2422                            // If bound, we must Scan this specific VID to start the chain
2423                            // Extract VID from row
2424                            let val = row.get(&variable).unwrap();
2425                            let vid = Self::vid_from_value(val)?;
2426
2427                            // In the new storage model, VIDs don't embed label info.
2428                            // We get label from the node value if available, otherwise use 0 to scan all.
2429                            let extracted_labels =
2430                                Self::extract_labels_from_node(val).unwrap_or_default();
2431                            let label_id = {
2432                                let schema = self.storage.schema_manager().schema();
2433                                extracted_labels
2434                                    .first()
2435                                    .and_then(|l| schema.label_id_by_name(l))
2436                                    .unwrap_or(0)
2437                            };
2438
2439                            let resolved_props = self
2440                                .resolve_merge_properties(
2441                                    &n.properties,
2442                                    row,
2443                                    prop_manager,
2444                                    params,
2445                                    ctx,
2446                                )
2447                                .await?;
2448                            let prop_filter =
2449                                planner.properties_to_expr(&variable, &resolved_props);
2450
2451                            // Create a filter expression for VID: variable._vid = vid
2452                            // But our expression engine handles `Expr::Variable` as column.
2453                            // We can inject a filter `id(variable) = vid` if we had `id()` function.
2454                            // Or we use internal property `_vid`.
2455
2456                            // Note: Scan supports `filter`.
2457                            // We can manually construct an Expr::BinaryOp(Eq, Prop(var, _vid), Literal(vid))
2458
2459                            let vid_filter = Expr::BinaryOp {
2460                                left: Box::new(Expr::Property(
2461                                    Box::new(Expr::Variable(variable.clone())),
2462                                    "_vid".to_string(),
2463                                )),
2464                                op: BinaryOp::Eq,
2465                                right: Box::new(Expr::Literal(CypherLiteral::Integer(
2466                                    vid.as_u64() as i64,
2467                                ))),
2468                            };
2469
2470                            let combined_filter = if let Some(pf) = prop_filter {
2471                                Some(Expr::BinaryOp {
2472                                    left: Box::new(vid_filter),
2473                                    op: BinaryOp::And,
2474                                    right: Box::new(pf),
2475                                })
2476                            } else {
2477                                Some(vid_filter)
2478                            };
2479
2480                            let scan = Self::make_scan_plan(
2481                                label_id,
2482                                extracted_labels,
2483                                variable.clone(),
2484                                combined_filter,
2485                            );
2486                            plan = Self::attach_scan(plan, scan);
2487                        } else {
2488                            let label_id = if n.labels.is_empty() {
2489                                // Unlabeled MERGE node: scan all nodes (label_id 0 → ScanAll)
2490                                0
2491                            } else {
2492                                let label_name = &n.labels[0];
2493                                let schema = self.storage.schema_manager().schema();
2494                                // Fall back to label_id 0 (any/schemaless) when the label is not
2495                                // in the schema — this allows MERGE to work in schemaless mode.
2496                                schema
2497                                    .get_label_case_insensitive(label_name)
2498                                    .map(|m| m.id)
2499                                    .unwrap_or(0)
2500                            };
2501
2502                            let resolved_props = self
2503                                .resolve_merge_properties(
2504                                    &n.properties,
2505                                    row,
2506                                    prop_manager,
2507                                    params,
2508                                    ctx,
2509                                )
2510                                .await?;
2511                            let prop_filter =
2512                                planner.properties_to_expr(&variable, &resolved_props);
2513                            let scan = Self::make_scan_plan(
2514                                label_id,
2515                                n.labels.clone(),
2516                                variable.clone(),
2517                                prop_filter,
2518                            );
2519                            plan = Self::attach_scan(plan, scan);
2520
2521                            // Add label filters when:
2522                            // 1. Multiple labels with a known schema label: filter for
2523                            //    additional labels (Scan only scans by the first label).
2524                            // 2. Schemaless labels (label_id = 0): ScanAll finds ALL
2525                            //    nodes, so we must filter to only those with the
2526                            //    specified label(s).
2527                            if !n.labels.is_empty()
2528                                && !variable.is_empty()
2529                                && (label_id == 0 || n.labels.len() > 1)
2530                                && let Some(label_filter) =
2531                                    planner.node_filter_expr(&variable, &n.labels, &None)
2532                            {
2533                                plan = LogicalPlan::Filter {
2534                                    input: Box::new(plan),
2535                                    predicate: label_filter,
2536                                    optional_variables: std::collections::HashSet::new(),
2537                                };
2538                            }
2539
2540                            if !variable.is_empty() {
2541                                vars_in_scope.push(variable.clone());
2542                            }
2543                        }
2544
2545                        // Now look ahead for relationship
2546                        i += 1;
2547                        while i < elements.len() {
2548                            if let PatternElement::Relationship(r) = &elements[i] {
2549                                let target_node_part = &elements[i + 1];
2550                                if let PatternElement::Node(n_target) = target_node_part {
2551                                    let schema = self.storage.schema_manager().schema();
2552                                    let mut edge_type_ids = Vec::new();
2553
2554                                    if r.types.is_empty() {
2555                                        return Err(anyhow!("MERGE edge must have a type"));
2556                                    } else if r.types.len() > 1 {
2557                                        return Err(anyhow!(
2558                                            "MERGE does not support multiple edge types"
2559                                        ));
2560                                    } else {
2561                                        let type_name = &r.types[0];
2562                                        // Use get_or_assign so schemaless edge types work without
2563                                        // a prior schema declaration (same approach as CREATE).
2564                                        let type_id = self
2565                                            .storage
2566                                            .schema_manager()
2567                                            .get_or_assign_edge_type_id(type_name);
2568                                        edge_type_ids.push(type_id);
2569                                    }
2570
2571                                    // Resolve target label ID. For schemaless labels (not in the
2572                                    // schema), fall back to 0 which means "any label" in traversal.
2573                                    let target_label_id: u16 = if let Some(lbl) =
2574                                        n_target.labels.first()
2575                                    {
2576                                        schema
2577                                            .get_label_case_insensitive(lbl)
2578                                            .map(|m| m.id)
2579                                            .unwrap_or(0)
2580                                    } else if let Some(var) = &n_target.variable {
2581                                        if let Some(val) = row.get(var) {
2582                                            // In the new storage model, get labels from node value
2583                                            if let Some(labels) =
2584                                                Self::extract_labels_from_node(val)
2585                                            {
2586                                                if let Some(first_label) = labels.first() {
2587                                                    schema
2588                                                        .get_label_case_insensitive(first_label)
2589                                                        .map(|m| m.id)
2590                                                        .unwrap_or(0)
2591                                                } else {
2592                                                    // Bound node with no labels — schemaless, any
2593                                                    0
2594                                                }
2595                                            } else if Self::vid_from_value(val).is_ok() {
2596                                                // VID without label info — schemaless, any
2597                                                0
2598                                            } else {
2599                                                return Err(anyhow!(
2600                                                    "Variable {} is not a node",
2601                                                    var
2602                                                ));
2603                                            }
2604                                        } else {
2605                                            return Err(anyhow!(
2606                                                "MERGE pattern node must have a label or be a bound variable"
2607                                            ));
2608                                        }
2609                                    } else {
2610                                        return Err(anyhow!(
2611                                            "MERGE pattern node must have a label"
2612                                        ));
2613                                    };
2614
2615                                    let target_variable =
2616                                        n_target.variable.clone().unwrap_or_default();
2617                                    let source_variable = match &elements[i - 1] {
2618                                        PatternElement::Node(n) => {
2619                                            n.variable.clone().unwrap_or_default()
2620                                        }
2621                                        _ => String::new(),
2622                                    };
2623
2624                                    let is_variable_length = r.range.is_some();
2625                                    let type_name = &r.types[0];
2626
2627                                    // Use TraverseMainByType for schemaless edge types
2628                                    // (same as MATCH planner) so edge properties are loaded
2629                                    // correctly from storage + L0 via the adjacency map.
2630                                    // Regular Traverse only loads properties via
2631                                    // property_manager which doesn't handle schemaless types.
2632                                    let is_schemaless = edge_type_ids.iter().all(|id| {
2633                                        uni_common::core::edge_type::is_schemaless_edge_type(*id)
2634                                    });
2635
2636                                    if is_schemaless {
2637                                        plan = LogicalPlan::TraverseMainByType {
2638                                            type_names: vec![type_name.clone()],
2639                                            input: Box::new(plan),
2640                                            direction: r.direction.clone(),
2641                                            source_variable,
2642                                            target_variable: target_variable.clone(),
2643                                            step_variable: r.variable.clone(),
2644                                            min_hops: r
2645                                                .range
2646                                                .as_ref()
2647                                                .and_then(|r| r.min)
2648                                                .unwrap_or(1)
2649                                                as usize,
2650                                            max_hops: r
2651                                                .range
2652                                                .as_ref()
2653                                                .and_then(|r| r.max)
2654                                                .unwrap_or(1)
2655                                                as usize,
2656                                            optional: false,
2657                                            target_filter: None,
2658                                            path_variable: None,
2659                                            is_variable_length,
2660                                            optional_pattern_vars: std::collections::HashSet::new(),
2661                                            scope_match_variables: std::collections::HashSet::new(),
2662                                            edge_filter_expr: None,
2663                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
2664                                        };
2665                                    } else {
2666                                        // Collect edge property names needed for MERGE filter
2667                                        let mut edge_props = std::collections::HashSet::new();
2668                                        if let Some(Expr::Map(entries)) = &r.properties {
2669                                            for (key, _) in entries {
2670                                                edge_props.insert(key.clone());
2671                                            }
2672                                        }
2673                                        plan = LogicalPlan::Traverse {
2674                                            input: Box::new(plan),
2675                                            edge_type_ids: edge_type_ids.clone(),
2676                                            direction: r.direction.clone(),
2677                                            source_variable,
2678                                            target_variable: target_variable.clone(),
2679                                            target_label_id,
2680                                            step_variable: r.variable.clone(),
2681                                            min_hops: r
2682                                                .range
2683                                                .as_ref()
2684                                                .and_then(|r| r.min)
2685                                                .unwrap_or(1)
2686                                                as usize,
2687                                            max_hops: r
2688                                                .range
2689                                                .as_ref()
2690                                                .and_then(|r| r.max)
2691                                                .unwrap_or(1)
2692                                                as usize,
2693                                            optional: false,
2694                                            target_filter: None,
2695                                            path_variable: None,
2696                                            edge_properties: edge_props,
2697                                            is_variable_length,
2698                                            optional_pattern_vars: std::collections::HashSet::new(),
2699                                            scope_match_variables: std::collections::HashSet::new(),
2700                                            edge_filter_expr: None,
2701                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
2702                                            qpp_steps: None,
2703                                        };
2704                                    }
2705
2706                                    // Apply property filters for relationship
2707                                    if r.properties.is_some()
2708                                        && let Some(r_var) = &r.variable
2709                                    {
2710                                        let resolved_rel_props = self
2711                                            .resolve_merge_properties(
2712                                                &r.properties,
2713                                                row,
2714                                                prop_manager,
2715                                                params,
2716                                                ctx,
2717                                            )
2718                                            .await?;
2719                                        if let Some(prop_filter) =
2720                                            planner.properties_to_expr(r_var, &resolved_rel_props)
2721                                        {
2722                                            plan = LogicalPlan::Filter {
2723                                                input: Box::new(plan),
2724                                                predicate: prop_filter,
2725                                                optional_variables: std::collections::HashSet::new(
2726                                                ),
2727                                            };
2728                                        }
2729                                    }
2730
2731                                    // Apply property filters for target node if it was new
2732                                    if !target_variable.is_empty() {
2733                                        let resolved_target_props = self
2734                                            .resolve_merge_properties(
2735                                                &n_target.properties,
2736                                                row,
2737                                                prop_manager,
2738                                                params,
2739                                                ctx,
2740                                            )
2741                                            .await?;
2742                                        if let Some(prop_filter) = planner.properties_to_expr(
2743                                            &target_variable,
2744                                            &resolved_target_props,
2745                                        ) {
2746                                            plan = LogicalPlan::Filter {
2747                                                input: Box::new(plan),
2748                                                predicate: prop_filter,
2749                                                optional_variables: std::collections::HashSet::new(
2750                                                ),
2751                                            };
2752                                        }
2753                                        vars_in_scope.push(target_variable.clone());
2754                                    }
2755
2756                                    if let Some(sv) = &r.variable {
2757                                        vars_in_scope.push(sv.clone());
2758                                    }
2759                                    i += 2;
2760                                } else {
2761                                    break;
2762                                }
2763                            } else {
2764                                break;
2765                            }
2766                        }
2767                    }
2768                    _ => return Err(anyhow!("Pattern must start with a node")),
2769                }
2770            }
2771
2772            // Execute the plan to find all matches, then filter against bound variables in `row`.
2773        }
2774
2775        let db_matches = self
2776            .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
2777            .await?;
2778
2779        // Keep only DB results that are consistent with the input row bindings.
2780        // Skip internal keys (starting with "__") as they are implementation
2781        // artifacts (e.g. __used_edges) and not user-visible variable bindings.
2782        // Also skip the empty-string key (""), which is the placeholder variable
2783        // for unnamed MERGE nodes — it may carry over from a prior MERGE clause
2784        // and must not constrain the current pattern's match.
2785        let final_matches = db_matches
2786            .into_iter()
2787            .filter(|db_match| {
2788                row.iter().all(|(key, val)| {
2789                    if key.is_empty() || key.starts_with("__") {
2790                        return true;
2791                    }
2792                    let Some(db_val) = db_match.get(key) else {
2793                        return true;
2794                    };
2795                    if db_val == val {
2796                        return true;
2797                    }
2798                    // Values differ -- treat as consistent if they represent the same VID
2799                    matches!(
2800                        (Self::vid_from_value(val), Self::vid_from_value(db_val)),
2801                        (Ok(v1), Ok(v2)) if v1 == v2
2802                    )
2803                })
2804            })
2805            .map(|db_match| {
2806                let mut merged = row.clone();
2807                merged.extend(db_match);
2808                merged
2809            })
2810            .collect();
2811
2812        Ok(final_matches)
2813    }
2814
2815    /// Prepare a MERGE pattern for path variable binding.
2816    ///
2817    /// If any path in the pattern has a path variable (e.g., `MERGE p = (a)-[:R]->(b)`),
2818    /// unnamed relationships need internal variable names so that `execute_create_pattern`
2819    /// stores the edge data in the row for later path construction.
2820    ///
2821    /// Returns the (possibly modified) pattern and a list of temp variable names to clean up.
2822    fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
2823        let has_path_vars = pattern
2824            .paths
2825            .iter()
2826            .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
2827
2828        if !has_path_vars {
2829            return (pattern.clone(), Vec::new());
2830        }
2831
2832        let mut modified = pattern.clone();
2833        let mut temp_vars = Vec::new();
2834
2835        for path in &mut modified.paths {
2836            if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
2837                continue;
2838            }
2839            for (idx, element) in path.elements.iter_mut().enumerate() {
2840                if let PatternElement::Relationship(r) = element
2841                    && r.variable.as_ref().is_none_or(String::is_empty)
2842                {
2843                    let temp_var = format!("__path_r_{}", idx);
2844                    r.variable = Some(temp_var.clone());
2845                    temp_vars.push(temp_var);
2846                }
2847            }
2848        }
2849
2850        (modified, temp_vars)
2851    }
2852
2853    /// Bind path variables in the result row based on the MERGE pattern.
2854    ///
2855    /// Walks each path in the pattern, collects node/edge values from the row
2856    /// by variable name, and constructs a `Value::Path`.
2857    fn bind_path_variables(
2858        pattern: &Pattern,
2859        row: &mut HashMap<String, Value>,
2860        temp_vars: &[String],
2861    ) {
2862        for path in &pattern.paths {
2863            let Some(path_var) = path.variable.as_ref() else {
2864                continue;
2865            };
2866            if path_var.is_empty() {
2867                continue;
2868            }
2869
2870            let mut nodes = Vec::new();
2871            let mut edges = Vec::new();
2872
2873            for element in &path.elements {
2874                match element {
2875                    PatternElement::Node(n) => {
2876                        if let Some(var) = &n.variable
2877                            && let Some(val) = row.get(var)
2878                            && let Some(node) = Self::value_to_node_for_path(val)
2879                        {
2880                            nodes.push(node);
2881                        }
2882                    }
2883                    PatternElement::Relationship(r) => {
2884                        if let Some(var) = &r.variable
2885                            && let Some(val) = row.get(var)
2886                            && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
2887                        {
2888                            edges.push(edge);
2889                        }
2890                    }
2891                    _ => {}
2892                }
2893            }
2894
2895            if !nodes.is_empty() {
2896                use uni_common::value::Path;
2897                row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
2898            }
2899        }
2900
2901        // Clean up internal temp variables
2902        for var in temp_vars {
2903            row.remove(var);
2904        }
2905    }
2906
2907    /// Convert a Value (Map or Node) to a Node for path construction.
2908    fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
2909        match val {
2910            Value::Node(n) => Some(n.clone()),
2911            Value::Map(map) => {
2912                let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
2913                let labels = if let Some(Value::List(l)) = map.get("_labels") {
2914                    l.iter()
2915                        .filter_map(|v| {
2916                            if let Value::String(s) = v {
2917                                Some(s.clone())
2918                            } else {
2919                                None
2920                            }
2921                        })
2922                        .collect()
2923                } else {
2924                    vec![]
2925                };
2926                let properties: HashMap<String, Value> = map
2927                    .iter()
2928                    .filter(|(k, _)| !k.starts_with('_'))
2929                    .map(|(k, v)| (k.clone(), v.clone()))
2930                    .collect();
2931                Some(uni_common::value::Node {
2932                    vid,
2933                    labels,
2934                    properties,
2935                })
2936            }
2937            _ => None,
2938        }
2939    }
2940
2941    /// Convert a Value (Map or Edge) to an Edge for path construction.
2942    fn value_to_edge_for_path(
2943        val: &Value,
2944        type_names: &[String],
2945    ) -> Option<uni_common::value::Edge> {
2946        match val {
2947            Value::Edge(e) => Some(e.clone()),
2948            Value::Map(map) => {
2949                let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
2950                let edge_type = map
2951                    .get("_type_name")
2952                    .and_then(|v| {
2953                        if let Value::String(s) = v {
2954                            Some(s.clone())
2955                        } else {
2956                            None
2957                        }
2958                    })
2959                    .or_else(|| type_names.first().cloned())
2960                    .unwrap_or_default();
2961                let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
2962                let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
2963                let properties: HashMap<String, Value> = map
2964                    .iter()
2965                    .filter(|(k, _)| !k.starts_with('_'))
2966                    .map(|(k, v)| (k.clone(), v.clone()))
2967                    .collect();
2968                Some(uni_common::value::Edge {
2969                    eid,
2970                    edge_type,
2971                    src,
2972                    dst,
2973                    properties,
2974                })
2975            }
2976            _ => None,
2977        }
2978    }
2979}