Skip to main content

uni_query/query/executor/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use crate::query::df_graph::mutation_common::Prefetch;
6use crate::query::planner::LogicalPlan;
7use anyhow::{Result, anyhow};
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use uni_common::DataType;
11use uni_common::core::id::{Eid, Vid};
12use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
13use uni_common::{Path, Value};
14use uni_cypher::ast::{
15    AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
16    CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
17    DropEdgeType, DropLabel, Expr, NodePattern, Pattern, PatternElement, RemoveItem, SetClause,
18    SetItem,
19};
20use uni_store::QueryContext;
21use uni_store::runtime::l0_visibility;
22use uni_store::runtime::property_manager::PropertyManager;
23use uni_store::runtime::writer::Writer;
24
/// Canonical, hashable key for a single-node MERGE.
///
/// Holds the MERGE key properties as a `(name, value)` list sorted by name,
/// so two patterns that list the same key properties in a different order
/// produce equal keys. Used to group existing vertices by key for the index
/// fast path (issue #69).
///
/// NOTE(review): the sorting and the `Hash`/`Eq` behaviour of `Value` are
/// provided by the code that builds and consumes this key, not by the alias
/// itself — confirm at the use sites.
type MergeKey = Vec<(String, Value)>;
29
/// Identity fields extracted from a map-encoded edge.
///
/// Produced by `Executor::extract_edge_identity` from the `_eid`, `_src`,
/// `_dst` and `_type` (or `_type_name`) entries of an edge map.
struct EdgeIdentity {
    /// Edge id, read from `_eid`.
    eid: Eid,
    /// Source vertex id, read from `_src`.
    src: Vid,
    /// Destination vertex id, read from `_dst`.
    dst: Vid,
    /// Numeric edge-type id resolved from `_type` (falling back to `_type_name`).
    edge_type_id: u32,
}
37
/// Per-variable accumulator for SetItem::Property items targeting a vertex.
///
/// Built lazily on the first SetItem touching each variable, then mutated
/// in place across subsequent items. Flushed once at end of the SET
/// clause (or earlier if a non-Property SetItem on the same var lands).
///
/// NOTE(review): the code that builds and flushes this accumulator lives
/// outside this section of the file; the lifecycle above is as previously
/// documented — confirm at the use site.
struct PendingVertexSet {
    /// Id of the vertex that the accumulated SET items target.
    vid: Vid,
    /// Labels of the target vertex. Presumably used at flush time to look up
    /// generated columns and declared property types — confirm at the use site.
    labels: Vec<String>,
    /// Full property map (storage union L0 from
    /// `get_all_vertex_props_with_ctx` plus the touched values applied
    /// in-order). Flushed to L0 whole; L0's `vertex_partial_keys` set
    /// tells the flush which columns to send to Lance via MergeInsert.
    props: HashMap<String, Value>,
    /// `true` when the SET should flush via the partial-column MergeInsert
    /// path: set when `UniConfig::partial_lance_writes` is on AND the
    /// label has no generated columns. Generated-column labels still need
    /// the full-row Append so the regenerated values land.
    partial: bool,
    /// Set of property keys touched by this statement. Threaded into L0
    /// so the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
}
56    /// Set of property keys touched by this statement. Threaded into L0
57    /// so the flush emits a `MergeInsertBuilder` source with exactly
58    /// these columns. Empty when `partial == false`.
59    touched: HashSet<String>,
60}
61
62/// Per-variable accumulator for SetItem::Property items targeting an edge.
63struct PendingEdgeSet {
64    src: Vid,
65    dst: Vid,
66    edge_type_id: u32,
67    eid: Eid,
68    edge_type_name: String,
69    /// `true` when the SET should flush via the partial-column
70    /// MergeInsert path on the per-edge-type delta tables (Round 12
71    /// §A). Set when `UniConfig::partial_lance_writes` is on.
72    partial: bool,
73    /// Property keys touched by this statement. Threaded into L0 so
74    /// the flush emits a `MergeInsertBuilder` source with exactly
75    /// these columns. Empty when `partial == false`.
76    touched: HashSet<String>,
77    props: HashMap<String, Value>,
78}
79
80/// Refuse to mutate an ephemeral node (M5g / proposal §4.13.1).
81/// Ephemeral entities are return-only — `Vid::EPHEMERAL_BIT` is set on
82/// any id minted by `host.allocate_transient_id()`.
83fn reject_if_ephemeral_vid(vid: Vid) -> Result<()> {
84    if vid.is_ephemeral() {
85        return Err(anyhow::Error::from(
86            uni_common::UniError::EphemeralWriteAttempt {
87                kind: "node",
88                id: vid.transient_id().unwrap_or(vid.as_u64()),
89            },
90        ));
91    }
92    Ok(())
93}
94
95/// Returns a short variant name for a `Value`, used in type-mismatch error messages.
96fn value_type_name(val: &Value) -> &'static str {
97    match val {
98        Value::Null => "Null",
99        Value::Bool(_) => "Bool",
100        Value::Int(_) => "Int",
101        Value::Float(_) => "Float",
102        Value::String(_) => "String",
103        Value::Bytes(_) => "Bytes",
104        Value::List(_) => "List",
105        Value::Map(_) => "Map",
106        Value::Node(_) => "Node",
107        Value::Edge(_) => "Edge",
108        Value::Path(_) => "Path",
109        Value::Vector(_) => "Vector",
110        Value::Temporal(_) => "Temporal",
111        _ => "value",
112    }
113}
114
115/// Refuse to mutate an ephemeral edge (M5g / proposal §4.13.1).
116fn reject_if_ephemeral_eid(eid: Eid) -> Result<()> {
117    if eid.is_ephemeral() {
118        return Err(anyhow::Error::from(
119            uni_common::UniError::EphemeralWriteAttempt {
120                kind: "edge",
121                id: eid.transient_id().unwrap_or(eid.as_u64()),
122            },
123        ));
124    }
125    Ok(())
126}
127
128/// Reject a write whose target label is currently allocated as a
129/// virtual (catalog-backed) label.
130///
131/// Catalog tables are read-only from the host's perspective — there is
132/// no write-back path through `CatalogTable::scan` to the originating
133/// provider, so silently allowing SET/DELETE would leave ghosted state
134/// on the host side that diverges from the external catalog. The
135/// planner already rejects CREATE/MERGE on virtual labels via
136/// `Planner::reject_virtual_label_writes`; this helper is the
137/// equivalent gate on the runtime write path for SET-label-add and
138/// DELETE.
139///
140/// `op` names the offending operation for the error message (e.g.
141/// `"SET"`, `"DELETE"`).
142///
143/// # Errors
144///
145/// Returns an error if `registry` is `Some` and any name in `labels`
146/// is currently registered as a virtual label. Returns `Ok(())` when
147/// no plugin registry is wired (low-level callers without plugins).
148fn reject_virtual_label_write(
149    registry: Option<&Arc<uni_plugin::PluginRegistry>>,
150    labels: &[String],
151    op: &str,
152) -> Result<()> {
153    let Some(registry) = registry else {
154        return Ok(());
155    };
156    for label in labels {
157        if registry.virtual_label_by_name(label).is_some() {
158            return Err(anyhow!(
159                "Cannot {op} on virtual (catalog-resolved) label `{label}` — virtual \
160                 labels are read-only; write back via the originating catalog instead"
161            ));
162        }
163    }
164    Ok(())
165}
166
167/// Reject a write whose target edge-type ID is currently allocated as
168/// a virtual (catalog-backed) edge type. Runtime analog of
169/// [`reject_virtual_label_write`] for the edge path.
170///
171/// # Errors
172///
173/// Returns an error if `registry` is `Some` and `edge_type_id` resolves
174/// to a registered virtual edge type. Returns `Ok(())` when no plugin
175/// registry is wired.
176fn reject_virtual_edge_type_write(
177    registry: Option<&Arc<uni_plugin::PluginRegistry>>,
178    edge_type_id: u32,
179    op: &str,
180) -> Result<()> {
181    let Some(registry) = registry else {
182        return Ok(());
183    };
184    if let Some(entry) = registry.virtual_edge_type_by_id(edge_type_id) {
185        return Err(anyhow!(
186            "Cannot {op} on virtual (catalog-resolved) edge type `{}` — virtual edge \
187             types are read-only; write back via the originating catalog instead",
188            entry.name
189        ));
190    }
191    Ok(())
192}
193
194impl Executor {
195    /// Extracts labels from a node value.
196    ///
197    /// Handles both `Value::Map` (with a `_labels` list field) and
198    /// `Value::Node` (with a `labels` vec field).
199    ///
200    /// Returns `None` when the value is not a node or has no labels.
201    pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
202        match node_val {
203            Value::Map(map) => {
204                // Map-encoded node: look for _labels array
205                if let Some(Value::List(labels_arr)) = map.get("_labels") {
206                    let labels: Vec<String> = labels_arr
207                        .iter()
208                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
209                        .collect();
210                    if !labels.is_empty() {
211                        return Some(labels);
212                    }
213                }
214                None
215            }
216            Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
217            _ => None,
218        }
219    }
220
221    /// Extracts user-visible properties from a value that represents a node or edge.
222    ///
223    /// Strips internal bookkeeping keys (those prefixed with `_` or named
224    /// `ext_id`) from map-encoded entities and returns only the user-facing
225    /// property key-value pairs.
226    ///
227    /// Returns `None` when `val` is not a map, node, or edge.
228    pub(crate) fn extract_user_properties_from_value(
229        val: &Value,
230    ) -> Option<HashMap<String, Value>> {
231        match val {
232            Value::Map(map) => {
233                // Distinguish entity-encoded maps from plain map literals.
234                // A node map has both `_vid` and `_labels`.
235                // An edge map has `_eid`, `_src`, and `_dst`.
236                let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
237                let is_edge_map = map.contains_key("_eid")
238                    && map.contains_key("_src")
239                    && map.contains_key("_dst");
240
241                if is_node_map || is_edge_map {
242                    // Filter out internal bookkeeping keys
243                    let user_props: HashMap<String, Value> = map
244                        .iter()
245                        .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
246                        .map(|(k, v)| (k.clone(), v.clone()))
247                        .collect();
248                    // When mutation output omits dotted property columns, user
249                    // properties live inside `_all_props` rather than at the
250                    // top level of the entity map.
251                    if user_props.is_empty()
252                        && let Some(Value::Map(all_props)) = map.get("_all_props")
253                    {
254                        return Some(all_props.clone());
255                    }
256                    Some(user_props)
257                } else {
258                    // Plain map literal — return as-is
259                    Some(map.clone())
260                }
261            }
262            Value::Node(node) => Some(node.properties.clone()),
263            Value::Edge(edge) => Some(edge.properties.clone()),
264            _ => None,
265        }
266    }
267
268    /// Applies a property map to a vertex or edge entity bound to `variable` in `row`.
269    ///
270    /// When `replace` is `true` the entity's property set is replaced: keys absent
271    /// from `new_props` are tombstoned (written as `Value::Null`) so the storage
272    /// layer removes them.  When `replace` is `false` the map is merged: keys in
273    /// `new_props` are upserted, while keys absent from `new_props` are unchanged.
274    /// A `Value::Null` entry in `new_props` acts as an explicit tombstone in both
275    /// modes.
276    ///
277    /// Labels are never altered — the spec states that `SET n = map` replaces
278    /// properties only.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the entity cannot be found in the storage layer, or
283    /// if the writer fails to persist the updated properties.
284    #[expect(clippy::too_many_arguments)]
285    async fn apply_properties_to_entity(
286        &self,
287        variable: &str,
288        new_props: HashMap<String, Value>,
289        replace: bool,
290        row: &mut HashMap<String, Value>,
291        writer: &Writer,
292        prop_manager: &PropertyManager,
293        params: &HashMap<String, Value>,
294        ctx: Option<&QueryContext>,
295        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
296        prefetched: &Prefetch,
297    ) -> Result<()> {
298        // Clone the target so we can hold &row references elsewhere.
299        let target = row.get(variable).cloned();
300
301        // Declared-type guard for the whole-entity `SET n = map` / `SET n += map`
302        // forms, mirroring the per-property SET path (issue #68).
303        let schema = self.storage.schema_manager().schema();
304
305        match target {
306            Some(Value::Node(ref node)) => {
307                let vid = node.vid;
308                let labels = node.labels.clone();
309                let current =
310                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
311                let write_props = Self::merge_props(current, new_props, replace);
312                let mut enriched = write_props.clone();
313                for label_name in &labels {
314                    self.enrich_properties_with_generated_columns(
315                        label_name,
316                        &mut enriched,
317                        prop_manager,
318                        params,
319                        ctx,
320                    )
321                    .await?;
322                }
323                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
324                let _ = writer
325                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
326                    .await?;
327                // Update the in-memory row binding
328                if let Some(Value::Node(n)) = row.get_mut(variable) {
329                    n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
330                }
331            }
332            Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
333                let vid = Self::vid_from_value(node_val)?;
334                let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
335                let current =
336                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
337                let write_props = Self::merge_props(current, new_props, replace);
338                let mut enriched = write_props.clone();
339                for label_name in &labels {
340                    self.enrich_properties_with_generated_columns(
341                        label_name,
342                        &mut enriched,
343                        prop_manager,
344                        params,
345                        ctx,
346                    )
347                    .await?;
348                }
349                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
350                let _ = writer
351                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
352                    .await?;
353                // Update the in-memory map-encoded node binding
354                if let Some(Value::Map(node_map)) = row.get_mut(variable) {
355                    // Remove old user property keys, keep internal fields
356                    node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
357                    // Build effective (non-null) properties
358                    let effective: HashMap<String, Value> =
359                        enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
360                    for (k, v) in &effective {
361                        node_map.insert(k.clone(), v.clone());
362                    }
363                    // Replace _all_props to reflect the complete property set
364                    node_map.insert("_all_props".to_string(), Value::Map(effective));
365                }
366            }
367            Some(Value::Edge(ref edge)) => {
368                let eid = edge.eid;
369                let src = edge.src;
370                let dst = edge.dst;
371                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
372                let current =
373                    read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
374                let write_props = Self::merge_props(current, new_props, replace);
375                let write_props = Self::coerce_and_validate_props(
376                    write_props,
377                    &schema,
378                    std::slice::from_ref(&edge.edge_type),
379                )?;
380                writer
381                    .insert_edge(
382                        src,
383                        dst,
384                        etype,
385                        eid,
386                        write_props.clone(),
387                        Some(edge.edge_type.clone()),
388                        tx_l0,
389                    )
390                    .await?;
391                // Update the in-memory row binding
392                if let Some(Value::Edge(e)) = row.get_mut(variable) {
393                    e.properties = write_props
394                        .into_iter()
395                        .filter(|(_, v)| !v.is_null())
396                        .collect();
397                }
398            }
399            Some(Value::Map(ref map))
400                if map.contains_key("_eid")
401                    && map.contains_key("_src")
402                    && map.contains_key("_dst") =>
403            {
404                let ei = self.extract_edge_identity(map)?;
405                let current =
406                    read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx).await?;
407                let write_props = Self::merge_props(current, new_props, replace);
408                let edge_type_name = map
409                    .get("_type")
410                    .and_then(|v| v.as_str())
411                    .map(|s| s.to_string())
412                    .or_else(|| {
413                        self.storage
414                            .schema_manager()
415                            .edge_type_name_by_id_unified(ei.edge_type_id)
416                    });
417                let write_props = match &edge_type_name {
418                    Some(name) => Self::coerce_and_validate_props(
419                        write_props,
420                        &schema,
421                        std::slice::from_ref(name),
422                    )?,
423                    None => write_props,
424                };
425                writer
426                    .insert_edge(
427                        ei.src,
428                        ei.dst,
429                        ei.edge_type_id,
430                        ei.eid,
431                        write_props.clone(),
432                        edge_type_name,
433                        tx_l0,
434                    )
435                    .await?;
436                // Update the in-memory map-encoded edge binding
437                if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
438                    edge_map.retain(|k, _| k.starts_with('_'));
439                    let effective: HashMap<String, Value> = write_props
440                        .into_iter()
441                        .filter(|(_, v)| !v.is_null())
442                        .collect();
443                    for (k, v) in &effective {
444                        edge_map.insert(k.clone(), v.clone());
445                    }
446                    // Replace _all_props to reflect the complete property set
447                    edge_map.insert("_all_props".to_string(), Value::Map(effective));
448                }
449            }
450            _ => {
451                // No matching entity — nothing to do (caller already guarded against Null)
452            }
453        }
454        Ok(())
455    }
456
457    /// Computes the property map to write given current storage state and the
458    /// incoming change map.
459    ///
460    /// When `replace` is `true`, keys present in `current` but absent from
461    /// `incoming` are tombstoned with `Value::Null`.  Null values inside
462    /// `incoming` are always preserved as explicit tombstones.
463    ///
464    /// When `replace` is `false`, `current` is the base and `incoming` is
465    /// merged on top: each key in `incoming` overwrites or tombstones the
466    /// corresponding entry in `current`.
467    fn merge_props(
468        current: HashMap<String, Value>,
469        incoming: HashMap<String, Value>,
470        replace: bool,
471    ) -> HashMap<String, Value> {
472        if replace {
473            // Start from the non-null incoming entries only.
474            let mut result: HashMap<String, Value> = incoming
475                .iter()
476                .filter(|(_, v)| !v.is_null())
477                .map(|(k, v)| (k.clone(), v.clone()))
478                .collect();
479            // Tombstone every current key that is absent from incoming OR explicitly
480            // set to null in incoming (both mean "delete this property").
481            for k in current.keys() {
482                if incoming.get(k).is_none_or(|v| v.is_null()) {
483                    result.insert(k.clone(), Value::Null);
484                }
485            }
486            result
487        } else {
488            // Merge: start from current and apply incoming on top
489            let mut result = current;
490            result.extend(incoming);
491            result
492        }
493    }
494
495    /// Extract edge identity fields (`_eid`, `_src`, `_dst`, `_type`) from a map.
496    fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
497        let eid = Eid::from(
498            map.get("_eid")
499                .and_then(|v| v.as_u64())
500                .ok_or_else(|| anyhow!("Invalid _eid"))?,
501        );
502        let src = Vid::from(
503            map.get("_src")
504                .and_then(|v| v.as_u64())
505                .ok_or_else(|| anyhow!("Invalid _src"))?,
506        );
507        let dst = Vid::from(
508            map.get("_dst")
509                .and_then(|v| v.as_u64())
510                .ok_or_else(|| anyhow!("Invalid _dst"))?,
511        );
512        let edge_type_id = self.resolve_edge_type_id(
513            map.get("_type")
514                .or_else(|| map.get("_type_name"))
515                .ok_or_else(|| anyhow!("Missing _type/_type_name on edge map"))?,
516        )?;
517        Ok(EdgeIdentity {
518            eid,
519            src,
520            dst,
521            edge_type_id,
522        })
523    }
524
525    /// Resolve edge type ID from a Value, supporting both Int and String representations.
526    /// DataFusion traverse stores _type as String("KNOWS"), while write operations need u32 ID.
527    ///
528    /// For String values, uses get_or_assign_edge_type_id to support schemaless edge types
529    /// (assigns new ID if not found). This is critical for MERGE ... ON CREATE SET scenarios
530    /// where the edge type was just created and may not be in the read-only lookup yet.
531    fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
532        match type_val {
533            Value::Int(i) => Ok(*i as u32),
534            Value::String(name) => {
535                if self.config.strict_schema {
536                    let schema = self.storage.schema_manager().schema();
537                    schema
538                        .edge_type_id_by_name_case_insensitive(name)
539                        .ok_or_else(|| {
540                            anyhow!(
541                                "Edge type '{}' is not defined in the schema \
542                                 (strict_schema is enabled). \
543                                 Declare it with db.schema().edge_type(...).apply() first.",
544                                name
545                            )
546                        })
547                } else {
548                    // Schemaless: assign new ID if not found in schema or registry.
549                    Ok(self
550                        .storage
551                        .schema_manager()
552                        .get_or_assign_edge_type_id(name))
553                }
554            }
555            _ => Err(anyhow!(
556                "Invalid _type value: expected Int or String, got {:?}",
557                type_val
558            )),
559        }
560    }
561
562    pub(crate) async fn execute_vacuum(&self) -> Result<()> {
563        if let Some(writer_arc) = &self.writer {
564            // Flush first while holding the lock
565            {
566                let writer: &uni_store::Writer = writer_arc.as_ref();
567                writer.flush_to_l1(None).await?;
568            } // Drop lock before compacting to avoid blocking reads/writes
569
570            // Compaction can run without holding the writer lock
571            let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
572            let compaction_results = compactor.compact_all().await?;
573
574            // Re-warm adjacency manager for compacted edge types to sync in-memory CSR with new L2 storage
575            let am = self.storage.adjacency_manager();
576            let schema = self.storage.schema_manager().schema();
577            for info in compaction_results {
578                // Convert string direction to Direction enum
579                let direction = match info.direction.as_str() {
580                    "fwd" => uni_store::storage::direction::Direction::Outgoing,
581                    "bwd" => uni_store::storage::direction::Direction::Incoming,
582                    _ => continue,
583                };
584
585                // Get edge_type_id
586                if let Some(edge_type_id) =
587                    schema.edge_type_id_unified_case_insensitive(&info.edge_type)
588                {
589                    // Re-warm from storage (clears old CSR, loads new L2 + L1 delta)
590                    let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
591                }
592            }
593        }
594        Ok(())
595    }
596
597    pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
598        if let Some(writer_arc) = &self.writer {
599            let writer: &uni_store::Writer = writer_arc.as_ref();
600            writer.flush_to_l1(Some("checkpoint".to_string())).await?;
601        }
602        Ok(())
603    }
604
605    pub(crate) async fn execute_copy_to(
606        &self,
607        identifier: &str,
608        path: &str,
609        format: &str,
610        options: &HashMap<String, Value>,
611    ) -> Result<usize> {
612        // Check schema to determine if identifier is an edge type or vertex label
613        let schema = self.storage.schema_manager().schema();
614
615        // Try as edge type first
616        if schema.get_edge_type_case_insensitive(identifier).is_some() {
617            return self
618                .export_edge_type_in_format(identifier, path, format)
619                .await;
620        }
621
622        // Try as vertex label
623        if schema.get_label_case_insensitive(identifier).is_some() {
624            return self
625                .export_vertex_label_in_format(identifier, path, format, options)
626                .await;
627        }
628
629        // Neither edge type nor vertex label found
630        Err(anyhow!("Unknown label or edge type: '{}'", identifier))
631    }
632
633    async fn export_vertex_label_in_format(
634        &self,
635        label: &str,
636        path: &str,
637        format: &str,
638        _options: &HashMap<String, Value>,
639    ) -> Result<usize> {
640        match format {
641            "parquet" => self.export_vertex_label(label, path).await,
642            "csv" => {
643                let mut stream = self
644                    .storage
645                    .scan_vertex_table_stream(label)
646                    .await?
647                    .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
648
649                // Collect all batches
650                let mut all_rows = Vec::new();
651                let mut column_names = Vec::new();
652
653                // Iterate stream using StreamExt
654                use futures::StreamExt;
655                while let Some(batch_result) = stream.next().await {
656                    let batch = batch_result?;
657
658                    // Get column names from first batch
659                    if column_names.is_empty() {
660                        column_names = batch
661                            .schema()
662                            .fields()
663                            .iter()
664                            .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
665                            .map(|f| f.name().clone())
666                            .collect();
667                    }
668
669                    // Convert batch to rows
670                    for row_idx in 0..batch.num_rows() {
671                        let mut row = Vec::new();
672                        for field in batch.schema().fields() {
673                            if field.name().starts_with('_') || field.name() == "ext_id" {
674                                continue;
675                            }
676
677                            let col_idx = batch.schema().index_of(field.name())?;
678                            let column = batch.column(col_idx);
679                            let value = self.arrow_value_to_json(column, row_idx)?;
680
681                            // Convert value to CSV string
682                            let csv_value = match value {
683                                Value::Null => String::new(),
684                                Value::Bool(b) => b.to_string(),
685                                Value::Int(i) => i.to_string(),
686                                Value::Float(f) => f.to_string(),
687                                Value::String(s) => s,
688                                _ => format!("{value}"),
689                            };
690                            row.push(csv_value);
691                        }
692                        all_rows.push(row);
693                    }
694                }
695
696                // Write CSV
697                let file = std::fs::File::create(path)?;
698                let mut wtr = csv::Writer::from_writer(file);
699
700                // Write headers
701                log::debug!("CSV export headers: {:?}", column_names);
702                wtr.write_record(&column_names)?;
703
704                // Write rows
705                for (i, row) in all_rows.iter().enumerate() {
706                    log::debug!("CSV export row {}: {:?}", i, row);
707                    wtr.write_record(row)?;
708                }
709
710                wtr.flush()?;
711                Ok(all_rows.len())
712            }
713            _ => Err(anyhow!(
714                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
715                format
716            )),
717        }
718    }
719
720    async fn export_edge_type_in_format(
721        &self,
722        edge_type: &str,
723        path: &str,
724        format: &str,
725    ) -> Result<usize> {
726        match format {
727            "parquet" => self.export_edge_type(edge_type, path).await,
728            "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
729            _ => Err(anyhow!(
730                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
731                format
732            )),
733        }
734    }
735
736    /// Write a stream of record batches to a Parquet file.
737    /// Returns the total number of rows written, or 0 if the stream is empty.
738    async fn write_batches_to_parquet(
739        mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
740        path: &str,
741        entity_description: &str,
742    ) -> Result<usize> {
743        use futures::TryStreamExt;
744
745        // Get first batch to determine schema and create writer
746        let first_batch = match stream.try_next().await? {
747            Some(batch) => batch,
748            None => {
749                log::info!("No data to export from {}", entity_description);
750                return Ok(0);
751            }
752        };
753
754        // Create Parquet writer using schema from first batch
755        let file = std::fs::File::create(path)?;
756        let arrow_schema = first_batch.schema();
757        let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
758
759        // Write first batch
760        let mut count = first_batch.num_rows();
761        writer.write(&first_batch)?;
762
763        // Write remaining batches
764        while let Some(batch) = stream.try_next().await? {
765            count += batch.num_rows();
766            writer.write(&batch)?;
767        }
768
769        writer.close()?;
770
771        log::info!(
772            "Exported {} rows from {} to '{}'",
773            count,
774            entity_description,
775            path
776        );
777        Ok(count)
778    }
779
780    /// Export vertices of a specific label to Parquet
781    async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
782        let stream = self
783            .storage
784            .scan_vertex_table_stream(label)
785            .await?
786            .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
787
788        Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
789    }
790
791    /// Export edges of a specific type to Parquet
792    async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
793        let schema = self.storage.schema_manager().schema();
794        if !schema.edge_types.contains_key(edge_type) {
795            return Err(anyhow!("Edge type '{}' not found", edge_type));
796        }
797
798        let filter = format!("type = '{}'", edge_type);
799        let stream = self
800            .storage
801            .scan_main_edge_table_stream(Some(&filter))
802            .await?
803            .ok_or_else(|| anyhow!("No edge data found"))?;
804
805        Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
806    }
807
808    pub(crate) async fn execute_copy_from(
809        &self,
810        label: &str,
811        path: &str,
812        format: &str,
813        options: &HashMap<String, Value>,
814    ) -> Result<usize> {
815        // Read data from file
816        let batches = match format {
817            "parquet" => self.read_parquet_file(path)?,
818            "csv" => self.read_csv_file(path, label, options)?,
819            _ => {
820                return Err(anyhow!(
821                    "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
822                    format
823                ));
824            }
825        };
826
827        // Get writer
828        let writer_arc = self
829            .writer
830            .as_ref()
831            .ok_or_else(|| anyhow!("No writer available"))?;
832
833        let db_schema = self.storage.schema_manager().schema();
834
835        // Check if this is a label (vertex) or edge type
836        let is_edge = db_schema.edge_type_id_by_name(label).is_some();
837
838        if is_edge {
839            // Import edges
840            let edge_type_id = db_schema
841                .edge_type_id_by_name(label)
842                .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
843
844            // Get src and dst column names from options
845            let src_col = options
846                .get("src_col")
847                .and_then(|v| v.as_str())
848                .unwrap_or("src");
849            let dst_col = options
850                .get("dst_col")
851                .and_then(|v| v.as_str())
852                .unwrap_or("dst");
853
854            // §5.7 of concurrent_writer.md: writer is hoisted above the row
855            // loop now that there is no per-row lock acquisition cost.
856            let writer: &uni_store::Writer = writer_arc.as_ref();
857            let mut total_rows = 0;
858            for batch in batches {
859                let num_rows = batch.num_rows();
860                // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
861                let eids = writer.allocate_eids(num_rows).await?;
862
863                for (row_idx, &eid) in eids.iter().enumerate().take(num_rows) {
864                    let mut properties = HashMap::new();
865                    let mut src_vid: Option<Vid> = None;
866                    let mut dst_vid: Option<Vid> = None;
867
868                    // Extract properties and VIDs from each column
869                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
870                        let col_name = field.name();
871                        let column = batch.column(col_idx);
872                        let value = self.arrow_value_to_json(column, row_idx)?;
873
874                        if col_name == src_col {
875                            let raw = value.as_u64().unwrap_or_else(|| {
876                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
877                            });
878                            src_vid = Some(Vid::new(raw));
879                        } else if col_name == dst_col {
880                            let raw = value.as_u64().unwrap_or_else(|| {
881                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
882                            });
883                            dst_vid = Some(Vid::new(raw));
884                        } else if !col_name.starts_with('_') && !value.is_null() {
885                            properties.insert(col_name.clone(), value);
886                        }
887                    }
888
889                    let src = src_vid
890                        .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
891                    let dst = dst_vid
892                        .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
893
894                    writer
895                        .insert_edge(
896                            src,
897                            dst,
898                            edge_type_id,
899                            eid,
900                            properties,
901                            Some(label.to_string()),
902                            None,
903                        )
904                        .await?;
905
906                    total_rows += 1;
907                }
908            }
909
910            log::info!(
911                "Imported {} edge rows from '{}' into edge type '{}'",
912                total_rows,
913                path,
914                label
915            );
916
917            // Flush to persist edges
918            if total_rows > 0 {
919                writer.flush_to_l1(None).await?;
920            }
921
922            Ok(total_rows)
923        } else {
924            // Import vertices
925            // Validate the label exists in schema
926            db_schema
927                .label_id_by_name_case_insensitive(label)
928                .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
929
930            // §5.7 of concurrent_writer.md: writer is hoisted above the row
931            // loop now that there is no per-row lock acquisition cost.
932            let writer: &uni_store::Writer = writer_arc.as_ref();
933            let mut total_rows = 0;
934            for batch in batches {
935                let num_rows = batch.num_rows();
936                // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
937                let vids = writer.allocate_vids(num_rows).await?;
938
939                // Convert Arrow batch to rows
940                for (row_idx, &vid) in vids.iter().enumerate().take(num_rows) {
941                    let mut properties = HashMap::new();
942
943                    // Extract properties from each column
944                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
945                        let col_name = field.name();
946
947                        // Skip internal columns
948                        if col_name.starts_with('_') {
949                            continue;
950                        }
951
952                        let column = batch.column(col_idx);
953                        let value = self.arrow_value_to_json(column, row_idx)?;
954
955                        if !value.is_null() {
956                            properties.insert(col_name.clone(), value);
957                        }
958                    }
959
960                    let _ = writer
961                        .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
962                        .await?;
963
964                    total_rows += 1;
965                }
966            }
967
968            log::info!(
969                "Imported {} rows from '{}' into label '{}'",
970                total_rows,
971                path,
972                label
973            );
974
975            // Flush to persist vertices
976            if total_rows > 0 {
977                writer.flush_to_l1(None).await?;
978            }
979
980            Ok(total_rows)
981        }
982    }
983
984    fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
985        use arrow_array::Array;
986        use arrow_schema::DataType as ArrowDataType;
987
988        if column.is_null(row_idx) {
989            return Ok(Value::Null);
990        }
991
992        match column.data_type() {
993            ArrowDataType::Utf8 => {
994                let array = column
995                    .as_any()
996                    .downcast_ref::<arrow_array::StringArray>()
997                    .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
998                Ok(Value::String(array.value(row_idx).to_string()))
999            }
1000            ArrowDataType::Int32 => {
1001                let array = column
1002                    .as_any()
1003                    .downcast_ref::<arrow_array::Int32Array>()
1004                    .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
1005                Ok(Value::Int(array.value(row_idx) as i64))
1006            }
1007            ArrowDataType::Int64 => {
1008                let array = column
1009                    .as_any()
1010                    .downcast_ref::<arrow_array::Int64Array>()
1011                    .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
1012                Ok(Value::Int(array.value(row_idx)))
1013            }
1014            ArrowDataType::Float32 => {
1015                let array = column
1016                    .as_any()
1017                    .downcast_ref::<arrow_array::Float32Array>()
1018                    .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
1019                Ok(Value::Float(array.value(row_idx) as f64))
1020            }
1021            ArrowDataType::Float64 => {
1022                let array = column
1023                    .as_any()
1024                    .downcast_ref::<arrow_array::Float64Array>()
1025                    .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
1026                Ok(Value::Float(array.value(row_idx)))
1027            }
1028            ArrowDataType::Boolean => {
1029                let array = column
1030                    .as_any()
1031                    .downcast_ref::<arrow_array::BooleanArray>()
1032                    .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
1033                Ok(Value::Bool(array.value(row_idx)))
1034            }
1035            ArrowDataType::UInt64 => {
1036                let array = column
1037                    .as_any()
1038                    .downcast_ref::<arrow_array::UInt64Array>()
1039                    .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
1040                Ok(Value::Int(array.value(row_idx) as i64))
1041            }
1042            _ => {
1043                // For other types, try to convert to string
1044                let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
1045                if let Some(arr) = array {
1046                    Ok(Value::String(arr.value(row_idx).to_string()))
1047                } else {
1048                    Ok(Value::Null)
1049                }
1050            }
1051        }
1052    }
1053
1054    fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
1055        let file = std::fs::File::open(path)?;
1056        let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
1057            .build()?;
1058        reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1059    }
1060
1061    fn read_csv_file(
1062        &self,
1063        path: &str,
1064        label: &str,
1065        options: &HashMap<String, Value>,
1066    ) -> Result<Vec<arrow_array::RecordBatch>> {
1067        use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1068        use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1069        use std::sync::Arc;
1070
1071        // Parse CSV options
1072        let has_headers = options
1073            .get("headers")
1074            .and_then(|v| v.as_bool())
1075            .unwrap_or(true);
1076
1077        // Read CSV file
1078        let file = std::fs::File::open(path)?;
1079        let mut rdr = csv::ReaderBuilder::new()
1080            .has_headers(has_headers)
1081            .from_reader(file);
1082
1083        // Get schema for type conversion
1084        let db_schema = self.storage.schema_manager().schema();
1085        let properties = db_schema.properties.get(label);
1086
1087        // Collect all rows first to determine schema
1088        let mut rows: Vec<Vec<String>> = Vec::new();
1089        let headers: Vec<String> = if has_headers {
1090            rdr.headers()?.iter().map(|s| s.to_string()).collect()
1091        } else {
1092            Vec::new()
1093        };
1094
1095        for result in rdr.records() {
1096            let record = result?;
1097            rows.push(record.iter().map(|s| s.to_string()).collect());
1098        }
1099
1100        if rows.is_empty() {
1101            return Ok(Vec::new());
1102        }
1103
1104        // Build Arrow schema with proper types based on DB schema
1105        let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
1106        let col_names: Vec<String> = if has_headers {
1107            headers
1108        } else {
1109            (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
1110        };
1111
1112        for name in &col_names {
1113            let arrow_type = if let Some(props) = properties {
1114                if let Some(prop_meta) = props.get(name) {
1115                    match prop_meta.r#type {
1116                        DataType::Int32 => ArrowDataType::Int32,
1117                        DataType::Int64 => ArrowDataType::Int64,
1118                        DataType::Float32 => ArrowDataType::Float32,
1119                        DataType::Float64 => ArrowDataType::Float64,
1120                        DataType::Bool => ArrowDataType::Boolean,
1121                        _ => ArrowDataType::Utf8,
1122                    }
1123                } else {
1124                    ArrowDataType::Utf8
1125                }
1126            } else {
1127                ArrowDataType::Utf8
1128            };
1129            arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
1130        }
1131
1132        let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
1133
1134        // Convert rows to Arrow arrays with proper types
1135        let mut columns: Vec<ArrayRef> = Vec::new();
1136        for (col_idx, field) in arrow_fields.iter().enumerate() {
1137            match field.data_type() {
1138                ArrowDataType::Int32 => {
1139                    let values: Vec<Option<i32>> = rows
1140                        .iter()
1141                        .map(|row| {
1142                            if col_idx < row.len() {
1143                                row[col_idx].parse().ok()
1144                            } else {
1145                                None
1146                            }
1147                        })
1148                        .collect();
1149                    columns.push(Arc::new(Int32Array::from(values)));
1150                }
1151                _ => {
1152                    // Default to string
1153                    let values: Vec<Option<String>> = rows
1154                        .iter()
1155                        .map(|row| {
1156                            if col_idx < row.len() {
1157                                Some(row[col_idx].clone())
1158                            } else {
1159                                None
1160                            }
1161                        })
1162                        .collect();
1163                    columns.push(Arc::new(StringArray::from(values)));
1164                }
1165            }
1166        }
1167
1168        let batch = RecordBatch::try_new(arrow_schema, columns)?;
1169        Ok(vec![batch])
1170    }
1171
1172    fn parse_data_type(type_str: &str) -> Result<DataType> {
1173        use uni_common::core::schema::{CrdtType, PointType};
1174        let type_str = type_str.to_lowercase();
1175        let type_str = type_str.trim();
1176        match type_str {
1177            "string" | "text" | "varchar" => Ok(DataType::String),
1178            "int" | "integer" | "int32" => Ok(DataType::Int32),
1179            "long" | "int64" | "bigint" => Ok(DataType::Int64),
1180            "float" | "float32" | "real" => Ok(DataType::Float32),
1181            "double" | "float64" => Ok(DataType::Float64),
1182            "bool" | "boolean" => Ok(DataType::Bool),
1183            "timestamp" => Ok(DataType::Timestamp),
1184            "date" => Ok(DataType::Date),
1185            "time" => Ok(DataType::Time),
1186            "datetime" => Ok(DataType::DateTime),
1187            "duration" => Ok(DataType::Duration),
1188            "btic" => Ok(DataType::Btic),
1189            "json" | "jsonb" => Ok(DataType::CypherValue),
1190            "bytes" | "blob" | "binary" => Ok(DataType::Bytes),
1191            "point" => Ok(DataType::Point(PointType::Cartesian2D)),
1192            "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
1193            "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1194            s if s.starts_with("vector(") && s.ends_with(')') => {
1195                let dims_str = &s[7..s.len() - 1];
1196                let dimensions = dims_str
1197                    .parse::<usize>()
1198                    .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1199                Ok(DataType::Vector { dimensions })
1200            }
1201            s if s.starts_with("list<") && s.ends_with('>') => {
1202                let inner_type_str = &s[5..s.len() - 1];
1203                let inner_type = Self::parse_data_type(inner_type_str)?;
1204                Ok(DataType::List(Box::new(inner_type)))
1205            }
1206            s if s.starts_with("map<") && s.ends_with('>') => {
1207                let (k_str, v_str) = Self::split_map_kv(&s[4..s.len() - 1])?;
1208                let key_type = Self::parse_data_type(&k_str)?;
1209                if !matches!(key_type, DataType::String) {
1210                    return Err(anyhow!("MAP key type must be STRING, got: {k_str}"));
1211                }
1212                let value_type = Self::parse_data_type(&v_str)?;
1213                Ok(DataType::Map(Box::new(key_type), Box::new(value_type)))
1214            }
1215            "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1216            "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1217            _ => Err(anyhow!("Unknown data type: {}", type_str)),
1218        }
1219    }
1220
1221    /// Split a `MAP<K, V>` inner string on the top-level comma, respecting `<>`/`()` depth
1222    /// so nested value types (`STRING, LIST<INT>`, `STRING, MAP<STRING,INT>`) split at the
1223    /// right comma. Returns trimmed `(key, value)` type strings.
1224    fn split_map_kv(inner: &str) -> Result<(String, String)> {
1225        let mut depth = 0i32;
1226        for (i, c) in inner.char_indices() {
1227            match c {
1228                '<' | '(' => depth += 1,
1229                '>' | ')' => depth -= 1,
1230                ',' if depth == 0 => {
1231                    let k = inner[..i].trim();
1232                    let v = inner[i + 1..].trim();
1233                    if k.is_empty() || v.is_empty() {
1234                        return Err(anyhow!("MAP<K,V> requires both a key and a value type"));
1235                    }
1236                    return Ok((k.to_string(), v.to_string()));
1237                }
1238                _ => {}
1239            }
1240        }
1241        Err(anyhow!(
1242            "MAP<K,V> requires a comma separating key and value types"
1243        ))
1244    }
1245
1246    pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1247        let sm = self.storage.schema_manager_arc();
1248        if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1249            return Ok(());
1250        }
1251        sm.add_label_with_desc(&clause.name, clause.description)?;
1252        for prop in clause.properties {
1253            let dt = Self::parse_data_type(&prop.data_type)?;
1254            sm.add_property_with_desc(
1255                &clause.name,
1256                &prop.name,
1257                dt,
1258                prop.nullable,
1259                prop.description,
1260            )?;
1261            if prop.unique {
1262                let constraint = Constraint {
1263                    name: format!("{}_{}_unique", clause.name, prop.name),
1264                    constraint_type: ConstraintType::Unique {
1265                        properties: vec![prop.name],
1266                    },
1267                    target: ConstraintTarget::Label(clause.name.clone()),
1268                    enabled: true,
1269                };
1270                sm.add_constraint(constraint)?;
1271            }
1272        }
1273        sm.save().await?;
1274        Ok(())
1275    }
1276
1277    /// True if `key` is a generated property on any of the given labels.
1278    /// Used by the partial-write flush path (Round 12 §C) to decide
1279    /// whether the property should be added to `touched_keys` so that
1280    /// Lance MergeInsert sends the recomputed value.
1281    fn is_generated_key(&self, labels: &[String], key: &str) -> bool {
1282        let schema = self.storage.schema_manager().schema();
1283        for label in labels {
1284            if let Some(props_meta) = schema.properties.get(label)
1285                && let Some(meta) = props_meta.get(key)
1286                && meta.generation_expression.is_some()
1287            {
1288                return true;
1289            }
1290        }
1291        false
1292    }
1293
1294    pub(crate) async fn enrich_properties_with_generated_columns(
1295        &self,
1296        label_name: &str,
1297        properties: &mut HashMap<String, Value>,
1298        prop_manager: &PropertyManager,
1299        params: &HashMap<String, Value>,
1300        ctx: Option<&QueryContext>,
1301    ) -> Result<()> {
1302        let schema = self.storage.schema_manager().schema();
1303
1304        if let Some(props_meta) = schema.properties.get(label_name) {
1305            let mut generators = Vec::new();
1306            for (prop_name, meta) in props_meta {
1307                if let Some(expr_str) = &meta.generation_expression {
1308                    generators.push((prop_name.clone(), expr_str.clone()));
1309                }
1310            }
1311
1312            for (prop_name, expr_str) in generators {
1313                let cache_key = (label_name.to_string(), prop_name.clone());
1314                let expr = {
1315                    let cache = self.gen_expr_cache.read().await;
1316                    cache.get(&cache_key).cloned()
1317                };
1318
1319                let expr = match expr {
1320                    Some(e) => e,
1321                    None => {
1322                        let parsed = uni_cypher::parse_expression(&expr_str)
1323                            .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1324                        let mut cache = self.gen_expr_cache.write().await;
1325                        cache.insert(cache_key, parsed.clone());
1326                        parsed
1327                    }
1328                };
1329
1330                let mut scope = HashMap::new();
1331
1332                // If expression has an explicit variable, use it as an object
1333                if let Some(var) = expr.extract_variable() {
1334                    scope.insert(var, Value::Map(properties.clone()));
1335                } else {
1336                    // No explicit variable - add properties directly to scope for bare references
1337                    // e.g., "lower(email)" can reference "email" directly
1338                    for (k, v) in properties.iter() {
1339                        scope.insert(k.clone(), v.clone());
1340                    }
1341                }
1342
1343                let val = self
1344                    .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1345                    .await?;
1346                properties.insert(prop_name, val);
1347            }
1348        }
1349        Ok(())
1350    }
1351
1352    pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1353        let sm = self.storage.schema_manager_arc();
1354        if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1355            return Ok(());
1356        }
1357        sm.add_edge_type_with_desc(
1358            &clause.name,
1359            clause.src_labels,
1360            clause.dst_labels,
1361            clause.description,
1362        )?;
1363        for prop in clause.properties {
1364            let dt = Self::parse_data_type(&prop.data_type)?;
1365            sm.add_property_with_desc(
1366                &clause.name,
1367                &prop.name,
1368                dt,
1369                prop.nullable,
1370                prop.description,
1371            )?;
1372        }
1373        sm.save().await?;
1374        Ok(())
1375    }
1376
1377    /// Executes an ALTER action on a schema entity.
1378    ///
1379    /// This is a shared helper for both `execute_alter_label` and
1380    /// `execute_alter_edge_type` since they have identical logic.
1381    pub(crate) async fn execute_alter_entity(
1382        sm: &Arc<SchemaManager>,
1383        entity_name: &str,
1384        action: AlterAction,
1385    ) -> Result<()> {
1386        match action {
1387            AlterAction::AddProperty(prop) => {
1388                let dt = Self::parse_data_type(&prop.data_type)?;
1389                sm.add_property_with_desc(
1390                    entity_name,
1391                    &prop.name,
1392                    dt,
1393                    prop.nullable,
1394                    prop.description,
1395                )?;
1396            }
1397            AlterAction::DropProperty(prop_name) => {
1398                sm.drop_property(entity_name, &prop_name)?;
1399            }
1400            AlterAction::RenameProperty { old_name, new_name } => {
1401                sm.rename_property(entity_name, &old_name, &new_name)?;
1402            }
1403            AlterAction::SetDescription(desc) => {
1404                if sm.schema().labels.contains_key(entity_name) {
1405                    sm.set_label_description(entity_name, desc)?;
1406                } else {
1407                    sm.set_edge_type_description(entity_name, desc)?;
1408                }
1409            }
1410            AlterAction::SetPropertyDescription {
1411                property,
1412                description,
1413            } => {
1414                sm.set_property_description(entity_name, &property, description)?;
1415            }
1416        }
1417        sm.save().await?;
1418        Ok(())
1419    }
1420
1421    pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1422        Self::execute_alter_entity(
1423            &self.storage.schema_manager_arc(),
1424            &clause.name,
1425            clause.action,
1426        )
1427        .await
1428    }
1429
1430    pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1431        Self::execute_alter_entity(
1432            &self.storage.schema_manager_arc(),
1433            &clause.name,
1434            clause.action,
1435        )
1436        .await
1437    }
1438
1439    pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1440        let sm = self.storage.schema_manager_arc();
1441        sm.drop_label(&clause.name, clause.if_exists)?;
1442        sm.save().await?;
1443        Ok(())
1444    }
1445
1446    pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1447        let sm = self.storage.schema_manager_arc();
1448        sm.drop_edge_type(&clause.name, clause.if_exists)?;
1449        sm.save().await?;
1450        Ok(())
1451    }
1452
1453    pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1454        let sm = self.storage.schema_manager_arc();
1455        let target = ConstraintTarget::Label(clause.label);
1456        let c_type = match clause.constraint_type {
1457            AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1458                properties: clause.properties,
1459            },
1460            AstConstraintType::Exists => {
1461                let property = clause
1462                    .properties
1463                    .into_iter()
1464                    .next()
1465                    .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1466                ConstraintType::Exists { property }
1467            }
1468            AstConstraintType::Check => {
1469                let expression = clause
1470                    .expression
1471                    .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1472                ConstraintType::Check {
1473                    expression: expression.to_string_repr(),
1474                }
1475            }
1476        };
1477
1478        let constraint = Constraint {
1479            name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1480            constraint_type: c_type,
1481            target,
1482            enabled: true,
1483        };
1484
1485        sm.add_constraint(constraint)?;
1486        sm.save().await?;
1487        Ok(())
1488    }
1489
1490    pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1491        let sm = self.storage.schema_manager_arc();
1492        sm.drop_constraint(&clause.name, false)?;
1493        sm.save().await?;
1494        Ok(())
1495    }
1496
1497    /// Detects the single-node, single-label MERGE shape the fast path serves.
1498    ///
1499    /// Returns the node pattern and its label when `pattern` is one path with
1500    /// one node element, exactly one label, and a static map-literal property
1501    /// set — the shape [`Self::execute_merge_row_indexed`] can serve without
1502    /// per-row query planning. The keys do NOT need to be indexed: the persisted
1503    /// lookup degrades to a (single, filtered) label scan when no scalar index
1504    /// exists, which is still far cheaper than building a `LogicalPlan` per row.
1505    /// Any other shape (edges, multiple labels, non-literal properties) returns
1506    /// `None` so the caller uses the general per-row path.
1507    fn merge_single_node_fastpath<'p>(
1508        &self,
1509        pattern: &'p Pattern,
1510    ) -> Option<(&'p NodePattern, String)> {
1511        if pattern.paths.len() != 1 {
1512            return None;
1513        }
1514        let path = &pattern.paths[0];
1515        if path.elements.len() != 1 {
1516            return None;
1517        }
1518        let PatternElement::Node(n) = &path.elements[0] else {
1519            return None;
1520        };
1521        let labels = n.labels.names();
1522        if labels.len() != 1 {
1523            return None;
1524        }
1525        // The key must be a static map literal so the key names are known.
1526        let Some(Expr::Map(entries)) = n.properties.as_ref() else {
1527            return None;
1528        };
1529        if entries.is_empty() {
1530            return None;
1531        }
1532        // Resolve the label to its schema-canonical case so the fast path agrees
1533        // with the general MERGE path (which matches labels case-insensitively).
1534        // Without this, `MERGE (:person …)` after a `:Person` row was flushed
1535        // scans/keys a different label than the canonical one and creates a
1536        // duplicate (review #3a). Falls back to the as-written label when the
1537        // schema does not know it (schemaless).
1538        let canonical = self
1539            .storage
1540            .schema_manager()
1541            .schema()
1542            .canonical_label_name(&labels[0])
1543            .unwrap_or_else(|| labels[0].clone());
1544        Some((n, canonical))
1545    }
1546
1547    /// RC3: detect the bound-endpoints, anonymous-edge relationship MERGE shape
1548    /// `(a)-[:TYPE]->(b)` whose edge existence can be resolved with one O(1)
1549    /// adjacency probe instead of building and running a per-row traversal
1550    /// `LogicalPlan` (the general path is ~19x the bulk CREATE of the same edges).
1551    ///
1552    /// Returns `(source_var, target_var, edge_type_id, direction)` when the
1553    /// pattern is exactly one path of `[Node, Rel, Node]` where the relationship
1554    /// is a single concrete type, **anonymous** (no variable → no edge binding to
1555    /// reproduce), fixed-length, and unfiltered, and both endpoint nodes are plain
1556    /// variables with no re-specified MERGE properties or inline WHERE (those are
1557    /// filters only the general path applies). Any deviation returns `None` and
1558    /// the caller keeps the general path. The caller still verifies per row that
1559    /// both endpoints are actually bound to vids.
1560    fn merge_relationship_fastpath_shape(
1561        &self,
1562        pattern: &Pattern,
1563    ) -> Option<(
1564        String,
1565        String,
1566        u32,
1567        uni_store::storage::direction::Direction,
1568    )> {
1569        if pattern.paths.len() != 1 {
1570            return None;
1571        }
1572        let [
1573            PatternElement::Node(a),
1574            PatternElement::Relationship(r),
1575            PatternElement::Node(b),
1576        ] = pattern.paths[0].elements.as_slice()
1577        else {
1578            return None;
1579        };
1580        // Endpoints: plain variables, no extra MERGE-pattern properties / inline
1581        // WHERE (a re-specified property is a filter the general path must apply).
1582        let src_var = a.variable.as_ref()?;
1583        let dst_var = b.variable.as_ref()?;
1584        if a.properties.is_some()
1585            || a.where_clause.is_some()
1586            || b.properties.is_some()
1587            || b.where_clause.is_some()
1588        {
1589            return None;
1590        }
1591        // Relationship: single concrete type, anonymous, fixed-length, unfiltered.
1592        if r.variable.is_some()
1593            || r.range.is_some()
1594            || r.properties.is_some()
1595            || r.where_clause.is_some()
1596            || r.types.names().len() != 1
1597        {
1598            return None;
1599        }
1600        let type_name = &r.types.names()[0];
1601        let type_id = if self.config.strict_schema {
1602            self.storage
1603                .schema_manager()
1604                .schema()
1605                .edge_type_id_by_name_case_insensitive(type_name)?
1606        } else {
1607            self.storage
1608                .schema_manager()
1609                .get_or_assign_edge_type_id(type_name)
1610        };
1611        let dir = match r.direction {
1612            uni_cypher::ast::Direction::Outgoing => {
1613                uni_store::storage::direction::Direction::Outgoing
1614            }
1615            uni_cypher::ast::Direction::Incoming => {
1616                uni_store::storage::direction::Direction::Incoming
1617            }
1618            // Undirected existence is ambiguous to encode as one probe; the
1619            // general path handles it.
1620            uni_cypher::ast::Direction::Both => return None,
1621        };
1622        Some((src_var.clone(), dst_var.clone(), type_id, dir))
1623    }
1624
1625    /// Build the persisted-scan filter for a MERGE key, or `None` if any value
1626    /// is not a scalar this fast path can represent.
1627    ///
1628    /// Returning `None` makes the caller fall back to the general per-row path,
1629    /// so unusual key value types (lists, maps, temporals, nulls) are never
1630    /// silently mis-matched. The `_deleted = false` clause mirrors the
1631    /// persisted-read predicate used elsewhere; the version high-water-mark
1632    /// clause is added by [`uni_store::StorageManager::scan_vertex_table`].
1633    fn merge_key_filter(key_props: &HashMap<String, Value>) -> Option<String> {
1634        if key_props.is_empty() {
1635            return None;
1636        }
1637        let mut parts = Vec::with_capacity(key_props.len() + 1);
1638        for (k, v) in key_props {
1639            if !Self::is_safe_key_ident(k) {
1640                return None;
1641            }
1642            let lit = Self::render_key_literal(v)?;
1643            // Unquoted identifier: the Lance filter parser does not resolve a
1644            // double-quoted column name against the table here, so `"k" = v`
1645            // silently matches nothing. Keys are validated above to be safe
1646            // bare identifiers.
1647            parts.push(format!("{k} = {lit}"));
1648        }
1649        parts.push("_deleted = false".to_string());
1650        Some(parts.join(" AND "))
1651    }
1652
1653    /// True when a MERGE key name is a safe bare identifier for a Lance
1654    /// filter (issue #8). Keys come from a static map literal, but validate
1655    /// anyway.
1656    fn is_safe_key_ident(k: &str) -> bool {
1657        !k.is_empty() && k.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
1658    }
1659
1660    /// Render a scalar MERGE-key value as a Lance filter literal, or `None`
1661    /// for value types this fast path cannot represent (lists, maps,
1662    /// temporals, nulls) — the caller then falls back to the general path.
1663    fn render_key_literal(v: &Value) -> Option<String> {
1664        Some(match v {
1665            Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1666            Value::Int(i) => i.to_string(),
1667            Value::Float(f) => f.to_string(),
1668            Value::Bool(b) => b.to_string(),
1669            _ => return None,
1670        })
1671    }
1672
1673    /// Build ONE scan filter matching every key tuple in `keys` (all tuples
1674    /// sorted by `key_names` order, values canonicalized).
1675    ///
1676    /// Single-column keys render as type-grouped `k IN (…)` lists (a filter
1677    /// never compares mixed literal types against one column); composite keys
1678    /// render as an OR of per-tuple conjunctions. Both forms are wrapped with
1679    /// the same `_deleted = false` clause the per-row filter used.
1680    fn merge_batch_filter(key_names: &[String], keys: &[&MergeKey]) -> Option<String> {
1681        if keys.is_empty() || key_names.iter().any(|k| !Self::is_safe_key_ident(k)) {
1682            return None;
1683        }
1684        let disjunction = if let [key] = key_names {
1685            // Group literals by value variant so each IN list is homogeneous.
1686            let mut groups: HashMap<std::mem::Discriminant<Value>, Vec<String>> = HashMap::new();
1687            for tuple in keys {
1688                let (_, v) = tuple.first()?;
1689                groups
1690                    .entry(std::mem::discriminant(v))
1691                    .or_default()
1692                    .push(Self::render_key_literal(v)?);
1693            }
1694            groups
1695                .into_values()
1696                .map(|lits| {
1697                    if let [lit] = lits.as_slice() {
1698                        format!("{key} = {lit}")
1699                    } else {
1700                        format!("{key} IN ({})", lits.join(", "))
1701                    }
1702                })
1703                .collect::<Vec<_>>()
1704                .join(" OR ")
1705        } else {
1706            keys.iter()
1707                .map(|tuple| {
1708                    let conj = tuple
1709                        .iter()
1710                        .map(|(k, v)| Some(format!("{k} = {}", Self::render_key_literal(v)?)))
1711                        .collect::<Option<Vec<_>>>()?
1712                        .join(" AND ");
1713                    Some(format!("({conj})"))
1714                })
1715                .collect::<Option<Vec<_>>>()?
1716                .join(" OR ")
1717        };
1718        Some(format!("({disjunction}) AND _deleted = false"))
1719    }
1720
1721    /// Canonicalize a numeric MERGE-key value for *matching only*.
1722    ///
1723    /// A finite `Float` with an integral value (e.g. `1.0`) is mapped to the
1724    /// equivalent `Int`, so an `Int(1)` key matches a node stored with
1725    /// `Float(1.0)` and vice versa — the coercion the general (DataFusion) MERGE
1726    /// path already applies (review #3a). Non-numeric and non-integral values are
1727    /// returned unchanged. Used only to build match keys / comparisons, never the
1728    /// value written to a created node.
1729    fn canonical_key_value(v: &Value) -> Value {
1730        match v {
1731            Value::Float(f)
1732                if f.is_finite()
1733                    && f.fract() == 0.0
1734                    && *f >= i64::MIN as f64
1735                    && *f <= i64::MAX as f64 =>
1736            {
1737                Value::Int(*f as i64)
1738            }
1739            other => other.clone(),
1740        }
1741    }
1742
1743    /// Canonical sorted `(name, value)` key tuple for a MERGE row's key map.
1744    ///
1745    /// Numeric values are canonicalized ([`Self::canonical_key_value`]) so the
1746    /// tuple compares equal regardless of `Int`/`Float` spelling. This tuple is
1747    /// used purely as a match key (intra-batch dedup, L0 overlay lookup); the
1748    /// created node's properties come from the original, un-canonicalized map.
1749    fn merge_key_tuple(key_props: &HashMap<String, Value>) -> MergeKey {
1750        let mut tuple: MergeKey = key_props
1751            .iter()
1752            .map(|(k, v)| (k.clone(), Self::canonical_key_value(v)))
1753            .collect();
1754        tuple.sort_by(|a, b| a.0.cmp(&b.0));
1755        tuple
1756    }
1757
1758    /// Snapshot all live L0 vertices of `label`, grouped by their MERGE key.
1759    ///
1760    /// Walked once per MERGE statement (issue #69): the per-row fast path then
1761    /// resolves L0/uncommitted matches with an O(1) map lookup instead of
1762    /// re-enumerating L0 for every row. Captures committed-not-yet-persisted
1763    /// rows and rows created earlier in the same transaction; rows created by
1764    /// later rows of this same statement are folded in incrementally by
1765    /// [`Self::execute_merge_row_indexed`]. `key_names` must be sorted to match
1766    /// [`Self::merge_key_tuple`].
1767    fn merge_l0_existing(
1768        &self,
1769        label: &str,
1770        key_names: &[String],
1771        ctx: Option<&QueryContext>,
1772    ) -> HashMap<MergeKey, Vec<Vid>> {
1773        let mut candidates: Vec<Vid> = Vec::new();
1774        l0_visibility::visit_l0_buffers(ctx, |l0| {
1775            if let Some(vids) = l0.label_to_vids.get(label) {
1776                candidates.extend(vids.iter().copied());
1777            }
1778            false
1779        });
1780
1781        let mut map: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1782        let mut seen: HashSet<Vid> = HashSet::new();
1783        for vid in candidates {
1784            if !seen.insert(vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
1785                continue;
1786            }
1787            // `lookup_vertex_prop` merges across L0 layers (newest wins).
1788            let tuple: MergeKey = key_names
1789                .iter()
1790                .map(|k| {
1791                    let v = l0_visibility::lookup_vertex_prop(vid, k, ctx).unwrap_or(Value::Null);
1792                    (k.clone(), Self::canonical_key_value(&v))
1793                })
1794                .collect();
1795            map.entry(tuple).or_default().push(vid);
1796        }
1797        map
1798    }
1799
1800    /// Maximum key tuples per batched MERGE scan — bounds the filter-string
1801    /// size and Lance/DataFusion parse cost; chunks run sequentially.
1802    const MERGE_SCAN_CHUNK: usize = 1000;
1803
1804    /// Persisted (flushed) vertices of `label` for EVERY key tuple in `keys`,
1805    /// resolved with one scan per [`Self::MERGE_SCAN_CHUNK`] tuples instead of
1806    /// one scan per input row (review perf #4: `UNWIND … MERGE` issued N
1807    /// independent Lance scans).
1808    ///
1809    /// Scans via [`uni_store::StorageManager::scan_vertex_table`] — the same
1810    /// read path `MATCH` uses, so it honors the version high-water-mark and
1811    /// sees flushed rows. On the declared-label branch the key-filtered scan
1812    /// only NOMINATES candidate vids; a second, unfiltered `_vid IN (…)` pass
1813    /// picks each candidate's max-`_version` row and requires it to be live
1814    /// and still keyed as requested (per-label tables are MVCC-append, so a
1815    /// superseded version's row would otherwise stale-match a rewritten key).
1816    /// Matched rows are grouped by their CANONICAL key tuple (stored values
1817    /// run through [`Self::canonical_key_value`], so a stored `Float(1.0)`
1818    /// lands under a requested `Int(1)` — the coercion Lance's numeric filter
1819    /// equality applies). Liveness against L0 overlays (deletes, key rewrites
1820    /// by earlier rows of the same statement) is NOT checked here — the
1821    /// per-row consumer re-checks at row time, exactly as the old per-row
1822    /// scan did.
1823    ///
1824    /// The second returned map carries the FULL property maps the schemaless
1825    /// branch already decoded for each matched vid (empty on the declared-label
1826    /// branch, which projects only key columns) — the caller seeds the
1827    /// statement-level [`Prefetch`] from it at zero extra scans.
1828    ///
1829    /// # Errors
1830    /// Propagates persisted-scan and filter-build failures — fail-closed: a
1831    /// MERGE must never treat a failed lookup as "no match" and create
1832    /// duplicates.
1833    async fn merge_lookup_persisted_batch(
1834        &self,
1835        label: &str,
1836        key_names: &[String],
1837        keys: &HashSet<MergeKey>,
1838    ) -> Result<(
1839        HashMap<MergeKey, Vec<Vid>>,
1840        HashMap<Vid, uni_common::Properties>,
1841    )> {
1842        let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1843        if keys.is_empty() {
1844            return Ok((out, HashMap::new()));
1845        }
1846        // An undeclared (schemaless) label has no per-label table — its flushed
1847        // rows live only in the unified main vertex table. Route to the
1848        // main-table lookup, mirroring the planner's scan routing (a schemaless
1849        // MATCH plans `ScanMainByLabels` on the same schema predicate).
1850        if self
1851            .storage
1852            .schema_manager()
1853            .schema()
1854            .get_label_case_insensitive(label)
1855            .is_none()
1856        {
1857            return self
1858                .merge_lookup_persisted_batch_schemaless(label, key_names, keys)
1859                .await;
1860        }
1861        // Declared label — the per-label table is MVCC-append (an update
1862        // flush adds a higher-`_version` row for the same vid) and the key
1863        // predicate is pushed into the Lance filter, so a SUPERSEDED version
1864        // whose row still carries a requested key is returned while the vid's
1865        // current row (key rewritten, fails the filter) is invisible to the
1866        // scan. Version dedup among the returned rows cannot detect that, so
1867        // the lookup runs in two passes: the key-filtered scan only nominates
1868        // candidate vids, and an unfiltered `_vid IN (…)` scan then requires
1869        // each candidate's max-`_version` row to be live and still keyed as
1870        // requested.
1871        let mut columns: Vec<&str> = vec!["_vid"];
1872        columns.extend(key_names.iter().map(String::as_str));
1873
1874        let key_list: Vec<&MergeKey> = keys.iter().collect();
1875        let mut candidates: Vec<Vid> = Vec::new();
1876        let mut seen: HashSet<Vid> = HashSet::new();
1877        for chunk in key_list.chunks(Self::MERGE_SCAN_CHUNK) {
1878            let filter = Self::merge_batch_filter(key_names, chunk)
1879                .ok_or_else(|| anyhow!("MERGE fast path could not build a batched key filter"))?;
1880            let scanned = self
1881                .storage
1882                .scan_vertex_table(label, &columns, Some(&filter))
1883                .await?;
1884            let Some(batch) = scanned else { continue };
1885            let Some(vid_col) = batch
1886                .column_by_name("_vid")
1887                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
1888            else {
1889                continue;
1890            };
1891            for i in 0..vid_col.len() {
1892                let vid = Vid::from(vid_col.value(i));
1893                if seen.insert(vid) {
1894                    candidates.push(vid);
1895                }
1896            }
1897        }
1898
1899        // Verification pass — tombstones are NOT filtered Lance-side (the
1900        // max-version pick must see them so a deleted winner cannot let an
1901        // older live version resurrect the match), exactly like the
1902        // schemaless branch below.
1903        let mut verify_columns: Vec<&str> = vec!["_vid", "_deleted", "_version"];
1904        verify_columns.extend(key_names.iter().map(String::as_str));
1905        for chunk in candidates.chunks(Self::MERGE_SCAN_CHUNK) {
1906            let vid_list = chunk
1907                .iter()
1908                .map(|v| v.as_u64().to_string())
1909                .collect::<Vec<_>>()
1910                .join(", ");
1911            let filter = format!("_vid IN ({vid_list})");
1912            let scanned = self
1913                .storage
1914                .scan_vertex_table(label, &verify_columns, Some(&filter))
1915                .await?;
1916            let Some(batch) = scanned else { continue };
1917            let (Some(vid_col), Some(del_col), Some(ver_col)) = (
1918                batch
1919                    .column_by_name("_vid")
1920                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1921                batch
1922                    .column_by_name("_deleted")
1923                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
1924                batch
1925                    .column_by_name("_version")
1926                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1927            ) else {
1928                return Err(anyhow!(
1929                    "MERGE batched lookup: verification scan missing a required column"
1930                ));
1931            };
1932            let key_cols: Vec<_> = key_names
1933                .iter()
1934                .map(|k| batch.column_by_name(k))
1935                .collect::<Option<Vec<_>>>()
1936                .ok_or_else(|| {
1937                    anyhow!("MERGE batched lookup: projected key column missing from scan result")
1938                })?;
1939            // Per-vid MVCC dedup: keep the highest-version row for each vid.
1940            let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
1941            for i in 0..batch.num_rows() {
1942                let vid = Vid::from(vid_col.value(i));
1943                let ver = ver_col.value(i);
1944                let entry = winners.entry(vid).or_insert((ver, i));
1945                if ver > entry.0 {
1946                    *entry = (ver, i);
1947                }
1948            }
1949            for (vid, (_ver, row)) in winners {
1950                if del_col.value(row) {
1951                    continue;
1952                }
1953                let tuple: MergeKey = key_names
1954                    .iter()
1955                    .zip(&key_cols)
1956                    .map(|(k, col)| {
1957                        let v = uni_store::storage::arrow_convert::arrow_to_value(
1958                            col.as_ref(),
1959                            row,
1960                            None,
1961                        );
1962                        (k.clone(), Self::canonical_key_value(&v))
1963                    })
1964                    .collect();
1965                if keys.contains(&tuple) {
1966                    out.entry(tuple).or_default().push(vid);
1967                }
1968            }
1969        }
1970        Ok((out, HashMap::new()))
1971    }
1972
1973    /// Persisted-match lookup for an UNDECLARED (schemaless) label.
1974    ///
1975    /// Schemaless rows live only in the unified main vertex table (per-label
1976    /// tables exist only for declared labels), with all properties encoded in
1977    /// the `props_json` CypherValue blob — so key values cannot be pushed into
1978    /// the Lance filter; the key match happens in memory after decoding,
1979    /// exactly like the schemaless MATCH scan. One main-table scan regardless
1980    /// of key count.
1981    ///
1982    /// Mirrors `columnar_scan_schemaless_vertex_batch_static`: tombstones are
1983    /// NOT filtered Lance-side (MVCC dedup must see them to pick the winning
1984    /// version per vid); the per-vid max-`_version` dedup runs here, then
1985    /// deleted winners are dropped.
1986    ///
1987    /// Also returns the full decoded property map per matched vid — the blob
1988    /// is decoded here anyway, and the caller seeds the statement-level
1989    /// [`Prefetch`] from it instead of re-reading per row.
1990    ///
1991    /// # Errors
1992    /// Propagates scan and blob-decode failures — fail-closed: a MERGE must
1993    /// never treat a failed lookup as "no match" and create duplicates.
1994    async fn merge_lookup_persisted_batch_schemaless(
1995        &self,
1996        label: &str,
1997        key_names: &[String],
1998        keys: &HashSet<MergeKey>,
1999    ) -> Result<(
2000        HashMap<MergeKey, Vec<Vid>>,
2001        HashMap<Vid, uni_common::Properties>,
2002    )> {
2003        let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2004        let mut props_by_vid: HashMap<Vid, uni_common::Properties> = HashMap::new();
2005        let filter = format!("array_contains(labels, '{}')", label.replace('\'', "''"));
2006        let Some(batch) = self
2007            .storage
2008            .scan_main_vertex_table(
2009                &["_vid", "_deleted", "props_json", "_version"],
2010                Some(&filter),
2011            )
2012            .await?
2013        else {
2014            return Ok((out, props_by_vid));
2015        };
2016        let (Some(vid_col), Some(del_col), Some(ver_col)) = (
2017            batch
2018                .column_by_name("_vid")
2019                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
2020            batch
2021                .column_by_name("_deleted")
2022                .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
2023            batch
2024                .column_by_name("_version")
2025                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
2026        ) else {
2027            return Err(anyhow!(
2028                "schemaless MERGE lookup: main vertex table scan missing a required column"
2029            ));
2030        };
2031        let props_col = batch
2032            .column_by_name("props_json")
2033            .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2034
2035        // Per-vid MVCC dedup: keep the highest-version row for each vid.
2036        let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
2037        for i in 0..batch.num_rows() {
2038            let vid = Vid::from(vid_col.value(i));
2039            let ver = ver_col.value(i);
2040            let entry = winners.entry(vid).or_insert((ver, i));
2041            if ver > entry.0 {
2042                *entry = (ver, i);
2043            }
2044        }
2045        for (vid, (_ver, row)) in winners {
2046            // Drop deletion tombstones AFTER picking the winner — a deleted
2047            // winner must not let an older live version resurrect the match.
2048            if del_col.value(row) {
2049                continue;
2050            }
2051            // A row without properties matches only an all-Null key tuple.
2052            let props = match props_col {
2053                Some(arr) if !arrow_array::Array::is_null(arr, row) => {
2054                    match uni_common::cypher_value_codec::decode(arr.value(row))
2055                        .map_err(|e| anyhow!("schemaless MERGE lookup: props decode: {e}"))?
2056                    {
2057                        Value::Map(m) => m,
2058                        _ => HashMap::new(),
2059                    }
2060                }
2061                _ => HashMap::new(),
2062            };
2063            let tuple: MergeKey = key_names
2064                .iter()
2065                .map(|k| {
2066                    (
2067                        k.clone(),
2068                        Self::canonical_key_value(props.get(k).unwrap_or(&Value::Null)),
2069                    )
2070                })
2071                .collect();
2072            if keys.contains(&tuple) {
2073                out.entry(tuple).or_default().push(vid);
2074                props_by_vid.insert(vid, props);
2075            }
2076        }
2077        Ok((out, props_by_vid))
2078    }
2079
2080    /// True if the statement-level MERGE property prefetch is safe for `label`.
2081    ///
2082    /// False when the label declares any CRDT-typed property: a prefetch HIT in
2083    /// [`read_vertex_props_with_prefetch`] skips the `normalize_crdt_properties`
2084    /// pass that `get_all_vertex_props_with_ctx` applies, so CRDT-bearing
2085    /// labels keep the per-row read path. Undeclared labels are trivially safe
2086    /// (normalization is a no-op without schema CRDT entries).
2087    fn merge_label_prefetch_safe(&self, label: &str) -> bool {
2088        let schema = self.storage.schema_manager().schema();
2089        schema.properties.get(label).is_none_or(|props| {
2090            !props
2091                .values()
2092                .any(|pm| matches!(pm.r#type, DataType::Crdt(_)))
2093        })
2094    }
2095
2096    /// True if an L0 override rewrote any key column of a persisted match away
2097    /// from its requested value (so the persisted row no longer matches).
2098    fn vid_overrides_break_key(
2099        vid: Vid,
2100        key_props: &HashMap<String, Value>,
2101        ctx: Option<&QueryContext>,
2102    ) -> bool {
2103        key_props.iter().any(|(k, want)| {
2104            matches!(
2105                l0_visibility::lookup_vertex_prop(vid, k, ctx),
2106                Some(got) if Self::canonical_key_value(&got) != Self::canonical_key_value(want)
2107            )
2108        })
2109    }
2110
2111    /// Build a node Map value (`{_vid, _labels, ...props}`) for binding a MERGE
2112    /// node variable.
2113    ///
2114    /// Matches the binding shape produced by `execute_create_pattern` and the
2115    /// general MATCH path, so ON MATCH SET, RETURN, and downstream operators
2116    /// resolve the variable identically — a bare `Value::Int(vid)` is not a
2117    /// valid node binding for those consumers.
2118    fn build_node_map(vid: Vid, label: &str, props: uni_common::Properties) -> Value {
2119        let mut obj = HashMap::new();
2120        obj.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
2121        obj.insert(
2122            "_labels".to_string(),
2123            Value::List(vec![Value::String(label.to_string())]),
2124        );
2125        for (k, v) in props {
2126            obj.insert(k, v);
2127        }
2128        Value::Map(obj)
2129    }
2130
2131    /// True if an L0-only vertex has every key column set to the requested
2132    /// value. A missing column matches only a requested `Null`.
2133    fn l0_vid_matches_key(
2134        vid: Vid,
2135        key_props: &HashMap<String, Value>,
2136        ctx: Option<&QueryContext>,
2137    ) -> bool {
2138        key_props.iter().all(
2139            |(k, want)| match l0_visibility::lookup_vertex_prop(vid, k, ctx) {
2140                Some(got) => Self::canonical_key_value(&got) == Self::canonical_key_value(want),
2141                None => *want == Value::Null,
2142            },
2143        )
2144    }
2145
2146    /// Index fast-path execution for one MERGE row of the shape detected by
2147    /// [`Self::merge_single_node_fastpath`].
2148    ///
2149    /// Resolves matches from the per-batch L0 snapshot `existing` (O(1) lookup,
2150    /// no per-row L0 enumeration) plus the per-statement persisted prefetch
2151    /// (`persisted`, built once by [`Self::merge_lookup_persisted_batch`]);
2152    /// applies ON MATCH SET to every match, or creates the node and applies
2153    /// ON CREATE SET when there is none. A newly created vertex is folded into
2154    /// `existing` so a later row of the same batch with the same key matches it
2155    /// (intra-batch dedup). Returns the RETURN rows for this input row (one per
2156    /// match, or one for a create).
2157    ///
2158    /// `prefetched` is the statement-level property prefetch (`None` when the
2159    /// label is CRDT-bearing, see [`Self::merge_label_prefetch_safe`]): matched
2160    /// vids carry their persisted base row, freshly created vids are seeded
2161    /// with an empty base — per-row reads then resolve as base + L0 layering
2162    /// (every SET flush writes the full row to L0 before the next read, so a
2163    /// prefetch hit equals a fresh read) instead of one storage scan each.
2164    ///
2165    /// # Errors
2166    /// Propagates evaluation, create, and SET failures.
2167    #[expect(
2168        clippy::too_many_arguments,
2169        reason = "mirrors execute_merge's threaded execution state"
2170    )]
2171    async fn execute_merge_row_indexed(
2172        &self,
2173        label: &str,
2174        node: &NodePattern,
2175        path_pattern: &Pattern,
2176        temp_vars: &[String],
2177        mut row: HashMap<String, Value>,
2178        key_props: &HashMap<String, Value>,
2179        persisted: &HashMap<MergeKey, Vec<Vid>>,
2180        key_tuple: &MergeKey,
2181        existing: &mut HashMap<MergeKey, Vec<Vid>>,
2182        on_match: Option<&SetClause>,
2183        on_create: Option<&SetClause>,
2184        prop_manager: &PropertyManager,
2185        params: &HashMap<String, Value>,
2186        ctx: Option<&QueryContext>,
2187        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2188        writer: &Writer,
2189        mut prefetched: Option<&mut Prefetch>,
2190    ) -> Result<Vec<HashMap<String, Value>>> {
2191        let empty_prefetch = Prefetch::default();
2192        let mut seen: HashSet<Vid> = HashSet::new();
2193        let mut matches: Vec<Vid> = Vec::new();
2194        // Persisted (flushed) matches from the per-statement prefetch. The
2195        // prefetch is static for the statement, so re-verify liveness at row
2196        // time — an earlier row of this batch may have deleted the candidate
2197        // or rewritten its key (the old per-row scan saw those through its L0
2198        // overlay checks; these are the same checks, moved to row time).
2199        if let Some(vids) = persisted.get(key_tuple) {
2200            for &vid in vids {
2201                if l0_visibility::is_vertex_deleted(vid, ctx) {
2202                    continue;
2203                }
2204                if Self::vid_overrides_break_key(vid, key_props, ctx) {
2205                    continue;
2206                }
2207                if seen.insert(vid) {
2208                    matches.push(vid);
2209                }
2210            }
2211        }
2212        // L0 / intra-batch matches from the per-batch snapshot, re-verified live
2213        // in case a prior row of this batch mutated or deleted the candidate.
2214        if let Some(vids) = existing.get(key_tuple) {
2215            for &vid in vids {
2216                if seen.contains(&vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
2217                    continue;
2218                }
2219                if Self::l0_vid_matches_key(vid, key_props, ctx) && seen.insert(vid) {
2220                    matches.push(vid);
2221                }
2222            }
2223        }
2224
2225        let mut out = Vec::new();
2226        if matches.is_empty() {
2227            // No match: create the node, then apply ON CREATE SET. Fold the
2228            // ON CREATE SET property assignments into seed props first so a
2229            // NOT-NULL property supplied only by ON CREATE SET passes
2230            // create-time validation (RC4); the post-create SET below settles
2231            // the final values.
2232            let seed_props = self
2233                .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2234                .await?;
2235            self.execute_create_pattern(
2236                path_pattern,
2237                &mut row,
2238                writer,
2239                prop_manager,
2240                params,
2241                ctx,
2242                tx_l0_override,
2243                Some(&seed_props),
2244            )
2245            .await?;
2246            // Fold the new vertex into the batch snapshot for intra-batch
2247            // dedup, and seed the statement prefetch with an empty base: a
2248            // fresh vid has nothing in storage, so ON CREATE SET's lazy read
2249            // resolves from the L0 row the create just wrote instead of
2250            // issuing a per-row storage scan that finds nothing.
2251            if let Some(var) = &node.variable
2252                && let Some(val) = row.get(var)
2253                && let Ok(vid) = Self::vid_from_value(val)
2254            {
2255                existing.entry(key_tuple.clone()).or_default().push(vid);
2256                if let Some(p) = prefetched.as_deref_mut() {
2257                    p.vertex.entry(vid).or_default();
2258                }
2259                // Phantom guard (RC2): register this MERGE-create's key so a
2260                // concurrent MERGE of the same key aborts retriably at commit
2261                // (converging to one node) instead of silently duplicating —
2262                // even with no declared UNIQUE constraint. Only inside a
2263                // transaction, where commit re-probes the guard; a plain CREATE
2264                // never registers a key, so it is unaffected.
2265                if let Some(tx_l0) = tx_l0_override {
2266                    let key_values: Vec<(String, Value)> = key_props
2267                        .iter()
2268                        .map(|(k, v)| (k.clone(), v.clone()))
2269                        .collect();
2270                    let guard_key =
2271                        uni_store::runtime::l0::serialize_constraint_key(label, &key_values);
2272                    tx_l0.write().insert_merge_guard_key(guard_key, vid);
2273                }
2274            }
2275            if let Some(set) = on_create {
2276                self.execute_set_items_locked(
2277                    &set.items,
2278                    &mut row,
2279                    writer,
2280                    prop_manager,
2281                    params,
2282                    ctx,
2283                    tx_l0_override,
2284                    prefetched.as_deref().unwrap_or(&empty_prefetch),
2285                )
2286                .await?;
2287            }
2288            Self::bind_path_variables(path_pattern, &mut row, temp_vars);
2289            out.push(row);
2290        } else {
2291            // Apply ON MATCH SET to every matched node (multi-match semantics),
2292            // binding the node variable as a Map with _vid/_labels/props so
2293            // RETURN and downstream operators resolve it as they would for the
2294            // general MATCH and CREATE paths.
2295            for vid in matches {
2296                let mut m = row.clone();
2297                if let Some(var) = &node.variable {
2298                    // Minimal binding so ON MATCH SET resolves the node by _vid.
2299                    m.insert(
2300                        var.clone(),
2301                        Self::build_node_map(vid, label, HashMap::new()),
2302                    );
2303                }
2304                if let Some(set) = on_match {
2305                    self.execute_set_items_locked(
2306                        &set.items,
2307                        &mut m,
2308                        writer,
2309                        prop_manager,
2310                        params,
2311                        ctx,
2312                        tx_l0_override,
2313                        prefetched.as_deref().unwrap_or(&empty_prefetch),
2314                    )
2315                    .await?;
2316                }
2317                if let Some(var) = &node.variable {
2318                    // Rebind with full, post-SET properties for RETURN
2319                    // fidelity. The SET above flushed the full row to L0, so a
2320                    // prefetch hit (base + L0 layering) reproduces exactly
2321                    // what a fresh storage read would return.
2322                    let props = read_vertex_props_with_prefetch(
2323                        vid,
2324                        prefetched.as_deref().unwrap_or(&empty_prefetch),
2325                        prop_manager,
2326                        ctx,
2327                    )
2328                    .await?;
2329                    m.insert(var.clone(), Self::build_node_map(vid, label, props));
2330                }
2331                Self::bind_path_variables(path_pattern, &mut m, temp_vars);
2332                out.push(m);
2333            }
2334        }
2335        Ok(out)
2336    }
2337
2338    #[expect(clippy::too_many_arguments)]
2339    pub(crate) async fn execute_merge(
2340        &self,
2341        rows: Vec<HashMap<String, Value>>,
2342        pattern: &Pattern,
2343        on_match: Option<&SetClause>,
2344        on_create: Option<&SetClause>,
2345        prop_manager: &PropertyManager,
2346        params: &HashMap<String, Value>,
2347        ctx: Option<&QueryContext>,
2348        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2349    ) -> Result<Vec<HashMap<String, Value>>> {
2350        let writer_lock = self
2351            .writer
2352            .as_ref()
2353            .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;
2354
2355        // Prepare pattern for path variable binding: assign temp edge variable
2356        // names to unnamed relationships in paths that have path variables.
2357        let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);
2358
2359        // Issue #69: a single-node, single-label MERGE takes the fast path,
2360        // skipping the per-row query planning that made batched MERGE no faster
2361        // than a per-entity loop. Indexed keys get an index point-lookup;
2362        // un-indexed keys still skip planning (the lookup is a filtered scan).
2363        // The shape is the same for every row, so it is detected once.
2364        let fastpath = self.merge_single_node_fastpath(pattern);
2365
2366        // Build the per-batch L0 snapshot once (issue #69 Phase C): the per-row
2367        // fast path then resolves L0/intra-batch matches with an O(1) lookup
2368        // instead of re-walking L0 for every row. `key_names` is the sorted
2369        // static key set, matching `merge_key_tuple`.
2370        let mut fast_existing: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2371        // Per-row pre-evaluated fast-path keys (None = that row falls back to
2372        // the general path), and the per-statement persisted prefetch over the
2373        // deduped key tuples — ONE chunked scan instead of one scan per row.
2374        // Key expressions only see the row's own bindings + params, so
2375        // evaluating them ahead of any creates cannot observe earlier rows.
2376        let mut row_fast: Vec<Option<(HashMap<String, Value>, MergeKey)>> = Vec::new();
2377        let mut fast_persisted: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2378        // Statement-level property prefetch for the fast path (review perf
2379        // residual): every persisted match's full row is batch-read ONCE, so
2380        // the per-row ON MATCH SET read and the post-SET rebind resolve as
2381        // prefetch-base + L0 layering instead of one storage scan each.
2382        // `None` disables it for CRDT-bearing labels (the prefetch-hit read
2383        // skips CRDT normalization).
2384        let mut merge_prefetch: Option<Prefetch> = None;
2385        if let Some((node, label)) = &fastpath {
2386            let mut key_names: Vec<String> = match &node.properties {
2387                Some(Expr::Map(entries)) => entries.iter().map(|(k, _)| k.clone()).collect(),
2388                _ => Vec::new(),
2389            };
2390            key_names.sort();
2391            fast_existing = self.merge_l0_existing(label, &key_names, ctx);
2392
2393            row_fast.reserve(rows.len());
2394            for row in &rows {
2395                let mut key_props: HashMap<String, Value> = HashMap::new();
2396                if let Some(props_expr) = &node.properties
2397                    && let Value::Map(map) = self
2398                        .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2399                        .await?
2400                {
2401                    key_props = map;
2402                }
2403                // Only rows whose every key value is a scalar the persisted
2404                // scan can express take the fast path (same gate as before,
2405                // via the filter builder).
2406                if Self::merge_key_filter(&key_props).is_some() {
2407                    let tuple = Self::merge_key_tuple(&key_props);
2408                    row_fast.push(Some((key_props, tuple)));
2409                } else {
2410                    row_fast.push(None);
2411                }
2412            }
2413            let unique_keys: HashSet<MergeKey> = row_fast
2414                .iter()
2415                .flatten()
2416                .map(|(_, tuple)| tuple.clone())
2417                .collect();
2418            let (persisted, schemaless_props) = self
2419                .merge_lookup_persisted_batch(label, &key_names, &unique_keys)
2420                .await?;
2421            fast_persisted = persisted;
2422            if self.merge_label_prefetch_safe(label) {
2423                let mut pf = Prefetch::default();
2424                if !schemaless_props.is_empty() {
2425                    // The schemaless lookup already decoded each matched vid's
2426                    // full property map — zero extra scans.
2427                    pf.vertex.extend(schemaless_props);
2428                } else {
2429                    let vids: Vec<Vid> = fast_persisted
2430                        .values()
2431                        .flatten()
2432                        .copied()
2433                        .collect::<HashSet<Vid>>()
2434                        .into_iter()
2435                        .collect();
2436                    if !vids.is_empty()
2437                        && let Ok(batch_props) = prop_manager
2438                            .get_batch_vertex_props_for_label(&vids, label, ctx)
2439                            .await
2440                    {
2441                        // One `_vid IN (…)` scan for every matched row's base.
2442                        // On Err the map stays empty — every read falls back to
2443                        // the per-row path (fail-open, same posture as
2444                        // prefetch_set_targets).
2445                        pf.vertex.extend(batch_props);
2446                    }
2447                }
2448                merge_prefetch = Some(pf);
2449            }
2450        }
2451
2452        // RC3: relationship-MERGE existence fast-path. The single-node fast path
2453        // above does not cover `(a)-[:R]->(b)`; the general path rebuilds and runs
2454        // a per-row traversal `LogicalPlan` just to check whether the edge exists
2455        // (~19x the bulk CREATE of the same edges). For the bound-endpoints,
2456        // anonymous-edge shape (and no ON MATCH SET, whose match-row semantics the
2457        // general path materialises) we resolve existence with one MVCC-correct
2458        // adjacency probe — `GraphExecutionContext::get_neighbors` merges CSR + all
2459        // L0 buffers including the transaction's own writes, so intra-batch edges
2460        // are seen — and reuse the general create / ON CREATE handling unchanged.
2461        // An ON MATCH SET with actual items needs the general path's materialised
2462        // match rows; a plain MERGE carries an *empty* on_match, which the fast
2463        // path can serve (it emits the row directly, applying nothing on match).
2464        let on_match_empty = on_match.is_none_or(|s| s.items.is_empty());
2465        let rel_fast = if fastpath.is_none() && on_match_empty {
2466            self.merge_relationship_fastpath_shape(pattern)
2467        } else {
2468            None
2469        };
2470        let rel_graph_ctx = rel_fast.as_ref().map(|_| {
2471            let l0_context = match ctx {
2472                Some(c) => crate::query::df_graph::L0Context::from_query_context(c),
2473                None => crate::query::df_graph::L0Context::empty(),
2474            };
2475            let pm_arc = self.prop_manager_arc.clone().unwrap_or_else(|| {
2476                Arc::new(PropertyManager::new(
2477                    self.storage.clone(),
2478                    self.storage.schema_manager_arc(),
2479                    prop_manager.cache_size(),
2480                ))
2481            });
2482            crate::query::df_graph::GraphExecutionContext::with_l0_context(
2483                self.effective_storage(),
2484                l0_context,
2485                pm_arc,
2486            )
2487        });
2488
2489        let mut results = Vec::new();
2490        for (idx, mut row) in rows.into_iter().enumerate() {
2491            // Rows with a pre-evaluated scalar key take the fast path; rows
2492            // with a non-scalar key fall through to the general path below.
2493            if let Some((node, label)) = &fastpath
2494                && let Some((key_props, key_tuple)) = row_fast.get(idx).and_then(|rf| rf.as_ref())
2495            {
2496                let writer: &uni_store::Writer = writer_lock.as_ref();
2497                let row_out = self
2498                    .execute_merge_row_indexed(
2499                        label,
2500                        node,
2501                        &path_pattern,
2502                        &temp_vars,
2503                        row,
2504                        key_props,
2505                        &fast_persisted,
2506                        key_tuple,
2507                        &mut fast_existing,
2508                        on_match,
2509                        on_create,
2510                        prop_manager,
2511                        params,
2512                        ctx,
2513                        tx_l0_override,
2514                        writer,
2515                        merge_prefetch.as_mut(),
2516                    )
2517                    .await?;
2518                results.extend(row_out);
2519                continue;
2520            }
2521
2522            // RC3 relationship fast path: bound endpoints → resolve edge
2523            // existence with one adjacency probe and reuse the general
2524            // create / ON CREATE handling, skipping the per-row traversal plan.
2525            if let (Some((src_var, dst_var, type_id, dir)), Some(graph_ctx)) =
2526                (rel_fast.as_ref(), rel_graph_ctx.as_ref())
2527            {
2528                let src_vid = row.get(src_var).and_then(|v| Self::vid_from_value(v).ok());
2529                let dst_vid = row.get(dst_var).and_then(|v| Self::vid_from_value(v).ok());
2530                if let (Some(src_vid), Some(dst_vid)) = (src_vid, dst_vid) {
2531                    let exists = graph_ctx
2532                        .get_neighbors(src_vid, *type_id, *dir)
2533                        .into_iter()
2534                        .any(|(n, _eid)| n == dst_vid);
2535                    let writer: &uni_store::Writer = writer_lock.as_ref();
2536                    if !exists {
2537                        // Edge absent: create only the edge (endpoints are bound),
2538                        // then apply ON CREATE SET — identical to the general
2539                        // create branch below.
2540                        let seed_props = self
2541                            .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2542                            .await?;
2543                        self.execute_create_pattern(
2544                            &path_pattern,
2545                            &mut row,
2546                            writer,
2547                            prop_manager,
2548                            params,
2549                            ctx,
2550                            tx_l0_override,
2551                            Some(&seed_props),
2552                        )
2553                        .await?;
2554                        if let Some(set) = on_create {
2555                            self.execute_set_items_locked(
2556                                &set.items,
2557                                &mut row,
2558                                writer,
2559                                prop_manager,
2560                                params,
2561                                ctx,
2562                                tx_l0_override,
2563                                &Prefetch::default(),
2564                            )
2565                            .await?;
2566                        }
2567                    }
2568                    // Whether matched or just created, the edge now exists; bind
2569                    // path variables and emit the row (the edge is anonymous, so
2570                    // there is no edge binding to reproduce, and ON MATCH SET is
2571                    // excluded from this fast path).
2572                    Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
2573                    results.push(row);
2574                    continue;
2575                }
2576                // Endpoints not bound to vids → fall through to the general path.
2577            }
2578
2579            // General execution: match-or-create per row. (The index fast path
2580            // above already handles single-node, single-label, scalar-indexed
2581            // MERGE — including unique-constrained labels, whose keys are
2582            // indexed — so there is no separate constraint-only fast path.)
2583            let matches = self
2584                .execute_merge_match(pattern, &row, prop_manager, params, ctx)
2585                .await?;
2586            let writer: &uni_store::Writer = writer_lock.as_ref();
2587
2588            let result: Result<Vec<HashMap<String, Value>>> = async {
2589                let mut batch = Vec::new();
2590                if !matches.is_empty() {
2591                    for mut m in matches {
2592                        if let Some(set) = on_match {
2593                            self.execute_set_items_locked(
2594                                &set.items,
2595                                &mut m,
2596                                writer,
2597                                prop_manager,
2598                                params,
2599                                ctx,
2600                                tx_l0_override,
2601                                &Prefetch::default(),
2602                            )
2603                            .await?;
2604                        }
2605                        Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
2606                        batch.push(m);
2607                    }
2608                } else {
2609                    // Fold ON CREATE SET into seed props so a NOT-NULL property
2610                    // set only by ON CREATE SET passes create-time validation
2611                    // (RC4); the post-create SET below settles the final values.
2612                    let seed_props = self
2613                        .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2614                        .await?;
2615                    self.execute_create_pattern(
2616                        &path_pattern,
2617                        &mut row,
2618                        writer,
2619                        prop_manager,
2620                        params,
2621                        ctx,
2622                        tx_l0_override,
2623                        Some(&seed_props),
2624                    )
2625                    .await?;
2626                    if let Some(set) = on_create {
2627                        self.execute_set_items_locked(
2628                            &set.items,
2629                            &mut row,
2630                            writer,
2631                            prop_manager,
2632                            params,
2633                            ctx,
2634                            tx_l0_override,
2635                            &Prefetch::default(),
2636                        )
2637                        .await?;
2638                    }
2639                    Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
2640                    batch.push(row);
2641                }
2642                Ok(batch)
2643            }
2644            .await;
2645
2646            results.extend(result?);
2647        }
2648        Ok(results)
2649    }
2650
2651    /// Pre-evaluate `ON CREATE SET` property assignments into per-variable seeds.
2652    ///
2653    /// Folds `SET <var>.<prop> = <expr>` items so a NOT-NULL property supplied
2654    /// only by `ON CREATE SET` is present when the MERGE node is created and
2655    /// passes constraint validation (RC4). The right-hand side is evaluated
2656    /// against the current `row`.
2657    ///
2658    /// Items whose right-hand side references the target variable (e.g.
2659    /// `ON CREATE SET n.c = coalesce(n.c, 0) + 1`) are NOT folded: seeding would
2660    /// let the post-create SET read the seeded value and apply the assignment
2661    /// twice. Such items run only post-create, exactly once (unchanged behavior).
2662    ///
2663    /// # Errors
2664    /// Returns an error if evaluating an assignment's right-hand side fails.
2665    pub(crate) async fn on_create_seed_props(
2666        &self,
2667        on_create: Option<&SetClause>,
2668        row: &HashMap<String, Value>,
2669        prop_manager: &PropertyManager,
2670        params: &HashMap<String, Value>,
2671        ctx: Option<&QueryContext>,
2672    ) -> Result<HashMap<String, HashMap<String, Value>>> {
2673        let mut seed: HashMap<String, HashMap<String, Value>> = HashMap::new();
2674        let Some(set) = on_create else {
2675            return Ok(seed);
2676        };
2677        for item in &set.items {
2678            if let SetItem::Property { expr, value } = item
2679                && let Expr::Property(var_expr, prop_name) = expr
2680                && let Expr::Variable(var_name) = &**var_expr
2681                // Skip self-referential RHS so the post-create SET (which also
2682                // runs) applies it exactly once rather than reading the seed.
2683                && !crate::query::df_graph::locy_ast_builder::expr_references_var(
2684                    value, var_name,
2685                )
2686            {
2687                let val = self
2688                    .evaluate_expr(value, row, prop_manager, params, ctx)
2689                    .await?;
2690                seed.entry(var_name.clone())
2691                    .or_default()
2692                    .insert(prop_name.clone(), val);
2693            }
2694        }
2695        Ok(seed)
2696    }
2697
2698    /// Execute a CREATE pattern, inserting new vertices and edges into the graph.
2699    #[expect(clippy::too_many_arguments)]
2700    pub(crate) async fn execute_create_pattern(
2701        &self,
2702        pattern: &Pattern,
2703        row: &mut HashMap<String, Value>,
2704        writer: &Writer,
2705        prop_manager: &PropertyManager,
2706        params: &HashMap<String, Value>,
2707        ctx: Option<&QueryContext>,
2708        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2709        // Per-variable properties to gap-fill into newly-created nodes before
2710        // constraint validation. Used by MERGE to fold `ON CREATE SET` so a
2711        // NOT-NULL property supplied only by ON CREATE SET passes create-time
2712        // validation (RC4). `None` for plain CREATE.
2713        seed_props: Option<&HashMap<String, HashMap<String, Value>>>,
2714    ) -> Result<()> {
2715        for path in &pattern.paths {
2716            let mut prev_vid: Option<Vid> = None;
2717            // (rel_var, type_id, type_name, props_expr, direction)
2718            type PendingRel = (String, u32, String, Option<Expr>, Direction);
2719            let mut rel_pending: Option<PendingRel> = None;
2720
2721            for element in &path.elements {
2722                match element {
2723                    PatternElement::Node(n) => {
2724                        let mut vid = None;
2725
2726                        // Check if node variable already bound in row
2727                        if let Some(var) = &n.variable
2728                            && let Some(val) = row.get(var)
2729                            && let Ok(existing_vid) = Self::vid_from_value(val)
2730                        {
2731                            vid = Some(existing_vid);
2732                        }
2733
2734                        // If not bound, create it
2735                        if vid.is_none() {
2736                            let mut props = HashMap::new();
2737                            if let Some(props_expr) = &n.properties {
2738                                let props_val = self
2739                                    .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2740                                    .await?;
2741                                if let Value::Map(map) = props_val {
2742                                    for (k, v) in map {
2743                                        props.insert(k, v);
2744                                    }
2745                                } else {
2746                                    return Err(anyhow!("Properties must evaluate to a map"));
2747                                }
2748                            }
2749
2750                            // MERGE ON CREATE SET: gap-fill properties supplied
2751                            // only by ON CREATE SET so a NOT-NULL property absent
2752                            // from the merge key passes create-time validation
2753                            // (RC4). `or_insert` keeps the merge-key/pattern props
2754                            // authoritative; the post-create SET re-applies the
2755                            // real values, so the final state is unchanged.
2756                            if let Some(seed) = seed_props
2757                                && let Some(var) = &n.variable
2758                                && let Some(var_seed) = seed.get(var)
2759                            {
2760                                for (k, v) in var_seed {
2761                                    props.entry(k.clone()).or_insert_with(|| v.clone());
2762                                }
2763                            }
2764
2765                            let schema = self.storage.schema_manager().schema();
2766
2767                            // Strict schema: reject undeclared labels.
2768                            if self.config.strict_schema {
2769                                for label_name in &n.labels {
2770                                    if schema.get_label_case_insensitive(label_name).is_none() {
2771                                        return Err(anyhow!(
2772                                            "Label '{}' is not defined in the schema \
2773                                             (strict_schema is enabled). \
2774                                             Declare it with db.schema().label(...).apply() first.",
2775                                            label_name
2776                                        ));
2777                                    }
2778                                }
2779                            }
2780
2781                            // VID generation is label-independent. Pull from the
2782                            // per-tx reservoir if set (amortizes the global
2783                            // IdAllocator mutex), else fall back to the direct
2784                            // per-VID path.
2785                            let new_vid = match &self.id_reservoir {
2786                                Some(r) => r.next_vid().await?,
2787                                None => writer.next_vid().await?,
2788                            };
2789
2790                            // Enrich with generated columns only for known labels
2791                            for label_name in &n.labels {
2792                                if schema.get_label_case_insensitive(label_name).is_some() {
2793                                    self.enrich_properties_with_generated_columns(
2794                                        label_name,
2795                                        &mut props,
2796                                        prop_manager,
2797                                        params,
2798                                        ctx,
2799                                    )
2800                                    .await?;
2801                                }
2802                            }
2803
2804                            // Validate/coerce against declared types AFTER enrichment, so
2805                            // a type mismatch is rejected here rather than silently nulled
2806                            // (and the row dropped) at flush — issue #68.
2807                            let props = Self::coerce_and_validate_props(props, &schema, &n.labels)?;
2808
2809                            // Insert vertex and get back final properties (includes auto-generated embeddings)
2810                            let final_props = writer
2811                                .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
2812                                .await?;
2813
2814                            // Build node object with final properties (includes embeddings)
2815                            if let Some(var) = &n.variable {
2816                                let mut obj = HashMap::new();
2817                                obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
2818                                let labels_list: Vec<Value> =
2819                                    n.labels.iter().map(|l| Value::String(l.clone())).collect();
2820                                obj.insert("_labels".to_string(), Value::List(labels_list));
2821                                for (k, v) in &final_props {
2822                                    obj.insert(k.clone(), v.clone());
2823                                }
2824                                // Store node as a Map with _vid, matching MATCH behavior
2825                                row.insert(var.clone(), Value::Map(obj));
2826                            }
2827                            vid = Some(new_vid);
2828                        }
2829
2830                        let current_vid = vid.unwrap();
2831
2832                        if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
2833                            rel_pending.take()
2834                            && let Some(src) = prev_vid
2835                        {
2836                            let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
2837
2838                            if !is_rel_bound {
2839                                let mut rel_props = HashMap::new();
2840                                if let Some(expr) = rel_props_expr {
2841                                    let val = self
2842                                        .evaluate_expr(&expr, row, prop_manager, params, ctx)
2843                                        .await?;
2844                                    if let Value::Map(map) = val {
2845                                        rel_props.extend(map);
2846                                    }
2847                                }
2848                                // Validate/coerce edge properties against the declared
2849                                // edge-type schema before storing — issue #68.
2850                                let edge_schema = self.storage.schema_manager().schema();
2851                                let rel_props = Self::coerce_and_validate_props(
2852                                    rel_props,
2853                                    &edge_schema,
2854                                    std::slice::from_ref(&type_name),
2855                                )?;
2856                                let eid = match &self.id_reservoir {
2857                                    Some(r) => r.next_eid().await?,
2858                                    None => writer.next_eid(type_id).await?,
2859                                };
2860
2861                                // For incoming edges like (a)<-[:R]-(b), swap so the edge points b -> a
2862                                let (edge_src, edge_dst) = match dir {
2863                                    Direction::Incoming => (current_vid, src),
2864                                    _ => (src, current_vid),
2865                                };
2866
2867                                let store_props = !rel_var.is_empty();
2868                                let user_props = if store_props {
2869                                    rel_props.clone()
2870                                } else {
2871                                    HashMap::new()
2872                                };
2873
2874                                writer
2875                                    .insert_edge(
2876                                        edge_src,
2877                                        edge_dst,
2878                                        type_id,
2879                                        eid,
2880                                        rel_props,
2881                                        Some(type_name.clone()),
2882                                        tx_l0,
2883                                    )
2884                                    .await?;
2885
2886                                // Edge type name is now stored by insert_edge
2887
2888                                if store_props {
2889                                    let mut edge_map = HashMap::new();
2890                                    edge_map.insert(
2891                                        "_eid".to_string(),
2892                                        Value::Int(eid.as_u64() as i64),
2893                                    );
2894                                    edge_map.insert(
2895                                        "_src".to_string(),
2896                                        Value::Int(edge_src.as_u64() as i64),
2897                                    );
2898                                    edge_map.insert(
2899                                        "_dst".to_string(),
2900                                        Value::Int(edge_dst.as_u64() as i64),
2901                                    );
2902                                    edge_map
2903                                        .insert("_type".to_string(), Value::Int(type_id as i64));
2904                                    // Include user properties so downstream RETURN sees them
2905                                    for (k, v) in user_props {
2906                                        edge_map.insert(k, v);
2907                                    }
2908                                    row.insert(rel_var, Value::Map(edge_map));
2909                                }
2910                            }
2911                        }
2912                        prev_vid = Some(current_vid);
2913                    }
2914                    PatternElement::Relationship(r) => {
2915                        if r.types.len() != 1 {
2916                            return Err(anyhow!(
2917                                "CREATE relationship must specify exactly one type"
2918                            ));
2919                        }
2920                        let type_name = &r.types[0];
2921                        let type_id = if self.config.strict_schema {
2922                            let schema = self.storage.schema_manager().schema();
2923                            schema
2924                                .edge_type_id_by_name_case_insensitive(type_name)
2925                                .ok_or_else(|| {
2926                                    anyhow!(
2927                                        "Edge type '{}' is not defined in the schema \
2928                                         (strict_schema is enabled). \
2929                                         Declare it with db.schema().edge_type(...).apply() first.",
2930                                        type_name
2931                                    )
2932                                })?
2933                        } else {
2934                            // Schemaless: get or assign edge type ID (bit 31 = 1 for dynamic).
2935                            self.storage
2936                                .schema_manager()
2937                                .get_or_assign_edge_type_id(type_name)
2938                        };
2939
2940                        rel_pending = Some((
2941                            r.variable.clone().unwrap_or_default(),
2942                            type_id,
2943                            type_name.clone(),
2944                            r.properties.clone(),
2945                            r.direction.clone(),
2946                        ));
2947                    }
2948                    PatternElement::Parenthesized { .. } => {
2949                        return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
2950                    }
2951                }
2952            }
2953        }
2954        Ok(())
2955    }
2956
2957    /// Rejects structural values (maps, nodes, edges, paths, nested lists) in a property.
2958    ///
2959    /// These are never valid OpenCypher property values regardless of the declared column
2960    /// type. A `CypherValue` column is the sole exception and is handled by the caller
2961    /// before this is reached.
2962    ///
2963    /// # Errors
2964    /// Returns an error if `val` is a map/node/edge/path, or a list containing one.
2965    fn validate_structural_property_value(prop_name: &str, val: &Value) -> Result<()> {
2966        match val {
2967            Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
2968                anyhow::bail!(
2969                    "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2970                    prop_name
2971                );
2972            }
2973            Value::List(items) => {
2974                for item in items {
2975                    if matches!(
2976                        item,
2977                        Value::Map(_)
2978                            | Value::Node(_)
2979                            | Value::Edge(_)
2980                            | Value::Path(_)
2981                            | Value::List(_)
2982                    ) {
2983                        anyhow::bail!(
2984                            "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2985                            prop_name
2986                        );
2987                    }
2988                }
2989            }
2990            _ => {}
2991        }
2992        Ok(())
2993    }
2994
2995    /// Validates and coerces `val` against the declared schema type for `prop_name`.
2996    ///
2997    /// Returns the value to actually persist. Beyond the structural checks in
2998    /// [`Self::validate_structural_property_value`], this compares the value against the
2999    /// column's declared `DataType` and:
3000    ///
3001    /// - returns it unchanged when directly storable (including the intentional
3002    ///   `Int`→`Float`/`Int32` and `Temporal`→`Timestamp` widenings);
3003    /// - coerces a `Value::String` written into a `Date`/`Time`/`DateTime`/`Duration`
3004    ///   column into the proper `Temporal` value, using the same parser as the Cypher
3005    ///   `date()`/`time()`/`datetime()`/`duration()` constructors;
3006    /// - otherwise returns an error, so a type mismatch is surfaced at the call site
3007    ///   rather than silently nulled — and the row dropped at flush. See issue #68.
3008    ///
3009    /// Undeclared (schemaless) properties and `CypherValue` columns keep their permissive
3010    /// behavior.
3011    ///
3012    /// # Errors
3013    /// Returns an error if the value's type is incompatible with the declared column type,
3014    /// or if a string destined for a temporal column is not a valid temporal literal.
3015    fn coerce_and_validate_property_value(
3016        prop_name: &str,
3017        val: Value,
3018        schema: &uni_common::core::schema::Schema,
3019        labels: &[String],
3020    ) -> Result<Value> {
3021        use uni_common::core::schema::DataType;
3022
3023        // Resolve the declared type from the first label that declares this property.
3024        let declared = labels.iter().find_map(|label| {
3025            schema
3026                .properties
3027                .get(label)
3028                .and_then(|props| props.get(prop_name))
3029                .map(|meta| &meta.r#type)
3030        });
3031
3032        // CypherValue columns accept any value (including maps) — skip all checks.
3033        if matches!(declared, Some(DataType::CypherValue)) {
3034            return Ok(val);
3035        }
3036
3037        let Some(dt) = declared else {
3038            // Schemaless property: reject structural values (maps/nodes/edges/paths and
3039            // lists containing them), otherwise store as-is.
3040            Self::validate_structural_property_value(prop_name, &val)?;
3041            return Ok(val);
3042        };
3043
3044        // Directly storable: scalars, the intentional `Int`→`Float`/`Int32` and
3045        // `Temporal`→`Timestamp` widenings, declared composite columns (`Map`/`List`/
3046        // `Vector`) receiving their matching value, and `Null` (always accepted).
3047        if dt.accepts(&val) {
3048            return Ok(val);
3049        }
3050
3051        // Known-safe coercion: a string into a temporal column is parsed as if it had
3052        // been wrapped in the matching Cypher temporal constructor.
3053        if matches!(val, Value::String(_)) {
3054            let ctor = match dt {
3055                DataType::DateTime => Some("DATETIME"),
3056                DataType::Date => Some("DATE"),
3057                DataType::Time => Some("TIME"),
3058                DataType::Duration => Some("DURATION"),
3059                _ => None,
3060            };
3061            if let Some(name) = ctor {
3062                return uni_query_functions::datetime::eval_datetime_function(
3063                    name,
3064                    std::slice::from_ref(&val),
3065                )
3066                .map_err(|e| {
3067                    anyhow!(
3068                        "TypeError: property '{}' is declared {:?} but the string value could \
3069                         not be parsed as a {} literal: {}",
3070                        prop_name,
3071                        dt,
3072                        name,
3073                        e
3074                    )
3075                });
3076            }
3077        }
3078
3079        // Not storable and not coercible. Prefer the structural message when the value
3080        // is itself structural (e.g. a map into a scalar column), preserving prior
3081        // behavior; otherwise report the scalar type mismatch.
3082        Self::validate_structural_property_value(prop_name, &val)?;
3083        anyhow::bail!(
3084            "TypeError: property '{}' is declared {:?} but got an incompatible value of type {}",
3085            prop_name,
3086            dt,
3087            value_type_name(&val)
3088        );
3089    }
3090
3091    /// Coerces and validates every property in `props` against the declared types for `labels`.
3092    ///
3093    /// Applies [`Self::coerce_and_validate_property_value`] to each entry, returning the map
3094    /// with known-safe coercions applied. Use this at every user-facing CREATE/SET write site
3095    /// before handing properties to the writer, so a type mismatch is rejected up front rather
3096    /// than silently nulled — and the row dropped — at flush (issue #68).
3097    ///
3098    /// # Errors
3099    /// Returns an error on the first property whose value is incompatible with its declared type.
3100    fn coerce_and_validate_props(
3101        props: HashMap<String, Value>,
3102        schema: &uni_common::core::schema::Schema,
3103        labels: &[String],
3104    ) -> Result<HashMap<String, Value>> {
3105        let mut out = HashMap::with_capacity(props.len());
3106        for (k, v) in props {
3107            let cv = Self::coerce_and_validate_property_value(&k, v, schema, labels)?;
3108            out.insert(k, cv);
3109        }
3110        Ok(out)
3111    }
3112
3113    #[expect(clippy::too_many_arguments)]
3114    pub(crate) async fn execute_set_items_locked(
3115        &self,
3116        items: &[SetItem],
3117        row: &mut HashMap<String, Value>,
3118        writer: &Writer,
3119        prop_manager: &PropertyManager,
3120        params: &HashMap<String, Value>,
3121        ctx: Option<&QueryContext>,
3122        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3123        prefetched: &Prefetch,
3124    ) -> Result<()> {
3125        // Coalesce SetItem::Property items by target so we do ONE read + ONE
3126        // write per (variable, target) instead of one read-modify-write cycle
3127        // per item. For an UPDATE that sets N properties on the same vertex
3128        // (e.g. the ingest hotpath `SET n.frequency = ..., n.last_seen = ...,
3129        // n.confidence = ...`), this collapses N redundant
3130        // `get_all_vertex_props_with_ctx` + `insert_vertex_with_labels` cycles
3131        // into one. See profile_test.rs `diag_72_set_data_scale_with_hnsw` for
3132        // the measurement, and the plan in
3133        // /home/rohit/.claude/plans/plan-and-implement-a-valiant-flame.md
3134        // for the rationale.
3135        //
3136        // RHS evaluation order is preserved: we evaluate each RHS inline and
3137        // update the row binding immediately, so a later SetItem on the same
3138        // variable that reads `n.<earlier-prop>` sees the new value.
3139        //
3140        // Non-Property variants (Labels, Variable, VariablePlus) are less
3141        // common and have lower payoff; before processing one, we flush any
3142        // pending updates for the same variable so it sees the latest L0
3143        // state and ordering semantics are preserved.
3144        let mut pending_v: HashMap<String, PendingVertexSet> = HashMap::new();
3145        let mut pending_e: HashMap<String, PendingEdgeSet> = HashMap::new();
3146
3147        for item in items {
3148            match item {
3149                SetItem::Property { expr, value } => {
3150                    if let Expr::Property(var_expr, prop_name) = expr
3151                        && let Expr::Variable(var_name) = &**var_expr
3152                        && let Some(node_val) = row.get(var_name)
3153                    {
3154                        if let Ok(vid) = Self::vid_from_value(node_val) {
3155                            reject_if_ephemeral_vid(vid)?;
3156                            let labels =
3157                                Self::extract_labels_from_node(node_val).unwrap_or_default();
3158                            let schema = self.storage.schema_manager().schema().clone();
3159
3160                            // Lazy one-time read. Always read the full row
3161                            // (preserves CRDT merge + constraint validation
3162                            // + scan-side L0 visibility). The
3163                            // partial-lance-writes optimization happens
3164                            // PURELY AT FLUSH TIME via the per-VID
3165                            // `vertex_partial_keys` set tracked in L0 — so
3166                            // L0 holds the full row, scans see the full
3167                            // row, and Lance only receives the touched
3168                            // columns. Generated-column-bearing labels
3169                            // ride the partial path too (Round 12 §C):
3170                            // `enrich_properties_with_generated_columns`
3171                            // runs at flush time over the merged-in-L0
3172                            // full row, and the produced generator keys
3173                            // are appended to `touched` so they land in
3174                            // the MergeInsert source.
3175                            if !pending_v.contains_key(var_name) {
3176                                let storage_cfg = &self.storage.config;
3177                                let partial = storage_cfg.partial_lance_writes;
3178                                let read = read_vertex_props_with_prefetch(
3179                                    vid,
3180                                    prefetched,
3181                                    prop_manager,
3182                                    ctx,
3183                                )
3184                                .await?;
3185                                pending_v.insert(
3186                                    var_name.clone(),
3187                                    PendingVertexSet {
3188                                        vid,
3189                                        labels: labels.clone(),
3190                                        props: read,
3191                                        partial,
3192                                        touched: HashSet::new(),
3193                                    },
3194                                );
3195                            }
3196
3197                            let val = self
3198                                .evaluate_expr(value, row, prop_manager, params, ctx)
3199                                .await?;
3200                            let val = Self::coerce_and_validate_property_value(
3201                                prop_name, val, &schema, &labels,
3202                            )?;
3203
3204                            let pv = pending_v
3205                                .get_mut(var_name)
3206                                .expect("inserted above when absent");
3207                            pv.props.insert(prop_name.clone(), val.clone());
3208                            if pv.partial {
3209                                pv.touched.insert(prop_name.clone());
3210                            }
3211
3212                            // Update the row binding so subsequent RHS sees the new value.
3213                            if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3214                                node_map.insert(prop_name.clone(), val);
3215                            } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
3216                                node.properties.insert(prop_name.clone(), val);
3217                            }
3218                        } else if let Value::Map(map) = node_val
3219                            && map.get("_eid").is_some_and(|v| !v.is_null())
3220                            && map.get("_src").is_some_and(|v| !v.is_null())
3221                            && map.get("_dst").is_some_and(|v| !v.is_null())
3222                            && (map.get("_type").is_some_and(|v| !v.is_null())
3223                                || map.get("_type_name").is_some_and(|v| !v.is_null()))
3224                        {
3225                            let ei = self.extract_edge_identity(map)?;
3226                            reject_if_ephemeral_eid(ei.eid)?;
3227                            let schema = self.storage.schema_manager().schema().clone();
3228                            // Handle _type as either String or Int (Int from CREATE, String
3229                            // from queries). UNWIND on VLP edge lists emits `_type_name`
3230                            // instead of `_type`; accept either.
3231                            let type_val = map.get("_type").or_else(|| map.get("_type_name"));
3232                            let edge_type_name = match type_val {
3233                                Some(Value::String(s)) => s.clone(),
3234                                Some(Value::Int(id)) => schema
3235                                    .edge_type_name_by_id_unified(*id as u32)
3236                                    .unwrap_or_else(|| format!("EdgeType{}", id)),
3237                                _ => String::new(),
3238                            };
3239
3240                            if !pending_e.contains_key(var_name) {
3241                                let initial = read_edge_props_with_prefetch(
3242                                    ei.eid,
3243                                    prefetched,
3244                                    prop_manager,
3245                                    ctx,
3246                                )
3247                                .await?;
3248                                let partial = self.storage.config.partial_lance_writes;
3249                                pending_e.insert(
3250                                    var_name.clone(),
3251                                    PendingEdgeSet {
3252                                        src: ei.src,
3253                                        dst: ei.dst,
3254                                        edge_type_id: ei.edge_type_id,
3255                                        eid: ei.eid,
3256                                        edge_type_name: edge_type_name.clone(),
3257                                        props: initial,
3258                                        partial,
3259                                        touched: HashSet::new(),
3260                                    },
3261                                );
3262                            }
3263
3264                            let val = self
3265                                .evaluate_expr(value, row, prop_manager, params, ctx)
3266                                .await?;
3267                            let val = Self::coerce_and_validate_property_value(
3268                                prop_name,
3269                                val,
3270                                &schema,
3271                                std::slice::from_ref(&edge_type_name),
3272                            )?;
3273
3274                            let pe = pending_e
3275                                .get_mut(var_name)
3276                                .expect("inserted above when absent");
3277                            pe.props.insert(prop_name.clone(), val.clone());
3278                            if pe.partial {
3279                                pe.touched.insert(prop_name.clone());
3280                            }
3281
3282                            // Update the row object so subsequent RHS sees the new value.
3283                            if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3284                                edge_map.insert(prop_name.clone(), val);
3285                            } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3286                                edge.properties.insert(prop_name.clone(), val);
3287                            }
3288                        } else if let Value::Edge(edge) = node_val {
3289                            // Handle Value::Edge directly (when traverse returns Edge objects).
3290                            reject_if_ephemeral_eid(edge.eid)?;
3291                            let eid = edge.eid;
3292                            let src = edge.src;
3293                            let dst = edge.dst;
3294                            let edge_type_name = edge.edge_type.clone();
3295                            let etype =
3296                                self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
3297                            let schema = self.storage.schema_manager().schema().clone();
3298
3299                            if !pending_e.contains_key(var_name) {
3300                                let initial = read_edge_props_with_prefetch(
3301                                    eid,
3302                                    prefetched,
3303                                    prop_manager,
3304                                    ctx,
3305                                )
3306                                .await?;
3307                                let partial = self.storage.config.partial_lance_writes;
3308                                pending_e.insert(
3309                                    var_name.clone(),
3310                                    PendingEdgeSet {
3311                                        src,
3312                                        dst,
3313                                        edge_type_id: etype,
3314                                        eid,
3315                                        edge_type_name: edge_type_name.clone(),
3316                                        props: initial,
3317                                        partial,
3318                                        touched: HashSet::new(),
3319                                    },
3320                                );
3321                            }
3322
3323                            let val = self
3324                                .evaluate_expr(value, row, prop_manager, params, ctx)
3325                                .await?;
3326                            let val = Self::coerce_and_validate_property_value(
3327                                prop_name,
3328                                val,
3329                                &schema,
3330                                std::slice::from_ref(&edge_type_name),
3331                            )?;
3332
3333                            let pe = pending_e
3334                                .get_mut(var_name)
3335                                .expect("inserted above when absent");
3336                            pe.props.insert(prop_name.clone(), val.clone());
3337                            if pe.partial {
3338                                pe.touched.insert(prop_name.clone());
3339                            }
3340
3341                            // Update the row object so subsequent RHS sees the new value.
3342                            if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3343                                edge.properties.insert(prop_name.clone(), val);
3344                            }
3345                        }
3346                    }
3347                }
3348                SetItem::Labels { variable, labels } => {
3349                    // Flush any pending writes for this var so the Labels op
3350                    // sees latest L0 state. Other variables' pending writes
3351                    // can keep waiting (they're independent).
3352                    self.flush_pending_var(
3353                        variable,
3354                        &mut pending_v,
3355                        &mut pending_e,
3356                        writer,
3357                        prop_manager,
3358                        params,
3359                        ctx,
3360                        tx_l0,
3361                        prefetched,
3362                    )
3363                    .await?;
3364
3365                    if let Some(node_val) = row.get(variable)
3366                        && let Ok(vid) = Self::vid_from_value(node_val)
3367                    {
3368                        reject_if_ephemeral_vid(vid)?;
3369                        let registry = self
3370                            .procedure_registry
3371                            .as_ref()
3372                            .and_then(|pr| pr.plugin_registry());
3373                        reject_virtual_label_write(registry.as_ref(), labels, "SET")?;
3374
3375                        // Get current labels from node value
3376                        let current_labels =
3377                            Self::extract_labels_from_node(node_val).unwrap_or_default();
3378
3379                        // Determine new labels to add (skip duplicates)
3380                        let labels_to_add: Vec<_> = labels
3381                            .iter()
3382                            .filter(|l| !current_labels.contains(l))
3383                            .cloned()
3384                            .collect();
3385
3386                        if !labels_to_add.is_empty() {
3387                            // Resolve the FULL new label set and write it to the
3388                            // TRANSACTION buffer (so the change is transactional
3389                            // and OCC-conflictable), falling back to the context
3390                            // (main) L0 for non-transactional callers. Replace
3391                            // semantics via `set_vertex_labels`.
3392                            let mut new_labels = current_labels;
3393                            new_labels.extend(labels_to_add);
3394                            if let Some(ctx) = ctx {
3395                                let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3396                                l0.write().set_vertex_labels(vid, &new_labels);
3397                            }
3398
3399                            // Update the node value in the row with the new labels.
3400                            if let Some(Value::Map(obj)) = row.get_mut(variable) {
3401                                let labels_list =
3402                                    new_labels.into_iter().map(Value::String).collect();
3403                                obj.insert("_labels".to_string(), Value::List(labels_list));
3404                            }
3405                        }
3406                    }
3407                }
3408                SetItem::Variable { variable, value }
3409                | SetItem::VariablePlus { variable, value } => {
3410                    // Flush this var's pending writes first so the
3411                    // replace/merge op sees them as latest L0 state.
3412                    self.flush_pending_var(
3413                        variable,
3414                        &mut pending_v,
3415                        &mut pending_e,
3416                        writer,
3417                        prop_manager,
3418                        params,
3419                        ctx,
3420                        tx_l0,
3421                        prefetched,
3422                    )
3423                    .await?;
3424
3425                    let replace = matches!(item, SetItem::Variable { .. });
3426                    let op_str = if replace { "=" } else { "+=" };
3427
3428                    // SET n = expr / SET n += expr — null target from OPTIONAL MATCH is a silent no-op
3429                    if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
3430                        continue;
3431                    }
3432                    let rhs = self
3433                        .evaluate_expr(value, row, prop_manager, params, ctx)
3434                        .await?;
3435                    let new_props =
3436                        Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
3437                            anyhow!(
3438                                "SET {} {} expr: right-hand side must evaluate to a map, \
3439                                 node, or relationship",
3440                                variable,
3441                                op_str
3442                            )
3443                        })?;
3444                    self.apply_properties_to_entity(
3445                        variable,
3446                        new_props,
3447                        replace,
3448                        row,
3449                        writer,
3450                        prop_manager,
3451                        params,
3452                        ctx,
3453                        tx_l0,
3454                        prefetched,
3455                    )
3456                    .await?;
3457                }
3458            }
3459        }
3460
3461        // Flush all remaining coalesced writes — one writer call per target.
3462        // Partial entries (no generated columns) call
3463        // `Writer::insert_vertex_partial_full` so L0 holds the FULL row
3464        // but the touched-keys hint drives a MergeInsert at flush. Full
3465        // entries continue through the legacy
3466        // `insert_vertex_with_labels` (Append) path with
3467        // generated-column enrichment.
3468        for (_var_name, mut pv) in pending_v {
3469            if pv.partial {
3470                // Round 12 §C: run the generator enrichment over the
3471                // merged-in-L0 full row, then add the produced generator
3472                // keys to `touched` so they ride the MergeInsert source.
3473                // Idempotent — generators always recompute against the
3474                // post-merge property map.
3475                let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3476                for label_name in &pv.labels {
3477                    self.enrich_properties_with_generated_columns(
3478                        label_name,
3479                        &mut pv.props,
3480                        prop_manager,
3481                        params,
3482                        ctx,
3483                    )
3484                    .await?;
3485                }
3486                for k in pv.props.keys() {
3487                    if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3488                        pv.touched.insert(k.clone());
3489                    }
3490                }
3491                writer
3492                    .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3493                    .await?;
3494            } else {
3495                for label_name in &pv.labels {
3496                    self.enrich_properties_with_generated_columns(
3497                        label_name,
3498                        &mut pv.props,
3499                        prop_manager,
3500                        params,
3501                        ctx,
3502                    )
3503                    .await?;
3504                }
3505                let _ = writer
3506                    .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3507                    .await?;
3508            }
3509        }
3510        for (_var_name, pe) in pending_e {
3511            if pe.partial {
3512                writer
3513                    .insert_edge_partial_full(
3514                        pe.src,
3515                        pe.dst,
3516                        pe.edge_type_id,
3517                        pe.eid,
3518                        pe.props,
3519                        Some(pe.edge_type_name),
3520                        pe.touched,
3521                        tx_l0,
3522                    )
3523                    .await?;
3524            } else {
3525                writer
3526                    .insert_edge(
3527                        pe.src,
3528                        pe.dst,
3529                        pe.edge_type_id,
3530                        pe.eid,
3531                        pe.props,
3532                        Some(pe.edge_type_name),
3533                        tx_l0,
3534                    )
3535                    .await?;
3536            }
3537        }
3538
3539        Ok(())
3540    }
3541
3542    /// Flush pending SET state for a single variable to the writer.
3543    ///
3544    /// Called from the SET loop when about to process a Labels /
3545    /// Variable / VariablePlus item on `var`, so the subsequent op
3546    /// sees latest L0 state and ordering is preserved.
3547    #[expect(clippy::too_many_arguments)]
3548    async fn flush_pending_var(
3549        &self,
3550        var: &str,
3551        pending_v: &mut HashMap<String, PendingVertexSet>,
3552        pending_e: &mut HashMap<String, PendingEdgeSet>,
3553        writer: &Writer,
3554        prop_manager: &PropertyManager,
3555        _params: &HashMap<String, Value>,
3556        ctx: Option<&QueryContext>,
3557        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3558        _prefetched: &Prefetch,
3559    ) -> Result<()> {
3560        if let Some(mut pv) = pending_v.remove(var) {
3561            if pv.partial {
3562                let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3563                for label_name in &pv.labels {
3564                    self.enrich_properties_with_generated_columns(
3565                        label_name,
3566                        &mut pv.props,
3567                        prop_manager,
3568                        _params,
3569                        ctx,
3570                    )
3571                    .await?;
3572                }
3573                for k in pv.props.keys() {
3574                    if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3575                        pv.touched.insert(k.clone());
3576                    }
3577                }
3578                writer
3579                    .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3580                    .await?;
3581            } else {
3582                for label_name in &pv.labels {
3583                    self.enrich_properties_with_generated_columns(
3584                        label_name,
3585                        &mut pv.props,
3586                        prop_manager,
3587                        _params,
3588                        ctx,
3589                    )
3590                    .await?;
3591                }
3592                let _ = writer
3593                    .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3594                    .await?;
3595            }
3596        }
3597        if let Some(pe) = pending_e.remove(var) {
3598            if pe.partial {
3599                writer
3600                    .insert_edge_partial_full(
3601                        pe.src,
3602                        pe.dst,
3603                        pe.edge_type_id,
3604                        pe.eid,
3605                        pe.props,
3606                        Some(pe.edge_type_name),
3607                        pe.touched,
3608                        tx_l0,
3609                    )
3610                    .await?;
3611            } else {
3612                writer
3613                    .insert_edge(
3614                        pe.src,
3615                        pe.dst,
3616                        pe.edge_type_id,
3617                        pe.eid,
3618                        pe.props,
3619                        Some(pe.edge_type_name),
3620                        tx_l0,
3621                    )
3622                    .await?;
3623            }
3624        }
3625        Ok(())
3626    }
3627
3628    /// Execute REMOVE clause items (property removal or label removal).
3629    ///
3630    /// Property removals are batched per variable to avoid stale reads: when
3631    /// multiple properties of the same entity are removed in one REMOVE clause,
3632    /// we read from storage once, null all specified properties, and write back
3633    /// once. This prevents the second removal from reading stale data that
3634    /// doesn't reflect the first removal's L0 write.
3635    #[expect(clippy::too_many_arguments)]
3636    pub(crate) async fn execute_remove_items_locked(
3637        &self,
3638        items: &[RemoveItem],
3639        row: &mut HashMap<String, Value>,
3640        writer: &Writer,
3641        prop_manager: &PropertyManager,
3642        ctx: Option<&QueryContext>,
3643        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3644        prefetched: &Prefetch,
3645    ) -> Result<()> {
3646        // Collect property names to remove, grouped by variable.
3647        // Use Vec<(String, Vec<String>)> to preserve insertion order.
3648        let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
3649
3650        for item in items {
3651            match item {
3652                RemoveItem::Property(expr) => {
3653                    if let Expr::Property(var_expr, prop_name) = expr
3654                        && let Expr::Variable(var_name) = &**var_expr
3655                    {
3656                        if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
3657                            entry.1.push(prop_name.clone());
3658                        } else {
3659                            prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
3660                        }
3661                    }
3662                }
3663                RemoveItem::Labels { variable, labels } => {
3664                    self.execute_remove_labels(variable, labels, row, ctx)?;
3665                }
3666            }
3667        }
3668
3669        // Execute batched property removals per variable.
3670        for (var_name, prop_names) in &prop_removals {
3671            let Some(node_val) = row.get(var_name) else {
3672                continue;
3673            };
3674
3675            if let Ok(vid) = Self::vid_from_value(node_val) {
3676                // Vertex property removal
3677                let mut props =
3678                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
3679
3680                // Only write back if at least one property actually exists
3681                let removed_count = prop_names
3682                    .iter()
3683                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3684                    .count();
3685                let any_exist = removed_count > 0;
3686                if any_exist {
3687                    writer.track_properties_removed(removed_count, tx_l0);
3688                    for prop_name in prop_names {
3689                        props.insert(prop_name.clone(), Value::Null);
3690                    }
3691                }
3692                // Compute effective properties (post-removal) for _all_props
3693                let effective: HashMap<String, Value> = props
3694                    .iter()
3695                    .filter(|(_, v)| !v.is_null())
3696                    .map(|(k, v)| (k.clone(), v.clone()))
3697                    .collect();
3698                if any_exist {
3699                    let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3700                    let _ = writer
3701                        .insert_vertex_with_labels(vid, props, &labels, tx_l0)
3702                        .await?;
3703                }
3704
3705                // Update the row map: set removed props to Null
3706                if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3707                    for prop_name in prop_names {
3708                        node_map.insert(prop_name.clone(), Value::Null);
3709                    }
3710                    // Set _all_props to the complete effective property set
3711                    node_map.insert("_all_props".to_string(), Value::Map(effective));
3712                }
3713            } else if let Value::Map(map) = node_val {
3714                // Edge property removal (map-encoded)
3715                // Check for non-null _eid to skip OPTIONAL MATCH null edges
3716                let mut edge_effective: Option<HashMap<String, Value>> = None;
3717                if map.get("_eid").is_some_and(|v| !v.is_null()) {
3718                    let ei = self.extract_edge_identity(map)?;
3719                    let mut props =
3720                        read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx)
3721                            .await?;
3722
3723                    let removed_count = prop_names
3724                        .iter()
3725                        .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3726                        .count();
3727                    let any_exist = removed_count > 0;
3728                    if any_exist {
3729                        writer.track_properties_removed(removed_count, tx_l0);
3730                        for prop_name in prop_names {
3731                            props.insert(prop_name.to_string(), Value::Null);
3732                        }
3733                    }
3734                    // Compute effective properties (post-removal) for _all_props
3735                    edge_effective = Some(
3736                        props
3737                            .iter()
3738                            .filter(|(_, v)| !v.is_null())
3739                            .map(|(k, v)| (k.clone(), v.clone()))
3740                            .collect(),
3741                    );
3742                    if any_exist {
3743                        let edge_type_name = map
3744                            .get("_type")
3745                            .and_then(|v| v.as_str())
3746                            .map(|s| s.to_string())
3747                            .or_else(|| {
3748                                self.storage
3749                                    .schema_manager()
3750                                    .edge_type_name_by_id_unified(ei.edge_type_id)
3751                            });
3752                        writer
3753                            .insert_edge(
3754                                ei.src,
3755                                ei.dst,
3756                                ei.edge_type_id,
3757                                ei.eid,
3758                                props,
3759                                edge_type_name,
3760                                tx_l0,
3761                            )
3762                            .await?;
3763                    }
3764                }
3765
3766                if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3767                    for prop_name in prop_names {
3768                        edge_map.insert(prop_name.clone(), Value::Null);
3769                    }
3770                    if let Some(effective) = edge_effective {
3771                        edge_map.insert("_all_props".to_string(), Value::Map(effective));
3772                    }
3773                }
3774            } else if let Value::Edge(edge) = node_val {
3775                // Edge property removal (Value::Edge)
3776                let eid = edge.eid;
3777                let src = edge.src;
3778                let dst = edge.dst;
3779                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
3780
3781                let mut props =
3782                    read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
3783
3784                let removed_count = prop_names
3785                    .iter()
3786                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3787                    .count();
3788                if removed_count > 0 {
3789                    writer.track_properties_removed(removed_count, tx_l0);
3790                    for prop_name in prop_names {
3791                        props.insert(prop_name.to_string(), Value::Null);
3792                    }
3793                    writer
3794                        .insert_edge(
3795                            src,
3796                            dst,
3797                            etype,
3798                            eid,
3799                            props,
3800                            Some(edge.edge_type.clone()),
3801                            tx_l0,
3802                        )
3803                        .await?;
3804                }
3805
3806                if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3807                    for prop_name in prop_names {
3808                        edge.properties.insert(prop_name.to_string(), Value::Null);
3809                    }
3810                }
3811            }
3812        }
3813
3814        Ok(())
3815    }
3816
3817    /// Execute label removal.
3818    pub(crate) fn execute_remove_labels(
3819        &self,
3820        variable: &str,
3821        labels: &[String],
3822        row: &mut HashMap<String, Value>,
3823        ctx: Option<&QueryContext>,
3824    ) -> Result<()> {
3825        if let Some(node_val) = row.get(variable)
3826            && let Ok(vid) = Self::vid_from_value(node_val)
3827        {
3828            reject_if_ephemeral_vid(vid)?;
3829            let registry = self
3830                .procedure_registry
3831                .as_ref()
3832                .and_then(|pr| pr.plugin_registry());
3833            reject_virtual_label_write(registry.as_ref(), labels, "REMOVE")?;
3834
3835            // Get current labels from node value
3836            let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3837
3838            // Determine which labels to actually remove (only those currently present)
3839            let labels_to_remove: Vec<_> = labels
3840                .iter()
3841                .filter(|l| current_labels.contains(l))
3842                .collect();
3843
3844            if !labels_to_remove.is_empty() {
3845                // Resolve the FULL remaining label set and write it to the
3846                // TRANSACTION buffer (transactional + OCC-conflictable), falling
3847                // back to the context (main) L0 for non-transactional callers.
3848                let remaining_labels: Vec<String> = current_labels
3849                    .iter()
3850                    .filter(|l| !labels_to_remove.contains(l))
3851                    .cloned()
3852                    .collect();
3853                if let Some(ctx) = ctx {
3854                    let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3855                    l0.write().set_vertex_labels(vid, &remaining_labels);
3856                }
3857
3858                // Update the node value in the row with the remaining labels.
3859                if let Some(Value::Map(obj)) = row.get_mut(variable) {
3860                    let labels_list = remaining_labels.into_iter().map(Value::String).collect();
3861                    obj.insert("_labels".to_string(), Value::List(labels_list));
3862                }
3863            }
3864        }
3865        Ok(())
3866    }
3867
3868    /// Resolve edge type ID for a Value::Edge, handling empty edge_type strings
3869    /// by looking up the type from the L0 buffer's edge endpoints.
3870    fn resolve_edge_type_id_for_edge(
3871        &self,
3872        edge: &crate::types::Edge,
3873        writer: &Writer,
3874        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3875    ) -> Result<u32> {
3876        if !edge.edge_type.is_empty() {
3877            return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
3878        }
3879        // Edge type name is empty (e.g., from anonymous MATCH patterns).
3880        // Look up the edge type ID from the L0 buffer's edge endpoints.
3881        if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
3882            return Ok(etype);
3883        }
3884        Err(anyhow!(
3885            "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
3886            edge.eid
3887        ))
3888    }
3889
3890    /// Execute DELETE clause for a single item (vertex, edge, path, or null).
3891    pub(crate) async fn execute_delete_item_locked(
3892        &self,
3893        val: &Value,
3894        detach: bool,
3895        writer: &Writer,
3896        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3897    ) -> Result<()> {
3898        match val {
3899            Value::Null => {
3900                // DELETE null is a no-op per OpenCypher spec
3901            }
3902            Value::Path(path) => {
3903                // Delete path edges first, then nodes
3904                for edge in &path.edges {
3905                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3906                    writer
3907                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3908                        .await?;
3909                }
3910                for node in &path.nodes {
3911                    self.execute_delete_vertex(
3912                        node.vid,
3913                        detach,
3914                        Some(node.labels.clone()),
3915                        writer,
3916                        tx_l0,
3917                    )
3918                    .await?;
3919                }
3920            }
3921            _ => {
3922                // Try Path reconstruction from Map first (Arrow loses Path type)
3923                if let Ok(path) = Path::try_from(val) {
3924                    for edge in &path.edges {
3925                        let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3926                        writer
3927                            .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3928                            .await?;
3929                    }
3930                    for node in &path.nodes {
3931                        self.execute_delete_vertex(
3932                            node.vid,
3933                            detach,
3934                            Some(node.labels.clone()),
3935                            writer,
3936                            tx_l0,
3937                        )
3938                        .await?;
3939                    }
3940                } else if let Ok(vid) = Self::vid_from_value(val) {
3941                    let labels = Self::extract_labels_from_node(val);
3942                    self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
3943                        .await?;
3944                } else if let Value::Map(map) = val {
3945                    self.execute_delete_edge_from_map(map, writer, tx_l0)
3946                        .await?;
3947                } else if let Value::Edge(edge) = val {
3948                    reject_if_ephemeral_eid(edge.eid)?;
3949                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3950                    let registry = self
3951                        .procedure_registry
3952                        .as_ref()
3953                        .and_then(|pr| pr.plugin_registry());
3954                    reject_virtual_edge_type_write(registry.as_ref(), etype, "DELETE")?;
3955                    writer
3956                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3957                        .await?;
3958                }
3959            }
3960        }
3961        Ok(())
3962    }
3963
3964    /// Execute vertex deletion with optional detach.
3965    pub(crate) async fn execute_delete_vertex(
3966        &self,
3967        vid: Vid,
3968        detach: bool,
3969        labels: Option<Vec<String>>,
3970        writer: &Writer,
3971        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3972    ) -> Result<()> {
3973        reject_if_ephemeral_vid(vid)?;
3974        if let Some(ls) = labels.as_deref() {
3975            let registry = self
3976                .procedure_registry
3977                .as_ref()
3978                .and_then(|pr| pr.plugin_registry());
3979            reject_virtual_label_write(registry.as_ref(), ls, "DELETE")?;
3980        }
3981        if detach {
3982            self.detach_delete_vertex(vid, writer, tx_l0).await?;
3983        } else {
3984            self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
3985        }
3986        writer.delete_vertex(vid, labels, tx_l0).await?;
3987        Ok(())
3988    }
3989
3990    /// Check that a vertex has no edges (required for non-DETACH DELETE).
3991    ///
3992    /// Loads the subgraph from storage, then excludes edges that have been
3993    /// tombstoned in the writer's L0 or the transaction's L0. This ensures
3994    /// edges deleted earlier in the same DELETE clause are properly excluded.
3995    pub(crate) async fn check_vertex_has_no_edges(
3996        &self,
3997        vid: Vid,
3998        writer: &Writer,
3999        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
4000    ) -> Result<()> {
4001        let schema = self.storage.schema_manager().schema();
4002        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
4003
4004        // Collect tombstoned edge IDs from both the writer L0 and tx L0.
4005        let mut tombstoned_eids = std::collections::HashSet::new();
4006        {
4007            let writer_l0 = writer.l0_manager.get_current();
4008            let guard = writer_l0.read();
4009            for &eid in guard.tombstones.keys() {
4010                tombstoned_eids.insert(eid);
4011            }
4012        }
4013        if let Some(tx) = tx_l0 {
4014            let guard = tx.read();
4015            for &eid in guard.tombstones.keys() {
4016                tombstoned_eids.insert(eid);
4017            }
4018        }
4019
4020        let out_graph = self
4021            .storage
4022            .load_subgraph_cached(
4023                &[vid],
4024                &edge_type_ids,
4025                1,
4026                uni_store::runtime::Direction::Outgoing,
4027                Some(writer.l0_manager.get_current()),
4028            )
4029            .await?;
4030        let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
4031
4032        let in_graph = self
4033            .storage
4034            .load_subgraph_cached(
4035                &[vid],
4036                &edge_type_ids,
4037                1,
4038                uni_store::runtime::Direction::Incoming,
4039                Some(writer.l0_manager.get_current()),
4040            )
4041            .await?;
4042        let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
4043
4044        if has_out || has_in {
4045            return Err(anyhow!(
4046                "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
4047                vid
4048            ));
4049        }
4050        Ok(())
4051    }
4052
4053    /// Execute edge deletion from a map representation.
4054    pub(crate) async fn execute_delete_edge_from_map(
4055        &self,
4056        map: &HashMap<String, Value>,
4057        writer: &Writer,
4058        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
4059    ) -> Result<()> {
4060        // Check for non-null _eid to skip OPTIONAL MATCH null edges
4061        if map.get("_eid").is_some_and(|v| !v.is_null()) {
4062            let ei = self.extract_edge_identity(map)?;
4063            reject_if_ephemeral_eid(ei.eid)?;
4064            let registry = self
4065                .procedure_registry
4066                .as_ref()
4067                .and_then(|pr| pr.plugin_registry());
4068            reject_virtual_edge_type_write(registry.as_ref(), ei.edge_type_id, "DELETE")?;
4069            writer
4070                .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
4071                .await?;
4072        }
4073        Ok(())
4074    }
4075
4076    /// Build a scan plan node.
4077    ///
4078    /// - `label_id > 0`: schema label → `Scan` (fast, label-specific storage)
4079    /// - `label_id == 0` with labels: schemaless → `ScanMainByLabels` (main table + L0, filtered by label name)
4080    /// - `label_id == 0` without labels: unlabeled → `ScanAll`
4081    fn make_scan_plan(
4082        label_id: u16,
4083        labels: Vec<String>,
4084        variable: String,
4085        filter: Option<Expr>,
4086    ) -> LogicalPlan {
4087        if label_id > 0 {
4088            LogicalPlan::Scan {
4089                label_id,
4090                labels,
4091                variable,
4092                filter,
4093                optional: false,
4094            }
4095        } else if !labels.is_empty() {
4096            // Schemaless label: use ScanMainByLabels to filter by label name
4097            LogicalPlan::ScanMainByLabels {
4098                labels,
4099                variable,
4100                filter,
4101                optional: false,
4102            }
4103        } else {
4104            LogicalPlan::ScanAll {
4105                variable,
4106                filter,
4107                optional: false,
4108            }
4109        }
4110    }
4111
4112    /// Attach a new scan node to the running plan, using `CrossJoin` when the plan
4113    /// already contains prior operators.
4114    fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
4115        if matches!(plan, LogicalPlan::Empty) {
4116            scan
4117        } else {
4118            LogicalPlan::CrossJoin {
4119                left: Box::new(plan),
4120                right: Box::new(scan),
4121            }
4122        }
4123    }
4124
4125    /// Resolve MERGE property map expressions against the current row context.
4126    ///
4127    /// MERGE patterns like `MERGE (city:City {name: person.bornIn})` contain
4128    /// property expressions that reference bound variables. These need to be
4129    /// evaluated to concrete literal values before being converted to filter
4130    /// expressions by `properties_to_expr()`.
4131    async fn resolve_merge_properties(
4132        &self,
4133        properties: &Option<Expr>,
4134        row: &HashMap<String, Value>,
4135        prop_manager: &PropertyManager,
4136        params: &HashMap<String, Value>,
4137        ctx: Option<&QueryContext>,
4138    ) -> Result<Option<Expr>> {
4139        let entries = match properties {
4140            Some(Expr::Map(entries)) => entries,
4141            other => return Ok(other.clone()),
4142        };
4143        let mut resolved = Vec::new();
4144        for (key, val_expr) in entries {
4145            if matches!(val_expr, Expr::Literal(_)) {
4146                resolved.push((key.clone(), val_expr.clone()));
4147            } else {
4148                let value = self
4149                    .evaluate_expr(val_expr, row, prop_manager, params, ctx)
4150                    .await?;
4151                resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
4152            }
4153        }
4154        Ok(Some(Expr::Map(resolved)))
4155    }
4156
4157    /// Convert a runtime Value back to an AST literal expression.
4158    fn value_to_literal_expr(value: &Value) -> Expr {
4159        match value {
4160            Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
4161            Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
4162            Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
4163            Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
4164            Value::Null => Expr::Literal(CypherLiteral::Null),
4165            Value::List(items) => {
4166                Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
4167            }
4168            Value::Map(entries) => Expr::Map(
4169                entries
4170                    .iter()
4171                    .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
4172                    .collect(),
4173            ),
4174            _ => Expr::Literal(CypherLiteral::Null),
4175        }
4176    }
4177
4178    pub(crate) async fn execute_merge_match(
4179        &self,
4180        pattern: &Pattern,
4181        row: &HashMap<String, Value>,
4182        prop_manager: &PropertyManager,
4183        params: &HashMap<String, Value>,
4184        ctx: Option<&QueryContext>,
4185    ) -> Result<Vec<HashMap<String, Value>>> {
4186        // Construct a LogicalPlan for the MATCH part of MERGE
4187        let planner =
4188            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
4189
4190        // We need to construct a CypherQuery to use the planner's plan() method,
4191        // or we can manually construct the LogicalPlan.
4192        // Manual construction is safer as we don't have to round-trip through AST.
4193
4194        let mut plan = LogicalPlan::Empty;
4195        let mut vars_in_scope = Vec::new();
4196
4197        // Add existing bound variables from row to scope
4198        for key in row.keys() {
4199            vars_in_scope.push(key.clone());
4200        }
4201
4202        // Reconstruct Match logic from Planner (simplified for MERGE pattern)
4203        for path in &pattern.paths {
4204            let elements = &path.elements;
4205            let mut i = 0;
4206            while i < elements.len() {
4207                let part = &elements[i];
4208                match part {
4209                    PatternElement::Node(n) => {
4210                        let variable = n.variable.clone().unwrap_or_default();
4211
4212                        // If variable is already bound in the input row, we filter
4213                        let is_bound = !variable.is_empty() && row.contains_key(&variable);
4214
4215                        if is_bound {
4216                            // If bound, we must Scan this specific VID to start the chain
4217                            // Extract VID from row
4218                            let val = row.get(&variable).unwrap();
4219                            let vid = Self::vid_from_value(val)?;
4220
4221                            // In the new storage model, VIDs don't embed label info.
4222                            // We get label from the node value if available, otherwise use 0 to scan all.
4223                            let extracted_labels =
4224                                Self::extract_labels_from_node(val).unwrap_or_default();
4225                            let label_id = {
4226                                let schema = self.storage.schema_manager().schema();
4227                                extracted_labels
4228                                    .first()
4229                                    .and_then(|l| schema.label_id_by_name(l))
4230                                    .unwrap_or(0)
4231                            };
4232
4233                            let resolved_props = self
4234                                .resolve_merge_properties(
4235                                    &n.properties,
4236                                    row,
4237                                    prop_manager,
4238                                    params,
4239                                    ctx,
4240                                )
4241                                .await?;
4242                            let prop_filter =
4243                                planner.properties_to_expr(&variable, &resolved_props);
4244
4245                            // Create a filter expression for VID: variable._vid = vid
4246                            // But our expression engine handles `Expr::Variable` as column.
4247                            // We can inject a filter `id(variable) = vid` if we had `id()` function.
4248                            // Or we use internal property `_vid`.
4249
4250                            // Note: Scan supports `filter`.
4251                            // We can manually construct an Expr::BinaryOp(Eq, Prop(var, _vid), Literal(vid))
4252
4253                            let vid_filter = Expr::BinaryOp {
4254                                left: Box::new(Expr::Property(
4255                                    Box::new(Expr::Variable(variable.clone())),
4256                                    "_vid".to_string(),
4257                                )),
4258                                op: BinaryOp::Eq,
4259                                right: Box::new(Expr::Literal(CypherLiteral::Integer(
4260                                    vid.as_u64() as i64,
4261                                ))),
4262                            };
4263
4264                            let combined_filter = if let Some(pf) = prop_filter {
4265                                Some(Expr::BinaryOp {
4266                                    left: Box::new(vid_filter),
4267                                    op: BinaryOp::And,
4268                                    right: Box::new(pf),
4269                                })
4270                            } else {
4271                                Some(vid_filter)
4272                            };
4273
4274                            let scan = Self::make_scan_plan(
4275                                label_id,
4276                                extracted_labels,
4277                                variable.clone(),
4278                                combined_filter,
4279                            );
4280                            plan = Self::attach_scan(plan, scan);
4281                        } else {
4282                            let label_id = if n.labels.is_empty() {
4283                                // Unlabeled MERGE node: scan all nodes (label_id 0 → ScanAll)
4284                                0
4285                            } else {
4286                                let label_name = &n.labels[0];
4287                                let schema = self.storage.schema_manager().schema();
4288                                if self.config.strict_schema {
4289                                    schema
4290                                        .get_label_case_insensitive(label_name)
4291                                        .map(|m| m.id)
4292                                        .ok_or_else(|| {
4293                                            anyhow!(
4294                                                "Label '{}' is not defined in the schema \
4295                                                 (strict_schema is enabled). \
4296                                                 Declare it with db.schema().label(...).apply() first.",
4297                                                label_name
4298                                            )
4299                                        })?
4300                                } else {
4301                                    // Fall back to label_id 0 (any/schemaless) when not in schema.
4302                                    schema
4303                                        .get_label_case_insensitive(label_name)
4304                                        .map(|m| m.id)
4305                                        .unwrap_or(0)
4306                                }
4307                            };
4308
4309                            let resolved_props = self
4310                                .resolve_merge_properties(
4311                                    &n.properties,
4312                                    row,
4313                                    prop_manager,
4314                                    params,
4315                                    ctx,
4316                                )
4317                                .await?;
4318                            let prop_filter =
4319                                planner.properties_to_expr(&variable, &resolved_props);
4320                            let scan = Self::make_scan_plan(
4321                                label_id,
4322                                n.labels.names().to_vec(),
4323                                variable.clone(),
4324                                prop_filter,
4325                            );
4326                            plan = Self::attach_scan(plan, scan);
4327
4328                            // Add label filters when:
4329                            // 1. Multiple labels with a known schema label: filter for
4330                            //    additional labels (Scan only scans by the first label).
4331                            // 2. Schemaless labels (label_id = 0): ScanAll finds ALL
4332                            //    nodes, so we must filter to only those with the
4333                            //    specified label(s).
4334                            if !n.labels.is_empty()
4335                                && !variable.is_empty()
4336                                && (label_id == 0 || n.labels.len() > 1)
4337                                && let Some(label_filter) =
4338                                    planner.node_filter_expr(&variable, &n.labels, &None)
4339                            {
4340                                plan = LogicalPlan::Filter {
4341                                    input: Box::new(plan),
4342                                    predicate: label_filter,
4343                                    optional_variables: std::collections::HashSet::new(),
4344                                };
4345                            }
4346
4347                            if !variable.is_empty() {
4348                                vars_in_scope.push(variable.clone());
4349                            }
4350                        }
4351
4352                        // Now look ahead for relationship
4353                        i += 1;
4354                        while i < elements.len() {
4355                            if let PatternElement::Relationship(r) = &elements[i] {
4356                                let target_node_part = &elements[i + 1];
4357                                if let PatternElement::Node(n_target) = target_node_part {
4358                                    let schema = self.storage.schema_manager().schema();
4359                                    let mut edge_type_ids = Vec::new();
4360
4361                                    if r.types.is_empty() {
4362                                        return Err(anyhow!("MERGE edge must have a type"));
4363                                    } else if r.types.len() > 1 {
4364                                        return Err(anyhow!(
4365                                            "MERGE does not support multiple edge types"
4366                                        ));
4367                                    } else {
4368                                        let type_name = &r.types[0];
4369                                        let type_id = if self.config.strict_schema {
4370                                            let s = self.storage.schema_manager().schema();
4371                                            s.edge_type_id_by_name_case_insensitive(type_name)
4372                                                .ok_or_else(|| {
4373                                                    anyhow!(
4374                                                        "Edge type '{}' is not defined in the schema \
4375                                                         (strict_schema is enabled).",
4376                                                        type_name
4377                                                    )
4378                                                })?
4379                                        } else {
4380                                            // Schemaless: assign new ID if not found.
4381                                            self.storage
4382                                                .schema_manager()
4383                                                .get_or_assign_edge_type_id(type_name)
4384                                        };
4385                                        edge_type_ids.push(type_id);
4386                                    }
4387
4388                                    // Resolve target label ID. For schemaless labels (not in the
4389                                    // schema), fall back to 0 which means "any label" in traversal.
4390                                    let target_label_id: u16 = if let Some(lbl) =
4391                                        n_target.labels.first()
4392                                    {
4393                                        schema
4394                                            .get_label_case_insensitive(lbl)
4395                                            .map(|m| m.id)
4396                                            .unwrap_or(0)
4397                                    } else if let Some(var) = &n_target.variable {
4398                                        if let Some(val) = row.get(var) {
4399                                            // In the new storage model, get labels from node value
4400                                            if let Some(labels) =
4401                                                Self::extract_labels_from_node(val)
4402                                            {
4403                                                if let Some(first_label) = labels.first() {
4404                                                    schema
4405                                                        .get_label_case_insensitive(first_label)
4406                                                        .map(|m| m.id)
4407                                                        .unwrap_or(0)
4408                                                } else {
4409                                                    // Bound node with no labels — schemaless, any
4410                                                    0
4411                                                }
4412                                            } else if Self::vid_from_value(val).is_ok() {
4413                                                // VID without label info — schemaless, any
4414                                                0
4415                                            } else {
4416                                                return Err(anyhow!(
4417                                                    "Variable {} is not a node",
4418                                                    var
4419                                                ));
4420                                            }
4421                                        } else {
4422                                            return Err(anyhow!(
4423                                                "MERGE pattern node must have a label or be a bound variable"
4424                                            ));
4425                                        }
4426                                    } else {
4427                                        return Err(anyhow!(
4428                                            "MERGE pattern node must have a label"
4429                                        ));
4430                                    };
4431
4432                                    let target_variable =
4433                                        n_target.variable.clone().unwrap_or_default();
4434                                    let source_variable = match &elements[i - 1] {
4435                                        PatternElement::Node(n) => {
4436                                            n.variable.clone().unwrap_or_default()
4437                                        }
4438                                        _ => String::new(),
4439                                    };
4440
4441                                    let is_variable_length = r.range.is_some();
4442                                    let type_name = &r.types[0];
4443
4444                                    // Use TraverseMainByType for schemaless edge types
4445                                    // (same as MATCH planner) so edge properties are loaded
4446                                    // correctly from storage + L0 via the adjacency map.
4447                                    // Regular Traverse only loads properties via
4448                                    // property_manager which doesn't handle schemaless types.
4449                                    let is_schemaless = edge_type_ids.iter().all(|id| {
4450                                        uni_common::core::edge_type::is_schemaless_edge_type(*id)
4451                                    });
4452
4453                                    if is_schemaless {
4454                                        plan = LogicalPlan::TraverseMainByType {
4455                                            type_names: vec![type_name.clone()],
4456                                            input: Box::new(plan),
4457                                            direction: r.direction.clone(),
4458                                            source_variable,
4459                                            target_variable: target_variable.clone(),
4460                                            step_variable: r.variable.clone(),
4461                                            min_hops: r
4462                                                .range
4463                                                .as_ref()
4464                                                .and_then(|r| r.min)
4465                                                .unwrap_or(1)
4466                                                as usize,
4467                                            max_hops: r
4468                                                .range
4469                                                .as_ref()
4470                                                .and_then(|r| r.max)
4471                                                .unwrap_or(1)
4472                                                as usize,
4473                                            optional: false,
4474                                            target_filter: None,
4475                                            path_variable: None,
4476                                            is_variable_length,
4477                                            optional_pattern_vars: std::collections::HashSet::new(),
4478                                            scope_match_variables: std::collections::HashSet::new(),
4479                                            edge_filter_expr: None,
4480                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4481                                        };
4482                                    } else {
4483                                        // Collect edge property names needed for MERGE filter
4484                                        let mut edge_props = std::collections::HashSet::new();
4485                                        if let Some(Expr::Map(entries)) = &r.properties {
4486                                            for (key, _) in entries {
4487                                                edge_props.insert(key.clone());
4488                                            }
4489                                        }
4490                                        plan = LogicalPlan::Traverse {
4491                                            input: Box::new(plan),
4492                                            edge_type_ids: edge_type_ids.clone(),
4493                                            direction: r.direction.clone(),
4494                                            source_variable,
4495                                            target_variable: target_variable.clone(),
4496                                            target_label_id,
4497                                            step_variable: r.variable.clone(),
4498                                            min_hops: r
4499                                                .range
4500                                                .as_ref()
4501                                                .and_then(|r| r.min)
4502                                                .unwrap_or(1)
4503                                                as usize,
4504                                            max_hops: r
4505                                                .range
4506                                                .as_ref()
4507                                                .and_then(|r| r.max)
4508                                                .unwrap_or(1)
4509                                                as usize,
4510                                            optional: false,
4511                                            target_filter: None,
4512                                            path_variable: None,
4513                                            edge_properties: edge_props,
4514                                            is_variable_length,
4515                                            optional_pattern_vars: std::collections::HashSet::new(),
4516                                            scope_match_variables: std::collections::HashSet::new(),
4517                                            edge_filter_expr: None,
4518                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4519                                            qpp_steps: None,
4520                                        };
4521                                    }
4522
4523                                    // Apply property filters for relationship
4524                                    if r.properties.is_some()
4525                                        && let Some(r_var) = &r.variable
4526                                    {
4527                                        let resolved_rel_props = self
4528                                            .resolve_merge_properties(
4529                                                &r.properties,
4530                                                row,
4531                                                prop_manager,
4532                                                params,
4533                                                ctx,
4534                                            )
4535                                            .await?;
4536                                        if let Some(prop_filter) =
4537                                            planner.properties_to_expr(r_var, &resolved_rel_props)
4538                                        {
4539                                            plan = LogicalPlan::Filter {
4540                                                input: Box::new(plan),
4541                                                predicate: prop_filter,
4542                                                optional_variables: std::collections::HashSet::new(
4543                                                ),
4544                                            };
4545                                        }
4546                                    }
4547
4548                                    // Apply property filters for target node if it was new
4549                                    if !target_variable.is_empty() {
4550                                        let resolved_target_props = self
4551                                            .resolve_merge_properties(
4552                                                &n_target.properties,
4553                                                row,
4554                                                prop_manager,
4555                                                params,
4556                                                ctx,
4557                                            )
4558                                            .await?;
4559                                        if let Some(prop_filter) = planner.properties_to_expr(
4560                                            &target_variable,
4561                                            &resolved_target_props,
4562                                        ) {
4563                                            plan = LogicalPlan::Filter {
4564                                                input: Box::new(plan),
4565                                                predicate: prop_filter,
4566                                                optional_variables: std::collections::HashSet::new(
4567                                                ),
4568                                            };
4569                                        }
4570                                        vars_in_scope.push(target_variable.clone());
4571                                    }
4572
4573                                    if let Some(sv) = &r.variable {
4574                                        vars_in_scope.push(sv.clone());
4575                                    }
4576                                    i += 2;
4577                                } else {
4578                                    break;
4579                                }
4580                            } else {
4581                                break;
4582                            }
4583                        }
4584                    }
4585                    _ => return Err(anyhow!("Pattern must start with a node")),
4586                }
4587            }
4588
4589            // Execute the plan to find all matches, then filter against bound variables in `row`.
4590        }
4591
4592        let db_matches = self
4593            .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
4594            .await?;
4595
4596        // Keep only DB results that are consistent with the input row bindings.
4597        // Skip internal keys (starting with "__") as they are implementation
4598        // artifacts (e.g. __used_edges) and not user-visible variable bindings.
4599        // Also skip the empty-string key (""), which is the placeholder variable
4600        // for unnamed MERGE nodes — it may carry over from a prior MERGE clause
4601        // and must not constrain the current pattern's match.
4602        let final_matches = db_matches
4603            .into_iter()
4604            .filter(|db_match| {
4605                row.iter().all(|(key, val)| {
4606                    if key.is_empty() || key.starts_with("__") {
4607                        return true;
4608                    }
4609                    let Some(db_val) = db_match.get(key) else {
4610                        return true;
4611                    };
4612                    if db_val == val {
4613                        return true;
4614                    }
4615                    // Values differ -- treat as consistent if they represent the same VID
4616                    matches!(
4617                        (Self::vid_from_value(val), Self::vid_from_value(db_val)),
4618                        (Ok(v1), Ok(v2)) if v1 == v2
4619                    )
4620                })
4621            })
4622            .map(|db_match| {
4623                let mut merged = row.clone();
4624                merged.extend(db_match);
4625                merged
4626            })
4627            .collect();
4628
4629        Ok(final_matches)
4630    }
4631
4632    /// Prepare a MERGE pattern for path variable binding.
4633    ///
4634    /// If any path in the pattern has a path variable (e.g., `MERGE p = (a)-[:R]->(b)`),
4635    /// unnamed relationships need internal variable names so that `execute_create_pattern`
4636    /// stores the edge data in the row for later path construction.
4637    ///
4638    /// Returns the (possibly modified) pattern and a list of temp variable names to clean up.
4639    fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
4640        let has_path_vars = pattern
4641            .paths
4642            .iter()
4643            .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
4644
4645        if !has_path_vars {
4646            return (pattern.clone(), Vec::new());
4647        }
4648
4649        let mut modified = pattern.clone();
4650        let mut temp_vars = Vec::new();
4651
4652        for path in &mut modified.paths {
4653            if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
4654                continue;
4655            }
4656            for (idx, element) in path.elements.iter_mut().enumerate() {
4657                if let PatternElement::Relationship(r) = element
4658                    && r.variable.as_ref().is_none_or(String::is_empty)
4659                {
4660                    let temp_var = format!("__path_r_{}", idx);
4661                    r.variable = Some(temp_var.clone());
4662                    temp_vars.push(temp_var);
4663                }
4664            }
4665        }
4666
4667        (modified, temp_vars)
4668    }
4669
4670    /// Bind path variables in the result row based on the MERGE pattern.
4671    ///
4672    /// Walks each path in the pattern, collects node/edge values from the row
4673    /// by variable name, and constructs a `Value::Path`.
4674    fn bind_path_variables(
4675        pattern: &Pattern,
4676        row: &mut HashMap<String, Value>,
4677        temp_vars: &[String],
4678    ) {
4679        for path in &pattern.paths {
4680            let Some(path_var) = path.variable.as_ref() else {
4681                continue;
4682            };
4683            if path_var.is_empty() {
4684                continue;
4685            }
4686
4687            let mut nodes = Vec::new();
4688            let mut edges = Vec::new();
4689
4690            for element in &path.elements {
4691                match element {
4692                    PatternElement::Node(n) => {
4693                        if let Some(var) = &n.variable
4694                            && let Some(val) = row.get(var)
4695                            && let Some(node) = Self::value_to_node_for_path(val)
4696                        {
4697                            nodes.push(node);
4698                        }
4699                    }
4700                    PatternElement::Relationship(r) => {
4701                        if let Some(var) = &r.variable
4702                            && let Some(val) = row.get(var)
4703                            && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
4704                        {
4705                            edges.push(edge);
4706                        }
4707                    }
4708                    _ => {}
4709                }
4710            }
4711
4712            if !nodes.is_empty() {
4713                use uni_common::value::Path;
4714                row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
4715            }
4716        }
4717
4718        // Clean up internal temp variables
4719        for var in temp_vars {
4720            row.remove(var);
4721        }
4722    }
4723
4724    /// Convert a Value (Map or Node) to a Node for path construction.
4725    fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
4726        match val {
4727            Value::Node(n) => Some(n.clone()),
4728            Value::Map(map) => {
4729                let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
4730                let labels = if let Some(Value::List(l)) = map.get("_labels") {
4731                    l.iter()
4732                        .filter_map(|v| {
4733                            if let Value::String(s) = v {
4734                                Some(s.clone())
4735                            } else {
4736                                None
4737                            }
4738                        })
4739                        .collect()
4740                } else {
4741                    vec![]
4742                };
4743                let properties: HashMap<String, Value> = map
4744                    .iter()
4745                    .filter(|(k, _)| !k.starts_with('_'))
4746                    .map(|(k, v)| (k.clone(), v.clone()))
4747                    .collect();
4748                Some(uni_common::value::Node {
4749                    vid,
4750                    labels,
4751                    properties,
4752                })
4753            }
4754            _ => None,
4755        }
4756    }
4757
4758    /// Convert a Value (Map or Edge) to an Edge for path construction.
4759    fn value_to_edge_for_path(
4760        val: &Value,
4761        type_names: &[String],
4762    ) -> Option<uni_common::value::Edge> {
4763        match val {
4764            Value::Edge(e) => Some(e.clone()),
4765            Value::Map(map) => {
4766                let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
4767                let edge_type = map
4768                    .get("_type_name")
4769                    .and_then(|v| {
4770                        if let Value::String(s) = v {
4771                            Some(s.clone())
4772                        } else {
4773                            None
4774                        }
4775                    })
4776                    .or_else(|| type_names.first().cloned())
4777                    .unwrap_or_default();
4778                let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
4779                let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
4780                let properties: HashMap<String, Value> = map
4781                    .iter()
4782                    .filter(|(k, _)| !k.starts_with('_'))
4783                    .map(|(k, v)| (k.clone(), v.clone()))
4784                    .collect();
4785                Some(uni_common::value::Edge {
4786                    eid,
4787                    edge_type,
4788                    src,
4789                    dst,
4790                    properties,
4791                })
4792            }
4793            _ => None,
4794        }
4795    }
4796}
4797
4798/// Read a vertex's full property map, preferring `prefetched` over a fresh
4799/// per-row `Backend::scan`.
4800///
4801/// `prefetched` is built once at the top of `apply_mutations` via
4802/// `prefetch_set_targets` / `prefetch_remove_targets` (mutation_common.rs).
4803/// On a hit, we layer in L0 from `ctx` so writes from earlier rows of the
4804/// same `apply_mutations` invocation (counter increments, same-VID
4805/// duplicates from UNWIND) take precedence — the prefetch only snapshots
4806/// storage state at SET entry. On a miss, fall back to the existing
4807/// per-row path; this preserves correctness for newly created VIDs,
4808/// schemaless rows, multi-label corner cases, and non-Mutation callers
4809/// that pass `&Prefetch::default()`.
4810pub(crate) async fn read_vertex_props_with_prefetch(
4811    vid: Vid,
4812    prefetched: &Prefetch,
4813    prop_manager: &PropertyManager,
4814    ctx: Option<&QueryContext>,
4815) -> Result<uni_common::Properties> {
4816    match prefetched.vertex.get(&vid).cloned() {
4817        Some(mut base) => {
4818            if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_vertex_props(vid, ctx) {
4819                for (k, v) in l0 {
4820                    base.insert(k, v);
4821                }
4822            }
4823            Ok(base)
4824        }
4825        None => Ok(prop_manager
4826            .get_all_vertex_props_with_ctx(vid, ctx)
4827            .await?
4828            .unwrap_or_default()),
4829    }
4830}
4831
4832/// Edge equivalent of [`read_vertex_props_with_prefetch`]. On a hit, layer
4833/// in L0 edge props so writes from earlier rows of the same
4834/// `apply_mutations` invocation take precedence. On a miss, fall back to
4835/// the per-EID storage path.
4836pub(crate) async fn read_edge_props_with_prefetch(
4837    eid: Eid,
4838    prefetched: &Prefetch,
4839    prop_manager: &PropertyManager,
4840    ctx: Option<&QueryContext>,
4841) -> Result<uni_common::Properties> {
4842    match prefetched.edge.get(&eid).cloned() {
4843        Some(mut base) => {
4844            if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_edge_props(eid, ctx) {
4845                for (k, v) in l0 {
4846                    base.insert(k, v);
4847                }
4848            }
4849            Ok(base)
4850        }
4851        None => Ok(prop_manager
4852            .get_all_edge_props_with_ctx(eid, ctx)
4853            .await?
4854            .unwrap_or_default()),
4855    }
4856}
4857
4858#[cfg(test)]
4859mod tests {
4860    use super::*;
4861
4862    // ── merge_props tests ────────────────────────────────────────────
4863
4864    #[test]
4865    fn test_merge_props_replace_tombstones_missing_keys() {
4866        let current: HashMap<String, Value> = [
4867            ("name".into(), Value::String("Alice".into())),
4868            ("age".into(), Value::Int(30)),
4869        ]
4870        .into();
4871        let incoming: HashMap<String, Value> =
4872            [("name".into(), Value::String("Bob".into()))].into();
4873
4874        let result = Executor::merge_props(current, incoming, true);
4875        assert_eq!(result.get("name"), Some(&Value::String("Bob".into())));
4876        assert_eq!(
4877            result.get("age"),
4878            Some(&Value::Null),
4879            "Missing keys should be tombstoned in replace mode"
4880        );
4881    }
4882
4883    #[test]
4884    fn test_merge_props_merge_preserves_existing() {
4885        let current: HashMap<String, Value> = [
4886            ("name".into(), Value::String("Alice".into())),
4887            ("age".into(), Value::Int(30)),
4888        ]
4889        .into();
4890        let incoming: HashMap<String, Value> =
4891            [("city".into(), Value::String("NYC".into()))].into();
4892
4893        let result = Executor::merge_props(current, incoming, false);
4894        assert_eq!(result.get("name"), Some(&Value::String("Alice".into())));
4895        assert_eq!(result.get("age"), Some(&Value::Int(30)));
4896        assert_eq!(result.get("city"), Some(&Value::String("NYC".into())));
4897    }
4898
4899    #[test]
4900    fn test_merge_props_null_incoming_is_tombstone() {
4901        let current: HashMap<String, Value> =
4902            [("name".into(), Value::String("Alice".into()))].into();
4903        let incoming: HashMap<String, Value> = [("name".into(), Value::Null)].into();
4904
4905        // Merge mode: null overwrites
4906        let result = Executor::merge_props(current.clone(), incoming.clone(), false);
4907        assert_eq!(result.get("name"), Some(&Value::Null));
4908
4909        // Replace mode: null is tombstone
4910        let result = Executor::merge_props(current, incoming, true);
4911        assert_eq!(result.get("name"), Some(&Value::Null));
4912    }
4913
4914    #[test]
4915    fn test_merge_props_empty_current() {
4916        let current: HashMap<String, Value> = HashMap::new();
4917        let incoming: HashMap<String, Value> =
4918            [("name".into(), Value::String("Alice".into()))].into();
4919
4920        let result = Executor::merge_props(current, incoming, false);
4921        assert_eq!(result.get("name"), Some(&Value::String("Alice".into())));
4922        assert_eq!(result.len(), 1);
4923    }
4924
4925    #[test]
4926    fn test_merge_props_empty_incoming_replace_tombstones_all() {
4927        let current: HashMap<String, Value> = [
4928            ("name".into(), Value::String("Alice".into())),
4929            ("age".into(), Value::Int(30)),
4930        ]
4931        .into();
4932        let incoming: HashMap<String, Value> = HashMap::new();
4933
4934        let result = Executor::merge_props(current, incoming, true);
4935        assert_eq!(result.get("name"), Some(&Value::Null));
4936        assert_eq!(result.get("age"), Some(&Value::Null));
4937    }
4938
4939    // ── extract_labels_from_node tests ───────────────────────────────
4940
4941    #[test]
4942    fn test_extract_labels_from_map() {
4943        let mut map = HashMap::new();
4944        map.insert("_vid".into(), Value::Int(1));
4945        map.insert(
4946            "_labels".into(),
4947            Value::List(vec![
4948                Value::String("Person".into()),
4949                Value::String("Employee".into()),
4950            ]),
4951        );
4952        let val = Value::Map(map);
4953
4954        let labels = Executor::extract_labels_from_node(&val);
4955        assert_eq!(
4956            labels,
4957            Some(vec!["Person".to_string(), "Employee".to_string()])
4958        );
4959    }
4960
4961    #[test]
4962    fn test_extract_labels_from_value_node() {
4963        let node = uni_common::Node {
4964            vid: uni_common::core::id::Vid::from(1u64),
4965            labels: vec!["Person".to_string()],
4966            properties: HashMap::new(),
4967        };
4968        let labels = Executor::extract_labels_from_node(&Value::Node(node));
4969        assert_eq!(labels, Some(vec!["Person".to_string()]));
4970    }
4971
4972    #[test]
4973    fn test_extract_labels_non_node_returns_none() {
4974        assert_eq!(Executor::extract_labels_from_node(&Value::Int(42)), None);
4975        assert_eq!(
4976            Executor::extract_labels_from_node(&Value::String("hello".into())),
4977            None
4978        );
4979    }
4980
4981    // ── extract_user_properties_from_value tests ─────────────────────
4982
4983    #[test]
4984    fn test_extract_user_props_strips_internal_keys() {
4985        let mut map = HashMap::new();
4986        map.insert("_vid".into(), Value::Int(1));
4987        map.insert(
4988            "_labels".into(),
4989            Value::List(vec![Value::String("Person".into())]),
4990        );
4991        map.insert("name".into(), Value::String("Alice".into()));
4992        map.insert("age".into(), Value::Int(30));
4993
4994        let props = Executor::extract_user_properties_from_value(&Value::Map(map)).unwrap();
4995        assert_eq!(props.get("name"), Some(&Value::String("Alice".into())));
4996        assert_eq!(props.get("age"), Some(&Value::Int(30)));
4997        assert!(!props.contains_key("_vid"));
4998        assert!(!props.contains_key("_labels"));
4999    }
5000
5001    #[test]
5002    fn test_extract_user_props_plain_map_returns_as_is() {
5003        let mut map = HashMap::new();
5004        map.insert("key".into(), Value::String("value".into()));
5005
5006        let props = Executor::extract_user_properties_from_value(&Value::Map(map.clone())).unwrap();
5007        assert_eq!(props, map);
5008    }
5009
5010    #[test]
5011    fn test_extract_user_props_from_value_node() {
5012        let mut properties = HashMap::new();
5013        properties.insert("name".into(), Value::String("Alice".into()));
5014        let node = uni_common::Node {
5015            vid: uni_common::core::id::Vid::from(1u64),
5016            labels: vec!["Person".to_string()],
5017            properties,
5018        };
5019        let props = Executor::extract_user_properties_from_value(&Value::Node(node)).unwrap();
5020        assert_eq!(props.get("name"), Some(&Value::String("Alice".into())));
5021    }
5022}