Skip to main content

uni_query/query/executor/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use crate::query::planner::LogicalPlan;
6use anyhow::{Result, anyhow};
7use std::collections::HashMap;
8use std::sync::Arc;
9use uni_common::DataType;
10use uni_common::core::id::{Eid, Vid};
11use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
12use uni_common::{Path, Value};
13use uni_cypher::ast::{
14    AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
15    CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
16    DropEdgeType, DropLabel, Expr, Pattern, PatternElement, RemoveItem, SetClause, SetItem,
17};
18use uni_store::QueryContext;
19use uni_store::runtime::property_manager::PropertyManager;
20use uni_store::runtime::writer::Writer;
21
/// Identity fields extracted from a map-encoded edge.
///
/// Produced by `Executor::extract_edge_identity` from the `_eid`, `_src`,
/// `_dst`, and `_type` entries of an edge map.
struct EdgeIdentity {
    /// Edge identifier, read from the `_eid` entry.
    eid: Eid,
    /// Source vertex identifier, read from the `_src` entry.
    src: Vid,
    /// Destination vertex identifier, read from the `_dst` entry.
    dst: Vid,
    /// Numeric edge type ID resolved from the `_type` entry.
    edge_type_id: u32,
}
29
30impl Executor {
31    /// Extracts labels from a node value.
32    ///
33    /// Handles both `Value::Map` (with a `_labels` list field) and
34    /// `Value::Node` (with a `labels` vec field).
35    ///
36    /// Returns `None` when the value is not a node or has no labels.
37    pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
38        match node_val {
39            Value::Map(map) => {
40                // Map-encoded node: look for _labels array
41                if let Some(Value::List(labels_arr)) = map.get("_labels") {
42                    let labels: Vec<String> = labels_arr
43                        .iter()
44                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
45                        .collect();
46                    if !labels.is_empty() {
47                        return Some(labels);
48                    }
49                }
50                None
51            }
52            Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
53            _ => None,
54        }
55    }
56
57    /// Extracts user-visible properties from a value that represents a node or edge.
58    ///
59    /// Strips internal bookkeeping keys (those prefixed with `_` or named
60    /// `ext_id`) from map-encoded entities and returns only the user-facing
61    /// property key-value pairs.
62    ///
63    /// Returns `None` when `val` is not a map, node, or edge.
64    pub(crate) fn extract_user_properties_from_value(
65        val: &Value,
66    ) -> Option<HashMap<String, Value>> {
67        match val {
68            Value::Map(map) => {
69                // Distinguish entity-encoded maps from plain map literals.
70                // A node map has both `_vid` and `_labels`.
71                // An edge map has `_eid`, `_src`, and `_dst`.
72                let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
73                let is_edge_map = map.contains_key("_eid")
74                    && map.contains_key("_src")
75                    && map.contains_key("_dst");
76
77                if is_node_map || is_edge_map {
78                    // Filter out internal bookkeeping keys
79                    let user_props: HashMap<String, Value> = map
80                        .iter()
81                        .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
82                        .map(|(k, v)| (k.clone(), v.clone()))
83                        .collect();
84                    // When mutation output omits dotted property columns, user
85                    // properties live inside `_all_props` rather than at the
86                    // top level of the entity map.
87                    if user_props.is_empty()
88                        && let Some(Value::Map(all_props)) = map.get("_all_props")
89                    {
90                        return Some(all_props.clone());
91                    }
92                    Some(user_props)
93                } else {
94                    // Plain map literal — return as-is
95                    Some(map.clone())
96                }
97            }
98            Value::Node(node) => Some(node.properties.clone()),
99            Value::Edge(edge) => Some(edge.properties.clone()),
100            _ => None,
101        }
102    }
103
104    /// Applies a property map to a vertex or edge entity bound to `variable` in `row`.
105    ///
106    /// When `replace` is `true` the entity's property set is replaced: keys absent
107    /// from `new_props` are tombstoned (written as `Value::Null`) so the storage
108    /// layer removes them.  When `replace` is `false` the map is merged: keys in
109    /// `new_props` are upserted, while keys absent from `new_props` are unchanged.
110    /// A `Value::Null` entry in `new_props` acts as an explicit tombstone in both
111    /// modes.
112    ///
113    /// Labels are never altered — the spec states that `SET n = map` replaces
114    /// properties only.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if the entity cannot be found in the storage layer, or
119    /// if the writer fails to persist the updated properties.
120    #[expect(clippy::too_many_arguments)]
121    async fn apply_properties_to_entity(
122        &self,
123        variable: &str,
124        new_props: HashMap<String, Value>,
125        replace: bool,
126        row: &mut HashMap<String, Value>,
127        writer: &mut Writer,
128        prop_manager: &PropertyManager,
129        params: &HashMap<String, Value>,
130        ctx: Option<&QueryContext>,
131        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
132    ) -> Result<()> {
133        // Clone the target so we can hold &row references elsewhere.
134        let target = row.get(variable).cloned();
135
136        match target {
137            Some(Value::Node(ref node)) => {
138                let vid = node.vid;
139                let labels = node.labels.clone();
140                let current = prop_manager
141                    .get_all_vertex_props_with_ctx(vid, ctx)
142                    .await?
143                    .unwrap_or_default();
144                let write_props = Self::merge_props(current, new_props, replace);
145                let mut enriched = write_props.clone();
146                for label_name in &labels {
147                    self.enrich_properties_with_generated_columns(
148                        label_name,
149                        &mut enriched,
150                        prop_manager,
151                        params,
152                        ctx,
153                    )
154                    .await?;
155                }
156                let _ = writer
157                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
158                    .await?;
159                // Update the in-memory row binding
160                if let Some(Value::Node(n)) = row.get_mut(variable) {
161                    n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
162                }
163            }
164            Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
165                let vid = Self::vid_from_value(node_val)?;
166                let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
167                let current = prop_manager
168                    .get_all_vertex_props_with_ctx(vid, ctx)
169                    .await?
170                    .unwrap_or_default();
171                let write_props = Self::merge_props(current, new_props, replace);
172                let mut enriched = write_props.clone();
173                for label_name in &labels {
174                    self.enrich_properties_with_generated_columns(
175                        label_name,
176                        &mut enriched,
177                        prop_manager,
178                        params,
179                        ctx,
180                    )
181                    .await?;
182                }
183                let _ = writer
184                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
185                    .await?;
186                // Update the in-memory map-encoded node binding
187                if let Some(Value::Map(node_map)) = row.get_mut(variable) {
188                    // Remove old user property keys, keep internal fields
189                    node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
190                    // Build effective (non-null) properties
191                    let effective: HashMap<String, Value> =
192                        enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
193                    for (k, v) in &effective {
194                        node_map.insert(k.clone(), v.clone());
195                    }
196                    // Replace _all_props to reflect the complete property set
197                    node_map.insert("_all_props".to_string(), Value::Map(effective));
198                }
199            }
200            Some(Value::Edge(ref edge)) => {
201                let eid = edge.eid;
202                let src = edge.src;
203                let dst = edge.dst;
204                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
205                let current = prop_manager
206                    .get_all_edge_props_with_ctx(eid, ctx)
207                    .await?
208                    .unwrap_or_default();
209                let write_props = Self::merge_props(current, new_props, replace);
210                writer
211                    .insert_edge(
212                        src,
213                        dst,
214                        etype,
215                        eid,
216                        write_props.clone(),
217                        Some(edge.edge_type.clone()),
218                        tx_l0,
219                    )
220                    .await?;
221                // Update the in-memory row binding
222                if let Some(Value::Edge(e)) = row.get_mut(variable) {
223                    e.properties = write_props
224                        .into_iter()
225                        .filter(|(_, v)| !v.is_null())
226                        .collect();
227                }
228            }
229            Some(Value::Map(ref map))
230                if map.contains_key("_eid")
231                    && map.contains_key("_src")
232                    && map.contains_key("_dst") =>
233            {
234                let ei = self.extract_edge_identity(map)?;
235                let current = prop_manager
236                    .get_all_edge_props_with_ctx(ei.eid, ctx)
237                    .await?
238                    .unwrap_or_default();
239                let write_props = Self::merge_props(current, new_props, replace);
240                let edge_type_name = map
241                    .get("_type")
242                    .and_then(|v| v.as_str())
243                    .map(|s| s.to_string())
244                    .or_else(|| {
245                        self.storage
246                            .schema_manager()
247                            .edge_type_name_by_id_unified(ei.edge_type_id)
248                    });
249                writer
250                    .insert_edge(
251                        ei.src,
252                        ei.dst,
253                        ei.edge_type_id,
254                        ei.eid,
255                        write_props.clone(),
256                        edge_type_name,
257                        tx_l0,
258                    )
259                    .await?;
260                // Update the in-memory map-encoded edge binding
261                if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
262                    edge_map.retain(|k, _| k.starts_with('_'));
263                    let effective: HashMap<String, Value> = write_props
264                        .into_iter()
265                        .filter(|(_, v)| !v.is_null())
266                        .collect();
267                    for (k, v) in &effective {
268                        edge_map.insert(k.clone(), v.clone());
269                    }
270                    // Replace _all_props to reflect the complete property set
271                    edge_map.insert("_all_props".to_string(), Value::Map(effective));
272                }
273            }
274            _ => {
275                // No matching entity — nothing to do (caller already guarded against Null)
276            }
277        }
278        Ok(())
279    }
280
281    /// Computes the property map to write given current storage state and the
282    /// incoming change map.
283    ///
284    /// When `replace` is `true`, keys present in `current` but absent from
285    /// `incoming` are tombstoned with `Value::Null`.  Null values inside
286    /// `incoming` are always preserved as explicit tombstones.
287    ///
288    /// When `replace` is `false`, `current` is the base and `incoming` is
289    /// merged on top: each key in `incoming` overwrites or tombstones the
290    /// corresponding entry in `current`.
291    fn merge_props(
292        current: HashMap<String, Value>,
293        incoming: HashMap<String, Value>,
294        replace: bool,
295    ) -> HashMap<String, Value> {
296        if replace {
297            // Start from the non-null incoming entries only.
298            let mut result: HashMap<String, Value> = incoming
299                .iter()
300                .filter(|(_, v)| !v.is_null())
301                .map(|(k, v)| (k.clone(), v.clone()))
302                .collect();
303            // Tombstone every current key that is absent from incoming OR explicitly
304            // set to null in incoming (both mean "delete this property").
305            for k in current.keys() {
306                if incoming.get(k).is_none_or(|v| v.is_null()) {
307                    result.insert(k.clone(), Value::Null);
308                }
309            }
310            result
311        } else {
312            // Merge: start from current and apply incoming on top
313            let mut result = current;
314            result.extend(incoming);
315            result
316        }
317    }
318
319    /// Extract edge identity fields (`_eid`, `_src`, `_dst`, `_type`) from a map.
320    fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
321        let eid = Eid::from(
322            map.get("_eid")
323                .and_then(|v| v.as_u64())
324                .ok_or_else(|| anyhow!("Invalid _eid"))?,
325        );
326        let src = Vid::from(
327            map.get("_src")
328                .and_then(|v| v.as_u64())
329                .ok_or_else(|| anyhow!("Invalid _src"))?,
330        );
331        let dst = Vid::from(
332            map.get("_dst")
333                .and_then(|v| v.as_u64())
334                .ok_or_else(|| anyhow!("Invalid _dst"))?,
335        );
336        let edge_type_id = self.resolve_edge_type_id(
337            map.get("_type")
338                .ok_or_else(|| anyhow!("Missing _type on edge map"))?,
339        )?;
340        Ok(EdgeIdentity {
341            eid,
342            src,
343            dst,
344            edge_type_id,
345        })
346    }
347
348    /// Resolve edge type ID from a Value, supporting both Int and String representations.
349    /// DataFusion traverse stores _type as String("KNOWS"), while write operations need u32 ID.
350    ///
351    /// For String values, uses get_or_assign_edge_type_id to support schemaless edge types
352    /// (assigns new ID if not found). This is critical for MERGE ... ON CREATE SET scenarios
353    /// where the edge type was just created and may not be in the read-only lookup yet.
354    fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
355        match type_val {
356            Value::Int(i) => Ok(*i as u32),
357            Value::String(name) => {
358                // Use get_or_assign to support schemaless edge types
359                // (will create new ID if not found in schema or registry)
360                Ok(self
361                    .storage
362                    .schema_manager()
363                    .get_or_assign_edge_type_id(name))
364            }
365            _ => Err(anyhow!(
366                "Invalid _type value: expected Int or String, got {:?}",
367                type_val
368            )),
369        }
370    }
371
372    pub(crate) async fn execute_vacuum(&self) -> Result<()> {
373        if let Some(writer_arc) = &self.writer {
374            // Flush first while holding the lock
375            {
376                let mut writer = writer_arc.write().await;
377                writer.flush_to_l1(None).await?;
378            } // Drop lock before compacting to avoid blocking reads/writes
379
380            // Compaction can run without holding the writer lock
381            let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
382            let compaction_results = compactor.compact_all().await?;
383
384            // Re-warm adjacency manager for compacted edge types to sync in-memory CSR with new L2 storage
385            let am = self.storage.adjacency_manager();
386            let schema = self.storage.schema_manager().schema();
387            for info in compaction_results {
388                // Convert string direction to Direction enum
389                let direction = match info.direction.as_str() {
390                    "fwd" => uni_store::storage::direction::Direction::Outgoing,
391                    "bwd" => uni_store::storage::direction::Direction::Incoming,
392                    _ => continue,
393                };
394
395                // Get edge_type_id
396                if let Some(edge_type_id) =
397                    schema.edge_type_id_unified_case_insensitive(&info.edge_type)
398                {
399                    // Re-warm from storage (clears old CSR, loads new L2 + L1 delta)
400                    let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
401                }
402            }
403        }
404        Ok(())
405    }
406
407    pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
408        if let Some(writer_arc) = &self.writer {
409            let mut writer = writer_arc.write().await;
410            writer.flush_to_l1(Some("checkpoint".to_string())).await?;
411        }
412        Ok(())
413    }
414
415    pub(crate) async fn execute_copy_to(
416        &self,
417        identifier: &str,
418        path: &str,
419        format: &str,
420        options: &HashMap<String, Value>,
421    ) -> Result<usize> {
422        // Check schema to determine if identifier is an edge type or vertex label
423        let schema = self.storage.schema_manager().schema();
424
425        // Try as edge type first
426        if schema.get_edge_type_case_insensitive(identifier).is_some() {
427            return self
428                .export_edge_type_in_format(identifier, path, format)
429                .await;
430        }
431
432        // Try as vertex label
433        if schema.get_label_case_insensitive(identifier).is_some() {
434            return self
435                .export_vertex_label_in_format(identifier, path, format, options)
436                .await;
437        }
438
439        // Neither edge type nor vertex label found
440        Err(anyhow!("Unknown label or edge type: '{}'", identifier))
441    }
442
443    async fn export_vertex_label_in_format(
444        &self,
445        label: &str,
446        path: &str,
447        format: &str,
448        _options: &HashMap<String, Value>,
449    ) -> Result<usize> {
450        match format {
451            "parquet" => self.export_vertex_label(label, path).await,
452            "csv" => {
453                let mut stream = self
454                    .storage
455                    .scan_vertex_table_stream(label)
456                    .await?
457                    .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
458
459                // Collect all batches
460                let mut all_rows = Vec::new();
461                let mut column_names = Vec::new();
462
463                // Iterate stream using StreamExt
464                use futures::StreamExt;
465                while let Some(batch_result) = stream.next().await {
466                    let batch = batch_result?;
467
468                    // Get column names from first batch
469                    if column_names.is_empty() {
470                        column_names = batch
471                            .schema()
472                            .fields()
473                            .iter()
474                            .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
475                            .map(|f| f.name().clone())
476                            .collect();
477                    }
478
479                    // Convert batch to rows
480                    for row_idx in 0..batch.num_rows() {
481                        let mut row = Vec::new();
482                        for field in batch.schema().fields() {
483                            if field.name().starts_with('_') || field.name() == "ext_id" {
484                                continue;
485                            }
486
487                            let col_idx = batch.schema().index_of(field.name())?;
488                            let column = batch.column(col_idx);
489                            let value = self.arrow_value_to_json(column, row_idx)?;
490
491                            // Convert value to CSV string
492                            let csv_value = match value {
493                                Value::Null => String::new(),
494                                Value::Bool(b) => b.to_string(),
495                                Value::Int(i) => i.to_string(),
496                                Value::Float(f) => f.to_string(),
497                                Value::String(s) => s,
498                                _ => format!("{value}"),
499                            };
500                            row.push(csv_value);
501                        }
502                        all_rows.push(row);
503                    }
504                }
505
506                // Write CSV
507                let file = std::fs::File::create(path)?;
508                let mut wtr = csv::Writer::from_writer(file);
509
510                // Write headers
511                log::debug!("CSV export headers: {:?}", column_names);
512                wtr.write_record(&column_names)?;
513
514                // Write rows
515                for (i, row) in all_rows.iter().enumerate() {
516                    log::debug!("CSV export row {}: {:?}", i, row);
517                    wtr.write_record(row)?;
518                }
519
520                wtr.flush()?;
521                Ok(all_rows.len())
522            }
523            _ => Err(anyhow!(
524                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
525                format
526            )),
527        }
528    }
529
530    async fn export_edge_type_in_format(
531        &self,
532        edge_type: &str,
533        path: &str,
534        format: &str,
535    ) -> Result<usize> {
536        match format {
537            "parquet" => self.export_edge_type(edge_type, path).await,
538            "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
539            _ => Err(anyhow!(
540                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
541                format
542            )),
543        }
544    }
545
546    /// Write a stream of record batches to a Parquet file.
547    /// Returns the total number of rows written, or 0 if the stream is empty.
548    async fn write_batches_to_parquet(
549        mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
550        path: &str,
551        entity_description: &str,
552    ) -> Result<usize> {
553        use futures::TryStreamExt;
554
555        // Get first batch to determine schema and create writer
556        let first_batch = match stream.try_next().await? {
557            Some(batch) => batch,
558            None => {
559                log::info!("No data to export from {}", entity_description);
560                return Ok(0);
561            }
562        };
563
564        // Create Parquet writer using schema from first batch
565        let file = std::fs::File::create(path)?;
566        let arrow_schema = first_batch.schema();
567        let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
568
569        // Write first batch
570        let mut count = first_batch.num_rows();
571        writer.write(&first_batch)?;
572
573        // Write remaining batches
574        while let Some(batch) = stream.try_next().await? {
575            count += batch.num_rows();
576            writer.write(&batch)?;
577        }
578
579        writer.close()?;
580
581        log::info!(
582            "Exported {} rows from {} to '{}'",
583            count,
584            entity_description,
585            path
586        );
587        Ok(count)
588    }
589
590    /// Export vertices of a specific label to Parquet
591    async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
592        let stream = self
593            .storage
594            .scan_vertex_table_stream(label)
595            .await?
596            .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
597
598        Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
599    }
600
601    /// Export edges of a specific type to Parquet
602    async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
603        let schema = self.storage.schema_manager().schema();
604        if !schema.edge_types.contains_key(edge_type) {
605            return Err(anyhow!("Edge type '{}' not found", edge_type));
606        }
607
608        let filter = format!("type = '{}'", edge_type);
609        let stream = self
610            .storage
611            .scan_main_edge_table_stream(Some(&filter))
612            .await?
613            .ok_or_else(|| anyhow!("No edge data found"))?;
614
615        Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
616    }
617
618    pub(crate) async fn execute_copy_from(
619        &self,
620        label: &str,
621        path: &str,
622        format: &str,
623        options: &HashMap<String, Value>,
624    ) -> Result<usize> {
625        // Read data from file
626        let batches = match format {
627            "parquet" => self.read_parquet_file(path)?,
628            "csv" => self.read_csv_file(path, label, options)?,
629            _ => {
630                return Err(anyhow!(
631                    "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
632                    format
633                ));
634            }
635        };
636
637        // Get writer
638        let writer_arc = self
639            .writer
640            .as_ref()
641            .ok_or_else(|| anyhow!("No writer available"))?;
642
643        let db_schema = self.storage.schema_manager().schema();
644
645        // Check if this is a label (vertex) or edge type
646        let is_edge = db_schema.edge_type_id_by_name(label).is_some();
647
648        if is_edge {
649            // Import edges
650            let edge_type_id = db_schema
651                .edge_type_id_by_name(label)
652                .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
653
654            // Get src and dst column names from options
655            let src_col = options
656                .get("src_col")
657                .and_then(|v| v.as_str())
658                .unwrap_or("src");
659            let dst_col = options
660                .get("dst_col")
661                .and_then(|v| v.as_str())
662                .unwrap_or("dst");
663
664            let mut total_rows = 0;
665            for batch in batches {
666                let num_rows = batch.num_rows();
667
668                for row_idx in 0..num_rows {
669                    let mut properties = HashMap::new();
670                    let mut src_vid: Option<Vid> = None;
671                    let mut dst_vid: Option<Vid> = None;
672
673                    // Extract properties and VIDs from each column
674                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
675                        let col_name = field.name();
676                        let column = batch.column(col_idx);
677                        let value = self.arrow_value_to_json(column, row_idx)?;
678
679                        if col_name == src_col {
680                            let raw = value.as_u64().unwrap_or_else(|| {
681                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
682                            });
683                            src_vid = Some(Vid::new(raw));
684                        } else if col_name == dst_col {
685                            let raw = value.as_u64().unwrap_or_else(|| {
686                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
687                            });
688                            dst_vid = Some(Vid::new(raw));
689                        } else if !col_name.starts_with('_') && !value.is_null() {
690                            properties.insert(col_name.clone(), value);
691                        }
692                    }
693
694                    let src = src_vid
695                        .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
696                    let dst = dst_vid
697                        .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
698
699                    // Generate EID and insert edge
700                    let mut writer = writer_arc.write().await;
701                    let eid = writer.next_eid(edge_type_id).await?;
702                    writer
703                        .insert_edge(
704                            src,
705                            dst,
706                            edge_type_id,
707                            eid,
708                            properties,
709                            Some(label.to_string()),
710                            None,
711                        )
712                        .await?;
713
714                    total_rows += 1;
715                }
716            }
717
718            log::info!(
719                "Imported {} edge rows from '{}' into edge type '{}'",
720                total_rows,
721                path,
722                label
723            );
724
725            // Flush to persist edges
726            if total_rows > 0 {
727                let mut writer = writer_arc.write().await;
728                writer.flush_to_l1(None).await?;
729            }
730
731            Ok(total_rows)
732        } else {
733            // Import vertices
734            // Validate the label exists in schema
735            db_schema
736                .label_id_by_name_case_insensitive(label)
737                .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
738
739            let mut total_rows = 0;
740            for batch in batches {
741                let num_rows = batch.num_rows();
742
743                // Convert Arrow batch to rows
744                for row_idx in 0..num_rows {
745                    let mut properties = HashMap::new();
746
747                    // Extract properties from each column
748                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
749                        let col_name = field.name();
750
751                        // Skip internal columns
752                        if col_name.starts_with('_') {
753                            continue;
754                        }
755
756                        let column = batch.column(col_idx);
757                        let value = self.arrow_value_to_json(column, row_idx)?;
758
759                        if !value.is_null() {
760                            properties.insert(col_name.clone(), value);
761                        }
762                    }
763
764                    // Generate VID and insert
765                    let mut writer = writer_arc.write().await;
766                    let vid = writer.next_vid().await?;
767                    let _ = writer
768                        .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
769                        .await?;
770
771                    total_rows += 1;
772                }
773            }
774
775            log::info!(
776                "Imported {} rows from '{}' into label '{}'",
777                total_rows,
778                path,
779                label
780            );
781
782            // Flush to persist vertices
783            if total_rows > 0 {
784                let mut writer = writer_arc.write().await;
785                writer.flush_to_l1(None).await?;
786            }
787
788            Ok(total_rows)
789        }
790    }
791
792    fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
793        use arrow_array::Array;
794        use arrow_schema::DataType as ArrowDataType;
795
796        if column.is_null(row_idx) {
797            return Ok(Value::Null);
798        }
799
800        match column.data_type() {
801            ArrowDataType::Utf8 => {
802                let array = column
803                    .as_any()
804                    .downcast_ref::<arrow_array::StringArray>()
805                    .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
806                Ok(Value::String(array.value(row_idx).to_string()))
807            }
808            ArrowDataType::Int32 => {
809                let array = column
810                    .as_any()
811                    .downcast_ref::<arrow_array::Int32Array>()
812                    .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
813                Ok(Value::Int(array.value(row_idx) as i64))
814            }
815            ArrowDataType::Int64 => {
816                let array = column
817                    .as_any()
818                    .downcast_ref::<arrow_array::Int64Array>()
819                    .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
820                Ok(Value::Int(array.value(row_idx)))
821            }
822            ArrowDataType::Float32 => {
823                let array = column
824                    .as_any()
825                    .downcast_ref::<arrow_array::Float32Array>()
826                    .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
827                Ok(Value::Float(array.value(row_idx) as f64))
828            }
829            ArrowDataType::Float64 => {
830                let array = column
831                    .as_any()
832                    .downcast_ref::<arrow_array::Float64Array>()
833                    .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
834                Ok(Value::Float(array.value(row_idx)))
835            }
836            ArrowDataType::Boolean => {
837                let array = column
838                    .as_any()
839                    .downcast_ref::<arrow_array::BooleanArray>()
840                    .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
841                Ok(Value::Bool(array.value(row_idx)))
842            }
843            ArrowDataType::UInt64 => {
844                let array = column
845                    .as_any()
846                    .downcast_ref::<arrow_array::UInt64Array>()
847                    .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
848                Ok(Value::Int(array.value(row_idx) as i64))
849            }
850            _ => {
851                // For other types, try to convert to string
852                let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
853                if let Some(arr) = array {
854                    Ok(Value::String(arr.value(row_idx).to_string()))
855                } else {
856                    Ok(Value::Null)
857                }
858            }
859        }
860    }
861
862    fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
863        let file = std::fs::File::open(path)?;
864        let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
865            .build()?;
866        reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
867    }
868
869    fn read_csv_file(
870        &self,
871        path: &str,
872        label: &str,
873        options: &HashMap<String, Value>,
874    ) -> Result<Vec<arrow_array::RecordBatch>> {
875        use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
876        use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
877        use std::sync::Arc;
878
879        // Parse CSV options
880        let has_headers = options
881            .get("headers")
882            .and_then(|v| v.as_bool())
883            .unwrap_or(true);
884
885        // Read CSV file
886        let file = std::fs::File::open(path)?;
887        let mut rdr = csv::ReaderBuilder::new()
888            .has_headers(has_headers)
889            .from_reader(file);
890
891        // Get schema for type conversion
892        let db_schema = self.storage.schema_manager().schema();
893        let properties = db_schema.properties.get(label);
894
895        // Collect all rows first to determine schema
896        let mut rows: Vec<Vec<String>> = Vec::new();
897        let headers: Vec<String> = if has_headers {
898            rdr.headers()?.iter().map(|s| s.to_string()).collect()
899        } else {
900            Vec::new()
901        };
902
903        for result in rdr.records() {
904            let record = result?;
905            rows.push(record.iter().map(|s| s.to_string()).collect());
906        }
907
908        if rows.is_empty() {
909            return Ok(Vec::new());
910        }
911
912        // Build Arrow schema with proper types based on DB schema
913        let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
914        let col_names: Vec<String> = if has_headers {
915            headers
916        } else {
917            (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
918        };
919
920        for name in &col_names {
921            let arrow_type = if let Some(props) = properties {
922                if let Some(prop_meta) = props.get(name) {
923                    match prop_meta.r#type {
924                        DataType::Int32 => ArrowDataType::Int32,
925                        DataType::Int64 => ArrowDataType::Int64,
926                        DataType::Float32 => ArrowDataType::Float32,
927                        DataType::Float64 => ArrowDataType::Float64,
928                        DataType::Bool => ArrowDataType::Boolean,
929                        _ => ArrowDataType::Utf8,
930                    }
931                } else {
932                    ArrowDataType::Utf8
933                }
934            } else {
935                ArrowDataType::Utf8
936            };
937            arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
938        }
939
940        let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
941
942        // Convert rows to Arrow arrays with proper types
943        let mut columns: Vec<ArrayRef> = Vec::new();
944        for (col_idx, field) in arrow_fields.iter().enumerate() {
945            match field.data_type() {
946                ArrowDataType::Int32 => {
947                    let values: Vec<Option<i32>> = rows
948                        .iter()
949                        .map(|row| {
950                            if col_idx < row.len() {
951                                row[col_idx].parse().ok()
952                            } else {
953                                None
954                            }
955                        })
956                        .collect();
957                    columns.push(Arc::new(Int32Array::from(values)));
958                }
959                _ => {
960                    // Default to string
961                    let values: Vec<Option<String>> = rows
962                        .iter()
963                        .map(|row| {
964                            if col_idx < row.len() {
965                                Some(row[col_idx].clone())
966                            } else {
967                                None
968                            }
969                        })
970                        .collect();
971                    columns.push(Arc::new(StringArray::from(values)));
972                }
973            }
974        }
975
976        let batch = RecordBatch::try_new(arrow_schema, columns)?;
977        Ok(vec![batch])
978    }
979
980    fn parse_data_type(type_str: &str) -> Result<DataType> {
981        use uni_common::core::schema::{CrdtType, PointType};
982        let type_str = type_str.to_lowercase();
983        let type_str = type_str.trim();
984        match type_str {
985            "string" | "text" | "varchar" => Ok(DataType::String),
986            "int" | "integer" | "int32" => Ok(DataType::Int32),
987            "long" | "int64" | "bigint" => Ok(DataType::Int64),
988            "float" | "float32" | "real" => Ok(DataType::Float32),
989            "double" | "float64" => Ok(DataType::Float64),
990            "bool" | "boolean" => Ok(DataType::Bool),
991            "timestamp" => Ok(DataType::Timestamp),
992            "date" => Ok(DataType::Date),
993            "time" => Ok(DataType::Time),
994            "datetime" => Ok(DataType::DateTime),
995            "duration" => Ok(DataType::Duration),
996            "json" | "jsonb" => Ok(DataType::CypherValue),
997            "point" => Ok(DataType::Point(PointType::Cartesian2D)),
998            "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
999            "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1000            s if s.starts_with("vector(") && s.ends_with(')') => {
1001                let dims_str = &s[7..s.len() - 1];
1002                let dimensions = dims_str
1003                    .parse::<usize>()
1004                    .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1005                Ok(DataType::Vector { dimensions })
1006            }
1007            s if s.starts_with("list<") && s.ends_with('>') => {
1008                let inner_type_str = &s[5..s.len() - 1];
1009                let inner_type = Self::parse_data_type(inner_type_str)?;
1010                Ok(DataType::List(Box::new(inner_type)))
1011            }
1012            "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1013            "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1014            _ => Err(anyhow!("Unknown data type: {}", type_str)),
1015        }
1016    }
1017
1018    pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1019        let sm = self.storage.schema_manager_arc();
1020        if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1021            return Ok(());
1022        }
1023        sm.add_label(&clause.name)?;
1024        for prop in clause.properties {
1025            let dt = Self::parse_data_type(&prop.data_type)?;
1026            sm.add_property(&clause.name, &prop.name, dt, prop.nullable)?;
1027            if prop.unique {
1028                let constraint = Constraint {
1029                    name: format!("{}_{}_unique", clause.name, prop.name),
1030                    constraint_type: ConstraintType::Unique {
1031                        properties: vec![prop.name],
1032                    },
1033                    target: ConstraintTarget::Label(clause.name.clone()),
1034                    enabled: true,
1035                };
1036                sm.add_constraint(constraint)?;
1037            }
1038        }
1039        sm.save().await?;
1040        Ok(())
1041    }
1042
1043    pub(crate) async fn enrich_properties_with_generated_columns(
1044        &self,
1045        label_name: &str,
1046        properties: &mut HashMap<String, Value>,
1047        prop_manager: &PropertyManager,
1048        params: &HashMap<String, Value>,
1049        ctx: Option<&QueryContext>,
1050    ) -> Result<()> {
1051        let schema = self.storage.schema_manager().schema();
1052
1053        if let Some(props_meta) = schema.properties.get(label_name) {
1054            let mut generators = Vec::new();
1055            for (prop_name, meta) in props_meta {
1056                if let Some(expr_str) = &meta.generation_expression {
1057                    generators.push((prop_name.clone(), expr_str.clone()));
1058                }
1059            }
1060
1061            for (prop_name, expr_str) in generators {
1062                let cache_key = (label_name.to_string(), prop_name.clone());
1063                let expr = {
1064                    let cache = self.gen_expr_cache.read().await;
1065                    cache.get(&cache_key).cloned()
1066                };
1067
1068                let expr = match expr {
1069                    Some(e) => e,
1070                    None => {
1071                        let parsed = uni_cypher::parse_expression(&expr_str)
1072                            .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1073                        let mut cache = self.gen_expr_cache.write().await;
1074                        cache.insert(cache_key, parsed.clone());
1075                        parsed
1076                    }
1077                };
1078
1079                let mut scope = HashMap::new();
1080
1081                // If expression has an explicit variable, use it as an object
1082                if let Some(var) = expr.extract_variable() {
1083                    scope.insert(var, Value::Map(properties.clone()));
1084                } else {
1085                    // No explicit variable - add properties directly to scope for bare references
1086                    // e.g., "lower(email)" can reference "email" directly
1087                    for (k, v) in properties.iter() {
1088                        scope.insert(k.clone(), v.clone());
1089                    }
1090                }
1091
1092                let val = self
1093                    .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1094                    .await?;
1095                properties.insert(prop_name, val);
1096            }
1097        }
1098        Ok(())
1099    }
1100
1101    pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1102        let sm = self.storage.schema_manager_arc();
1103        if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1104            return Ok(());
1105        }
1106        sm.add_edge_type(&clause.name, clause.src_labels, clause.dst_labels)?;
1107        for prop in clause.properties {
1108            let dt = Self::parse_data_type(&prop.data_type)?;
1109            sm.add_property(&clause.name, &prop.name, dt, prop.nullable)?;
1110        }
1111        sm.save().await?;
1112        Ok(())
1113    }
1114
1115    /// Executes an ALTER action on a schema entity.
1116    ///
1117    /// This is a shared helper for both `execute_alter_label` and
1118    /// `execute_alter_edge_type` since they have identical logic.
1119    pub(crate) async fn execute_alter_entity(
1120        sm: &Arc<SchemaManager>,
1121        entity_name: &str,
1122        action: AlterAction,
1123    ) -> Result<()> {
1124        match action {
1125            AlterAction::AddProperty(prop) => {
1126                let dt = Self::parse_data_type(&prop.data_type)?;
1127                sm.add_property(entity_name, &prop.name, dt, prop.nullable)?;
1128            }
1129            AlterAction::DropProperty(prop_name) => {
1130                sm.drop_property(entity_name, &prop_name)?;
1131            }
1132            AlterAction::RenameProperty { old_name, new_name } => {
1133                sm.rename_property(entity_name, &old_name, &new_name)?;
1134            }
1135        }
1136        sm.save().await?;
1137        Ok(())
1138    }
1139
1140    pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1141        Self::execute_alter_entity(
1142            &self.storage.schema_manager_arc(),
1143            &clause.name,
1144            clause.action,
1145        )
1146        .await
1147    }
1148
1149    pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1150        Self::execute_alter_entity(
1151            &self.storage.schema_manager_arc(),
1152            &clause.name,
1153            clause.action,
1154        )
1155        .await
1156    }
1157
1158    pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1159        let sm = self.storage.schema_manager_arc();
1160        sm.drop_label(&clause.name, clause.if_exists)?;
1161        sm.save().await?;
1162        Ok(())
1163    }
1164
1165    pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1166        let sm = self.storage.schema_manager_arc();
1167        sm.drop_edge_type(&clause.name, clause.if_exists)?;
1168        sm.save().await?;
1169        Ok(())
1170    }
1171
1172    pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1173        let sm = self.storage.schema_manager_arc();
1174        let target = ConstraintTarget::Label(clause.label);
1175        let c_type = match clause.constraint_type {
1176            AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1177                properties: clause.properties,
1178            },
1179            AstConstraintType::Exists => {
1180                let property = clause
1181                    .properties
1182                    .into_iter()
1183                    .next()
1184                    .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1185                ConstraintType::Exists { property }
1186            }
1187            AstConstraintType::Check => {
1188                let expression = clause
1189                    .expression
1190                    .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1191                ConstraintType::Check {
1192                    expression: expression.to_string_repr(),
1193                }
1194            }
1195        };
1196
1197        let constraint = Constraint {
1198            name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1199            constraint_type: c_type,
1200            target,
1201            enabled: true,
1202        };
1203
1204        sm.add_constraint(constraint)?;
1205        sm.save().await?;
1206        Ok(())
1207    }
1208
1209    pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1210        let sm = self.storage.schema_manager_arc();
1211        sm.drop_constraint(&clause.name, false)?;
1212        sm.save().await?;
1213        Ok(())
1214    }
1215
1216    fn get_composite_constraint(&self, label: &str) -> Option<Constraint> {
1217        let schema = self.storage.schema_manager().schema();
1218        schema
1219            .constraints
1220            .iter()
1221            .find(|c| {
1222                if !c.enabled {
1223                    return false;
1224                }
1225                match &c.target {
1226                    ConstraintTarget::Label(l) if l == label => {
1227                        matches!(c.constraint_type, ConstraintType::Unique { .. })
1228                    }
1229                    _ => false,
1230                }
1231            })
1232            .cloned()
1233    }
1234
    /// Executes a `MERGE` for every input row: match the pattern if possible,
    /// otherwise create it.
    ///
    /// For each row one of two paths is taken:
    ///
    /// 1. **Index fast path** — used when the pattern is a single path made of
    ///    a single node with exactly one label, that label has an enabled
    ///    unique constraint, the node's property map supplies every
    ///    constrained property, and the composite index lookup returns a VID.
    ///    The `on_match` SET items (if any) are applied and the row is emitted.
    /// 2. **Fallback** — `execute_merge_match` is run. If it yields matches,
    ///    `on_match` is applied to each; if it yields none, the pattern is
    ///    created and `on_create` is applied to the new row.
    ///
    /// Path variables are bound on every emitted row via
    /// `bind_path_variables`.
    ///
    /// # Errors
    ///
    /// Fails if the executor has no writer, or if expression evaluation,
    /// matching, pattern creation or applying SET items fails. An error from
    /// the index lookup is not propagated; the row falls back to path 2.
    #[expect(clippy::too_many_arguments)]
    pub(crate) async fn execute_merge(
        &self,
        rows: Vec<HashMap<String, Value>>,
        pattern: &Pattern,
        on_match: Option<&SetClause>,
        on_create: Option<&SetClause>,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;

        // Prepare pattern for path variable binding: assign temp edge variable
        // names to unnamed relationships in paths that have path variables.
        let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);

        let mut results = Vec::new();
        for mut row in rows {
            // Optimization: Check for single node pattern with unique constraint
            let mut optimized_vid = None;
            if pattern.paths.len() == 1 {
                let path = &pattern.paths[0];
                if path.elements.len() == 1
                    && let PatternElement::Node(n) = &path.elements[0]
                    && n.labels.len() == 1
                    && let Some(constraint) = self.get_composite_constraint(&n.labels[0])
                    && let ConstraintType::Unique { properties } = constraint.constraint_type
                {
                    let label = &n.labels[0];
                    // Evaluate pattern properties
                    let mut pattern_props = HashMap::new();
                    if let Some(props_expr) = &n.properties {
                        let val = self
                            .evaluate_expr(props_expr, &row, prop_manager, params, ctx)
                            .await?;
                        // A non-map result is ignored here, leaving `pattern_props` empty.
                        if let Value::Map(map) = val {
                            for (k, v) in map {
                                pattern_props.insert(k, v);
                            }
                        }
                    }

                    // Check if all constraint properties are present
                    let has_all_keys = properties.iter().all(|p| pattern_props.contains_key(p));
                    if has_all_keys {
                        // Extract key properties and convert to serde_json::Value for index lookup
                        let key_props: HashMap<String, serde_json::Value> = properties
                            .iter()
                            .filter_map(|p| {
                                pattern_props.get(p).map(|v| (p.clone(), v.clone().into()))
                            })
                            .collect();

                        // Use optimized lookup
                        // Only `Ok(Some(_))` takes the fast path; `Ok(None)` and `Err(_)`
                        // both leave `optimized_vid` unset, so the fallback below runs.
                        if let Ok(Some(vid)) = self
                            .storage
                            .index_manager()
                            .composite_lookup(label, &key_props)
                            .await
                        {
                            optimized_vid = Some((vid, pattern_props));
                        }
                    }
                }
            }

            // `_pattern_props` is carried in the tuple but not used on the fast path.
            if let Some((vid, _pattern_props)) = optimized_vid {
                // Optimized Path: Node found via index
                let mut writer = writer_lock.write().await;

                let mut match_row = row.clone();
                // NOTE(review): the node variable is bound to a bare integer VID here,
                // whereas `execute_create_pattern` binds a map containing `_vid` —
                // confirm downstream consumers accept both shapes.
                if let PatternElement::Node(n) = &pattern.paths[0].elements[0]
                    && let Some(var) = &n.variable
                {
                    match_row.insert(var.clone(), Value::Int(vid.as_u64() as i64));
                }

                let result = if let Some(set) = on_match {
                    self.execute_set_items_locked(
                        &set.items,
                        &mut match_row,
                        &mut writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0_override,
                    )
                    .await
                } else {
                    Ok(())
                };

                // Release the writer lock before propagating any SET error.
                drop(writer);
                result?;

                Self::bind_path_variables(&path_pattern, &mut match_row, &temp_vars);
                results.push(match_row);
            } else {
                // Fallback to standard execution
                // Matching runs before the writer lock is acquired.
                let matches = self
                    .execute_merge_match(pattern, &row, prop_manager, params, ctx)
                    .await?;
                let mut writer = writer_lock.write().await;

                // The write work is wrapped in an async block so that `?` inside it
                // produces a `Result` value instead of returning from the function;
                // the lock is then dropped explicitly before that result is unwrapped.
                let result: Result<Vec<HashMap<String, Value>>> = async {
                    let mut batch = Vec::new();
                    if !matches.is_empty() {
                        for mut m in matches {
                            if let Some(set) = on_match {
                                self.execute_set_items_locked(
                                    &set.items,
                                    &mut m,
                                    &mut writer,
                                    prop_manager,
                                    params,
                                    ctx,
                                    tx_l0_override,
                                )
                                .await?;
                            }
                            Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
                            batch.push(m);
                        }
                    } else {
                        // No match: create the pattern (using the variant with temp
                        // edge variables) and then apply the ON CREATE items.
                        self.execute_create_pattern(
                            &path_pattern,
                            &mut row,
                            &mut writer,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0_override,
                        )
                        .await?;
                        if let Some(set) = on_create {
                            self.execute_set_items_locked(
                                &set.items,
                                &mut row,
                                &mut writer,
                                prop_manager,
                                params,
                                ctx,
                                tx_l0_override,
                            )
                            .await?;
                        }
                        Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
                        batch.push(row);
                    }
                    Ok(batch)
                }
                .await;

                drop(writer);
                results.extend(result?);
            }
        }
        Ok(results)
    }
1399
1400    /// Execute a CREATE pattern, inserting new vertices and edges into the graph.
1401    #[expect(clippy::too_many_arguments)]
1402    pub(crate) async fn execute_create_pattern(
1403        &self,
1404        pattern: &Pattern,
1405        row: &mut HashMap<String, Value>,
1406        writer: &mut Writer,
1407        prop_manager: &PropertyManager,
1408        params: &HashMap<String, Value>,
1409        ctx: Option<&QueryContext>,
1410        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1411    ) -> Result<()> {
1412        for path in &pattern.paths {
1413            let mut prev_vid: Option<Vid> = None;
1414            // (rel_var, type_id, type_name, props_expr, direction)
1415            type PendingRel = (String, u32, String, Option<Expr>, Direction);
1416            let mut rel_pending: Option<PendingRel> = None;
1417
1418            for element in &path.elements {
1419                match element {
1420                    PatternElement::Node(n) => {
1421                        let mut vid = None;
1422
1423                        // Check if node variable already bound in row
1424                        if let Some(var) = &n.variable
1425                            && let Some(val) = row.get(var)
1426                            && let Ok(existing_vid) = Self::vid_from_value(val)
1427                        {
1428                            vid = Some(existing_vid);
1429                        }
1430
1431                        // If not bound, create it
1432                        if vid.is_none() {
1433                            let mut props = HashMap::new();
1434                            if let Some(props_expr) = &n.properties {
1435                                let props_val = self
1436                                    .evaluate_expr(props_expr, row, prop_manager, params, ctx)
1437                                    .await?;
1438                                if let Value::Map(map) = props_val {
1439                                    for (k, v) in map {
1440                                        props.insert(k, v);
1441                                    }
1442                                } else {
1443                                    return Err(anyhow!("Properties must evaluate to a map"));
1444                                }
1445                            }
1446
1447                            // Support unlabeled nodes and unknown labels (schemaless)
1448                            let schema = self.storage.schema_manager().schema();
1449
1450                            // VID generation is label-independent
1451                            let new_vid = writer.next_vid().await?;
1452
1453                            // Enrich with generated columns only for known labels
1454                            for label_name in &n.labels {
1455                                if schema.get_label_case_insensitive(label_name).is_some() {
1456                                    self.enrich_properties_with_generated_columns(
1457                                        label_name,
1458                                        &mut props,
1459                                        prop_manager,
1460                                        params,
1461                                        ctx,
1462                                    )
1463                                    .await?;
1464                                }
1465                            }
1466
1467                            // Insert vertex and get back final properties (includes auto-generated embeddings)
1468                            let final_props = writer
1469                                .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
1470                                .await?;
1471
1472                            // Build node object with final properties (includes embeddings)
1473                            if let Some(var) = &n.variable {
1474                                let mut obj = HashMap::new();
1475                                obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
1476                                let labels_list: Vec<Value> =
1477                                    n.labels.iter().map(|l| Value::String(l.clone())).collect();
1478                                obj.insert("_labels".to_string(), Value::List(labels_list));
1479                                for (k, v) in &final_props {
1480                                    obj.insert(k.clone(), v.clone());
1481                                }
1482                                // Store node as a Map with _vid, matching MATCH behavior
1483                                row.insert(var.clone(), Value::Map(obj));
1484                            }
1485                            vid = Some(new_vid);
1486                        }
1487
1488                        let current_vid = vid.unwrap();
1489
1490                        if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
1491                            rel_pending.take()
1492                            && let Some(src) = prev_vid
1493                        {
1494                            let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
1495
1496                            if !is_rel_bound {
1497                                let mut rel_props = HashMap::new();
1498                                if let Some(expr) = rel_props_expr {
1499                                    let val = self
1500                                        .evaluate_expr(&expr, row, prop_manager, params, ctx)
1501                                        .await?;
1502                                    if let Value::Map(map) = val {
1503                                        rel_props.extend(map);
1504                                    }
1505                                }
1506                                let eid = writer.next_eid(type_id).await?;
1507
1508                                // For incoming edges like (a)<-[:R]-(b), swap so the edge points b -> a
1509                                let (edge_src, edge_dst) = match dir {
1510                                    Direction::Incoming => (current_vid, src),
1511                                    _ => (src, current_vid),
1512                                };
1513
1514                                let store_props = !rel_var.is_empty();
1515                                let user_props = if store_props {
1516                                    rel_props.clone()
1517                                } else {
1518                                    HashMap::new()
1519                                };
1520
1521                                writer
1522                                    .insert_edge(
1523                                        edge_src,
1524                                        edge_dst,
1525                                        type_id,
1526                                        eid,
1527                                        rel_props,
1528                                        Some(type_name.clone()),
1529                                        tx_l0,
1530                                    )
1531                                    .await?;
1532
1533                                // Edge type name is now stored by insert_edge
1534
1535                                if store_props {
1536                                    let mut edge_map = HashMap::new();
1537                                    edge_map.insert(
1538                                        "_eid".to_string(),
1539                                        Value::Int(eid.as_u64() as i64),
1540                                    );
1541                                    edge_map.insert(
1542                                        "_src".to_string(),
1543                                        Value::Int(edge_src.as_u64() as i64),
1544                                    );
1545                                    edge_map.insert(
1546                                        "_dst".to_string(),
1547                                        Value::Int(edge_dst.as_u64() as i64),
1548                                    );
1549                                    edge_map
1550                                        .insert("_type".to_string(), Value::Int(type_id as i64));
1551                                    // Include user properties so downstream RETURN sees them
1552                                    for (k, v) in user_props {
1553                                        edge_map.insert(k, v);
1554                                    }
1555                                    row.insert(rel_var, Value::Map(edge_map));
1556                                }
1557                            }
1558                        }
1559                        prev_vid = Some(current_vid);
1560                    }
1561                    PatternElement::Relationship(r) => {
1562                        if r.types.len() != 1 {
1563                            return Err(anyhow!(
1564                                "CREATE relationship must specify exactly one type"
1565                            ));
1566                        }
1567                        let type_name = &r.types[0];
1568                        // Get or assign edge type ID (schemaless types get bit 31 = 1)
1569                        let type_id = self
1570                            .storage
1571                            .schema_manager()
1572                            .get_or_assign_edge_type_id(type_name);
1573
1574                        rel_pending = Some((
1575                            r.variable.clone().unwrap_or_default(),
1576                            type_id,
1577                            type_name.clone(),
1578                            r.properties.clone(),
1579                            r.direction.clone(),
1580                        ));
1581                    }
1582                    PatternElement::Parenthesized { .. } => {
1583                        return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
1584                    }
1585                }
1586            }
1587        }
1588        Ok(())
1589    }
1590
1591    /// Validates that a value is a valid property type per OpenCypher.
1592    /// Rejects maps, nodes, edges, paths, and lists containing those types or nested lists.
1593    /// Skips validation for CypherValue-typed properties which accept any value.
1594    fn validate_property_value(
1595        prop_name: &str,
1596        val: &Value,
1597        schema: &uni_common::core::schema::Schema,
1598        labels: &[String],
1599    ) -> Result<()> {
1600        // CypherValue-typed properties accept any value (including Maps)
1601        for label in labels {
1602            if let Some(props) = schema.properties.get(label)
1603                && let Some(prop_meta) = props.get(prop_name)
1604                && prop_meta.r#type == uni_common::core::schema::DataType::CypherValue
1605            {
1606                return Ok(());
1607            }
1608        }
1609
1610        match val {
1611            Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
1612                anyhow::bail!(
1613                    "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
1614                    prop_name
1615                );
1616            }
1617            Value::List(items) => {
1618                for item in items {
1619                    match item {
1620                        Value::Map(_)
1621                        | Value::Node(_)
1622                        | Value::Edge(_)
1623                        | Value::Path(_)
1624                        | Value::List(_) => {
1625                            anyhow::bail!(
1626                                "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
1627                                prop_name
1628                            );
1629                        }
1630                        _ => {}
1631                    }
1632                }
1633            }
1634            _ => {}
1635        }
1636        Ok(())
1637    }
1638
1639    #[expect(clippy::too_many_arguments)]
1640    pub(crate) async fn execute_set_items_locked(
1641        &self,
1642        items: &[SetItem],
1643        row: &mut HashMap<String, Value>,
1644        writer: &mut Writer,
1645        prop_manager: &PropertyManager,
1646        params: &HashMap<String, Value>,
1647        ctx: Option<&QueryContext>,
1648        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1649    ) -> Result<()> {
1650        for item in items {
1651            match item {
1652                SetItem::Property { expr, value } => {
1653                    if let Expr::Property(var_expr, prop_name) = expr
1654                        && let Expr::Variable(var_name) = &**var_expr
1655                        && let Some(node_val) = row.get(var_name)
1656                    {
1657                        if let Ok(vid) = Self::vid_from_value(node_val) {
1658                            let labels =
1659                                Self::extract_labels_from_node(node_val).unwrap_or_default();
1660                            let schema = self.storage.schema_manager().schema().clone();
1661                            let mut props = prop_manager
1662                                .get_all_vertex_props_with_ctx(vid, ctx)
1663                                .await?
1664                                .unwrap_or_default();
1665                            let val = self
1666                                .evaluate_expr(value, row, prop_manager, params, ctx)
1667                                .await?;
1668                            Self::validate_property_value(prop_name, &val, &schema, &labels)?;
1669                            props.insert(prop_name.clone(), val.clone());
1670
1671                            // Enrich with generated columns
1672                            for label_name in &labels {
1673                                self.enrich_properties_with_generated_columns(
1674                                    label_name,
1675                                    &mut props,
1676                                    prop_manager,
1677                                    params,
1678                                    ctx,
1679                                )
1680                                .await?;
1681                            }
1682
1683                            let _ = writer
1684                                .insert_vertex_with_labels(vid, props, &labels, tx_l0)
1685                                .await?;
1686
1687                            // Update the row object so subsequent RETURN sees the new value
1688                            if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
1689                                node_map.insert(prop_name.clone(), val);
1690                            } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
1691                                node.properties.insert(prop_name.clone(), val);
1692                            }
1693                        } else if let Value::Map(map) = node_val
1694                            && map.get("_eid").is_some_and(|v| !v.is_null())
1695                            && map.get("_src").is_some_and(|v| !v.is_null())
1696                            && map.get("_dst").is_some_and(|v| !v.is_null())
1697                            && map.get("_type").is_some_and(|v| !v.is_null())
1698                        {
1699                            let ei = self.extract_edge_identity(map)?;
1700                            let schema = self.storage.schema_manager().schema().clone();
1701                            // Handle _type as either String or Int (Int from CREATE, String from queries)
1702                            let edge_type_name = match map.get("_type") {
1703                                Some(Value::String(s)) => s.clone(),
1704                                Some(Value::Int(id)) => schema
1705                                    .edge_type_name_by_id_unified(*id as u32)
1706                                    .unwrap_or_else(|| format!("EdgeType{}", id)),
1707                                _ => String::new(),
1708                            };
1709
1710                            let mut props = prop_manager
1711                                .get_all_edge_props_with_ctx(ei.eid, ctx)
1712                                .await?
1713                                .unwrap_or_default();
1714                            let val = self
1715                                .evaluate_expr(value, row, prop_manager, params, ctx)
1716                                .await?;
1717                            Self::validate_property_value(
1718                                prop_name,
1719                                &val,
1720                                &schema,
1721                                std::slice::from_ref(&edge_type_name),
1722                            )?;
1723                            props.insert(prop_name.clone(), val.clone());
1724                            writer
1725                                .insert_edge(
1726                                    ei.src,
1727                                    ei.dst,
1728                                    ei.edge_type_id,
1729                                    ei.eid,
1730                                    props,
1731                                    Some(edge_type_name.clone()),
1732                                    tx_l0,
1733                                )
1734                                .await?;
1735
1736                            // Update the row object so subsequent RETURN sees the new value
1737                            if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
1738                                edge_map.insert(prop_name.clone(), val);
1739                            } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1740                                edge.properties.insert(prop_name.clone(), val);
1741                            }
1742                        } else if let Value::Edge(edge) = node_val {
1743                            // Handle Value::Edge directly (when traverse returns Edge objects)
1744                            let eid = edge.eid;
1745                            let src = edge.src;
1746                            let dst = edge.dst;
1747                            let edge_type_name = edge.edge_type.clone();
1748                            let etype =
1749                                self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
1750                            let schema = self.storage.schema_manager().schema().clone();
1751
1752                            let mut props = prop_manager
1753                                .get_all_edge_props_with_ctx(eid, ctx)
1754                                .await?
1755                                .unwrap_or_default();
1756                            let val = self
1757                                .evaluate_expr(value, row, prop_manager, params, ctx)
1758                                .await?;
1759                            Self::validate_property_value(
1760                                prop_name,
1761                                &val,
1762                                &schema,
1763                                std::slice::from_ref(&edge_type_name),
1764                            )?;
1765                            props.insert(prop_name.clone(), val.clone());
1766                            writer
1767                                .insert_edge(
1768                                    src,
1769                                    dst,
1770                                    etype,
1771                                    eid,
1772                                    props,
1773                                    Some(edge_type_name.clone()),
1774                                    tx_l0,
1775                                )
1776                                .await?;
1777
1778                            // Update the row object so subsequent RETURN sees the new value
1779                            if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1780                                edge.properties.insert(prop_name.clone(), val);
1781                            }
1782                        }
1783                    }
1784                }
1785                SetItem::Labels { variable, labels } => {
1786                    if let Some(node_val) = row.get(variable)
1787                        && let Ok(vid) = Self::vid_from_value(node_val)
1788                    {
1789                        // Get current labels from node value
1790                        let current_labels =
1791                            Self::extract_labels_from_node(node_val).unwrap_or_default();
1792
1793                        // Determine new labels to add (skip duplicates)
1794                        let labels_to_add: Vec<_> = labels
1795                            .iter()
1796                            .filter(|l| !current_labels.contains(l))
1797                            .cloned()
1798                            .collect();
1799
1800                        if !labels_to_add.is_empty() {
1801                            // Add labels via L0Buffer (schemaless: accept any label name,
1802                            // matching CREATE behavior)
1803                            if let Some(ctx) = ctx {
1804                                ctx.l0.write().add_vertex_labels(vid, &labels_to_add);
1805                            }
1806
1807                            // Update the node value in the row with new labels
1808                            if let Some(Value::Map(obj)) = row.get_mut(variable) {
1809                                let mut updated_labels = current_labels;
1810                                updated_labels.extend(labels_to_add);
1811                                let labels_list =
1812                                    updated_labels.into_iter().map(Value::String).collect();
1813                                obj.insert("_labels".to_string(), Value::List(labels_list));
1814                            }
1815                        }
1816                    }
1817                }
1818                SetItem::Variable { variable, value }
1819                | SetItem::VariablePlus { variable, value } => {
1820                    let replace = matches!(item, SetItem::Variable { .. });
1821                    let op_str = if replace { "=" } else { "+=" };
1822
1823                    // SET n = expr / SET n += expr — null target from OPTIONAL MATCH is a silent no-op
1824                    if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
1825                        continue;
1826                    }
1827                    let rhs = self
1828                        .evaluate_expr(value, row, prop_manager, params, ctx)
1829                        .await?;
1830                    let new_props =
1831                        Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
1832                            anyhow!(
1833                                "SET {} {} expr: right-hand side must evaluate to a map, \
1834                                 node, or relationship",
1835                                variable,
1836                                op_str
1837                            )
1838                        })?;
1839                    self.apply_properties_to_entity(
1840                        variable,
1841                        new_props,
1842                        replace,
1843                        row,
1844                        writer,
1845                        prop_manager,
1846                        params,
1847                        ctx,
1848                        tx_l0,
1849                    )
1850                    .await?;
1851                }
1852            }
1853        }
1854        Ok(())
1855    }
1856
1857    /// Execute REMOVE clause items (property removal or label removal).
1858    ///
1859    /// Property removals are batched per variable to avoid stale reads: when
1860    /// multiple properties of the same entity are removed in one REMOVE clause,
1861    /// we read from storage once, null all specified properties, and write back
1862    /// once. This prevents the second removal from reading stale data that
1863    /// doesn't reflect the first removal's L0 write.
1864    pub(crate) async fn execute_remove_items_locked(
1865        &self,
1866        items: &[RemoveItem],
1867        row: &mut HashMap<String, Value>,
1868        writer: &mut Writer,
1869        prop_manager: &PropertyManager,
1870        ctx: Option<&QueryContext>,
1871        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1872    ) -> Result<()> {
1873        // Collect property names to remove, grouped by variable.
1874        // Use Vec<(String, Vec<String>)> to preserve insertion order.
1875        let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
1876
1877        for item in items {
1878            match item {
1879                RemoveItem::Property(expr) => {
1880                    if let Expr::Property(var_expr, prop_name) = expr
1881                        && let Expr::Variable(var_name) = &**var_expr
1882                    {
1883                        if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
1884                            entry.1.push(prop_name.clone());
1885                        } else {
1886                            prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
1887                        }
1888                    }
1889                }
1890                RemoveItem::Labels { variable, labels } => {
1891                    self.execute_remove_labels(variable, labels, row, ctx)?;
1892                }
1893            }
1894        }
1895
1896        // Execute batched property removals per variable.
1897        for (var_name, prop_names) in &prop_removals {
1898            let Some(node_val) = row.get(var_name) else {
1899                continue;
1900            };
1901
1902            if let Ok(vid) = Self::vid_from_value(node_val) {
1903                // Vertex property removal
1904                let mut props = prop_manager
1905                    .get_all_vertex_props_with_ctx(vid, ctx)
1906                    .await?
1907                    .unwrap_or_default();
1908
1909                // Only write back if at least one property actually exists
1910                let removed_count = prop_names
1911                    .iter()
1912                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
1913                    .count();
1914                let any_exist = removed_count > 0;
1915                if any_exist {
1916                    writer.track_properties_removed(removed_count, tx_l0);
1917                    for prop_name in prop_names {
1918                        props.insert(prop_name.clone(), Value::Null);
1919                    }
1920                }
1921                // Compute effective properties (post-removal) for _all_props
1922                let effective: HashMap<String, Value> = props
1923                    .iter()
1924                    .filter(|(_, v)| !v.is_null())
1925                    .map(|(k, v)| (k.clone(), v.clone()))
1926                    .collect();
1927                if any_exist {
1928                    let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
1929                    let _ = writer
1930                        .insert_vertex_with_labels(vid, props, &labels, tx_l0)
1931                        .await?;
1932                }
1933
1934                // Update the row map: set removed props to Null
1935                if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
1936                    for prop_name in prop_names {
1937                        node_map.insert(prop_name.clone(), Value::Null);
1938                    }
1939                    // Set _all_props to the complete effective property set
1940                    node_map.insert("_all_props".to_string(), Value::Map(effective));
1941                }
1942            } else if let Value::Map(map) = node_val {
1943                // Edge property removal (map-encoded)
1944                // Check for non-null _eid to skip OPTIONAL MATCH null edges
1945                let mut edge_effective: Option<HashMap<String, Value>> = None;
1946                if map.get("_eid").is_some_and(|v| !v.is_null()) {
1947                    let ei = self.extract_edge_identity(map)?;
1948                    let mut props = prop_manager
1949                        .get_all_edge_props_with_ctx(ei.eid, ctx)
1950                        .await?
1951                        .unwrap_or_default();
1952
1953                    let removed_count = prop_names
1954                        .iter()
1955                        .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
1956                        .count();
1957                    let any_exist = removed_count > 0;
1958                    if any_exist {
1959                        writer.track_properties_removed(removed_count, tx_l0);
1960                        for prop_name in prop_names {
1961                            props.insert(prop_name.to_string(), Value::Null);
1962                        }
1963                    }
1964                    // Compute effective properties (post-removal) for _all_props
1965                    edge_effective = Some(
1966                        props
1967                            .iter()
1968                            .filter(|(_, v)| !v.is_null())
1969                            .map(|(k, v)| (k.clone(), v.clone()))
1970                            .collect(),
1971                    );
1972                    if any_exist {
1973                        let edge_type_name = map
1974                            .get("_type")
1975                            .and_then(|v| v.as_str())
1976                            .map(|s| s.to_string())
1977                            .or_else(|| {
1978                                self.storage
1979                                    .schema_manager()
1980                                    .edge_type_name_by_id_unified(ei.edge_type_id)
1981                            });
1982                        writer
1983                            .insert_edge(
1984                                ei.src,
1985                                ei.dst,
1986                                ei.edge_type_id,
1987                                ei.eid,
1988                                props,
1989                                edge_type_name,
1990                                tx_l0,
1991                            )
1992                            .await?;
1993                    }
1994                }
1995
1996                if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
1997                    for prop_name in prop_names {
1998                        edge_map.insert(prop_name.clone(), Value::Null);
1999                    }
2000                    if let Some(effective) = edge_effective {
2001                        edge_map.insert("_all_props".to_string(), Value::Map(effective));
2002                    }
2003                }
2004            } else if let Value::Edge(edge) = node_val {
2005                // Edge property removal (Value::Edge)
2006                let eid = edge.eid;
2007                let src = edge.src;
2008                let dst = edge.dst;
2009                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
2010
2011                let mut props = prop_manager
2012                    .get_all_edge_props_with_ctx(eid, ctx)
2013                    .await?
2014                    .unwrap_or_default();
2015
2016                let removed_count = prop_names
2017                    .iter()
2018                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
2019                    .count();
2020                if removed_count > 0 {
2021                    writer.track_properties_removed(removed_count, tx_l0);
2022                    for prop_name in prop_names {
2023                        props.insert(prop_name.to_string(), Value::Null);
2024                    }
2025                    writer
2026                        .insert_edge(
2027                            src,
2028                            dst,
2029                            etype,
2030                            eid,
2031                            props,
2032                            Some(edge.edge_type.clone()),
2033                            tx_l0,
2034                        )
2035                        .await?;
2036                }
2037
2038                if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
2039                    for prop_name in prop_names {
2040                        edge.properties.insert(prop_name.to_string(), Value::Null);
2041                    }
2042                }
2043            }
2044        }
2045
2046        Ok(())
2047    }
2048
2049    /// Execute label removal.
2050    pub(crate) fn execute_remove_labels(
2051        &self,
2052        variable: &str,
2053        labels: &[String],
2054        row: &mut HashMap<String, Value>,
2055        ctx: Option<&QueryContext>,
2056    ) -> Result<()> {
2057        if let Some(node_val) = row.get(variable)
2058            && let Ok(vid) = Self::vid_from_value(node_val)
2059        {
2060            // Get current labels from node value
2061            let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
2062
2063            // Determine which labels to actually remove (only those currently present)
2064            let labels_to_remove: Vec<_> = labels
2065                .iter()
2066                .filter(|l| current_labels.contains(l))
2067                .collect();
2068
2069            if !labels_to_remove.is_empty() {
2070                // Remove labels via L0Buffer
2071                if let Some(ctx) = ctx {
2072                    let mut l0 = ctx.l0.write();
2073                    for label in &labels_to_remove {
2074                        l0.remove_vertex_label(vid, label);
2075                    }
2076                }
2077
2078                // Update the node value in the row with remaining labels
2079                if let Some(Value::Map(obj)) = row.get_mut(variable) {
2080                    let remaining_labels: Vec<_> = current_labels
2081                        .iter()
2082                        .filter(|l| !labels_to_remove.contains(l))
2083                        .cloned()
2084                        .collect();
2085                    let labels_list = remaining_labels.into_iter().map(Value::String).collect();
2086                    obj.insert("_labels".to_string(), Value::List(labels_list));
2087                }
2088            }
2089        }
2090        Ok(())
2091    }
2092
2093    /// Resolve edge type ID for a Value::Edge, handling empty edge_type strings
2094    /// by looking up the type from the L0 buffer's edge endpoints.
2095    fn resolve_edge_type_id_for_edge(
2096        &self,
2097        edge: &crate::types::Edge,
2098        writer: &Writer,
2099        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2100    ) -> Result<u32> {
2101        if !edge.edge_type.is_empty() {
2102            return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
2103        }
2104        // Edge type name is empty (e.g., from anonymous MATCH patterns).
2105        // Look up the edge type ID from the L0 buffer's edge endpoints.
2106        if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
2107            return Ok(etype);
2108        }
2109        Err(anyhow!(
2110            "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
2111            edge.eid
2112        ))
2113    }
2114
2115    /// Execute DELETE clause for a single item (vertex, edge, path, or null).
2116    pub(crate) async fn execute_delete_item_locked(
2117        &self,
2118        val: &Value,
2119        detach: bool,
2120        writer: &mut Writer,
2121        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2122    ) -> Result<()> {
2123        match val {
2124            Value::Null => {
2125                // DELETE null is a no-op per OpenCypher spec
2126            }
2127            Value::Path(path) => {
2128                // Delete path edges first, then nodes
2129                for edge in &path.edges {
2130                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
2131                    writer
2132                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
2133                        .await?;
2134                }
2135                for node in &path.nodes {
2136                    self.execute_delete_vertex(
2137                        node.vid,
2138                        detach,
2139                        Some(node.labels.clone()),
2140                        writer,
2141                        tx_l0,
2142                    )
2143                    .await?;
2144                }
2145            }
2146            _ => {
2147                // Try Path reconstruction from Map first (Arrow loses Path type)
2148                if let Ok(path) = Path::try_from(val) {
2149                    for edge in &path.edges {
2150                        let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
2151                        writer
2152                            .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
2153                            .await?;
2154                    }
2155                    for node in &path.nodes {
2156                        self.execute_delete_vertex(
2157                            node.vid,
2158                            detach,
2159                            Some(node.labels.clone()),
2160                            writer,
2161                            tx_l0,
2162                        )
2163                        .await?;
2164                    }
2165                } else if let Ok(vid) = Self::vid_from_value(val) {
2166                    let labels = Self::extract_labels_from_node(val);
2167                    self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
2168                        .await?;
2169                } else if let Value::Map(map) = val {
2170                    self.execute_delete_edge_from_map(map, writer, tx_l0)
2171                        .await?;
2172                } else if let Value::Edge(edge) = val {
2173                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
2174                    writer
2175                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
2176                        .await?;
2177                }
2178            }
2179        }
2180        Ok(())
2181    }
2182
2183    /// Execute vertex deletion with optional detach.
2184    pub(crate) async fn execute_delete_vertex(
2185        &self,
2186        vid: Vid,
2187        detach: bool,
2188        labels: Option<Vec<String>>,
2189        writer: &mut Writer,
2190        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2191    ) -> Result<()> {
2192        if detach {
2193            self.detach_delete_vertex(vid, writer, tx_l0).await?;
2194        } else {
2195            self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
2196        }
2197        writer.delete_vertex(vid, labels, tx_l0).await?;
2198        Ok(())
2199    }
2200
2201    /// Check that a vertex has no edges (required for non-DETACH DELETE).
2202    ///
2203    /// Loads the subgraph from storage, then excludes edges that have been
2204    /// tombstoned in the writer's L0 or the transaction's L0. This ensures
2205    /// edges deleted earlier in the same DELETE clause are properly excluded.
2206    pub(crate) async fn check_vertex_has_no_edges(
2207        &self,
2208        vid: Vid,
2209        writer: &Writer,
2210        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2211    ) -> Result<()> {
2212        let schema = self.storage.schema_manager().schema();
2213        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
2214
2215        // Collect tombstoned edge IDs from both the writer L0 and tx L0.
2216        let mut tombstoned_eids = std::collections::HashSet::new();
2217        {
2218            let writer_l0 = writer.l0_manager.get_current();
2219            let guard = writer_l0.read();
2220            for &eid in guard.tombstones.keys() {
2221                tombstoned_eids.insert(eid);
2222            }
2223        }
2224        if let Some(tx) = tx_l0 {
2225            let guard = tx.read();
2226            for &eid in guard.tombstones.keys() {
2227                tombstoned_eids.insert(eid);
2228            }
2229        }
2230
2231        let out_graph = self
2232            .storage
2233            .load_subgraph_cached(
2234                &[vid],
2235                &edge_type_ids,
2236                1,
2237                uni_store::runtime::Direction::Outgoing,
2238                Some(writer.l0_manager.get_current()),
2239            )
2240            .await?;
2241        let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
2242
2243        let in_graph = self
2244            .storage
2245            .load_subgraph_cached(
2246                &[vid],
2247                &edge_type_ids,
2248                1,
2249                uni_store::runtime::Direction::Incoming,
2250                Some(writer.l0_manager.get_current()),
2251            )
2252            .await?;
2253        let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
2254
2255        if has_out || has_in {
2256            return Err(anyhow!(
2257                "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
2258                vid
2259            ));
2260        }
2261        Ok(())
2262    }
2263
2264    /// Execute edge deletion from a map representation.
2265    pub(crate) async fn execute_delete_edge_from_map(
2266        &self,
2267        map: &HashMap<String, Value>,
2268        writer: &mut Writer,
2269        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2270    ) -> Result<()> {
2271        // Check for non-null _eid to skip OPTIONAL MATCH null edges
2272        if map.get("_eid").is_some_and(|v| !v.is_null()) {
2273            let ei = self.extract_edge_identity(map)?;
2274            writer
2275                .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
2276                .await?;
2277        }
2278        Ok(())
2279    }
2280
2281    /// Build a scan plan node.
2282    ///
2283    /// - `label_id > 0`: schema label → `Scan` (fast, label-specific storage)
2284    /// - `label_id == 0` with labels: schemaless → `ScanMainByLabels` (main table + L0, filtered by label name)
2285    /// - `label_id == 0` without labels: unlabeled → `ScanAll`
2286    fn make_scan_plan(
2287        label_id: u16,
2288        labels: Vec<String>,
2289        variable: String,
2290        filter: Option<Expr>,
2291    ) -> LogicalPlan {
2292        if label_id > 0 {
2293            LogicalPlan::Scan {
2294                label_id,
2295                labels,
2296                variable,
2297                filter,
2298                optional: false,
2299            }
2300        } else if !labels.is_empty() {
2301            // Schemaless label: use ScanMainByLabels to filter by label name
2302            LogicalPlan::ScanMainByLabels {
2303                labels,
2304                variable,
2305                filter,
2306                optional: false,
2307            }
2308        } else {
2309            LogicalPlan::ScanAll {
2310                variable,
2311                filter,
2312                optional: false,
2313            }
2314        }
2315    }
2316
2317    /// Attach a new scan node to the running plan, using `CrossJoin` when the plan
2318    /// already contains prior operators.
2319    fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
2320        if matches!(plan, LogicalPlan::Empty) {
2321            scan
2322        } else {
2323            LogicalPlan::CrossJoin {
2324                left: Box::new(plan),
2325                right: Box::new(scan),
2326            }
2327        }
2328    }
2329
2330    /// Resolve MERGE property map expressions against the current row context.
2331    ///
2332    /// MERGE patterns like `MERGE (city:City {name: person.bornIn})` contain
2333    /// property expressions that reference bound variables. These need to be
2334    /// evaluated to concrete literal values before being converted to filter
2335    /// expressions by `properties_to_expr()`.
2336    async fn resolve_merge_properties(
2337        &self,
2338        properties: &Option<Expr>,
2339        row: &HashMap<String, Value>,
2340        prop_manager: &PropertyManager,
2341        params: &HashMap<String, Value>,
2342        ctx: Option<&QueryContext>,
2343    ) -> Result<Option<Expr>> {
2344        let entries = match properties {
2345            Some(Expr::Map(entries)) => entries,
2346            other => return Ok(other.clone()),
2347        };
2348        let mut resolved = Vec::new();
2349        for (key, val_expr) in entries {
2350            if matches!(val_expr, Expr::Literal(_)) {
2351                resolved.push((key.clone(), val_expr.clone()));
2352            } else {
2353                let value = self
2354                    .evaluate_expr(val_expr, row, prop_manager, params, ctx)
2355                    .await?;
2356                resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
2357            }
2358        }
2359        Ok(Some(Expr::Map(resolved)))
2360    }
2361
2362    /// Convert a runtime Value back to an AST literal expression.
2363    fn value_to_literal_expr(value: &Value) -> Expr {
2364        match value {
2365            Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
2366            Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
2367            Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
2368            Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
2369            Value::Null => Expr::Literal(CypherLiteral::Null),
2370            Value::List(items) => {
2371                Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
2372            }
2373            Value::Map(entries) => Expr::Map(
2374                entries
2375                    .iter()
2376                    .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
2377                    .collect(),
2378            ),
2379            _ => Expr::Literal(CypherLiteral::Null),
2380        }
2381    }
2382
2383    pub(crate) async fn execute_merge_match(
2384        &self,
2385        pattern: &Pattern,
2386        row: &HashMap<String, Value>,
2387        prop_manager: &PropertyManager,
2388        params: &HashMap<String, Value>,
2389        ctx: Option<&QueryContext>,
2390    ) -> Result<Vec<HashMap<String, Value>>> {
2391        // Construct a LogicalPlan for the MATCH part of MERGE
2392        let planner =
2393            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
2394
2395        // We need to construct a CypherQuery to use the planner's plan() method,
2396        // or we can manually construct the LogicalPlan.
2397        // Manual construction is safer as we don't have to round-trip through AST.
2398
2399        let mut plan = LogicalPlan::Empty;
2400        let mut vars_in_scope = Vec::new();
2401
2402        // Add existing bound variables from row to scope
2403        for key in row.keys() {
2404            vars_in_scope.push(key.clone());
2405        }
2406
2407        // Reconstruct Match logic from Planner (simplified for MERGE pattern)
2408        for path in &pattern.paths {
2409            let elements = &path.elements;
2410            let mut i = 0;
2411            while i < elements.len() {
2412                let part = &elements[i];
2413                match part {
2414                    PatternElement::Node(n) => {
2415                        let variable = n.variable.clone().unwrap_or_default();
2416
2417                        // If variable is already bound in the input row, we filter
2418                        let is_bound = !variable.is_empty() && row.contains_key(&variable);
2419
2420                        if is_bound {
2421                            // If bound, we must Scan this specific VID to start the chain
2422                            // Extract VID from row
2423                            let val = row.get(&variable).unwrap();
2424                            let vid = Self::vid_from_value(val)?;
2425
2426                            // In the new storage model, VIDs don't embed label info.
2427                            // We get label from the node value if available, otherwise use 0 to scan all.
2428                            let extracted_labels =
2429                                Self::extract_labels_from_node(val).unwrap_or_default();
2430                            let label_id = {
2431                                let schema = self.storage.schema_manager().schema();
2432                                extracted_labels
2433                                    .first()
2434                                    .and_then(|l| schema.label_id_by_name(l))
2435                                    .unwrap_or(0)
2436                            };
2437
2438                            let resolved_props = self
2439                                .resolve_merge_properties(
2440                                    &n.properties,
2441                                    row,
2442                                    prop_manager,
2443                                    params,
2444                                    ctx,
2445                                )
2446                                .await?;
2447                            let prop_filter =
2448                                planner.properties_to_expr(&variable, &resolved_props);
2449
2450                            // Create a filter expression for VID: variable._vid = vid
2451                            // But our expression engine handles `Expr::Variable` as column.
2452                            // We can inject a filter `id(variable) = vid` if we had `id()` function.
2453                            // Or we use internal property `_vid`.
2454
2455                            // Note: Scan supports `filter`.
2456                            // We can manually construct an Expr::BinaryOp(Eq, Prop(var, _vid), Literal(vid))
2457
2458                            let vid_filter = Expr::BinaryOp {
2459                                left: Box::new(Expr::Property(
2460                                    Box::new(Expr::Variable(variable.clone())),
2461                                    "_vid".to_string(),
2462                                )),
2463                                op: BinaryOp::Eq,
2464                                right: Box::new(Expr::Literal(CypherLiteral::Integer(
2465                                    vid.as_u64() as i64,
2466                                ))),
2467                            };
2468
2469                            let combined_filter = if let Some(pf) = prop_filter {
2470                                Some(Expr::BinaryOp {
2471                                    left: Box::new(vid_filter),
2472                                    op: BinaryOp::And,
2473                                    right: Box::new(pf),
2474                                })
2475                            } else {
2476                                Some(vid_filter)
2477                            };
2478
2479                            let scan = Self::make_scan_plan(
2480                                label_id,
2481                                extracted_labels,
2482                                variable.clone(),
2483                                combined_filter,
2484                            );
2485                            plan = Self::attach_scan(plan, scan);
2486                        } else {
2487                            let label_id = if n.labels.is_empty() {
2488                                // Unlabeled MERGE node: scan all nodes (label_id 0 → ScanAll)
2489                                0
2490                            } else {
2491                                let label_name = &n.labels[0];
2492                                let schema = self.storage.schema_manager().schema();
2493                                // Fall back to label_id 0 (any/schemaless) when the label is not
2494                                // in the schema — this allows MERGE to work in schemaless mode.
2495                                schema
2496                                    .get_label_case_insensitive(label_name)
2497                                    .map(|m| m.id)
2498                                    .unwrap_or(0)
2499                            };
2500
2501                            let resolved_props = self
2502                                .resolve_merge_properties(
2503                                    &n.properties,
2504                                    row,
2505                                    prop_manager,
2506                                    params,
2507                                    ctx,
2508                                )
2509                                .await?;
2510                            let prop_filter =
2511                                planner.properties_to_expr(&variable, &resolved_props);
2512                            let scan = Self::make_scan_plan(
2513                                label_id,
2514                                n.labels.clone(),
2515                                variable.clone(),
2516                                prop_filter,
2517                            );
2518                            plan = Self::attach_scan(plan, scan);
2519
2520                            // Add label filters when:
2521                            // 1. Multiple labels with a known schema label: filter for
2522                            //    additional labels (Scan only scans by the first label).
2523                            // 2. Schemaless labels (label_id = 0): ScanAll finds ALL
2524                            //    nodes, so we must filter to only those with the
2525                            //    specified label(s).
2526                            if !n.labels.is_empty()
2527                                && !variable.is_empty()
2528                                && (label_id == 0 || n.labels.len() > 1)
2529                                && let Some(label_filter) =
2530                                    planner.node_filter_expr(&variable, &n.labels, &None)
2531                            {
2532                                plan = LogicalPlan::Filter {
2533                                    input: Box::new(plan),
2534                                    predicate: label_filter,
2535                                    optional_variables: std::collections::HashSet::new(),
2536                                };
2537                            }
2538
2539                            if !variable.is_empty() {
2540                                vars_in_scope.push(variable.clone());
2541                            }
2542                        }
2543
2544                        // Now look ahead for relationship
2545                        i += 1;
2546                        while i < elements.len() {
2547                            if let PatternElement::Relationship(r) = &elements[i] {
2548                                let target_node_part = &elements[i + 1];
2549                                if let PatternElement::Node(n_target) = target_node_part {
2550                                    let schema = self.storage.schema_manager().schema();
2551                                    let mut edge_type_ids = Vec::new();
2552
2553                                    if r.types.is_empty() {
2554                                        return Err(anyhow!("MERGE edge must have a type"));
2555                                    } else if r.types.len() > 1 {
2556                                        return Err(anyhow!(
2557                                            "MERGE does not support multiple edge types"
2558                                        ));
2559                                    } else {
2560                                        let type_name = &r.types[0];
2561                                        // Use get_or_assign so schemaless edge types work without
2562                                        // a prior schema declaration (same approach as CREATE).
2563                                        let type_id = self
2564                                            .storage
2565                                            .schema_manager()
2566                                            .get_or_assign_edge_type_id(type_name);
2567                                        edge_type_ids.push(type_id);
2568                                    }
2569
2570                                    // Resolve target label ID. For schemaless labels (not in the
2571                                    // schema), fall back to 0 which means "any label" in traversal.
2572                                    let target_label_id: u16 = if let Some(lbl) =
2573                                        n_target.labels.first()
2574                                    {
2575                                        schema
2576                                            .get_label_case_insensitive(lbl)
2577                                            .map(|m| m.id)
2578                                            .unwrap_or(0)
2579                                    } else if let Some(var) = &n_target.variable {
2580                                        if let Some(val) = row.get(var) {
2581                                            // In the new storage model, get labels from node value
2582                                            if let Some(labels) =
2583                                                Self::extract_labels_from_node(val)
2584                                            {
2585                                                if let Some(first_label) = labels.first() {
2586                                                    schema
2587                                                        .get_label_case_insensitive(first_label)
2588                                                        .map(|m| m.id)
2589                                                        .unwrap_or(0)
2590                                                } else {
2591                                                    // Bound node with no labels — schemaless, any
2592                                                    0
2593                                                }
2594                                            } else if Self::vid_from_value(val).is_ok() {
2595                                                // VID without label info — schemaless, any
2596                                                0
2597                                            } else {
2598                                                return Err(anyhow!(
2599                                                    "Variable {} is not a node",
2600                                                    var
2601                                                ));
2602                                            }
2603                                        } else {
2604                                            return Err(anyhow!(
2605                                                "MERGE pattern node must have a label or be a bound variable"
2606                                            ));
2607                                        }
2608                                    } else {
2609                                        return Err(anyhow!(
2610                                            "MERGE pattern node must have a label"
2611                                        ));
2612                                    };
2613
2614                                    let target_variable =
2615                                        n_target.variable.clone().unwrap_or_default();
2616                                    let source_variable = match &elements[i - 1] {
2617                                        PatternElement::Node(n) => {
2618                                            n.variable.clone().unwrap_or_default()
2619                                        }
2620                                        _ => String::new(),
2621                                    };
2622
2623                                    let is_variable_length = r.range.is_some();
2624                                    let type_name = &r.types[0];
2625
2626                                    // Use TraverseMainByType for schemaless edge types
2627                                    // (same as MATCH planner) so edge properties are loaded
2628                                    // correctly from storage + L0 via the adjacency map.
2629                                    // Regular Traverse only loads properties via
2630                                    // property_manager which doesn't handle schemaless types.
2631                                    let is_schemaless = edge_type_ids.iter().all(|id| {
2632                                        uni_common::core::edge_type::is_schemaless_edge_type(*id)
2633                                    });
2634
2635                                    if is_schemaless {
2636                                        plan = LogicalPlan::TraverseMainByType {
2637                                            type_names: vec![type_name.clone()],
2638                                            input: Box::new(plan),
2639                                            direction: r.direction.clone(),
2640                                            source_variable,
2641                                            target_variable: target_variable.clone(),
2642                                            step_variable: r.variable.clone(),
2643                                            min_hops: r
2644                                                .range
2645                                                .as_ref()
2646                                                .and_then(|r| r.min)
2647                                                .unwrap_or(1)
2648                                                as usize,
2649                                            max_hops: r
2650                                                .range
2651                                                .as_ref()
2652                                                .and_then(|r| r.max)
2653                                                .unwrap_or(1)
2654                                                as usize,
2655                                            optional: false,
2656                                            target_filter: None,
2657                                            path_variable: None,
2658                                            is_variable_length,
2659                                            optional_pattern_vars: std::collections::HashSet::new(),
2660                                            scope_match_variables: std::collections::HashSet::new(),
2661                                            edge_filter_expr: None,
2662                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
2663                                        };
2664                                    } else {
2665                                        // Collect edge property names needed for MERGE filter
2666                                        let mut edge_props = std::collections::HashSet::new();
2667                                        if let Some(Expr::Map(entries)) = &r.properties {
2668                                            for (key, _) in entries {
2669                                                edge_props.insert(key.clone());
2670                                            }
2671                                        }
2672                                        plan = LogicalPlan::Traverse {
2673                                            input: Box::new(plan),
2674                                            edge_type_ids: edge_type_ids.clone(),
2675                                            direction: r.direction.clone(),
2676                                            source_variable,
2677                                            target_variable: target_variable.clone(),
2678                                            target_label_id,
2679                                            step_variable: r.variable.clone(),
2680                                            min_hops: r
2681                                                .range
2682                                                .as_ref()
2683                                                .and_then(|r| r.min)
2684                                                .unwrap_or(1)
2685                                                as usize,
2686                                            max_hops: r
2687                                                .range
2688                                                .as_ref()
2689                                                .and_then(|r| r.max)
2690                                                .unwrap_or(1)
2691                                                as usize,
2692                                            optional: false,
2693                                            target_filter: None,
2694                                            path_variable: None,
2695                                            edge_properties: edge_props,
2696                                            is_variable_length,
2697                                            optional_pattern_vars: std::collections::HashSet::new(),
2698                                            scope_match_variables: std::collections::HashSet::new(),
2699                                            edge_filter_expr: None,
2700                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
2701                                            qpp_steps: None,
2702                                        };
2703                                    }
2704
2705                                    // Apply property filters for relationship
2706                                    if r.properties.is_some()
2707                                        && let Some(r_var) = &r.variable
2708                                    {
2709                                        let resolved_rel_props = self
2710                                            .resolve_merge_properties(
2711                                                &r.properties,
2712                                                row,
2713                                                prop_manager,
2714                                                params,
2715                                                ctx,
2716                                            )
2717                                            .await?;
2718                                        if let Some(prop_filter) =
2719                                            planner.properties_to_expr(r_var, &resolved_rel_props)
2720                                        {
2721                                            plan = LogicalPlan::Filter {
2722                                                input: Box::new(plan),
2723                                                predicate: prop_filter,
2724                                                optional_variables: std::collections::HashSet::new(
2725                                                ),
2726                                            };
2727                                        }
2728                                    }
2729
2730                                    // Apply property filters for target node if it was new
2731                                    if !target_variable.is_empty() {
2732                                        let resolved_target_props = self
2733                                            .resolve_merge_properties(
2734                                                &n_target.properties,
2735                                                row,
2736                                                prop_manager,
2737                                                params,
2738                                                ctx,
2739                                            )
2740                                            .await?;
2741                                        if let Some(prop_filter) = planner.properties_to_expr(
2742                                            &target_variable,
2743                                            &resolved_target_props,
2744                                        ) {
2745                                            plan = LogicalPlan::Filter {
2746                                                input: Box::new(plan),
2747                                                predicate: prop_filter,
2748                                                optional_variables: std::collections::HashSet::new(
2749                                                ),
2750                                            };
2751                                        }
2752                                        vars_in_scope.push(target_variable.clone());
2753                                    }
2754
2755                                    if let Some(sv) = &r.variable {
2756                                        vars_in_scope.push(sv.clone());
2757                                    }
2758                                    i += 2;
2759                                } else {
2760                                    break;
2761                                }
2762                            } else {
2763                                break;
2764                            }
2765                        }
2766                    }
2767                    _ => return Err(anyhow!("Pattern must start with a node")),
2768                }
2769            }
2770
2771            // Execute the plan to find all matches, then filter against bound variables in `row`.
2772        }
2773
2774        let db_matches = self
2775            .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
2776            .await?;
2777
2778        // Keep only DB results that are consistent with the input row bindings.
2779        // Skip internal keys (starting with "__") as they are implementation
2780        // artifacts (e.g. __used_edges) and not user-visible variable bindings.
2781        // Also skip the empty-string key (""), which is the placeholder variable
2782        // for unnamed MERGE nodes — it may carry over from a prior MERGE clause
2783        // and must not constrain the current pattern's match.
2784        let final_matches = db_matches
2785            .into_iter()
2786            .filter(|db_match| {
2787                row.iter().all(|(key, val)| {
2788                    if key.is_empty() || key.starts_with("__") {
2789                        return true;
2790                    }
2791                    let Some(db_val) = db_match.get(key) else {
2792                        return true;
2793                    };
2794                    if db_val == val {
2795                        return true;
2796                    }
2797                    // Values differ -- treat as consistent if they represent the same VID
2798                    matches!(
2799                        (Self::vid_from_value(val), Self::vid_from_value(db_val)),
2800                        (Ok(v1), Ok(v2)) if v1 == v2
2801                    )
2802                })
2803            })
2804            .map(|db_match| {
2805                let mut merged = row.clone();
2806                merged.extend(db_match);
2807                merged
2808            })
2809            .collect();
2810
2811        Ok(final_matches)
2812    }
2813
2814    /// Prepare a MERGE pattern for path variable binding.
2815    ///
2816    /// If any path in the pattern has a path variable (e.g., `MERGE p = (a)-[:R]->(b)`),
2817    /// unnamed relationships need internal variable names so that `execute_create_pattern`
2818    /// stores the edge data in the row for later path construction.
2819    ///
2820    /// Returns the (possibly modified) pattern and a list of temp variable names to clean up.
2821    fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
2822        let has_path_vars = pattern
2823            .paths
2824            .iter()
2825            .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
2826
2827        if !has_path_vars {
2828            return (pattern.clone(), Vec::new());
2829        }
2830
2831        let mut modified = pattern.clone();
2832        let mut temp_vars = Vec::new();
2833
2834        for path in &mut modified.paths {
2835            if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
2836                continue;
2837            }
2838            for (idx, element) in path.elements.iter_mut().enumerate() {
2839                if let PatternElement::Relationship(r) = element
2840                    && r.variable.as_ref().is_none_or(String::is_empty)
2841                {
2842                    let temp_var = format!("__path_r_{}", idx);
2843                    r.variable = Some(temp_var.clone());
2844                    temp_vars.push(temp_var);
2845                }
2846            }
2847        }
2848
2849        (modified, temp_vars)
2850    }
2851
2852    /// Bind path variables in the result row based on the MERGE pattern.
2853    ///
2854    /// Walks each path in the pattern, collects node/edge values from the row
2855    /// by variable name, and constructs a `Value::Path`.
2856    fn bind_path_variables(
2857        pattern: &Pattern,
2858        row: &mut HashMap<String, Value>,
2859        temp_vars: &[String],
2860    ) {
2861        for path in &pattern.paths {
2862            let Some(path_var) = path.variable.as_ref() else {
2863                continue;
2864            };
2865            if path_var.is_empty() {
2866                continue;
2867            }
2868
2869            let mut nodes = Vec::new();
2870            let mut edges = Vec::new();
2871
2872            for element in &path.elements {
2873                match element {
2874                    PatternElement::Node(n) => {
2875                        if let Some(var) = &n.variable
2876                            && let Some(val) = row.get(var)
2877                            && let Some(node) = Self::value_to_node_for_path(val)
2878                        {
2879                            nodes.push(node);
2880                        }
2881                    }
2882                    PatternElement::Relationship(r) => {
2883                        if let Some(var) = &r.variable
2884                            && let Some(val) = row.get(var)
2885                            && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
2886                        {
2887                            edges.push(edge);
2888                        }
2889                    }
2890                    _ => {}
2891                }
2892            }
2893
2894            if !nodes.is_empty() {
2895                use uni_common::value::Path;
2896                row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
2897            }
2898        }
2899
2900        // Clean up internal temp variables
2901        for var in temp_vars {
2902            row.remove(var);
2903        }
2904    }
2905
2906    /// Convert a Value (Map or Node) to a Node for path construction.
2907    fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
2908        match val {
2909            Value::Node(n) => Some(n.clone()),
2910            Value::Map(map) => {
2911                let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
2912                let labels = if let Some(Value::List(l)) = map.get("_labels") {
2913                    l.iter()
2914                        .filter_map(|v| {
2915                            if let Value::String(s) = v {
2916                                Some(s.clone())
2917                            } else {
2918                                None
2919                            }
2920                        })
2921                        .collect()
2922                } else {
2923                    vec![]
2924                };
2925                let properties: HashMap<String, Value> = map
2926                    .iter()
2927                    .filter(|(k, _)| !k.starts_with('_'))
2928                    .map(|(k, v)| (k.clone(), v.clone()))
2929                    .collect();
2930                Some(uni_common::value::Node {
2931                    vid,
2932                    labels,
2933                    properties,
2934                })
2935            }
2936            _ => None,
2937        }
2938    }
2939
2940    /// Convert a Value (Map or Edge) to an Edge for path construction.
2941    fn value_to_edge_for_path(
2942        val: &Value,
2943        type_names: &[String],
2944    ) -> Option<uni_common::value::Edge> {
2945        match val {
2946            Value::Edge(e) => Some(e.clone()),
2947            Value::Map(map) => {
2948                let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
2949                let edge_type = map
2950                    .get("_type_name")
2951                    .and_then(|v| {
2952                        if let Value::String(s) = v {
2953                            Some(s.clone())
2954                        } else {
2955                            None
2956                        }
2957                    })
2958                    .or_else(|| type_names.first().cloned())
2959                    .unwrap_or_default();
2960                let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
2961                let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
2962                let properties: HashMap<String, Value> = map
2963                    .iter()
2964                    .filter(|(k, _)| !k.starts_with('_'))
2965                    .map(|(k, v)| (k.clone(), v.clone()))
2966                    .collect();
2967                Some(uni_common::value::Edge {
2968                    eid,
2969                    edge_type,
2970                    src,
2971                    dst,
2972                    properties,
2973                })
2974            }
2975            _ => None,
2976        }
2977    }
2978}