Skip to main content

uni_query/query/executor/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use crate::query::df_graph::mutation_common::Prefetch;
6use crate::query::planner::LogicalPlan;
7use anyhow::{Result, anyhow};
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use uni_common::DataType;
11use uni_common::core::id::{Eid, Vid};
12use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
13use uni_common::{Path, Value};
14use uni_cypher::ast::{
15    AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
16    CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
17    DropEdgeType, DropLabel, Expr, NodePattern, Pattern, PatternElement, RemoveItem, SetClause,
18    SetItem,
19};
20use uni_store::QueryContext;
21use uni_store::runtime::l0_visibility;
22use uni_store::runtime::property_manager::PropertyManager;
23use uni_store::runtime::writer::Writer;
24
/// Canonical, hashable key for a single-node MERGE: the key properties as a
/// `(name, value)` list sorted by name. Used to group existing vertices by key
/// for the index fast path (issue #69).
///
/// NOTE(review): the alias itself does not enforce the sort order; presumably
/// every producer sorts by name before using the list as a key — confirm at
/// the construction sites.
type MergeKey = Vec<(String, Value)>;
29
/// Identity fields extracted from a map-encoded edge.
///
/// Produced by `Executor::extract_edge_identity` from the `_eid`, `_src`,
/// `_dst` and `_type` / `_type_name` entries of the edge map.
struct EdgeIdentity {
    /// Edge id, read from the `_eid` entry.
    eid: Eid,
    /// Source vertex id, read from the `_src` entry.
    src: Vid,
    /// Destination vertex id, read from the `_dst` entry.
    dst: Vid,
    /// Numeric edge type id, resolved from the `_type` (or `_type_name`) entry.
    edge_type_id: u32,
}
37
/// Per-variable accumulator for SetItem::Property items targeting a vertex.
///
/// Built lazily on the first SetItem touching each variable, then mutated
/// in place across subsequent items. Flushed once at end of the SET
/// clause (or earlier if a non-Property SetItem on the same var lands).
struct PendingVertexSet {
    /// Id of the vertex the pending SET items apply to.
    vid: Vid,
    /// Labels associated with the vertex.
    /// NOTE(review): presumably the labels handed to the writer on flush —
    /// confirm at the flush site.
    labels: Vec<String>,
    /// Full property map: the combined storage + L0 view obtained from
    /// `get_all_vertex_props_with_ctx`, with the touched values applied
    /// in order on top. Flushed to L0 whole; L0's `vertex_partial_keys` set
    /// tells the flush which columns to send to Lance via MergeInsert.
    props: HashMap<String, Value>,
    /// `true` when the SET should flush via the partial-column MergeInsert
    /// path: set when `UniConfig::partial_lance_writes` is on AND the
    /// label has no generated columns. Generated-column labels still need
    /// the full-row Append so the regenerated values land.
    partial: bool,
    /// Set of property keys touched by this statement. Threaded into L0
    /// so the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
}
61
/// Per-variable accumulator for SetItem::Property items targeting an edge.
///
/// Edge counterpart of `PendingVertexSet`: it carries the edge's identity
/// together with the property map accumulated for it.
struct PendingEdgeSet {
    /// Source vertex id of the edge.
    src: Vid,
    /// Destination vertex id of the edge.
    dst: Vid,
    /// Numeric id of the edge's type.
    edge_type_id: u32,
    /// Id of the edge being updated.
    eid: Eid,
    /// Name of the edge's type.
    edge_type_name: String,
    /// `true` when the SET should flush via the partial-column
    /// MergeInsert path on the per-edge-type delta tables (Round 12
    /// §A). Set when `UniConfig::partial_lance_writes` is on.
    partial: bool,
    /// Property keys touched by this statement. Threaded into L0 so
    /// the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
    /// Property map accumulated for the edge.
    /// NOTE(review): presumably the full current property set with the
    /// touched values applied on top, as for vertices — confirm where the
    /// accumulator is built.
    props: HashMap<String, Value>,
}
79
80/// Refuse to mutate an ephemeral node (M5g / proposal §4.13.1).
81/// Ephemeral entities are return-only — `Vid::EPHEMERAL_BIT` is set on
82/// any id minted by `host.allocate_transient_id()`.
83fn reject_if_ephemeral_vid(vid: Vid) -> Result<()> {
84    if vid.is_ephemeral() {
85        return Err(anyhow::Error::from(
86            uni_common::UniError::EphemeralWriteAttempt {
87                kind: "node",
88                id: vid.transient_id().unwrap_or(vid.as_u64()),
89            },
90        ));
91    }
92    Ok(())
93}
94
95/// Returns a short variant name for a `Value`, used in type-mismatch error messages.
96fn value_type_name(val: &Value) -> &'static str {
97    match val {
98        Value::Null => "Null",
99        Value::Bool(_) => "Bool",
100        Value::Int(_) => "Int",
101        Value::Float(_) => "Float",
102        Value::String(_) => "String",
103        Value::Bytes(_) => "Bytes",
104        Value::List(_) => "List",
105        Value::Map(_) => "Map",
106        Value::Node(_) => "Node",
107        Value::Edge(_) => "Edge",
108        Value::Path(_) => "Path",
109        Value::Vector(_) => "Vector",
110        Value::Temporal(_) => "Temporal",
111        _ => "value",
112    }
113}
114
115/// Refuse to mutate an ephemeral edge (M5g / proposal §4.13.1).
116fn reject_if_ephemeral_eid(eid: Eid) -> Result<()> {
117    if eid.is_ephemeral() {
118        return Err(anyhow::Error::from(
119            uni_common::UniError::EphemeralWriteAttempt {
120                kind: "edge",
121                id: eid.transient_id().unwrap_or(eid.as_u64()),
122            },
123        ));
124    }
125    Ok(())
126}
127
128/// Reject a write whose target label is currently allocated as a
129/// virtual (catalog-backed) label.
130///
131/// Catalog tables are read-only from the host's perspective — there is
132/// no write-back path through `CatalogTable::scan` to the originating
133/// provider, so silently allowing SET/DELETE would leave ghosted state
134/// on the host side that diverges from the external catalog. The
135/// planner already rejects CREATE/MERGE on virtual labels via
136/// `Planner::reject_virtual_label_writes`; this helper is the
137/// equivalent gate on the runtime write path for SET-label-add and
138/// DELETE.
139///
140/// `op` names the offending operation for the error message (e.g.
141/// `"SET"`, `"DELETE"`).
142///
143/// # Errors
144///
145/// Returns an error if `registry` is `Some` and any name in `labels`
146/// is currently registered as a virtual label. Returns `Ok(())` when
147/// no plugin registry is wired (low-level callers without plugins).
148fn reject_virtual_label_write(
149    registry: Option<&Arc<uni_plugin::PluginRegistry>>,
150    labels: &[String],
151    op: &str,
152) -> Result<()> {
153    let Some(registry) = registry else {
154        return Ok(());
155    };
156    for label in labels {
157        if registry.virtual_label_by_name(label).is_some() {
158            return Err(anyhow!(
159                "Cannot {op} on virtual (catalog-resolved) label `{label}` — virtual \
160                 labels are read-only; write back via the originating catalog instead"
161            ));
162        }
163    }
164    Ok(())
165}
166
167/// Reject a write whose target edge-type ID is currently allocated as
168/// a virtual (catalog-backed) edge type. Runtime analog of
169/// [`reject_virtual_label_write`] for the edge path.
170///
171/// # Errors
172///
173/// Returns an error if `registry` is `Some` and `edge_type_id` resolves
174/// to a registered virtual edge type. Returns `Ok(())` when no plugin
175/// registry is wired.
176fn reject_virtual_edge_type_write(
177    registry: Option<&Arc<uni_plugin::PluginRegistry>>,
178    edge_type_id: u32,
179    op: &str,
180) -> Result<()> {
181    let Some(registry) = registry else {
182        return Ok(());
183    };
184    if let Some(entry) = registry.virtual_edge_type_by_id(edge_type_id) {
185        return Err(anyhow!(
186            "Cannot {op} on virtual (catalog-resolved) edge type `{}` — virtual edge \
187             types are read-only; write back via the originating catalog instead",
188            entry.name
189        ));
190    }
191    Ok(())
192}
193
194impl Executor {
195    /// Extracts labels from a node value.
196    ///
197    /// Handles both `Value::Map` (with a `_labels` list field) and
198    /// `Value::Node` (with a `labels` vec field).
199    ///
200    /// Returns `None` when the value is not a node or has no labels.
201    pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
202        match node_val {
203            Value::Map(map) => {
204                // Map-encoded node: look for _labels array
205                if let Some(Value::List(labels_arr)) = map.get("_labels") {
206                    let labels: Vec<String> = labels_arr
207                        .iter()
208                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
209                        .collect();
210                    if !labels.is_empty() {
211                        return Some(labels);
212                    }
213                }
214                None
215            }
216            Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
217            _ => None,
218        }
219    }
220
221    /// Extracts user-visible properties from a value that represents a node or edge.
222    ///
223    /// Strips internal bookkeeping keys (those prefixed with `_` or named
224    /// `ext_id`) from map-encoded entities and returns only the user-facing
225    /// property key-value pairs.
226    ///
227    /// Returns `None` when `val` is not a map, node, or edge.
228    pub(crate) fn extract_user_properties_from_value(
229        val: &Value,
230    ) -> Option<HashMap<String, Value>> {
231        match val {
232            Value::Map(map) => {
233                // Distinguish entity-encoded maps from plain map literals.
234                // A node map has both `_vid` and `_labels`.
235                // An edge map has `_eid`, `_src`, and `_dst`.
236                let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
237                let is_edge_map = map.contains_key("_eid")
238                    && map.contains_key("_src")
239                    && map.contains_key("_dst");
240
241                if is_node_map || is_edge_map {
242                    // Filter out internal bookkeeping keys
243                    let user_props: HashMap<String, Value> = map
244                        .iter()
245                        .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
246                        .map(|(k, v)| (k.clone(), v.clone()))
247                        .collect();
248                    // When mutation output omits dotted property columns, user
249                    // properties live inside `_all_props` rather than at the
250                    // top level of the entity map.
251                    if user_props.is_empty()
252                        && let Some(Value::Map(all_props)) = map.get("_all_props")
253                    {
254                        return Some(all_props.clone());
255                    }
256                    Some(user_props)
257                } else {
258                    // Plain map literal — return as-is
259                    Some(map.clone())
260                }
261            }
262            Value::Node(node) => Some(node.properties.clone()),
263            Value::Edge(edge) => Some(edge.properties.clone()),
264            _ => None,
265        }
266    }
267
268    /// Applies a property map to a vertex or edge entity bound to `variable` in `row`.
269    ///
270    /// When `replace` is `true` the entity's property set is replaced: keys absent
271    /// from `new_props` are tombstoned (written as `Value::Null`) so the storage
272    /// layer removes them.  When `replace` is `false` the map is merged: keys in
273    /// `new_props` are upserted, while keys absent from `new_props` are unchanged.
274    /// A `Value::Null` entry in `new_props` acts as an explicit tombstone in both
275    /// modes.
276    ///
277    /// Labels are never altered — the spec states that `SET n = map` replaces
278    /// properties only.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the entity cannot be found in the storage layer, or
283    /// if the writer fails to persist the updated properties.
284    #[expect(clippy::too_many_arguments)]
285    async fn apply_properties_to_entity(
286        &self,
287        variable: &str,
288        new_props: HashMap<String, Value>,
289        replace: bool,
290        row: &mut HashMap<String, Value>,
291        writer: &Writer,
292        prop_manager: &PropertyManager,
293        params: &HashMap<String, Value>,
294        ctx: Option<&QueryContext>,
295        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
296        prefetched: &Prefetch,
297    ) -> Result<()> {
298        // Clone the target so we can hold &row references elsewhere.
299        let target = row.get(variable).cloned();
300
301        // Declared-type guard for the whole-entity `SET n = map` / `SET n += map`
302        // forms, mirroring the per-property SET path (issue #68).
303        let schema = self.storage.schema_manager().schema();
304
305        match target {
306            Some(Value::Node(ref node)) => {
307                let vid = node.vid;
308                let labels = node.labels.clone();
309                let current =
310                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
311                let write_props = Self::merge_props(current, new_props, replace);
312                let mut enriched = write_props.clone();
313                for label_name in &labels {
314                    self.enrich_properties_with_generated_columns(
315                        label_name,
316                        &mut enriched,
317                        prop_manager,
318                        params,
319                        ctx,
320                    )
321                    .await?;
322                }
323                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
324                let _ = writer
325                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
326                    .await?;
327                // Update the in-memory row binding
328                if let Some(Value::Node(n)) = row.get_mut(variable) {
329                    n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
330                }
331            }
332            Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
333                let vid = Self::vid_from_value(node_val)?;
334                let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
335                let current =
336                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
337                let write_props = Self::merge_props(current, new_props, replace);
338                let mut enriched = write_props.clone();
339                for label_name in &labels {
340                    self.enrich_properties_with_generated_columns(
341                        label_name,
342                        &mut enriched,
343                        prop_manager,
344                        params,
345                        ctx,
346                    )
347                    .await?;
348                }
349                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
350                let _ = writer
351                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
352                    .await?;
353                // Update the in-memory map-encoded node binding
354                if let Some(Value::Map(node_map)) = row.get_mut(variable) {
355                    // Remove old user property keys, keep internal fields
356                    node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
357                    // Build effective (non-null) properties
358                    let effective: HashMap<String, Value> =
359                        enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
360                    for (k, v) in &effective {
361                        node_map.insert(k.clone(), v.clone());
362                    }
363                    // Replace _all_props to reflect the complete property set
364                    node_map.insert("_all_props".to_string(), Value::Map(effective));
365                }
366            }
367            Some(Value::Edge(ref edge)) => {
368                let eid = edge.eid;
369                let src = edge.src;
370                let dst = edge.dst;
371                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
372                let current =
373                    read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
374                let write_props = Self::merge_props(current, new_props, replace);
375                let write_props = Self::coerce_and_validate_props(
376                    write_props,
377                    &schema,
378                    std::slice::from_ref(&edge.edge_type),
379                )?;
380                writer
381                    .insert_edge(
382                        src,
383                        dst,
384                        etype,
385                        eid,
386                        write_props.clone(),
387                        Some(edge.edge_type.clone()),
388                        tx_l0,
389                    )
390                    .await?;
391                // Update the in-memory row binding
392                if let Some(Value::Edge(e)) = row.get_mut(variable) {
393                    e.properties = write_props
394                        .into_iter()
395                        .filter(|(_, v)| !v.is_null())
396                        .collect();
397                }
398            }
399            Some(Value::Map(ref map))
400                if map.contains_key("_eid")
401                    && map.contains_key("_src")
402                    && map.contains_key("_dst") =>
403            {
404                let ei = self.extract_edge_identity(map)?;
405                let current =
406                    read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx).await?;
407                let write_props = Self::merge_props(current, new_props, replace);
408                let edge_type_name = map
409                    .get("_type")
410                    .and_then(|v| v.as_str())
411                    .map(|s| s.to_string())
412                    .or_else(|| {
413                        self.storage
414                            .schema_manager()
415                            .edge_type_name_by_id_unified(ei.edge_type_id)
416                    });
417                let write_props = match &edge_type_name {
418                    Some(name) => Self::coerce_and_validate_props(
419                        write_props,
420                        &schema,
421                        std::slice::from_ref(name),
422                    )?,
423                    None => write_props,
424                };
425                writer
426                    .insert_edge(
427                        ei.src,
428                        ei.dst,
429                        ei.edge_type_id,
430                        ei.eid,
431                        write_props.clone(),
432                        edge_type_name,
433                        tx_l0,
434                    )
435                    .await?;
436                // Update the in-memory map-encoded edge binding
437                if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
438                    edge_map.retain(|k, _| k.starts_with('_'));
439                    let effective: HashMap<String, Value> = write_props
440                        .into_iter()
441                        .filter(|(_, v)| !v.is_null())
442                        .collect();
443                    for (k, v) in &effective {
444                        edge_map.insert(k.clone(), v.clone());
445                    }
446                    // Replace _all_props to reflect the complete property set
447                    edge_map.insert("_all_props".to_string(), Value::Map(effective));
448                }
449            }
450            _ => {
451                // No matching entity — nothing to do (caller already guarded against Null)
452            }
453        }
454        Ok(())
455    }
456
457    /// Computes the property map to write given current storage state and the
458    /// incoming change map.
459    ///
460    /// When `replace` is `true`, keys present in `current` but absent from
461    /// `incoming` are tombstoned with `Value::Null`.  Null values inside
462    /// `incoming` are always preserved as explicit tombstones.
463    ///
464    /// When `replace` is `false`, `current` is the base and `incoming` is
465    /// merged on top: each key in `incoming` overwrites or tombstones the
466    /// corresponding entry in `current`.
467    fn merge_props(
468        current: HashMap<String, Value>,
469        incoming: HashMap<String, Value>,
470        replace: bool,
471    ) -> HashMap<String, Value> {
472        if replace {
473            // Start from the non-null incoming entries only.
474            let mut result: HashMap<String, Value> = incoming
475                .iter()
476                .filter(|(_, v)| !v.is_null())
477                .map(|(k, v)| (k.clone(), v.clone()))
478                .collect();
479            // Tombstone every current key that is absent from incoming OR explicitly
480            // set to null in incoming (both mean "delete this property").
481            for k in current.keys() {
482                if incoming.get(k).is_none_or(|v| v.is_null()) {
483                    result.insert(k.clone(), Value::Null);
484                }
485            }
486            result
487        } else {
488            // Merge: start from current and apply incoming on top
489            let mut result = current;
490            result.extend(incoming);
491            result
492        }
493    }
494
495    /// Extract edge identity fields (`_eid`, `_src`, `_dst`, `_type`) from a map.
496    fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
497        let eid = Eid::from(
498            map.get("_eid")
499                .and_then(|v| v.as_u64())
500                .ok_or_else(|| anyhow!("Invalid _eid"))?,
501        );
502        let src = Vid::from(
503            map.get("_src")
504                .and_then(|v| v.as_u64())
505                .ok_or_else(|| anyhow!("Invalid _src"))?,
506        );
507        let dst = Vid::from(
508            map.get("_dst")
509                .and_then(|v| v.as_u64())
510                .ok_or_else(|| anyhow!("Invalid _dst"))?,
511        );
512        let edge_type_id = self.resolve_edge_type_id(
513            map.get("_type")
514                .or_else(|| map.get("_type_name"))
515                .ok_or_else(|| anyhow!("Missing _type/_type_name on edge map"))?,
516        )?;
517        Ok(EdgeIdentity {
518            eid,
519            src,
520            dst,
521            edge_type_id,
522        })
523    }
524
525    /// Resolve edge type ID from a Value, supporting both Int and String representations.
526    /// DataFusion traverse stores _type as String("KNOWS"), while write operations need u32 ID.
527    ///
528    /// For String values, uses get_or_assign_edge_type_id to support schemaless edge types
529    /// (assigns new ID if not found). This is critical for MERGE ... ON CREATE SET scenarios
530    /// where the edge type was just created and may not be in the read-only lookup yet.
531    fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
532        match type_val {
533            Value::Int(i) => Ok(*i as u32),
534            Value::String(name) => {
535                if self.config.strict_schema {
536                    let schema = self.storage.schema_manager().schema();
537                    schema
538                        .edge_type_id_by_name_case_insensitive(name)
539                        .ok_or_else(|| {
540                            anyhow!(
541                                "Edge type '{}' is not defined in the schema \
542                                 (strict_schema is enabled). \
543                                 Declare it with db.schema().edge_type(...).apply() first.",
544                                name
545                            )
546                        })
547                } else {
548                    // Schemaless: assign new ID if not found in schema or registry.
549                    Ok(self
550                        .storage
551                        .schema_manager()
552                        .get_or_assign_edge_type_id(name))
553                }
554            }
555            _ => Err(anyhow!(
556                "Invalid _type value: expected Int or String, got {:?}",
557                type_val
558            )),
559        }
560    }
561
562    pub(crate) async fn execute_vacuum(&self) -> Result<()> {
563        if let Some(writer_arc) = &self.writer {
564            // Flush first while holding the lock
565            {
566                let writer: &uni_store::Writer = writer_arc.as_ref();
567                writer.flush_to_l1(None).await?;
568            } // Drop lock before compacting to avoid blocking reads/writes
569
570            // Compaction can run without holding the writer lock
571            let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
572            let compaction_results = compactor.compact_all().await?;
573
574            // Re-warm adjacency manager for compacted edge types to sync in-memory CSR with new L2 storage
575            let am = self.storage.adjacency_manager();
576            let schema = self.storage.schema_manager().schema();
577            for info in compaction_results {
578                // Convert string direction to Direction enum
579                let direction = match info.direction.as_str() {
580                    "fwd" => uni_store::storage::direction::Direction::Outgoing,
581                    "bwd" => uni_store::storage::direction::Direction::Incoming,
582                    _ => continue,
583                };
584
585                // Get edge_type_id
586                if let Some(edge_type_id) =
587                    schema.edge_type_id_unified_case_insensitive(&info.edge_type)
588                {
589                    // Re-warm from storage (clears old CSR, loads new L2 + L1 delta)
590                    let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
591                }
592            }
593        }
594        Ok(())
595    }
596
597    pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
598        if let Some(writer_arc) = &self.writer {
599            let writer: &uni_store::Writer = writer_arc.as_ref();
600            writer.flush_to_l1(Some("checkpoint".to_string())).await?;
601        }
602        Ok(())
603    }
604
605    pub(crate) async fn execute_copy_to(
606        &self,
607        identifier: &str,
608        path: &str,
609        format: &str,
610        options: &HashMap<String, Value>,
611    ) -> Result<usize> {
612        // Check schema to determine if identifier is an edge type or vertex label
613        let schema = self.storage.schema_manager().schema();
614
615        // Try as edge type first
616        if schema.get_edge_type_case_insensitive(identifier).is_some() {
617            return self
618                .export_edge_type_in_format(identifier, path, format)
619                .await;
620        }
621
622        // Try as vertex label
623        if schema.get_label_case_insensitive(identifier).is_some() {
624            return self
625                .export_vertex_label_in_format(identifier, path, format, options)
626                .await;
627        }
628
629        // Neither edge type nor vertex label found
630        Err(anyhow!("Unknown label or edge type: '{}'", identifier))
631    }
632
633    async fn export_vertex_label_in_format(
634        &self,
635        label: &str,
636        path: &str,
637        format: &str,
638        _options: &HashMap<String, Value>,
639    ) -> Result<usize> {
640        match format {
641            "parquet" => self.export_vertex_label(label, path).await,
642            "csv" => {
643                let mut stream = self
644                    .storage
645                    .scan_vertex_table_stream(label)
646                    .await?
647                    .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
648
649                // Collect all batches
650                let mut all_rows = Vec::new();
651                let mut column_names = Vec::new();
652
653                // Iterate stream using StreamExt
654                use futures::StreamExt;
655                while let Some(batch_result) = stream.next().await {
656                    let batch = batch_result?;
657
658                    // Get column names from first batch
659                    if column_names.is_empty() {
660                        column_names = batch
661                            .schema()
662                            .fields()
663                            .iter()
664                            .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
665                            .map(|f| f.name().clone())
666                            .collect();
667                    }
668
669                    // Convert batch to rows
670                    for row_idx in 0..batch.num_rows() {
671                        let mut row = Vec::new();
672                        for field in batch.schema().fields() {
673                            if field.name().starts_with('_') || field.name() == "ext_id" {
674                                continue;
675                            }
676
677                            let col_idx = batch.schema().index_of(field.name())?;
678                            let column = batch.column(col_idx);
679                            let value = self.arrow_value_to_json(column, row_idx)?;
680
681                            // Convert value to CSV string
682                            let csv_value = match value {
683                                Value::Null => String::new(),
684                                Value::Bool(b) => b.to_string(),
685                                Value::Int(i) => i.to_string(),
686                                Value::Float(f) => f.to_string(),
687                                Value::String(s) => s,
688                                _ => format!("{value}"),
689                            };
690                            row.push(csv_value);
691                        }
692                        all_rows.push(row);
693                    }
694                }
695
696                // Write CSV
697                let file = std::fs::File::create(path)?;
698                let mut wtr = csv::Writer::from_writer(file);
699
700                // Write headers
701                log::debug!("CSV export headers: {:?}", column_names);
702                wtr.write_record(&column_names)?;
703
704                // Write rows
705                for (i, row) in all_rows.iter().enumerate() {
706                    log::debug!("CSV export row {}: {:?}", i, row);
707                    wtr.write_record(row)?;
708                }
709
710                wtr.flush()?;
711                Ok(all_rows.len())
712            }
713            _ => Err(anyhow!(
714                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
715                format
716            )),
717        }
718    }
719
720    async fn export_edge_type_in_format(
721        &self,
722        edge_type: &str,
723        path: &str,
724        format: &str,
725    ) -> Result<usize> {
726        match format {
727            "parquet" => self.export_edge_type(edge_type, path).await,
728            "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
729            _ => Err(anyhow!(
730                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
731                format
732            )),
733        }
734    }
735
736    /// Write a stream of record batches to a Parquet file.
737    /// Returns the total number of rows written, or 0 if the stream is empty.
738    async fn write_batches_to_parquet(
739        mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
740        path: &str,
741        entity_description: &str,
742    ) -> Result<usize> {
743        use futures::TryStreamExt;
744
745        // Get first batch to determine schema and create writer
746        let first_batch = match stream.try_next().await? {
747            Some(batch) => batch,
748            None => {
749                log::info!("No data to export from {}", entity_description);
750                return Ok(0);
751            }
752        };
753
754        // Create Parquet writer using schema from first batch
755        let file = std::fs::File::create(path)?;
756        let arrow_schema = first_batch.schema();
757        let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
758
759        // Write first batch
760        let mut count = first_batch.num_rows();
761        writer.write(&first_batch)?;
762
763        // Write remaining batches
764        while let Some(batch) = stream.try_next().await? {
765            count += batch.num_rows();
766            writer.write(&batch)?;
767        }
768
769        writer.close()?;
770
771        log::info!(
772            "Exported {} rows from {} to '{}'",
773            count,
774            entity_description,
775            path
776        );
777        Ok(count)
778    }
779
780    /// Export vertices of a specific label to Parquet
781    async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
782        let stream = self
783            .storage
784            .scan_vertex_table_stream(label)
785            .await?
786            .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
787
788        Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
789    }
790
791    /// Export edges of a specific type to Parquet
792    async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
793        let schema = self.storage.schema_manager().schema();
794        if !schema.edge_types.contains_key(edge_type) {
795            return Err(anyhow!("Edge type '{}' not found", edge_type));
796        }
797
798        let filter = format!("type = '{}'", edge_type);
799        let stream = self
800            .storage
801            .scan_main_edge_table_stream(Some(&filter))
802            .await?
803            .ok_or_else(|| anyhow!("No edge data found"))?;
804
805        Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
806    }
807
808    pub(crate) async fn execute_copy_from(
809        &self,
810        label: &str,
811        path: &str,
812        format: &str,
813        options: &HashMap<String, Value>,
814    ) -> Result<usize> {
815        // Read data from file
816        let batches = match format {
817            "parquet" => self.read_parquet_file(path)?,
818            "csv" => self.read_csv_file(path, label, options)?,
819            _ => {
820                return Err(anyhow!(
821                    "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
822                    format
823                ));
824            }
825        };
826
827        // Get writer
828        let writer_arc = self
829            .writer
830            .as_ref()
831            .ok_or_else(|| anyhow!("No writer available"))?;
832
833        let db_schema = self.storage.schema_manager().schema();
834
835        // Check if this is a label (vertex) or edge type
836        let is_edge = db_schema.edge_type_id_by_name(label).is_some();
837
838        if is_edge {
839            // Import edges
840            let edge_type_id = db_schema
841                .edge_type_id_by_name(label)
842                .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
843
844            // Get src and dst column names from options
845            let src_col = options
846                .get("src_col")
847                .and_then(|v| v.as_str())
848                .unwrap_or("src");
849            let dst_col = options
850                .get("dst_col")
851                .and_then(|v| v.as_str())
852                .unwrap_or("dst");
853
854            // §5.7 of concurrent_writer.md: writer is hoisted above the row
855            // loop now that there is no per-row lock acquisition cost.
856            let writer: &uni_store::Writer = writer_arc.as_ref();
857            let mut total_rows = 0;
858            for batch in batches {
859                let num_rows = batch.num_rows();
860                // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
861                let eids = writer.allocate_eids(num_rows).await?;
862
863                for (row_idx, &eid) in eids.iter().enumerate().take(num_rows) {
864                    let mut properties = HashMap::new();
865                    let mut src_vid: Option<Vid> = None;
866                    let mut dst_vid: Option<Vid> = None;
867
868                    // Extract properties and VIDs from each column
869                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
870                        let col_name = field.name();
871                        let column = batch.column(col_idx);
872                        let value = self.arrow_value_to_json(column, row_idx)?;
873
874                        if col_name == src_col {
875                            let raw = value.as_u64().unwrap_or_else(|| {
876                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
877                            });
878                            src_vid = Some(Vid::new(raw));
879                        } else if col_name == dst_col {
880                            let raw = value.as_u64().unwrap_or_else(|| {
881                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
882                            });
883                            dst_vid = Some(Vid::new(raw));
884                        } else if !col_name.starts_with('_') && !value.is_null() {
885                            properties.insert(col_name.clone(), value);
886                        }
887                    }
888
889                    let src = src_vid
890                        .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
891                    let dst = dst_vid
892                        .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
893
894                    writer
895                        .insert_edge(
896                            src,
897                            dst,
898                            edge_type_id,
899                            eid,
900                            properties,
901                            Some(label.to_string()),
902                            None,
903                        )
904                        .await?;
905
906                    total_rows += 1;
907                }
908            }
909
910            log::info!(
911                "Imported {} edge rows from '{}' into edge type '{}'",
912                total_rows,
913                path,
914                label
915            );
916
917            // Flush to persist edges
918            if total_rows > 0 {
919                writer.flush_to_l1(None).await?;
920            }
921
922            Ok(total_rows)
923        } else {
924            // Import vertices
925            // Validate the label exists in schema
926            db_schema
927                .label_id_by_name_case_insensitive(label)
928                .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
929
930            // §5.7 of concurrent_writer.md: writer is hoisted above the row
931            // loop now that there is no per-row lock acquisition cost.
932            let writer: &uni_store::Writer = writer_arc.as_ref();
933            let mut total_rows = 0;
934            for batch in batches {
935                let num_rows = batch.num_rows();
936                // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
937                let vids = writer.allocate_vids(num_rows).await?;
938
939                // Convert Arrow batch to rows
940                for (row_idx, &vid) in vids.iter().enumerate().take(num_rows) {
941                    let mut properties = HashMap::new();
942
943                    // Extract properties from each column
944                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
945                        let col_name = field.name();
946
947                        // Skip internal columns
948                        if col_name.starts_with('_') {
949                            continue;
950                        }
951
952                        let column = batch.column(col_idx);
953                        let value = self.arrow_value_to_json(column, row_idx)?;
954
955                        if !value.is_null() {
956                            properties.insert(col_name.clone(), value);
957                        }
958                    }
959
960                    let _ = writer
961                        .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
962                        .await?;
963
964                    total_rows += 1;
965                }
966            }
967
968            log::info!(
969                "Imported {} rows from '{}' into label '{}'",
970                total_rows,
971                path,
972                label
973            );
974
975            // Flush to persist vertices
976            if total_rows > 0 {
977                writer.flush_to_l1(None).await?;
978            }
979
980            Ok(total_rows)
981        }
982    }
983
984    fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
985        use arrow_array::Array;
986        use arrow_schema::DataType as ArrowDataType;
987
988        if column.is_null(row_idx) {
989            return Ok(Value::Null);
990        }
991
992        match column.data_type() {
993            ArrowDataType::Utf8 => {
994                let array = column
995                    .as_any()
996                    .downcast_ref::<arrow_array::StringArray>()
997                    .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
998                Ok(Value::String(array.value(row_idx).to_string()))
999            }
1000            ArrowDataType::Int32 => {
1001                let array = column
1002                    .as_any()
1003                    .downcast_ref::<arrow_array::Int32Array>()
1004                    .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
1005                Ok(Value::Int(array.value(row_idx) as i64))
1006            }
1007            ArrowDataType::Int64 => {
1008                let array = column
1009                    .as_any()
1010                    .downcast_ref::<arrow_array::Int64Array>()
1011                    .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
1012                Ok(Value::Int(array.value(row_idx)))
1013            }
1014            ArrowDataType::Float32 => {
1015                let array = column
1016                    .as_any()
1017                    .downcast_ref::<arrow_array::Float32Array>()
1018                    .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
1019                Ok(Value::Float(array.value(row_idx) as f64))
1020            }
1021            ArrowDataType::Float64 => {
1022                let array = column
1023                    .as_any()
1024                    .downcast_ref::<arrow_array::Float64Array>()
1025                    .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
1026                Ok(Value::Float(array.value(row_idx)))
1027            }
1028            ArrowDataType::Boolean => {
1029                let array = column
1030                    .as_any()
1031                    .downcast_ref::<arrow_array::BooleanArray>()
1032                    .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
1033                Ok(Value::Bool(array.value(row_idx)))
1034            }
1035            ArrowDataType::UInt64 => {
1036                let array = column
1037                    .as_any()
1038                    .downcast_ref::<arrow_array::UInt64Array>()
1039                    .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
1040                Ok(Value::Int(array.value(row_idx) as i64))
1041            }
1042            _ => {
1043                // For other types, try to convert to string
1044                let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
1045                if let Some(arr) = array {
1046                    Ok(Value::String(arr.value(row_idx).to_string()))
1047                } else {
1048                    Ok(Value::Null)
1049                }
1050            }
1051        }
1052    }
1053
1054    fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
1055        let file = std::fs::File::open(path)?;
1056        let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
1057            .build()?;
1058        reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1059    }
1060
1061    fn read_csv_file(
1062        &self,
1063        path: &str,
1064        label: &str,
1065        options: &HashMap<String, Value>,
1066    ) -> Result<Vec<arrow_array::RecordBatch>> {
1067        use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1068        use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1069        use std::sync::Arc;
1070
1071        // Parse CSV options
1072        let has_headers = options
1073            .get("headers")
1074            .and_then(|v| v.as_bool())
1075            .unwrap_or(true);
1076
1077        // Read CSV file
1078        let file = std::fs::File::open(path)?;
1079        let mut rdr = csv::ReaderBuilder::new()
1080            .has_headers(has_headers)
1081            .from_reader(file);
1082
1083        // Get schema for type conversion
1084        let db_schema = self.storage.schema_manager().schema();
1085        let properties = db_schema.properties.get(label);
1086
1087        // Collect all rows first to determine schema
1088        let mut rows: Vec<Vec<String>> = Vec::new();
1089        let headers: Vec<String> = if has_headers {
1090            rdr.headers()?.iter().map(|s| s.to_string()).collect()
1091        } else {
1092            Vec::new()
1093        };
1094
1095        for result in rdr.records() {
1096            let record = result?;
1097            rows.push(record.iter().map(|s| s.to_string()).collect());
1098        }
1099
1100        if rows.is_empty() {
1101            return Ok(Vec::new());
1102        }
1103
1104        // Build Arrow schema with proper types based on DB schema
1105        let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
1106        let col_names: Vec<String> = if has_headers {
1107            headers
1108        } else {
1109            (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
1110        };
1111
1112        for name in &col_names {
1113            let arrow_type = if let Some(props) = properties {
1114                if let Some(prop_meta) = props.get(name) {
1115                    match prop_meta.r#type {
1116                        DataType::Int32 => ArrowDataType::Int32,
1117                        DataType::Int64 => ArrowDataType::Int64,
1118                        DataType::Float32 => ArrowDataType::Float32,
1119                        DataType::Float64 => ArrowDataType::Float64,
1120                        DataType::Bool => ArrowDataType::Boolean,
1121                        _ => ArrowDataType::Utf8,
1122                    }
1123                } else {
1124                    ArrowDataType::Utf8
1125                }
1126            } else {
1127                ArrowDataType::Utf8
1128            };
1129            arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
1130        }
1131
1132        let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
1133
1134        // Convert rows to Arrow arrays with proper types
1135        let mut columns: Vec<ArrayRef> = Vec::new();
1136        for (col_idx, field) in arrow_fields.iter().enumerate() {
1137            match field.data_type() {
1138                ArrowDataType::Int32 => {
1139                    let values: Vec<Option<i32>> = rows
1140                        .iter()
1141                        .map(|row| {
1142                            if col_idx < row.len() {
1143                                row[col_idx].parse().ok()
1144                            } else {
1145                                None
1146                            }
1147                        })
1148                        .collect();
1149                    columns.push(Arc::new(Int32Array::from(values)));
1150                }
1151                _ => {
1152                    // Default to string
1153                    let values: Vec<Option<String>> = rows
1154                        .iter()
1155                        .map(|row| {
1156                            if col_idx < row.len() {
1157                                Some(row[col_idx].clone())
1158                            } else {
1159                                None
1160                            }
1161                        })
1162                        .collect();
1163                    columns.push(Arc::new(StringArray::from(values)));
1164                }
1165            }
1166        }
1167
1168        let batch = RecordBatch::try_new(arrow_schema, columns)?;
1169        Ok(vec![batch])
1170    }
1171
1172    fn parse_data_type(type_str: &str) -> Result<DataType> {
1173        use uni_common::core::schema::{CrdtType, PointType};
1174        let type_str = type_str.to_lowercase();
1175        let type_str = type_str.trim();
1176        match type_str {
1177            "string" | "text" | "varchar" => Ok(DataType::String),
1178            "int" | "integer" | "int32" => Ok(DataType::Int32),
1179            "long" | "int64" | "bigint" => Ok(DataType::Int64),
1180            "float" | "float32" | "real" => Ok(DataType::Float32),
1181            "double" | "float64" => Ok(DataType::Float64),
1182            "bool" | "boolean" => Ok(DataType::Bool),
1183            "timestamp" => Ok(DataType::Timestamp),
1184            "date" => Ok(DataType::Date),
1185            "time" => Ok(DataType::Time),
1186            "datetime" => Ok(DataType::DateTime),
1187            "duration" => Ok(DataType::Duration),
1188            "btic" => Ok(DataType::Btic),
1189            "json" | "jsonb" => Ok(DataType::CypherValue),
1190            "bytes" | "blob" | "binary" => Ok(DataType::Bytes),
1191            "point" => Ok(DataType::Point(PointType::Cartesian2D)),
1192            "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
1193            "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1194            s if s.starts_with("vector(") && s.ends_with(')') => {
1195                let dims_str = &s[7..s.len() - 1];
1196                let dimensions = dims_str
1197                    .parse::<usize>()
1198                    .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1199                Ok(DataType::Vector { dimensions })
1200            }
1201            s if s.starts_with("list<") && s.ends_with('>') => {
1202                let inner_type_str = &s[5..s.len() - 1];
1203                let inner_type = Self::parse_data_type(inner_type_str)?;
1204                Ok(DataType::List(Box::new(inner_type)))
1205            }
1206            "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1207            "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1208            _ => Err(anyhow!("Unknown data type: {}", type_str)),
1209        }
1210    }
1211
1212    pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1213        let sm = self.storage.schema_manager_arc();
1214        if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1215            return Ok(());
1216        }
1217        sm.add_label_with_desc(&clause.name, clause.description)?;
1218        for prop in clause.properties {
1219            let dt = Self::parse_data_type(&prop.data_type)?;
1220            sm.add_property_with_desc(
1221                &clause.name,
1222                &prop.name,
1223                dt,
1224                prop.nullable,
1225                prop.description,
1226            )?;
1227            if prop.unique {
1228                let constraint = Constraint {
1229                    name: format!("{}_{}_unique", clause.name, prop.name),
1230                    constraint_type: ConstraintType::Unique {
1231                        properties: vec![prop.name],
1232                    },
1233                    target: ConstraintTarget::Label(clause.name.clone()),
1234                    enabled: true,
1235                };
1236                sm.add_constraint(constraint)?;
1237            }
1238        }
1239        sm.save().await?;
1240        Ok(())
1241    }
1242
1243    /// True if `key` is a generated property on any of the given labels.
1244    /// Used by the partial-write flush path (Round 12 §C) to decide
1245    /// whether the property should be added to `touched_keys` so that
1246    /// Lance MergeInsert sends the recomputed value.
1247    fn is_generated_key(&self, labels: &[String], key: &str) -> bool {
1248        let schema = self.storage.schema_manager().schema();
1249        for label in labels {
1250            if let Some(props_meta) = schema.properties.get(label)
1251                && let Some(meta) = props_meta.get(key)
1252                && meta.generation_expression.is_some()
1253            {
1254                return true;
1255            }
1256        }
1257        false
1258    }
1259
1260    pub(crate) async fn enrich_properties_with_generated_columns(
1261        &self,
1262        label_name: &str,
1263        properties: &mut HashMap<String, Value>,
1264        prop_manager: &PropertyManager,
1265        params: &HashMap<String, Value>,
1266        ctx: Option<&QueryContext>,
1267    ) -> Result<()> {
1268        let schema = self.storage.schema_manager().schema();
1269
1270        if let Some(props_meta) = schema.properties.get(label_name) {
1271            let mut generators = Vec::new();
1272            for (prop_name, meta) in props_meta {
1273                if let Some(expr_str) = &meta.generation_expression {
1274                    generators.push((prop_name.clone(), expr_str.clone()));
1275                }
1276            }
1277
1278            for (prop_name, expr_str) in generators {
1279                let cache_key = (label_name.to_string(), prop_name.clone());
1280                let expr = {
1281                    let cache = self.gen_expr_cache.read().await;
1282                    cache.get(&cache_key).cloned()
1283                };
1284
1285                let expr = match expr {
1286                    Some(e) => e,
1287                    None => {
1288                        let parsed = uni_cypher::parse_expression(&expr_str)
1289                            .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1290                        let mut cache = self.gen_expr_cache.write().await;
1291                        cache.insert(cache_key, parsed.clone());
1292                        parsed
1293                    }
1294                };
1295
1296                let mut scope = HashMap::new();
1297
1298                // If expression has an explicit variable, use it as an object
1299                if let Some(var) = expr.extract_variable() {
1300                    scope.insert(var, Value::Map(properties.clone()));
1301                } else {
1302                    // No explicit variable - add properties directly to scope for bare references
1303                    // e.g., "lower(email)" can reference "email" directly
1304                    for (k, v) in properties.iter() {
1305                        scope.insert(k.clone(), v.clone());
1306                    }
1307                }
1308
1309                let val = self
1310                    .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1311                    .await?;
1312                properties.insert(prop_name, val);
1313            }
1314        }
1315        Ok(())
1316    }
1317
1318    pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1319        let sm = self.storage.schema_manager_arc();
1320        if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1321            return Ok(());
1322        }
1323        sm.add_edge_type_with_desc(
1324            &clause.name,
1325            clause.src_labels,
1326            clause.dst_labels,
1327            clause.description,
1328        )?;
1329        for prop in clause.properties {
1330            let dt = Self::parse_data_type(&prop.data_type)?;
1331            sm.add_property_with_desc(
1332                &clause.name,
1333                &prop.name,
1334                dt,
1335                prop.nullable,
1336                prop.description,
1337            )?;
1338        }
1339        sm.save().await?;
1340        Ok(())
1341    }
1342
1343    /// Executes an ALTER action on a schema entity.
1344    ///
1345    /// This is a shared helper for both `execute_alter_label` and
1346    /// `execute_alter_edge_type` since they have identical logic.
1347    pub(crate) async fn execute_alter_entity(
1348        sm: &Arc<SchemaManager>,
1349        entity_name: &str,
1350        action: AlterAction,
1351    ) -> Result<()> {
1352        match action {
1353            AlterAction::AddProperty(prop) => {
1354                let dt = Self::parse_data_type(&prop.data_type)?;
1355                sm.add_property_with_desc(
1356                    entity_name,
1357                    &prop.name,
1358                    dt,
1359                    prop.nullable,
1360                    prop.description,
1361                )?;
1362            }
1363            AlterAction::DropProperty(prop_name) => {
1364                sm.drop_property(entity_name, &prop_name)?;
1365            }
1366            AlterAction::RenameProperty { old_name, new_name } => {
1367                sm.rename_property(entity_name, &old_name, &new_name)?;
1368            }
1369            AlterAction::SetDescription(desc) => {
1370                if sm.schema().labels.contains_key(entity_name) {
1371                    sm.set_label_description(entity_name, desc)?;
1372                } else {
1373                    sm.set_edge_type_description(entity_name, desc)?;
1374                }
1375            }
1376            AlterAction::SetPropertyDescription {
1377                property,
1378                description,
1379            } => {
1380                sm.set_property_description(entity_name, &property, description)?;
1381            }
1382        }
1383        sm.save().await?;
1384        Ok(())
1385    }
1386
1387    pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1388        Self::execute_alter_entity(
1389            &self.storage.schema_manager_arc(),
1390            &clause.name,
1391            clause.action,
1392        )
1393        .await
1394    }
1395
1396    pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1397        Self::execute_alter_entity(
1398            &self.storage.schema_manager_arc(),
1399            &clause.name,
1400            clause.action,
1401        )
1402        .await
1403    }
1404
1405    pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1406        let sm = self.storage.schema_manager_arc();
1407        sm.drop_label(&clause.name, clause.if_exists)?;
1408        sm.save().await?;
1409        Ok(())
1410    }
1411
1412    pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1413        let sm = self.storage.schema_manager_arc();
1414        sm.drop_edge_type(&clause.name, clause.if_exists)?;
1415        sm.save().await?;
1416        Ok(())
1417    }
1418
1419    pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1420        let sm = self.storage.schema_manager_arc();
1421        let target = ConstraintTarget::Label(clause.label);
1422        let c_type = match clause.constraint_type {
1423            AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1424                properties: clause.properties,
1425            },
1426            AstConstraintType::Exists => {
1427                let property = clause
1428                    .properties
1429                    .into_iter()
1430                    .next()
1431                    .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1432                ConstraintType::Exists { property }
1433            }
1434            AstConstraintType::Check => {
1435                let expression = clause
1436                    .expression
1437                    .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1438                ConstraintType::Check {
1439                    expression: expression.to_string_repr(),
1440                }
1441            }
1442        };
1443
1444        let constraint = Constraint {
1445            name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1446            constraint_type: c_type,
1447            target,
1448            enabled: true,
1449        };
1450
1451        sm.add_constraint(constraint)?;
1452        sm.save().await?;
1453        Ok(())
1454    }
1455
1456    pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1457        let sm = self.storage.schema_manager_arc();
1458        sm.drop_constraint(&clause.name, false)?;
1459        sm.save().await?;
1460        Ok(())
1461    }
1462
1463    /// Detects the single-node, single-label MERGE shape the fast path serves.
1464    ///
1465    /// Returns the node pattern and its label when `pattern` is one path with
1466    /// one node element, exactly one label, and a static map-literal property
1467    /// set — the shape [`Self::execute_merge_row_indexed`] can serve without
1468    /// per-row query planning. The keys do NOT need to be indexed: the persisted
1469    /// lookup degrades to a (single, filtered) label scan when no scalar index
1470    /// exists, which is still far cheaper than building a `LogicalPlan` per row.
1471    /// Any other shape (edges, multiple labels, non-literal properties) returns
1472    /// `None` so the caller uses the general per-row path.
1473    fn merge_single_node_fastpath<'p>(
1474        &self,
1475        pattern: &'p Pattern,
1476    ) -> Option<(&'p NodePattern, String)> {
1477        if pattern.paths.len() != 1 {
1478            return None;
1479        }
1480        let path = &pattern.paths[0];
1481        if path.elements.len() != 1 {
1482            return None;
1483        }
1484        let PatternElement::Node(n) = &path.elements[0] else {
1485            return None;
1486        };
1487        let labels = n.labels.names();
1488        if labels.len() != 1 {
1489            return None;
1490        }
1491        // The key must be a static map literal so the key names are known.
1492        let Some(Expr::Map(entries)) = n.properties.as_ref() else {
1493            return None;
1494        };
1495        if entries.is_empty() {
1496            return None;
1497        }
1498        // Resolve the label to its schema-canonical case so the fast path agrees
1499        // with the general MERGE path (which matches labels case-insensitively).
1500        // Without this, `MERGE (:person …)` after a `:Person` row was flushed
1501        // scans/keys a different label than the canonical one and creates a
1502        // duplicate (review #3a). Falls back to the as-written label when the
1503        // schema does not know it (schemaless).
1504        let canonical = self
1505            .storage
1506            .schema_manager()
1507            .schema()
1508            .canonical_label_name(&labels[0])
1509            .unwrap_or_else(|| labels[0].clone());
1510        Some((n, canonical))
1511    }
1512
1513    /// Build the persisted-scan filter for a MERGE key, or `None` if any value
1514    /// is not a scalar this fast path can represent.
1515    ///
1516    /// Returning `None` makes the caller fall back to the general per-row path,
1517    /// so unusual key value types (lists, maps, temporals, nulls) are never
1518    /// silently mis-matched. The `_deleted = false` clause mirrors the
1519    /// persisted-read predicate used elsewhere; the version high-water-mark
1520    /// clause is added by [`uni_store::StorageManager::scan_vertex_table`].
1521    fn merge_key_filter(key_props: &HashMap<String, Value>) -> Option<String> {
1522        if key_props.is_empty() {
1523            return None;
1524        }
1525        let mut parts = Vec::with_capacity(key_props.len() + 1);
1526        for (k, v) in key_props {
1527            if !Self::is_safe_key_ident(k) {
1528                return None;
1529            }
1530            let lit = Self::render_key_literal(v)?;
1531            // Unquoted identifier: the Lance filter parser does not resolve a
1532            // double-quoted column name against the table here, so `"k" = v`
1533            // silently matches nothing. Keys are validated above to be safe
1534            // bare identifiers.
1535            parts.push(format!("{k} = {lit}"));
1536        }
1537        parts.push("_deleted = false".to_string());
1538        Some(parts.join(" AND "))
1539    }
1540
1541    /// True when a MERGE key name is a safe bare identifier for a Lance
1542    /// filter (issue #8). Keys come from a static map literal, but validate
1543    /// anyway.
1544    fn is_safe_key_ident(k: &str) -> bool {
1545        !k.is_empty() && k.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
1546    }
1547
1548    /// Render a scalar MERGE-key value as a Lance filter literal, or `None`
1549    /// for value types this fast path cannot represent (lists, maps,
1550    /// temporals, nulls) — the caller then falls back to the general path.
1551    fn render_key_literal(v: &Value) -> Option<String> {
1552        Some(match v {
1553            Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1554            Value::Int(i) => i.to_string(),
1555            Value::Float(f) => f.to_string(),
1556            Value::Bool(b) => b.to_string(),
1557            _ => return None,
1558        })
1559    }
1560
1561    /// Build ONE scan filter matching every key tuple in `keys` (all tuples
1562    /// sorted by `key_names` order, values canonicalized).
1563    ///
1564    /// Single-column keys render as type-grouped `k IN (…)` lists (a filter
1565    /// never compares mixed literal types against one column); composite keys
1566    /// render as an OR of per-tuple conjunctions. Both forms are wrapped with
1567    /// the same `_deleted = false` clause the per-row filter used.
1568    fn merge_batch_filter(key_names: &[String], keys: &[&MergeKey]) -> Option<String> {
1569        if keys.is_empty() || key_names.iter().any(|k| !Self::is_safe_key_ident(k)) {
1570            return None;
1571        }
1572        let disjunction = if let [key] = key_names {
1573            // Group literals by value variant so each IN list is homogeneous.
1574            let mut groups: HashMap<std::mem::Discriminant<Value>, Vec<String>> = HashMap::new();
1575            for tuple in keys {
1576                let (_, v) = tuple.first()?;
1577                groups
1578                    .entry(std::mem::discriminant(v))
1579                    .or_default()
1580                    .push(Self::render_key_literal(v)?);
1581            }
1582            groups
1583                .into_values()
1584                .map(|lits| {
1585                    if let [lit] = lits.as_slice() {
1586                        format!("{key} = {lit}")
1587                    } else {
1588                        format!("{key} IN ({})", lits.join(", "))
1589                    }
1590                })
1591                .collect::<Vec<_>>()
1592                .join(" OR ")
1593        } else {
1594            keys.iter()
1595                .map(|tuple| {
1596                    let conj = tuple
1597                        .iter()
1598                        .map(|(k, v)| Some(format!("{k} = {}", Self::render_key_literal(v)?)))
1599                        .collect::<Option<Vec<_>>>()?
1600                        .join(" AND ");
1601                    Some(format!("({conj})"))
1602                })
1603                .collect::<Option<Vec<_>>>()?
1604                .join(" OR ")
1605        };
1606        Some(format!("({disjunction}) AND _deleted = false"))
1607    }
1608
1609    /// Canonicalize a numeric MERGE-key value for *matching only*.
1610    ///
1611    /// A finite `Float` with an integral value (e.g. `1.0`) is mapped to the
1612    /// equivalent `Int`, so an `Int(1)` key matches a node stored with
1613    /// `Float(1.0)` and vice versa — the coercion the general (DataFusion) MERGE
1614    /// path already applies (review #3a). Non-numeric and non-integral values are
1615    /// returned unchanged. Used only to build match keys / comparisons, never the
1616    /// value written to a created node.
1617    fn canonical_key_value(v: &Value) -> Value {
1618        match v {
1619            Value::Float(f)
1620                if f.is_finite()
1621                    && f.fract() == 0.0
1622                    && *f >= i64::MIN as f64
1623                    && *f <= i64::MAX as f64 =>
1624            {
1625                Value::Int(*f as i64)
1626            }
1627            other => other.clone(),
1628        }
1629    }
1630
1631    /// Canonical sorted `(name, value)` key tuple for a MERGE row's key map.
1632    ///
1633    /// Numeric values are canonicalized ([`Self::canonical_key_value`]) so the
1634    /// tuple compares equal regardless of `Int`/`Float` spelling. This tuple is
1635    /// used purely as a match key (intra-batch dedup, L0 overlay lookup); the
1636    /// created node's properties come from the original, un-canonicalized map.
1637    fn merge_key_tuple(key_props: &HashMap<String, Value>) -> MergeKey {
1638        let mut tuple: MergeKey = key_props
1639            .iter()
1640            .map(|(k, v)| (k.clone(), Self::canonical_key_value(v)))
1641            .collect();
1642        tuple.sort_by(|a, b| a.0.cmp(&b.0));
1643        tuple
1644    }
1645
1646    /// Snapshot all live L0 vertices of `label`, grouped by their MERGE key.
1647    ///
1648    /// Walked once per MERGE statement (issue #69): the per-row fast path then
1649    /// resolves L0/uncommitted matches with an O(1) map lookup instead of
1650    /// re-enumerating L0 for every row. Captures committed-not-yet-persisted
1651    /// rows and rows created earlier in the same transaction; rows created by
1652    /// later rows of this same statement are folded in incrementally by
1653    /// [`Self::execute_merge_row_indexed`]. `key_names` must be sorted to match
1654    /// [`Self::merge_key_tuple`].
1655    fn merge_l0_existing(
1656        &self,
1657        label: &str,
1658        key_names: &[String],
1659        ctx: Option<&QueryContext>,
1660    ) -> HashMap<MergeKey, Vec<Vid>> {
1661        let mut candidates: Vec<Vid> = Vec::new();
1662        l0_visibility::visit_l0_buffers(ctx, |l0| {
1663            if let Some(vids) = l0.label_to_vids.get(label) {
1664                candidates.extend(vids.iter().copied());
1665            }
1666            false
1667        });
1668
1669        let mut map: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1670        let mut seen: HashSet<Vid> = HashSet::new();
1671        for vid in candidates {
1672            if !seen.insert(vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
1673                continue;
1674            }
1675            // `lookup_vertex_prop` merges across L0 layers (newest wins).
1676            let tuple: MergeKey = key_names
1677                .iter()
1678                .map(|k| {
1679                    let v = l0_visibility::lookup_vertex_prop(vid, k, ctx).unwrap_or(Value::Null);
1680                    (k.clone(), Self::canonical_key_value(&v))
1681                })
1682                .collect();
1683            map.entry(tuple).or_default().push(vid);
1684        }
1685        map
1686    }
1687
    /// Maximum key tuples per batched MERGE scan — bounds the filter-string
    /// size and Lance/DataFusion parse cost; chunks run sequentially.
    ///
    /// [`Self::merge_lookup_persisted_batch`] applies the same limit to the
    /// candidate vids of each `_vid IN (…)` verification scan.
    const MERGE_SCAN_CHUNK: usize = 1000;
1691
1692    /// Persisted (flushed) vertices of `label` for EVERY key tuple in `keys`,
1693    /// resolved with one scan per [`Self::MERGE_SCAN_CHUNK`] tuples instead of
1694    /// one scan per input row (review perf #4: `UNWIND … MERGE` issued N
1695    /// independent Lance scans).
1696    ///
1697    /// Scans via [`uni_store::StorageManager::scan_vertex_table`] — the same
1698    /// read path `MATCH` uses, so it honors the version high-water-mark and
1699    /// sees flushed rows. On the declared-label branch the key-filtered scan
1700    /// only NOMINATES candidate vids; a second, unfiltered `_vid IN (…)` pass
1701    /// picks each candidate's max-`_version` row and requires it to be live
1702    /// and still keyed as requested (per-label tables are MVCC-append, so a
1703    /// superseded version's row would otherwise stale-match a rewritten key).
1704    /// Matched rows are grouped by their CANONICAL key tuple (stored values
1705    /// run through [`Self::canonical_key_value`], so a stored `Float(1.0)`
1706    /// lands under a requested `Int(1)` — the coercion Lance's numeric filter
1707    /// equality applies). Liveness against L0 overlays (deletes, key rewrites
1708    /// by earlier rows of the same statement) is NOT checked here — the
1709    /// per-row consumer re-checks at row time, exactly as the old per-row
1710    /// scan did.
1711    ///
1712    /// The second returned map carries the FULL property maps the schemaless
1713    /// branch already decoded for each matched vid (empty on the declared-label
1714    /// branch, which projects only key columns) — the caller seeds the
1715    /// statement-level [`Prefetch`] from it at zero extra scans.
1716    ///
1717    /// # Errors
1718    /// Propagates persisted-scan and filter-build failures — fail-closed: a
1719    /// MERGE must never treat a failed lookup as "no match" and create
1720    /// duplicates.
1721    async fn merge_lookup_persisted_batch(
1722        &self,
1723        label: &str,
1724        key_names: &[String],
1725        keys: &HashSet<MergeKey>,
1726    ) -> Result<(
1727        HashMap<MergeKey, Vec<Vid>>,
1728        HashMap<Vid, uni_common::Properties>,
1729    )> {
1730        let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1731        if keys.is_empty() {
1732            return Ok((out, HashMap::new()));
1733        }
1734        // An undeclared (schemaless) label has no per-label table — its flushed
1735        // rows live only in the unified main vertex table. Route to the
1736        // main-table lookup, mirroring the planner's scan routing (a schemaless
1737        // MATCH plans `ScanMainByLabels` on the same schema predicate).
1738        if self
1739            .storage
1740            .schema_manager()
1741            .schema()
1742            .get_label_case_insensitive(label)
1743            .is_none()
1744        {
1745            return self
1746                .merge_lookup_persisted_batch_schemaless(label, key_names, keys)
1747                .await;
1748        }
1749        // Declared label — the per-label table is MVCC-append (an update
1750        // flush adds a higher-`_version` row for the same vid) and the key
1751        // predicate is pushed into the Lance filter, so a SUPERSEDED version
1752        // whose row still carries a requested key is returned while the vid's
1753        // current row (key rewritten, fails the filter) is invisible to the
1754        // scan. Version dedup among the returned rows cannot detect that, so
1755        // the lookup runs in two passes: the key-filtered scan only nominates
1756        // candidate vids, and an unfiltered `_vid IN (…)` scan then requires
1757        // each candidate's max-`_version` row to be live and still keyed as
1758        // requested.
1759        let mut columns: Vec<&str> = vec!["_vid"];
1760        columns.extend(key_names.iter().map(String::as_str));
1761
1762        let key_list: Vec<&MergeKey> = keys.iter().collect();
1763        let mut candidates: Vec<Vid> = Vec::new();
1764        let mut seen: HashSet<Vid> = HashSet::new();
1765        for chunk in key_list.chunks(Self::MERGE_SCAN_CHUNK) {
1766            let filter = Self::merge_batch_filter(key_names, chunk)
1767                .ok_or_else(|| anyhow!("MERGE fast path could not build a batched key filter"))?;
1768            let scanned = self
1769                .storage
1770                .scan_vertex_table(label, &columns, Some(&filter))
1771                .await?;
1772            let Some(batch) = scanned else { continue };
1773            let Some(vid_col) = batch
1774                .column_by_name("_vid")
1775                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
1776            else {
1777                continue;
1778            };
1779            for i in 0..vid_col.len() {
1780                let vid = Vid::from(vid_col.value(i));
1781                if seen.insert(vid) {
1782                    candidates.push(vid);
1783                }
1784            }
1785        }
1786
1787        // Verification pass — tombstones are NOT filtered Lance-side (the
1788        // max-version pick must see them so a deleted winner cannot let an
1789        // older live version resurrect the match), exactly like the
1790        // schemaless branch below.
1791        let mut verify_columns: Vec<&str> = vec!["_vid", "_deleted", "_version"];
1792        verify_columns.extend(key_names.iter().map(String::as_str));
1793        for chunk in candidates.chunks(Self::MERGE_SCAN_CHUNK) {
1794            let vid_list = chunk
1795                .iter()
1796                .map(|v| v.as_u64().to_string())
1797                .collect::<Vec<_>>()
1798                .join(", ");
1799            let filter = format!("_vid IN ({vid_list})");
1800            let scanned = self
1801                .storage
1802                .scan_vertex_table(label, &verify_columns, Some(&filter))
1803                .await?;
1804            let Some(batch) = scanned else { continue };
1805            let (Some(vid_col), Some(del_col), Some(ver_col)) = (
1806                batch
1807                    .column_by_name("_vid")
1808                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1809                batch
1810                    .column_by_name("_deleted")
1811                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
1812                batch
1813                    .column_by_name("_version")
1814                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1815            ) else {
1816                return Err(anyhow!(
1817                    "MERGE batched lookup: verification scan missing a required column"
1818                ));
1819            };
1820            let key_cols: Vec<_> = key_names
1821                .iter()
1822                .map(|k| batch.column_by_name(k))
1823                .collect::<Option<Vec<_>>>()
1824                .ok_or_else(|| {
1825                    anyhow!("MERGE batched lookup: projected key column missing from scan result")
1826                })?;
1827            // Per-vid MVCC dedup: keep the highest-version row for each vid.
1828            let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
1829            for i in 0..batch.num_rows() {
1830                let vid = Vid::from(vid_col.value(i));
1831                let ver = ver_col.value(i);
1832                let entry = winners.entry(vid).or_insert((ver, i));
1833                if ver > entry.0 {
1834                    *entry = (ver, i);
1835                }
1836            }
1837            for (vid, (_ver, row)) in winners {
1838                if del_col.value(row) {
1839                    continue;
1840                }
1841                let tuple: MergeKey = key_names
1842                    .iter()
1843                    .zip(&key_cols)
1844                    .map(|(k, col)| {
1845                        let v = uni_store::storage::arrow_convert::arrow_to_value(
1846                            col.as_ref(),
1847                            row,
1848                            None,
1849                        );
1850                        (k.clone(), Self::canonical_key_value(&v))
1851                    })
1852                    .collect();
1853                if keys.contains(&tuple) {
1854                    out.entry(tuple).or_default().push(vid);
1855                }
1856            }
1857        }
1858        Ok((out, HashMap::new()))
1859    }
1860
1861    /// Persisted-match lookup for an UNDECLARED (schemaless) label.
1862    ///
1863    /// Schemaless rows live only in the unified main vertex table (per-label
1864    /// tables exist only for declared labels), with all properties encoded in
1865    /// the `props_json` CypherValue blob — so key values cannot be pushed into
1866    /// the Lance filter; the key match happens in memory after decoding,
1867    /// exactly like the schemaless MATCH scan. One main-table scan regardless
1868    /// of key count.
1869    ///
1870    /// Mirrors `columnar_scan_schemaless_vertex_batch_static`: tombstones are
1871    /// NOT filtered Lance-side (MVCC dedup must see them to pick the winning
1872    /// version per vid); the per-vid max-`_version` dedup runs here, then
1873    /// deleted winners are dropped.
1874    ///
1875    /// Also returns the full decoded property map per matched vid — the blob
1876    /// is decoded here anyway, and the caller seeds the statement-level
1877    /// [`Prefetch`] from it instead of re-reading per row.
1878    ///
1879    /// # Errors
1880    /// Propagates scan and blob-decode failures — fail-closed: a MERGE must
1881    /// never treat a failed lookup as "no match" and create duplicates.
1882    async fn merge_lookup_persisted_batch_schemaless(
1883        &self,
1884        label: &str,
1885        key_names: &[String],
1886        keys: &HashSet<MergeKey>,
1887    ) -> Result<(
1888        HashMap<MergeKey, Vec<Vid>>,
1889        HashMap<Vid, uni_common::Properties>,
1890    )> {
1891        let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1892        let mut props_by_vid: HashMap<Vid, uni_common::Properties> = HashMap::new();
1893        let filter = format!("array_contains(labels, '{}')", label.replace('\'', "''"));
1894        let Some(batch) = self
1895            .storage
1896            .scan_main_vertex_table(
1897                &["_vid", "_deleted", "props_json", "_version"],
1898                Some(&filter),
1899            )
1900            .await?
1901        else {
1902            return Ok((out, props_by_vid));
1903        };
1904        let (Some(vid_col), Some(del_col), Some(ver_col)) = (
1905            batch
1906                .column_by_name("_vid")
1907                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1908            batch
1909                .column_by_name("_deleted")
1910                .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
1911            batch
1912                .column_by_name("_version")
1913                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1914        ) else {
1915            return Err(anyhow!(
1916                "schemaless MERGE lookup: main vertex table scan missing a required column"
1917            ));
1918        };
1919        let props_col = batch
1920            .column_by_name("props_json")
1921            .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1922
1923        // Per-vid MVCC dedup: keep the highest-version row for each vid.
1924        let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
1925        for i in 0..batch.num_rows() {
1926            let vid = Vid::from(vid_col.value(i));
1927            let ver = ver_col.value(i);
1928            let entry = winners.entry(vid).or_insert((ver, i));
1929            if ver > entry.0 {
1930                *entry = (ver, i);
1931            }
1932        }
1933        for (vid, (_ver, row)) in winners {
1934            // Drop deletion tombstones AFTER picking the winner — a deleted
1935            // winner must not let an older live version resurrect the match.
1936            if del_col.value(row) {
1937                continue;
1938            }
1939            // A row without properties matches only an all-Null key tuple.
1940            let props = match props_col {
1941                Some(arr) if !arrow_array::Array::is_null(arr, row) => {
1942                    match uni_common::cypher_value_codec::decode(arr.value(row))
1943                        .map_err(|e| anyhow!("schemaless MERGE lookup: props decode: {e}"))?
1944                    {
1945                        Value::Map(m) => m,
1946                        _ => HashMap::new(),
1947                    }
1948                }
1949                _ => HashMap::new(),
1950            };
1951            let tuple: MergeKey = key_names
1952                .iter()
1953                .map(|k| {
1954                    (
1955                        k.clone(),
1956                        Self::canonical_key_value(props.get(k).unwrap_or(&Value::Null)),
1957                    )
1958                })
1959                .collect();
1960            if keys.contains(&tuple) {
1961                out.entry(tuple).or_default().push(vid);
1962                props_by_vid.insert(vid, props);
1963            }
1964        }
1965        Ok((out, props_by_vid))
1966    }
1967
1968    /// True if the statement-level MERGE property prefetch is safe for `label`.
1969    ///
1970    /// False when the label declares any CRDT-typed property: a prefetch HIT in
1971    /// [`read_vertex_props_with_prefetch`] skips the `normalize_crdt_properties`
1972    /// pass that `get_all_vertex_props_with_ctx` applies, so CRDT-bearing
1973    /// labels keep the per-row read path. Undeclared labels are trivially safe
1974    /// (normalization is a no-op without schema CRDT entries).
1975    fn merge_label_prefetch_safe(&self, label: &str) -> bool {
1976        let schema = self.storage.schema_manager().schema();
1977        schema.properties.get(label).is_none_or(|props| {
1978            !props
1979                .values()
1980                .any(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1981        })
1982    }
1983
1984    /// True if an L0 override rewrote any key column of a persisted match away
1985    /// from its requested value (so the persisted row no longer matches).
1986    fn vid_overrides_break_key(
1987        vid: Vid,
1988        key_props: &HashMap<String, Value>,
1989        ctx: Option<&QueryContext>,
1990    ) -> bool {
1991        key_props.iter().any(|(k, want)| {
1992            matches!(
1993                l0_visibility::lookup_vertex_prop(vid, k, ctx),
1994                Some(got) if Self::canonical_key_value(&got) != Self::canonical_key_value(want)
1995            )
1996        })
1997    }
1998
1999    /// Build a node Map value (`{_vid, _labels, ...props}`) for binding a MERGE
2000    /// node variable.
2001    ///
2002    /// Matches the binding shape produced by `execute_create_pattern` and the
2003    /// general MATCH path, so ON MATCH SET, RETURN, and downstream operators
2004    /// resolve the variable identically — a bare `Value::Int(vid)` is not a
2005    /// valid node binding for those consumers.
2006    fn build_node_map(vid: Vid, label: &str, props: uni_common::Properties) -> Value {
2007        let mut obj = HashMap::new();
2008        obj.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
2009        obj.insert(
2010            "_labels".to_string(),
2011            Value::List(vec![Value::String(label.to_string())]),
2012        );
2013        for (k, v) in props {
2014            obj.insert(k, v);
2015        }
2016        Value::Map(obj)
2017    }
2018
2019    /// True if an L0-only vertex has every key column set to the requested
2020    /// value. A missing column matches only a requested `Null`.
2021    fn l0_vid_matches_key(
2022        vid: Vid,
2023        key_props: &HashMap<String, Value>,
2024        ctx: Option<&QueryContext>,
2025    ) -> bool {
2026        key_props.iter().all(
2027            |(k, want)| match l0_visibility::lookup_vertex_prop(vid, k, ctx) {
2028                Some(got) => Self::canonical_key_value(&got) == Self::canonical_key_value(want),
2029                None => *want == Value::Null,
2030            },
2031        )
2032    }
2033
2034    /// Index fast-path execution for one MERGE row of the shape detected by
2035    /// [`Self::merge_single_node_fastpath`].
2036    ///
2037    /// Resolves matches from the per-batch L0 snapshot `existing` (O(1) lookup,
2038    /// no per-row L0 enumeration) plus the per-statement persisted prefetch
2039    /// (`persisted`, built once by [`Self::merge_lookup_persisted_batch`]);
2040    /// applies ON MATCH SET to every match, or creates the node and applies
2041    /// ON CREATE SET when there is none. A newly created vertex is folded into
2042    /// `existing` so a later row of the same batch with the same key matches it
2043    /// (intra-batch dedup). Returns the RETURN rows for this input row (one per
2044    /// match, or one for a create).
2045    ///
2046    /// `prefetched` is the statement-level property prefetch (`None` when the
2047    /// label is CRDT-bearing, see [`Self::merge_label_prefetch_safe`]): matched
2048    /// vids carry their persisted base row, freshly created vids are seeded
2049    /// with an empty base — per-row reads then resolve as base + L0 layering
2050    /// (every SET flush writes the full row to L0 before the next read, so a
2051    /// prefetch hit equals a fresh read) instead of one storage scan each.
2052    ///
2053    /// # Errors
2054    /// Propagates evaluation, create, and SET failures.
2055    #[expect(
2056        clippy::too_many_arguments,
2057        reason = "mirrors execute_merge's threaded execution state"
2058    )]
2059    async fn execute_merge_row_indexed(
2060        &self,
2061        label: &str,
2062        node: &NodePattern,
2063        path_pattern: &Pattern,
2064        temp_vars: &[String],
2065        mut row: HashMap<String, Value>,
2066        key_props: &HashMap<String, Value>,
2067        persisted: &HashMap<MergeKey, Vec<Vid>>,
2068        key_tuple: &MergeKey,
2069        existing: &mut HashMap<MergeKey, Vec<Vid>>,
2070        on_match: Option<&SetClause>,
2071        on_create: Option<&SetClause>,
2072        prop_manager: &PropertyManager,
2073        params: &HashMap<String, Value>,
2074        ctx: Option<&QueryContext>,
2075        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2076        writer: &Writer,
2077        mut prefetched: Option<&mut Prefetch>,
2078    ) -> Result<Vec<HashMap<String, Value>>> {
2079        let empty_prefetch = Prefetch::default();
2080        let mut seen: HashSet<Vid> = HashSet::new();
2081        let mut matches: Vec<Vid> = Vec::new();
2082        // Persisted (flushed) matches from the per-statement prefetch. The
2083        // prefetch is static for the statement, so re-verify liveness at row
2084        // time — an earlier row of this batch may have deleted the candidate
2085        // or rewritten its key (the old per-row scan saw those through its L0
2086        // overlay checks; these are the same checks, moved to row time).
2087        if let Some(vids) = persisted.get(key_tuple) {
2088            for &vid in vids {
2089                if l0_visibility::is_vertex_deleted(vid, ctx) {
2090                    continue;
2091                }
2092                if Self::vid_overrides_break_key(vid, key_props, ctx) {
2093                    continue;
2094                }
2095                if seen.insert(vid) {
2096                    matches.push(vid);
2097                }
2098            }
2099        }
2100        // L0 / intra-batch matches from the per-batch snapshot, re-verified live
2101        // in case a prior row of this batch mutated or deleted the candidate.
2102        if let Some(vids) = existing.get(key_tuple) {
2103            for &vid in vids {
2104                if seen.contains(&vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
2105                    continue;
2106                }
2107                if Self::l0_vid_matches_key(vid, key_props, ctx) && seen.insert(vid) {
2108                    matches.push(vid);
2109                }
2110            }
2111        }
2112
2113        let mut out = Vec::new();
2114        if matches.is_empty() {
2115            // No match: create the node, then apply ON CREATE SET. Fold the
2116            // ON CREATE SET property assignments into seed props first so a
2117            // NOT-NULL property supplied only by ON CREATE SET passes
2118            // create-time validation (RC4); the post-create SET below settles
2119            // the final values.
2120            let seed_props = self
2121                .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2122                .await?;
2123            self.execute_create_pattern(
2124                path_pattern,
2125                &mut row,
2126                writer,
2127                prop_manager,
2128                params,
2129                ctx,
2130                tx_l0_override,
2131                Some(&seed_props),
2132            )
2133            .await?;
2134            // Fold the new vertex into the batch snapshot for intra-batch
2135            // dedup, and seed the statement prefetch with an empty base: a
2136            // fresh vid has nothing in storage, so ON CREATE SET's lazy read
2137            // resolves from the L0 row the create just wrote instead of
2138            // issuing a per-row storage scan that finds nothing.
2139            if let Some(var) = &node.variable
2140                && let Some(val) = row.get(var)
2141                && let Ok(vid) = Self::vid_from_value(val)
2142            {
2143                existing.entry(key_tuple.clone()).or_default().push(vid);
2144                if let Some(p) = prefetched.as_deref_mut() {
2145                    p.vertex.entry(vid).or_default();
2146                }
2147            }
2148            if let Some(set) = on_create {
2149                self.execute_set_items_locked(
2150                    &set.items,
2151                    &mut row,
2152                    writer,
2153                    prop_manager,
2154                    params,
2155                    ctx,
2156                    tx_l0_override,
2157                    prefetched.as_deref().unwrap_or(&empty_prefetch),
2158                )
2159                .await?;
2160            }
2161            Self::bind_path_variables(path_pattern, &mut row, temp_vars);
2162            out.push(row);
2163        } else {
2164            // Apply ON MATCH SET to every matched node (multi-match semantics),
2165            // binding the node variable as a Map with _vid/_labels/props so
2166            // RETURN and downstream operators resolve it as they would for the
2167            // general MATCH and CREATE paths.
2168            for vid in matches {
2169                let mut m = row.clone();
2170                if let Some(var) = &node.variable {
2171                    // Minimal binding so ON MATCH SET resolves the node by _vid.
2172                    m.insert(
2173                        var.clone(),
2174                        Self::build_node_map(vid, label, HashMap::new()),
2175                    );
2176                }
2177                if let Some(set) = on_match {
2178                    self.execute_set_items_locked(
2179                        &set.items,
2180                        &mut m,
2181                        writer,
2182                        prop_manager,
2183                        params,
2184                        ctx,
2185                        tx_l0_override,
2186                        prefetched.as_deref().unwrap_or(&empty_prefetch),
2187                    )
2188                    .await?;
2189                }
2190                if let Some(var) = &node.variable {
2191                    // Rebind with full, post-SET properties for RETURN
2192                    // fidelity. The SET above flushed the full row to L0, so a
2193                    // prefetch hit (base + L0 layering) reproduces exactly
2194                    // what a fresh storage read would return.
2195                    let props = read_vertex_props_with_prefetch(
2196                        vid,
2197                        prefetched.as_deref().unwrap_or(&empty_prefetch),
2198                        prop_manager,
2199                        ctx,
2200                    )
2201                    .await?;
2202                    m.insert(var.clone(), Self::build_node_map(vid, label, props));
2203                }
2204                Self::bind_path_variables(path_pattern, &mut m, temp_vars);
2205                out.push(m);
2206            }
2207        }
2208        Ok(out)
2209    }
2210
2211    #[expect(clippy::too_many_arguments)]
2212    pub(crate) async fn execute_merge(
2213        &self,
2214        rows: Vec<HashMap<String, Value>>,
2215        pattern: &Pattern,
2216        on_match: Option<&SetClause>,
2217        on_create: Option<&SetClause>,
2218        prop_manager: &PropertyManager,
2219        params: &HashMap<String, Value>,
2220        ctx: Option<&QueryContext>,
2221        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2222    ) -> Result<Vec<HashMap<String, Value>>> {
2223        let writer_lock = self
2224            .writer
2225            .as_ref()
2226            .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;
2227
2228        // Prepare pattern for path variable binding: assign temp edge variable
2229        // names to unnamed relationships in paths that have path variables.
2230        let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);
2231
2232        // Issue #69: a single-node, single-label MERGE takes the fast path,
2233        // skipping the per-row query planning that made batched MERGE no faster
2234        // than a per-entity loop. Indexed keys get an index point-lookup;
2235        // un-indexed keys still skip planning (the lookup is a filtered scan).
2236        // The shape is the same for every row, so it is detected once.
2237        let fastpath = self.merge_single_node_fastpath(pattern);
2238
2239        // Build the per-batch L0 snapshot once (issue #69 Phase C): the per-row
2240        // fast path then resolves L0/intra-batch matches with an O(1) lookup
2241        // instead of re-walking L0 for every row. `key_names` is the sorted
2242        // static key set, matching `merge_key_tuple`.
2243        let mut fast_existing: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2244        // Per-row pre-evaluated fast-path keys (None = that row falls back to
2245        // the general path), and the per-statement persisted prefetch over the
2246        // deduped key tuples — ONE chunked scan instead of one scan per row.
2247        // Key expressions only see the row's own bindings + params, so
2248        // evaluating them ahead of any creates cannot observe earlier rows.
2249        let mut row_fast: Vec<Option<(HashMap<String, Value>, MergeKey)>> = Vec::new();
2250        let mut fast_persisted: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2251        // Statement-level property prefetch for the fast path (review perf
2252        // residual): every persisted match's full row is batch-read ONCE, so
2253        // the per-row ON MATCH SET read and the post-SET rebind resolve as
2254        // prefetch-base + L0 layering instead of one storage scan each.
2255        // `None` disables it for CRDT-bearing labels (the prefetch-hit read
2256        // skips CRDT normalization).
2257        let mut merge_prefetch: Option<Prefetch> = None;
2258        if let Some((node, label)) = &fastpath {
2259            let mut key_names: Vec<String> = match &node.properties {
2260                Some(Expr::Map(entries)) => entries.iter().map(|(k, _)| k.clone()).collect(),
2261                _ => Vec::new(),
2262            };
2263            key_names.sort();
2264            fast_existing = self.merge_l0_existing(label, &key_names, ctx);
2265
2266            row_fast.reserve(rows.len());
2267            for row in &rows {
2268                let mut key_props: HashMap<String, Value> = HashMap::new();
2269                if let Some(props_expr) = &node.properties
2270                    && let Value::Map(map) = self
2271                        .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2272                        .await?
2273                {
2274                    key_props = map;
2275                }
2276                // Only rows whose every key value is a scalar the persisted
2277                // scan can express take the fast path (same gate as before,
2278                // via the filter builder).
2279                if Self::merge_key_filter(&key_props).is_some() {
2280                    let tuple = Self::merge_key_tuple(&key_props);
2281                    row_fast.push(Some((key_props, tuple)));
2282                } else {
2283                    row_fast.push(None);
2284                }
2285            }
2286            let unique_keys: HashSet<MergeKey> = row_fast
2287                .iter()
2288                .flatten()
2289                .map(|(_, tuple)| tuple.clone())
2290                .collect();
2291            let (persisted, schemaless_props) = self
2292                .merge_lookup_persisted_batch(label, &key_names, &unique_keys)
2293                .await?;
2294            fast_persisted = persisted;
2295            if self.merge_label_prefetch_safe(label) {
2296                let mut pf = Prefetch::default();
2297                if !schemaless_props.is_empty() {
2298                    // The schemaless lookup already decoded each matched vid's
2299                    // full property map — zero extra scans.
2300                    pf.vertex.extend(schemaless_props);
2301                } else {
2302                    let vids: Vec<Vid> = fast_persisted
2303                        .values()
2304                        .flatten()
2305                        .copied()
2306                        .collect::<HashSet<Vid>>()
2307                        .into_iter()
2308                        .collect();
2309                    if !vids.is_empty()
2310                        && let Ok(batch_props) = prop_manager
2311                            .get_batch_vertex_props_for_label(&vids, label, ctx)
2312                            .await
2313                    {
2314                        // One `_vid IN (…)` scan for every matched row's base.
2315                        // On Err the map stays empty — every read falls back to
2316                        // the per-row path (fail-open, same posture as
2317                        // prefetch_set_targets).
2318                        pf.vertex.extend(batch_props);
2319                    }
2320                }
2321                merge_prefetch = Some(pf);
2322            }
2323        }
2324
2325        let mut results = Vec::new();
2326        for (idx, mut row) in rows.into_iter().enumerate() {
2327            // Rows with a pre-evaluated scalar key take the fast path; rows
2328            // with a non-scalar key fall through to the general path below.
2329            if let Some((node, label)) = &fastpath
2330                && let Some((key_props, key_tuple)) = row_fast.get(idx).and_then(|rf| rf.as_ref())
2331            {
2332                let writer: &uni_store::Writer = writer_lock.as_ref();
2333                let row_out = self
2334                    .execute_merge_row_indexed(
2335                        label,
2336                        node,
2337                        &path_pattern,
2338                        &temp_vars,
2339                        row,
2340                        key_props,
2341                        &fast_persisted,
2342                        key_tuple,
2343                        &mut fast_existing,
2344                        on_match,
2345                        on_create,
2346                        prop_manager,
2347                        params,
2348                        ctx,
2349                        tx_l0_override,
2350                        writer,
2351                        merge_prefetch.as_mut(),
2352                    )
2353                    .await?;
2354                results.extend(row_out);
2355                continue;
2356            }
2357
2358            // General execution: match-or-create per row. (The index fast path
2359            // above already handles single-node, single-label, scalar-indexed
2360            // MERGE — including unique-constrained labels, whose keys are
2361            // indexed — so there is no separate constraint-only fast path.)
2362            let matches = self
2363                .execute_merge_match(pattern, &row, prop_manager, params, ctx)
2364                .await?;
2365            let writer: &uni_store::Writer = writer_lock.as_ref();
2366
2367            let result: Result<Vec<HashMap<String, Value>>> = async {
2368                let mut batch = Vec::new();
2369                if !matches.is_empty() {
2370                    for mut m in matches {
2371                        if let Some(set) = on_match {
2372                            self.execute_set_items_locked(
2373                                &set.items,
2374                                &mut m,
2375                                writer,
2376                                prop_manager,
2377                                params,
2378                                ctx,
2379                                tx_l0_override,
2380                                &Prefetch::default(),
2381                            )
2382                            .await?;
2383                        }
2384                        Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
2385                        batch.push(m);
2386                    }
2387                } else {
2388                    // Fold ON CREATE SET into seed props so a NOT-NULL property
2389                    // set only by ON CREATE SET passes create-time validation
2390                    // (RC4); the post-create SET below settles the final values.
2391                    let seed_props = self
2392                        .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2393                        .await?;
2394                    self.execute_create_pattern(
2395                        &path_pattern,
2396                        &mut row,
2397                        writer,
2398                        prop_manager,
2399                        params,
2400                        ctx,
2401                        tx_l0_override,
2402                        Some(&seed_props),
2403                    )
2404                    .await?;
2405                    if let Some(set) = on_create {
2406                        self.execute_set_items_locked(
2407                            &set.items,
2408                            &mut row,
2409                            writer,
2410                            prop_manager,
2411                            params,
2412                            ctx,
2413                            tx_l0_override,
2414                            &Prefetch::default(),
2415                        )
2416                        .await?;
2417                    }
2418                    Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
2419                    batch.push(row);
2420                }
2421                Ok(batch)
2422            }
2423            .await;
2424
2425            results.extend(result?);
2426        }
2427        Ok(results)
2428    }
2429
2430    /// Pre-evaluate `ON CREATE SET` property assignments into per-variable seeds.
2431    ///
2432    /// Folds `SET <var>.<prop> = <expr>` items so a NOT-NULL property supplied
2433    /// only by `ON CREATE SET` is present when the MERGE node is created and
2434    /// passes constraint validation (RC4). The right-hand side is evaluated
2435    /// against the current `row`.
2436    ///
2437    /// Items whose right-hand side references the target variable (e.g.
2438    /// `ON CREATE SET n.c = coalesce(n.c, 0) + 1`) are NOT folded: seeding would
2439    /// let the post-create SET read the seeded value and apply the assignment
2440    /// twice. Such items run only post-create, exactly once (unchanged behavior).
2441    ///
2442    /// # Errors
2443    /// Returns an error if evaluating an assignment's right-hand side fails.
2444    pub(crate) async fn on_create_seed_props(
2445        &self,
2446        on_create: Option<&SetClause>,
2447        row: &HashMap<String, Value>,
2448        prop_manager: &PropertyManager,
2449        params: &HashMap<String, Value>,
2450        ctx: Option<&QueryContext>,
2451    ) -> Result<HashMap<String, HashMap<String, Value>>> {
2452        let mut seed: HashMap<String, HashMap<String, Value>> = HashMap::new();
2453        let Some(set) = on_create else {
2454            return Ok(seed);
2455        };
2456        for item in &set.items {
2457            if let SetItem::Property { expr, value } = item
2458                && let Expr::Property(var_expr, prop_name) = expr
2459                && let Expr::Variable(var_name) = &**var_expr
2460                // Skip self-referential RHS so the post-create SET (which also
2461                // runs) applies it exactly once rather than reading the seed.
2462                && !crate::query::df_graph::locy_ast_builder::expr_references_var(
2463                    value, var_name,
2464                )
2465            {
2466                let val = self
2467                    .evaluate_expr(value, row, prop_manager, params, ctx)
2468                    .await?;
2469                seed.entry(var_name.clone())
2470                    .or_default()
2471                    .insert(prop_name.clone(), val);
2472            }
2473        }
2474        Ok(seed)
2475    }
2476
2477    /// Execute a CREATE pattern, inserting new vertices and edges into the graph.
2478    #[expect(clippy::too_many_arguments)]
2479    pub(crate) async fn execute_create_pattern(
2480        &self,
2481        pattern: &Pattern,
2482        row: &mut HashMap<String, Value>,
2483        writer: &Writer,
2484        prop_manager: &PropertyManager,
2485        params: &HashMap<String, Value>,
2486        ctx: Option<&QueryContext>,
2487        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2488        // Per-variable properties to gap-fill into newly-created nodes before
2489        // constraint validation. Used by MERGE to fold `ON CREATE SET` so a
2490        // NOT-NULL property supplied only by ON CREATE SET passes create-time
2491        // validation (RC4). `None` for plain CREATE.
2492        seed_props: Option<&HashMap<String, HashMap<String, Value>>>,
2493    ) -> Result<()> {
2494        for path in &pattern.paths {
2495            let mut prev_vid: Option<Vid> = None;
2496            // (rel_var, type_id, type_name, props_expr, direction)
2497            type PendingRel = (String, u32, String, Option<Expr>, Direction);
2498            let mut rel_pending: Option<PendingRel> = None;
2499
2500            for element in &path.elements {
2501                match element {
2502                    PatternElement::Node(n) => {
2503                        let mut vid = None;
2504
2505                        // Check if node variable already bound in row
2506                        if let Some(var) = &n.variable
2507                            && let Some(val) = row.get(var)
2508                            && let Ok(existing_vid) = Self::vid_from_value(val)
2509                        {
2510                            vid = Some(existing_vid);
2511                        }
2512
2513                        // If not bound, create it
2514                        if vid.is_none() {
2515                            let mut props = HashMap::new();
2516                            if let Some(props_expr) = &n.properties {
2517                                let props_val = self
2518                                    .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2519                                    .await?;
2520                                if let Value::Map(map) = props_val {
2521                                    for (k, v) in map {
2522                                        props.insert(k, v);
2523                                    }
2524                                } else {
2525                                    return Err(anyhow!("Properties must evaluate to a map"));
2526                                }
2527                            }
2528
2529                            // MERGE ON CREATE SET: gap-fill properties supplied
2530                            // only by ON CREATE SET so a NOT-NULL property absent
2531                            // from the merge key passes create-time validation
2532                            // (RC4). `or_insert` keeps the merge-key/pattern props
2533                            // authoritative; the post-create SET re-applies the
2534                            // real values, so the final state is unchanged.
2535                            if let Some(seed) = seed_props
2536                                && let Some(var) = &n.variable
2537                                && let Some(var_seed) = seed.get(var)
2538                            {
2539                                for (k, v) in var_seed {
2540                                    props.entry(k.clone()).or_insert_with(|| v.clone());
2541                                }
2542                            }
2543
2544                            let schema = self.storage.schema_manager().schema();
2545
2546                            // Strict schema: reject undeclared labels.
2547                            if self.config.strict_schema {
2548                                for label_name in &n.labels {
2549                                    if schema.get_label_case_insensitive(label_name).is_none() {
2550                                        return Err(anyhow!(
2551                                            "Label '{}' is not defined in the schema \
2552                                             (strict_schema is enabled). \
2553                                             Declare it with db.schema().label(...).apply() first.",
2554                                            label_name
2555                                        ));
2556                                    }
2557                                }
2558                            }
2559
2560                            // VID generation is label-independent. Pull from the
2561                            // per-tx reservoir if set (amortizes the global
2562                            // IdAllocator mutex), else fall back to the direct
2563                            // per-VID path.
2564                            let new_vid = match &self.id_reservoir {
2565                                Some(r) => r.next_vid().await?,
2566                                None => writer.next_vid().await?,
2567                            };
2568
2569                            // Enrich with generated columns only for known labels
2570                            for label_name in &n.labels {
2571                                if schema.get_label_case_insensitive(label_name).is_some() {
2572                                    self.enrich_properties_with_generated_columns(
2573                                        label_name,
2574                                        &mut props,
2575                                        prop_manager,
2576                                        params,
2577                                        ctx,
2578                                    )
2579                                    .await?;
2580                                }
2581                            }
2582
2583                            // Validate/coerce against declared types AFTER enrichment, so
2584                            // a type mismatch is rejected here rather than silently nulled
2585                            // (and the row dropped) at flush — issue #68.
2586                            let props = Self::coerce_and_validate_props(props, &schema, &n.labels)?;
2587
2588                            // Insert vertex and get back final properties (includes auto-generated embeddings)
2589                            let final_props = writer
2590                                .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
2591                                .await?;
2592
2593                            // Build node object with final properties (includes embeddings)
2594                            if let Some(var) = &n.variable {
2595                                let mut obj = HashMap::new();
2596                                obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
2597                                let labels_list: Vec<Value> =
2598                                    n.labels.iter().map(|l| Value::String(l.clone())).collect();
2599                                obj.insert("_labels".to_string(), Value::List(labels_list));
2600                                for (k, v) in &final_props {
2601                                    obj.insert(k.clone(), v.clone());
2602                                }
2603                                // Store node as a Map with _vid, matching MATCH behavior
2604                                row.insert(var.clone(), Value::Map(obj));
2605                            }
2606                            vid = Some(new_vid);
2607                        }
2608
2609                        let current_vid = vid.unwrap();
2610
2611                        if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
2612                            rel_pending.take()
2613                            && let Some(src) = prev_vid
2614                        {
2615                            let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
2616
2617                            if !is_rel_bound {
2618                                let mut rel_props = HashMap::new();
2619                                if let Some(expr) = rel_props_expr {
2620                                    let val = self
2621                                        .evaluate_expr(&expr, row, prop_manager, params, ctx)
2622                                        .await?;
2623                                    if let Value::Map(map) = val {
2624                                        rel_props.extend(map);
2625                                    }
2626                                }
2627                                // Validate/coerce edge properties against the declared
2628                                // edge-type schema before storing — issue #68.
2629                                let edge_schema = self.storage.schema_manager().schema();
2630                                let rel_props = Self::coerce_and_validate_props(
2631                                    rel_props,
2632                                    &edge_schema,
2633                                    std::slice::from_ref(&type_name),
2634                                )?;
2635                                let eid = match &self.id_reservoir {
2636                                    Some(r) => r.next_eid().await?,
2637                                    None => writer.next_eid(type_id).await?,
2638                                };
2639
2640                                // For incoming edges like (a)<-[:R]-(b), swap so the edge points b -> a
2641                                let (edge_src, edge_dst) = match dir {
2642                                    Direction::Incoming => (current_vid, src),
2643                                    _ => (src, current_vid),
2644                                };
2645
2646                                let store_props = !rel_var.is_empty();
2647                                let user_props = if store_props {
2648                                    rel_props.clone()
2649                                } else {
2650                                    HashMap::new()
2651                                };
2652
2653                                writer
2654                                    .insert_edge(
2655                                        edge_src,
2656                                        edge_dst,
2657                                        type_id,
2658                                        eid,
2659                                        rel_props,
2660                                        Some(type_name.clone()),
2661                                        tx_l0,
2662                                    )
2663                                    .await?;
2664
2665                                // Edge type name is now stored by insert_edge
2666
2667                                if store_props {
2668                                    let mut edge_map = HashMap::new();
2669                                    edge_map.insert(
2670                                        "_eid".to_string(),
2671                                        Value::Int(eid.as_u64() as i64),
2672                                    );
2673                                    edge_map.insert(
2674                                        "_src".to_string(),
2675                                        Value::Int(edge_src.as_u64() as i64),
2676                                    );
2677                                    edge_map.insert(
2678                                        "_dst".to_string(),
2679                                        Value::Int(edge_dst.as_u64() as i64),
2680                                    );
2681                                    edge_map
2682                                        .insert("_type".to_string(), Value::Int(type_id as i64));
2683                                    // Include user properties so downstream RETURN sees them
2684                                    for (k, v) in user_props {
2685                                        edge_map.insert(k, v);
2686                                    }
2687                                    row.insert(rel_var, Value::Map(edge_map));
2688                                }
2689                            }
2690                        }
2691                        prev_vid = Some(current_vid);
2692                    }
2693                    PatternElement::Relationship(r) => {
2694                        if r.types.len() != 1 {
2695                            return Err(anyhow!(
2696                                "CREATE relationship must specify exactly one type"
2697                            ));
2698                        }
2699                        let type_name = &r.types[0];
2700                        let type_id = if self.config.strict_schema {
2701                            let schema = self.storage.schema_manager().schema();
2702                            schema
2703                                .edge_type_id_by_name_case_insensitive(type_name)
2704                                .ok_or_else(|| {
2705                                    anyhow!(
2706                                        "Edge type '{}' is not defined in the schema \
2707                                         (strict_schema is enabled). \
2708                                         Declare it with db.schema().edge_type(...).apply() first.",
2709                                        type_name
2710                                    )
2711                                })?
2712                        } else {
2713                            // Schemaless: get or assign edge type ID (bit 31 = 1 for dynamic).
2714                            self.storage
2715                                .schema_manager()
2716                                .get_or_assign_edge_type_id(type_name)
2717                        };
2718
2719                        rel_pending = Some((
2720                            r.variable.clone().unwrap_or_default(),
2721                            type_id,
2722                            type_name.clone(),
2723                            r.properties.clone(),
2724                            r.direction.clone(),
2725                        ));
2726                    }
2727                    PatternElement::Parenthesized { .. } => {
2728                        return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
2729                    }
2730                }
2731            }
2732        }
2733        Ok(())
2734    }
2735
2736    /// Rejects structural values (maps, nodes, edges, paths, nested lists) in a property.
2737    ///
2738    /// These are never valid OpenCypher property values regardless of the declared column
2739    /// type. A `CypherValue` column is the sole exception and is handled by the caller
2740    /// before this is reached.
2741    ///
2742    /// # Errors
2743    /// Returns an error if `val` is a map/node/edge/path, or a list containing one.
2744    fn validate_structural_property_value(prop_name: &str, val: &Value) -> Result<()> {
2745        match val {
2746            Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
2747                anyhow::bail!(
2748                    "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2749                    prop_name
2750                );
2751            }
2752            Value::List(items) => {
2753                for item in items {
2754                    if matches!(
2755                        item,
2756                        Value::Map(_)
2757                            | Value::Node(_)
2758                            | Value::Edge(_)
2759                            | Value::Path(_)
2760                            | Value::List(_)
2761                    ) {
2762                        anyhow::bail!(
2763                            "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2764                            prop_name
2765                        );
2766                    }
2767                }
2768            }
2769            _ => {}
2770        }
2771        Ok(())
2772    }
2773
2774    /// Validates and coerces `val` against the declared schema type for `prop_name`.
2775    ///
2776    /// Returns the value to actually persist. Beyond the structural checks in
2777    /// [`Self::validate_structural_property_value`], this compares the value against the
2778    /// column's declared `DataType` and:
2779    ///
2780    /// - returns it unchanged when directly storable (including the intentional
2781    ///   `Int`→`Float`/`Int32` and `Temporal`→`Timestamp` widenings);
2782    /// - coerces a `Value::String` written into a `Date`/`Time`/`DateTime`/`Duration`
2783    ///   column into the proper `Temporal` value, using the same parser as the Cypher
2784    ///   `date()`/`time()`/`datetime()`/`duration()` constructors;
2785    /// - otherwise returns an error, so a type mismatch is surfaced at the call site
2786    ///   rather than silently nulled — and the row dropped at flush. See issue #68.
2787    ///
2788    /// Undeclared (schemaless) properties and `CypherValue` columns keep their permissive
2789    /// behavior.
2790    ///
2791    /// # Errors
2792    /// Returns an error if the value's type is incompatible with the declared column type,
2793    /// or if a string destined for a temporal column is not a valid temporal literal.
2794    fn coerce_and_validate_property_value(
2795        prop_name: &str,
2796        val: Value,
2797        schema: &uni_common::core::schema::Schema,
2798        labels: &[String],
2799    ) -> Result<Value> {
2800        use uni_common::core::schema::DataType;
2801
2802        // Resolve the declared type from the first label that declares this property.
2803        let declared = labels.iter().find_map(|label| {
2804            schema
2805                .properties
2806                .get(label)
2807                .and_then(|props| props.get(prop_name))
2808                .map(|meta| &meta.r#type)
2809        });
2810
2811        // CypherValue columns accept any value (including maps) — skip all checks.
2812        if matches!(declared, Some(DataType::CypherValue)) {
2813            return Ok(val);
2814        }
2815
2816        let Some(dt) = declared else {
2817            // Schemaless property: reject structural values (maps/nodes/edges/paths and
2818            // lists containing them), otherwise store as-is.
2819            Self::validate_structural_property_value(prop_name, &val)?;
2820            return Ok(val);
2821        };
2822
2823        // Directly storable: scalars, the intentional `Int`→`Float`/`Int32` and
2824        // `Temporal`→`Timestamp` widenings, declared composite columns (`Map`/`List`/
2825        // `Vector`) receiving their matching value, and `Null` (always accepted).
2826        if dt.accepts(&val) {
2827            return Ok(val);
2828        }
2829
2830        // Known-safe coercion: a string into a temporal column is parsed as if it had
2831        // been wrapped in the matching Cypher temporal constructor.
2832        if matches!(val, Value::String(_)) {
2833            let ctor = match dt {
2834                DataType::DateTime => Some("DATETIME"),
2835                DataType::Date => Some("DATE"),
2836                DataType::Time => Some("TIME"),
2837                DataType::Duration => Some("DURATION"),
2838                _ => None,
2839            };
2840            if let Some(name) = ctor {
2841                return uni_query_functions::datetime::eval_datetime_function(
2842                    name,
2843                    std::slice::from_ref(&val),
2844                )
2845                .map_err(|e| {
2846                    anyhow!(
2847                        "TypeError: property '{}' is declared {:?} but the string value could \
2848                         not be parsed as a {} literal: {}",
2849                        prop_name,
2850                        dt,
2851                        name,
2852                        e
2853                    )
2854                });
2855            }
2856        }
2857
2858        // Not storable and not coercible. Prefer the structural message when the value
2859        // is itself structural (e.g. a map into a scalar column), preserving prior
2860        // behavior; otherwise report the scalar type mismatch.
2861        Self::validate_structural_property_value(prop_name, &val)?;
2862        anyhow::bail!(
2863            "TypeError: property '{}' is declared {:?} but got an incompatible value of type {}",
2864            prop_name,
2865            dt,
2866            value_type_name(&val)
2867        );
2868    }
2869
2870    /// Coerces and validates every property in `props` against the declared types for `labels`.
2871    ///
2872    /// Applies [`Self::coerce_and_validate_property_value`] to each entry, returning the map
2873    /// with known-safe coercions applied. Use this at every user-facing CREATE/SET write site
2874    /// before handing properties to the writer, so a type mismatch is rejected up front rather
2875    /// than silently nulled — and the row dropped — at flush (issue #68).
2876    ///
2877    /// # Errors
2878    /// Returns an error on the first property whose value is incompatible with its declared type.
2879    fn coerce_and_validate_props(
2880        props: HashMap<String, Value>,
2881        schema: &uni_common::core::schema::Schema,
2882        labels: &[String],
2883    ) -> Result<HashMap<String, Value>> {
2884        let mut out = HashMap::with_capacity(props.len());
2885        for (k, v) in props {
2886            let cv = Self::coerce_and_validate_property_value(&k, v, schema, labels)?;
2887            out.insert(k, cv);
2888        }
2889        Ok(out)
2890    }
2891
2892    #[expect(clippy::too_many_arguments)]
2893    pub(crate) async fn execute_set_items_locked(
2894        &self,
2895        items: &[SetItem],
2896        row: &mut HashMap<String, Value>,
2897        writer: &Writer,
2898        prop_manager: &PropertyManager,
2899        params: &HashMap<String, Value>,
2900        ctx: Option<&QueryContext>,
2901        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2902        prefetched: &Prefetch,
2903    ) -> Result<()> {
2904        // Coalesce SetItem::Property items by target so we do ONE read + ONE
2905        // write per (variable, target) instead of one read-modify-write cycle
2906        // per item. For an UPDATE that sets N properties on the same vertex
2907        // (e.g. the ingest hotpath `SET n.frequency = ..., n.last_seen = ...,
2908        // n.confidence = ...`), this collapses N redundant
2909        // `get_all_vertex_props_with_ctx` + `insert_vertex_with_labels` cycles
2910        // into one. See profile_test.rs `diag_72_set_data_scale_with_hnsw` for
2911        // the measurement, and the plan in
2912        // /home/rohit/.claude/plans/plan-and-implement-a-valiant-flame.md
2913        // for the rationale.
2914        //
2915        // RHS evaluation order is preserved: we evaluate each RHS inline and
2916        // update the row binding immediately, so a later SetItem on the same
2917        // variable that reads `n.<earlier-prop>` sees the new value.
2918        //
2919        // Non-Property variants (Labels, Variable, VariablePlus) are less
2920        // common and have lower payoff; before processing one, we flush any
2921        // pending updates for the same variable so it sees the latest L0
2922        // state and ordering semantics are preserved.
2923        let mut pending_v: HashMap<String, PendingVertexSet> = HashMap::new();
2924        let mut pending_e: HashMap<String, PendingEdgeSet> = HashMap::new();
2925
2926        for item in items {
2927            match item {
2928                SetItem::Property { expr, value } => {
2929                    if let Expr::Property(var_expr, prop_name) = expr
2930                        && let Expr::Variable(var_name) = &**var_expr
2931                        && let Some(node_val) = row.get(var_name)
2932                    {
2933                        if let Ok(vid) = Self::vid_from_value(node_val) {
2934                            reject_if_ephemeral_vid(vid)?;
2935                            let labels =
2936                                Self::extract_labels_from_node(node_val).unwrap_or_default();
2937                            let schema = self.storage.schema_manager().schema().clone();
2938
2939                            // Lazy one-time read. Always read the full row
2940                            // (preserves CRDT merge + constraint validation
2941                            // + scan-side L0 visibility). The
2942                            // partial-lance-writes optimization happens
2943                            // PURELY AT FLUSH TIME via the per-VID
2944                            // `vertex_partial_keys` set tracked in L0 — so
2945                            // L0 holds the full row, scans see the full
2946                            // row, and Lance only receives the touched
2947                            // columns. Generated-column-bearing labels
2948                            // ride the partial path too (Round 12 §C):
2949                            // `enrich_properties_with_generated_columns`
2950                            // runs at flush time over the merged-in-L0
2951                            // full row, and the produced generator keys
2952                            // are appended to `touched` so they land in
2953                            // the MergeInsert source.
2954                            if !pending_v.contains_key(var_name) {
2955                                let storage_cfg = &self.storage.config;
2956                                let partial = storage_cfg.partial_lance_writes;
2957                                let read = read_vertex_props_with_prefetch(
2958                                    vid,
2959                                    prefetched,
2960                                    prop_manager,
2961                                    ctx,
2962                                )
2963                                .await?;
2964                                pending_v.insert(
2965                                    var_name.clone(),
2966                                    PendingVertexSet {
2967                                        vid,
2968                                        labels: labels.clone(),
2969                                        props: read,
2970                                        partial,
2971                                        touched: HashSet::new(),
2972                                    },
2973                                );
2974                            }
2975
2976                            let val = self
2977                                .evaluate_expr(value, row, prop_manager, params, ctx)
2978                                .await?;
2979                            let val = Self::coerce_and_validate_property_value(
2980                                prop_name, val, &schema, &labels,
2981                            )?;
2982
2983                            let pv = pending_v
2984                                .get_mut(var_name)
2985                                .expect("inserted above when absent");
2986                            pv.props.insert(prop_name.clone(), val.clone());
2987                            if pv.partial {
2988                                pv.touched.insert(prop_name.clone());
2989                            }
2990
2991                            // Update the row binding so subsequent RHS sees the new value.
2992                            if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
2993                                node_map.insert(prop_name.clone(), val);
2994                            } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
2995                                node.properties.insert(prop_name.clone(), val);
2996                            }
2997                        } else if let Value::Map(map) = node_val
2998                            && map.get("_eid").is_some_and(|v| !v.is_null())
2999                            && map.get("_src").is_some_and(|v| !v.is_null())
3000                            && map.get("_dst").is_some_and(|v| !v.is_null())
3001                            && (map.get("_type").is_some_and(|v| !v.is_null())
3002                                || map.get("_type_name").is_some_and(|v| !v.is_null()))
3003                        {
3004                            let ei = self.extract_edge_identity(map)?;
3005                            reject_if_ephemeral_eid(ei.eid)?;
3006                            let schema = self.storage.schema_manager().schema().clone();
3007                            // Handle _type as either String or Int (Int from CREATE, String
3008                            // from queries). UNWIND on VLP edge lists emits `_type_name`
3009                            // instead of `_type`; accept either.
3010                            let type_val = map.get("_type").or_else(|| map.get("_type_name"));
3011                            let edge_type_name = match type_val {
3012                                Some(Value::String(s)) => s.clone(),
3013                                Some(Value::Int(id)) => schema
3014                                    .edge_type_name_by_id_unified(*id as u32)
3015                                    .unwrap_or_else(|| format!("EdgeType{}", id)),
3016                                _ => String::new(),
3017                            };
3018
3019                            if !pending_e.contains_key(var_name) {
3020                                let initial = read_edge_props_with_prefetch(
3021                                    ei.eid,
3022                                    prefetched,
3023                                    prop_manager,
3024                                    ctx,
3025                                )
3026                                .await?;
3027                                let partial = self.storage.config.partial_lance_writes;
3028                                pending_e.insert(
3029                                    var_name.clone(),
3030                                    PendingEdgeSet {
3031                                        src: ei.src,
3032                                        dst: ei.dst,
3033                                        edge_type_id: ei.edge_type_id,
3034                                        eid: ei.eid,
3035                                        edge_type_name: edge_type_name.clone(),
3036                                        props: initial,
3037                                        partial,
3038                                        touched: HashSet::new(),
3039                                    },
3040                                );
3041                            }
3042
3043                            let val = self
3044                                .evaluate_expr(value, row, prop_manager, params, ctx)
3045                                .await?;
3046                            let val = Self::coerce_and_validate_property_value(
3047                                prop_name,
3048                                val,
3049                                &schema,
3050                                std::slice::from_ref(&edge_type_name),
3051                            )?;
3052
3053                            let pe = pending_e
3054                                .get_mut(var_name)
3055                                .expect("inserted above when absent");
3056                            pe.props.insert(prop_name.clone(), val.clone());
3057                            if pe.partial {
3058                                pe.touched.insert(prop_name.clone());
3059                            }
3060
3061                            // Update the row object so subsequent RHS sees the new value.
3062                            if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3063                                edge_map.insert(prop_name.clone(), val);
3064                            } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3065                                edge.properties.insert(prop_name.clone(), val);
3066                            }
3067                        } else if let Value::Edge(edge) = node_val {
3068                            // Handle Value::Edge directly (when traverse returns Edge objects).
3069                            reject_if_ephemeral_eid(edge.eid)?;
3070                            let eid = edge.eid;
3071                            let src = edge.src;
3072                            let dst = edge.dst;
3073                            let edge_type_name = edge.edge_type.clone();
3074                            let etype =
3075                                self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
3076                            let schema = self.storage.schema_manager().schema().clone();
3077
3078                            if !pending_e.contains_key(var_name) {
3079                                let initial = read_edge_props_with_prefetch(
3080                                    eid,
3081                                    prefetched,
3082                                    prop_manager,
3083                                    ctx,
3084                                )
3085                                .await?;
3086                                let partial = self.storage.config.partial_lance_writes;
3087                                pending_e.insert(
3088                                    var_name.clone(),
3089                                    PendingEdgeSet {
3090                                        src,
3091                                        dst,
3092                                        edge_type_id: etype,
3093                                        eid,
3094                                        edge_type_name: edge_type_name.clone(),
3095                                        props: initial,
3096                                        partial,
3097                                        touched: HashSet::new(),
3098                                    },
3099                                );
3100                            }
3101
3102                            let val = self
3103                                .evaluate_expr(value, row, prop_manager, params, ctx)
3104                                .await?;
3105                            let val = Self::coerce_and_validate_property_value(
3106                                prop_name,
3107                                val,
3108                                &schema,
3109                                std::slice::from_ref(&edge_type_name),
3110                            )?;
3111
3112                            let pe = pending_e
3113                                .get_mut(var_name)
3114                                .expect("inserted above when absent");
3115                            pe.props.insert(prop_name.clone(), val.clone());
3116                            if pe.partial {
3117                                pe.touched.insert(prop_name.clone());
3118                            }
3119
3120                            // Update the row object so subsequent RHS sees the new value.
3121                            if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3122                                edge.properties.insert(prop_name.clone(), val);
3123                            }
3124                        }
3125                    }
3126                }
3127                SetItem::Labels { variable, labels } => {
3128                    // Flush any pending writes for this var so the Labels op
3129                    // sees latest L0 state. Other variables' pending writes
3130                    // can keep waiting (they're independent).
3131                    self.flush_pending_var(
3132                        variable,
3133                        &mut pending_v,
3134                        &mut pending_e,
3135                        writer,
3136                        prop_manager,
3137                        params,
3138                        ctx,
3139                        tx_l0,
3140                        prefetched,
3141                    )
3142                    .await?;
3143
3144                    if let Some(node_val) = row.get(variable)
3145                        && let Ok(vid) = Self::vid_from_value(node_val)
3146                    {
3147                        reject_if_ephemeral_vid(vid)?;
3148                        let registry = self
3149                            .procedure_registry
3150                            .as_ref()
3151                            .and_then(|pr| pr.plugin_registry());
3152                        reject_virtual_label_write(registry.as_ref(), labels, "SET")?;
3153
3154                        // Get current labels from node value
3155                        let current_labels =
3156                            Self::extract_labels_from_node(node_val).unwrap_or_default();
3157
3158                        // Determine new labels to add (skip duplicates)
3159                        let labels_to_add: Vec<_> = labels
3160                            .iter()
3161                            .filter(|l| !current_labels.contains(l))
3162                            .cloned()
3163                            .collect();
3164
3165                        if !labels_to_add.is_empty() {
3166                            // Resolve the FULL new label set and write it to the
3167                            // TRANSACTION buffer (so the change is transactional
3168                            // and OCC-conflictable), falling back to the context
3169                            // (main) L0 for non-transactional callers. Replace
3170                            // semantics via `set_vertex_labels`.
3171                            let mut new_labels = current_labels;
3172                            new_labels.extend(labels_to_add);
3173                            if let Some(ctx) = ctx {
3174                                let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3175                                l0.write().set_vertex_labels(vid, &new_labels);
3176                            }
3177
3178                            // Update the node value in the row with the new labels.
3179                            if let Some(Value::Map(obj)) = row.get_mut(variable) {
3180                                let labels_list =
3181                                    new_labels.into_iter().map(Value::String).collect();
3182                                obj.insert("_labels".to_string(), Value::List(labels_list));
3183                            }
3184                        }
3185                    }
3186                }
3187                SetItem::Variable { variable, value }
3188                | SetItem::VariablePlus { variable, value } => {
3189                    // Flush this var's pending writes first so the
3190                    // replace/merge op sees them as latest L0 state.
3191                    self.flush_pending_var(
3192                        variable,
3193                        &mut pending_v,
3194                        &mut pending_e,
3195                        writer,
3196                        prop_manager,
3197                        params,
3198                        ctx,
3199                        tx_l0,
3200                        prefetched,
3201                    )
3202                    .await?;
3203
3204                    let replace = matches!(item, SetItem::Variable { .. });
3205                    let op_str = if replace { "=" } else { "+=" };
3206
3207                    // SET n = expr / SET n += expr — null target from OPTIONAL MATCH is a silent no-op
3208                    if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
3209                        continue;
3210                    }
3211                    let rhs = self
3212                        .evaluate_expr(value, row, prop_manager, params, ctx)
3213                        .await?;
3214                    let new_props =
3215                        Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
3216                            anyhow!(
3217                                "SET {} {} expr: right-hand side must evaluate to a map, \
3218                                 node, or relationship",
3219                                variable,
3220                                op_str
3221                            )
3222                        })?;
3223                    self.apply_properties_to_entity(
3224                        variable,
3225                        new_props,
3226                        replace,
3227                        row,
3228                        writer,
3229                        prop_manager,
3230                        params,
3231                        ctx,
3232                        tx_l0,
3233                        prefetched,
3234                    )
3235                    .await?;
3236                }
3237            }
3238        }
3239
3240        // Flush all remaining coalesced writes — one writer call per target.
3241        // Partial entries (no generated columns) call
3242        // `Writer::insert_vertex_partial_full` so L0 holds the FULL row
3243        // but the touched-keys hint drives a MergeInsert at flush. Full
3244        // entries continue through the legacy
3245        // `insert_vertex_with_labels` (Append) path with
3246        // generated-column enrichment.
3247        for (_var_name, mut pv) in pending_v {
3248            if pv.partial {
3249                // Round 12 §C: run the generator enrichment over the
3250                // merged-in-L0 full row, then add the produced generator
3251                // keys to `touched` so they ride the MergeInsert source.
3252                // Idempotent — generators always recompute against the
3253                // post-merge property map.
3254                let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3255                for label_name in &pv.labels {
3256                    self.enrich_properties_with_generated_columns(
3257                        label_name,
3258                        &mut pv.props,
3259                        prop_manager,
3260                        params,
3261                        ctx,
3262                    )
3263                    .await?;
3264                }
3265                for k in pv.props.keys() {
3266                    if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3267                        pv.touched.insert(k.clone());
3268                    }
3269                }
3270                writer
3271                    .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3272                    .await?;
3273            } else {
3274                for label_name in &pv.labels {
3275                    self.enrich_properties_with_generated_columns(
3276                        label_name,
3277                        &mut pv.props,
3278                        prop_manager,
3279                        params,
3280                        ctx,
3281                    )
3282                    .await?;
3283                }
3284                let _ = writer
3285                    .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3286                    .await?;
3287            }
3288        }
3289        for (_var_name, pe) in pending_e {
3290            if pe.partial {
3291                writer
3292                    .insert_edge_partial_full(
3293                        pe.src,
3294                        pe.dst,
3295                        pe.edge_type_id,
3296                        pe.eid,
3297                        pe.props,
3298                        Some(pe.edge_type_name),
3299                        pe.touched,
3300                        tx_l0,
3301                    )
3302                    .await?;
3303            } else {
3304                writer
3305                    .insert_edge(
3306                        pe.src,
3307                        pe.dst,
3308                        pe.edge_type_id,
3309                        pe.eid,
3310                        pe.props,
3311                        Some(pe.edge_type_name),
3312                        tx_l0,
3313                    )
3314                    .await?;
3315            }
3316        }
3317
3318        Ok(())
3319    }
3320
3321    /// Flush pending SET state for a single variable to the writer.
3322    ///
3323    /// Called from the SET loop when about to process a Labels /
3324    /// Variable / VariablePlus item on `var`, so the subsequent op
3325    /// sees latest L0 state and ordering is preserved.
3326    #[expect(clippy::too_many_arguments)]
3327    async fn flush_pending_var(
3328        &self,
3329        var: &str,
3330        pending_v: &mut HashMap<String, PendingVertexSet>,
3331        pending_e: &mut HashMap<String, PendingEdgeSet>,
3332        writer: &Writer,
3333        prop_manager: &PropertyManager,
3334        _params: &HashMap<String, Value>,
3335        ctx: Option<&QueryContext>,
3336        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3337        _prefetched: &Prefetch,
3338    ) -> Result<()> {
3339        if let Some(mut pv) = pending_v.remove(var) {
3340            if pv.partial {
3341                let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3342                for label_name in &pv.labels {
3343                    self.enrich_properties_with_generated_columns(
3344                        label_name,
3345                        &mut pv.props,
3346                        prop_manager,
3347                        _params,
3348                        ctx,
3349                    )
3350                    .await?;
3351                }
3352                for k in pv.props.keys() {
3353                    if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3354                        pv.touched.insert(k.clone());
3355                    }
3356                }
3357                writer
3358                    .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3359                    .await?;
3360            } else {
3361                for label_name in &pv.labels {
3362                    self.enrich_properties_with_generated_columns(
3363                        label_name,
3364                        &mut pv.props,
3365                        prop_manager,
3366                        _params,
3367                        ctx,
3368                    )
3369                    .await?;
3370                }
3371                let _ = writer
3372                    .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3373                    .await?;
3374            }
3375        }
3376        if let Some(pe) = pending_e.remove(var) {
3377            if pe.partial {
3378                writer
3379                    .insert_edge_partial_full(
3380                        pe.src,
3381                        pe.dst,
3382                        pe.edge_type_id,
3383                        pe.eid,
3384                        pe.props,
3385                        Some(pe.edge_type_name),
3386                        pe.touched,
3387                        tx_l0,
3388                    )
3389                    .await?;
3390            } else {
3391                writer
3392                    .insert_edge(
3393                        pe.src,
3394                        pe.dst,
3395                        pe.edge_type_id,
3396                        pe.eid,
3397                        pe.props,
3398                        Some(pe.edge_type_name),
3399                        tx_l0,
3400                    )
3401                    .await?;
3402            }
3403        }
3404        Ok(())
3405    }
3406
3407    /// Execute REMOVE clause items (property removal or label removal).
3408    ///
3409    /// Property removals are batched per variable to avoid stale reads: when
3410    /// multiple properties of the same entity are removed in one REMOVE clause,
3411    /// we read from storage once, null all specified properties, and write back
3412    /// once. This prevents the second removal from reading stale data that
3413    /// doesn't reflect the first removal's L0 write.
3414    #[expect(clippy::too_many_arguments)]
3415    pub(crate) async fn execute_remove_items_locked(
3416        &self,
3417        items: &[RemoveItem],
3418        row: &mut HashMap<String, Value>,
3419        writer: &Writer,
3420        prop_manager: &PropertyManager,
3421        ctx: Option<&QueryContext>,
3422        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3423        prefetched: &Prefetch,
3424    ) -> Result<()> {
3425        // Collect property names to remove, grouped by variable.
3426        // Use Vec<(String, Vec<String>)> to preserve insertion order.
3427        let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
3428
3429        for item in items {
3430            match item {
3431                RemoveItem::Property(expr) => {
3432                    if let Expr::Property(var_expr, prop_name) = expr
3433                        && let Expr::Variable(var_name) = &**var_expr
3434                    {
3435                        if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
3436                            entry.1.push(prop_name.clone());
3437                        } else {
3438                            prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
3439                        }
3440                    }
3441                }
3442                RemoveItem::Labels { variable, labels } => {
3443                    self.execute_remove_labels(variable, labels, row, ctx)?;
3444                }
3445            }
3446        }
3447
3448        // Execute batched property removals per variable.
3449        for (var_name, prop_names) in &prop_removals {
3450            let Some(node_val) = row.get(var_name) else {
3451                continue;
3452            };
3453
3454            if let Ok(vid) = Self::vid_from_value(node_val) {
3455                // Vertex property removal
3456                let mut props =
3457                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
3458
3459                // Only write back if at least one property actually exists
3460                let removed_count = prop_names
3461                    .iter()
3462                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3463                    .count();
3464                let any_exist = removed_count > 0;
3465                if any_exist {
3466                    writer.track_properties_removed(removed_count, tx_l0);
3467                    for prop_name in prop_names {
3468                        props.insert(prop_name.clone(), Value::Null);
3469                    }
3470                }
3471                // Compute effective properties (post-removal) for _all_props
3472                let effective: HashMap<String, Value> = props
3473                    .iter()
3474                    .filter(|(_, v)| !v.is_null())
3475                    .map(|(k, v)| (k.clone(), v.clone()))
3476                    .collect();
3477                if any_exist {
3478                    let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3479                    let _ = writer
3480                        .insert_vertex_with_labels(vid, props, &labels, tx_l0)
3481                        .await?;
3482                }
3483
3484                // Update the row map: set removed props to Null
3485                if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3486                    for prop_name in prop_names {
3487                        node_map.insert(prop_name.clone(), Value::Null);
3488                    }
3489                    // Set _all_props to the complete effective property set
3490                    node_map.insert("_all_props".to_string(), Value::Map(effective));
3491                }
3492            } else if let Value::Map(map) = node_val {
3493                // Edge property removal (map-encoded)
3494                // Check for non-null _eid to skip OPTIONAL MATCH null edges
3495                let mut edge_effective: Option<HashMap<String, Value>> = None;
3496                if map.get("_eid").is_some_and(|v| !v.is_null()) {
3497                    let ei = self.extract_edge_identity(map)?;
3498                    let mut props =
3499                        read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx)
3500                            .await?;
3501
3502                    let removed_count = prop_names
3503                        .iter()
3504                        .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3505                        .count();
3506                    let any_exist = removed_count > 0;
3507                    if any_exist {
3508                        writer.track_properties_removed(removed_count, tx_l0);
3509                        for prop_name in prop_names {
3510                            props.insert(prop_name.to_string(), Value::Null);
3511                        }
3512                    }
3513                    // Compute effective properties (post-removal) for _all_props
3514                    edge_effective = Some(
3515                        props
3516                            .iter()
3517                            .filter(|(_, v)| !v.is_null())
3518                            .map(|(k, v)| (k.clone(), v.clone()))
3519                            .collect(),
3520                    );
3521                    if any_exist {
3522                        let edge_type_name = map
3523                            .get("_type")
3524                            .and_then(|v| v.as_str())
3525                            .map(|s| s.to_string())
3526                            .or_else(|| {
3527                                self.storage
3528                                    .schema_manager()
3529                                    .edge_type_name_by_id_unified(ei.edge_type_id)
3530                            });
3531                        writer
3532                            .insert_edge(
3533                                ei.src,
3534                                ei.dst,
3535                                ei.edge_type_id,
3536                                ei.eid,
3537                                props,
3538                                edge_type_name,
3539                                tx_l0,
3540                            )
3541                            .await?;
3542                    }
3543                }
3544
3545                if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3546                    for prop_name in prop_names {
3547                        edge_map.insert(prop_name.clone(), Value::Null);
3548                    }
3549                    if let Some(effective) = edge_effective {
3550                        edge_map.insert("_all_props".to_string(), Value::Map(effective));
3551                    }
3552                }
3553            } else if let Value::Edge(edge) = node_val {
3554                // Edge property removal (Value::Edge)
3555                let eid = edge.eid;
3556                let src = edge.src;
3557                let dst = edge.dst;
3558                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
3559
3560                let mut props =
3561                    read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
3562
3563                let removed_count = prop_names
3564                    .iter()
3565                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3566                    .count();
3567                if removed_count > 0 {
3568                    writer.track_properties_removed(removed_count, tx_l0);
3569                    for prop_name in prop_names {
3570                        props.insert(prop_name.to_string(), Value::Null);
3571                    }
3572                    writer
3573                        .insert_edge(
3574                            src,
3575                            dst,
3576                            etype,
3577                            eid,
3578                            props,
3579                            Some(edge.edge_type.clone()),
3580                            tx_l0,
3581                        )
3582                        .await?;
3583                }
3584
3585                if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3586                    for prop_name in prop_names {
3587                        edge.properties.insert(prop_name.to_string(), Value::Null);
3588                    }
3589                }
3590            }
3591        }
3592
3593        Ok(())
3594    }
3595
3596    /// Execute label removal.
3597    pub(crate) fn execute_remove_labels(
3598        &self,
3599        variable: &str,
3600        labels: &[String],
3601        row: &mut HashMap<String, Value>,
3602        ctx: Option<&QueryContext>,
3603    ) -> Result<()> {
3604        if let Some(node_val) = row.get(variable)
3605            && let Ok(vid) = Self::vid_from_value(node_val)
3606        {
3607            reject_if_ephemeral_vid(vid)?;
3608            let registry = self
3609                .procedure_registry
3610                .as_ref()
3611                .and_then(|pr| pr.plugin_registry());
3612            reject_virtual_label_write(registry.as_ref(), labels, "REMOVE")?;
3613
3614            // Get current labels from node value
3615            let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3616
3617            // Determine which labels to actually remove (only those currently present)
3618            let labels_to_remove: Vec<_> = labels
3619                .iter()
3620                .filter(|l| current_labels.contains(l))
3621                .collect();
3622
3623            if !labels_to_remove.is_empty() {
3624                // Resolve the FULL remaining label set and write it to the
3625                // TRANSACTION buffer (transactional + OCC-conflictable), falling
3626                // back to the context (main) L0 for non-transactional callers.
3627                let remaining_labels: Vec<String> = current_labels
3628                    .iter()
3629                    .filter(|l| !labels_to_remove.contains(l))
3630                    .cloned()
3631                    .collect();
3632                if let Some(ctx) = ctx {
3633                    let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3634                    l0.write().set_vertex_labels(vid, &remaining_labels);
3635                }
3636
3637                // Update the node value in the row with the remaining labels.
3638                if let Some(Value::Map(obj)) = row.get_mut(variable) {
3639                    let labels_list = remaining_labels.into_iter().map(Value::String).collect();
3640                    obj.insert("_labels".to_string(), Value::List(labels_list));
3641                }
3642            }
3643        }
3644        Ok(())
3645    }
3646
3647    /// Resolve edge type ID for a Value::Edge, handling empty edge_type strings
3648    /// by looking up the type from the L0 buffer's edge endpoints.
3649    fn resolve_edge_type_id_for_edge(
3650        &self,
3651        edge: &crate::types::Edge,
3652        writer: &Writer,
3653        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3654    ) -> Result<u32> {
3655        if !edge.edge_type.is_empty() {
3656            return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
3657        }
3658        // Edge type name is empty (e.g., from anonymous MATCH patterns).
3659        // Look up the edge type ID from the L0 buffer's edge endpoints.
3660        if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
3661            return Ok(etype);
3662        }
3663        Err(anyhow!(
3664            "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
3665            edge.eid
3666        ))
3667    }
3668
3669    /// Execute DELETE clause for a single item (vertex, edge, path, or null).
3670    pub(crate) async fn execute_delete_item_locked(
3671        &self,
3672        val: &Value,
3673        detach: bool,
3674        writer: &Writer,
3675        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3676    ) -> Result<()> {
3677        match val {
3678            Value::Null => {
3679                // DELETE null is a no-op per OpenCypher spec
3680            }
3681            Value::Path(path) => {
3682                // Delete path edges first, then nodes
3683                for edge in &path.edges {
3684                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3685                    writer
3686                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3687                        .await?;
3688                }
3689                for node in &path.nodes {
3690                    self.execute_delete_vertex(
3691                        node.vid,
3692                        detach,
3693                        Some(node.labels.clone()),
3694                        writer,
3695                        tx_l0,
3696                    )
3697                    .await?;
3698                }
3699            }
3700            _ => {
3701                // Try Path reconstruction from Map first (Arrow loses Path type)
3702                if let Ok(path) = Path::try_from(val) {
3703                    for edge in &path.edges {
3704                        let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3705                        writer
3706                            .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3707                            .await?;
3708                    }
3709                    for node in &path.nodes {
3710                        self.execute_delete_vertex(
3711                            node.vid,
3712                            detach,
3713                            Some(node.labels.clone()),
3714                            writer,
3715                            tx_l0,
3716                        )
3717                        .await?;
3718                    }
3719                } else if let Ok(vid) = Self::vid_from_value(val) {
3720                    let labels = Self::extract_labels_from_node(val);
3721                    self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
3722                        .await?;
3723                } else if let Value::Map(map) = val {
3724                    self.execute_delete_edge_from_map(map, writer, tx_l0)
3725                        .await?;
3726                } else if let Value::Edge(edge) = val {
3727                    reject_if_ephemeral_eid(edge.eid)?;
3728                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3729                    let registry = self
3730                        .procedure_registry
3731                        .as_ref()
3732                        .and_then(|pr| pr.plugin_registry());
3733                    reject_virtual_edge_type_write(registry.as_ref(), etype, "DELETE")?;
3734                    writer
3735                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3736                        .await?;
3737                }
3738            }
3739        }
3740        Ok(())
3741    }
3742
3743    /// Execute vertex deletion with optional detach.
3744    pub(crate) async fn execute_delete_vertex(
3745        &self,
3746        vid: Vid,
3747        detach: bool,
3748        labels: Option<Vec<String>>,
3749        writer: &Writer,
3750        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3751    ) -> Result<()> {
3752        reject_if_ephemeral_vid(vid)?;
3753        if let Some(ls) = labels.as_deref() {
3754            let registry = self
3755                .procedure_registry
3756                .as_ref()
3757                .and_then(|pr| pr.plugin_registry());
3758            reject_virtual_label_write(registry.as_ref(), ls, "DELETE")?;
3759        }
3760        if detach {
3761            self.detach_delete_vertex(vid, writer, tx_l0).await?;
3762        } else {
3763            self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
3764        }
3765        writer.delete_vertex(vid, labels, tx_l0).await?;
3766        Ok(())
3767    }
3768
3769    /// Check that a vertex has no edges (required for non-DETACH DELETE).
3770    ///
3771    /// Loads the subgraph from storage, then excludes edges that have been
3772    /// tombstoned in the writer's L0 or the transaction's L0. This ensures
3773    /// edges deleted earlier in the same DELETE clause are properly excluded.
3774    pub(crate) async fn check_vertex_has_no_edges(
3775        &self,
3776        vid: Vid,
3777        writer: &Writer,
3778        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3779    ) -> Result<()> {
3780        let schema = self.storage.schema_manager().schema();
3781        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
3782
3783        // Collect tombstoned edge IDs from both the writer L0 and tx L0.
3784        let mut tombstoned_eids = std::collections::HashSet::new();
3785        {
3786            let writer_l0 = writer.l0_manager.get_current();
3787            let guard = writer_l0.read();
3788            for &eid in guard.tombstones.keys() {
3789                tombstoned_eids.insert(eid);
3790            }
3791        }
3792        if let Some(tx) = tx_l0 {
3793            let guard = tx.read();
3794            for &eid in guard.tombstones.keys() {
3795                tombstoned_eids.insert(eid);
3796            }
3797        }
3798
3799        let out_graph = self
3800            .storage
3801            .load_subgraph_cached(
3802                &[vid],
3803                &edge_type_ids,
3804                1,
3805                uni_store::runtime::Direction::Outgoing,
3806                Some(writer.l0_manager.get_current()),
3807            )
3808            .await?;
3809        let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
3810
3811        let in_graph = self
3812            .storage
3813            .load_subgraph_cached(
3814                &[vid],
3815                &edge_type_ids,
3816                1,
3817                uni_store::runtime::Direction::Incoming,
3818                Some(writer.l0_manager.get_current()),
3819            )
3820            .await?;
3821        let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
3822
3823        if has_out || has_in {
3824            return Err(anyhow!(
3825                "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
3826                vid
3827            ));
3828        }
3829        Ok(())
3830    }
3831
3832    /// Execute edge deletion from a map representation.
3833    pub(crate) async fn execute_delete_edge_from_map(
3834        &self,
3835        map: &HashMap<String, Value>,
3836        writer: &Writer,
3837        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3838    ) -> Result<()> {
3839        // Check for non-null _eid to skip OPTIONAL MATCH null edges
3840        if map.get("_eid").is_some_and(|v| !v.is_null()) {
3841            let ei = self.extract_edge_identity(map)?;
3842            reject_if_ephemeral_eid(ei.eid)?;
3843            let registry = self
3844                .procedure_registry
3845                .as_ref()
3846                .and_then(|pr| pr.plugin_registry());
3847            reject_virtual_edge_type_write(registry.as_ref(), ei.edge_type_id, "DELETE")?;
3848            writer
3849                .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
3850                .await?;
3851        }
3852        Ok(())
3853    }
3854
3855    /// Build a scan plan node.
3856    ///
3857    /// - `label_id > 0`: schema label → `Scan` (fast, label-specific storage)
3858    /// - `label_id == 0` with labels: schemaless → `ScanMainByLabels` (main table + L0, filtered by label name)
3859    /// - `label_id == 0` without labels: unlabeled → `ScanAll`
3860    fn make_scan_plan(
3861        label_id: u16,
3862        labels: Vec<String>,
3863        variable: String,
3864        filter: Option<Expr>,
3865    ) -> LogicalPlan {
3866        if label_id > 0 {
3867            LogicalPlan::Scan {
3868                label_id,
3869                labels,
3870                variable,
3871                filter,
3872                optional: false,
3873            }
3874        } else if !labels.is_empty() {
3875            // Schemaless label: use ScanMainByLabels to filter by label name
3876            LogicalPlan::ScanMainByLabels {
3877                labels,
3878                variable,
3879                filter,
3880                optional: false,
3881            }
3882        } else {
3883            LogicalPlan::ScanAll {
3884                variable,
3885                filter,
3886                optional: false,
3887            }
3888        }
3889    }
3890
3891    /// Attach a new scan node to the running plan, using `CrossJoin` when the plan
3892    /// already contains prior operators.
3893    fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
3894        if matches!(plan, LogicalPlan::Empty) {
3895            scan
3896        } else {
3897            LogicalPlan::CrossJoin {
3898                left: Box::new(plan),
3899                right: Box::new(scan),
3900            }
3901        }
3902    }
3903
3904    /// Resolve MERGE property map expressions against the current row context.
3905    ///
3906    /// MERGE patterns like `MERGE (city:City {name: person.bornIn})` contain
3907    /// property expressions that reference bound variables. These need to be
3908    /// evaluated to concrete literal values before being converted to filter
3909    /// expressions by `properties_to_expr()`.
3910    async fn resolve_merge_properties(
3911        &self,
3912        properties: &Option<Expr>,
3913        row: &HashMap<String, Value>,
3914        prop_manager: &PropertyManager,
3915        params: &HashMap<String, Value>,
3916        ctx: Option<&QueryContext>,
3917    ) -> Result<Option<Expr>> {
3918        let entries = match properties {
3919            Some(Expr::Map(entries)) => entries,
3920            other => return Ok(other.clone()),
3921        };
3922        let mut resolved = Vec::new();
3923        for (key, val_expr) in entries {
3924            if matches!(val_expr, Expr::Literal(_)) {
3925                resolved.push((key.clone(), val_expr.clone()));
3926            } else {
3927                let value = self
3928                    .evaluate_expr(val_expr, row, prop_manager, params, ctx)
3929                    .await?;
3930                resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
3931            }
3932        }
3933        Ok(Some(Expr::Map(resolved)))
3934    }
3935
3936    /// Convert a runtime Value back to an AST literal expression.
3937    fn value_to_literal_expr(value: &Value) -> Expr {
3938        match value {
3939            Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
3940            Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
3941            Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
3942            Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
3943            Value::Null => Expr::Literal(CypherLiteral::Null),
3944            Value::List(items) => {
3945                Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
3946            }
3947            Value::Map(entries) => Expr::Map(
3948                entries
3949                    .iter()
3950                    .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
3951                    .collect(),
3952            ),
3953            _ => Expr::Literal(CypherLiteral::Null),
3954        }
3955    }
3956
3957    pub(crate) async fn execute_merge_match(
3958        &self,
3959        pattern: &Pattern,
3960        row: &HashMap<String, Value>,
3961        prop_manager: &PropertyManager,
3962        params: &HashMap<String, Value>,
3963        ctx: Option<&QueryContext>,
3964    ) -> Result<Vec<HashMap<String, Value>>> {
3965        // Construct a LogicalPlan for the MATCH part of MERGE
3966        let planner =
3967            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
3968
3969        // We need to construct a CypherQuery to use the planner's plan() method,
3970        // or we can manually construct the LogicalPlan.
3971        // Manual construction is safer as we don't have to round-trip through AST.
3972
3973        let mut plan = LogicalPlan::Empty;
3974        let mut vars_in_scope = Vec::new();
3975
3976        // Add existing bound variables from row to scope
3977        for key in row.keys() {
3978            vars_in_scope.push(key.clone());
3979        }
3980
3981        // Reconstruct Match logic from Planner (simplified for MERGE pattern)
3982        for path in &pattern.paths {
3983            let elements = &path.elements;
3984            let mut i = 0;
3985            while i < elements.len() {
3986                let part = &elements[i];
3987                match part {
3988                    PatternElement::Node(n) => {
3989                        let variable = n.variable.clone().unwrap_or_default();
3990
3991                        // If variable is already bound in the input row, we filter
3992                        let is_bound = !variable.is_empty() && row.contains_key(&variable);
3993
3994                        if is_bound {
3995                            // If bound, we must Scan this specific VID to start the chain
3996                            // Extract VID from row
3997                            let val = row.get(&variable).unwrap();
3998                            let vid = Self::vid_from_value(val)?;
3999
4000                            // In the new storage model, VIDs don't embed label info.
4001                            // We get label from the node value if available, otherwise use 0 to scan all.
4002                            let extracted_labels =
4003                                Self::extract_labels_from_node(val).unwrap_or_default();
4004                            let label_id = {
4005                                let schema = self.storage.schema_manager().schema();
4006                                extracted_labels
4007                                    .first()
4008                                    .and_then(|l| schema.label_id_by_name(l))
4009                                    .unwrap_or(0)
4010                            };
4011
4012                            let resolved_props = self
4013                                .resolve_merge_properties(
4014                                    &n.properties,
4015                                    row,
4016                                    prop_manager,
4017                                    params,
4018                                    ctx,
4019                                )
4020                                .await?;
4021                            let prop_filter =
4022                                planner.properties_to_expr(&variable, &resolved_props);
4023
4024                            // Create a filter expression for VID: variable._vid = vid
4025                            // But our expression engine handles `Expr::Variable` as column.
4026                            // We can inject a filter `id(variable) = vid` if we had `id()` function.
4027                            // Or we use internal property `_vid`.
4028
4029                            // Note: Scan supports `filter`.
4030                            // We can manually construct an Expr::BinaryOp(Eq, Prop(var, _vid), Literal(vid))
4031
4032                            let vid_filter = Expr::BinaryOp {
4033                                left: Box::new(Expr::Property(
4034                                    Box::new(Expr::Variable(variable.clone())),
4035                                    "_vid".to_string(),
4036                                )),
4037                                op: BinaryOp::Eq,
4038                                right: Box::new(Expr::Literal(CypherLiteral::Integer(
4039                                    vid.as_u64() as i64,
4040                                ))),
4041                            };
4042
4043                            let combined_filter = if let Some(pf) = prop_filter {
4044                                Some(Expr::BinaryOp {
4045                                    left: Box::new(vid_filter),
4046                                    op: BinaryOp::And,
4047                                    right: Box::new(pf),
4048                                })
4049                            } else {
4050                                Some(vid_filter)
4051                            };
4052
4053                            let scan = Self::make_scan_plan(
4054                                label_id,
4055                                extracted_labels,
4056                                variable.clone(),
4057                                combined_filter,
4058                            );
4059                            plan = Self::attach_scan(plan, scan);
4060                        } else {
4061                            let label_id = if n.labels.is_empty() {
4062                                // Unlabeled MERGE node: scan all nodes (label_id 0 → ScanAll)
4063                                0
4064                            } else {
4065                                let label_name = &n.labels[0];
4066                                let schema = self.storage.schema_manager().schema();
4067                                if self.config.strict_schema {
4068                                    schema
4069                                        .get_label_case_insensitive(label_name)
4070                                        .map(|m| m.id)
4071                                        .ok_or_else(|| {
4072                                            anyhow!(
4073                                                "Label '{}' is not defined in the schema \
4074                                                 (strict_schema is enabled). \
4075                                                 Declare it with db.schema().label(...).apply() first.",
4076                                                label_name
4077                                            )
4078                                        })?
4079                                } else {
4080                                    // Fall back to label_id 0 (any/schemaless) when not in schema.
4081                                    schema
4082                                        .get_label_case_insensitive(label_name)
4083                                        .map(|m| m.id)
4084                                        .unwrap_or(0)
4085                                }
4086                            };
4087
4088                            let resolved_props = self
4089                                .resolve_merge_properties(
4090                                    &n.properties,
4091                                    row,
4092                                    prop_manager,
4093                                    params,
4094                                    ctx,
4095                                )
4096                                .await?;
4097                            let prop_filter =
4098                                planner.properties_to_expr(&variable, &resolved_props);
4099                            let scan = Self::make_scan_plan(
4100                                label_id,
4101                                n.labels.names().to_vec(),
4102                                variable.clone(),
4103                                prop_filter,
4104                            );
4105                            plan = Self::attach_scan(plan, scan);
4106
4107                            // Add label filters when:
4108                            // 1. Multiple labels with a known schema label: filter for
4109                            //    additional labels (Scan only scans by the first label).
4110                            // 2. Schemaless labels (label_id = 0): ScanAll finds ALL
4111                            //    nodes, so we must filter to only those with the
4112                            //    specified label(s).
4113                            if !n.labels.is_empty()
4114                                && !variable.is_empty()
4115                                && (label_id == 0 || n.labels.len() > 1)
4116                                && let Some(label_filter) =
4117                                    planner.node_filter_expr(&variable, &n.labels, &None)
4118                            {
4119                                plan = LogicalPlan::Filter {
4120                                    input: Box::new(plan),
4121                                    predicate: label_filter,
4122                                    optional_variables: std::collections::HashSet::new(),
4123                                };
4124                            }
4125
4126                            if !variable.is_empty() {
4127                                vars_in_scope.push(variable.clone());
4128                            }
4129                        }
4130
4131                        // Now look ahead for relationship
4132                        i += 1;
4133                        while i < elements.len() {
4134                            if let PatternElement::Relationship(r) = &elements[i] {
4135                                let target_node_part = &elements[i + 1];
4136                                if let PatternElement::Node(n_target) = target_node_part {
4137                                    let schema = self.storage.schema_manager().schema();
4138                                    let mut edge_type_ids = Vec::new();
4139
4140                                    if r.types.is_empty() {
4141                                        return Err(anyhow!("MERGE edge must have a type"));
4142                                    } else if r.types.len() > 1 {
4143                                        return Err(anyhow!(
4144                                            "MERGE does not support multiple edge types"
4145                                        ));
4146                                    } else {
4147                                        let type_name = &r.types[0];
4148                                        let type_id = if self.config.strict_schema {
4149                                            let s = self.storage.schema_manager().schema();
4150                                            s.edge_type_id_by_name_case_insensitive(type_name)
4151                                                .ok_or_else(|| {
4152                                                    anyhow!(
4153                                                        "Edge type '{}' is not defined in the schema \
4154                                                         (strict_schema is enabled).",
4155                                                        type_name
4156                                                    )
4157                                                })?
4158                                        } else {
4159                                            // Schemaless: assign new ID if not found.
4160                                            self.storage
4161                                                .schema_manager()
4162                                                .get_or_assign_edge_type_id(type_name)
4163                                        };
4164                                        edge_type_ids.push(type_id);
4165                                    }
4166
4167                                    // Resolve target label ID. For schemaless labels (not in the
4168                                    // schema), fall back to 0 which means "any label" in traversal.
4169                                    let target_label_id: u16 = if let Some(lbl) =
4170                                        n_target.labels.first()
4171                                    {
4172                                        schema
4173                                            .get_label_case_insensitive(lbl)
4174                                            .map(|m| m.id)
4175                                            .unwrap_or(0)
4176                                    } else if let Some(var) = &n_target.variable {
4177                                        if let Some(val) = row.get(var) {
4178                                            // In the new storage model, get labels from node value
4179                                            if let Some(labels) =
4180                                                Self::extract_labels_from_node(val)
4181                                            {
4182                                                if let Some(first_label) = labels.first() {
4183                                                    schema
4184                                                        .get_label_case_insensitive(first_label)
4185                                                        .map(|m| m.id)
4186                                                        .unwrap_or(0)
4187                                                } else {
4188                                                    // Bound node with no labels — schemaless, any
4189                                                    0
4190                                                }
4191                                            } else if Self::vid_from_value(val).is_ok() {
4192                                                // VID without label info — schemaless, any
4193                                                0
4194                                            } else {
4195                                                return Err(anyhow!(
4196                                                    "Variable {} is not a node",
4197                                                    var
4198                                                ));
4199                                            }
4200                                        } else {
4201                                            return Err(anyhow!(
4202                                                "MERGE pattern node must have a label or be a bound variable"
4203                                            ));
4204                                        }
4205                                    } else {
4206                                        return Err(anyhow!(
4207                                            "MERGE pattern node must have a label"
4208                                        ));
4209                                    };
4210
4211                                    let target_variable =
4212                                        n_target.variable.clone().unwrap_or_default();
4213                                    let source_variable = match &elements[i - 1] {
4214                                        PatternElement::Node(n) => {
4215                                            n.variable.clone().unwrap_or_default()
4216                                        }
4217                                        _ => String::new(),
4218                                    };
4219
4220                                    let is_variable_length = r.range.is_some();
4221                                    let type_name = &r.types[0];
4222
4223                                    // Use TraverseMainByType for schemaless edge types
4224                                    // (same as MATCH planner) so edge properties are loaded
4225                                    // correctly from storage + L0 via the adjacency map.
4226                                    // Regular Traverse only loads properties via
4227                                    // property_manager which doesn't handle schemaless types.
4228                                    let is_schemaless = edge_type_ids.iter().all(|id| {
4229                                        uni_common::core::edge_type::is_schemaless_edge_type(*id)
4230                                    });
4231
4232                                    if is_schemaless {
4233                                        plan = LogicalPlan::TraverseMainByType {
4234                                            type_names: vec![type_name.clone()],
4235                                            input: Box::new(plan),
4236                                            direction: r.direction.clone(),
4237                                            source_variable,
4238                                            target_variable: target_variable.clone(),
4239                                            step_variable: r.variable.clone(),
4240                                            min_hops: r
4241                                                .range
4242                                                .as_ref()
4243                                                .and_then(|r| r.min)
4244                                                .unwrap_or(1)
4245                                                as usize,
4246                                            max_hops: r
4247                                                .range
4248                                                .as_ref()
4249                                                .and_then(|r| r.max)
4250                                                .unwrap_or(1)
4251                                                as usize,
4252                                            optional: false,
4253                                            target_filter: None,
4254                                            path_variable: None,
4255                                            is_variable_length,
4256                                            optional_pattern_vars: std::collections::HashSet::new(),
4257                                            scope_match_variables: std::collections::HashSet::new(),
4258                                            edge_filter_expr: None,
4259                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4260                                        };
4261                                    } else {
4262                                        // Collect edge property names needed for MERGE filter
4263                                        let mut edge_props = std::collections::HashSet::new();
4264                                        if let Some(Expr::Map(entries)) = &r.properties {
4265                                            for (key, _) in entries {
4266                                                edge_props.insert(key.clone());
4267                                            }
4268                                        }
4269                                        plan = LogicalPlan::Traverse {
4270                                            input: Box::new(plan),
4271                                            edge_type_ids: edge_type_ids.clone(),
4272                                            direction: r.direction.clone(),
4273                                            source_variable,
4274                                            target_variable: target_variable.clone(),
4275                                            target_label_id,
4276                                            step_variable: r.variable.clone(),
4277                                            min_hops: r
4278                                                .range
4279                                                .as_ref()
4280                                                .and_then(|r| r.min)
4281                                                .unwrap_or(1)
4282                                                as usize,
4283                                            max_hops: r
4284                                                .range
4285                                                .as_ref()
4286                                                .and_then(|r| r.max)
4287                                                .unwrap_or(1)
4288                                                as usize,
4289                                            optional: false,
4290                                            target_filter: None,
4291                                            path_variable: None,
4292                                            edge_properties: edge_props,
4293                                            is_variable_length,
4294                                            optional_pattern_vars: std::collections::HashSet::new(),
4295                                            scope_match_variables: std::collections::HashSet::new(),
4296                                            edge_filter_expr: None,
4297                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4298                                            qpp_steps: None,
4299                                        };
4300                                    }
4301
4302                                    // Apply property filters for relationship
4303                                    if r.properties.is_some()
4304                                        && let Some(r_var) = &r.variable
4305                                    {
4306                                        let resolved_rel_props = self
4307                                            .resolve_merge_properties(
4308                                                &r.properties,
4309                                                row,
4310                                                prop_manager,
4311                                                params,
4312                                                ctx,
4313                                            )
4314                                            .await?;
4315                                        if let Some(prop_filter) =
4316                                            planner.properties_to_expr(r_var, &resolved_rel_props)
4317                                        {
4318                                            plan = LogicalPlan::Filter {
4319                                                input: Box::new(plan),
4320                                                predicate: prop_filter,
4321                                                optional_variables: std::collections::HashSet::new(
4322                                                ),
4323                                            };
4324                                        }
4325                                    }
4326
4327                                    // Apply property filters for target node if it was new
4328                                    if !target_variable.is_empty() {
4329                                        let resolved_target_props = self
4330                                            .resolve_merge_properties(
4331                                                &n_target.properties,
4332                                                row,
4333                                                prop_manager,
4334                                                params,
4335                                                ctx,
4336                                            )
4337                                            .await?;
4338                                        if let Some(prop_filter) = planner.properties_to_expr(
4339                                            &target_variable,
4340                                            &resolved_target_props,
4341                                        ) {
4342                                            plan = LogicalPlan::Filter {
4343                                                input: Box::new(plan),
4344                                                predicate: prop_filter,
4345                                                optional_variables: std::collections::HashSet::new(
4346                                                ),
4347                                            };
4348                                        }
4349                                        vars_in_scope.push(target_variable.clone());
4350                                    }
4351
4352                                    if let Some(sv) = &r.variable {
4353                                        vars_in_scope.push(sv.clone());
4354                                    }
4355                                    i += 2;
4356                                } else {
4357                                    break;
4358                                }
4359                            } else {
4360                                break;
4361                            }
4362                        }
4363                    }
4364                    _ => return Err(anyhow!("Pattern must start with a node")),
4365                }
4366            }
4367
4368            // Execute the plan to find all matches, then filter against bound variables in `row`.
4369        }
4370
4371        let db_matches = self
4372            .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
4373            .await?;
4374
4375        // Keep only DB results that are consistent with the input row bindings.
4376        // Skip internal keys (starting with "__") as they are implementation
4377        // artifacts (e.g. __used_edges) and not user-visible variable bindings.
4378        // Also skip the empty-string key (""), which is the placeholder variable
4379        // for unnamed MERGE nodes — it may carry over from a prior MERGE clause
4380        // and must not constrain the current pattern's match.
4381        let final_matches = db_matches
4382            .into_iter()
4383            .filter(|db_match| {
4384                row.iter().all(|(key, val)| {
4385                    if key.is_empty() || key.starts_with("__") {
4386                        return true;
4387                    }
4388                    let Some(db_val) = db_match.get(key) else {
4389                        return true;
4390                    };
4391                    if db_val == val {
4392                        return true;
4393                    }
4394                    // Values differ -- treat as consistent if they represent the same VID
4395                    matches!(
4396                        (Self::vid_from_value(val), Self::vid_from_value(db_val)),
4397                        (Ok(v1), Ok(v2)) if v1 == v2
4398                    )
4399                })
4400            })
4401            .map(|db_match| {
4402                let mut merged = row.clone();
4403                merged.extend(db_match);
4404                merged
4405            })
4406            .collect();
4407
4408        Ok(final_matches)
4409    }
4410
4411    /// Prepare a MERGE pattern for path variable binding.
4412    ///
4413    /// If any path in the pattern has a path variable (e.g., `MERGE p = (a)-[:R]->(b)`),
4414    /// unnamed relationships need internal variable names so that `execute_create_pattern`
4415    /// stores the edge data in the row for later path construction.
4416    ///
4417    /// Returns the (possibly modified) pattern and a list of temp variable names to clean up.
4418    fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
4419        let has_path_vars = pattern
4420            .paths
4421            .iter()
4422            .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
4423
4424        if !has_path_vars {
4425            return (pattern.clone(), Vec::new());
4426        }
4427
4428        let mut modified = pattern.clone();
4429        let mut temp_vars = Vec::new();
4430
4431        for path in &mut modified.paths {
4432            if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
4433                continue;
4434            }
4435            for (idx, element) in path.elements.iter_mut().enumerate() {
4436                if let PatternElement::Relationship(r) = element
4437                    && r.variable.as_ref().is_none_or(String::is_empty)
4438                {
4439                    let temp_var = format!("__path_r_{}", idx);
4440                    r.variable = Some(temp_var.clone());
4441                    temp_vars.push(temp_var);
4442                }
4443            }
4444        }
4445
4446        (modified, temp_vars)
4447    }
4448
4449    /// Bind path variables in the result row based on the MERGE pattern.
4450    ///
4451    /// Walks each path in the pattern, collects node/edge values from the row
4452    /// by variable name, and constructs a `Value::Path`.
4453    fn bind_path_variables(
4454        pattern: &Pattern,
4455        row: &mut HashMap<String, Value>,
4456        temp_vars: &[String],
4457    ) {
4458        for path in &pattern.paths {
4459            let Some(path_var) = path.variable.as_ref() else {
4460                continue;
4461            };
4462            if path_var.is_empty() {
4463                continue;
4464            }
4465
4466            let mut nodes = Vec::new();
4467            let mut edges = Vec::new();
4468
4469            for element in &path.elements {
4470                match element {
4471                    PatternElement::Node(n) => {
4472                        if let Some(var) = &n.variable
4473                            && let Some(val) = row.get(var)
4474                            && let Some(node) = Self::value_to_node_for_path(val)
4475                        {
4476                            nodes.push(node);
4477                        }
4478                    }
4479                    PatternElement::Relationship(r) => {
4480                        if let Some(var) = &r.variable
4481                            && let Some(val) = row.get(var)
4482                            && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
4483                        {
4484                            edges.push(edge);
4485                        }
4486                    }
4487                    _ => {}
4488                }
4489            }
4490
4491            if !nodes.is_empty() {
4492                use uni_common::value::Path;
4493                row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
4494            }
4495        }
4496
4497        // Clean up internal temp variables
4498        for var in temp_vars {
4499            row.remove(var);
4500        }
4501    }
4502
4503    /// Convert a Value (Map or Node) to a Node for path construction.
4504    fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
4505        match val {
4506            Value::Node(n) => Some(n.clone()),
4507            Value::Map(map) => {
4508                let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
4509                let labels = if let Some(Value::List(l)) = map.get("_labels") {
4510                    l.iter()
4511                        .filter_map(|v| {
4512                            if let Value::String(s) = v {
4513                                Some(s.clone())
4514                            } else {
4515                                None
4516                            }
4517                        })
4518                        .collect()
4519                } else {
4520                    vec![]
4521                };
4522                let properties: HashMap<String, Value> = map
4523                    .iter()
4524                    .filter(|(k, _)| !k.starts_with('_'))
4525                    .map(|(k, v)| (k.clone(), v.clone()))
4526                    .collect();
4527                Some(uni_common::value::Node {
4528                    vid,
4529                    labels,
4530                    properties,
4531                })
4532            }
4533            _ => None,
4534        }
4535    }
4536
4537    /// Convert a Value (Map or Edge) to an Edge for path construction.
4538    fn value_to_edge_for_path(
4539        val: &Value,
4540        type_names: &[String],
4541    ) -> Option<uni_common::value::Edge> {
4542        match val {
4543            Value::Edge(e) => Some(e.clone()),
4544            Value::Map(map) => {
4545                let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
4546                let edge_type = map
4547                    .get("_type_name")
4548                    .and_then(|v| {
4549                        if let Value::String(s) = v {
4550                            Some(s.clone())
4551                        } else {
4552                            None
4553                        }
4554                    })
4555                    .or_else(|| type_names.first().cloned())
4556                    .unwrap_or_default();
4557                let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
4558                let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
4559                let properties: HashMap<String, Value> = map
4560                    .iter()
4561                    .filter(|(k, _)| !k.starts_with('_'))
4562                    .map(|(k, v)| (k.clone(), v.clone()))
4563                    .collect();
4564                Some(uni_common::value::Edge {
4565                    eid,
4566                    edge_type,
4567                    src,
4568                    dst,
4569                    properties,
4570                })
4571            }
4572            _ => None,
4573        }
4574    }
4575}
4576
4577/// Read a vertex's full property map, preferring `prefetched` over a fresh
4578/// per-row `Backend::scan`.
4579///
4580/// `prefetched` is built once at the top of `apply_mutations` via
4581/// `prefetch_set_targets` / `prefetch_remove_targets` (mutation_common.rs).
4582/// On a hit, we layer in L0 from `ctx` so writes from earlier rows of the
4583/// same `apply_mutations` invocation (counter increments, same-VID
4584/// duplicates from UNWIND) take precedence — the prefetch only snapshots
4585/// storage state at SET entry. On a miss, fall back to the existing
4586/// per-row path; this preserves correctness for newly created VIDs,
4587/// schemaless rows, multi-label corner cases, and non-Mutation callers
4588/// that pass `&Prefetch::default()`.
4589pub(crate) async fn read_vertex_props_with_prefetch(
4590    vid: Vid,
4591    prefetched: &Prefetch,
4592    prop_manager: &PropertyManager,
4593    ctx: Option<&QueryContext>,
4594) -> Result<uni_common::Properties> {
4595    match prefetched.vertex.get(&vid).cloned() {
4596        Some(mut base) => {
4597            if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_vertex_props(vid, ctx) {
4598                for (k, v) in l0 {
4599                    base.insert(k, v);
4600                }
4601            }
4602            Ok(base)
4603        }
4604        None => Ok(prop_manager
4605            .get_all_vertex_props_with_ctx(vid, ctx)
4606            .await?
4607            .unwrap_or_default()),
4608    }
4609}
4610
4611/// Edge equivalent of [`read_vertex_props_with_prefetch`]. On a hit, layer
4612/// in L0 edge props so writes from earlier rows of the same
4613/// `apply_mutations` invocation take precedence. On a miss, fall back to
4614/// the per-EID storage path.
4615pub(crate) async fn read_edge_props_with_prefetch(
4616    eid: Eid,
4617    prefetched: &Prefetch,
4618    prop_manager: &PropertyManager,
4619    ctx: Option<&QueryContext>,
4620) -> Result<uni_common::Properties> {
4621    match prefetched.edge.get(&eid).cloned() {
4622        Some(mut base) => {
4623            if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_edge_props(eid, ctx) {
4624                for (k, v) in l0 {
4625                    base.insert(k, v);
4626                }
4627            }
4628            Ok(base)
4629        }
4630        None => Ok(prop_manager
4631            .get_all_edge_props_with_ctx(eid, ctx)
4632            .await?
4633            .unwrap_or_default()),
4634    }
4635}
4636
4637#[cfg(test)]
4638mod tests {
4639    use super::*;
4640
4641    // ── merge_props tests ────────────────────────────────────────────
4642
4643    #[test]
4644    fn test_merge_props_replace_tombstones_missing_keys() {
4645        let current: HashMap<String, Value> = [
4646            ("name".into(), Value::String("Alice".into())),
4647            ("age".into(), Value::Int(30)),
4648        ]
4649        .into();
4650        let incoming: HashMap<String, Value> =
4651            [("name".into(), Value::String("Bob".into()))].into();
4652
4653        let result = Executor::merge_props(current, incoming, true);
4654        assert_eq!(result.get("name"), Some(&Value::String("Bob".into())));
4655        assert_eq!(
4656            result.get("age"),
4657            Some(&Value::Null),
4658            "Missing keys should be tombstoned in replace mode"
4659        );
4660    }
4661
4662    #[test]
4663    fn test_merge_props_merge_preserves_existing() {
4664        let current: HashMap<String, Value> = [
4665            ("name".into(), Value::String("Alice".into())),
4666            ("age".into(), Value::Int(30)),
4667        ]
4668        .into();
4669        let incoming: HashMap<String, Value> =
4670            [("city".into(), Value::String("NYC".into()))].into();
4671
4672        let result = Executor::merge_props(current, incoming, false);
4673        assert_eq!(result.get("name"), Some(&Value::String("Alice".into())));
4674        assert_eq!(result.get("age"), Some(&Value::Int(30)));
4675        assert_eq!(result.get("city"), Some(&Value::String("NYC".into())));
4676    }
4677
4678    #[test]
4679    fn test_merge_props_null_incoming_is_tombstone() {
4680        let current: HashMap<String, Value> =
4681            [("name".into(), Value::String("Alice".into()))].into();
4682        let incoming: HashMap<String, Value> = [("name".into(), Value::Null)].into();
4683
4684        // Merge mode: null overwrites
4685        let result = Executor::merge_props(current.clone(), incoming.clone(), false);
4686        assert_eq!(result.get("name"), Some(&Value::Null));
4687
4688        // Replace mode: null is tombstone
4689        let result = Executor::merge_props(current, incoming, true);
4690        assert_eq!(result.get("name"), Some(&Value::Null));
4691    }
4692
/// Merging into an empty current map yields exactly the incoming entries.
#[test]
fn test_merge_props_empty_current() {
    let update =
        HashMap::<String, Value>::from([("name".to_string(), Value::String("Alice".into()))]);

    let merged = Executor::merge_props(HashMap::<String, Value>::new(), update, false);

    assert_eq!(merged.get("name"), Some(&Value::String("Alice".into())));
    // Nothing besides the single incoming key may appear.
    assert_eq!(merged.len(), 1);
}
4703
/// In replace mode (third argument `true`) an empty incoming map turns
/// every current key into a `Value::Null` entry.
#[test]
fn test_merge_props_empty_incoming_replace_tombstones_all() {
    let existing = HashMap::<String, Value>::from([
        ("name".to_string(), Value::String("Alice".into())),
        ("age".to_string(), Value::Int(30)),
    ]);

    let replaced = Executor::merge_props(existing, HashMap::<String, Value>::new(), true);

    // Every key that was present before must now map to Null.
    for key in ["name", "age"] {
        assert_eq!(replaced.get(key), Some(&Value::Null));
    }
}
4717
4718    // ── extract_labels_from_node tests ───────────────────────────────
4719
/// A map-encoded node carrying a `_labels` list yields those labels, in
/// list order, as owned strings.
#[test]
fn test_extract_labels_from_map() {
    let label_list = Value::List(vec![
        Value::String("Person".into()),
        Value::String("Employee".into()),
    ]);

    let mut node_map = HashMap::new();
    node_map.insert("_vid".into(), Value::Int(1));
    node_map.insert("_labels".into(), label_list);

    let extracted = Executor::extract_labels_from_node(&Value::Map(node_map));

    let expected = vec![String::from("Person"), String::from("Employee")];
    assert_eq!(extracted, Some(expected));
}
4739
/// A `Value::Node` yields the labels stored in its `labels` field.
#[test]
fn test_extract_labels_from_value_node() {
    let wrapped = Value::Node(uni_common::Node {
        vid: uni_common::core::id::Vid::from(1u64),
        labels: vec!["Person".to_string()],
        properties: HashMap::new(),
    });

    let extracted = Executor::extract_labels_from_node(&wrapped);

    assert_eq!(extracted, Some(vec!["Person".to_string()]));
}
4750
/// Scalar values that are not node-shaped produce `None`.
#[test]
fn test_extract_labels_non_node_returns_none() {
    let int_value = Value::Int(42);
    let string_value = Value::String("hello".into());

    assert_eq!(Executor::extract_labels_from_node(&int_value), None);
    assert_eq!(Executor::extract_labels_from_node(&string_value), None);
}
4759
4760    // ── extract_user_properties_from_value tests ─────────────────────
4761
/// For a map-encoded node, the `_vid` and `_labels` entries are dropped
/// while the remaining properties are returned unchanged.
#[test]
fn test_extract_user_props_strips_internal_keys() {
    let label_list = Value::List(vec![Value::String("Person".into())]);

    let mut node_map = HashMap::new();
    node_map.insert("_vid".into(), Value::Int(1));
    node_map.insert("_labels".into(), label_list);
    node_map.insert("name".into(), Value::String("Alice".into()));
    node_map.insert("age".into(), Value::Int(30));

    let user_props =
        Executor::extract_user_properties_from_value(&Value::Map(node_map)).unwrap();

    // User-facing properties pass through.
    assert_eq!(user_props.get("name"), Some(&Value::String("Alice".into())));
    assert_eq!(user_props.get("age"), Some(&Value::Int(30)));
    // Internal bookkeeping keys must not leak out.
    for internal_key in ["_vid", "_labels"] {
        assert!(!user_props.contains_key(internal_key));
    }
}
4779
/// A map without any internal keys comes back equal to the input map.
#[test]
fn test_extract_user_props_plain_map_returns_as_is() {
    let mut plain = HashMap::new();
    plain.insert("key".into(), Value::String("value".into()));

    // Hand a copy to the extractor so the original stays available for comparison.
    let input = Value::Map(plain.clone());
    let extracted = Executor::extract_user_properties_from_value(&input).unwrap();

    assert_eq!(extracted, plain);
}
4788
/// A `Value::Node` exposes the entries of its `properties` map.
#[test]
fn test_extract_user_props_from_value_node() {
    let mut node_props = HashMap::new();
    node_props.insert("name".into(), Value::String("Alice".into()));

    let wrapped = Value::Node(uni_common::Node {
        vid: uni_common::core::id::Vid::from(1u64),
        labels: vec!["Person".to_string()],
        properties: node_props,
    });

    let extracted = Executor::extract_user_properties_from_value(&wrapped).unwrap();

    assert_eq!(extracted.get("name"), Some(&Value::String("Alice".into())));
}
4801}