Skip to main content

uni_query/query/executor/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use crate::query::df_graph::mutation_common::Prefetch;
6use crate::query::planner::LogicalPlan;
7use anyhow::{Result, anyhow};
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use uni_common::DataType;
11use uni_common::core::id::{Eid, Vid};
12use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
13use uni_common::{Path, Value};
14use uni_cypher::ast::{
15    AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
16    CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
17    DropEdgeType, DropLabel, Expr, NodePattern, Pattern, PatternElement, RemoveItem, SetClause,
18    SetItem,
19};
20use uni_store::QueryContext;
21use uni_store::runtime::l0_visibility;
22use uni_store::runtime::property_manager::PropertyManager;
23use uni_store::runtime::writer::Writer;
24
25/// Canonical, hashable key for a single-node MERGE: the key properties as a
26/// `(name, value)` list sorted by name. Used to group existing vertices by key
27/// for the index fast path (issue #69).
28type MergeKey = Vec<(String, Value)>;
29
30/// Identity fields extracted from a map-encoded edge.
31struct EdgeIdentity {
32    eid: Eid,
33    src: Vid,
34    dst: Vid,
35    edge_type_id: u32,
36}
37
38/// Per-variable accumulator for SetItem::Property items targeting a vertex.
39///
40/// Built lazily on the first SetItem touching each variable, then mutated
41/// in place across subsequent items. Flushed once at end of the SET
42/// clause (or earlier if a non-Property SetItem on the same var lands).
43struct PendingVertexSet {
44    vid: Vid,
45    labels: Vec<String>,
46    /// Full property map (storage union L0 from
47    /// `get_all_vertex_props_with_ctx` plus the touched values applied
48    /// in-order). Flushed to L0 whole; L0's `vertex_partial_keys` set
49    /// tells the flush which columns to send to Lance via MergeInsert.
50    props: HashMap<String, Value>,
51    /// `true` when the SET should flush via the partial-column MergeInsert
52    /// path: set when `UniConfig::partial_lance_writes` is on AND the
53    /// label has no generated columns. Generated-column labels still need
54    /// the full-row Append so the regenerated values land.
55    partial: bool,
56    /// Set of property keys touched by this statement. Threaded into L0
57    /// so the flush emits a `MergeInsertBuilder` source with exactly
58    /// these columns. Empty when `partial == false`.
59    touched: HashSet<String>,
60}
61
62/// Per-variable accumulator for SetItem::Property items targeting an edge.
63struct PendingEdgeSet {
64    src: Vid,
65    dst: Vid,
66    edge_type_id: u32,
67    eid: Eid,
68    edge_type_name: String,
69    /// `true` when the SET should flush via the partial-column
70    /// MergeInsert path on the per-edge-type delta tables (Round 12
71    /// §A). Set when `UniConfig::partial_lance_writes` is on.
72    partial: bool,
73    /// Property keys touched by this statement. Threaded into L0 so
74    /// the flush emits a `MergeInsertBuilder` source with exactly
75    /// these columns. Empty when `partial == false`.
76    touched: HashSet<String>,
77    props: HashMap<String, Value>,
78}
79
80/// Refuse to mutate an ephemeral node (M5g / proposal §4.13.1).
81/// Ephemeral entities are return-only — `Vid::EPHEMERAL_BIT` is set on
82/// any id minted by `host.allocate_transient_id()`.
83fn reject_if_ephemeral_vid(vid: Vid) -> Result<()> {
84    if vid.is_ephemeral() {
85        return Err(anyhow::Error::from(
86            uni_common::UniError::EphemeralWriteAttempt {
87                kind: "node",
88                id: vid.transient_id().unwrap_or(vid.as_u64()),
89            },
90        ));
91    }
92    Ok(())
93}
94
95/// Returns a short variant name for a `Value`, used in type-mismatch error messages.
96fn value_type_name(val: &Value) -> &'static str {
97    match val {
98        Value::Null => "Null",
99        Value::Bool(_) => "Bool",
100        Value::Int(_) => "Int",
101        Value::Float(_) => "Float",
102        Value::String(_) => "String",
103        Value::Bytes(_) => "Bytes",
104        Value::List(_) => "List",
105        Value::Map(_) => "Map",
106        Value::Node(_) => "Node",
107        Value::Edge(_) => "Edge",
108        Value::Path(_) => "Path",
109        Value::Vector(_) => "Vector",
110        Value::Temporal(_) => "Temporal",
111        _ => "value",
112    }
113}
114
115/// Refuse to mutate an ephemeral edge (M5g / proposal §4.13.1).
116fn reject_if_ephemeral_eid(eid: Eid) -> Result<()> {
117    if eid.is_ephemeral() {
118        return Err(anyhow::Error::from(
119            uni_common::UniError::EphemeralWriteAttempt {
120                kind: "edge",
121                id: eid.transient_id().unwrap_or(eid.as_u64()),
122            },
123        ));
124    }
125    Ok(())
126}
127
128/// Reject a write whose target label is currently allocated as a
129/// virtual (catalog-backed) label.
130///
131/// Catalog tables are read-only from the host's perspective — there is
132/// no write-back path through `CatalogTable::scan` to the originating
133/// provider, so silently allowing SET/DELETE would leave ghosted state
134/// on the host side that diverges from the external catalog. The
135/// planner already rejects CREATE/MERGE on virtual labels via
136/// `Planner::reject_virtual_label_writes`; this helper is the
137/// equivalent gate on the runtime write path for SET-label-add and
138/// DELETE.
139///
140/// `op` names the offending operation for the error message (e.g.
141/// `"SET"`, `"DELETE"`).
142///
143/// # Errors
144///
145/// Returns an error if `registry` is `Some` and any name in `labels`
146/// is currently registered as a virtual label. Returns `Ok(())` when
147/// no plugin registry is wired (low-level callers without plugins).
148fn reject_virtual_label_write(
149    registry: Option<&Arc<uni_plugin::PluginRegistry>>,
150    labels: &[String],
151    op: &str,
152) -> Result<()> {
153    let Some(registry) = registry else {
154        return Ok(());
155    };
156    for label in labels {
157        if registry.virtual_label_by_name(label).is_some() {
158            return Err(anyhow!(
159                "Cannot {op} on virtual (catalog-resolved) label `{label}` — virtual \
160                 labels are read-only; write back via the originating catalog instead"
161            ));
162        }
163    }
164    Ok(())
165}
166
167/// Reject a write whose target edge-type ID is currently allocated as
168/// a virtual (catalog-backed) edge type. Runtime analog of
169/// [`reject_virtual_label_write`] for the edge path.
170///
171/// # Errors
172///
173/// Returns an error if `registry` is `Some` and `edge_type_id` resolves
174/// to a registered virtual edge type. Returns `Ok(())` when no plugin
175/// registry is wired.
176fn reject_virtual_edge_type_write(
177    registry: Option<&Arc<uni_plugin::PluginRegistry>>,
178    edge_type_id: u32,
179    op: &str,
180) -> Result<()> {
181    let Some(registry) = registry else {
182        return Ok(());
183    };
184    if let Some(entry) = registry.virtual_edge_type_by_id(edge_type_id) {
185        return Err(anyhow!(
186            "Cannot {op} on virtual (catalog-resolved) edge type `{}` — virtual edge \
187             types are read-only; write back via the originating catalog instead",
188            entry.name
189        ));
190    }
191    Ok(())
192}
193
194impl Executor {
195    /// Extracts labels from a node value.
196    ///
197    /// Handles both `Value::Map` (with a `_labels` list field) and
198    /// `Value::Node` (with a `labels` vec field).
199    ///
200    /// Returns `None` when the value is not a node or has no labels.
201    pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
202        match node_val {
203            Value::Map(map) => {
204                // Map-encoded node: look for _labels array
205                if let Some(Value::List(labels_arr)) = map.get("_labels") {
206                    let labels: Vec<String> = labels_arr
207                        .iter()
208                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
209                        .collect();
210                    if !labels.is_empty() {
211                        return Some(labels);
212                    }
213                }
214                None
215            }
216            Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
217            _ => None,
218        }
219    }
220
221    /// Extracts user-visible properties from a value that represents a node or edge.
222    ///
223    /// Strips internal bookkeeping keys (those prefixed with `_` or named
224    /// `ext_id`) from map-encoded entities and returns only the user-facing
225    /// property key-value pairs.
226    ///
227    /// Returns `None` when `val` is not a map, node, or edge.
228    pub(crate) fn extract_user_properties_from_value(
229        val: &Value,
230    ) -> Option<HashMap<String, Value>> {
231        match val {
232            Value::Map(map) => {
233                // Distinguish entity-encoded maps from plain map literals.
234                // A node map has both `_vid` and `_labels`.
235                // An edge map has `_eid`, `_src`, and `_dst`.
236                let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
237                let is_edge_map = map.contains_key("_eid")
238                    && map.contains_key("_src")
239                    && map.contains_key("_dst");
240
241                if is_node_map || is_edge_map {
242                    // Filter out internal bookkeeping keys
243                    let user_props: HashMap<String, Value> = map
244                        .iter()
245                        .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
246                        .map(|(k, v)| (k.clone(), v.clone()))
247                        .collect();
248                    // When mutation output omits dotted property columns, user
249                    // properties live inside `_all_props` rather than at the
250                    // top level of the entity map.
251                    if user_props.is_empty()
252                        && let Some(Value::Map(all_props)) = map.get("_all_props")
253                    {
254                        return Some(all_props.clone());
255                    }
256                    Some(user_props)
257                } else {
258                    // Plain map literal — return as-is
259                    Some(map.clone())
260                }
261            }
262            Value::Node(node) => Some(node.properties.clone()),
263            Value::Edge(edge) => Some(edge.properties.clone()),
264            _ => None,
265        }
266    }
267
268    /// Applies a property map to a vertex or edge entity bound to `variable` in `row`.
269    ///
270    /// When `replace` is `true` the entity's property set is replaced: keys absent
271    /// from `new_props` are tombstoned (written as `Value::Null`) so the storage
272    /// layer removes them.  When `replace` is `false` the map is merged: keys in
273    /// `new_props` are upserted, while keys absent from `new_props` are unchanged.
274    /// A `Value::Null` entry in `new_props` acts as an explicit tombstone in both
275    /// modes.
276    ///
277    /// Labels are never altered — the spec states that `SET n = map` replaces
278    /// properties only.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the entity cannot be found in the storage layer, or
283    /// if the writer fails to persist the updated properties.
284    #[expect(clippy::too_many_arguments)]
285    async fn apply_properties_to_entity(
286        &self,
287        variable: &str,
288        new_props: HashMap<String, Value>,
289        replace: bool,
290        row: &mut HashMap<String, Value>,
291        writer: &Writer,
292        prop_manager: &PropertyManager,
293        params: &HashMap<String, Value>,
294        ctx: Option<&QueryContext>,
295        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
296        prefetched: &Prefetch,
297    ) -> Result<()> {
298        // Clone the target so we can hold &row references elsewhere.
299        let target = row.get(variable).cloned();
300
301        // Declared-type guard for the whole-entity `SET n = map` / `SET n += map`
302        // forms, mirroring the per-property SET path (issue #68).
303        let schema = self.storage.schema_manager().schema();
304
305        match target {
306            Some(Value::Node(ref node)) => {
307                let vid = node.vid;
308                let labels = node.labels.clone();
309                let current =
310                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
311                let write_props = Self::merge_props(current, new_props, replace);
312                let mut enriched = write_props.clone();
313                for label_name in &labels {
314                    self.enrich_properties_with_generated_columns(
315                        label_name,
316                        &mut enriched,
317                        prop_manager,
318                        params,
319                        ctx,
320                    )
321                    .await?;
322                }
323                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
324                let _ = writer
325                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
326                    .await?;
327                // Update the in-memory row binding
328                if let Some(Value::Node(n)) = row.get_mut(variable) {
329                    n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
330                }
331            }
332            Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
333                let vid = Self::vid_from_value(node_val)?;
334                let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
335                let current =
336                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
337                let write_props = Self::merge_props(current, new_props, replace);
338                let mut enriched = write_props.clone();
339                for label_name in &labels {
340                    self.enrich_properties_with_generated_columns(
341                        label_name,
342                        &mut enriched,
343                        prop_manager,
344                        params,
345                        ctx,
346                    )
347                    .await?;
348                }
349                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
350                let _ = writer
351                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
352                    .await?;
353                // Update the in-memory map-encoded node binding
354                if let Some(Value::Map(node_map)) = row.get_mut(variable) {
355                    // Remove old user property keys, keep internal fields
356                    node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
357                    // Build effective (non-null) properties
358                    let effective: HashMap<String, Value> =
359                        enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
360                    for (k, v) in &effective {
361                        node_map.insert(k.clone(), v.clone());
362                    }
363                    // Replace _all_props to reflect the complete property set
364                    node_map.insert("_all_props".to_string(), Value::Map(effective));
365                }
366            }
367            Some(Value::Edge(ref edge)) => {
368                let eid = edge.eid;
369                let src = edge.src;
370                let dst = edge.dst;
371                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
372                let current =
373                    read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
374                let write_props = Self::merge_props(current, new_props, replace);
375                let write_props = Self::coerce_and_validate_props(
376                    write_props,
377                    &schema,
378                    std::slice::from_ref(&edge.edge_type),
379                )?;
380                writer
381                    .insert_edge(
382                        src,
383                        dst,
384                        etype,
385                        eid,
386                        write_props.clone(),
387                        Some(edge.edge_type.clone()),
388                        tx_l0,
389                    )
390                    .await?;
391                // Update the in-memory row binding
392                if let Some(Value::Edge(e)) = row.get_mut(variable) {
393                    e.properties = write_props
394                        .into_iter()
395                        .filter(|(_, v)| !v.is_null())
396                        .collect();
397                }
398            }
399            Some(Value::Map(ref map))
400                if map.contains_key("_eid")
401                    && map.contains_key("_src")
402                    && map.contains_key("_dst") =>
403            {
404                let ei = self.extract_edge_identity(map)?;
405                let current =
406                    read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx).await?;
407                let write_props = Self::merge_props(current, new_props, replace);
408                let edge_type_name = map
409                    .get("_type")
410                    .and_then(|v| v.as_str())
411                    .map(|s| s.to_string())
412                    .or_else(|| {
413                        self.storage
414                            .schema_manager()
415                            .edge_type_name_by_id_unified(ei.edge_type_id)
416                    });
417                let write_props = match &edge_type_name {
418                    Some(name) => Self::coerce_and_validate_props(
419                        write_props,
420                        &schema,
421                        std::slice::from_ref(name),
422                    )?,
423                    None => write_props,
424                };
425                writer
426                    .insert_edge(
427                        ei.src,
428                        ei.dst,
429                        ei.edge_type_id,
430                        ei.eid,
431                        write_props.clone(),
432                        edge_type_name,
433                        tx_l0,
434                    )
435                    .await?;
436                // Update the in-memory map-encoded edge binding
437                if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
438                    edge_map.retain(|k, _| k.starts_with('_'));
439                    let effective: HashMap<String, Value> = write_props
440                        .into_iter()
441                        .filter(|(_, v)| !v.is_null())
442                        .collect();
443                    for (k, v) in &effective {
444                        edge_map.insert(k.clone(), v.clone());
445                    }
446                    // Replace _all_props to reflect the complete property set
447                    edge_map.insert("_all_props".to_string(), Value::Map(effective));
448                }
449            }
450            _ => {
451                // No matching entity — nothing to do (caller already guarded against Null)
452            }
453        }
454        Ok(())
455    }
456
457    /// Computes the property map to write given current storage state and the
458    /// incoming change map.
459    ///
460    /// When `replace` is `true`, keys present in `current` but absent from
461    /// `incoming` are tombstoned with `Value::Null`.  Null values inside
462    /// `incoming` are always preserved as explicit tombstones.
463    ///
464    /// When `replace` is `false`, `current` is the base and `incoming` is
465    /// merged on top: each key in `incoming` overwrites or tombstones the
466    /// corresponding entry in `current`.
467    fn merge_props(
468        current: HashMap<String, Value>,
469        incoming: HashMap<String, Value>,
470        replace: bool,
471    ) -> HashMap<String, Value> {
472        if replace {
473            // Start from the non-null incoming entries only.
474            let mut result: HashMap<String, Value> = incoming
475                .iter()
476                .filter(|(_, v)| !v.is_null())
477                .map(|(k, v)| (k.clone(), v.clone()))
478                .collect();
479            // Tombstone every current key that is absent from incoming OR explicitly
480            // set to null in incoming (both mean "delete this property").
481            for k in current.keys() {
482                if incoming.get(k).is_none_or(|v| v.is_null()) {
483                    result.insert(k.clone(), Value::Null);
484                }
485            }
486            result
487        } else {
488            // Merge: start from current and apply incoming on top
489            let mut result = current;
490            result.extend(incoming);
491            result
492        }
493    }
494
495    /// Extract edge identity fields (`_eid`, `_src`, `_dst`, `_type`) from a map.
496    fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
497        let eid = Eid::from(
498            map.get("_eid")
499                .and_then(|v| v.as_u64())
500                .ok_or_else(|| anyhow!("Invalid _eid"))?,
501        );
502        let src = Vid::from(
503            map.get("_src")
504                .and_then(|v| v.as_u64())
505                .ok_or_else(|| anyhow!("Invalid _src"))?,
506        );
507        let dst = Vid::from(
508            map.get("_dst")
509                .and_then(|v| v.as_u64())
510                .ok_or_else(|| anyhow!("Invalid _dst"))?,
511        );
512        let edge_type_id = self.resolve_edge_type_id(
513            map.get("_type")
514                .or_else(|| map.get("_type_name"))
515                .ok_or_else(|| anyhow!("Missing _type/_type_name on edge map"))?,
516        )?;
517        Ok(EdgeIdentity {
518            eid,
519            src,
520            dst,
521            edge_type_id,
522        })
523    }
524
525    /// Resolve edge type ID from a Value, supporting both Int and String representations.
526    /// DataFusion traverse stores _type as String("KNOWS"), while write operations need u32 ID.
527    ///
528    /// For String values, uses get_or_assign_edge_type_id to support schemaless edge types
529    /// (assigns new ID if not found). This is critical for MERGE ... ON CREATE SET scenarios
530    /// where the edge type was just created and may not be in the read-only lookup yet.
531    fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
532        match type_val {
533            Value::Int(i) => Ok(*i as u32),
534            Value::String(name) => {
535                if self.config.strict_schema {
536                    let schema = self.storage.schema_manager().schema();
537                    schema
538                        .edge_type_id_by_name_case_insensitive(name)
539                        .ok_or_else(|| {
540                            anyhow!(
541                                "Edge type '{}' is not defined in the schema \
542                                 (strict_schema is enabled). \
543                                 Declare it with db.schema().edge_type(...).apply() first.",
544                                name
545                            )
546                        })
547                } else {
548                    // Schemaless: assign new ID if not found in schema or registry.
549                    Ok(self
550                        .storage
551                        .schema_manager()
552                        .get_or_assign_edge_type_id(name))
553                }
554            }
555            _ => Err(anyhow!(
556                "Invalid _type value: expected Int or String, got {:?}",
557                type_val
558            )),
559        }
560    }
561
562    pub(crate) async fn execute_vacuum(&self) -> Result<()> {
563        if let Some(writer_arc) = &self.writer {
564            // Flush first while holding the lock
565            {
566                let writer: &uni_store::Writer = writer_arc.as_ref();
567                writer.flush_to_l1(None).await?;
568            } // Drop lock before compacting to avoid blocking reads/writes
569
570            // Compaction can run without holding the writer lock
571            let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
572            let compaction_results = compactor.compact_all().await?;
573
574            // Re-warm adjacency manager for compacted edge types to sync in-memory CSR with new L2 storage
575            let am = self.storage.adjacency_manager();
576            let schema = self.storage.schema_manager().schema();
577            for info in compaction_results {
578                // Convert string direction to Direction enum
579                let direction = match info.direction.as_str() {
580                    "fwd" => uni_store::storage::direction::Direction::Outgoing,
581                    "bwd" => uni_store::storage::direction::Direction::Incoming,
582                    _ => continue,
583                };
584
585                // Get edge_type_id
586                if let Some(edge_type_id) =
587                    schema.edge_type_id_unified_case_insensitive(&info.edge_type)
588                {
589                    // Re-warm from storage (clears old CSR, loads new L2 + L1 delta)
590                    let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
591                }
592            }
593        }
594        Ok(())
595    }
596
597    pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
598        if let Some(writer_arc) = &self.writer {
599            let writer: &uni_store::Writer = writer_arc.as_ref();
600            writer.flush_to_l1(Some("checkpoint".to_string())).await?;
601        }
602        Ok(())
603    }
604
605    pub(crate) async fn execute_copy_to(
606        &self,
607        identifier: &str,
608        path: &str,
609        format: &str,
610        options: &HashMap<String, Value>,
611    ) -> Result<usize> {
612        // Check schema to determine if identifier is an edge type or vertex label
613        let schema = self.storage.schema_manager().schema();
614
615        // Try as edge type first
616        if schema.get_edge_type_case_insensitive(identifier).is_some() {
617            return self
618                .export_edge_type_in_format(identifier, path, format)
619                .await;
620        }
621
622        // Try as vertex label
623        if schema.get_label_case_insensitive(identifier).is_some() {
624            return self
625                .export_vertex_label_in_format(identifier, path, format, options)
626                .await;
627        }
628
629        // Neither edge type nor vertex label found
630        Err(anyhow!("Unknown label or edge type: '{}'", identifier))
631    }
632
633    async fn export_vertex_label_in_format(
634        &self,
635        label: &str,
636        path: &str,
637        format: &str,
638        _options: &HashMap<String, Value>,
639    ) -> Result<usize> {
640        match format {
641            "parquet" => self.export_vertex_label(label, path).await,
642            "csv" => {
643                let mut stream = self
644                    .storage
645                    .scan_vertex_table_stream(label)
646                    .await?
647                    .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
648
649                // Collect all batches
650                let mut all_rows = Vec::new();
651                let mut column_names = Vec::new();
652
653                // Iterate stream using StreamExt
654                use futures::StreamExt;
655                while let Some(batch_result) = stream.next().await {
656                    let batch = batch_result?;
657
658                    // Get column names from first batch
659                    if column_names.is_empty() {
660                        column_names = batch
661                            .schema()
662                            .fields()
663                            .iter()
664                            .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
665                            .map(|f| f.name().clone())
666                            .collect();
667                    }
668
669                    // Convert batch to rows
670                    for row_idx in 0..batch.num_rows() {
671                        let mut row = Vec::new();
672                        for field in batch.schema().fields() {
673                            if field.name().starts_with('_') || field.name() == "ext_id" {
674                                continue;
675                            }
676
677                            let col_idx = batch.schema().index_of(field.name())?;
678                            let column = batch.column(col_idx);
679                            let value = self.arrow_value_to_json(column, row_idx)?;
680
681                            // Convert value to CSV string
682                            let csv_value = match value {
683                                Value::Null => String::new(),
684                                Value::Bool(b) => b.to_string(),
685                                Value::Int(i) => i.to_string(),
686                                Value::Float(f) => f.to_string(),
687                                Value::String(s) => s,
688                                _ => format!("{value}"),
689                            };
690                            row.push(csv_value);
691                        }
692                        all_rows.push(row);
693                    }
694                }
695
696                // Write CSV
697                let file = std::fs::File::create(path)?;
698                let mut wtr = csv::Writer::from_writer(file);
699
700                // Write headers
701                log::debug!("CSV export headers: {:?}", column_names);
702                wtr.write_record(&column_names)?;
703
704                // Write rows
705                for (i, row) in all_rows.iter().enumerate() {
706                    log::debug!("CSV export row {}: {:?}", i, row);
707                    wtr.write_record(row)?;
708                }
709
710                wtr.flush()?;
711                Ok(all_rows.len())
712            }
713            _ => Err(anyhow!(
714                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
715                format
716            )),
717        }
718    }
719
720    async fn export_edge_type_in_format(
721        &self,
722        edge_type: &str,
723        path: &str,
724        format: &str,
725    ) -> Result<usize> {
726        match format {
727            "parquet" => self.export_edge_type(edge_type, path).await,
728            "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
729            _ => Err(anyhow!(
730                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
731                format
732            )),
733        }
734    }
735
736    /// Write a stream of record batches to a Parquet file.
737    /// Returns the total number of rows written, or 0 if the stream is empty.
738    async fn write_batches_to_parquet(
739        mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
740        path: &str,
741        entity_description: &str,
742    ) -> Result<usize> {
743        use futures::TryStreamExt;
744
745        // Get first batch to determine schema and create writer
746        let first_batch = match stream.try_next().await? {
747            Some(batch) => batch,
748            None => {
749                log::info!("No data to export from {}", entity_description);
750                return Ok(0);
751            }
752        };
753
754        // Create Parquet writer using schema from first batch
755        let file = std::fs::File::create(path)?;
756        let arrow_schema = first_batch.schema();
757        let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
758
759        // Write first batch
760        let mut count = first_batch.num_rows();
761        writer.write(&first_batch)?;
762
763        // Write remaining batches
764        while let Some(batch) = stream.try_next().await? {
765            count += batch.num_rows();
766            writer.write(&batch)?;
767        }
768
769        writer.close()?;
770
771        log::info!(
772            "Exported {} rows from {} to '{}'",
773            count,
774            entity_description,
775            path
776        );
777        Ok(count)
778    }
779
780    /// Export vertices of a specific label to Parquet
781    async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
782        let stream = self
783            .storage
784            .scan_vertex_table_stream(label)
785            .await?
786            .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
787
788        Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
789    }
790
791    /// Export edges of a specific type to Parquet
792    async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
793        let schema = self.storage.schema_manager().schema();
794        if !schema.edge_types.contains_key(edge_type) {
795            return Err(anyhow!("Edge type '{}' not found", edge_type));
796        }
797
798        let filter = format!("type = '{}'", edge_type);
799        let stream = self
800            .storage
801            .scan_main_edge_table_stream(Some(&filter))
802            .await?
803            .ok_or_else(|| anyhow!("No edge data found"))?;
804
805        Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
806    }
807
808    pub(crate) async fn execute_copy_from(
809        &self,
810        label: &str,
811        path: &str,
812        format: &str,
813        options: &HashMap<String, Value>,
814    ) -> Result<usize> {
815        // Read data from file
816        let batches = match format {
817            "parquet" => self.read_parquet_file(path)?,
818            "csv" => self.read_csv_file(path, label, options)?,
819            _ => {
820                return Err(anyhow!(
821                    "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
822                    format
823                ));
824            }
825        };
826
827        // Get writer
828        let writer_arc = self
829            .writer
830            .as_ref()
831            .ok_or_else(|| anyhow!("No writer available"))?;
832
833        let db_schema = self.storage.schema_manager().schema();
834
835        // Check if this is a label (vertex) or edge type
836        let is_edge = db_schema.edge_type_id_by_name(label).is_some();
837
838        if is_edge {
839            // Import edges
840            let edge_type_id = db_schema
841                .edge_type_id_by_name(label)
842                .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
843
844            // Get src and dst column names from options
845            let src_col = options
846                .get("src_col")
847                .and_then(|v| v.as_str())
848                .unwrap_or("src");
849            let dst_col = options
850                .get("dst_col")
851                .and_then(|v| v.as_str())
852                .unwrap_or("dst");
853
854            // §5.7 of concurrent_writer.md: writer is hoisted above the row
855            // loop now that there is no per-row lock acquisition cost.
856            let writer: &uni_store::Writer = writer_arc.as_ref();
857            let mut total_rows = 0;
858            for batch in batches {
859                let num_rows = batch.num_rows();
860                // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
861                let eids = writer.allocate_eids(num_rows).await?;
862
863                for (row_idx, &eid) in eids.iter().enumerate().take(num_rows) {
864                    let mut properties = HashMap::new();
865                    let mut src_vid: Option<Vid> = None;
866                    let mut dst_vid: Option<Vid> = None;
867
868                    // Extract properties and VIDs from each column
869                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
870                        let col_name = field.name();
871                        let column = batch.column(col_idx);
872                        let value = self.arrow_value_to_json(column, row_idx)?;
873
874                        if col_name == src_col {
875                            let raw = value.as_u64().unwrap_or_else(|| {
876                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
877                            });
878                            src_vid = Some(Vid::new(raw));
879                        } else if col_name == dst_col {
880                            let raw = value.as_u64().unwrap_or_else(|| {
881                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
882                            });
883                            dst_vid = Some(Vid::new(raw));
884                        } else if !col_name.starts_with('_') && !value.is_null() {
885                            properties.insert(col_name.clone(), value);
886                        }
887                    }
888
889                    let src = src_vid
890                        .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
891                    let dst = dst_vid
892                        .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
893
894                    writer
895                        .insert_edge(
896                            src,
897                            dst,
898                            edge_type_id,
899                            eid,
900                            properties,
901                            Some(label.to_string()),
902                            None,
903                        )
904                        .await?;
905
906                    total_rows += 1;
907                }
908            }
909
910            log::info!(
911                "Imported {} edge rows from '{}' into edge type '{}'",
912                total_rows,
913                path,
914                label
915            );
916
917            // Flush to persist edges
918            if total_rows > 0 {
919                writer.flush_to_l1(None).await?;
920            }
921
922            Ok(total_rows)
923        } else {
924            // Import vertices
925            // Validate the label exists in schema
926            db_schema
927                .label_id_by_name_case_insensitive(label)
928                .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
929
930            // §5.7 of concurrent_writer.md: writer is hoisted above the row
931            // loop now that there is no per-row lock acquisition cost.
932            let writer: &uni_store::Writer = writer_arc.as_ref();
933            let mut total_rows = 0;
934            for batch in batches {
935                let num_rows = batch.num_rows();
936                // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
937                let vids = writer.allocate_vids(num_rows).await?;
938
939                // Convert Arrow batch to rows
940                for (row_idx, &vid) in vids.iter().enumerate().take(num_rows) {
941                    let mut properties = HashMap::new();
942
943                    // Extract properties from each column
944                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
945                        let col_name = field.name();
946
947                        // Skip internal columns
948                        if col_name.starts_with('_') {
949                            continue;
950                        }
951
952                        let column = batch.column(col_idx);
953                        let value = self.arrow_value_to_json(column, row_idx)?;
954
955                        if !value.is_null() {
956                            properties.insert(col_name.clone(), value);
957                        }
958                    }
959
960                    let _ = writer
961                        .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
962                        .await?;
963
964                    total_rows += 1;
965                }
966            }
967
968            log::info!(
969                "Imported {} rows from '{}' into label '{}'",
970                total_rows,
971                path,
972                label
973            );
974
975            // Flush to persist vertices
976            if total_rows > 0 {
977                writer.flush_to_l1(None).await?;
978            }
979
980            Ok(total_rows)
981        }
982    }
983
984    fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
985        use arrow_array::Array;
986        use arrow_schema::DataType as ArrowDataType;
987
988        if column.is_null(row_idx) {
989            return Ok(Value::Null);
990        }
991
992        match column.data_type() {
993            ArrowDataType::Utf8 => {
994                let array = column
995                    .as_any()
996                    .downcast_ref::<arrow_array::StringArray>()
997                    .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
998                Ok(Value::String(array.value(row_idx).to_string()))
999            }
1000            ArrowDataType::Int32 => {
1001                let array = column
1002                    .as_any()
1003                    .downcast_ref::<arrow_array::Int32Array>()
1004                    .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
1005                Ok(Value::Int(array.value(row_idx) as i64))
1006            }
1007            ArrowDataType::Int64 => {
1008                let array = column
1009                    .as_any()
1010                    .downcast_ref::<arrow_array::Int64Array>()
1011                    .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
1012                Ok(Value::Int(array.value(row_idx)))
1013            }
1014            ArrowDataType::Float32 => {
1015                let array = column
1016                    .as_any()
1017                    .downcast_ref::<arrow_array::Float32Array>()
1018                    .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
1019                Ok(Value::Float(array.value(row_idx) as f64))
1020            }
1021            ArrowDataType::Float64 => {
1022                let array = column
1023                    .as_any()
1024                    .downcast_ref::<arrow_array::Float64Array>()
1025                    .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
1026                Ok(Value::Float(array.value(row_idx)))
1027            }
1028            ArrowDataType::Boolean => {
1029                let array = column
1030                    .as_any()
1031                    .downcast_ref::<arrow_array::BooleanArray>()
1032                    .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
1033                Ok(Value::Bool(array.value(row_idx)))
1034            }
1035            ArrowDataType::UInt64 => {
1036                let array = column
1037                    .as_any()
1038                    .downcast_ref::<arrow_array::UInt64Array>()
1039                    .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
1040                Ok(Value::Int(array.value(row_idx) as i64))
1041            }
1042            _ => {
1043                // For other types, try to convert to string
1044                let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
1045                if let Some(arr) = array {
1046                    Ok(Value::String(arr.value(row_idx).to_string()))
1047                } else {
1048                    Ok(Value::Null)
1049                }
1050            }
1051        }
1052    }
1053
1054    fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
1055        let file = std::fs::File::open(path)?;
1056        let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
1057            .build()?;
1058        reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1059    }
1060
1061    fn read_csv_file(
1062        &self,
1063        path: &str,
1064        label: &str,
1065        options: &HashMap<String, Value>,
1066    ) -> Result<Vec<arrow_array::RecordBatch>> {
1067        use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1068        use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1069        use std::sync::Arc;
1070
1071        // Parse CSV options
1072        let has_headers = options
1073            .get("headers")
1074            .and_then(|v| v.as_bool())
1075            .unwrap_or(true);
1076
1077        // Read CSV file
1078        let file = std::fs::File::open(path)?;
1079        let mut rdr = csv::ReaderBuilder::new()
1080            .has_headers(has_headers)
1081            .from_reader(file);
1082
1083        // Get schema for type conversion
1084        let db_schema = self.storage.schema_manager().schema();
1085        let properties = db_schema.properties.get(label);
1086
1087        // Collect all rows first to determine schema
1088        let mut rows: Vec<Vec<String>> = Vec::new();
1089        let headers: Vec<String> = if has_headers {
1090            rdr.headers()?.iter().map(|s| s.to_string()).collect()
1091        } else {
1092            Vec::new()
1093        };
1094
1095        for result in rdr.records() {
1096            let record = result?;
1097            rows.push(record.iter().map(|s| s.to_string()).collect());
1098        }
1099
1100        if rows.is_empty() {
1101            return Ok(Vec::new());
1102        }
1103
1104        // Build Arrow schema with proper types based on DB schema
1105        let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
1106        let col_names: Vec<String> = if has_headers {
1107            headers
1108        } else {
1109            (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
1110        };
1111
1112        for name in &col_names {
1113            let arrow_type = if let Some(props) = properties {
1114                if let Some(prop_meta) = props.get(name) {
1115                    match prop_meta.r#type {
1116                        DataType::Int32 => ArrowDataType::Int32,
1117                        DataType::Int64 => ArrowDataType::Int64,
1118                        DataType::Float32 => ArrowDataType::Float32,
1119                        DataType::Float64 => ArrowDataType::Float64,
1120                        DataType::Bool => ArrowDataType::Boolean,
1121                        _ => ArrowDataType::Utf8,
1122                    }
1123                } else {
1124                    ArrowDataType::Utf8
1125                }
1126            } else {
1127                ArrowDataType::Utf8
1128            };
1129            arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
1130        }
1131
1132        let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
1133
1134        // Convert rows to Arrow arrays with proper types
1135        let mut columns: Vec<ArrayRef> = Vec::new();
1136        for (col_idx, field) in arrow_fields.iter().enumerate() {
1137            match field.data_type() {
1138                ArrowDataType::Int32 => {
1139                    let values: Vec<Option<i32>> = rows
1140                        .iter()
1141                        .map(|row| {
1142                            if col_idx < row.len() {
1143                                row[col_idx].parse().ok()
1144                            } else {
1145                                None
1146                            }
1147                        })
1148                        .collect();
1149                    columns.push(Arc::new(Int32Array::from(values)));
1150                }
1151                _ => {
1152                    // Default to string
1153                    let values: Vec<Option<String>> = rows
1154                        .iter()
1155                        .map(|row| {
1156                            if col_idx < row.len() {
1157                                Some(row[col_idx].clone())
1158                            } else {
1159                                None
1160                            }
1161                        })
1162                        .collect();
1163                    columns.push(Arc::new(StringArray::from(values)));
1164                }
1165            }
1166        }
1167
1168        let batch = RecordBatch::try_new(arrow_schema, columns)?;
1169        Ok(vec![batch])
1170    }
1171
1172    fn parse_data_type(type_str: &str) -> Result<DataType> {
1173        use uni_common::core::schema::{CrdtType, PointType};
1174        let type_str = type_str.to_lowercase();
1175        let type_str = type_str.trim();
1176        match type_str {
1177            "string" | "text" | "varchar" => Ok(DataType::String),
1178            "int" | "integer" | "int32" => Ok(DataType::Int32),
1179            "long" | "int64" | "bigint" => Ok(DataType::Int64),
1180            "float" | "float32" | "real" => Ok(DataType::Float32),
1181            "double" | "float64" => Ok(DataType::Float64),
1182            "bool" | "boolean" => Ok(DataType::Bool),
1183            "timestamp" => Ok(DataType::Timestamp),
1184            "date" => Ok(DataType::Date),
1185            "time" => Ok(DataType::Time),
1186            "datetime" => Ok(DataType::DateTime),
1187            "duration" => Ok(DataType::Duration),
1188            "btic" => Ok(DataType::Btic),
1189            "json" | "jsonb" => Ok(DataType::CypherValue),
1190            "bytes" | "blob" | "binary" => Ok(DataType::Bytes),
1191            "point" => Ok(DataType::Point(PointType::Cartesian2D)),
1192            "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
1193            "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1194            s if s.starts_with("vector(") && s.ends_with(')') => {
1195                let dims_str = &s[7..s.len() - 1];
1196                let dimensions = dims_str
1197                    .parse::<usize>()
1198                    .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1199                Ok(DataType::Vector { dimensions })
1200            }
1201            s if s.starts_with("list<") && s.ends_with('>') => {
1202                let inner_type_str = &s[5..s.len() - 1];
1203                let inner_type = Self::parse_data_type(inner_type_str)?;
1204                Ok(DataType::List(Box::new(inner_type)))
1205            }
1206            "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1207            "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1208            _ => Err(anyhow!("Unknown data type: {}", type_str)),
1209        }
1210    }
1211
1212    pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1213        let sm = self.storage.schema_manager_arc();
1214        if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1215            return Ok(());
1216        }
1217        sm.add_label_with_desc(&clause.name, clause.description)?;
1218        for prop in clause.properties {
1219            let dt = Self::parse_data_type(&prop.data_type)?;
1220            sm.add_property_with_desc(
1221                &clause.name,
1222                &prop.name,
1223                dt,
1224                prop.nullable,
1225                prop.description,
1226            )?;
1227            if prop.unique {
1228                let constraint = Constraint {
1229                    name: format!("{}_{}_unique", clause.name, prop.name),
1230                    constraint_type: ConstraintType::Unique {
1231                        properties: vec![prop.name],
1232                    },
1233                    target: ConstraintTarget::Label(clause.name.clone()),
1234                    enabled: true,
1235                };
1236                sm.add_constraint(constraint)?;
1237            }
1238        }
1239        sm.save().await?;
1240        Ok(())
1241    }
1242
1243    /// True if `key` is a generated property on any of the given labels.
1244    /// Used by the partial-write flush path (Round 12 §C) to decide
1245    /// whether the property should be added to `touched_keys` so that
1246    /// Lance MergeInsert sends the recomputed value.
1247    fn is_generated_key(&self, labels: &[String], key: &str) -> bool {
1248        let schema = self.storage.schema_manager().schema();
1249        for label in labels {
1250            if let Some(props_meta) = schema.properties.get(label)
1251                && let Some(meta) = props_meta.get(key)
1252                && meta.generation_expression.is_some()
1253            {
1254                return true;
1255            }
1256        }
1257        false
1258    }
1259
1260    pub(crate) async fn enrich_properties_with_generated_columns(
1261        &self,
1262        label_name: &str,
1263        properties: &mut HashMap<String, Value>,
1264        prop_manager: &PropertyManager,
1265        params: &HashMap<String, Value>,
1266        ctx: Option<&QueryContext>,
1267    ) -> Result<()> {
1268        let schema = self.storage.schema_manager().schema();
1269
1270        if let Some(props_meta) = schema.properties.get(label_name) {
1271            let mut generators = Vec::new();
1272            for (prop_name, meta) in props_meta {
1273                if let Some(expr_str) = &meta.generation_expression {
1274                    generators.push((prop_name.clone(), expr_str.clone()));
1275                }
1276            }
1277
1278            for (prop_name, expr_str) in generators {
1279                let cache_key = (label_name.to_string(), prop_name.clone());
1280                let expr = {
1281                    let cache = self.gen_expr_cache.read().await;
1282                    cache.get(&cache_key).cloned()
1283                };
1284
1285                let expr = match expr {
1286                    Some(e) => e,
1287                    None => {
1288                        let parsed = uni_cypher::parse_expression(&expr_str)
1289                            .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1290                        let mut cache = self.gen_expr_cache.write().await;
1291                        cache.insert(cache_key, parsed.clone());
1292                        parsed
1293                    }
1294                };
1295
1296                let mut scope = HashMap::new();
1297
1298                // If expression has an explicit variable, use it as an object
1299                if let Some(var) = expr.extract_variable() {
1300                    scope.insert(var, Value::Map(properties.clone()));
1301                } else {
1302                    // No explicit variable - add properties directly to scope for bare references
1303                    // e.g., "lower(email)" can reference "email" directly
1304                    for (k, v) in properties.iter() {
1305                        scope.insert(k.clone(), v.clone());
1306                    }
1307                }
1308
1309                let val = self
1310                    .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1311                    .await?;
1312                properties.insert(prop_name, val);
1313            }
1314        }
1315        Ok(())
1316    }
1317
1318    pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1319        let sm = self.storage.schema_manager_arc();
1320        if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1321            return Ok(());
1322        }
1323        sm.add_edge_type_with_desc(
1324            &clause.name,
1325            clause.src_labels,
1326            clause.dst_labels,
1327            clause.description,
1328        )?;
1329        for prop in clause.properties {
1330            let dt = Self::parse_data_type(&prop.data_type)?;
1331            sm.add_property_with_desc(
1332                &clause.name,
1333                &prop.name,
1334                dt,
1335                prop.nullable,
1336                prop.description,
1337            )?;
1338        }
1339        sm.save().await?;
1340        Ok(())
1341    }
1342
1343    /// Executes an ALTER action on a schema entity.
1344    ///
1345    /// This is a shared helper for both `execute_alter_label` and
1346    /// `execute_alter_edge_type` since they have identical logic.
1347    pub(crate) async fn execute_alter_entity(
1348        sm: &Arc<SchemaManager>,
1349        entity_name: &str,
1350        action: AlterAction,
1351    ) -> Result<()> {
1352        match action {
1353            AlterAction::AddProperty(prop) => {
1354                let dt = Self::parse_data_type(&prop.data_type)?;
1355                sm.add_property_with_desc(
1356                    entity_name,
1357                    &prop.name,
1358                    dt,
1359                    prop.nullable,
1360                    prop.description,
1361                )?;
1362            }
1363            AlterAction::DropProperty(prop_name) => {
1364                sm.drop_property(entity_name, &prop_name)?;
1365            }
1366            AlterAction::RenameProperty { old_name, new_name } => {
1367                sm.rename_property(entity_name, &old_name, &new_name)?;
1368            }
1369            AlterAction::SetDescription(desc) => {
1370                if sm.schema().labels.contains_key(entity_name) {
1371                    sm.set_label_description(entity_name, desc)?;
1372                } else {
1373                    sm.set_edge_type_description(entity_name, desc)?;
1374                }
1375            }
1376            AlterAction::SetPropertyDescription {
1377                property,
1378                description,
1379            } => {
1380                sm.set_property_description(entity_name, &property, description)?;
1381            }
1382        }
1383        sm.save().await?;
1384        Ok(())
1385    }
1386
1387    pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1388        Self::execute_alter_entity(
1389            &self.storage.schema_manager_arc(),
1390            &clause.name,
1391            clause.action,
1392        )
1393        .await
1394    }
1395
1396    pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1397        Self::execute_alter_entity(
1398            &self.storage.schema_manager_arc(),
1399            &clause.name,
1400            clause.action,
1401        )
1402        .await
1403    }
1404
1405    pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1406        let sm = self.storage.schema_manager_arc();
1407        sm.drop_label(&clause.name, clause.if_exists)?;
1408        sm.save().await?;
1409        Ok(())
1410    }
1411
1412    pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1413        let sm = self.storage.schema_manager_arc();
1414        sm.drop_edge_type(&clause.name, clause.if_exists)?;
1415        sm.save().await?;
1416        Ok(())
1417    }
1418
1419    pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1420        let sm = self.storage.schema_manager_arc();
1421        let target = ConstraintTarget::Label(clause.label);
1422        let c_type = match clause.constraint_type {
1423            AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1424                properties: clause.properties,
1425            },
1426            AstConstraintType::Exists => {
1427                let property = clause
1428                    .properties
1429                    .into_iter()
1430                    .next()
1431                    .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1432                ConstraintType::Exists { property }
1433            }
1434            AstConstraintType::Check => {
1435                let expression = clause
1436                    .expression
1437                    .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1438                ConstraintType::Check {
1439                    expression: expression.to_string_repr(),
1440                }
1441            }
1442        };
1443
1444        let constraint = Constraint {
1445            name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1446            constraint_type: c_type,
1447            target,
1448            enabled: true,
1449        };
1450
1451        sm.add_constraint(constraint)?;
1452        sm.save().await?;
1453        Ok(())
1454    }
1455
1456    pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1457        let sm = self.storage.schema_manager_arc();
1458        sm.drop_constraint(&clause.name, false)?;
1459        sm.save().await?;
1460        Ok(())
1461    }
1462
1463    /// Detects the single-node, single-label MERGE shape the fast path serves.
1464    ///
1465    /// Returns the node pattern and its label when `pattern` is one path with
1466    /// one node element, exactly one label, and a static map-literal property
1467    /// set — the shape [`Self::execute_merge_row_indexed`] can serve without
1468    /// per-row query planning. The keys do NOT need to be indexed: the persisted
1469    /// lookup degrades to a (single, filtered) label scan when no scalar index
1470    /// exists, which is still far cheaper than building a `LogicalPlan` per row.
1471    /// Any other shape (edges, multiple labels, non-literal properties) returns
1472    /// `None` so the caller uses the general per-row path.
1473    fn merge_single_node_fastpath<'p>(
1474        &self,
1475        pattern: &'p Pattern,
1476    ) -> Option<(&'p NodePattern, String)> {
1477        if pattern.paths.len() != 1 {
1478            return None;
1479        }
1480        let path = &pattern.paths[0];
1481        if path.elements.len() != 1 {
1482            return None;
1483        }
1484        let PatternElement::Node(n) = &path.elements[0] else {
1485            return None;
1486        };
1487        let labels = n.labels.names();
1488        if labels.len() != 1 {
1489            return None;
1490        }
1491        // The key must be a static map literal so the key names are known.
1492        let Some(Expr::Map(entries)) = n.properties.as_ref() else {
1493            return None;
1494        };
1495        if entries.is_empty() {
1496            return None;
1497        }
1498        // Resolve the label to its schema-canonical case so the fast path agrees
1499        // with the general MERGE path (which matches labels case-insensitively).
1500        // Without this, `MERGE (:person …)` after a `:Person` row was flushed
1501        // scans/keys a different label than the canonical one and creates a
1502        // duplicate (review #3a). Falls back to the as-written label when the
1503        // schema does not know it (schemaless).
1504        let canonical = self
1505            .storage
1506            .schema_manager()
1507            .schema()
1508            .canonical_label_name(&labels[0])
1509            .unwrap_or_else(|| labels[0].clone());
1510        Some((n, canonical))
1511    }
1512
1513    /// RC3: detect the bound-endpoints, anonymous-edge relationship MERGE shape
1514    /// `(a)-[:TYPE]->(b)` whose edge existence can be resolved with one O(1)
1515    /// adjacency probe instead of building and running a per-row traversal
1516    /// `LogicalPlan` (the general path is ~19x the bulk CREATE of the same edges).
1517    ///
1518    /// Returns `(source_var, target_var, edge_type_id, direction)` when the
1519    /// pattern is exactly one path of `[Node, Rel, Node]` where the relationship
1520    /// is a single concrete type, **anonymous** (no variable → no edge binding to
1521    /// reproduce), fixed-length, and unfiltered, and both endpoint nodes are plain
1522    /// variables with no re-specified MERGE properties or inline WHERE (those are
1523    /// filters only the general path applies). Any deviation returns `None` and
1524    /// the caller keeps the general path. The caller still verifies per row that
1525    /// both endpoints are actually bound to vids.
1526    fn merge_relationship_fastpath_shape(
1527        &self,
1528        pattern: &Pattern,
1529    ) -> Option<(
1530        String,
1531        String,
1532        u32,
1533        uni_store::storage::direction::Direction,
1534    )> {
1535        if pattern.paths.len() != 1 {
1536            return None;
1537        }
1538        let [
1539            PatternElement::Node(a),
1540            PatternElement::Relationship(r),
1541            PatternElement::Node(b),
1542        ] = pattern.paths[0].elements.as_slice()
1543        else {
1544            return None;
1545        };
1546        // Endpoints: plain variables, no extra MERGE-pattern properties / inline
1547        // WHERE (a re-specified property is a filter the general path must apply).
1548        let src_var = a.variable.as_ref()?;
1549        let dst_var = b.variable.as_ref()?;
1550        if a.properties.is_some()
1551            || a.where_clause.is_some()
1552            || b.properties.is_some()
1553            || b.where_clause.is_some()
1554        {
1555            return None;
1556        }
1557        // Relationship: single concrete type, anonymous, fixed-length, unfiltered.
1558        if r.variable.is_some()
1559            || r.range.is_some()
1560            || r.properties.is_some()
1561            || r.where_clause.is_some()
1562            || r.types.names().len() != 1
1563        {
1564            return None;
1565        }
1566        let type_name = &r.types.names()[0];
1567        let type_id = if self.config.strict_schema {
1568            self.storage
1569                .schema_manager()
1570                .schema()
1571                .edge_type_id_by_name_case_insensitive(type_name)?
1572        } else {
1573            self.storage
1574                .schema_manager()
1575                .get_or_assign_edge_type_id(type_name)
1576        };
1577        let dir = match r.direction {
1578            uni_cypher::ast::Direction::Outgoing => {
1579                uni_store::storage::direction::Direction::Outgoing
1580            }
1581            uni_cypher::ast::Direction::Incoming => {
1582                uni_store::storage::direction::Direction::Incoming
1583            }
1584            // Undirected existence is ambiguous to encode as one probe; the
1585            // general path handles it.
1586            uni_cypher::ast::Direction::Both => return None,
1587        };
1588        Some((src_var.clone(), dst_var.clone(), type_id, dir))
1589    }
1590
1591    /// Build the persisted-scan filter for a MERGE key, or `None` if any value
1592    /// is not a scalar this fast path can represent.
1593    ///
1594    /// Returning `None` makes the caller fall back to the general per-row path,
1595    /// so unusual key value types (lists, maps, temporals, nulls) are never
1596    /// silently mis-matched. The `_deleted = false` clause mirrors the
1597    /// persisted-read predicate used elsewhere; the version high-water-mark
1598    /// clause is added by [`uni_store::StorageManager::scan_vertex_table`].
1599    fn merge_key_filter(key_props: &HashMap<String, Value>) -> Option<String> {
1600        if key_props.is_empty() {
1601            return None;
1602        }
1603        let mut parts = Vec::with_capacity(key_props.len() + 1);
1604        for (k, v) in key_props {
1605            if !Self::is_safe_key_ident(k) {
1606                return None;
1607            }
1608            let lit = Self::render_key_literal(v)?;
1609            // Unquoted identifier: the Lance filter parser does not resolve a
1610            // double-quoted column name against the table here, so `"k" = v`
1611            // silently matches nothing. Keys are validated above to be safe
1612            // bare identifiers.
1613            parts.push(format!("{k} = {lit}"));
1614        }
1615        parts.push("_deleted = false".to_string());
1616        Some(parts.join(" AND "))
1617    }
1618
1619    /// True when a MERGE key name is a safe bare identifier for a Lance
1620    /// filter (issue #8). Keys come from a static map literal, but validate
1621    /// anyway.
1622    fn is_safe_key_ident(k: &str) -> bool {
1623        !k.is_empty() && k.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
1624    }
1625
1626    /// Render a scalar MERGE-key value as a Lance filter literal, or `None`
1627    /// for value types this fast path cannot represent (lists, maps,
1628    /// temporals, nulls) — the caller then falls back to the general path.
1629    fn render_key_literal(v: &Value) -> Option<String> {
1630        Some(match v {
1631            Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1632            Value::Int(i) => i.to_string(),
1633            Value::Float(f) => f.to_string(),
1634            Value::Bool(b) => b.to_string(),
1635            _ => return None,
1636        })
1637    }
1638
1639    /// Build ONE scan filter matching every key tuple in `keys` (all tuples
1640    /// sorted by `key_names` order, values canonicalized).
1641    ///
1642    /// Single-column keys render as type-grouped `k IN (…)` lists (a filter
1643    /// never compares mixed literal types against one column); composite keys
1644    /// render as an OR of per-tuple conjunctions. Both forms are wrapped with
1645    /// the same `_deleted = false` clause the per-row filter used.
1646    fn merge_batch_filter(key_names: &[String], keys: &[&MergeKey]) -> Option<String> {
1647        if keys.is_empty() || key_names.iter().any(|k| !Self::is_safe_key_ident(k)) {
1648            return None;
1649        }
1650        let disjunction = if let [key] = key_names {
1651            // Group literals by value variant so each IN list is homogeneous.
1652            let mut groups: HashMap<std::mem::Discriminant<Value>, Vec<String>> = HashMap::new();
1653            for tuple in keys {
1654                let (_, v) = tuple.first()?;
1655                groups
1656                    .entry(std::mem::discriminant(v))
1657                    .or_default()
1658                    .push(Self::render_key_literal(v)?);
1659            }
1660            groups
1661                .into_values()
1662                .map(|lits| {
1663                    if let [lit] = lits.as_slice() {
1664                        format!("{key} = {lit}")
1665                    } else {
1666                        format!("{key} IN ({})", lits.join(", "))
1667                    }
1668                })
1669                .collect::<Vec<_>>()
1670                .join(" OR ")
1671        } else {
1672            keys.iter()
1673                .map(|tuple| {
1674                    let conj = tuple
1675                        .iter()
1676                        .map(|(k, v)| Some(format!("{k} = {}", Self::render_key_literal(v)?)))
1677                        .collect::<Option<Vec<_>>>()?
1678                        .join(" AND ");
1679                    Some(format!("({conj})"))
1680                })
1681                .collect::<Option<Vec<_>>>()?
1682                .join(" OR ")
1683        };
1684        Some(format!("({disjunction}) AND _deleted = false"))
1685    }
1686
1687    /// Canonicalize a numeric MERGE-key value for *matching only*.
1688    ///
1689    /// A finite `Float` with an integral value (e.g. `1.0`) is mapped to the
1690    /// equivalent `Int`, so an `Int(1)` key matches a node stored with
1691    /// `Float(1.0)` and vice versa — the coercion the general (DataFusion) MERGE
1692    /// path already applies (review #3a). Non-numeric and non-integral values are
1693    /// returned unchanged. Used only to build match keys / comparisons, never the
1694    /// value written to a created node.
1695    fn canonical_key_value(v: &Value) -> Value {
1696        match v {
1697            Value::Float(f)
1698                if f.is_finite()
1699                    && f.fract() == 0.0
1700                    && *f >= i64::MIN as f64
1701                    && *f <= i64::MAX as f64 =>
1702            {
1703                Value::Int(*f as i64)
1704            }
1705            other => other.clone(),
1706        }
1707    }
1708
1709    /// Canonical sorted `(name, value)` key tuple for a MERGE row's key map.
1710    ///
1711    /// Numeric values are canonicalized ([`Self::canonical_key_value`]) so the
1712    /// tuple compares equal regardless of `Int`/`Float` spelling. This tuple is
1713    /// used purely as a match key (intra-batch dedup, L0 overlay lookup); the
1714    /// created node's properties come from the original, un-canonicalized map.
1715    fn merge_key_tuple(key_props: &HashMap<String, Value>) -> MergeKey {
1716        let mut tuple: MergeKey = key_props
1717            .iter()
1718            .map(|(k, v)| (k.clone(), Self::canonical_key_value(v)))
1719            .collect();
1720        tuple.sort_by(|a, b| a.0.cmp(&b.0));
1721        tuple
1722    }
1723
1724    /// Snapshot all live L0 vertices of `label`, grouped by their MERGE key.
1725    ///
1726    /// Walked once per MERGE statement (issue #69): the per-row fast path then
1727    /// resolves L0/uncommitted matches with an O(1) map lookup instead of
1728    /// re-enumerating L0 for every row. Captures committed-not-yet-persisted
1729    /// rows and rows created earlier in the same transaction; rows created by
1730    /// later rows of this same statement are folded in incrementally by
1731    /// [`Self::execute_merge_row_indexed`]. `key_names` must be sorted to match
1732    /// [`Self::merge_key_tuple`].
1733    fn merge_l0_existing(
1734        &self,
1735        label: &str,
1736        key_names: &[String],
1737        ctx: Option<&QueryContext>,
1738    ) -> HashMap<MergeKey, Vec<Vid>> {
1739        let mut candidates: Vec<Vid> = Vec::new();
1740        l0_visibility::visit_l0_buffers(ctx, |l0| {
1741            if let Some(vids) = l0.label_to_vids.get(label) {
1742                candidates.extend(vids.iter().copied());
1743            }
1744            false
1745        });
1746
1747        let mut map: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1748        let mut seen: HashSet<Vid> = HashSet::new();
1749        for vid in candidates {
1750            if !seen.insert(vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
1751                continue;
1752            }
1753            // `lookup_vertex_prop` merges across L0 layers (newest wins).
1754            let tuple: MergeKey = key_names
1755                .iter()
1756                .map(|k| {
1757                    let v = l0_visibility::lookup_vertex_prop(vid, k, ctx).unwrap_or(Value::Null);
1758                    (k.clone(), Self::canonical_key_value(&v))
1759                })
1760                .collect();
1761            map.entry(tuple).or_default().push(vid);
1762        }
1763        map
1764    }
1765
    /// Maximum key tuples (in the key-filtered pass) or candidate vids (in the
    /// `_vid IN (…)` verification pass) per batched MERGE scan. Bounds the
    /// filter-string size and the Lance/DataFusion parse cost; chunks run
    /// sequentially.
    const MERGE_SCAN_CHUNK: usize = 1000;
1769
1770    /// Persisted (flushed) vertices of `label` for EVERY key tuple in `keys`,
1771    /// resolved with one scan per [`Self::MERGE_SCAN_CHUNK`] tuples instead of
1772    /// one scan per input row (review perf #4: `UNWIND … MERGE` issued N
1773    /// independent Lance scans).
1774    ///
1775    /// Scans via [`uni_store::StorageManager::scan_vertex_table`] — the same
1776    /// read path `MATCH` uses, so it honors the version high-water-mark and
1777    /// sees flushed rows. On the declared-label branch the key-filtered scan
1778    /// only NOMINATES candidate vids; a second, unfiltered `_vid IN (…)` pass
1779    /// picks each candidate's max-`_version` row and requires it to be live
1780    /// and still keyed as requested (per-label tables are MVCC-append, so a
1781    /// superseded version's row would otherwise stale-match a rewritten key).
1782    /// Matched rows are grouped by their CANONICAL key tuple (stored values
1783    /// run through [`Self::canonical_key_value`], so a stored `Float(1.0)`
1784    /// lands under a requested `Int(1)` — the coercion Lance's numeric filter
1785    /// equality applies). Liveness against L0 overlays (deletes, key rewrites
1786    /// by earlier rows of the same statement) is NOT checked here — the
1787    /// per-row consumer re-checks at row time, exactly as the old per-row
1788    /// scan did.
1789    ///
1790    /// The second returned map carries the FULL property maps the schemaless
1791    /// branch already decoded for each matched vid (empty on the declared-label
1792    /// branch, which projects only key columns) — the caller seeds the
1793    /// statement-level [`Prefetch`] from it at zero extra scans.
1794    ///
1795    /// # Errors
1796    /// Propagates persisted-scan and filter-build failures — fail-closed: a
1797    /// MERGE must never treat a failed lookup as "no match" and create
1798    /// duplicates.
1799    async fn merge_lookup_persisted_batch(
1800        &self,
1801        label: &str,
1802        key_names: &[String],
1803        keys: &HashSet<MergeKey>,
1804    ) -> Result<(
1805        HashMap<MergeKey, Vec<Vid>>,
1806        HashMap<Vid, uni_common::Properties>,
1807    )> {
1808        let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1809        if keys.is_empty() {
1810            return Ok((out, HashMap::new()));
1811        }
1812        // An undeclared (schemaless) label has no per-label table — its flushed
1813        // rows live only in the unified main vertex table. Route to the
1814        // main-table lookup, mirroring the planner's scan routing (a schemaless
1815        // MATCH plans `ScanMainByLabels` on the same schema predicate).
1816        if self
1817            .storage
1818            .schema_manager()
1819            .schema()
1820            .get_label_case_insensitive(label)
1821            .is_none()
1822        {
1823            return self
1824                .merge_lookup_persisted_batch_schemaless(label, key_names, keys)
1825                .await;
1826        }
1827        // Declared label — the per-label table is MVCC-append (an update
1828        // flush adds a higher-`_version` row for the same vid) and the key
1829        // predicate is pushed into the Lance filter, so a SUPERSEDED version
1830        // whose row still carries a requested key is returned while the vid's
1831        // current row (key rewritten, fails the filter) is invisible to the
1832        // scan. Version dedup among the returned rows cannot detect that, so
1833        // the lookup runs in two passes: the key-filtered scan only nominates
1834        // candidate vids, and an unfiltered `_vid IN (…)` scan then requires
1835        // each candidate's max-`_version` row to be live and still keyed as
1836        // requested.
1837        let mut columns: Vec<&str> = vec!["_vid"];
1838        columns.extend(key_names.iter().map(String::as_str));
1839
1840        let key_list: Vec<&MergeKey> = keys.iter().collect();
1841        let mut candidates: Vec<Vid> = Vec::new();
1842        let mut seen: HashSet<Vid> = HashSet::new();
1843        for chunk in key_list.chunks(Self::MERGE_SCAN_CHUNK) {
1844            let filter = Self::merge_batch_filter(key_names, chunk)
1845                .ok_or_else(|| anyhow!("MERGE fast path could not build a batched key filter"))?;
1846            let scanned = self
1847                .storage
1848                .scan_vertex_table(label, &columns, Some(&filter))
1849                .await?;
1850            let Some(batch) = scanned else { continue };
1851            let Some(vid_col) = batch
1852                .column_by_name("_vid")
1853                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
1854            else {
1855                continue;
1856            };
1857            for i in 0..vid_col.len() {
1858                let vid = Vid::from(vid_col.value(i));
1859                if seen.insert(vid) {
1860                    candidates.push(vid);
1861                }
1862            }
1863        }
1864
1865        // Verification pass — tombstones are NOT filtered Lance-side (the
1866        // max-version pick must see them so a deleted winner cannot let an
1867        // older live version resurrect the match), exactly like the
1868        // schemaless branch below.
1869        let mut verify_columns: Vec<&str> = vec!["_vid", "_deleted", "_version"];
1870        verify_columns.extend(key_names.iter().map(String::as_str));
1871        for chunk in candidates.chunks(Self::MERGE_SCAN_CHUNK) {
1872            let vid_list = chunk
1873                .iter()
1874                .map(|v| v.as_u64().to_string())
1875                .collect::<Vec<_>>()
1876                .join(", ");
1877            let filter = format!("_vid IN ({vid_list})");
1878            let scanned = self
1879                .storage
1880                .scan_vertex_table(label, &verify_columns, Some(&filter))
1881                .await?;
1882            let Some(batch) = scanned else { continue };
1883            let (Some(vid_col), Some(del_col), Some(ver_col)) = (
1884                batch
1885                    .column_by_name("_vid")
1886                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1887                batch
1888                    .column_by_name("_deleted")
1889                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
1890                batch
1891                    .column_by_name("_version")
1892                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1893            ) else {
1894                return Err(anyhow!(
1895                    "MERGE batched lookup: verification scan missing a required column"
1896                ));
1897            };
1898            let key_cols: Vec<_> = key_names
1899                .iter()
1900                .map(|k| batch.column_by_name(k))
1901                .collect::<Option<Vec<_>>>()
1902                .ok_or_else(|| {
1903                    anyhow!("MERGE batched lookup: projected key column missing from scan result")
1904                })?;
1905            // Per-vid MVCC dedup: keep the highest-version row for each vid.
1906            let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
1907            for i in 0..batch.num_rows() {
1908                let vid = Vid::from(vid_col.value(i));
1909                let ver = ver_col.value(i);
1910                let entry = winners.entry(vid).or_insert((ver, i));
1911                if ver > entry.0 {
1912                    *entry = (ver, i);
1913                }
1914            }
1915            for (vid, (_ver, row)) in winners {
1916                if del_col.value(row) {
1917                    continue;
1918                }
1919                let tuple: MergeKey = key_names
1920                    .iter()
1921                    .zip(&key_cols)
1922                    .map(|(k, col)| {
1923                        let v = uni_store::storage::arrow_convert::arrow_to_value(
1924                            col.as_ref(),
1925                            row,
1926                            None,
1927                        );
1928                        (k.clone(), Self::canonical_key_value(&v))
1929                    })
1930                    .collect();
1931                if keys.contains(&tuple) {
1932                    out.entry(tuple).or_default().push(vid);
1933                }
1934            }
1935        }
1936        Ok((out, HashMap::new()))
1937    }
1938
    /// Persisted-match lookup for an UNDECLARED (schemaless) label.
    ///
    /// Schemaless rows live only in the unified main vertex table (per-label
    /// tables exist only for declared labels), with all properties encoded in
    /// the `props_json` CypherValue blob — so key values cannot be pushed into
    /// the Lance filter; the key match happens in memory after decoding,
    /// exactly like the schemaless MATCH scan. One main-table scan regardless
    /// of key count.
    ///
    /// Mirrors `columnar_scan_schemaless_vertex_batch_static`: tombstones are
    /// NOT filtered Lance-side (MVCC dedup must see them to pick the winning
    /// version per vid); the per-vid max-`_version` dedup runs here, then
    /// deleted winners are dropped.
    ///
    /// Also returns the full decoded property map per matched vid — the blob
    /// is decoded here anyway, and the caller seeds the statement-level
    /// [`Prefetch`] from it instead of re-reading per row. When the main-table
    /// scan yields no batch, both returned maps are empty.
    ///
    /// # Errors
    /// Propagates scan and blob-decode failures — fail-closed: a MERGE must
    /// never treat a failed lookup as "no match" and create duplicates. A
    /// batch missing `_vid`, `_deleted` or `_version` is also an error.
    async fn merge_lookup_persisted_batch_schemaless(
        &self,
        label: &str,
        key_names: &[String],
        keys: &HashSet<MergeKey>,
    ) -> Result<(
        HashMap<MergeKey, Vec<Vid>>,
        HashMap<Vid, uni_common::Properties>,
    )> {
        let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
        let mut props_by_vid: HashMap<Vid, uni_common::Properties> = HashMap::new();
        // Single quotes in the label are doubled so it stays one string literal.
        let filter = format!("array_contains(labels, '{}')", label.replace('\'', "''"));
        let Some(batch) = self
            .storage
            .scan_main_vertex_table(
                &["_vid", "_deleted", "props_json", "_version"],
                Some(&filter),
            )
            .await?
        else {
            return Ok((out, props_by_vid));
        };
        let (Some(vid_col), Some(del_col), Some(ver_col)) = (
            batch
                .column_by_name("_vid")
                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
            batch
                .column_by_name("_deleted")
                .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
            batch
                .column_by_name("_version")
                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
        ) else {
            return Err(anyhow!(
                "schemaless MERGE lookup: main vertex table scan missing a required column"
            ));
        };
        // NOTE(review): unlike the three columns above, a missing or
        // non-`LargeBinary` `props_json` column is NOT an error here — every
        // row then decodes as an empty map and can only match an all-Null key
        // tuple, so the MERGE would fall through to create (fail-open).
        // Confirm this leniency is intentional.
        let props_col = batch
            .column_by_name("props_json")
            .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

        // Per-vid MVCC dedup: keep the highest-version row for each vid.
        let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
        for i in 0..batch.num_rows() {
            let vid = Vid::from(vid_col.value(i));
            let ver = ver_col.value(i);
            let entry = winners.entry(vid).or_insert((ver, i));
            if ver > entry.0 {
                *entry = (ver, i);
            }
        }
        for (vid, (_ver, row)) in winners {
            // Drop deletion tombstones AFTER picking the winner — a deleted
            // winner must not let an older live version resurrect the match.
            if del_col.value(row) {
                continue;
            }
            // A row without properties matches only an all-Null key tuple.
            // A blob that decodes to a non-map value is likewise treated as
            // having no properties; a decode failure is an error.
            let props = match props_col {
                Some(arr) if !arrow_array::Array::is_null(arr, row) => {
                    match uni_common::cypher_value_codec::decode(arr.value(row))
                        .map_err(|e| anyhow!("schemaless MERGE lookup: props decode: {e}"))?
                    {
                        Value::Map(m) => m,
                        _ => HashMap::new(),
                    }
                }
                _ => HashMap::new(),
            };
            // Build the canonical key tuple (absent properties count as Null)
            // in `key_names` order, matching how `keys` was built.
            let tuple: MergeKey = key_names
                .iter()
                .map(|k| {
                    (
                        k.clone(),
                        Self::canonical_key_value(props.get(k).unwrap_or(&Value::Null)),
                    )
                })
                .collect();
            if keys.contains(&tuple) {
                out.entry(tuple).or_default().push(vid);
                props_by_vid.insert(vid, props);
            }
        }
        Ok((out, props_by_vid))
    }
2045
2046    /// True if the statement-level MERGE property prefetch is safe for `label`.
2047    ///
2048    /// False when the label declares any CRDT-typed property: a prefetch HIT in
2049    /// [`read_vertex_props_with_prefetch`] skips the `normalize_crdt_properties`
2050    /// pass that `get_all_vertex_props_with_ctx` applies, so CRDT-bearing
2051    /// labels keep the per-row read path. Undeclared labels are trivially safe
2052    /// (normalization is a no-op without schema CRDT entries).
2053    fn merge_label_prefetch_safe(&self, label: &str) -> bool {
2054        let schema = self.storage.schema_manager().schema();
2055        schema.properties.get(label).is_none_or(|props| {
2056            !props
2057                .values()
2058                .any(|pm| matches!(pm.r#type, DataType::Crdt(_)))
2059        })
2060    }
2061
2062    /// True if an L0 override rewrote any key column of a persisted match away
2063    /// from its requested value (so the persisted row no longer matches).
2064    fn vid_overrides_break_key(
2065        vid: Vid,
2066        key_props: &HashMap<String, Value>,
2067        ctx: Option<&QueryContext>,
2068    ) -> bool {
2069        key_props.iter().any(|(k, want)| {
2070            matches!(
2071                l0_visibility::lookup_vertex_prop(vid, k, ctx),
2072                Some(got) if Self::canonical_key_value(&got) != Self::canonical_key_value(want)
2073            )
2074        })
2075    }
2076
2077    /// Build a node Map value (`{_vid, _labels, ...props}`) for binding a MERGE
2078    /// node variable.
2079    ///
2080    /// Matches the binding shape produced by `execute_create_pattern` and the
2081    /// general MATCH path, so ON MATCH SET, RETURN, and downstream operators
2082    /// resolve the variable identically — a bare `Value::Int(vid)` is not a
2083    /// valid node binding for those consumers.
2084    fn build_node_map(vid: Vid, label: &str, props: uni_common::Properties) -> Value {
2085        let mut obj = HashMap::new();
2086        obj.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
2087        obj.insert(
2088            "_labels".to_string(),
2089            Value::List(vec![Value::String(label.to_string())]),
2090        );
2091        for (k, v) in props {
2092            obj.insert(k, v);
2093        }
2094        Value::Map(obj)
2095    }
2096
2097    /// True if an L0-only vertex has every key column set to the requested
2098    /// value. A missing column matches only a requested `Null`.
2099    fn l0_vid_matches_key(
2100        vid: Vid,
2101        key_props: &HashMap<String, Value>,
2102        ctx: Option<&QueryContext>,
2103    ) -> bool {
2104        key_props.iter().all(
2105            |(k, want)| match l0_visibility::lookup_vertex_prop(vid, k, ctx) {
2106                Some(got) => Self::canonical_key_value(&got) == Self::canonical_key_value(want),
2107                None => *want == Value::Null,
2108            },
2109        )
2110    }
2111
2112    /// Index fast-path execution for one MERGE row of the shape detected by
2113    /// [`Self::merge_single_node_fastpath`].
2114    ///
2115    /// Resolves matches from the per-batch L0 snapshot `existing` (O(1) lookup,
2116    /// no per-row L0 enumeration) plus the per-statement persisted prefetch
2117    /// (`persisted`, built once by [`Self::merge_lookup_persisted_batch`]);
2118    /// applies ON MATCH SET to every match, or creates the node and applies
2119    /// ON CREATE SET when there is none. A newly created vertex is folded into
2120    /// `existing` so a later row of the same batch with the same key matches it
2121    /// (intra-batch dedup). Returns the RETURN rows for this input row (one per
2122    /// match, or one for a create).
2123    ///
2124    /// `prefetched` is the statement-level property prefetch (`None` when the
2125    /// label is CRDT-bearing, see [`Self::merge_label_prefetch_safe`]): matched
2126    /// vids carry their persisted base row, freshly created vids are seeded
2127    /// with an empty base — per-row reads then resolve as base + L0 layering
2128    /// (every SET flush writes the full row to L0 before the next read, so a
2129    /// prefetch hit equals a fresh read) instead of one storage scan each.
2130    ///
2131    /// # Errors
2132    /// Propagates evaluation, create, and SET failures.
2133    #[expect(
2134        clippy::too_many_arguments,
2135        reason = "mirrors execute_merge's threaded execution state"
2136    )]
2137    async fn execute_merge_row_indexed(
2138        &self,
2139        label: &str,
2140        node: &NodePattern,
2141        path_pattern: &Pattern,
2142        temp_vars: &[String],
2143        mut row: HashMap<String, Value>,
2144        key_props: &HashMap<String, Value>,
2145        persisted: &HashMap<MergeKey, Vec<Vid>>,
2146        key_tuple: &MergeKey,
2147        existing: &mut HashMap<MergeKey, Vec<Vid>>,
2148        on_match: Option<&SetClause>,
2149        on_create: Option<&SetClause>,
2150        prop_manager: &PropertyManager,
2151        params: &HashMap<String, Value>,
2152        ctx: Option<&QueryContext>,
2153        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2154        writer: &Writer,
2155        mut prefetched: Option<&mut Prefetch>,
2156    ) -> Result<Vec<HashMap<String, Value>>> {
2157        let empty_prefetch = Prefetch::default();
2158        let mut seen: HashSet<Vid> = HashSet::new();
2159        let mut matches: Vec<Vid> = Vec::new();
2160        // Persisted (flushed) matches from the per-statement prefetch. The
2161        // prefetch is static for the statement, so re-verify liveness at row
2162        // time — an earlier row of this batch may have deleted the candidate
2163        // or rewritten its key (the old per-row scan saw those through its L0
2164        // overlay checks; these are the same checks, moved to row time).
2165        if let Some(vids) = persisted.get(key_tuple) {
2166            for &vid in vids {
2167                if l0_visibility::is_vertex_deleted(vid, ctx) {
2168                    continue;
2169                }
2170                if Self::vid_overrides_break_key(vid, key_props, ctx) {
2171                    continue;
2172                }
2173                if seen.insert(vid) {
2174                    matches.push(vid);
2175                }
2176            }
2177        }
2178        // L0 / intra-batch matches from the per-batch snapshot, re-verified live
2179        // in case a prior row of this batch mutated or deleted the candidate.
2180        if let Some(vids) = existing.get(key_tuple) {
2181            for &vid in vids {
2182                if seen.contains(&vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
2183                    continue;
2184                }
2185                if Self::l0_vid_matches_key(vid, key_props, ctx) && seen.insert(vid) {
2186                    matches.push(vid);
2187                }
2188            }
2189        }
2190
2191        let mut out = Vec::new();
2192        if matches.is_empty() {
2193            // No match: create the node, then apply ON CREATE SET. Fold the
2194            // ON CREATE SET property assignments into seed props first so a
2195            // NOT-NULL property supplied only by ON CREATE SET passes
2196            // create-time validation (RC4); the post-create SET below settles
2197            // the final values.
2198            let seed_props = self
2199                .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2200                .await?;
2201            self.execute_create_pattern(
2202                path_pattern,
2203                &mut row,
2204                writer,
2205                prop_manager,
2206                params,
2207                ctx,
2208                tx_l0_override,
2209                Some(&seed_props),
2210            )
2211            .await?;
2212            // Fold the new vertex into the batch snapshot for intra-batch
2213            // dedup, and seed the statement prefetch with an empty base: a
2214            // fresh vid has nothing in storage, so ON CREATE SET's lazy read
2215            // resolves from the L0 row the create just wrote instead of
2216            // issuing a per-row storage scan that finds nothing.
2217            if let Some(var) = &node.variable
2218                && let Some(val) = row.get(var)
2219                && let Ok(vid) = Self::vid_from_value(val)
2220            {
2221                existing.entry(key_tuple.clone()).or_default().push(vid);
2222                if let Some(p) = prefetched.as_deref_mut() {
2223                    p.vertex.entry(vid).or_default();
2224                }
2225                // Phantom guard (RC2): register this MERGE-create's key so a
2226                // concurrent MERGE of the same key aborts retriably at commit
2227                // (converging to one node) instead of silently duplicating —
2228                // even with no declared UNIQUE constraint. Only inside a
2229                // transaction, where commit re-probes the guard; a plain CREATE
2230                // never registers a key, so it is unaffected.
2231                if let Some(tx_l0) = tx_l0_override {
2232                    let key_values: Vec<(String, Value)> = key_props
2233                        .iter()
2234                        .map(|(k, v)| (k.clone(), v.clone()))
2235                        .collect();
2236                    let guard_key =
2237                        uni_store::runtime::l0::serialize_constraint_key(label, &key_values);
2238                    tx_l0.write().insert_merge_guard_key(guard_key, vid);
2239                }
2240            }
2241            if let Some(set) = on_create {
2242                self.execute_set_items_locked(
2243                    &set.items,
2244                    &mut row,
2245                    writer,
2246                    prop_manager,
2247                    params,
2248                    ctx,
2249                    tx_l0_override,
2250                    prefetched.as_deref().unwrap_or(&empty_prefetch),
2251                )
2252                .await?;
2253            }
2254            Self::bind_path_variables(path_pattern, &mut row, temp_vars);
2255            out.push(row);
2256        } else {
2257            // Apply ON MATCH SET to every matched node (multi-match semantics),
2258            // binding the node variable as a Map with _vid/_labels/props so
2259            // RETURN and downstream operators resolve it as they would for the
2260            // general MATCH and CREATE paths.
2261            for vid in matches {
2262                let mut m = row.clone();
2263                if let Some(var) = &node.variable {
2264                    // Minimal binding so ON MATCH SET resolves the node by _vid.
2265                    m.insert(
2266                        var.clone(),
2267                        Self::build_node_map(vid, label, HashMap::new()),
2268                    );
2269                }
2270                if let Some(set) = on_match {
2271                    self.execute_set_items_locked(
2272                        &set.items,
2273                        &mut m,
2274                        writer,
2275                        prop_manager,
2276                        params,
2277                        ctx,
2278                        tx_l0_override,
2279                        prefetched.as_deref().unwrap_or(&empty_prefetch),
2280                    )
2281                    .await?;
2282                }
2283                if let Some(var) = &node.variable {
2284                    // Rebind with full, post-SET properties for RETURN
2285                    // fidelity. The SET above flushed the full row to L0, so a
2286                    // prefetch hit (base + L0 layering) reproduces exactly
2287                    // what a fresh storage read would return.
2288                    let props = read_vertex_props_with_prefetch(
2289                        vid,
2290                        prefetched.as_deref().unwrap_or(&empty_prefetch),
2291                        prop_manager,
2292                        ctx,
2293                    )
2294                    .await?;
2295                    m.insert(var.clone(), Self::build_node_map(vid, label, props));
2296                }
2297                Self::bind_path_variables(path_pattern, &mut m, temp_vars);
2298                out.push(m);
2299            }
2300        }
2301        Ok(out)
2302    }
2303
2304    #[expect(clippy::too_many_arguments)]
2305    pub(crate) async fn execute_merge(
2306        &self,
2307        rows: Vec<HashMap<String, Value>>,
2308        pattern: &Pattern,
2309        on_match: Option<&SetClause>,
2310        on_create: Option<&SetClause>,
2311        prop_manager: &PropertyManager,
2312        params: &HashMap<String, Value>,
2313        ctx: Option<&QueryContext>,
2314        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2315    ) -> Result<Vec<HashMap<String, Value>>> {
2316        let writer_lock = self
2317            .writer
2318            .as_ref()
2319            .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;
2320
2321        // Prepare pattern for path variable binding: assign temp edge variable
2322        // names to unnamed relationships in paths that have path variables.
2323        let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);
2324
2325        // Issue #69: a single-node, single-label MERGE takes the fast path,
2326        // skipping the per-row query planning that made batched MERGE no faster
2327        // than a per-entity loop. Indexed keys get an index point-lookup;
2328        // un-indexed keys still skip planning (the lookup is a filtered scan).
2329        // The shape is the same for every row, so it is detected once.
2330        let fastpath = self.merge_single_node_fastpath(pattern);
2331
2332        // Build the per-batch L0 snapshot once (issue #69 Phase C): the per-row
2333        // fast path then resolves L0/intra-batch matches with an O(1) lookup
2334        // instead of re-walking L0 for every row. `key_names` is the sorted
2335        // static key set, matching `merge_key_tuple`.
2336        let mut fast_existing: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2337        // Per-row pre-evaluated fast-path keys (None = that row falls back to
2338        // the general path), and the per-statement persisted prefetch over the
2339        // deduped key tuples — ONE chunked scan instead of one scan per row.
2340        // Key expressions only see the row's own bindings + params, so
2341        // evaluating them ahead of any creates cannot observe earlier rows.
2342        let mut row_fast: Vec<Option<(HashMap<String, Value>, MergeKey)>> = Vec::new();
2343        let mut fast_persisted: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2344        // Statement-level property prefetch for the fast path (review perf
2345        // residual): every persisted match's full row is batch-read ONCE, so
2346        // the per-row ON MATCH SET read and the post-SET rebind resolve as
2347        // prefetch-base + L0 layering instead of one storage scan each.
2348        // `None` disables it for CRDT-bearing labels (the prefetch-hit read
2349        // skips CRDT normalization).
2350        let mut merge_prefetch: Option<Prefetch> = None;
2351        if let Some((node, label)) = &fastpath {
2352            let mut key_names: Vec<String> = match &node.properties {
2353                Some(Expr::Map(entries)) => entries.iter().map(|(k, _)| k.clone()).collect(),
2354                _ => Vec::new(),
2355            };
2356            key_names.sort();
2357            fast_existing = self.merge_l0_existing(label, &key_names, ctx);
2358
2359            row_fast.reserve(rows.len());
2360            for row in &rows {
2361                let mut key_props: HashMap<String, Value> = HashMap::new();
2362                if let Some(props_expr) = &node.properties
2363                    && let Value::Map(map) = self
2364                        .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2365                        .await?
2366                {
2367                    key_props = map;
2368                }
2369                // Only rows whose every key value is a scalar the persisted
2370                // scan can express take the fast path (same gate as before,
2371                // via the filter builder).
2372                if Self::merge_key_filter(&key_props).is_some() {
2373                    let tuple = Self::merge_key_tuple(&key_props);
2374                    row_fast.push(Some((key_props, tuple)));
2375                } else {
2376                    row_fast.push(None);
2377                }
2378            }
2379            let unique_keys: HashSet<MergeKey> = row_fast
2380                .iter()
2381                .flatten()
2382                .map(|(_, tuple)| tuple.clone())
2383                .collect();
2384            let (persisted, schemaless_props) = self
2385                .merge_lookup_persisted_batch(label, &key_names, &unique_keys)
2386                .await?;
2387            fast_persisted = persisted;
2388            if self.merge_label_prefetch_safe(label) {
2389                let mut pf = Prefetch::default();
2390                if !schemaless_props.is_empty() {
2391                    // The schemaless lookup already decoded each matched vid's
2392                    // full property map — zero extra scans.
2393                    pf.vertex.extend(schemaless_props);
2394                } else {
2395                    let vids: Vec<Vid> = fast_persisted
2396                        .values()
2397                        .flatten()
2398                        .copied()
2399                        .collect::<HashSet<Vid>>()
2400                        .into_iter()
2401                        .collect();
2402                    if !vids.is_empty()
2403                        && let Ok(batch_props) = prop_manager
2404                            .get_batch_vertex_props_for_label(&vids, label, ctx)
2405                            .await
2406                    {
2407                        // One `_vid IN (…)` scan for every matched row's base.
2408                        // On Err the map stays empty — every read falls back to
2409                        // the per-row path (fail-open, same posture as
2410                        // prefetch_set_targets).
2411                        pf.vertex.extend(batch_props);
2412                    }
2413                }
2414                merge_prefetch = Some(pf);
2415            }
2416        }
2417
2418        // RC3: relationship-MERGE existence fast-path. The single-node fast path
2419        // above does not cover `(a)-[:R]->(b)`; the general path rebuilds and runs
2420        // a per-row traversal `LogicalPlan` just to check whether the edge exists
2421        // (~19x the bulk CREATE of the same edges). For the bound-endpoints,
2422        // anonymous-edge shape (and no ON MATCH SET, whose match-row semantics the
2423        // general path materialises) we resolve existence with one MVCC-correct
2424        // adjacency probe — `GraphExecutionContext::get_neighbors` merges CSR + all
2425        // L0 buffers including the transaction's own writes, so intra-batch edges
2426        // are seen — and reuse the general create / ON CREATE handling unchanged.
2427        // An ON MATCH SET with actual items needs the general path's materialised
2428        // match rows; a plain MERGE carries an *empty* on_match, which the fast
2429        // path can serve (it emits the row directly, applying nothing on match).
2430        let on_match_empty = on_match.is_none_or(|s| s.items.is_empty());
2431        let rel_fast = if fastpath.is_none() && on_match_empty {
2432            self.merge_relationship_fastpath_shape(pattern)
2433        } else {
2434            None
2435        };
2436        let rel_graph_ctx = rel_fast.as_ref().map(|_| {
2437            let l0_context = match ctx {
2438                Some(c) => crate::query::df_graph::L0Context::from_query_context(c),
2439                None => crate::query::df_graph::L0Context::empty(),
2440            };
2441            let pm_arc = self.prop_manager_arc.clone().unwrap_or_else(|| {
2442                Arc::new(PropertyManager::new(
2443                    self.storage.clone(),
2444                    self.storage.schema_manager_arc(),
2445                    prop_manager.cache_size(),
2446                ))
2447            });
2448            crate::query::df_graph::GraphExecutionContext::with_l0_context(
2449                self.effective_storage(),
2450                l0_context,
2451                pm_arc,
2452            )
2453        });
2454
2455        let mut results = Vec::new();
2456        for (idx, mut row) in rows.into_iter().enumerate() {
2457            // Rows with a pre-evaluated scalar key take the fast path; rows
2458            // with a non-scalar key fall through to the general path below.
2459            if let Some((node, label)) = &fastpath
2460                && let Some((key_props, key_tuple)) = row_fast.get(idx).and_then(|rf| rf.as_ref())
2461            {
2462                let writer: &uni_store::Writer = writer_lock.as_ref();
2463                let row_out = self
2464                    .execute_merge_row_indexed(
2465                        label,
2466                        node,
2467                        &path_pattern,
2468                        &temp_vars,
2469                        row,
2470                        key_props,
2471                        &fast_persisted,
2472                        key_tuple,
2473                        &mut fast_existing,
2474                        on_match,
2475                        on_create,
2476                        prop_manager,
2477                        params,
2478                        ctx,
2479                        tx_l0_override,
2480                        writer,
2481                        merge_prefetch.as_mut(),
2482                    )
2483                    .await?;
2484                results.extend(row_out);
2485                continue;
2486            }
2487
2488            // RC3 relationship fast path: bound endpoints → resolve edge
2489            // existence with one adjacency probe and reuse the general
2490            // create / ON CREATE handling, skipping the per-row traversal plan.
2491            if let (Some((src_var, dst_var, type_id, dir)), Some(graph_ctx)) =
2492                (rel_fast.as_ref(), rel_graph_ctx.as_ref())
2493            {
2494                let src_vid = row.get(src_var).and_then(|v| Self::vid_from_value(v).ok());
2495                let dst_vid = row.get(dst_var).and_then(|v| Self::vid_from_value(v).ok());
2496                if let (Some(src_vid), Some(dst_vid)) = (src_vid, dst_vid) {
2497                    let exists = graph_ctx
2498                        .get_neighbors(src_vid, *type_id, *dir)
2499                        .into_iter()
2500                        .any(|(n, _eid)| n == dst_vid);
2501                    let writer: &uni_store::Writer = writer_lock.as_ref();
2502                    if !exists {
2503                        // Edge absent: create only the edge (endpoints are bound),
2504                        // then apply ON CREATE SET — identical to the general
2505                        // create branch below.
2506                        let seed_props = self
2507                            .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2508                            .await?;
2509                        self.execute_create_pattern(
2510                            &path_pattern,
2511                            &mut row,
2512                            writer,
2513                            prop_manager,
2514                            params,
2515                            ctx,
2516                            tx_l0_override,
2517                            Some(&seed_props),
2518                        )
2519                        .await?;
2520                        if let Some(set) = on_create {
2521                            self.execute_set_items_locked(
2522                                &set.items,
2523                                &mut row,
2524                                writer,
2525                                prop_manager,
2526                                params,
2527                                ctx,
2528                                tx_l0_override,
2529                                &Prefetch::default(),
2530                            )
2531                            .await?;
2532                        }
2533                    }
2534                    // Whether matched or just created, the edge now exists; bind
2535                    // path variables and emit the row (the edge is anonymous, so
2536                    // there is no edge binding to reproduce, and ON MATCH SET is
2537                    // excluded from this fast path).
2538                    Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
2539                    results.push(row);
2540                    continue;
2541                }
2542                // Endpoints not bound to vids → fall through to the general path.
2543            }
2544
2545            // General execution: match-or-create per row. (The index fast path
2546            // above already handles single-node, single-label, scalar-indexed
2547            // MERGE — including unique-constrained labels, whose keys are
2548            // indexed — so there is no separate constraint-only fast path.)
2549            let matches = self
2550                .execute_merge_match(pattern, &row, prop_manager, params, ctx)
2551                .await?;
2552            let writer: &uni_store::Writer = writer_lock.as_ref();
2553
2554            let result: Result<Vec<HashMap<String, Value>>> = async {
2555                let mut batch = Vec::new();
2556                if !matches.is_empty() {
2557                    for mut m in matches {
2558                        if let Some(set) = on_match {
2559                            self.execute_set_items_locked(
2560                                &set.items,
2561                                &mut m,
2562                                writer,
2563                                prop_manager,
2564                                params,
2565                                ctx,
2566                                tx_l0_override,
2567                                &Prefetch::default(),
2568                            )
2569                            .await?;
2570                        }
2571                        Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
2572                        batch.push(m);
2573                    }
2574                } else {
2575                    // Fold ON CREATE SET into seed props so a NOT-NULL property
2576                    // set only by ON CREATE SET passes create-time validation
2577                    // (RC4); the post-create SET below settles the final values.
2578                    let seed_props = self
2579                        .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2580                        .await?;
2581                    self.execute_create_pattern(
2582                        &path_pattern,
2583                        &mut row,
2584                        writer,
2585                        prop_manager,
2586                        params,
2587                        ctx,
2588                        tx_l0_override,
2589                        Some(&seed_props),
2590                    )
2591                    .await?;
2592                    if let Some(set) = on_create {
2593                        self.execute_set_items_locked(
2594                            &set.items,
2595                            &mut row,
2596                            writer,
2597                            prop_manager,
2598                            params,
2599                            ctx,
2600                            tx_l0_override,
2601                            &Prefetch::default(),
2602                        )
2603                        .await?;
2604                    }
2605                    Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
2606                    batch.push(row);
2607                }
2608                Ok(batch)
2609            }
2610            .await;
2611
2612            results.extend(result?);
2613        }
2614        Ok(results)
2615    }
2616
2617    /// Pre-evaluate `ON CREATE SET` property assignments into per-variable seeds.
2618    ///
2619    /// Folds `SET <var>.<prop> = <expr>` items so a NOT-NULL property supplied
2620    /// only by `ON CREATE SET` is present when the MERGE node is created and
2621    /// passes constraint validation (RC4). The right-hand side is evaluated
2622    /// against the current `row`.
2623    ///
2624    /// Items whose right-hand side references the target variable (e.g.
2625    /// `ON CREATE SET n.c = coalesce(n.c, 0) + 1`) are NOT folded: seeding would
2626    /// let the post-create SET read the seeded value and apply the assignment
2627    /// twice. Such items run only post-create, exactly once (unchanged behavior).
2628    ///
2629    /// # Errors
2630    /// Returns an error if evaluating an assignment's right-hand side fails.
2631    pub(crate) async fn on_create_seed_props(
2632        &self,
2633        on_create: Option<&SetClause>,
2634        row: &HashMap<String, Value>,
2635        prop_manager: &PropertyManager,
2636        params: &HashMap<String, Value>,
2637        ctx: Option<&QueryContext>,
2638    ) -> Result<HashMap<String, HashMap<String, Value>>> {
2639        let mut seed: HashMap<String, HashMap<String, Value>> = HashMap::new();
2640        let Some(set) = on_create else {
2641            return Ok(seed);
2642        };
2643        for item in &set.items {
2644            if let SetItem::Property { expr, value } = item
2645                && let Expr::Property(var_expr, prop_name) = expr
2646                && let Expr::Variable(var_name) = &**var_expr
2647                // Skip self-referential RHS so the post-create SET (which also
2648                // runs) applies it exactly once rather than reading the seed.
2649                && !crate::query::df_graph::locy_ast_builder::expr_references_var(
2650                    value, var_name,
2651                )
2652            {
2653                let val = self
2654                    .evaluate_expr(value, row, prop_manager, params, ctx)
2655                    .await?;
2656                seed.entry(var_name.clone())
2657                    .or_default()
2658                    .insert(prop_name.clone(), val);
2659            }
2660        }
2661        Ok(seed)
2662    }
2663
2664    /// Execute a CREATE pattern, inserting new vertices and edges into the graph.
2665    #[expect(clippy::too_many_arguments)]
2666    pub(crate) async fn execute_create_pattern(
2667        &self,
2668        pattern: &Pattern,
2669        row: &mut HashMap<String, Value>,
2670        writer: &Writer,
2671        prop_manager: &PropertyManager,
2672        params: &HashMap<String, Value>,
2673        ctx: Option<&QueryContext>,
2674        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2675        // Per-variable properties to gap-fill into newly-created nodes before
2676        // constraint validation. Used by MERGE to fold `ON CREATE SET` so a
2677        // NOT-NULL property supplied only by ON CREATE SET passes create-time
2678        // validation (RC4). `None` for plain CREATE.
2679        seed_props: Option<&HashMap<String, HashMap<String, Value>>>,
2680    ) -> Result<()> {
2681        for path in &pattern.paths {
2682            let mut prev_vid: Option<Vid> = None;
2683            // (rel_var, type_id, type_name, props_expr, direction)
2684            type PendingRel = (String, u32, String, Option<Expr>, Direction);
2685            let mut rel_pending: Option<PendingRel> = None;
2686
2687            for element in &path.elements {
2688                match element {
2689                    PatternElement::Node(n) => {
2690                        let mut vid = None;
2691
2692                        // Check if node variable already bound in row
2693                        if let Some(var) = &n.variable
2694                            && let Some(val) = row.get(var)
2695                            && let Ok(existing_vid) = Self::vid_from_value(val)
2696                        {
2697                            vid = Some(existing_vid);
2698                        }
2699
2700                        // If not bound, create it
2701                        if vid.is_none() {
2702                            let mut props = HashMap::new();
2703                            if let Some(props_expr) = &n.properties {
2704                                let props_val = self
2705                                    .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2706                                    .await?;
2707                                if let Value::Map(map) = props_val {
2708                                    for (k, v) in map {
2709                                        props.insert(k, v);
2710                                    }
2711                                } else {
2712                                    return Err(anyhow!("Properties must evaluate to a map"));
2713                                }
2714                            }
2715
2716                            // MERGE ON CREATE SET: gap-fill properties supplied
2717                            // only by ON CREATE SET so a NOT-NULL property absent
2718                            // from the merge key passes create-time validation
2719                            // (RC4). `or_insert` keeps the merge-key/pattern props
2720                            // authoritative; the post-create SET re-applies the
2721                            // real values, so the final state is unchanged.
2722                            if let Some(seed) = seed_props
2723                                && let Some(var) = &n.variable
2724                                && let Some(var_seed) = seed.get(var)
2725                            {
2726                                for (k, v) in var_seed {
2727                                    props.entry(k.clone()).or_insert_with(|| v.clone());
2728                                }
2729                            }
2730
2731                            let schema = self.storage.schema_manager().schema();
2732
2733                            // Strict schema: reject undeclared labels.
2734                            if self.config.strict_schema {
2735                                for label_name in &n.labels {
2736                                    if schema.get_label_case_insensitive(label_name).is_none() {
2737                                        return Err(anyhow!(
2738                                            "Label '{}' is not defined in the schema \
2739                                             (strict_schema is enabled). \
2740                                             Declare it with db.schema().label(...).apply() first.",
2741                                            label_name
2742                                        ));
2743                                    }
2744                                }
2745                            }
2746
2747                            // VID generation is label-independent. Pull from the
2748                            // per-tx reservoir if set (amortizes the global
2749                            // IdAllocator mutex), else fall back to the direct
2750                            // per-VID path.
2751                            let new_vid = match &self.id_reservoir {
2752                                Some(r) => r.next_vid().await?,
2753                                None => writer.next_vid().await?,
2754                            };
2755
2756                            // Enrich with generated columns only for known labels
2757                            for label_name in &n.labels {
2758                                if schema.get_label_case_insensitive(label_name).is_some() {
2759                                    self.enrich_properties_with_generated_columns(
2760                                        label_name,
2761                                        &mut props,
2762                                        prop_manager,
2763                                        params,
2764                                        ctx,
2765                                    )
2766                                    .await?;
2767                                }
2768                            }
2769
2770                            // Validate/coerce against declared types AFTER enrichment, so
2771                            // a type mismatch is rejected here rather than silently nulled
2772                            // (and the row dropped) at flush — issue #68.
2773                            let props = Self::coerce_and_validate_props(props, &schema, &n.labels)?;
2774
2775                            // Insert vertex and get back final properties (includes auto-generated embeddings)
2776                            let final_props = writer
2777                                .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
2778                                .await?;
2779
2780                            // Build node object with final properties (includes embeddings)
2781                            if let Some(var) = &n.variable {
2782                                let mut obj = HashMap::new();
2783                                obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
2784                                let labels_list: Vec<Value> =
2785                                    n.labels.iter().map(|l| Value::String(l.clone())).collect();
2786                                obj.insert("_labels".to_string(), Value::List(labels_list));
2787                                for (k, v) in &final_props {
2788                                    obj.insert(k.clone(), v.clone());
2789                                }
2790                                // Store node as a Map with _vid, matching MATCH behavior
2791                                row.insert(var.clone(), Value::Map(obj));
2792                            }
2793                            vid = Some(new_vid);
2794                        }
2795
2796                        let current_vid = vid.unwrap();
2797
2798                        if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
2799                            rel_pending.take()
2800                            && let Some(src) = prev_vid
2801                        {
2802                            let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
2803
2804                            if !is_rel_bound {
2805                                let mut rel_props = HashMap::new();
2806                                if let Some(expr) = rel_props_expr {
2807                                    let val = self
2808                                        .evaluate_expr(&expr, row, prop_manager, params, ctx)
2809                                        .await?;
2810                                    if let Value::Map(map) = val {
2811                                        rel_props.extend(map);
2812                                    }
2813                                }
2814                                // Validate/coerce edge properties against the declared
2815                                // edge-type schema before storing — issue #68.
2816                                let edge_schema = self.storage.schema_manager().schema();
2817                                let rel_props = Self::coerce_and_validate_props(
2818                                    rel_props,
2819                                    &edge_schema,
2820                                    std::slice::from_ref(&type_name),
2821                                )?;
2822                                let eid = match &self.id_reservoir {
2823                                    Some(r) => r.next_eid().await?,
2824                                    None => writer.next_eid(type_id).await?,
2825                                };
2826
2827                                // For incoming edges like (a)<-[:R]-(b), swap so the edge points b -> a
2828                                let (edge_src, edge_dst) = match dir {
2829                                    Direction::Incoming => (current_vid, src),
2830                                    _ => (src, current_vid),
2831                                };
2832
2833                                let store_props = !rel_var.is_empty();
2834                                let user_props = if store_props {
2835                                    rel_props.clone()
2836                                } else {
2837                                    HashMap::new()
2838                                };
2839
2840                                writer
2841                                    .insert_edge(
2842                                        edge_src,
2843                                        edge_dst,
2844                                        type_id,
2845                                        eid,
2846                                        rel_props,
2847                                        Some(type_name.clone()),
2848                                        tx_l0,
2849                                    )
2850                                    .await?;
2851
2852                                // Edge type name is now stored by insert_edge
2853
2854                                if store_props {
2855                                    let mut edge_map = HashMap::new();
2856                                    edge_map.insert(
2857                                        "_eid".to_string(),
2858                                        Value::Int(eid.as_u64() as i64),
2859                                    );
2860                                    edge_map.insert(
2861                                        "_src".to_string(),
2862                                        Value::Int(edge_src.as_u64() as i64),
2863                                    );
2864                                    edge_map.insert(
2865                                        "_dst".to_string(),
2866                                        Value::Int(edge_dst.as_u64() as i64),
2867                                    );
2868                                    edge_map
2869                                        .insert("_type".to_string(), Value::Int(type_id as i64));
2870                                    // Include user properties so downstream RETURN sees them
2871                                    for (k, v) in user_props {
2872                                        edge_map.insert(k, v);
2873                                    }
2874                                    row.insert(rel_var, Value::Map(edge_map));
2875                                }
2876                            }
2877                        }
2878                        prev_vid = Some(current_vid);
2879                    }
2880                    PatternElement::Relationship(r) => {
2881                        if r.types.len() != 1 {
2882                            return Err(anyhow!(
2883                                "CREATE relationship must specify exactly one type"
2884                            ));
2885                        }
2886                        let type_name = &r.types[0];
2887                        let type_id = if self.config.strict_schema {
2888                            let schema = self.storage.schema_manager().schema();
2889                            schema
2890                                .edge_type_id_by_name_case_insensitive(type_name)
2891                                .ok_or_else(|| {
2892                                    anyhow!(
2893                                        "Edge type '{}' is not defined in the schema \
2894                                         (strict_schema is enabled). \
2895                                         Declare it with db.schema().edge_type(...).apply() first.",
2896                                        type_name
2897                                    )
2898                                })?
2899                        } else {
2900                            // Schemaless: get or assign edge type ID (bit 31 = 1 for dynamic).
2901                            self.storage
2902                                .schema_manager()
2903                                .get_or_assign_edge_type_id(type_name)
2904                        };
2905
2906                        rel_pending = Some((
2907                            r.variable.clone().unwrap_or_default(),
2908                            type_id,
2909                            type_name.clone(),
2910                            r.properties.clone(),
2911                            r.direction.clone(),
2912                        ));
2913                    }
2914                    PatternElement::Parenthesized { .. } => {
2915                        return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
2916                    }
2917                }
2918            }
2919        }
2920        Ok(())
2921    }
2922
2923    /// Rejects structural values (maps, nodes, edges, paths, nested lists) in a property.
2924    ///
2925    /// These are never valid OpenCypher property values regardless of the declared column
2926    /// type. A `CypherValue` column is the sole exception and is handled by the caller
2927    /// before this is reached.
2928    ///
2929    /// # Errors
2930    /// Returns an error if `val` is a map/node/edge/path, or a list containing one.
2931    fn validate_structural_property_value(prop_name: &str, val: &Value) -> Result<()> {
2932        match val {
2933            Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
2934                anyhow::bail!(
2935                    "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2936                    prop_name
2937                );
2938            }
2939            Value::List(items) => {
2940                for item in items {
2941                    if matches!(
2942                        item,
2943                        Value::Map(_)
2944                            | Value::Node(_)
2945                            | Value::Edge(_)
2946                            | Value::Path(_)
2947                            | Value::List(_)
2948                    ) {
2949                        anyhow::bail!(
2950                            "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2951                            prop_name
2952                        );
2953                    }
2954                }
2955            }
2956            _ => {}
2957        }
2958        Ok(())
2959    }
2960
2961    /// Validates and coerces `val` against the declared schema type for `prop_name`.
2962    ///
2963    /// Returns the value to actually persist. Beyond the structural checks in
2964    /// [`Self::validate_structural_property_value`], this compares the value against the
2965    /// column's declared `DataType` and:
2966    ///
2967    /// - returns it unchanged when directly storable (including the intentional
2968    ///   `Int`→`Float`/`Int32` and `Temporal`→`Timestamp` widenings);
2969    /// - coerces a `Value::String` written into a `Date`/`Time`/`DateTime`/`Duration`
2970    ///   column into the proper `Temporal` value, using the same parser as the Cypher
2971    ///   `date()`/`time()`/`datetime()`/`duration()` constructors;
2972    /// - otherwise returns an error, so a type mismatch is surfaced at the call site
2973    ///   rather than silently nulled — and the row dropped at flush. See issue #68.
2974    ///
2975    /// Undeclared (schemaless) properties and `CypherValue` columns keep their permissive
2976    /// behavior.
2977    ///
2978    /// # Errors
2979    /// Returns an error if the value's type is incompatible with the declared column type,
2980    /// or if a string destined for a temporal column is not a valid temporal literal.
2981    fn coerce_and_validate_property_value(
2982        prop_name: &str,
2983        val: Value,
2984        schema: &uni_common::core::schema::Schema,
2985        labels: &[String],
2986    ) -> Result<Value> {
2987        use uni_common::core::schema::DataType;
2988
2989        // Resolve the declared type from the first label that declares this property.
2990        let declared = labels.iter().find_map(|label| {
2991            schema
2992                .properties
2993                .get(label)
2994                .and_then(|props| props.get(prop_name))
2995                .map(|meta| &meta.r#type)
2996        });
2997
2998        // CypherValue columns accept any value (including maps) — skip all checks.
2999        if matches!(declared, Some(DataType::CypherValue)) {
3000            return Ok(val);
3001        }
3002
3003        let Some(dt) = declared else {
3004            // Schemaless property: reject structural values (maps/nodes/edges/paths and
3005            // lists containing them), otherwise store as-is.
3006            Self::validate_structural_property_value(prop_name, &val)?;
3007            return Ok(val);
3008        };
3009
3010        // Directly storable: scalars, the intentional `Int`→`Float`/`Int32` and
3011        // `Temporal`→`Timestamp` widenings, declared composite columns (`Map`/`List`/
3012        // `Vector`) receiving their matching value, and `Null` (always accepted).
3013        if dt.accepts(&val) {
3014            return Ok(val);
3015        }
3016
3017        // Known-safe coercion: a string into a temporal column is parsed as if it had
3018        // been wrapped in the matching Cypher temporal constructor.
3019        if matches!(val, Value::String(_)) {
3020            let ctor = match dt {
3021                DataType::DateTime => Some("DATETIME"),
3022                DataType::Date => Some("DATE"),
3023                DataType::Time => Some("TIME"),
3024                DataType::Duration => Some("DURATION"),
3025                _ => None,
3026            };
3027            if let Some(name) = ctor {
3028                return uni_query_functions::datetime::eval_datetime_function(
3029                    name,
3030                    std::slice::from_ref(&val),
3031                )
3032                .map_err(|e| {
3033                    anyhow!(
3034                        "TypeError: property '{}' is declared {:?} but the string value could \
3035                         not be parsed as a {} literal: {}",
3036                        prop_name,
3037                        dt,
3038                        name,
3039                        e
3040                    )
3041                });
3042            }
3043        }
3044
3045        // Not storable and not coercible. Prefer the structural message when the value
3046        // is itself structural (e.g. a map into a scalar column), preserving prior
3047        // behavior; otherwise report the scalar type mismatch.
3048        Self::validate_structural_property_value(prop_name, &val)?;
3049        anyhow::bail!(
3050            "TypeError: property '{}' is declared {:?} but got an incompatible value of type {}",
3051            prop_name,
3052            dt,
3053            value_type_name(&val)
3054        );
3055    }
3056
3057    /// Coerces and validates every property in `props` against the declared types for `labels`.
3058    ///
3059    /// Applies [`Self::coerce_and_validate_property_value`] to each entry, returning the map
3060    /// with known-safe coercions applied. Use this at every user-facing CREATE/SET write site
3061    /// before handing properties to the writer, so a type mismatch is rejected up front rather
3062    /// than silently nulled — and the row dropped — at flush (issue #68).
3063    ///
3064    /// # Errors
3065    /// Returns an error on the first property whose value is incompatible with its declared type.
3066    fn coerce_and_validate_props(
3067        props: HashMap<String, Value>,
3068        schema: &uni_common::core::schema::Schema,
3069        labels: &[String],
3070    ) -> Result<HashMap<String, Value>> {
3071        let mut out = HashMap::with_capacity(props.len());
3072        for (k, v) in props {
3073            let cv = Self::coerce_and_validate_property_value(&k, v, schema, labels)?;
3074            out.insert(k, cv);
3075        }
3076        Ok(out)
3077    }
3078
3079    #[expect(clippy::too_many_arguments)]
3080    pub(crate) async fn execute_set_items_locked(
3081        &self,
3082        items: &[SetItem],
3083        row: &mut HashMap<String, Value>,
3084        writer: &Writer,
3085        prop_manager: &PropertyManager,
3086        params: &HashMap<String, Value>,
3087        ctx: Option<&QueryContext>,
3088        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3089        prefetched: &Prefetch,
3090    ) -> Result<()> {
3091        // Coalesce SetItem::Property items by target so we do ONE read + ONE
3092        // write per (variable, target) instead of one read-modify-write cycle
3093        // per item. For an UPDATE that sets N properties on the same vertex
3094        // (e.g. the ingest hotpath `SET n.frequency = ..., n.last_seen = ...,
3095        // n.confidence = ...`), this collapses N redundant
3096        // `get_all_vertex_props_with_ctx` + `insert_vertex_with_labels` cycles
3097        // into one. See profile_test.rs `diag_72_set_data_scale_with_hnsw` for
3098        // the measurement, and the plan in
3099        // /home/rohit/.claude/plans/plan-and-implement-a-valiant-flame.md
3100        // for the rationale.
3101        //
3102        // RHS evaluation order is preserved: we evaluate each RHS inline and
3103        // update the row binding immediately, so a later SetItem on the same
3104        // variable that reads `n.<earlier-prop>` sees the new value.
3105        //
3106        // Non-Property variants (Labels, Variable, VariablePlus) are less
3107        // common and have lower payoff; before processing one, we flush any
3108        // pending updates for the same variable so it sees the latest L0
3109        // state and ordering semantics are preserved.
3110        let mut pending_v: HashMap<String, PendingVertexSet> = HashMap::new();
3111        let mut pending_e: HashMap<String, PendingEdgeSet> = HashMap::new();
3112
3113        for item in items {
3114            match item {
3115                SetItem::Property { expr, value } => {
3116                    if let Expr::Property(var_expr, prop_name) = expr
3117                        && let Expr::Variable(var_name) = &**var_expr
3118                        && let Some(node_val) = row.get(var_name)
3119                    {
3120                        if let Ok(vid) = Self::vid_from_value(node_val) {
3121                            reject_if_ephemeral_vid(vid)?;
3122                            let labels =
3123                                Self::extract_labels_from_node(node_val).unwrap_or_default();
3124                            let schema = self.storage.schema_manager().schema().clone();
3125
3126                            // Lazy one-time read. Always read the full row
3127                            // (preserves CRDT merge + constraint validation
3128                            // + scan-side L0 visibility). The
3129                            // partial-lance-writes optimization happens
3130                            // PURELY AT FLUSH TIME via the per-VID
3131                            // `vertex_partial_keys` set tracked in L0 — so
3132                            // L0 holds the full row, scans see the full
3133                            // row, and Lance only receives the touched
3134                            // columns. Generated-column-bearing labels
3135                            // ride the partial path too (Round 12 §C):
3136                            // `enrich_properties_with_generated_columns`
3137                            // runs at flush time over the merged-in-L0
3138                            // full row, and the produced generator keys
3139                            // are appended to `touched` so they land in
3140                            // the MergeInsert source.
3141                            if !pending_v.contains_key(var_name) {
3142                                let storage_cfg = &self.storage.config;
3143                                let partial = storage_cfg.partial_lance_writes;
3144                                let read = read_vertex_props_with_prefetch(
3145                                    vid,
3146                                    prefetched,
3147                                    prop_manager,
3148                                    ctx,
3149                                )
3150                                .await?;
3151                                pending_v.insert(
3152                                    var_name.clone(),
3153                                    PendingVertexSet {
3154                                        vid,
3155                                        labels: labels.clone(),
3156                                        props: read,
3157                                        partial,
3158                                        touched: HashSet::new(),
3159                                    },
3160                                );
3161                            }
3162
3163                            let val = self
3164                                .evaluate_expr(value, row, prop_manager, params, ctx)
3165                                .await?;
3166                            let val = Self::coerce_and_validate_property_value(
3167                                prop_name, val, &schema, &labels,
3168                            )?;
3169
3170                            let pv = pending_v
3171                                .get_mut(var_name)
3172                                .expect("inserted above when absent");
3173                            pv.props.insert(prop_name.clone(), val.clone());
3174                            if pv.partial {
3175                                pv.touched.insert(prop_name.clone());
3176                            }
3177
3178                            // Update the row binding so subsequent RHS sees the new value.
3179                            if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3180                                node_map.insert(prop_name.clone(), val);
3181                            } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
3182                                node.properties.insert(prop_name.clone(), val);
3183                            }
3184                        } else if let Value::Map(map) = node_val
3185                            && map.get("_eid").is_some_and(|v| !v.is_null())
3186                            && map.get("_src").is_some_and(|v| !v.is_null())
3187                            && map.get("_dst").is_some_and(|v| !v.is_null())
3188                            && (map.get("_type").is_some_and(|v| !v.is_null())
3189                                || map.get("_type_name").is_some_and(|v| !v.is_null()))
3190                        {
3191                            let ei = self.extract_edge_identity(map)?;
3192                            reject_if_ephemeral_eid(ei.eid)?;
3193                            let schema = self.storage.schema_manager().schema().clone();
3194                            // Handle _type as either String or Int (Int from CREATE, String
3195                            // from queries). UNWIND on VLP edge lists emits `_type_name`
3196                            // instead of `_type`; accept either.
3197                            let type_val = map.get("_type").or_else(|| map.get("_type_name"));
3198                            let edge_type_name = match type_val {
3199                                Some(Value::String(s)) => s.clone(),
3200                                Some(Value::Int(id)) => schema
3201                                    .edge_type_name_by_id_unified(*id as u32)
3202                                    .unwrap_or_else(|| format!("EdgeType{}", id)),
3203                                _ => String::new(),
3204                            };
3205
3206                            if !pending_e.contains_key(var_name) {
3207                                let initial = read_edge_props_with_prefetch(
3208                                    ei.eid,
3209                                    prefetched,
3210                                    prop_manager,
3211                                    ctx,
3212                                )
3213                                .await?;
3214                                let partial = self.storage.config.partial_lance_writes;
3215                                pending_e.insert(
3216                                    var_name.clone(),
3217                                    PendingEdgeSet {
3218                                        src: ei.src,
3219                                        dst: ei.dst,
3220                                        edge_type_id: ei.edge_type_id,
3221                                        eid: ei.eid,
3222                                        edge_type_name: edge_type_name.clone(),
3223                                        props: initial,
3224                                        partial,
3225                                        touched: HashSet::new(),
3226                                    },
3227                                );
3228                            }
3229
3230                            let val = self
3231                                .evaluate_expr(value, row, prop_manager, params, ctx)
3232                                .await?;
3233                            let val = Self::coerce_and_validate_property_value(
3234                                prop_name,
3235                                val,
3236                                &schema,
3237                                std::slice::from_ref(&edge_type_name),
3238                            )?;
3239
3240                            let pe = pending_e
3241                                .get_mut(var_name)
3242                                .expect("inserted above when absent");
3243                            pe.props.insert(prop_name.clone(), val.clone());
3244                            if pe.partial {
3245                                pe.touched.insert(prop_name.clone());
3246                            }
3247
3248                            // Update the row object so subsequent RHS sees the new value.
3249                            if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3250                                edge_map.insert(prop_name.clone(), val);
3251                            } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3252                                edge.properties.insert(prop_name.clone(), val);
3253                            }
3254                        } else if let Value::Edge(edge) = node_val {
3255                            // Handle Value::Edge directly (when traverse returns Edge objects).
3256                            reject_if_ephemeral_eid(edge.eid)?;
3257                            let eid = edge.eid;
3258                            let src = edge.src;
3259                            let dst = edge.dst;
3260                            let edge_type_name = edge.edge_type.clone();
3261                            let etype =
3262                                self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
3263                            let schema = self.storage.schema_manager().schema().clone();
3264
3265                            if !pending_e.contains_key(var_name) {
3266                                let initial = read_edge_props_with_prefetch(
3267                                    eid,
3268                                    prefetched,
3269                                    prop_manager,
3270                                    ctx,
3271                                )
3272                                .await?;
3273                                let partial = self.storage.config.partial_lance_writes;
3274                                pending_e.insert(
3275                                    var_name.clone(),
3276                                    PendingEdgeSet {
3277                                        src,
3278                                        dst,
3279                                        edge_type_id: etype,
3280                                        eid,
3281                                        edge_type_name: edge_type_name.clone(),
3282                                        props: initial,
3283                                        partial,
3284                                        touched: HashSet::new(),
3285                                    },
3286                                );
3287                            }
3288
3289                            let val = self
3290                                .evaluate_expr(value, row, prop_manager, params, ctx)
3291                                .await?;
3292                            let val = Self::coerce_and_validate_property_value(
3293                                prop_name,
3294                                val,
3295                                &schema,
3296                                std::slice::from_ref(&edge_type_name),
3297                            )?;
3298
3299                            let pe = pending_e
3300                                .get_mut(var_name)
3301                                .expect("inserted above when absent");
3302                            pe.props.insert(prop_name.clone(), val.clone());
3303                            if pe.partial {
3304                                pe.touched.insert(prop_name.clone());
3305                            }
3306
3307                            // Update the row object so subsequent RHS sees the new value.
3308                            if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3309                                edge.properties.insert(prop_name.clone(), val);
3310                            }
3311                        }
3312                    }
3313                }
3314                SetItem::Labels { variable, labels } => {
3315                    // Flush any pending writes for this var so the Labels op
3316                    // sees latest L0 state. Other variables' pending writes
3317                    // can keep waiting (they're independent).
3318                    self.flush_pending_var(
3319                        variable,
3320                        &mut pending_v,
3321                        &mut pending_e,
3322                        writer,
3323                        prop_manager,
3324                        params,
3325                        ctx,
3326                        tx_l0,
3327                        prefetched,
3328                    )
3329                    .await?;
3330
3331                    if let Some(node_val) = row.get(variable)
3332                        && let Ok(vid) = Self::vid_from_value(node_val)
3333                    {
3334                        reject_if_ephemeral_vid(vid)?;
3335                        let registry = self
3336                            .procedure_registry
3337                            .as_ref()
3338                            .and_then(|pr| pr.plugin_registry());
3339                        reject_virtual_label_write(registry.as_ref(), labels, "SET")?;
3340
3341                        // Get current labels from node value
3342                        let current_labels =
3343                            Self::extract_labels_from_node(node_val).unwrap_or_default();
3344
3345                        // Determine new labels to add (skip duplicates)
3346                        let labels_to_add: Vec<_> = labels
3347                            .iter()
3348                            .filter(|l| !current_labels.contains(l))
3349                            .cloned()
3350                            .collect();
3351
3352                        if !labels_to_add.is_empty() {
3353                            // Resolve the FULL new label set and write it to the
3354                            // TRANSACTION buffer (so the change is transactional
3355                            // and OCC-conflictable), falling back to the context
3356                            // (main) L0 for non-transactional callers. Replace
3357                            // semantics via `set_vertex_labels`.
3358                            let mut new_labels = current_labels;
3359                            new_labels.extend(labels_to_add);
3360                            if let Some(ctx) = ctx {
3361                                let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3362                                l0.write().set_vertex_labels(vid, &new_labels);
3363                            }
3364
3365                            // Update the node value in the row with the new labels.
3366                            if let Some(Value::Map(obj)) = row.get_mut(variable) {
3367                                let labels_list =
3368                                    new_labels.into_iter().map(Value::String).collect();
3369                                obj.insert("_labels".to_string(), Value::List(labels_list));
3370                            }
3371                        }
3372                    }
3373                }
3374                SetItem::Variable { variable, value }
3375                | SetItem::VariablePlus { variable, value } => {
3376                    // Flush this var's pending writes first so the
3377                    // replace/merge op sees them as latest L0 state.
3378                    self.flush_pending_var(
3379                        variable,
3380                        &mut pending_v,
3381                        &mut pending_e,
3382                        writer,
3383                        prop_manager,
3384                        params,
3385                        ctx,
3386                        tx_l0,
3387                        prefetched,
3388                    )
3389                    .await?;
3390
3391                    let replace = matches!(item, SetItem::Variable { .. });
3392                    let op_str = if replace { "=" } else { "+=" };
3393
3394                    // SET n = expr / SET n += expr — null target from OPTIONAL MATCH is a silent no-op
3395                    if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
3396                        continue;
3397                    }
3398                    let rhs = self
3399                        .evaluate_expr(value, row, prop_manager, params, ctx)
3400                        .await?;
3401                    let new_props =
3402                        Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
3403                            anyhow!(
3404                                "SET {} {} expr: right-hand side must evaluate to a map, \
3405                                 node, or relationship",
3406                                variable,
3407                                op_str
3408                            )
3409                        })?;
3410                    self.apply_properties_to_entity(
3411                        variable,
3412                        new_props,
3413                        replace,
3414                        row,
3415                        writer,
3416                        prop_manager,
3417                        params,
3418                        ctx,
3419                        tx_l0,
3420                        prefetched,
3421                    )
3422                    .await?;
3423                }
3424            }
3425        }
3426
3427        // Flush all remaining coalesced writes — one writer call per target.
3428        // Partial entries (no generated columns) call
3429        // `Writer::insert_vertex_partial_full` so L0 holds the FULL row
3430        // but the touched-keys hint drives a MergeInsert at flush. Full
3431        // entries continue through the legacy
3432        // `insert_vertex_with_labels` (Append) path with
3433        // generated-column enrichment.
3434        for (_var_name, mut pv) in pending_v {
3435            if pv.partial {
3436                // Round 12 §C: run the generator enrichment over the
3437                // merged-in-L0 full row, then add the produced generator
3438                // keys to `touched` so they ride the MergeInsert source.
3439                // Idempotent — generators always recompute against the
3440                // post-merge property map.
3441                let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3442                for label_name in &pv.labels {
3443                    self.enrich_properties_with_generated_columns(
3444                        label_name,
3445                        &mut pv.props,
3446                        prop_manager,
3447                        params,
3448                        ctx,
3449                    )
3450                    .await?;
3451                }
3452                for k in pv.props.keys() {
3453                    if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3454                        pv.touched.insert(k.clone());
3455                    }
3456                }
3457                writer
3458                    .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3459                    .await?;
3460            } else {
3461                for label_name in &pv.labels {
3462                    self.enrich_properties_with_generated_columns(
3463                        label_name,
3464                        &mut pv.props,
3465                        prop_manager,
3466                        params,
3467                        ctx,
3468                    )
3469                    .await?;
3470                }
3471                let _ = writer
3472                    .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3473                    .await?;
3474            }
3475        }
3476        for (_var_name, pe) in pending_e {
3477            if pe.partial {
3478                writer
3479                    .insert_edge_partial_full(
3480                        pe.src,
3481                        pe.dst,
3482                        pe.edge_type_id,
3483                        pe.eid,
3484                        pe.props,
3485                        Some(pe.edge_type_name),
3486                        pe.touched,
3487                        tx_l0,
3488                    )
3489                    .await?;
3490            } else {
3491                writer
3492                    .insert_edge(
3493                        pe.src,
3494                        pe.dst,
3495                        pe.edge_type_id,
3496                        pe.eid,
3497                        pe.props,
3498                        Some(pe.edge_type_name),
3499                        tx_l0,
3500                    )
3501                    .await?;
3502            }
3503        }
3504
3505        Ok(())
3506    }
3507
3508    /// Flush pending SET state for a single variable to the writer.
3509    ///
3510    /// Called from the SET loop when about to process a Labels /
3511    /// Variable / VariablePlus item on `var`, so the subsequent op
3512    /// sees latest L0 state and ordering is preserved.
3513    #[expect(clippy::too_many_arguments)]
3514    async fn flush_pending_var(
3515        &self,
3516        var: &str,
3517        pending_v: &mut HashMap<String, PendingVertexSet>,
3518        pending_e: &mut HashMap<String, PendingEdgeSet>,
3519        writer: &Writer,
3520        prop_manager: &PropertyManager,
3521        _params: &HashMap<String, Value>,
3522        ctx: Option<&QueryContext>,
3523        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3524        _prefetched: &Prefetch,
3525    ) -> Result<()> {
3526        if let Some(mut pv) = pending_v.remove(var) {
3527            if pv.partial {
3528                let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3529                for label_name in &pv.labels {
3530                    self.enrich_properties_with_generated_columns(
3531                        label_name,
3532                        &mut pv.props,
3533                        prop_manager,
3534                        _params,
3535                        ctx,
3536                    )
3537                    .await?;
3538                }
3539                for k in pv.props.keys() {
3540                    if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3541                        pv.touched.insert(k.clone());
3542                    }
3543                }
3544                writer
3545                    .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3546                    .await?;
3547            } else {
3548                for label_name in &pv.labels {
3549                    self.enrich_properties_with_generated_columns(
3550                        label_name,
3551                        &mut pv.props,
3552                        prop_manager,
3553                        _params,
3554                        ctx,
3555                    )
3556                    .await?;
3557                }
3558                let _ = writer
3559                    .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3560                    .await?;
3561            }
3562        }
3563        if let Some(pe) = pending_e.remove(var) {
3564            if pe.partial {
3565                writer
3566                    .insert_edge_partial_full(
3567                        pe.src,
3568                        pe.dst,
3569                        pe.edge_type_id,
3570                        pe.eid,
3571                        pe.props,
3572                        Some(pe.edge_type_name),
3573                        pe.touched,
3574                        tx_l0,
3575                    )
3576                    .await?;
3577            } else {
3578                writer
3579                    .insert_edge(
3580                        pe.src,
3581                        pe.dst,
3582                        pe.edge_type_id,
3583                        pe.eid,
3584                        pe.props,
3585                        Some(pe.edge_type_name),
3586                        tx_l0,
3587                    )
3588                    .await?;
3589            }
3590        }
3591        Ok(())
3592    }
3593
3594    /// Execute REMOVE clause items (property removal or label removal).
3595    ///
3596    /// Property removals are batched per variable to avoid stale reads: when
3597    /// multiple properties of the same entity are removed in one REMOVE clause,
3598    /// we read from storage once, null all specified properties, and write back
3599    /// once. This prevents the second removal from reading stale data that
3600    /// doesn't reflect the first removal's L0 write.
3601    #[expect(clippy::too_many_arguments)]
3602    pub(crate) async fn execute_remove_items_locked(
3603        &self,
3604        items: &[RemoveItem],
3605        row: &mut HashMap<String, Value>,
3606        writer: &Writer,
3607        prop_manager: &PropertyManager,
3608        ctx: Option<&QueryContext>,
3609        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3610        prefetched: &Prefetch,
3611    ) -> Result<()> {
3612        // Collect property names to remove, grouped by variable.
3613        // Use Vec<(String, Vec<String>)> to preserve insertion order.
3614        let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
3615
3616        for item in items {
3617            match item {
3618                RemoveItem::Property(expr) => {
3619                    if let Expr::Property(var_expr, prop_name) = expr
3620                        && let Expr::Variable(var_name) = &**var_expr
3621                    {
3622                        if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
3623                            entry.1.push(prop_name.clone());
3624                        } else {
3625                            prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
3626                        }
3627                    }
3628                }
3629                RemoveItem::Labels { variable, labels } => {
3630                    self.execute_remove_labels(variable, labels, row, ctx)?;
3631                }
3632            }
3633        }
3634
3635        // Execute batched property removals per variable.
3636        for (var_name, prop_names) in &prop_removals {
3637            let Some(node_val) = row.get(var_name) else {
3638                continue;
3639            };
3640
3641            if let Ok(vid) = Self::vid_from_value(node_val) {
3642                // Vertex property removal
3643                let mut props =
3644                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
3645
3646                // Only write back if at least one property actually exists
3647                let removed_count = prop_names
3648                    .iter()
3649                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3650                    .count();
3651                let any_exist = removed_count > 0;
3652                if any_exist {
3653                    writer.track_properties_removed(removed_count, tx_l0);
3654                    for prop_name in prop_names {
3655                        props.insert(prop_name.clone(), Value::Null);
3656                    }
3657                }
3658                // Compute effective properties (post-removal) for _all_props
3659                let effective: HashMap<String, Value> = props
3660                    .iter()
3661                    .filter(|(_, v)| !v.is_null())
3662                    .map(|(k, v)| (k.clone(), v.clone()))
3663                    .collect();
3664                if any_exist {
3665                    let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3666                    let _ = writer
3667                        .insert_vertex_with_labels(vid, props, &labels, tx_l0)
3668                        .await?;
3669                }
3670
3671                // Update the row map: set removed props to Null
3672                if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3673                    for prop_name in prop_names {
3674                        node_map.insert(prop_name.clone(), Value::Null);
3675                    }
3676                    // Set _all_props to the complete effective property set
3677                    node_map.insert("_all_props".to_string(), Value::Map(effective));
3678                }
3679            } else if let Value::Map(map) = node_val {
3680                // Edge property removal (map-encoded)
3681                // Check for non-null _eid to skip OPTIONAL MATCH null edges
3682                let mut edge_effective: Option<HashMap<String, Value>> = None;
3683                if map.get("_eid").is_some_and(|v| !v.is_null()) {
3684                    let ei = self.extract_edge_identity(map)?;
3685                    let mut props =
3686                        read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx)
3687                            .await?;
3688
3689                    let removed_count = prop_names
3690                        .iter()
3691                        .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3692                        .count();
3693                    let any_exist = removed_count > 0;
3694                    if any_exist {
3695                        writer.track_properties_removed(removed_count, tx_l0);
3696                        for prop_name in prop_names {
3697                            props.insert(prop_name.to_string(), Value::Null);
3698                        }
3699                    }
3700                    // Compute effective properties (post-removal) for _all_props
3701                    edge_effective = Some(
3702                        props
3703                            .iter()
3704                            .filter(|(_, v)| !v.is_null())
3705                            .map(|(k, v)| (k.clone(), v.clone()))
3706                            .collect(),
3707                    );
3708                    if any_exist {
3709                        let edge_type_name = map
3710                            .get("_type")
3711                            .and_then(|v| v.as_str())
3712                            .map(|s| s.to_string())
3713                            .or_else(|| {
3714                                self.storage
3715                                    .schema_manager()
3716                                    .edge_type_name_by_id_unified(ei.edge_type_id)
3717                            });
3718                        writer
3719                            .insert_edge(
3720                                ei.src,
3721                                ei.dst,
3722                                ei.edge_type_id,
3723                                ei.eid,
3724                                props,
3725                                edge_type_name,
3726                                tx_l0,
3727                            )
3728                            .await?;
3729                    }
3730                }
3731
3732                if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3733                    for prop_name in prop_names {
3734                        edge_map.insert(prop_name.clone(), Value::Null);
3735                    }
3736                    if let Some(effective) = edge_effective {
3737                        edge_map.insert("_all_props".to_string(), Value::Map(effective));
3738                    }
3739                }
3740            } else if let Value::Edge(edge) = node_val {
3741                // Edge property removal (Value::Edge)
3742                let eid = edge.eid;
3743                let src = edge.src;
3744                let dst = edge.dst;
3745                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
3746
3747                let mut props =
3748                    read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
3749
3750                let removed_count = prop_names
3751                    .iter()
3752                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3753                    .count();
3754                if removed_count > 0 {
3755                    writer.track_properties_removed(removed_count, tx_l0);
3756                    for prop_name in prop_names {
3757                        props.insert(prop_name.to_string(), Value::Null);
3758                    }
3759                    writer
3760                        .insert_edge(
3761                            src,
3762                            dst,
3763                            etype,
3764                            eid,
3765                            props,
3766                            Some(edge.edge_type.clone()),
3767                            tx_l0,
3768                        )
3769                        .await?;
3770                }
3771
3772                if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3773                    for prop_name in prop_names {
3774                        edge.properties.insert(prop_name.to_string(), Value::Null);
3775                    }
3776                }
3777            }
3778        }
3779
3780        Ok(())
3781    }
3782
3783    /// Execute label removal.
3784    pub(crate) fn execute_remove_labels(
3785        &self,
3786        variable: &str,
3787        labels: &[String],
3788        row: &mut HashMap<String, Value>,
3789        ctx: Option<&QueryContext>,
3790    ) -> Result<()> {
3791        if let Some(node_val) = row.get(variable)
3792            && let Ok(vid) = Self::vid_from_value(node_val)
3793        {
3794            reject_if_ephemeral_vid(vid)?;
3795            let registry = self
3796                .procedure_registry
3797                .as_ref()
3798                .and_then(|pr| pr.plugin_registry());
3799            reject_virtual_label_write(registry.as_ref(), labels, "REMOVE")?;
3800
3801            // Get current labels from node value
3802            let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3803
3804            // Determine which labels to actually remove (only those currently present)
3805            let labels_to_remove: Vec<_> = labels
3806                .iter()
3807                .filter(|l| current_labels.contains(l))
3808                .collect();
3809
3810            if !labels_to_remove.is_empty() {
3811                // Resolve the FULL remaining label set and write it to the
3812                // TRANSACTION buffer (transactional + OCC-conflictable), falling
3813                // back to the context (main) L0 for non-transactional callers.
3814                let remaining_labels: Vec<String> = current_labels
3815                    .iter()
3816                    .filter(|l| !labels_to_remove.contains(l))
3817                    .cloned()
3818                    .collect();
3819                if let Some(ctx) = ctx {
3820                    let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3821                    l0.write().set_vertex_labels(vid, &remaining_labels);
3822                }
3823
3824                // Update the node value in the row with the remaining labels.
3825                if let Some(Value::Map(obj)) = row.get_mut(variable) {
3826                    let labels_list = remaining_labels.into_iter().map(Value::String).collect();
3827                    obj.insert("_labels".to_string(), Value::List(labels_list));
3828                }
3829            }
3830        }
3831        Ok(())
3832    }
3833
3834    /// Resolve edge type ID for a Value::Edge, handling empty edge_type strings
3835    /// by looking up the type from the L0 buffer's edge endpoints.
3836    fn resolve_edge_type_id_for_edge(
3837        &self,
3838        edge: &crate::types::Edge,
3839        writer: &Writer,
3840        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3841    ) -> Result<u32> {
3842        if !edge.edge_type.is_empty() {
3843            return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
3844        }
3845        // Edge type name is empty (e.g., from anonymous MATCH patterns).
3846        // Look up the edge type ID from the L0 buffer's edge endpoints.
3847        if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
3848            return Ok(etype);
3849        }
3850        Err(anyhow!(
3851            "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
3852            edge.eid
3853        ))
3854    }
3855
3856    /// Execute DELETE clause for a single item (vertex, edge, path, or null).
3857    pub(crate) async fn execute_delete_item_locked(
3858        &self,
3859        val: &Value,
3860        detach: bool,
3861        writer: &Writer,
3862        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3863    ) -> Result<()> {
3864        match val {
3865            Value::Null => {
3866                // DELETE null is a no-op per OpenCypher spec
3867            }
3868            Value::Path(path) => {
3869                // Delete path edges first, then nodes
3870                for edge in &path.edges {
3871                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3872                    writer
3873                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3874                        .await?;
3875                }
3876                for node in &path.nodes {
3877                    self.execute_delete_vertex(
3878                        node.vid,
3879                        detach,
3880                        Some(node.labels.clone()),
3881                        writer,
3882                        tx_l0,
3883                    )
3884                    .await?;
3885                }
3886            }
3887            _ => {
3888                // Try Path reconstruction from Map first (Arrow loses Path type)
3889                if let Ok(path) = Path::try_from(val) {
3890                    for edge in &path.edges {
3891                        let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3892                        writer
3893                            .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3894                            .await?;
3895                    }
3896                    for node in &path.nodes {
3897                        self.execute_delete_vertex(
3898                            node.vid,
3899                            detach,
3900                            Some(node.labels.clone()),
3901                            writer,
3902                            tx_l0,
3903                        )
3904                        .await?;
3905                    }
3906                } else if let Ok(vid) = Self::vid_from_value(val) {
3907                    let labels = Self::extract_labels_from_node(val);
3908                    self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
3909                        .await?;
3910                } else if let Value::Map(map) = val {
3911                    self.execute_delete_edge_from_map(map, writer, tx_l0)
3912                        .await?;
3913                } else if let Value::Edge(edge) = val {
3914                    reject_if_ephemeral_eid(edge.eid)?;
3915                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3916                    let registry = self
3917                        .procedure_registry
3918                        .as_ref()
3919                        .and_then(|pr| pr.plugin_registry());
3920                    reject_virtual_edge_type_write(registry.as_ref(), etype, "DELETE")?;
3921                    writer
3922                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3923                        .await?;
3924                }
3925            }
3926        }
3927        Ok(())
3928    }
3929
3930    /// Execute vertex deletion with optional detach.
3931    pub(crate) async fn execute_delete_vertex(
3932        &self,
3933        vid: Vid,
3934        detach: bool,
3935        labels: Option<Vec<String>>,
3936        writer: &Writer,
3937        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3938    ) -> Result<()> {
3939        reject_if_ephemeral_vid(vid)?;
3940        if let Some(ls) = labels.as_deref() {
3941            let registry = self
3942                .procedure_registry
3943                .as_ref()
3944                .and_then(|pr| pr.plugin_registry());
3945            reject_virtual_label_write(registry.as_ref(), ls, "DELETE")?;
3946        }
3947        if detach {
3948            self.detach_delete_vertex(vid, writer, tx_l0).await?;
3949        } else {
3950            self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
3951        }
3952        writer.delete_vertex(vid, labels, tx_l0).await?;
3953        Ok(())
3954    }
3955
3956    /// Check that a vertex has no edges (required for non-DETACH DELETE).
3957    ///
3958    /// Loads the subgraph from storage, then excludes edges that have been
3959    /// tombstoned in the writer's L0 or the transaction's L0. This ensures
3960    /// edges deleted earlier in the same DELETE clause are properly excluded.
3961    pub(crate) async fn check_vertex_has_no_edges(
3962        &self,
3963        vid: Vid,
3964        writer: &Writer,
3965        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3966    ) -> Result<()> {
3967        let schema = self.storage.schema_manager().schema();
3968        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
3969
3970        // Collect tombstoned edge IDs from both the writer L0 and tx L0.
3971        let mut tombstoned_eids = std::collections::HashSet::new();
3972        {
3973            let writer_l0 = writer.l0_manager.get_current();
3974            let guard = writer_l0.read();
3975            for &eid in guard.tombstones.keys() {
3976                tombstoned_eids.insert(eid);
3977            }
3978        }
3979        if let Some(tx) = tx_l0 {
3980            let guard = tx.read();
3981            for &eid in guard.tombstones.keys() {
3982                tombstoned_eids.insert(eid);
3983            }
3984        }
3985
3986        let out_graph = self
3987            .storage
3988            .load_subgraph_cached(
3989                &[vid],
3990                &edge_type_ids,
3991                1,
3992                uni_store::runtime::Direction::Outgoing,
3993                Some(writer.l0_manager.get_current()),
3994            )
3995            .await?;
3996        let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
3997
3998        let in_graph = self
3999            .storage
4000            .load_subgraph_cached(
4001                &[vid],
4002                &edge_type_ids,
4003                1,
4004                uni_store::runtime::Direction::Incoming,
4005                Some(writer.l0_manager.get_current()),
4006            )
4007            .await?;
4008        let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
4009
4010        if has_out || has_in {
4011            return Err(anyhow!(
4012                "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
4013                vid
4014            ));
4015        }
4016        Ok(())
4017    }
4018
4019    /// Execute edge deletion from a map representation.
4020    pub(crate) async fn execute_delete_edge_from_map(
4021        &self,
4022        map: &HashMap<String, Value>,
4023        writer: &Writer,
4024        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
4025    ) -> Result<()> {
4026        // Check for non-null _eid to skip OPTIONAL MATCH null edges
4027        if map.get("_eid").is_some_and(|v| !v.is_null()) {
4028            let ei = self.extract_edge_identity(map)?;
4029            reject_if_ephemeral_eid(ei.eid)?;
4030            let registry = self
4031                .procedure_registry
4032                .as_ref()
4033                .and_then(|pr| pr.plugin_registry());
4034            reject_virtual_edge_type_write(registry.as_ref(), ei.edge_type_id, "DELETE")?;
4035            writer
4036                .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
4037                .await?;
4038        }
4039        Ok(())
4040    }
4041
4042    /// Build a scan plan node.
4043    ///
4044    /// - `label_id > 0`: schema label → `Scan` (fast, label-specific storage)
4045    /// - `label_id == 0` with labels: schemaless → `ScanMainByLabels` (main table + L0, filtered by label name)
4046    /// - `label_id == 0` without labels: unlabeled → `ScanAll`
4047    fn make_scan_plan(
4048        label_id: u16,
4049        labels: Vec<String>,
4050        variable: String,
4051        filter: Option<Expr>,
4052    ) -> LogicalPlan {
4053        if label_id > 0 {
4054            LogicalPlan::Scan {
4055                label_id,
4056                labels,
4057                variable,
4058                filter,
4059                optional: false,
4060            }
4061        } else if !labels.is_empty() {
4062            // Schemaless label: use ScanMainByLabels to filter by label name
4063            LogicalPlan::ScanMainByLabels {
4064                labels,
4065                variable,
4066                filter,
4067                optional: false,
4068            }
4069        } else {
4070            LogicalPlan::ScanAll {
4071                variable,
4072                filter,
4073                optional: false,
4074            }
4075        }
4076    }
4077
4078    /// Attach a new scan node to the running plan, using `CrossJoin` when the plan
4079    /// already contains prior operators.
4080    fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
4081        if matches!(plan, LogicalPlan::Empty) {
4082            scan
4083        } else {
4084            LogicalPlan::CrossJoin {
4085                left: Box::new(plan),
4086                right: Box::new(scan),
4087            }
4088        }
4089    }
4090
4091    /// Resolve MERGE property map expressions against the current row context.
4092    ///
4093    /// MERGE patterns like `MERGE (city:City {name: person.bornIn})` contain
4094    /// property expressions that reference bound variables. These need to be
4095    /// evaluated to concrete literal values before being converted to filter
4096    /// expressions by `properties_to_expr()`.
4097    async fn resolve_merge_properties(
4098        &self,
4099        properties: &Option<Expr>,
4100        row: &HashMap<String, Value>,
4101        prop_manager: &PropertyManager,
4102        params: &HashMap<String, Value>,
4103        ctx: Option<&QueryContext>,
4104    ) -> Result<Option<Expr>> {
4105        let entries = match properties {
4106            Some(Expr::Map(entries)) => entries,
4107            other => return Ok(other.clone()),
4108        };
4109        let mut resolved = Vec::new();
4110        for (key, val_expr) in entries {
4111            if matches!(val_expr, Expr::Literal(_)) {
4112                resolved.push((key.clone(), val_expr.clone()));
4113            } else {
4114                let value = self
4115                    .evaluate_expr(val_expr, row, prop_manager, params, ctx)
4116                    .await?;
4117                resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
4118            }
4119        }
4120        Ok(Some(Expr::Map(resolved)))
4121    }
4122
4123    /// Convert a runtime Value back to an AST literal expression.
4124    fn value_to_literal_expr(value: &Value) -> Expr {
4125        match value {
4126            Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
4127            Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
4128            Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
4129            Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
4130            Value::Null => Expr::Literal(CypherLiteral::Null),
4131            Value::List(items) => {
4132                Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
4133            }
4134            Value::Map(entries) => Expr::Map(
4135                entries
4136                    .iter()
4137                    .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
4138                    .collect(),
4139            ),
4140            _ => Expr::Literal(CypherLiteral::Null),
4141        }
4142    }
4143
4144    pub(crate) async fn execute_merge_match(
4145        &self,
4146        pattern: &Pattern,
4147        row: &HashMap<String, Value>,
4148        prop_manager: &PropertyManager,
4149        params: &HashMap<String, Value>,
4150        ctx: Option<&QueryContext>,
4151    ) -> Result<Vec<HashMap<String, Value>>> {
4152        // Construct a LogicalPlan for the MATCH part of MERGE
4153        let planner =
4154            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
4155
4156        // We need to construct a CypherQuery to use the planner's plan() method,
4157        // or we can manually construct the LogicalPlan.
4158        // Manual construction is safer as we don't have to round-trip through AST.
4159
4160        let mut plan = LogicalPlan::Empty;
4161        let mut vars_in_scope = Vec::new();
4162
4163        // Add existing bound variables from row to scope
4164        for key in row.keys() {
4165            vars_in_scope.push(key.clone());
4166        }
4167
4168        // Reconstruct Match logic from Planner (simplified for MERGE pattern)
4169        for path in &pattern.paths {
4170            let elements = &path.elements;
4171            let mut i = 0;
4172            while i < elements.len() {
4173                let part = &elements[i];
4174                match part {
4175                    PatternElement::Node(n) => {
4176                        let variable = n.variable.clone().unwrap_or_default();
4177
4178                        // If variable is already bound in the input row, we filter
4179                        let is_bound = !variable.is_empty() && row.contains_key(&variable);
4180
4181                        if is_bound {
4182                            // If bound, we must Scan this specific VID to start the chain
4183                            // Extract VID from row
4184                            let val = row.get(&variable).unwrap();
4185                            let vid = Self::vid_from_value(val)?;
4186
4187                            // In the new storage model, VIDs don't embed label info.
4188                            // We get label from the node value if available, otherwise use 0 to scan all.
4189                            let extracted_labels =
4190                                Self::extract_labels_from_node(val).unwrap_or_default();
4191                            let label_id = {
4192                                let schema = self.storage.schema_manager().schema();
4193                                extracted_labels
4194                                    .first()
4195                                    .and_then(|l| schema.label_id_by_name(l))
4196                                    .unwrap_or(0)
4197                            };
4198
4199                            let resolved_props = self
4200                                .resolve_merge_properties(
4201                                    &n.properties,
4202                                    row,
4203                                    prop_manager,
4204                                    params,
4205                                    ctx,
4206                                )
4207                                .await?;
4208                            let prop_filter =
4209                                planner.properties_to_expr(&variable, &resolved_props);
4210
4211                            // Create a filter expression for VID: variable._vid = vid
4212                            // But our expression engine handles `Expr::Variable` as column.
4213                            // We can inject a filter `id(variable) = vid` if we had `id()` function.
4214                            // Or we use internal property `_vid`.
4215
4216                            // Note: Scan supports `filter`.
4217                            // We can manually construct an Expr::BinaryOp(Eq, Prop(var, _vid), Literal(vid))
4218
4219                            let vid_filter = Expr::BinaryOp {
4220                                left: Box::new(Expr::Property(
4221                                    Box::new(Expr::Variable(variable.clone())),
4222                                    "_vid".to_string(),
4223                                )),
4224                                op: BinaryOp::Eq,
4225                                right: Box::new(Expr::Literal(CypherLiteral::Integer(
4226                                    vid.as_u64() as i64,
4227                                ))),
4228                            };
4229
4230                            let combined_filter = if let Some(pf) = prop_filter {
4231                                Some(Expr::BinaryOp {
4232                                    left: Box::new(vid_filter),
4233                                    op: BinaryOp::And,
4234                                    right: Box::new(pf),
4235                                })
4236                            } else {
4237                                Some(vid_filter)
4238                            };
4239
4240                            let scan = Self::make_scan_plan(
4241                                label_id,
4242                                extracted_labels,
4243                                variable.clone(),
4244                                combined_filter,
4245                            );
4246                            plan = Self::attach_scan(plan, scan);
4247                        } else {
4248                            let label_id = if n.labels.is_empty() {
4249                                // Unlabeled MERGE node: scan all nodes (label_id 0 → ScanAll)
4250                                0
4251                            } else {
4252                                let label_name = &n.labels[0];
4253                                let schema = self.storage.schema_manager().schema();
4254                                if self.config.strict_schema {
4255                                    schema
4256                                        .get_label_case_insensitive(label_name)
4257                                        .map(|m| m.id)
4258                                        .ok_or_else(|| {
4259                                            anyhow!(
4260                                                "Label '{}' is not defined in the schema \
4261                                                 (strict_schema is enabled). \
4262                                                 Declare it with db.schema().label(...).apply() first.",
4263                                                label_name
4264                                            )
4265                                        })?
4266                                } else {
4267                                    // Fall back to label_id 0 (any/schemaless) when not in schema.
4268                                    schema
4269                                        .get_label_case_insensitive(label_name)
4270                                        .map(|m| m.id)
4271                                        .unwrap_or(0)
4272                                }
4273                            };
4274
4275                            let resolved_props = self
4276                                .resolve_merge_properties(
4277                                    &n.properties,
4278                                    row,
4279                                    prop_manager,
4280                                    params,
4281                                    ctx,
4282                                )
4283                                .await?;
4284                            let prop_filter =
4285                                planner.properties_to_expr(&variable, &resolved_props);
4286                            let scan = Self::make_scan_plan(
4287                                label_id,
4288                                n.labels.names().to_vec(),
4289                                variable.clone(),
4290                                prop_filter,
4291                            );
4292                            plan = Self::attach_scan(plan, scan);
4293
4294                            // Add label filters when:
4295                            // 1. Multiple labels with a known schema label: filter for
4296                            //    additional labels (Scan only scans by the first label).
4297                            // 2. Schemaless labels (label_id = 0): ScanAll finds ALL
4298                            //    nodes, so we must filter to only those with the
4299                            //    specified label(s).
4300                            if !n.labels.is_empty()
4301                                && !variable.is_empty()
4302                                && (label_id == 0 || n.labels.len() > 1)
4303                                && let Some(label_filter) =
4304                                    planner.node_filter_expr(&variable, &n.labels, &None)
4305                            {
4306                                plan = LogicalPlan::Filter {
4307                                    input: Box::new(plan),
4308                                    predicate: label_filter,
4309                                    optional_variables: std::collections::HashSet::new(),
4310                                };
4311                            }
4312
4313                            if !variable.is_empty() {
4314                                vars_in_scope.push(variable.clone());
4315                            }
4316                        }
4317
4318                        // Now look ahead for relationship
4319                        i += 1;
4320                        while i < elements.len() {
4321                            if let PatternElement::Relationship(r) = &elements[i] {
4322                                let target_node_part = &elements[i + 1];
4323                                if let PatternElement::Node(n_target) = target_node_part {
4324                                    let schema = self.storage.schema_manager().schema();
4325                                    let mut edge_type_ids = Vec::new();
4326
4327                                    if r.types.is_empty() {
4328                                        return Err(anyhow!("MERGE edge must have a type"));
4329                                    } else if r.types.len() > 1 {
4330                                        return Err(anyhow!(
4331                                            "MERGE does not support multiple edge types"
4332                                        ));
4333                                    } else {
4334                                        let type_name = &r.types[0];
4335                                        let type_id = if self.config.strict_schema {
4336                                            let s = self.storage.schema_manager().schema();
4337                                            s.edge_type_id_by_name_case_insensitive(type_name)
4338                                                .ok_or_else(|| {
4339                                                    anyhow!(
4340                                                        "Edge type '{}' is not defined in the schema \
4341                                                         (strict_schema is enabled).",
4342                                                        type_name
4343                                                    )
4344                                                })?
4345                                        } else {
4346                                            // Schemaless: assign new ID if not found.
4347                                            self.storage
4348                                                .schema_manager()
4349                                                .get_or_assign_edge_type_id(type_name)
4350                                        };
4351                                        edge_type_ids.push(type_id);
4352                                    }
4353
4354                                    // Resolve target label ID. For schemaless labels (not in the
4355                                    // schema), fall back to 0 which means "any label" in traversal.
4356                                    let target_label_id: u16 = if let Some(lbl) =
4357                                        n_target.labels.first()
4358                                    {
4359                                        schema
4360                                            .get_label_case_insensitive(lbl)
4361                                            .map(|m| m.id)
4362                                            .unwrap_or(0)
4363                                    } else if let Some(var) = &n_target.variable {
4364                                        if let Some(val) = row.get(var) {
4365                                            // In the new storage model, get labels from node value
4366                                            if let Some(labels) =
4367                                                Self::extract_labels_from_node(val)
4368                                            {
4369                                                if let Some(first_label) = labels.first() {
4370                                                    schema
4371                                                        .get_label_case_insensitive(first_label)
4372                                                        .map(|m| m.id)
4373                                                        .unwrap_or(0)
4374                                                } else {
4375                                                    // Bound node with no labels — schemaless, any
4376                                                    0
4377                                                }
4378                                            } else if Self::vid_from_value(val).is_ok() {
4379                                                // VID without label info — schemaless, any
4380                                                0
4381                                            } else {
4382                                                return Err(anyhow!(
4383                                                    "Variable {} is not a node",
4384                                                    var
4385                                                ));
4386                                            }
4387                                        } else {
4388                                            return Err(anyhow!(
4389                                                "MERGE pattern node must have a label or be a bound variable"
4390                                            ));
4391                                        }
4392                                    } else {
4393                                        return Err(anyhow!(
4394                                            "MERGE pattern node must have a label"
4395                                        ));
4396                                    };
4397
4398                                    let target_variable =
4399                                        n_target.variable.clone().unwrap_or_default();
4400                                    let source_variable = match &elements[i - 1] {
4401                                        PatternElement::Node(n) => {
4402                                            n.variable.clone().unwrap_or_default()
4403                                        }
4404                                        _ => String::new(),
4405                                    };
4406
4407                                    let is_variable_length = r.range.is_some();
4408                                    let type_name = &r.types[0];
4409
4410                                    // Use TraverseMainByType for schemaless edge types
4411                                    // (same as MATCH planner) so edge properties are loaded
4412                                    // correctly from storage + L0 via the adjacency map.
4413                                    // Regular Traverse only loads properties via
4414                                    // property_manager which doesn't handle schemaless types.
4415                                    let is_schemaless = edge_type_ids.iter().all(|id| {
4416                                        uni_common::core::edge_type::is_schemaless_edge_type(*id)
4417                                    });
4418
4419                                    if is_schemaless {
4420                                        plan = LogicalPlan::TraverseMainByType {
4421                                            type_names: vec![type_name.clone()],
4422                                            input: Box::new(plan),
4423                                            direction: r.direction.clone(),
4424                                            source_variable,
4425                                            target_variable: target_variable.clone(),
4426                                            step_variable: r.variable.clone(),
4427                                            min_hops: r
4428                                                .range
4429                                                .as_ref()
4430                                                .and_then(|r| r.min)
4431                                                .unwrap_or(1)
4432                                                as usize,
4433                                            max_hops: r
4434                                                .range
4435                                                .as_ref()
4436                                                .and_then(|r| r.max)
4437                                                .unwrap_or(1)
4438                                                as usize,
4439                                            optional: false,
4440                                            target_filter: None,
4441                                            path_variable: None,
4442                                            is_variable_length,
4443                                            optional_pattern_vars: std::collections::HashSet::new(),
4444                                            scope_match_variables: std::collections::HashSet::new(),
4445                                            edge_filter_expr: None,
4446                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4447                                        };
4448                                    } else {
4449                                        // Collect edge property names needed for MERGE filter
4450                                        let mut edge_props = std::collections::HashSet::new();
4451                                        if let Some(Expr::Map(entries)) = &r.properties {
4452                                            for (key, _) in entries {
4453                                                edge_props.insert(key.clone());
4454                                            }
4455                                        }
4456                                        plan = LogicalPlan::Traverse {
4457                                            input: Box::new(plan),
4458                                            edge_type_ids: edge_type_ids.clone(),
4459                                            direction: r.direction.clone(),
4460                                            source_variable,
4461                                            target_variable: target_variable.clone(),
4462                                            target_label_id,
4463                                            step_variable: r.variable.clone(),
4464                                            min_hops: r
4465                                                .range
4466                                                .as_ref()
4467                                                .and_then(|r| r.min)
4468                                                .unwrap_or(1)
4469                                                as usize,
4470                                            max_hops: r
4471                                                .range
4472                                                .as_ref()
4473                                                .and_then(|r| r.max)
4474                                                .unwrap_or(1)
4475                                                as usize,
4476                                            optional: false,
4477                                            target_filter: None,
4478                                            path_variable: None,
4479                                            edge_properties: edge_props,
4480                                            is_variable_length,
4481                                            optional_pattern_vars: std::collections::HashSet::new(),
4482                                            scope_match_variables: std::collections::HashSet::new(),
4483                                            edge_filter_expr: None,
4484                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4485                                            qpp_steps: None,
4486                                        };
4487                                    }
4488
4489                                    // Apply property filters for relationship
4490                                    if r.properties.is_some()
4491                                        && let Some(r_var) = &r.variable
4492                                    {
4493                                        let resolved_rel_props = self
4494                                            .resolve_merge_properties(
4495                                                &r.properties,
4496                                                row,
4497                                                prop_manager,
4498                                                params,
4499                                                ctx,
4500                                            )
4501                                            .await?;
4502                                        if let Some(prop_filter) =
4503                                            planner.properties_to_expr(r_var, &resolved_rel_props)
4504                                        {
4505                                            plan = LogicalPlan::Filter {
4506                                                input: Box::new(plan),
4507                                                predicate: prop_filter,
4508                                                optional_variables: std::collections::HashSet::new(
4509                                                ),
4510                                            };
4511                                        }
4512                                    }
4513
4514                                    // Apply property filters for target node if it was new
4515                                    if !target_variable.is_empty() {
4516                                        let resolved_target_props = self
4517                                            .resolve_merge_properties(
4518                                                &n_target.properties,
4519                                                row,
4520                                                prop_manager,
4521                                                params,
4522                                                ctx,
4523                                            )
4524                                            .await?;
4525                                        if let Some(prop_filter) = planner.properties_to_expr(
4526                                            &target_variable,
4527                                            &resolved_target_props,
4528                                        ) {
4529                                            plan = LogicalPlan::Filter {
4530                                                input: Box::new(plan),
4531                                                predicate: prop_filter,
4532                                                optional_variables: std::collections::HashSet::new(
4533                                                ),
4534                                            };
4535                                        }
4536                                        vars_in_scope.push(target_variable.clone());
4537                                    }
4538
4539                                    if let Some(sv) = &r.variable {
4540                                        vars_in_scope.push(sv.clone());
4541                                    }
4542                                    i += 2;
4543                                } else {
4544                                    break;
4545                                }
4546                            } else {
4547                                break;
4548                            }
4549                        }
4550                    }
4551                    _ => return Err(anyhow!("Pattern must start with a node")),
4552                }
4553            }
4554
4555            // Execute the plan to find all matches, then filter against bound variables in `row`.
4556        }
4557
4558        let db_matches = self
4559            .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
4560            .await?;
4561
4562        // Keep only DB results that are consistent with the input row bindings.
4563        // Skip internal keys (starting with "__") as they are implementation
4564        // artifacts (e.g. __used_edges) and not user-visible variable bindings.
4565        // Also skip the empty-string key (""), which is the placeholder variable
4566        // for unnamed MERGE nodes — it may carry over from a prior MERGE clause
4567        // and must not constrain the current pattern's match.
4568        let final_matches = db_matches
4569            .into_iter()
4570            .filter(|db_match| {
4571                row.iter().all(|(key, val)| {
4572                    if key.is_empty() || key.starts_with("__") {
4573                        return true;
4574                    }
4575                    let Some(db_val) = db_match.get(key) else {
4576                        return true;
4577                    };
4578                    if db_val == val {
4579                        return true;
4580                    }
4581                    // Values differ -- treat as consistent if they represent the same VID
4582                    matches!(
4583                        (Self::vid_from_value(val), Self::vid_from_value(db_val)),
4584                        (Ok(v1), Ok(v2)) if v1 == v2
4585                    )
4586                })
4587            })
4588            .map(|db_match| {
4589                let mut merged = row.clone();
4590                merged.extend(db_match);
4591                merged
4592            })
4593            .collect();
4594
4595        Ok(final_matches)
4596    }
4597
4598    /// Prepare a MERGE pattern for path variable binding.
4599    ///
4600    /// If any path in the pattern has a path variable (e.g., `MERGE p = (a)-[:R]->(b)`),
4601    /// unnamed relationships need internal variable names so that `execute_create_pattern`
4602    /// stores the edge data in the row for later path construction.
4603    ///
4604    /// Returns the (possibly modified) pattern and a list of temp variable names to clean up.
4605    fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
4606        let has_path_vars = pattern
4607            .paths
4608            .iter()
4609            .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
4610
4611        if !has_path_vars {
4612            return (pattern.clone(), Vec::new());
4613        }
4614
4615        let mut modified = pattern.clone();
4616        let mut temp_vars = Vec::new();
4617
4618        for path in &mut modified.paths {
4619            if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
4620                continue;
4621            }
4622            for (idx, element) in path.elements.iter_mut().enumerate() {
4623                if let PatternElement::Relationship(r) = element
4624                    && r.variable.as_ref().is_none_or(String::is_empty)
4625                {
4626                    let temp_var = format!("__path_r_{}", idx);
4627                    r.variable = Some(temp_var.clone());
4628                    temp_vars.push(temp_var);
4629                }
4630            }
4631        }
4632
4633        (modified, temp_vars)
4634    }
4635
4636    /// Bind path variables in the result row based on the MERGE pattern.
4637    ///
4638    /// Walks each path in the pattern, collects node/edge values from the row
4639    /// by variable name, and constructs a `Value::Path`.
4640    fn bind_path_variables(
4641        pattern: &Pattern,
4642        row: &mut HashMap<String, Value>,
4643        temp_vars: &[String],
4644    ) {
4645        for path in &pattern.paths {
4646            let Some(path_var) = path.variable.as_ref() else {
4647                continue;
4648            };
4649            if path_var.is_empty() {
4650                continue;
4651            }
4652
4653            let mut nodes = Vec::new();
4654            let mut edges = Vec::new();
4655
4656            for element in &path.elements {
4657                match element {
4658                    PatternElement::Node(n) => {
4659                        if let Some(var) = &n.variable
4660                            && let Some(val) = row.get(var)
4661                            && let Some(node) = Self::value_to_node_for_path(val)
4662                        {
4663                            nodes.push(node);
4664                        }
4665                    }
4666                    PatternElement::Relationship(r) => {
4667                        if let Some(var) = &r.variable
4668                            && let Some(val) = row.get(var)
4669                            && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
4670                        {
4671                            edges.push(edge);
4672                        }
4673                    }
4674                    _ => {}
4675                }
4676            }
4677
4678            if !nodes.is_empty() {
4679                use uni_common::value::Path;
4680                row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
4681            }
4682        }
4683
4684        // Clean up internal temp variables
4685        for var in temp_vars {
4686            row.remove(var);
4687        }
4688    }
4689
4690    /// Convert a Value (Map or Node) to a Node for path construction.
4691    fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
4692        match val {
4693            Value::Node(n) => Some(n.clone()),
4694            Value::Map(map) => {
4695                let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
4696                let labels = if let Some(Value::List(l)) = map.get("_labels") {
4697                    l.iter()
4698                        .filter_map(|v| {
4699                            if let Value::String(s) = v {
4700                                Some(s.clone())
4701                            } else {
4702                                None
4703                            }
4704                        })
4705                        .collect()
4706                } else {
4707                    vec![]
4708                };
4709                let properties: HashMap<String, Value> = map
4710                    .iter()
4711                    .filter(|(k, _)| !k.starts_with('_'))
4712                    .map(|(k, v)| (k.clone(), v.clone()))
4713                    .collect();
4714                Some(uni_common::value::Node {
4715                    vid,
4716                    labels,
4717                    properties,
4718                })
4719            }
4720            _ => None,
4721        }
4722    }
4723
4724    /// Convert a Value (Map or Edge) to an Edge for path construction.
4725    fn value_to_edge_for_path(
4726        val: &Value,
4727        type_names: &[String],
4728    ) -> Option<uni_common::value::Edge> {
4729        match val {
4730            Value::Edge(e) => Some(e.clone()),
4731            Value::Map(map) => {
4732                let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
4733                let edge_type = map
4734                    .get("_type_name")
4735                    .and_then(|v| {
4736                        if let Value::String(s) = v {
4737                            Some(s.clone())
4738                        } else {
4739                            None
4740                        }
4741                    })
4742                    .or_else(|| type_names.first().cloned())
4743                    .unwrap_or_default();
4744                let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
4745                let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
4746                let properties: HashMap<String, Value> = map
4747                    .iter()
4748                    .filter(|(k, _)| !k.starts_with('_'))
4749                    .map(|(k, v)| (k.clone(), v.clone()))
4750                    .collect();
4751                Some(uni_common::value::Edge {
4752                    eid,
4753                    edge_type,
4754                    src,
4755                    dst,
4756                    properties,
4757                })
4758            }
4759            _ => None,
4760        }
4761    }
4762}
4763
4764/// Read a vertex's full property map, preferring `prefetched` over a fresh
4765/// per-row `Backend::scan`.
4766///
4767/// `prefetched` is built once at the top of `apply_mutations` via
4768/// `prefetch_set_targets` / `prefetch_remove_targets` (mutation_common.rs).
4769/// On a hit, we layer in L0 from `ctx` so writes from earlier rows of the
4770/// same `apply_mutations` invocation (counter increments, same-VID
4771/// duplicates from UNWIND) take precedence — the prefetch only snapshots
4772/// storage state at SET entry. On a miss, fall back to the existing
4773/// per-row path; this preserves correctness for newly created VIDs,
4774/// schemaless rows, multi-label corner cases, and non-Mutation callers
4775/// that pass `&Prefetch::default()`.
4776pub(crate) async fn read_vertex_props_with_prefetch(
4777    vid: Vid,
4778    prefetched: &Prefetch,
4779    prop_manager: &PropertyManager,
4780    ctx: Option<&QueryContext>,
4781) -> Result<uni_common::Properties> {
4782    match prefetched.vertex.get(&vid).cloned() {
4783        Some(mut base) => {
4784            if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_vertex_props(vid, ctx) {
4785                for (k, v) in l0 {
4786                    base.insert(k, v);
4787                }
4788            }
4789            Ok(base)
4790        }
4791        None => Ok(prop_manager
4792            .get_all_vertex_props_with_ctx(vid, ctx)
4793            .await?
4794            .unwrap_or_default()),
4795    }
4796}
4797
4798/// Edge equivalent of [`read_vertex_props_with_prefetch`]. On a hit, layer
4799/// in L0 edge props so writes from earlier rows of the same
4800/// `apply_mutations` invocation take precedence. On a miss, fall back to
4801/// the per-EID storage path.
4802pub(crate) async fn read_edge_props_with_prefetch(
4803    eid: Eid,
4804    prefetched: &Prefetch,
4805    prop_manager: &PropertyManager,
4806    ctx: Option<&QueryContext>,
4807) -> Result<uni_common::Properties> {
4808    match prefetched.edge.get(&eid).cloned() {
4809        Some(mut base) => {
4810            if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_edge_props(eid, ctx) {
4811                for (k, v) in l0 {
4812                    base.insert(k, v);
4813                }
4814            }
4815            Ok(base)
4816        }
4817        None => Ok(prop_manager
4818            .get_all_edge_props_with_ctx(eid, ctx)
4819            .await?
4820            .unwrap_or_default()),
4821    }
4822}
4823
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a string-keyed property map from `(key, value)` pairs.
    fn prop_map<const N: usize>(pairs: [(&str, Value); N]) -> HashMap<String, Value> {
        pairs
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect()
    }

    /// Shorthand for a `Value::String`.
    fn str_val(text: &str) -> Value {
        Value::String(text.into())
    }

    /// A `Value::Node` with VID 1 and the single label `Person`.
    fn person_node(properties: HashMap<String, Value>) -> Value {
        Value::Node(uni_common::Node {
            vid: uni_common::core::id::Vid::from(1u64),
            labels: vec!["Person".to_string()],
            properties,
        })
    }

    // ── merge_props tests ────────────────────────────────────────────

    #[test]
    fn test_merge_props_replace_tombstones_missing_keys() {
        let current = prop_map([("name", str_val("Alice")), ("age", Value::Int(30))]);
        let incoming = prop_map([("name", str_val("Bob"))]);

        let merged = Executor::merge_props(current, incoming, true);
        assert_eq!(merged.get("name"), Some(&str_val("Bob")));
        assert_eq!(
            merged.get("age"),
            Some(&Value::Null),
            "Missing keys should be tombstoned in replace mode"
        );
    }

    #[test]
    fn test_merge_props_merge_preserves_existing() {
        let current = prop_map([("name", str_val("Alice")), ("age", Value::Int(30))]);
        let incoming = prop_map([("city", str_val("NYC"))]);

        let merged = Executor::merge_props(current, incoming, false);
        for (key, expected) in [
            ("name", str_val("Alice")),
            ("age", Value::Int(30)),
            ("city", str_val("NYC")),
        ] {
            assert_eq!(merged.get(key), Some(&expected));
        }
    }

    #[test]
    fn test_merge_props_null_incoming_is_tombstone() {
        let current = prop_map([("name", str_val("Alice"))]);
        let incoming = prop_map([("name", Value::Null)]);

        // Merge mode (false): the null overwrites. Replace mode (true): it is a tombstone.
        for replace in [false, true] {
            let merged = Executor::merge_props(current.clone(), incoming.clone(), replace);
            assert_eq!(merged.get("name"), Some(&Value::Null));
        }
    }

    #[test]
    fn test_merge_props_empty_current() {
        let incoming = prop_map([("name", str_val("Alice"))]);

        let merged = Executor::merge_props(HashMap::new(), incoming, false);
        assert_eq!(merged.get("name"), Some(&str_val("Alice")));
        assert_eq!(merged.len(), 1);
    }

    #[test]
    fn test_merge_props_empty_incoming_replace_tombstones_all() {
        let current = prop_map([("name", str_val("Alice")), ("age", Value::Int(30))]);

        let merged = Executor::merge_props(current, HashMap::new(), true);
        for key in ["name", "age"] {
            assert_eq!(merged.get(key), Some(&Value::Null));
        }
    }

    // ── extract_labels_from_node tests ───────────────────────────────

    #[test]
    fn test_extract_labels_from_map() {
        let node_map = prop_map([
            ("_vid", Value::Int(1)),
            (
                "_labels",
                Value::List(vec![str_val("Person"), str_val("Employee")]),
            ),
        ]);

        let labels = Executor::extract_labels_from_node(&Value::Map(node_map));
        assert_eq!(
            labels,
            Some(vec!["Person".to_string(), "Employee".to_string()])
        );
    }

    #[test]
    fn test_extract_labels_from_value_node() {
        let labels = Executor::extract_labels_from_node(&person_node(HashMap::new()));
        assert_eq!(labels, Some(vec!["Person".to_string()]));
    }

    #[test]
    fn test_extract_labels_non_node_returns_none() {
        for scalar in [Value::Int(42), str_val("hello")] {
            assert_eq!(Executor::extract_labels_from_node(&scalar), None);
        }
    }

    // ── extract_user_properties_from_value tests ─────────────────────

    #[test]
    fn test_extract_user_props_strips_internal_keys() {
        let node_map = prop_map([
            ("_vid", Value::Int(1)),
            ("_labels", Value::List(vec![str_val("Person")])),
            ("name", str_val("Alice")),
            ("age", Value::Int(30)),
        ]);

        let user = Executor::extract_user_properties_from_value(&Value::Map(node_map)).unwrap();
        assert_eq!(user.get("name"), Some(&str_val("Alice")));
        assert_eq!(user.get("age"), Some(&Value::Int(30)));
        for internal in ["_vid", "_labels"] {
            assert!(!user.contains_key(internal));
        }
    }

    #[test]
    fn test_extract_user_props_plain_map_returns_as_is() {
        let plain = prop_map([("key", str_val("value"))]);

        let user = Executor::extract_user_properties_from_value(&Value::Map(plain.clone())).unwrap();
        assert_eq!(user, plain);
    }

    #[test]
    fn test_extract_user_props_from_value_node() {
        let node = person_node(prop_map([("name", str_val("Alice"))]));

        let user = Executor::extract_user_properties_from_value(&node).unwrap();
        assert_eq!(user.get("name"), Some(&str_val("Alice")));
    }
}