Skip to main content

uni_query/query/executor/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use crate::query::planner::LogicalPlan;
6use anyhow::{Result, anyhow};
7use lancedb::query::{ExecutableQuery, QueryBase};
8use std::collections::HashMap;
9use std::sync::Arc;
10use uni_common::DataType;
11use uni_common::core::id::{Eid, Vid};
12use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
13use uni_common::{Path, Value};
14use uni_cypher::ast::{
15    AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
16    CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
17    DropEdgeType, DropLabel, Expr, Pattern, PatternElement, RemoveItem, SetClause, SetItem,
18};
19use uni_store::QueryContext;
20use uni_store::runtime::property_manager::PropertyManager;
21use uni_store::runtime::writer::Writer;
22
/// Identity fields extracted from a map-encoded edge.
///
/// Produced by `Executor::extract_edge_identity` from the `_eid`, `_src`,
/// `_dst` and `_type` entries of an edge map.
struct EdgeIdentity {
    /// Edge id, read from the map's `_eid` entry.
    eid: Eid,
    /// Source vertex id, read from the map's `_src` entry.
    src: Vid,
    /// Destination vertex id, read from the map's `_dst` entry.
    dst: Vid,
    /// Numeric edge type id resolved from the map's `_type` entry.
    edge_type_id: u32,
}
30
31impl Executor {
32    /// Extracts labels from a node value.
33    ///
34    /// Handles both `Value::Map` (with a `_labels` list field) and
35    /// `Value::Node` (with a `labels` vec field).
36    ///
37    /// Returns `None` when the value is not a node or has no labels.
38    pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
39        match node_val {
40            Value::Map(map) => {
41                // Map-encoded node: look for _labels array
42                if let Some(Value::List(labels_arr)) = map.get("_labels") {
43                    let labels: Vec<String> = labels_arr
44                        .iter()
45                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
46                        .collect();
47                    if !labels.is_empty() {
48                        return Some(labels);
49                    }
50                }
51                None
52            }
53            Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
54            _ => None,
55        }
56    }
57
58    /// Extracts user-visible properties from a value that represents a node or edge.
59    ///
60    /// Strips internal bookkeeping keys (those prefixed with `_` or named
61    /// `ext_id`) from map-encoded entities and returns only the user-facing
62    /// property key-value pairs.
63    ///
64    /// Returns `None` when `val` is not a map, node, or edge.
65    pub(crate) fn extract_user_properties_from_value(
66        val: &Value,
67    ) -> Option<HashMap<String, Value>> {
68        match val {
69            Value::Map(map) => {
70                // Distinguish entity-encoded maps from plain map literals.
71                // A node map has both `_vid` and `_labels`.
72                // An edge map has `_eid`, `_src`, and `_dst`.
73                let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
74                let is_edge_map = map.contains_key("_eid")
75                    && map.contains_key("_src")
76                    && map.contains_key("_dst");
77
78                if is_node_map || is_edge_map {
79                    // Filter out internal bookkeeping keys
80                    let user_props: HashMap<String, Value> = map
81                        .iter()
82                        .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
83                        .map(|(k, v)| (k.clone(), v.clone()))
84                        .collect();
85                    // When mutation output omits dotted property columns, user
86                    // properties live inside `_all_props` rather than at the
87                    // top level of the entity map.
88                    if user_props.is_empty()
89                        && let Some(Value::Map(all_props)) = map.get("_all_props")
90                    {
91                        return Some(all_props.clone());
92                    }
93                    Some(user_props)
94                } else {
95                    // Plain map literal — return as-is
96                    Some(map.clone())
97                }
98            }
99            Value::Node(node) => Some(node.properties.clone()),
100            Value::Edge(edge) => Some(edge.properties.clone()),
101            _ => None,
102        }
103    }
104
    /// Applies a property map to a vertex or edge entity bound to `variable` in `row`.
    ///
    /// The binding may be a `Value::Node`, a `Value::Edge`, a map-encoded node
    /// (anything `vid_from_value` accepts), or a map-encoded edge (a map with
    /// `_eid`, `_src` and `_dst`). Any other binding, or a missing one, is a
    /// no-op that returns `Ok(())`.
    ///
    /// When `replace` is `true` the entity's property set is replaced: keys absent
    /// from `new_props` are tombstoned (written as `Value::Null`) so the storage
    /// layer removes them.  When `replace` is `false` the map is merged: keys in
    /// `new_props` are upserted, while keys absent from `new_props` are unchanged.
    /// The exact merge rules are those of `merge_props`.
    ///
    /// For vertices the merged properties are additionally passed through
    /// `enrich_properties_with_generated_columns` once per label before being
    /// written. After the write, the binding in `row` is updated in place and
    /// only carries the non-null properties.
    ///
    /// Labels are never altered — the spec states that `SET n = map` replaces
    /// properties only.
    ///
    /// # Errors
    ///
    /// Propagates failures from the current-property lookup, generated-column
    /// enrichment, edge type / edge identity resolution, and the writer. An
    /// entity with no stored properties is not an error: its current property
    /// set is treated as empty.
    #[expect(clippy::too_many_arguments)]
    async fn apply_properties_to_entity(
        &self,
        variable: &str,
        new_props: HashMap<String, Value>,
        replace: bool,
        row: &mut HashMap<String, Value>,
        writer: &mut Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<()> {
        // Clone the target so we can hold &row references elsewhere.
        let target = row.get(variable).cloned();

        match target {
            // Structured node value.
            Some(Value::Node(ref node)) => {
                let vid = node.vid;
                let labels = node.labels.clone();
                // Missing stored properties are treated as an empty map.
                let current = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();
                let write_props = Self::merge_props(current, new_props, replace);
                // Generated columns are recomputed for every label of the node.
                let mut enriched = write_props.clone();
                for label_name in &labels {
                    self.enrich_properties_with_generated_columns(
                        label_name,
                        &mut enriched,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
                let _ = writer
                    .insert_vertex_with_labels(vid, enriched.clone(), &labels)
                    .await?;
                // Update the in-memory row binding
                // (tombstones are dropped so the binding only shows live properties).
                if let Some(Value::Node(n)) = row.get_mut(variable) {
                    n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
                }
            }
            // Map-encoded node: any value from which a vertex id can be extracted.
            Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
                let vid = Self::vid_from_value(node_val)?;
                // A map without usable `_labels` is written with no labels.
                let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
                let current = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();
                let write_props = Self::merge_props(current, new_props, replace);
                let mut enriched = write_props.clone();
                for label_name in &labels {
                    self.enrich_properties_with_generated_columns(
                        label_name,
                        &mut enriched,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
                let _ = writer
                    .insert_vertex_with_labels(vid, enriched.clone(), &labels)
                    .await?;
                // Update the in-memory map-encoded node binding
                if let Some(Value::Map(node_map)) = row.get_mut(variable) {
                    // Remove old user property keys, keep internal fields
                    node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
                    // Build effective (non-null) properties
                    let effective: HashMap<String, Value> =
                        enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
                    for (k, v) in &effective {
                        node_map.insert(k.clone(), v.clone());
                    }
                    // Replace _all_props to reflect the complete property set
                    node_map.insert("_all_props".to_string(), Value::Map(effective));
                }
            }
            // Structured edge value.
            Some(Value::Edge(ref edge)) => {
                let eid = edge.eid;
                let src = edge.src;
                let dst = edge.dst;
                // The edge carries its type by name; the writer needs the numeric id.
                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
                let current = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();
                let write_props = Self::merge_props(current, new_props, replace);
                writer
                    .insert_edge(
                        src,
                        dst,
                        etype,
                        eid,
                        write_props.clone(),
                        Some(edge.edge_type.clone()),
                    )
                    .await?;
                // Update the in-memory row binding
                if let Some(Value::Edge(e)) = row.get_mut(variable) {
                    e.properties = write_props
                        .into_iter()
                        .filter(|(_, v)| !v.is_null())
                        .collect();
                }
            }
            // Map-encoded edge, recognised by its `_eid` / `_src` / `_dst` keys.
            Some(Value::Map(ref map))
                if map.contains_key("_eid")
                    && map.contains_key("_src")
                    && map.contains_key("_dst") =>
            {
                let ei = self.extract_edge_identity(map)?;
                let current = prop_manager
                    .get_all_edge_props_with_ctx(ei.eid, ctx)
                    .await?
                    .unwrap_or_default();
                let write_props = Self::merge_props(current, new_props, replace);
                // Prefer the type name stored in `_type`; when `_type` is not a
                // string, fall back to looking the name up by id in the schema.
                let edge_type_name = map
                    .get("_type")
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string())
                    .or_else(|| {
                        self.storage
                            .schema_manager()
                            .edge_type_name_by_id_unified(ei.edge_type_id)
                    });
                writer
                    .insert_edge(
                        ei.src,
                        ei.dst,
                        ei.edge_type_id,
                        ei.eid,
                        write_props.clone(),
                        edge_type_name,
                    )
                    .await?;
                // Update the in-memory map-encoded edge binding
                if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
                    // Keep only internal (`_`-prefixed) fields before re-inserting
                    // the live properties.
                    edge_map.retain(|k, _| k.starts_with('_'));
                    let effective: HashMap<String, Value> = write_props
                        .into_iter()
                        .filter(|(_, v)| !v.is_null())
                        .collect();
                    for (k, v) in &effective {
                        edge_map.insert(k.clone(), v.clone());
                    }
                    // Replace _all_props to reflect the complete property set
                    edge_map.insert("_all_props".to_string(), Value::Map(effective));
                }
            }
            _ => {
                // No matching entity — nothing to do (caller already guarded against Null)
            }
        }
        Ok(())
    }
278
279    /// Computes the property map to write given current storage state and the
280    /// incoming change map.
281    ///
282    /// When `replace` is `true`, keys present in `current` but absent from
283    /// `incoming` are tombstoned with `Value::Null`.  Null values inside
284    /// `incoming` are always preserved as explicit tombstones.
285    ///
286    /// When `replace` is `false`, `current` is the base and `incoming` is
287    /// merged on top: each key in `incoming` overwrites or tombstones the
288    /// corresponding entry in `current`.
289    fn merge_props(
290        current: HashMap<String, Value>,
291        incoming: HashMap<String, Value>,
292        replace: bool,
293    ) -> HashMap<String, Value> {
294        if replace {
295            // Start from the non-null incoming entries only.
296            let mut result: HashMap<String, Value> = incoming
297                .iter()
298                .filter(|(_, v)| !v.is_null())
299                .map(|(k, v)| (k.clone(), v.clone()))
300                .collect();
301            // Tombstone every current key that is absent from incoming OR explicitly
302            // set to null in incoming (both mean "delete this property").
303            for k in current.keys() {
304                if incoming.get(k).is_none_or(|v| v.is_null()) {
305                    result.insert(k.clone(), Value::Null);
306                }
307            }
308            result
309        } else {
310            // Merge: start from current and apply incoming on top
311            let mut result = current;
312            result.extend(incoming);
313            result
314        }
315    }
316
317    /// Extract edge identity fields (`_eid`, `_src`, `_dst`, `_type`) from a map.
318    fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
319        let eid = Eid::from(
320            map.get("_eid")
321                .and_then(|v| v.as_u64())
322                .ok_or_else(|| anyhow!("Invalid _eid"))?,
323        );
324        let src = Vid::from(
325            map.get("_src")
326                .and_then(|v| v.as_u64())
327                .ok_or_else(|| anyhow!("Invalid _src"))?,
328        );
329        let dst = Vid::from(
330            map.get("_dst")
331                .and_then(|v| v.as_u64())
332                .ok_or_else(|| anyhow!("Invalid _dst"))?,
333        );
334        let edge_type_id = self.resolve_edge_type_id(
335            map.get("_type")
336                .ok_or_else(|| anyhow!("Missing _type on edge map"))?,
337        )?;
338        Ok(EdgeIdentity {
339            eid,
340            src,
341            dst,
342            edge_type_id,
343        })
344    }
345
346    /// Resolve edge type ID from a Value, supporting both Int and String representations.
347    /// DataFusion traverse stores _type as String("KNOWS"), while write operations need u32 ID.
348    ///
349    /// For String values, uses get_or_assign_edge_type_id to support schemaless edge types
350    /// (assigns new ID if not found). This is critical for MERGE ... ON CREATE SET scenarios
351    /// where the edge type was just created and may not be in the read-only lookup yet.
352    fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
353        match type_val {
354            Value::Int(i) => Ok(*i as u32),
355            Value::String(name) => {
356                // Use get_or_assign to support schemaless edge types
357                // (will create new ID if not found in schema or registry)
358                Ok(self
359                    .storage
360                    .schema_manager()
361                    .get_or_assign_edge_type_id(name))
362            }
363            _ => Err(anyhow!(
364                "Invalid _type value: expected Int or String, got {:?}",
365                type_val
366            )),
367        }
368    }
369
370    pub(crate) async fn execute_vacuum(&self) -> Result<()> {
371        if let Some(writer_arc) = &self.writer {
372            // Flush first while holding the lock
373            {
374                let mut writer = writer_arc.write().await;
375                writer.flush_to_l1(None).await?;
376            } // Drop lock before compacting to avoid blocking reads/writes
377
378            // Compaction can run without holding the writer lock
379            let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
380            let compaction_results = compactor.compact_all().await?;
381
382            // Re-warm adjacency manager for compacted edge types to sync in-memory CSR with new L2 storage
383            let am = self.storage.adjacency_manager();
384            let schema = self.storage.schema_manager().schema();
385            for info in compaction_results {
386                // Convert string direction to Direction enum
387                let direction = match info.direction.as_str() {
388                    "fwd" => uni_store::storage::direction::Direction::Outgoing,
389                    "bwd" => uni_store::storage::direction::Direction::Incoming,
390                    _ => continue,
391                };
392
393                // Get edge_type_id
394                if let Some(edge_type_id) =
395                    schema.edge_type_id_unified_case_insensitive(&info.edge_type)
396                {
397                    // Re-warm from storage (clears old CSR, loads new L2 + L1 delta)
398                    let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
399                }
400            }
401        }
402        Ok(())
403    }
404
405    pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
406        if let Some(writer_arc) = &self.writer {
407            let mut writer = writer_arc.write().await;
408            writer.flush_to_l1(Some("checkpoint".to_string())).await?;
409        }
410        Ok(())
411    }
412
413    pub(crate) async fn execute_copy_to(
414        &self,
415        identifier: &str,
416        path: &str,
417        format: &str,
418        options: &HashMap<String, Value>,
419    ) -> Result<usize> {
420        // Check schema to determine if identifier is an edge type or vertex label
421        let schema = self.storage.schema_manager().schema();
422
423        // Try as edge type first
424        if schema.get_edge_type_case_insensitive(identifier).is_some() {
425            return self
426                .export_edge_type_in_format(identifier, path, format)
427                .await;
428        }
429
430        // Try as vertex label
431        if schema.get_label_case_insensitive(identifier).is_some() {
432            return self
433                .export_vertex_label_in_format(identifier, path, format, options)
434                .await;
435        }
436
437        // Neither edge type nor vertex label found
438        Err(anyhow!("Unknown label or edge type: '{}'", identifier))
439    }
440
441    async fn export_vertex_label_in_format(
442        &self,
443        label: &str,
444        path: &str,
445        format: &str,
446        _options: &HashMap<String, Value>,
447    ) -> Result<usize> {
448        match format {
449            "parquet" => self.export_vertex_label(label, path).await,
450            "csv" => {
451                // Get dataset and open table
452                let dataset = self.storage.vertex_dataset(label)?;
453                let table = dataset.open_lancedb(self.storage.lancedb_store()).await?;
454
455                // Query all data
456                let mut stream = table.query().execute().await?;
457
458                // Collect all batches
459                let mut all_rows = Vec::new();
460                let mut column_names = Vec::new();
461
462                // Iterate stream using StreamExt
463                use futures::StreamExt;
464                while let Some(batch_result) = stream.next().await {
465                    let batch = batch_result?;
466
467                    // Get column names from first batch
468                    if column_names.is_empty() {
469                        column_names = batch
470                            .schema()
471                            .fields()
472                            .iter()
473                            .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
474                            .map(|f| f.name().clone())
475                            .collect();
476                    }
477
478                    // Convert batch to rows
479                    for row_idx in 0..batch.num_rows() {
480                        let mut row = Vec::new();
481                        for field in batch.schema().fields() {
482                            if field.name().starts_with('_') || field.name() == "ext_id" {
483                                continue;
484                            }
485
486                            let col_idx = batch.schema().index_of(field.name())?;
487                            let column = batch.column(col_idx);
488                            let value = self.arrow_value_to_json(column, row_idx)?;
489
490                            // Convert value to CSV string
491                            let csv_value = match value {
492                                Value::Null => String::new(),
493                                Value::Bool(b) => b.to_string(),
494                                Value::Int(i) => i.to_string(),
495                                Value::Float(f) => f.to_string(),
496                                Value::String(s) => s,
497                                _ => format!("{value}"),
498                            };
499                            row.push(csv_value);
500                        }
501                        all_rows.push(row);
502                    }
503                }
504
505                // Write CSV
506                let file = std::fs::File::create(path)?;
507                let mut wtr = csv::Writer::from_writer(file);
508
509                // Write headers
510                log::debug!("CSV export headers: {:?}", column_names);
511                wtr.write_record(&column_names)?;
512
513                // Write rows
514                for (i, row) in all_rows.iter().enumerate() {
515                    log::debug!("CSV export row {}: {:?}", i, row);
516                    wtr.write_record(row)?;
517                }
518
519                wtr.flush()?;
520                Ok(all_rows.len())
521            }
522            _ => Err(anyhow!(
523                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
524                format
525            )),
526        }
527    }
528
529    async fn export_edge_type_in_format(
530        &self,
531        edge_type: &str,
532        path: &str,
533        format: &str,
534    ) -> Result<usize> {
535        match format {
536            "parquet" => self.export_edge_type(edge_type, path).await,
537            "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
538            _ => Err(anyhow!(
539                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
540                format
541            )),
542        }
543    }
544
545    /// Write a stream of record batches to a Parquet file.
546    /// Returns the total number of rows written, or 0 if the stream is empty.
547    async fn write_batches_to_parquet(
548        mut stream: impl futures::Stream<Item = Result<arrow_array::RecordBatch, lancedb::Error>>
549        + Unpin,
550        path: &str,
551        entity_description: &str,
552    ) -> Result<usize> {
553        use futures::TryStreamExt;
554
555        // Get first batch to determine schema and create writer
556        let first_batch = match stream.try_next().await? {
557            Some(batch) => batch,
558            None => {
559                log::info!("No data to export from {}", entity_description);
560                return Ok(0);
561            }
562        };
563
564        // Create Parquet writer using schema from first batch
565        let file = std::fs::File::create(path)?;
566        let arrow_schema = first_batch.schema();
567        let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
568
569        // Write first batch
570        let mut count = first_batch.num_rows();
571        writer.write(&first_batch)?;
572
573        // Write remaining batches
574        while let Some(batch) = stream.try_next().await? {
575            count += batch.num_rows();
576            writer.write(&batch)?;
577        }
578
579        writer.close()?;
580
581        log::info!(
582            "Exported {} rows from {} to '{}'",
583            count,
584            entity_description,
585            path
586        );
587        Ok(count)
588    }
589
590    /// Export vertices of a specific label to Parquet
591    async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
592        let dataset = self.storage.vertex_dataset(label)?;
593        let lancedb_store = self.storage.lancedb_store();
594        let table = dataset.open_lancedb(lancedb_store).await?;
595        let query = table.query();
596        let stream = query.execute().await?;
597
598        Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
599    }
600
601    /// Export edges of a specific type to Parquet
602    async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
603        let schema = self.storage.schema_manager().schema();
604        if !schema.edge_types.contains_key(edge_type) {
605            return Err(anyhow!("Edge type '{}' not found", edge_type));
606        }
607
608        let lancedb_store = self.storage.lancedb_store();
609        let table = lancedb_store.open_main_edge_table().await?;
610        let query = table.query().only_if(format!("type = '{}'", edge_type));
611        let stream = query.execute().await?;
612
613        Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
614    }
615
616    pub(crate) async fn execute_copy_from(
617        &self,
618        label: &str,
619        path: &str,
620        format: &str,
621        options: &HashMap<String, Value>,
622    ) -> Result<usize> {
623        // Read data from file
624        let batches = match format {
625            "parquet" => self.read_parquet_file(path)?,
626            "csv" => self.read_csv_file(path, label, options)?,
627            _ => {
628                return Err(anyhow!(
629                    "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
630                    format
631                ));
632            }
633        };
634
635        // Get writer
636        let writer_arc = self
637            .writer
638            .as_ref()
639            .ok_or_else(|| anyhow!("No writer available"))?;
640
641        let db_schema = self.storage.schema_manager().schema();
642
643        // Check if this is a label (vertex) or edge type
644        let is_edge = db_schema.edge_type_id_by_name(label).is_some();
645
646        if is_edge {
647            // Import edges
648            let edge_type_id = db_schema
649                .edge_type_id_by_name(label)
650                .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
651
652            // Get src and dst column names from options
653            let src_col = options
654                .get("src_col")
655                .and_then(|v| v.as_str())
656                .unwrap_or("src");
657            let dst_col = options
658                .get("dst_col")
659                .and_then(|v| v.as_str())
660                .unwrap_or("dst");
661
662            let mut total_rows = 0;
663            for batch in batches {
664                let num_rows = batch.num_rows();
665
666                for row_idx in 0..num_rows {
667                    let mut properties = HashMap::new();
668                    let mut src_vid: Option<Vid> = None;
669                    let mut dst_vid: Option<Vid> = None;
670
671                    // Extract properties and VIDs from each column
672                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
673                        let col_name = field.name();
674                        let column = batch.column(col_idx);
675                        let value = self.arrow_value_to_json(column, row_idx)?;
676
677                        if col_name == src_col {
678                            let raw = value.as_u64().unwrap_or_else(|| {
679                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
680                            });
681                            src_vid = Some(Vid::new(raw));
682                        } else if col_name == dst_col {
683                            let raw = value.as_u64().unwrap_or_else(|| {
684                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
685                            });
686                            dst_vid = Some(Vid::new(raw));
687                        } else if !col_name.starts_with('_') && !value.is_null() {
688                            properties.insert(col_name.clone(), value);
689                        }
690                    }
691
692                    let src = src_vid
693                        .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
694                    let dst = dst_vid
695                        .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
696
697                    // Generate EID and insert edge
698                    let mut writer = writer_arc.write().await;
699                    let eid = writer.next_eid(edge_type_id).await?;
700                    writer
701                        .insert_edge(
702                            src,
703                            dst,
704                            edge_type_id,
705                            eid,
706                            properties,
707                            Some(label.to_string()),
708                        )
709                        .await?;
710
711                    total_rows += 1;
712                }
713            }
714
715            log::info!(
716                "Imported {} edge rows from '{}' into edge type '{}'",
717                total_rows,
718                path,
719                label
720            );
721
722            // Flush to persist edges
723            if total_rows > 0 {
724                let mut writer = writer_arc.write().await;
725                writer.flush_to_l1(None).await?;
726            }
727
728            Ok(total_rows)
729        } else {
730            // Import vertices
731            // Validate the label exists in schema
732            db_schema
733                .label_id_by_name_case_insensitive(label)
734                .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
735
736            let mut total_rows = 0;
737            for batch in batches {
738                let num_rows = batch.num_rows();
739
740                // Convert Arrow batch to rows
741                for row_idx in 0..num_rows {
742                    let mut properties = HashMap::new();
743
744                    // Extract properties from each column
745                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
746                        let col_name = field.name();
747
748                        // Skip internal columns
749                        if col_name.starts_with('_') {
750                            continue;
751                        }
752
753                        let column = batch.column(col_idx);
754                        let value = self.arrow_value_to_json(column, row_idx)?;
755
756                        if !value.is_null() {
757                            properties.insert(col_name.clone(), value);
758                        }
759                    }
760
761                    // Generate VID and insert
762                    let mut writer = writer_arc.write().await;
763                    let vid = writer.next_vid().await?;
764                    let _ = writer
765                        .insert_vertex_with_labels(vid, properties, &[label.to_string()])
766                        .await?;
767
768                    total_rows += 1;
769                }
770            }
771
772            log::info!(
773                "Imported {} rows from '{}' into label '{}'",
774                total_rows,
775                path,
776                label
777            );
778
779            // Flush to persist vertices
780            if total_rows > 0 {
781                let mut writer = writer_arc.write().await;
782                writer.flush_to_l1(None).await?;
783            }
784
785            Ok(total_rows)
786        }
787    }
788
789    fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
790        use arrow_array::Array;
791        use arrow_schema::DataType as ArrowDataType;
792
793        if column.is_null(row_idx) {
794            return Ok(Value::Null);
795        }
796
797        match column.data_type() {
798            ArrowDataType::Utf8 => {
799                let array = column
800                    .as_any()
801                    .downcast_ref::<arrow_array::StringArray>()
802                    .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
803                Ok(Value::String(array.value(row_idx).to_string()))
804            }
805            ArrowDataType::Int32 => {
806                let array = column
807                    .as_any()
808                    .downcast_ref::<arrow_array::Int32Array>()
809                    .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
810                Ok(Value::Int(array.value(row_idx) as i64))
811            }
812            ArrowDataType::Int64 => {
813                let array = column
814                    .as_any()
815                    .downcast_ref::<arrow_array::Int64Array>()
816                    .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
817                Ok(Value::Int(array.value(row_idx)))
818            }
819            ArrowDataType::Float32 => {
820                let array = column
821                    .as_any()
822                    .downcast_ref::<arrow_array::Float32Array>()
823                    .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
824                Ok(Value::Float(array.value(row_idx) as f64))
825            }
826            ArrowDataType::Float64 => {
827                let array = column
828                    .as_any()
829                    .downcast_ref::<arrow_array::Float64Array>()
830                    .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
831                Ok(Value::Float(array.value(row_idx)))
832            }
833            ArrowDataType::Boolean => {
834                let array = column
835                    .as_any()
836                    .downcast_ref::<arrow_array::BooleanArray>()
837                    .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
838                Ok(Value::Bool(array.value(row_idx)))
839            }
840            ArrowDataType::UInt64 => {
841                let array = column
842                    .as_any()
843                    .downcast_ref::<arrow_array::UInt64Array>()
844                    .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
845                Ok(Value::Int(array.value(row_idx) as i64))
846            }
847            _ => {
848                // For other types, try to convert to string
849                let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
850                if let Some(arr) = array {
851                    Ok(Value::String(arr.value(row_idx).to_string()))
852                } else {
853                    Ok(Value::Null)
854                }
855            }
856        }
857    }
858
859    fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
860        let file = std::fs::File::open(path)?;
861        let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
862            .build()?;
863        reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
864    }
865
866    fn read_csv_file(
867        &self,
868        path: &str,
869        label: &str,
870        options: &HashMap<String, Value>,
871    ) -> Result<Vec<arrow_array::RecordBatch>> {
872        use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
873        use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
874        use std::sync::Arc;
875
876        // Parse CSV options
877        let has_headers = options
878            .get("headers")
879            .and_then(|v| v.as_bool())
880            .unwrap_or(true);
881
882        // Read CSV file
883        let file = std::fs::File::open(path)?;
884        let mut rdr = csv::ReaderBuilder::new()
885            .has_headers(has_headers)
886            .from_reader(file);
887
888        // Get schema for type conversion
889        let db_schema = self.storage.schema_manager().schema();
890        let properties = db_schema.properties.get(label);
891
892        // Collect all rows first to determine schema
893        let mut rows: Vec<Vec<String>> = Vec::new();
894        let headers: Vec<String> = if has_headers {
895            rdr.headers()?.iter().map(|s| s.to_string()).collect()
896        } else {
897            Vec::new()
898        };
899
900        for result in rdr.records() {
901            let record = result?;
902            rows.push(record.iter().map(|s| s.to_string()).collect());
903        }
904
905        if rows.is_empty() {
906            return Ok(Vec::new());
907        }
908
909        // Build Arrow schema with proper types based on DB schema
910        let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
911        let col_names: Vec<String> = if has_headers {
912            headers
913        } else {
914            (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
915        };
916
917        for name in &col_names {
918            let arrow_type = if let Some(props) = properties {
919                if let Some(prop_meta) = props.get(name) {
920                    match prop_meta.r#type {
921                        DataType::Int32 => ArrowDataType::Int32,
922                        DataType::Int64 => ArrowDataType::Int64,
923                        DataType::Float32 => ArrowDataType::Float32,
924                        DataType::Float64 => ArrowDataType::Float64,
925                        DataType::Bool => ArrowDataType::Boolean,
926                        _ => ArrowDataType::Utf8,
927                    }
928                } else {
929                    ArrowDataType::Utf8
930                }
931            } else {
932                ArrowDataType::Utf8
933            };
934            arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
935        }
936
937        let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
938
939        // Convert rows to Arrow arrays with proper types
940        let mut columns: Vec<ArrayRef> = Vec::new();
941        for (col_idx, field) in arrow_fields.iter().enumerate() {
942            match field.data_type() {
943                ArrowDataType::Int32 => {
944                    let values: Vec<Option<i32>> = rows
945                        .iter()
946                        .map(|row| {
947                            if col_idx < row.len() {
948                                row[col_idx].parse().ok()
949                            } else {
950                                None
951                            }
952                        })
953                        .collect();
954                    columns.push(Arc::new(Int32Array::from(values)));
955                }
956                _ => {
957                    // Default to string
958                    let values: Vec<Option<String>> = rows
959                        .iter()
960                        .map(|row| {
961                            if col_idx < row.len() {
962                                Some(row[col_idx].clone())
963                            } else {
964                                None
965                            }
966                        })
967                        .collect();
968                    columns.push(Arc::new(StringArray::from(values)));
969                }
970            }
971        }
972
973        let batch = RecordBatch::try_new(arrow_schema, columns)?;
974        Ok(vec![batch])
975    }
976
977    fn parse_data_type(type_str: &str) -> Result<DataType> {
978        use uni_common::core::schema::{CrdtType, PointType};
979        let type_str = type_str.to_lowercase();
980        let type_str = type_str.trim();
981        match type_str {
982            "string" | "text" | "varchar" => Ok(DataType::String),
983            "int" | "integer" | "int32" => Ok(DataType::Int32),
984            "long" | "int64" | "bigint" => Ok(DataType::Int64),
985            "float" | "float32" | "real" => Ok(DataType::Float32),
986            "double" | "float64" => Ok(DataType::Float64),
987            "bool" | "boolean" => Ok(DataType::Bool),
988            "timestamp" => Ok(DataType::Timestamp),
989            "date" => Ok(DataType::Date),
990            "time" => Ok(DataType::Time),
991            "datetime" => Ok(DataType::DateTime),
992            "duration" => Ok(DataType::Duration),
993            "json" | "jsonb" => Ok(DataType::CypherValue),
994            "point" => Ok(DataType::Point(PointType::Cartesian2D)),
995            "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
996            "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
997            s if s.starts_with("vector(") && s.ends_with(')') => {
998                let dims_str = &s[7..s.len() - 1];
999                let dimensions = dims_str
1000                    .parse::<usize>()
1001                    .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1002                Ok(DataType::Vector { dimensions })
1003            }
1004            s if s.starts_with("list<") && s.ends_with('>') => {
1005                let inner_type_str = &s[5..s.len() - 1];
1006                let inner_type = Self::parse_data_type(inner_type_str)?;
1007                Ok(DataType::List(Box::new(inner_type)))
1008            }
1009            "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1010            "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1011            _ => Err(anyhow!("Unknown data type: {}", type_str)),
1012        }
1013    }
1014
1015    pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1016        let sm = self.storage.schema_manager_arc();
1017        if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1018            return Ok(());
1019        }
1020        sm.add_label(&clause.name)?;
1021        for prop in clause.properties {
1022            let dt = Self::parse_data_type(&prop.data_type)?;
1023            sm.add_property(&clause.name, &prop.name, dt, prop.nullable)?;
1024            if prop.unique {
1025                let constraint = Constraint {
1026                    name: format!("{}_{}_unique", clause.name, prop.name),
1027                    constraint_type: ConstraintType::Unique {
1028                        properties: vec![prop.name],
1029                    },
1030                    target: ConstraintTarget::Label(clause.name.clone()),
1031                    enabled: true,
1032                };
1033                sm.add_constraint(constraint)?;
1034            }
1035        }
1036        sm.save().await?;
1037        Ok(())
1038    }
1039
1040    pub(crate) async fn enrich_properties_with_generated_columns(
1041        &self,
1042        label_name: &str,
1043        properties: &mut HashMap<String, Value>,
1044        prop_manager: &PropertyManager,
1045        params: &HashMap<String, Value>,
1046        ctx: Option<&QueryContext>,
1047    ) -> Result<()> {
1048        let schema = self.storage.schema_manager().schema();
1049
1050        if let Some(props_meta) = schema.properties.get(label_name) {
1051            let mut generators = Vec::new();
1052            for (prop_name, meta) in props_meta {
1053                if let Some(expr_str) = &meta.generation_expression {
1054                    generators.push((prop_name.clone(), expr_str.clone()));
1055                }
1056            }
1057
1058            for (prop_name, expr_str) in generators {
1059                let cache_key = (label_name.to_string(), prop_name.clone());
1060                let expr = {
1061                    let cache = self.gen_expr_cache.read().await;
1062                    cache.get(&cache_key).cloned()
1063                };
1064
1065                let expr = match expr {
1066                    Some(e) => e,
1067                    None => {
1068                        let parsed = uni_cypher::parse_expression(&expr_str)
1069                            .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1070                        let mut cache = self.gen_expr_cache.write().await;
1071                        cache.insert(cache_key, parsed.clone());
1072                        parsed
1073                    }
1074                };
1075
1076                let mut scope = HashMap::new();
1077
1078                // If expression has an explicit variable, use it as an object
1079                if let Some(var) = expr.extract_variable() {
1080                    scope.insert(var, Value::Map(properties.clone()));
1081                } else {
1082                    // No explicit variable - add properties directly to scope for bare references
1083                    // e.g., "lower(email)" can reference "email" directly
1084                    for (k, v) in properties.iter() {
1085                        scope.insert(k.clone(), v.clone());
1086                    }
1087                }
1088
1089                let val = self
1090                    .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1091                    .await?;
1092                properties.insert(prop_name, val);
1093            }
1094        }
1095        Ok(())
1096    }
1097
1098    pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1099        let sm = self.storage.schema_manager_arc();
1100        if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1101            return Ok(());
1102        }
1103        sm.add_edge_type(&clause.name, clause.src_labels, clause.dst_labels)?;
1104        for prop in clause.properties {
1105            let dt = Self::parse_data_type(&prop.data_type)?;
1106            sm.add_property(&clause.name, &prop.name, dt, prop.nullable)?;
1107        }
1108        sm.save().await?;
1109        Ok(())
1110    }
1111
1112    /// Executes an ALTER action on a schema entity.
1113    ///
1114    /// This is a shared helper for both `execute_alter_label` and
1115    /// `execute_alter_edge_type` since they have identical logic.
1116    pub(crate) async fn execute_alter_entity(
1117        sm: &Arc<SchemaManager>,
1118        entity_name: &str,
1119        action: AlterAction,
1120    ) -> Result<()> {
1121        match action {
1122            AlterAction::AddProperty(prop) => {
1123                let dt = Self::parse_data_type(&prop.data_type)?;
1124                sm.add_property(entity_name, &prop.name, dt, prop.nullable)?;
1125            }
1126            AlterAction::DropProperty(prop_name) => {
1127                sm.drop_property(entity_name, &prop_name)?;
1128            }
1129            AlterAction::RenameProperty { old_name, new_name } => {
1130                sm.rename_property(entity_name, &old_name, &new_name)?;
1131            }
1132        }
1133        sm.save().await?;
1134        Ok(())
1135    }
1136
1137    pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1138        Self::execute_alter_entity(
1139            &self.storage.schema_manager_arc(),
1140            &clause.name,
1141            clause.action,
1142        )
1143        .await
1144    }
1145
1146    pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1147        Self::execute_alter_entity(
1148            &self.storage.schema_manager_arc(),
1149            &clause.name,
1150            clause.action,
1151        )
1152        .await
1153    }
1154
1155    pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1156        let sm = self.storage.schema_manager_arc();
1157        sm.drop_label(&clause.name, clause.if_exists)?;
1158        sm.save().await?;
1159        Ok(())
1160    }
1161
1162    pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1163        let sm = self.storage.schema_manager_arc();
1164        sm.drop_edge_type(&clause.name, clause.if_exists)?;
1165        sm.save().await?;
1166        Ok(())
1167    }
1168
1169    pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1170        let sm = self.storage.schema_manager_arc();
1171        let target = ConstraintTarget::Label(clause.label);
1172        let c_type = match clause.constraint_type {
1173            AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1174                properties: clause.properties,
1175            },
1176            AstConstraintType::Exists => {
1177                let property = clause
1178                    .properties
1179                    .into_iter()
1180                    .next()
1181                    .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1182                ConstraintType::Exists { property }
1183            }
1184            AstConstraintType::Check => {
1185                let expression = clause
1186                    .expression
1187                    .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1188                ConstraintType::Check {
1189                    expression: expression.to_string_repr(),
1190                }
1191            }
1192        };
1193
1194        let constraint = Constraint {
1195            name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1196            constraint_type: c_type,
1197            target,
1198            enabled: true,
1199        };
1200
1201        sm.add_constraint(constraint)?;
1202        sm.save().await?;
1203        Ok(())
1204    }
1205
1206    pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1207        let sm = self.storage.schema_manager_arc();
1208        sm.drop_constraint(&clause.name, false)?;
1209        sm.save().await?;
1210        Ok(())
1211    }
1212
1213    fn get_composite_constraint(&self, label: &str) -> Option<Constraint> {
1214        let schema = self.storage.schema_manager().schema();
1215        schema
1216            .constraints
1217            .iter()
1218            .find(|c| {
1219                if !c.enabled {
1220                    return false;
1221                }
1222                match &c.target {
1223                    ConstraintTarget::Label(l) if l == label => {
1224                        matches!(c.constraint_type, ConstraintType::Unique { .. })
1225                    }
1226                    _ => false,
1227                }
1228            })
1229            .cloned()
1230    }
1231
1232    #[expect(clippy::too_many_arguments)]
1233    pub(crate) async fn execute_merge(
1234        &self,
1235        rows: Vec<HashMap<String, Value>>,
1236        pattern: &Pattern,
1237        on_match: Option<&SetClause>,
1238        on_create: Option<&SetClause>,
1239        prop_manager: &PropertyManager,
1240        params: &HashMap<String, Value>,
1241        ctx: Option<&QueryContext>,
1242    ) -> Result<Vec<HashMap<String, Value>>> {
1243        let writer_lock = self
1244            .writer
1245            .as_ref()
1246            .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;
1247
1248        // Prepare pattern for path variable binding: assign temp edge variable
1249        // names to unnamed relationships in paths that have path variables.
1250        let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);
1251
1252        let mut results = Vec::new();
1253        for mut row in rows {
1254            // Optimization: Check for single node pattern with unique constraint
1255            let mut optimized_vid = None;
1256            if pattern.paths.len() == 1 {
1257                let path = &pattern.paths[0];
1258                if path.elements.len() == 1
1259                    && let PatternElement::Node(n) = &path.elements[0]
1260                    && n.labels.len() == 1
1261                    && let Some(constraint) = self.get_composite_constraint(&n.labels[0])
1262                    && let ConstraintType::Unique { properties } = constraint.constraint_type
1263                {
1264                    let label = &n.labels[0];
1265                    // Evaluate pattern properties
1266                    let mut pattern_props = HashMap::new();
1267                    if let Some(props_expr) = &n.properties {
1268                        let val = self
1269                            .evaluate_expr(props_expr, &row, prop_manager, params, ctx)
1270                            .await?;
1271                        if let Value::Map(map) = val {
1272                            for (k, v) in map {
1273                                pattern_props.insert(k, v);
1274                            }
1275                        }
1276                    }
1277
1278                    // Check if all constraint properties are present
1279                    let has_all_keys = properties.iter().all(|p| pattern_props.contains_key(p));
1280                    if has_all_keys {
1281                        // Extract key properties and convert to serde_json::Value for index lookup
1282                        let key_props: HashMap<String, serde_json::Value> = properties
1283                            .iter()
1284                            .filter_map(|p| {
1285                                pattern_props.get(p).map(|v| (p.clone(), v.clone().into()))
1286                            })
1287                            .collect();
1288
1289                        // Use optimized lookup
1290                        if let Ok(Some(vid)) = self
1291                            .storage
1292                            .index_manager()
1293                            .composite_lookup(label, &key_props)
1294                            .await
1295                        {
1296                            optimized_vid = Some((vid, pattern_props));
1297                        }
1298                    }
1299                }
1300            }
1301
1302            if let Some((vid, _pattern_props)) = optimized_vid {
1303                // Optimized Path: Node found via index
1304                let mut writer = writer_lock.write().await;
1305                let mut match_row = row.clone();
1306                if let PatternElement::Node(n) = &pattern.paths[0].elements[0]
1307                    && let Some(var) = &n.variable
1308                {
1309                    match_row.insert(var.clone(), Value::Int(vid.as_u64() as i64));
1310                }
1311
1312                if let Some(set) = on_match {
1313                    self.execute_set_items_locked(
1314                        &set.items,
1315                        &mut match_row,
1316                        &mut writer,
1317                        prop_manager,
1318                        params,
1319                        ctx,
1320                    )
1321                    .await?;
1322                }
1323                Self::bind_path_variables(&path_pattern, &mut match_row, &temp_vars);
1324                results.push(match_row);
1325            } else {
1326                // Fallback to standard execution
1327                let matches = self
1328                    .execute_merge_match(pattern, &row, prop_manager, params, ctx)
1329                    .await?;
1330                let mut writer = writer_lock.write().await;
1331                if !matches.is_empty() {
1332                    for mut m in matches {
1333                        if let Some(set) = on_match {
1334                            self.execute_set_items_locked(
1335                                &set.items,
1336                                &mut m,
1337                                &mut writer,
1338                                prop_manager,
1339                                params,
1340                                ctx,
1341                            )
1342                            .await?;
1343                        }
1344                        Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
1345                        results.push(m);
1346                    }
1347                } else {
1348                    self.execute_create_pattern(
1349                        &path_pattern,
1350                        &mut row,
1351                        &mut writer,
1352                        prop_manager,
1353                        params,
1354                        ctx,
1355                    )
1356                    .await?;
1357                    if let Some(set) = on_create {
1358                        self.execute_set_items_locked(
1359                            &set.items,
1360                            &mut row,
1361                            &mut writer,
1362                            prop_manager,
1363                            params,
1364                            ctx,
1365                        )
1366                        .await?;
1367                    }
1368                    Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
1369                    results.push(row);
1370                }
1371            }
1372        }
1373        Ok(results)
1374    }
1375
1376    /// Execute a CREATE pattern, inserting new vertices and edges into the graph.
1377    pub(crate) async fn execute_create_pattern(
1378        &self,
1379        pattern: &Pattern,
1380        row: &mut HashMap<String, Value>,
1381        writer: &mut Writer,
1382        prop_manager: &PropertyManager,
1383        params: &HashMap<String, Value>,
1384        ctx: Option<&QueryContext>,
1385    ) -> Result<()> {
1386        for path in &pattern.paths {
1387            let mut prev_vid: Option<Vid> = None;
1388            // (rel_var, type_id, type_name, props_expr, direction)
1389            type PendingRel = (String, u32, String, Option<Expr>, Direction);
1390            let mut rel_pending: Option<PendingRel> = None;
1391
1392            for element in &path.elements {
1393                match element {
1394                    PatternElement::Node(n) => {
1395                        let mut vid = None;
1396
1397                        // Check if node variable already bound in row
1398                        if let Some(var) = &n.variable
1399                            && let Some(val) = row.get(var)
1400                            && let Ok(existing_vid) = Self::vid_from_value(val)
1401                        {
1402                            vid = Some(existing_vid);
1403                        }
1404
1405                        // If not bound, create it
1406                        if vid.is_none() {
1407                            let mut props = HashMap::new();
1408                            if let Some(props_expr) = &n.properties {
1409                                let props_val = self
1410                                    .evaluate_expr(props_expr, row, prop_manager, params, ctx)
1411                                    .await?;
1412                                if let Value::Map(map) = props_val {
1413                                    for (k, v) in map {
1414                                        props.insert(k, v);
1415                                    }
1416                                } else {
1417                                    return Err(anyhow!("Properties must evaluate to a map"));
1418                                }
1419                            }
1420
1421                            // Support unlabeled nodes and unknown labels (schemaless)
1422                            let schema = self.storage.schema_manager().schema();
1423
1424                            // VID generation is label-independent
1425                            let new_vid = writer.next_vid().await?;
1426
1427                            // Enrich with generated columns only for known labels
1428                            for label_name in &n.labels {
1429                                if schema.get_label_case_insensitive(label_name).is_some() {
1430                                    self.enrich_properties_with_generated_columns(
1431                                        label_name,
1432                                        &mut props,
1433                                        prop_manager,
1434                                        params,
1435                                        ctx,
1436                                    )
1437                                    .await?;
1438                                }
1439                            }
1440
1441                            // Insert vertex and get back final properties (includes auto-generated embeddings)
1442                            let final_props = writer
1443                                .insert_vertex_with_labels(new_vid, props, &n.labels)
1444                                .await?;
1445
1446                            // Build node object with final properties (includes embeddings)
1447                            if let Some(var) = &n.variable {
1448                                let mut obj = HashMap::new();
1449                                obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
1450                                let labels_list: Vec<Value> =
1451                                    n.labels.iter().map(|l| Value::String(l.clone())).collect();
1452                                obj.insert("_labels".to_string(), Value::List(labels_list));
1453                                for (k, v) in &final_props {
1454                                    obj.insert(k.clone(), v.clone());
1455                                }
1456                                // Store node as a Map with _vid, matching MATCH behavior
1457                                row.insert(var.clone(), Value::Map(obj));
1458                            }
1459                            vid = Some(new_vid);
1460                        }
1461
1462                        let current_vid = vid.unwrap();
1463
1464                        if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
1465                            rel_pending.take()
1466                            && let Some(src) = prev_vid
1467                        {
1468                            let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
1469
1470                            if !is_rel_bound {
1471                                let mut rel_props = HashMap::new();
1472                                if let Some(expr) = rel_props_expr {
1473                                    let val = self
1474                                        .evaluate_expr(&expr, row, prop_manager, params, ctx)
1475                                        .await?;
1476                                    if let Value::Map(map) = val {
1477                                        rel_props.extend(map);
1478                                    }
1479                                }
1480                                let eid = writer.next_eid(type_id).await?;
1481
1482                                // For incoming edges like (a)<-[:R]-(b), swap so the edge points b -> a
1483                                let (edge_src, edge_dst) = match dir {
1484                                    Direction::Incoming => (current_vid, src),
1485                                    _ => (src, current_vid),
1486                                };
1487
1488                                let store_props = !rel_var.is_empty();
1489                                let user_props = if store_props {
1490                                    rel_props.clone()
1491                                } else {
1492                                    HashMap::new()
1493                                };
1494
1495                                writer
1496                                    .insert_edge(
1497                                        edge_src,
1498                                        edge_dst,
1499                                        type_id,
1500                                        eid,
1501                                        rel_props,
1502                                        Some(type_name.clone()),
1503                                    )
1504                                    .await?;
1505
1506                                // Edge type name is now stored by insert_edge
1507
1508                                if store_props {
1509                                    let mut edge_map = HashMap::new();
1510                                    edge_map.insert(
1511                                        "_eid".to_string(),
1512                                        Value::Int(eid.as_u64() as i64),
1513                                    );
1514                                    edge_map.insert(
1515                                        "_src".to_string(),
1516                                        Value::Int(edge_src.as_u64() as i64),
1517                                    );
1518                                    edge_map.insert(
1519                                        "_dst".to_string(),
1520                                        Value::Int(edge_dst.as_u64() as i64),
1521                                    );
1522                                    edge_map
1523                                        .insert("_type".to_string(), Value::Int(type_id as i64));
1524                                    // Include user properties so downstream RETURN sees them
1525                                    for (k, v) in user_props {
1526                                        edge_map.insert(k, v);
1527                                    }
1528                                    row.insert(rel_var, Value::Map(edge_map));
1529                                }
1530                            }
1531                        }
1532                        prev_vid = Some(current_vid);
1533                    }
1534                    PatternElement::Relationship(r) => {
1535                        if r.types.len() != 1 {
1536                            return Err(anyhow!(
1537                                "CREATE relationship must specify exactly one type"
1538                            ));
1539                        }
1540                        let type_name = &r.types[0];
1541                        // Get or assign edge type ID (schemaless types get bit 31 = 1)
1542                        let type_id = self
1543                            .storage
1544                            .schema_manager()
1545                            .get_or_assign_edge_type_id(type_name);
1546
1547                        rel_pending = Some((
1548                            r.variable.clone().unwrap_or_default(),
1549                            type_id,
1550                            type_name.clone(),
1551                            r.properties.clone(),
1552                            r.direction.clone(),
1553                        ));
1554                    }
1555                    PatternElement::Parenthesized { .. } => {
1556                        return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
1557                    }
1558                }
1559            }
1560        }
1561        Ok(())
1562    }
1563
1564    /// Validates that a value is a valid property type per OpenCypher.
1565    /// Rejects maps, nodes, edges, paths, and lists containing those types or nested lists.
1566    /// Skips validation for CypherValue-typed properties which accept any value.
1567    fn validate_property_value(
1568        prop_name: &str,
1569        val: &Value,
1570        schema: &uni_common::core::schema::Schema,
1571        labels: &[String],
1572    ) -> Result<()> {
1573        // CypherValue-typed properties accept any value (including Maps)
1574        for label in labels {
1575            if let Some(props) = schema.properties.get(label)
1576                && let Some(prop_meta) = props.get(prop_name)
1577                && prop_meta.r#type == uni_common::core::schema::DataType::CypherValue
1578            {
1579                return Ok(());
1580            }
1581        }
1582
1583        match val {
1584            Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
1585                anyhow::bail!(
1586                    "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
1587                    prop_name
1588                );
1589            }
1590            Value::List(items) => {
1591                for item in items {
1592                    match item {
1593                        Value::Map(_)
1594                        | Value::Node(_)
1595                        | Value::Edge(_)
1596                        | Value::Path(_)
1597                        | Value::List(_) => {
1598                            anyhow::bail!(
1599                                "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
1600                                prop_name
1601                            );
1602                        }
1603                        _ => {}
1604                    }
1605                }
1606            }
1607            _ => {}
1608        }
1609        Ok(())
1610    }
1611
1612    pub(crate) async fn execute_set_items_locked(
1613        &self,
1614        items: &[SetItem],
1615        row: &mut HashMap<String, Value>,
1616        writer: &mut Writer,
1617        prop_manager: &PropertyManager,
1618        params: &HashMap<String, Value>,
1619        ctx: Option<&QueryContext>,
1620    ) -> Result<()> {
1621        for item in items {
1622            match item {
1623                SetItem::Property { expr, value } => {
1624                    if let Expr::Property(var_expr, prop_name) = expr
1625                        && let Expr::Variable(var_name) = &**var_expr
1626                        && let Some(node_val) = row.get(var_name)
1627                    {
1628                        if let Ok(vid) = Self::vid_from_value(node_val) {
1629                            let labels =
1630                                Self::extract_labels_from_node(node_val).unwrap_or_default();
1631                            let schema = self.storage.schema_manager().schema().clone();
1632                            let mut props = prop_manager
1633                                .get_all_vertex_props_with_ctx(vid, ctx)
1634                                .await?
1635                                .unwrap_or_default();
1636                            let val = self
1637                                .evaluate_expr(value, row, prop_manager, params, ctx)
1638                                .await?;
1639                            Self::validate_property_value(prop_name, &val, &schema, &labels)?;
1640                            props.insert(prop_name.clone(), val.clone());
1641
1642                            // Enrich with generated columns
1643                            for label_name in &labels {
1644                                self.enrich_properties_with_generated_columns(
1645                                    label_name,
1646                                    &mut props,
1647                                    prop_manager,
1648                                    params,
1649                                    ctx,
1650                                )
1651                                .await?;
1652                            }
1653
1654                            let _ = writer
1655                                .insert_vertex_with_labels(vid, props, &labels)
1656                                .await?;
1657
1658                            // Update the row object so subsequent RETURN sees the new value
1659                            if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
1660                                node_map.insert(prop_name.clone(), val);
1661                            } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
1662                                node.properties.insert(prop_name.clone(), val);
1663                            }
1664                        } else if let Value::Map(map) = node_val
1665                            && map.get("_eid").is_some_and(|v| !v.is_null())
1666                            && map.get("_src").is_some_and(|v| !v.is_null())
1667                            && map.get("_dst").is_some_and(|v| !v.is_null())
1668                            && map.get("_type").is_some_and(|v| !v.is_null())
1669                        {
1670                            let ei = self.extract_edge_identity(map)?;
1671                            let schema = self.storage.schema_manager().schema().clone();
1672                            // Handle _type as either String or Int (Int from CREATE, String from queries)
1673                            let edge_type_name = match map.get("_type") {
1674                                Some(Value::String(s)) => s.clone(),
1675                                Some(Value::Int(id)) => schema
1676                                    .edge_type_name_by_id_unified(*id as u32)
1677                                    .unwrap_or_else(|| format!("EdgeType{}", id)),
1678                                _ => String::new(),
1679                            };
1680
1681                            let mut props = prop_manager
1682                                .get_all_edge_props_with_ctx(ei.eid, ctx)
1683                                .await?
1684                                .unwrap_or_default();
1685                            let val = self
1686                                .evaluate_expr(value, row, prop_manager, params, ctx)
1687                                .await?;
1688                            Self::validate_property_value(
1689                                prop_name,
1690                                &val,
1691                                &schema,
1692                                std::slice::from_ref(&edge_type_name),
1693                            )?;
1694                            props.insert(prop_name.clone(), val.clone());
1695                            writer
1696                                .insert_edge(
1697                                    ei.src,
1698                                    ei.dst,
1699                                    ei.edge_type_id,
1700                                    ei.eid,
1701                                    props,
1702                                    Some(edge_type_name.clone()),
1703                                )
1704                                .await?;
1705
1706                            // Update the row object so subsequent RETURN sees the new value
1707                            if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
1708                                edge_map.insert(prop_name.clone(), val);
1709                            } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1710                                edge.properties.insert(prop_name.clone(), val);
1711                            }
1712                        } else if let Value::Edge(edge) = node_val {
1713                            // Handle Value::Edge directly (when traverse returns Edge objects)
1714                            let eid = edge.eid;
1715                            let src = edge.src;
1716                            let dst = edge.dst;
1717                            let edge_type_name = edge.edge_type.clone();
1718                            let etype =
1719                                self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
1720                            let schema = self.storage.schema_manager().schema().clone();
1721
1722                            let mut props = prop_manager
1723                                .get_all_edge_props_with_ctx(eid, ctx)
1724                                .await?
1725                                .unwrap_or_default();
1726                            let val = self
1727                                .evaluate_expr(value, row, prop_manager, params, ctx)
1728                                .await?;
1729                            Self::validate_property_value(
1730                                prop_name,
1731                                &val,
1732                                &schema,
1733                                std::slice::from_ref(&edge_type_name),
1734                            )?;
1735                            props.insert(prop_name.clone(), val.clone());
1736                            writer
1737                                .insert_edge(
1738                                    src,
1739                                    dst,
1740                                    etype,
1741                                    eid,
1742                                    props,
1743                                    Some(edge_type_name.clone()),
1744                                )
1745                                .await?;
1746
1747                            // Update the row object so subsequent RETURN sees the new value
1748                            if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1749                                edge.properties.insert(prop_name.clone(), val);
1750                            }
1751                        }
1752                    }
1753                }
1754                SetItem::Labels { variable, labels } => {
1755                    if let Some(node_val) = row.get(variable)
1756                        && let Ok(vid) = Self::vid_from_value(node_val)
1757                    {
1758                        // Get current labels from node value
1759                        let current_labels =
1760                            Self::extract_labels_from_node(node_val).unwrap_or_default();
1761
1762                        // Determine new labels to add (skip duplicates)
1763                        let labels_to_add: Vec<_> = labels
1764                            .iter()
1765                            .filter(|l| !current_labels.contains(l))
1766                            .cloned()
1767                            .collect();
1768
1769                        if !labels_to_add.is_empty() {
1770                            // Add labels via L0Buffer (schemaless: accept any label name,
1771                            // matching CREATE behavior)
1772                            if let Some(ctx) = ctx {
1773                                ctx.l0.write().add_vertex_labels(vid, &labels_to_add);
1774                            }
1775
1776                            // Update the node value in the row with new labels
1777                            if let Some(Value::Map(obj)) = row.get_mut(variable) {
1778                                let mut updated_labels = current_labels;
1779                                updated_labels.extend(labels_to_add);
1780                                let labels_list =
1781                                    updated_labels.into_iter().map(Value::String).collect();
1782                                obj.insert("_labels".to_string(), Value::List(labels_list));
1783                            }
1784                        }
1785                    }
1786                }
1787                SetItem::Variable { variable, value }
1788                | SetItem::VariablePlus { variable, value } => {
1789                    let replace = matches!(item, SetItem::Variable { .. });
1790                    let op_str = if replace { "=" } else { "+=" };
1791
1792                    // SET n = expr / SET n += expr — null target from OPTIONAL MATCH is a silent no-op
1793                    if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
1794                        continue;
1795                    }
1796                    let rhs = self
1797                        .evaluate_expr(value, row, prop_manager, params, ctx)
1798                        .await?;
1799                    let new_props =
1800                        Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
1801                            anyhow!(
1802                                "SET {} {} expr: right-hand side must evaluate to a map, \
1803                                 node, or relationship",
1804                                variable,
1805                                op_str
1806                            )
1807                        })?;
1808                    self.apply_properties_to_entity(
1809                        variable,
1810                        new_props,
1811                        replace,
1812                        row,
1813                        writer,
1814                        prop_manager,
1815                        params,
1816                        ctx,
1817                    )
1818                    .await?;
1819                }
1820            }
1821        }
1822        Ok(())
1823    }
1824
1825    /// Execute REMOVE clause items (property removal or label removal).
1826    ///
1827    /// Property removals are batched per variable to avoid stale reads: when
1828    /// multiple properties of the same entity are removed in one REMOVE clause,
1829    /// we read from storage once, null all specified properties, and write back
1830    /// once. This prevents the second removal from reading stale data that
1831    /// doesn't reflect the first removal's L0 write.
1832    pub(crate) async fn execute_remove_items_locked(
1833        &self,
1834        items: &[RemoveItem],
1835        row: &mut HashMap<String, Value>,
1836        writer: &mut Writer,
1837        prop_manager: &PropertyManager,
1838        ctx: Option<&QueryContext>,
1839    ) -> Result<()> {
1840        // Collect property names to remove, grouped by variable.
1841        // Use Vec<(String, Vec<String>)> to preserve insertion order.
1842        let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
1843
1844        for item in items {
1845            match item {
1846                RemoveItem::Property(expr) => {
1847                    if let Expr::Property(var_expr, prop_name) = expr
1848                        && let Expr::Variable(var_name) = &**var_expr
1849                    {
1850                        if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
1851                            entry.1.push(prop_name.clone());
1852                        } else {
1853                            prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
1854                        }
1855                    }
1856                }
1857                RemoveItem::Labels { variable, labels } => {
1858                    self.execute_remove_labels(variable, labels, row, ctx)?;
1859                }
1860            }
1861        }
1862
1863        // Execute batched property removals per variable.
1864        for (var_name, prop_names) in &prop_removals {
1865            let Some(node_val) = row.get(var_name) else {
1866                continue;
1867            };
1868
1869            if let Ok(vid) = Self::vid_from_value(node_val) {
1870                // Vertex property removal
1871                let mut props = prop_manager
1872                    .get_all_vertex_props_with_ctx(vid, ctx)
1873                    .await?
1874                    .unwrap_or_default();
1875
1876                // Only write back if at least one property actually exists
1877                let any_exist = prop_names
1878                    .iter()
1879                    .any(|p| props.get(p).is_some_and(|v| !v.is_null()));
1880                if any_exist {
1881                    for prop_name in prop_names {
1882                        props.insert(prop_name.clone(), Value::Null);
1883                    }
1884                }
1885                // Compute effective properties (post-removal) for _all_props
1886                let effective: HashMap<String, Value> = props
1887                    .iter()
1888                    .filter(|(_, v)| !v.is_null())
1889                    .map(|(k, v)| (k.clone(), v.clone()))
1890                    .collect();
1891                if any_exist {
1892                    let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
1893                    let _ = writer
1894                        .insert_vertex_with_labels(vid, props, &labels)
1895                        .await?;
1896                }
1897
1898                // Update the row map: set removed props to Null
1899                if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
1900                    for prop_name in prop_names {
1901                        node_map.insert(prop_name.clone(), Value::Null);
1902                    }
1903                    // Set _all_props to the complete effective property set
1904                    node_map.insert("_all_props".to_string(), Value::Map(effective));
1905                }
1906            } else if let Value::Map(map) = node_val {
1907                // Edge property removal (map-encoded)
1908                // Check for non-null _eid to skip OPTIONAL MATCH null edges
1909                let mut edge_effective: Option<HashMap<String, Value>> = None;
1910                if map.get("_eid").is_some_and(|v| !v.is_null()) {
1911                    let ei = self.extract_edge_identity(map)?;
1912                    let mut props = prop_manager
1913                        .get_all_edge_props_with_ctx(ei.eid, ctx)
1914                        .await?
1915                        .unwrap_or_default();
1916
1917                    let any_exist = prop_names
1918                        .iter()
1919                        .any(|p| props.get(p).is_some_and(|v| !v.is_null()));
1920                    if any_exist {
1921                        for prop_name in prop_names {
1922                            props.insert(prop_name.to_string(), Value::Null);
1923                        }
1924                    }
1925                    // Compute effective properties (post-removal) for _all_props
1926                    edge_effective = Some(
1927                        props
1928                            .iter()
1929                            .filter(|(_, v)| !v.is_null())
1930                            .map(|(k, v)| (k.clone(), v.clone()))
1931                            .collect(),
1932                    );
1933                    if any_exist {
1934                        let edge_type_name = map
1935                            .get("_type")
1936                            .and_then(|v| v.as_str())
1937                            .map(|s| s.to_string())
1938                            .or_else(|| {
1939                                self.storage
1940                                    .schema_manager()
1941                                    .edge_type_name_by_id_unified(ei.edge_type_id)
1942                            });
1943                        writer
1944                            .insert_edge(
1945                                ei.src,
1946                                ei.dst,
1947                                ei.edge_type_id,
1948                                ei.eid,
1949                                props,
1950                                edge_type_name,
1951                            )
1952                            .await?;
1953                    }
1954                }
1955
1956                if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
1957                    for prop_name in prop_names {
1958                        edge_map.insert(prop_name.clone(), Value::Null);
1959                    }
1960                    if let Some(effective) = edge_effective {
1961                        edge_map.insert("_all_props".to_string(), Value::Map(effective));
1962                    }
1963                }
1964            } else if let Value::Edge(edge) = node_val {
1965                // Edge property removal (Value::Edge)
1966                let eid = edge.eid;
1967                let src = edge.src;
1968                let dst = edge.dst;
1969                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
1970
1971                let mut props = prop_manager
1972                    .get_all_edge_props_with_ctx(eid, ctx)
1973                    .await?
1974                    .unwrap_or_default();
1975
1976                let any_exist = prop_names
1977                    .iter()
1978                    .any(|p| props.get(p).is_some_and(|v| !v.is_null()));
1979                if any_exist {
1980                    for prop_name in prop_names {
1981                        props.insert(prop_name.to_string(), Value::Null);
1982                    }
1983                    writer
1984                        .insert_edge(src, dst, etype, eid, props, Some(edge.edge_type.clone()))
1985                        .await?;
1986                }
1987
1988                if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1989                    for prop_name in prop_names {
1990                        edge.properties.insert(prop_name.to_string(), Value::Null);
1991                    }
1992                }
1993            }
1994        }
1995
1996        Ok(())
1997    }
1998
1999    /// Execute label removal.
2000    pub(crate) fn execute_remove_labels(
2001        &self,
2002        variable: &str,
2003        labels: &[String],
2004        row: &mut HashMap<String, Value>,
2005        ctx: Option<&QueryContext>,
2006    ) -> Result<()> {
2007        if let Some(node_val) = row.get(variable)
2008            && let Ok(vid) = Self::vid_from_value(node_val)
2009        {
2010            // Get current labels from node value
2011            let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
2012
2013            // Determine which labels to actually remove (only those currently present)
2014            let labels_to_remove: Vec<_> = labels
2015                .iter()
2016                .filter(|l| current_labels.contains(l))
2017                .collect();
2018
2019            if !labels_to_remove.is_empty() {
2020                // Remove labels via L0Buffer
2021                if let Some(ctx) = ctx {
2022                    let mut l0 = ctx.l0.write();
2023                    for label in &labels_to_remove {
2024                        l0.remove_vertex_label(vid, label);
2025                    }
2026                }
2027
2028                // Update the node value in the row with remaining labels
2029                if let Some(Value::Map(obj)) = row.get_mut(variable) {
2030                    let remaining_labels: Vec<_> = current_labels
2031                        .iter()
2032                        .filter(|l| !labels_to_remove.contains(l))
2033                        .cloned()
2034                        .collect();
2035                    let labels_list = remaining_labels.into_iter().map(Value::String).collect();
2036                    obj.insert("_labels".to_string(), Value::List(labels_list));
2037                }
2038            }
2039        }
2040        Ok(())
2041    }
2042
2043    /// Resolve edge type ID for a Value::Edge, handling empty edge_type strings
2044    /// by looking up the type from the L0 buffer's edge endpoints.
2045    fn resolve_edge_type_id_for_edge(
2046        &self,
2047        edge: &crate::types::Edge,
2048        writer: &Writer,
2049    ) -> Result<u32> {
2050        if !edge.edge_type.is_empty() {
2051            return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
2052        }
2053        // Edge type name is empty (e.g., from anonymous MATCH patterns).
2054        // Look up the edge type ID from the L0 buffer's edge endpoints.
2055        if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid) {
2056            return Ok(etype);
2057        }
2058        Err(anyhow!(
2059            "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
2060            edge.eid
2061        ))
2062    }
2063
2064    /// Execute DELETE clause for a single item (vertex, edge, path, or null).
2065    pub(crate) async fn execute_delete_item_locked(
2066        &self,
2067        val: &Value,
2068        detach: bool,
2069        writer: &mut Writer,
2070    ) -> Result<()> {
2071        match val {
2072            Value::Null => {
2073                // DELETE null is a no-op per OpenCypher spec
2074            }
2075            Value::Path(path) => {
2076                // Delete path edges first, then nodes
2077                for edge in &path.edges {
2078                    let etype = self.resolve_edge_type_id_for_edge(edge, writer)?;
2079                    writer
2080                        .delete_edge(edge.eid, edge.src, edge.dst, etype)
2081                        .await?;
2082                }
2083                for node in &path.nodes {
2084                    self.execute_delete_vertex(node.vid, detach, Some(node.labels.clone()), writer)
2085                        .await?;
2086                }
2087            }
2088            _ => {
2089                // Try Path reconstruction from Map first (Arrow loses Path type)
2090                if let Ok(path) = Path::try_from(val) {
2091                    for edge in &path.edges {
2092                        let etype = self.resolve_edge_type_id_for_edge(edge, writer)?;
2093                        writer
2094                            .delete_edge(edge.eid, edge.src, edge.dst, etype)
2095                            .await?;
2096                    }
2097                    for node in &path.nodes {
2098                        self.execute_delete_vertex(
2099                            node.vid,
2100                            detach,
2101                            Some(node.labels.clone()),
2102                            writer,
2103                        )
2104                        .await?;
2105                    }
2106                } else if let Ok(vid) = Self::vid_from_value(val) {
2107                    let labels = Self::extract_labels_from_node(val);
2108                    self.execute_delete_vertex(vid, detach, labels, writer)
2109                        .await?;
2110                } else if let Value::Map(map) = val {
2111                    self.execute_delete_edge_from_map(map, writer).await?;
2112                } else if let Value::Edge(edge) = val {
2113                    let etype = self.resolve_edge_type_id_for_edge(edge, writer)?;
2114                    writer
2115                        .delete_edge(edge.eid, edge.src, edge.dst, etype)
2116                        .await?;
2117                }
2118            }
2119        }
2120        Ok(())
2121    }
2122
2123    /// Execute vertex deletion with optional detach.
2124    pub(crate) async fn execute_delete_vertex(
2125        &self,
2126        vid: Vid,
2127        detach: bool,
2128        labels: Option<Vec<String>>,
2129        writer: &mut Writer,
2130    ) -> Result<()> {
2131        if detach {
2132            self.detach_delete_vertex(vid, writer).await?;
2133        } else {
2134            self.check_vertex_has_no_edges(vid, writer).await?;
2135        }
2136        writer.delete_vertex(vid, labels).await?;
2137        Ok(())
2138    }
2139
2140    /// Check that a vertex has no edges (required for non-DETACH DELETE).
2141    pub(crate) async fn check_vertex_has_no_edges(&self, vid: Vid, writer: &Writer) -> Result<()> {
2142        let schema = self.storage.schema_manager().schema();
2143        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
2144
2145        let out_graph = self
2146            .storage
2147            .load_subgraph_cached(
2148                &[vid],
2149                &edge_type_ids,
2150                1,
2151                uni_store::runtime::Direction::Outgoing,
2152                Some(writer.l0_manager.get_current()),
2153            )
2154            .await?;
2155        let has_out = out_graph.edges().next().is_some();
2156
2157        let in_graph = self
2158            .storage
2159            .load_subgraph_cached(
2160                &[vid],
2161                &edge_type_ids,
2162                1,
2163                uni_store::runtime::Direction::Incoming,
2164                Some(writer.l0_manager.get_current()),
2165            )
2166            .await?;
2167        let has_in = in_graph.edges().next().is_some();
2168
2169        if has_out || has_in {
2170            return Err(anyhow!(
2171                "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
2172                vid
2173            ));
2174        }
2175        Ok(())
2176    }
2177
2178    /// Execute edge deletion from a map representation.
2179    pub(crate) async fn execute_delete_edge_from_map(
2180        &self,
2181        map: &HashMap<String, Value>,
2182        writer: &mut Writer,
2183    ) -> Result<()> {
2184        // Check for non-null _eid to skip OPTIONAL MATCH null edges
2185        if map.get("_eid").is_some_and(|v| !v.is_null()) {
2186            let ei = self.extract_edge_identity(map)?;
2187            writer
2188                .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id)
2189                .await?;
2190        }
2191        Ok(())
2192    }
2193
2194    /// Build a scan plan node.
2195    ///
2196    /// - `label_id > 0`: schema label → `Scan` (fast, label-specific storage)
2197    /// - `label_id == 0` with labels: schemaless → `ScanMainByLabels` (main table + L0, filtered by label name)
2198    /// - `label_id == 0` without labels: unlabeled → `ScanAll`
2199    fn make_scan_plan(
2200        label_id: u16,
2201        labels: Vec<String>,
2202        variable: String,
2203        filter: Option<Expr>,
2204    ) -> LogicalPlan {
2205        if label_id > 0 {
2206            LogicalPlan::Scan {
2207                label_id,
2208                labels,
2209                variable,
2210                filter,
2211                optional: false,
2212            }
2213        } else if !labels.is_empty() {
2214            // Schemaless label: use ScanMainByLabels to filter by label name
2215            LogicalPlan::ScanMainByLabels {
2216                labels,
2217                variable,
2218                filter,
2219                optional: false,
2220            }
2221        } else {
2222            LogicalPlan::ScanAll {
2223                variable,
2224                filter,
2225                optional: false,
2226            }
2227        }
2228    }
2229
2230    /// Attach a new scan node to the running plan, using `CrossJoin` when the plan
2231    /// already contains prior operators.
2232    fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
2233        if matches!(plan, LogicalPlan::Empty) {
2234            scan
2235        } else {
2236            LogicalPlan::CrossJoin {
2237                left: Box::new(plan),
2238                right: Box::new(scan),
2239            }
2240        }
2241    }
2242
2243    /// Resolve MERGE property map expressions against the current row context.
2244    ///
2245    /// MERGE patterns like `MERGE (city:City {name: person.bornIn})` contain
2246    /// property expressions that reference bound variables. These need to be
2247    /// evaluated to concrete literal values before being converted to filter
2248    /// expressions by `properties_to_expr()`.
2249    async fn resolve_merge_properties(
2250        &self,
2251        properties: &Option<Expr>,
2252        row: &HashMap<String, Value>,
2253        prop_manager: &PropertyManager,
2254        params: &HashMap<String, Value>,
2255        ctx: Option<&QueryContext>,
2256    ) -> Result<Option<Expr>> {
2257        let entries = match properties {
2258            Some(Expr::Map(entries)) => entries,
2259            other => return Ok(other.clone()),
2260        };
2261        let mut resolved = Vec::new();
2262        for (key, val_expr) in entries {
2263            if matches!(val_expr, Expr::Literal(_)) {
2264                resolved.push((key.clone(), val_expr.clone()));
2265            } else {
2266                let value = self
2267                    .evaluate_expr(val_expr, row, prop_manager, params, ctx)
2268                    .await?;
2269                resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
2270            }
2271        }
2272        Ok(Some(Expr::Map(resolved)))
2273    }
2274
2275    /// Convert a runtime Value back to an AST literal expression.
2276    fn value_to_literal_expr(value: &Value) -> Expr {
2277        match value {
2278            Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
2279            Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
2280            Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
2281            Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
2282            Value::Null => Expr::Literal(CypherLiteral::Null),
2283            Value::List(items) => {
2284                Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
2285            }
2286            Value::Map(entries) => Expr::Map(
2287                entries
2288                    .iter()
2289                    .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
2290                    .collect(),
2291            ),
2292            _ => Expr::Literal(CypherLiteral::Null),
2293        }
2294    }
2295
2296    pub(crate) async fn execute_merge_match(
2297        &self,
2298        pattern: &Pattern,
2299        row: &HashMap<String, Value>,
2300        prop_manager: &PropertyManager,
2301        params: &HashMap<String, Value>,
2302        ctx: Option<&QueryContext>,
2303    ) -> Result<Vec<HashMap<String, Value>>> {
2304        // Construct a LogicalPlan for the MATCH part of MERGE
2305        let planner =
2306            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
2307
2308        // We need to construct a CypherQuery to use the planner's plan() method,
2309        // or we can manually construct the LogicalPlan.
2310        // Manual construction is safer as we don't have to round-trip through AST.
2311
2312        let mut plan = LogicalPlan::Empty;
2313        let mut vars_in_scope = Vec::new();
2314
2315        // Add existing bound variables from row to scope
2316        for key in row.keys() {
2317            vars_in_scope.push(key.clone());
2318        }
2319
2320        // Reconstruct Match logic from Planner (simplified for MERGE pattern)
2321        for path in &pattern.paths {
2322            let elements = &path.elements;
2323            let mut i = 0;
2324            while i < elements.len() {
2325                let part = &elements[i];
2326                match part {
2327                    PatternElement::Node(n) => {
2328                        let variable = n.variable.clone().unwrap_or_default();
2329
2330                        // If variable is already bound in the input row, we filter
2331                        let is_bound = !variable.is_empty() && row.contains_key(&variable);
2332
2333                        if is_bound {
2334                            // If bound, we must Scan this specific VID to start the chain
2335                            // Extract VID from row
2336                            let val = row.get(&variable).unwrap();
2337                            let vid = Self::vid_from_value(val)?;
2338
2339                            // In the new storage model, VIDs don't embed label info.
2340                            // We get label from the node value if available, otherwise use 0 to scan all.
2341                            let extracted_labels =
2342                                Self::extract_labels_from_node(val).unwrap_or_default();
2343                            let label_id = {
2344                                let schema = self.storage.schema_manager().schema();
2345                                extracted_labels
2346                                    .first()
2347                                    .and_then(|l| schema.label_id_by_name(l))
2348                                    .unwrap_or(0)
2349                            };
2350
2351                            let resolved_props = self
2352                                .resolve_merge_properties(
2353                                    &n.properties,
2354                                    row,
2355                                    prop_manager,
2356                                    params,
2357                                    ctx,
2358                                )
2359                                .await?;
2360                            let prop_filter =
2361                                planner.properties_to_expr(&variable, &resolved_props);
2362
2363                            // Create a filter expression for VID: variable._vid = vid
2364                            // But our expression engine handles `Expr::Variable` as column.
2365                            // We can inject a filter `id(variable) = vid` if we had `id()` function.
2366                            // Or we use internal property `_vid`.
2367
2368                            // Note: Scan supports `filter`.
2369                            // We can manually construct an Expr::BinaryOp(Eq, Prop(var, _vid), Literal(vid))
2370
2371                            let vid_filter = Expr::BinaryOp {
2372                                left: Box::new(Expr::Property(
2373                                    Box::new(Expr::Variable(variable.clone())),
2374                                    "_vid".to_string(),
2375                                )),
2376                                op: BinaryOp::Eq,
2377                                right: Box::new(Expr::Literal(CypherLiteral::Integer(
2378                                    vid.as_u64() as i64,
2379                                ))),
2380                            };
2381
2382                            let combined_filter = if let Some(pf) = prop_filter {
2383                                Some(Expr::BinaryOp {
2384                                    left: Box::new(vid_filter),
2385                                    op: BinaryOp::And,
2386                                    right: Box::new(pf),
2387                                })
2388                            } else {
2389                                Some(vid_filter)
2390                            };
2391
2392                            let scan = Self::make_scan_plan(
2393                                label_id,
2394                                extracted_labels,
2395                                variable.clone(),
2396                                combined_filter,
2397                            );
2398                            plan = Self::attach_scan(plan, scan);
2399                        } else {
2400                            let label_id = if n.labels.is_empty() {
2401                                // Unlabeled MERGE node: scan all nodes (label_id 0 → ScanAll)
2402                                0
2403                            } else {
2404                                let label_name = &n.labels[0];
2405                                let schema = self.storage.schema_manager().schema();
2406                                // Fall back to label_id 0 (any/schemaless) when the label is not
2407                                // in the schema — this allows MERGE to work in schemaless mode.
2408                                schema
2409                                    .get_label_case_insensitive(label_name)
2410                                    .map(|m| m.id)
2411                                    .unwrap_or(0)
2412                            };
2413
2414                            let resolved_props = self
2415                                .resolve_merge_properties(
2416                                    &n.properties,
2417                                    row,
2418                                    prop_manager,
2419                                    params,
2420                                    ctx,
2421                                )
2422                                .await?;
2423                            let prop_filter =
2424                                planner.properties_to_expr(&variable, &resolved_props);
2425                            let scan = Self::make_scan_plan(
2426                                label_id,
2427                                n.labels.clone(),
2428                                variable.clone(),
2429                                prop_filter,
2430                            );
2431                            plan = Self::attach_scan(plan, scan);
2432
2433                            // Add label filters when:
2434                            // 1. Multiple labels with a known schema label: filter for
2435                            //    additional labels (Scan only scans by the first label).
2436                            // 2. Schemaless labels (label_id = 0): ScanAll finds ALL
2437                            //    nodes, so we must filter to only those with the
2438                            //    specified label(s).
2439                            if !n.labels.is_empty()
2440                                && !variable.is_empty()
2441                                && (label_id == 0 || n.labels.len() > 1)
2442                                && let Some(label_filter) =
2443                                    planner.node_filter_expr(&variable, &n.labels, &None)
2444                            {
2445                                plan = LogicalPlan::Filter {
2446                                    input: Box::new(plan),
2447                                    predicate: label_filter,
2448                                    optional_variables: std::collections::HashSet::new(),
2449                                };
2450                            }
2451
2452                            if !variable.is_empty() {
2453                                vars_in_scope.push(variable.clone());
2454                            }
2455                        }
2456
2457                        // Now look ahead for relationship
2458                        i += 1;
2459                        while i < elements.len() {
2460                            if let PatternElement::Relationship(r) = &elements[i] {
2461                                let target_node_part = &elements[i + 1];
2462                                if let PatternElement::Node(n_target) = target_node_part {
2463                                    let schema = self.storage.schema_manager().schema();
2464                                    let mut edge_type_ids = Vec::new();
2465
2466                                    if r.types.is_empty() {
2467                                        return Err(anyhow!("MERGE edge must have a type"));
2468                                    } else if r.types.len() > 1 {
2469                                        return Err(anyhow!(
2470                                            "MERGE does not support multiple edge types"
2471                                        ));
2472                                    } else {
2473                                        let type_name = &r.types[0];
2474                                        // Use get_or_assign so schemaless edge types work without
2475                                        // a prior schema declaration (same approach as CREATE).
2476                                        let type_id = self
2477                                            .storage
2478                                            .schema_manager()
2479                                            .get_or_assign_edge_type_id(type_name);
2480                                        edge_type_ids.push(type_id);
2481                                    }
2482
2483                                    // Resolve target label ID. For schemaless labels (not in the
2484                                    // schema), fall back to 0 which means "any label" in traversal.
2485                                    let target_label_id: u16 = if let Some(lbl) =
2486                                        n_target.labels.first()
2487                                    {
2488                                        schema
2489                                            .get_label_case_insensitive(lbl)
2490                                            .map(|m| m.id)
2491                                            .unwrap_or(0)
2492                                    } else if let Some(var) = &n_target.variable {
2493                                        if let Some(val) = row.get(var) {
2494                                            // In the new storage model, get labels from node value
2495                                            if let Some(labels) =
2496                                                Self::extract_labels_from_node(val)
2497                                            {
2498                                                if let Some(first_label) = labels.first() {
2499                                                    schema
2500                                                        .get_label_case_insensitive(first_label)
2501                                                        .map(|m| m.id)
2502                                                        .unwrap_or(0)
2503                                                } else {
2504                                                    // Bound node with no labels — schemaless, any
2505                                                    0
2506                                                }
2507                                            } else if Self::vid_from_value(val).is_ok() {
2508                                                // VID without label info — schemaless, any
2509                                                0
2510                                            } else {
2511                                                return Err(anyhow!(
2512                                                    "Variable {} is not a node",
2513                                                    var
2514                                                ));
2515                                            }
2516                                        } else {
2517                                            return Err(anyhow!(
2518                                                "MERGE pattern node must have a label or be a bound variable"
2519                                            ));
2520                                        }
2521                                    } else {
2522                                        return Err(anyhow!(
2523                                            "MERGE pattern node must have a label"
2524                                        ));
2525                                    };
2526
2527                                    let target_variable =
2528                                        n_target.variable.clone().unwrap_or_default();
2529                                    let source_variable = match &elements[i - 1] {
2530                                        PatternElement::Node(n) => {
2531                                            n.variable.clone().unwrap_or_default()
2532                                        }
2533                                        _ => String::new(),
2534                                    };
2535
2536                                    let is_variable_length = r.range.is_some();
2537                                    let type_name = &r.types[0];
2538
2539                                    // Use TraverseMainByType for schemaless edge types
2540                                    // (same as MATCH planner) so edge properties are loaded
2541                                    // correctly from storage + L0 via the adjacency map.
2542                                    // Regular Traverse only loads properties via
2543                                    // property_manager which doesn't handle schemaless types.
2544                                    let is_schemaless = edge_type_ids.iter().all(|id| {
2545                                        uni_common::core::edge_type::is_schemaless_edge_type(*id)
2546                                    });
2547
2548                                    if is_schemaless {
2549                                        plan = LogicalPlan::TraverseMainByType {
2550                                            type_names: vec![type_name.clone()],
2551                                            input: Box::new(plan),
2552                                            direction: r.direction.clone(),
2553                                            source_variable,
2554                                            target_variable: target_variable.clone(),
2555                                            step_variable: r.variable.clone(),
2556                                            min_hops: r
2557                                                .range
2558                                                .as_ref()
2559                                                .and_then(|r| r.min)
2560                                                .unwrap_or(1)
2561                                                as usize,
2562                                            max_hops: r
2563                                                .range
2564                                                .as_ref()
2565                                                .and_then(|r| r.max)
2566                                                .unwrap_or(1)
2567                                                as usize,
2568                                            optional: false,
2569                                            target_filter: None,
2570                                            path_variable: None,
2571                                            is_variable_length,
2572                                            optional_pattern_vars: std::collections::HashSet::new(),
2573                                            scope_match_variables: std::collections::HashSet::new(),
2574                                            edge_filter_expr: None,
2575                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
2576                                        };
2577                                    } else {
2578                                        // Collect edge property names needed for MERGE filter
2579                                        let mut edge_props = std::collections::HashSet::new();
2580                                        if let Some(Expr::Map(entries)) = &r.properties {
2581                                            for (key, _) in entries {
2582                                                edge_props.insert(key.clone());
2583                                            }
2584                                        }
2585                                        plan = LogicalPlan::Traverse {
2586                                            input: Box::new(plan),
2587                                            edge_type_ids: edge_type_ids.clone(),
2588                                            direction: r.direction.clone(),
2589                                            source_variable,
2590                                            target_variable: target_variable.clone(),
2591                                            target_label_id,
2592                                            step_variable: r.variable.clone(),
2593                                            min_hops: r
2594                                                .range
2595                                                .as_ref()
2596                                                .and_then(|r| r.min)
2597                                                .unwrap_or(1)
2598                                                as usize,
2599                                            max_hops: r
2600                                                .range
2601                                                .as_ref()
2602                                                .and_then(|r| r.max)
2603                                                .unwrap_or(1)
2604                                                as usize,
2605                                            optional: false,
2606                                            target_filter: None,
2607                                            path_variable: None,
2608                                            edge_properties: edge_props,
2609                                            is_variable_length,
2610                                            optional_pattern_vars: std::collections::HashSet::new(),
2611                                            scope_match_variables: std::collections::HashSet::new(),
2612                                            edge_filter_expr: None,
2613                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
2614                                            qpp_steps: None,
2615                                        };
2616                                    }
2617
2618                                    // Apply property filters for relationship
2619                                    if r.properties.is_some()
2620                                        && let Some(r_var) = &r.variable
2621                                    {
2622                                        let resolved_rel_props = self
2623                                            .resolve_merge_properties(
2624                                                &r.properties,
2625                                                row,
2626                                                prop_manager,
2627                                                params,
2628                                                ctx,
2629                                            )
2630                                            .await?;
2631                                        if let Some(prop_filter) =
2632                                            planner.properties_to_expr(r_var, &resolved_rel_props)
2633                                        {
2634                                            plan = LogicalPlan::Filter {
2635                                                input: Box::new(plan),
2636                                                predicate: prop_filter,
2637                                                optional_variables: std::collections::HashSet::new(
2638                                                ),
2639                                            };
2640                                        }
2641                                    }
2642
2643                                    // Apply property filters for target node if it was new
2644                                    if !target_variable.is_empty() {
2645                                        let resolved_target_props = self
2646                                            .resolve_merge_properties(
2647                                                &n_target.properties,
2648                                                row,
2649                                                prop_manager,
2650                                                params,
2651                                                ctx,
2652                                            )
2653                                            .await?;
2654                                        if let Some(prop_filter) = planner.properties_to_expr(
2655                                            &target_variable,
2656                                            &resolved_target_props,
2657                                        ) {
2658                                            plan = LogicalPlan::Filter {
2659                                                input: Box::new(plan),
2660                                                predicate: prop_filter,
2661                                                optional_variables: std::collections::HashSet::new(
2662                                                ),
2663                                            };
2664                                        }
2665                                        vars_in_scope.push(target_variable.clone());
2666                                    }
2667
2668                                    if let Some(sv) = &r.variable {
2669                                        vars_in_scope.push(sv.clone());
2670                                    }
2671                                    i += 2;
2672                                } else {
2673                                    break;
2674                                }
2675                            } else {
2676                                break;
2677                            }
2678                        }
2679                    }
2680                    _ => return Err(anyhow!("Pattern must start with a node")),
2681                }
2682            }
2683
2684            // Execute the plan to find all matches, then filter against bound variables in `row`.
2685        }
2686
2687        let db_matches = self
2688            .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
2689            .await?;
2690
2691        // Keep only DB results that are consistent with the input row bindings.
2692        // Skip internal keys (starting with "__") as they are implementation
2693        // artifacts (e.g. __used_edges) and not user-visible variable bindings.
2694        // Also skip the empty-string key (""), which is the placeholder variable
2695        // for unnamed MERGE nodes — it may carry over from a prior MERGE clause
2696        // and must not constrain the current pattern's match.
2697        let final_matches = db_matches
2698            .into_iter()
2699            .filter(|db_match| {
2700                row.iter().all(|(key, val)| {
2701                    if key.is_empty() || key.starts_with("__") {
2702                        return true;
2703                    }
2704                    let Some(db_val) = db_match.get(key) else {
2705                        return true;
2706                    };
2707                    if db_val == val {
2708                        return true;
2709                    }
2710                    // Values differ -- treat as consistent if they represent the same VID
2711                    matches!(
2712                        (Self::vid_from_value(val), Self::vid_from_value(db_val)),
2713                        (Ok(v1), Ok(v2)) if v1 == v2
2714                    )
2715                })
2716            })
2717            .map(|db_match| {
2718                let mut merged = row.clone();
2719                merged.extend(db_match);
2720                merged
2721            })
2722            .collect();
2723
2724        Ok(final_matches)
2725    }
2726
2727    /// Prepare a MERGE pattern for path variable binding.
2728    ///
2729    /// If any path in the pattern has a path variable (e.g., `MERGE p = (a)-[:R]->(b)`),
2730    /// unnamed relationships need internal variable names so that `execute_create_pattern`
2731    /// stores the edge data in the row for later path construction.
2732    ///
2733    /// Returns the (possibly modified) pattern and a list of temp variable names to clean up.
2734    fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
2735        let has_path_vars = pattern
2736            .paths
2737            .iter()
2738            .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
2739
2740        if !has_path_vars {
2741            return (pattern.clone(), Vec::new());
2742        }
2743
2744        let mut modified = pattern.clone();
2745        let mut temp_vars = Vec::new();
2746
2747        for path in &mut modified.paths {
2748            if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
2749                continue;
2750            }
2751            for (idx, element) in path.elements.iter_mut().enumerate() {
2752                if let PatternElement::Relationship(r) = element
2753                    && r.variable.as_ref().is_none_or(String::is_empty)
2754                {
2755                    let temp_var = format!("__path_r_{}", idx);
2756                    r.variable = Some(temp_var.clone());
2757                    temp_vars.push(temp_var);
2758                }
2759            }
2760        }
2761
2762        (modified, temp_vars)
2763    }
2764
2765    /// Bind path variables in the result row based on the MERGE pattern.
2766    ///
2767    /// Walks each path in the pattern, collects node/edge values from the row
2768    /// by variable name, and constructs a `Value::Path`.
2769    fn bind_path_variables(
2770        pattern: &Pattern,
2771        row: &mut HashMap<String, Value>,
2772        temp_vars: &[String],
2773    ) {
2774        for path in &pattern.paths {
2775            let Some(path_var) = path.variable.as_ref() else {
2776                continue;
2777            };
2778            if path_var.is_empty() {
2779                continue;
2780            }
2781
2782            let mut nodes = Vec::new();
2783            let mut edges = Vec::new();
2784
2785            for element in &path.elements {
2786                match element {
2787                    PatternElement::Node(n) => {
2788                        if let Some(var) = &n.variable
2789                            && let Some(val) = row.get(var)
2790                            && let Some(node) = Self::value_to_node_for_path(val)
2791                        {
2792                            nodes.push(node);
2793                        }
2794                    }
2795                    PatternElement::Relationship(r) => {
2796                        if let Some(var) = &r.variable
2797                            && let Some(val) = row.get(var)
2798                            && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
2799                        {
2800                            edges.push(edge);
2801                        }
2802                    }
2803                    _ => {}
2804                }
2805            }
2806
2807            if !nodes.is_empty() {
2808                use uni_common::value::Path;
2809                row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
2810            }
2811        }
2812
2813        // Clean up internal temp variables
2814        for var in temp_vars {
2815            row.remove(var);
2816        }
2817    }
2818
2819    /// Convert a Value (Map or Node) to a Node for path construction.
2820    fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
2821        match val {
2822            Value::Node(n) => Some(n.clone()),
2823            Value::Map(map) => {
2824                let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
2825                let labels = if let Some(Value::List(l)) = map.get("_labels") {
2826                    l.iter()
2827                        .filter_map(|v| {
2828                            if let Value::String(s) = v {
2829                                Some(s.clone())
2830                            } else {
2831                                None
2832                            }
2833                        })
2834                        .collect()
2835                } else {
2836                    vec![]
2837                };
2838                let properties: HashMap<String, Value> = map
2839                    .iter()
2840                    .filter(|(k, _)| !k.starts_with('_'))
2841                    .map(|(k, v)| (k.clone(), v.clone()))
2842                    .collect();
2843                Some(uni_common::value::Node {
2844                    vid,
2845                    labels,
2846                    properties,
2847                })
2848            }
2849            _ => None,
2850        }
2851    }
2852
2853    /// Convert a Value (Map or Edge) to an Edge for path construction.
2854    fn value_to_edge_for_path(
2855        val: &Value,
2856        type_names: &[String],
2857    ) -> Option<uni_common::value::Edge> {
2858        match val {
2859            Value::Edge(e) => Some(e.clone()),
2860            Value::Map(map) => {
2861                let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
2862                let edge_type = map
2863                    .get("_type_name")
2864                    .and_then(|v| {
2865                        if let Value::String(s) = v {
2866                            Some(s.clone())
2867                        } else {
2868                            None
2869                        }
2870                    })
2871                    .or_else(|| type_names.first().cloned())
2872                    .unwrap_or_default();
2873                let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
2874                let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
2875                let properties: HashMap<String, Value> = map
2876                    .iter()
2877                    .filter(|(k, _)| !k.starts_with('_'))
2878                    .map(|(k, v)| (k.clone(), v.clone()))
2879                    .collect();
2880                Some(uni_common::value::Edge {
2881                    eid,
2882                    edge_type,
2883                    src,
2884                    dst,
2885                    properties,
2886                })
2887            }
2888            _ => None,
2889        }
2890    }
2891}