Skip to main content

uni_query/query/executor/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use crate::query::df_graph::mutation_common::Prefetch;
6use crate::query::planner::LogicalPlan;
7use anyhow::{Result, anyhow};
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use uni_common::DataType;
11use uni_common::core::id::{Eid, Vid};
12use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
13use uni_common::{Path, Value};
14use uni_cypher::ast::{
15    AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
16    CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
17    DropEdgeType, DropLabel, Expr, NodePattern, Pattern, PatternElement, RemoveItem, SetClause,
18    SetItem,
19};
20use uni_store::QueryContext;
21use uni_store::runtime::l0_visibility;
22use uni_store::runtime::property_manager::PropertyManager;
23use uni_store::runtime::writer::Writer;
24
/// Canonical, hashable key for a single-node MERGE: the key properties as a
/// `(name, value)` list sorted by name. Used to group existing vertices by key
/// for the index fast path (issue #69).
///
/// NOTE(review): this is a plain `Vec` alias, so the "sorted by name"
/// invariant is not enforced by the type — whoever builds a key must sort it.
type MergeKey = Vec<(String, Value)>;
29
/// Identity fields extracted from a map-encoded edge.
///
/// Built by `Executor::extract_edge_identity` from the `_eid`, `_src`,
/// `_dst` and `_type` / `_type_name` entries of the edge map.
struct EdgeIdentity {
    /// Edge id, read from the `_eid` entry.
    eid: Eid,
    /// Source vertex id, read from the `_src` entry.
    src: Vid,
    /// Destination vertex id, read from the `_dst` entry.
    dst: Vid,
    /// Numeric edge type id, resolved from `_type` (or `_type_name` when
    /// `_type` is absent).
    edge_type_id: u32,
}
37
/// Per-variable accumulator for SetItem::Property items targeting a vertex.
///
/// Built lazily on the first SetItem touching each variable, then mutated
/// in place across subsequent items. Flushed once at end of the SET
/// clause (or earlier if a non-Property SetItem on the same var lands).
struct PendingVertexSet {
    /// Id of the vertex the accumulated SET items apply to.
    vid: Vid,
    /// Label names carried alongside the vertex.
    /// NOTE(review): presumably the vertex's labels at the time the
    /// accumulator was built — confirm against the code that constructs it.
    labels: Vec<String>,
    /// Full property map (storage union L0 from
    /// `get_all_vertex_props_with_ctx` plus the touched values applied
    /// in-order). Flushed to L0 whole; L0's `vertex_partial_keys` set
    /// tells the flush which columns to send to Lance via MergeInsert.
    props: HashMap<String, Value>,
    /// `true` when the SET should flush via the partial-column MergeInsert
    /// path: set when `UniConfig::partial_lance_writes` is on AND the
    /// label has no generated columns. Generated-column labels still need
    /// the full-row Append so the regenerated values land.
    partial: bool,
    /// Set of property keys touched by this statement. Threaded into L0
    /// so the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
}
61
/// Per-variable accumulator for SetItem::Property items targeting an edge.
///
/// Edge counterpart of `PendingVertexSet`: it carries the edge's identity
/// together with the property values collected for it.
struct PendingEdgeSet {
    /// Source vertex id of the edge.
    src: Vid,
    /// Destination vertex id of the edge.
    dst: Vid,
    /// Numeric id of the edge's type.
    edge_type_id: u32,
    /// Id of the edge being updated.
    eid: Eid,
    /// Name of the edge's type.
    edge_type_name: String,
    /// `true` when the SET should flush via the partial-column
    /// MergeInsert path on the per-edge-type delta tables (Round 12
    /// §A). Set when `UniConfig::partial_lance_writes` is on.
    partial: bool,
    /// Property keys touched by this statement. Threaded into L0 so
    /// the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
    /// Property map accumulated for the edge.
    /// NOTE(review): presumably the full current map with the touched values
    /// applied, mirroring `PendingVertexSet::props` — confirm at the build site.
    props: HashMap<String, Value>,
}
79
80/// Refuse to mutate an ephemeral node (M5g / proposal §4.13.1).
81/// Ephemeral entities are return-only — `Vid::EPHEMERAL_BIT` is set on
82/// any id minted by `host.allocate_transient_id()`.
83fn reject_if_ephemeral_vid(vid: Vid) -> Result<()> {
84    if vid.is_ephemeral() {
85        return Err(anyhow::Error::from(
86            uni_common::UniError::EphemeralWriteAttempt {
87                kind: "node",
88                id: vid.transient_id().unwrap_or(vid.as_u64()),
89            },
90        ));
91    }
92    Ok(())
93}
94
95/// Returns a short variant name for a `Value`, used in type-mismatch error messages.
96fn value_type_name(val: &Value) -> &'static str {
97    match val {
98        Value::Null => "Null",
99        Value::Bool(_) => "Bool",
100        Value::Int(_) => "Int",
101        Value::Float(_) => "Float",
102        Value::String(_) => "String",
103        Value::Bytes(_) => "Bytes",
104        Value::List(_) => "List",
105        Value::Map(_) => "Map",
106        Value::Node(_) => "Node",
107        Value::Edge(_) => "Edge",
108        Value::Path(_) => "Path",
109        Value::Vector(_) => "Vector",
110        Value::Temporal(_) => "Temporal",
111        _ => "value",
112    }
113}
114
115/// Refuse to mutate an ephemeral edge (M5g / proposal §4.13.1).
116fn reject_if_ephemeral_eid(eid: Eid) -> Result<()> {
117    if eid.is_ephemeral() {
118        return Err(anyhow::Error::from(
119            uni_common::UniError::EphemeralWriteAttempt {
120                kind: "edge",
121                id: eid.transient_id().unwrap_or(eid.as_u64()),
122            },
123        ));
124    }
125    Ok(())
126}
127
128/// Reject a write whose target label is currently allocated as a
129/// virtual (catalog-backed) label.
130///
131/// Catalog tables are read-only from the host's perspective — there is
132/// no write-back path through `CatalogTable::scan` to the originating
133/// provider, so silently allowing SET/DELETE would leave ghosted state
134/// on the host side that diverges from the external catalog. The
135/// planner already rejects CREATE/MERGE on virtual labels via
136/// `Planner::reject_virtual_label_writes`; this helper is the
137/// equivalent gate on the runtime write path for SET-label-add and
138/// DELETE.
139///
140/// `op` names the offending operation for the error message (e.g.
141/// `"SET"`, `"DELETE"`).
142///
143/// # Errors
144///
145/// Returns an error if `registry` is `Some` and any name in `labels`
146/// is currently registered as a virtual label. Returns `Ok(())` when
147/// no plugin registry is wired (low-level callers without plugins).
148fn reject_virtual_label_write(
149    registry: Option<&Arc<uni_plugin::PluginRegistry>>,
150    labels: &[String],
151    op: &str,
152) -> Result<()> {
153    let Some(registry) = registry else {
154        return Ok(());
155    };
156    for label in labels {
157        if registry.virtual_label_by_name(label).is_some() {
158            return Err(anyhow!(
159                "Cannot {op} on virtual (catalog-resolved) label `{label}` — virtual \
160                 labels are read-only; write back via the originating catalog instead"
161            ));
162        }
163    }
164    Ok(())
165}
166
167/// Reject a write whose target edge-type ID is currently allocated as
168/// a virtual (catalog-backed) edge type. Runtime analog of
169/// [`reject_virtual_label_write`] for the edge path.
170///
171/// # Errors
172///
173/// Returns an error if `registry` is `Some` and `edge_type_id` resolves
174/// to a registered virtual edge type. Returns `Ok(())` when no plugin
175/// registry is wired.
176fn reject_virtual_edge_type_write(
177    registry: Option<&Arc<uni_plugin::PluginRegistry>>,
178    edge_type_id: u32,
179    op: &str,
180) -> Result<()> {
181    let Some(registry) = registry else {
182        return Ok(());
183    };
184    if let Some(entry) = registry.virtual_edge_type_by_id(edge_type_id) {
185        return Err(anyhow!(
186            "Cannot {op} on virtual (catalog-resolved) edge type `{}` — virtual edge \
187             types are read-only; write back via the originating catalog instead",
188            entry.name
189        ));
190    }
191    Ok(())
192}
193
194impl Executor {
195    /// Extracts labels from a node value.
196    ///
197    /// Handles both `Value::Map` (with a `_labels` list field) and
198    /// `Value::Node` (with a `labels` vec field).
199    ///
200    /// Returns `None` when the value is not a node or has no labels.
201    pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
202        match node_val {
203            Value::Map(map) => {
204                // Map-encoded node: look for _labels array
205                if let Some(Value::List(labels_arr)) = map.get("_labels") {
206                    let labels: Vec<String> = labels_arr
207                        .iter()
208                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
209                        .collect();
210                    if !labels.is_empty() {
211                        return Some(labels);
212                    }
213                }
214                None
215            }
216            Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
217            _ => None,
218        }
219    }
220
221    /// Extracts user-visible properties from a value that represents a node or edge.
222    ///
223    /// Strips internal bookkeeping keys (those prefixed with `_` or named
224    /// `ext_id`) from map-encoded entities and returns only the user-facing
225    /// property key-value pairs.
226    ///
227    /// Returns `None` when `val` is not a map, node, or edge.
228    pub(crate) fn extract_user_properties_from_value(
229        val: &Value,
230    ) -> Option<HashMap<String, Value>> {
231        match val {
232            Value::Map(map) => {
233                // Distinguish entity-encoded maps from plain map literals.
234                // A node map has both `_vid` and `_labels`.
235                // An edge map has `_eid`, `_src`, and `_dst`.
236                let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
237                let is_edge_map = map.contains_key("_eid")
238                    && map.contains_key("_src")
239                    && map.contains_key("_dst");
240
241                if is_node_map || is_edge_map {
242                    // Filter out internal bookkeeping keys
243                    let user_props: HashMap<String, Value> = map
244                        .iter()
245                        .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
246                        .map(|(k, v)| (k.clone(), v.clone()))
247                        .collect();
248                    // When mutation output omits dotted property columns, user
249                    // properties live inside `_all_props` rather than at the
250                    // top level of the entity map.
251                    if user_props.is_empty()
252                        && let Some(Value::Map(all_props)) = map.get("_all_props")
253                    {
254                        return Some(all_props.clone());
255                    }
256                    Some(user_props)
257                } else {
258                    // Plain map literal — return as-is
259                    Some(map.clone())
260                }
261            }
262            Value::Node(node) => Some(node.properties.clone()),
263            Value::Edge(edge) => Some(edge.properties.clone()),
264            _ => None,
265        }
266    }
267
268    /// Applies a property map to a vertex or edge entity bound to `variable` in `row`.
269    ///
270    /// When `replace` is `true` the entity's property set is replaced: keys absent
271    /// from `new_props` are tombstoned (written as `Value::Null`) so the storage
272    /// layer removes them.  When `replace` is `false` the map is merged: keys in
273    /// `new_props` are upserted, while keys absent from `new_props` are unchanged.
274    /// A `Value::Null` entry in `new_props` acts as an explicit tombstone in both
275    /// modes.
276    ///
277    /// Labels are never altered — the spec states that `SET n = map` replaces
278    /// properties only.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the entity cannot be found in the storage layer, or
283    /// if the writer fails to persist the updated properties.
284    #[expect(clippy::too_many_arguments)]
285    async fn apply_properties_to_entity(
286        &self,
287        variable: &str,
288        new_props: HashMap<String, Value>,
289        replace: bool,
290        row: &mut HashMap<String, Value>,
291        writer: &Writer,
292        prop_manager: &PropertyManager,
293        params: &HashMap<String, Value>,
294        ctx: Option<&QueryContext>,
295        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
296        prefetched: &Prefetch,
297    ) -> Result<()> {
298        // Clone the target so we can hold &row references elsewhere.
299        let target = row.get(variable).cloned();
300
301        // Declared-type guard for the whole-entity `SET n = map` / `SET n += map`
302        // forms, mirroring the per-property SET path (issue #68).
303        let schema = self.storage.schema_manager().schema();
304
305        match target {
306            Some(Value::Node(ref node)) => {
307                let vid = node.vid;
308                let labels = node.labels.clone();
309                let current =
310                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
311                let write_props = Self::merge_props(current, new_props, replace);
312                let mut enriched = write_props.clone();
313                for label_name in &labels {
314                    self.enrich_properties_with_generated_columns(
315                        label_name,
316                        &mut enriched,
317                        prop_manager,
318                        params,
319                        ctx,
320                    )
321                    .await?;
322                }
323                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
324                let _ = writer
325                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
326                    .await?;
327                // Update the in-memory row binding
328                if let Some(Value::Node(n)) = row.get_mut(variable) {
329                    n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
330                }
331            }
332            Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
333                let vid = Self::vid_from_value(node_val)?;
334                let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
335                let current =
336                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
337                let write_props = Self::merge_props(current, new_props, replace);
338                let mut enriched = write_props.clone();
339                for label_name in &labels {
340                    self.enrich_properties_with_generated_columns(
341                        label_name,
342                        &mut enriched,
343                        prop_manager,
344                        params,
345                        ctx,
346                    )
347                    .await?;
348                }
349                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
350                let _ = writer
351                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
352                    .await?;
353                // Update the in-memory map-encoded node binding
354                if let Some(Value::Map(node_map)) = row.get_mut(variable) {
355                    // Remove old user property keys, keep internal fields
356                    node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
357                    // Build effective (non-null) properties
358                    let effective: HashMap<String, Value> =
359                        enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
360                    for (k, v) in &effective {
361                        node_map.insert(k.clone(), v.clone());
362                    }
363                    // Replace _all_props to reflect the complete property set
364                    node_map.insert("_all_props".to_string(), Value::Map(effective));
365                }
366            }
367            Some(Value::Edge(ref edge)) => {
368                let eid = edge.eid;
369                let src = edge.src;
370                let dst = edge.dst;
371                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
372                let current =
373                    read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
374                let write_props = Self::merge_props(current, new_props, replace);
375                let write_props = Self::coerce_and_validate_props(
376                    write_props,
377                    &schema,
378                    std::slice::from_ref(&edge.edge_type),
379                )?;
380                writer
381                    .insert_edge(
382                        src,
383                        dst,
384                        etype,
385                        eid,
386                        write_props.clone(),
387                        Some(edge.edge_type.clone()),
388                        tx_l0,
389                    )
390                    .await?;
391                // Update the in-memory row binding
392                if let Some(Value::Edge(e)) = row.get_mut(variable) {
393                    e.properties = write_props
394                        .into_iter()
395                        .filter(|(_, v)| !v.is_null())
396                        .collect();
397                }
398            }
399            Some(Value::Map(ref map))
400                if map.contains_key("_eid")
401                    && map.contains_key("_src")
402                    && map.contains_key("_dst") =>
403            {
404                let ei = self.extract_edge_identity(map)?;
405                let current =
406                    read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx).await?;
407                let write_props = Self::merge_props(current, new_props, replace);
408                let edge_type_name = map
409                    .get("_type")
410                    .and_then(|v| v.as_str())
411                    .map(|s| s.to_string())
412                    .or_else(|| {
413                        self.storage
414                            .schema_manager()
415                            .edge_type_name_by_id_unified(ei.edge_type_id)
416                    });
417                let write_props = match &edge_type_name {
418                    Some(name) => Self::coerce_and_validate_props(
419                        write_props,
420                        &schema,
421                        std::slice::from_ref(name),
422                    )?,
423                    None => write_props,
424                };
425                writer
426                    .insert_edge(
427                        ei.src,
428                        ei.dst,
429                        ei.edge_type_id,
430                        ei.eid,
431                        write_props.clone(),
432                        edge_type_name,
433                        tx_l0,
434                    )
435                    .await?;
436                // Update the in-memory map-encoded edge binding
437                if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
438                    edge_map.retain(|k, _| k.starts_with('_'));
439                    let effective: HashMap<String, Value> = write_props
440                        .into_iter()
441                        .filter(|(_, v)| !v.is_null())
442                        .collect();
443                    for (k, v) in &effective {
444                        edge_map.insert(k.clone(), v.clone());
445                    }
446                    // Replace _all_props to reflect the complete property set
447                    edge_map.insert("_all_props".to_string(), Value::Map(effective));
448                }
449            }
450            _ => {
451                // No matching entity — nothing to do (caller already guarded against Null)
452            }
453        }
454        Ok(())
455    }
456
457    /// Computes the property map to write given current storage state and the
458    /// incoming change map.
459    ///
460    /// When `replace` is `true`, keys present in `current` but absent from
461    /// `incoming` are tombstoned with `Value::Null`.  Null values inside
462    /// `incoming` are always preserved as explicit tombstones.
463    ///
464    /// When `replace` is `false`, `current` is the base and `incoming` is
465    /// merged on top: each key in `incoming` overwrites or tombstones the
466    /// corresponding entry in `current`.
467    fn merge_props(
468        current: HashMap<String, Value>,
469        incoming: HashMap<String, Value>,
470        replace: bool,
471    ) -> HashMap<String, Value> {
472        if replace {
473            // Start from the non-null incoming entries only.
474            let mut result: HashMap<String, Value> = incoming
475                .iter()
476                .filter(|(_, v)| !v.is_null())
477                .map(|(k, v)| (k.clone(), v.clone()))
478                .collect();
479            // Tombstone every current key that is absent from incoming OR explicitly
480            // set to null in incoming (both mean "delete this property").
481            for k in current.keys() {
482                if incoming.get(k).is_none_or(|v| v.is_null()) {
483                    result.insert(k.clone(), Value::Null);
484                }
485            }
486            result
487        } else {
488            // Merge: start from current and apply incoming on top
489            let mut result = current;
490            result.extend(incoming);
491            result
492        }
493    }
494
495    /// Extract edge identity fields (`_eid`, `_src`, `_dst`, `_type`) from a map.
496    fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
497        let eid = Eid::from(
498            map.get("_eid")
499                .and_then(|v| v.as_u64())
500                .ok_or_else(|| anyhow!("Invalid _eid"))?,
501        );
502        let src = Vid::from(
503            map.get("_src")
504                .and_then(|v| v.as_u64())
505                .ok_or_else(|| anyhow!("Invalid _src"))?,
506        );
507        let dst = Vid::from(
508            map.get("_dst")
509                .and_then(|v| v.as_u64())
510                .ok_or_else(|| anyhow!("Invalid _dst"))?,
511        );
512        let edge_type_id = self.resolve_edge_type_id(
513            map.get("_type")
514                .or_else(|| map.get("_type_name"))
515                .ok_or_else(|| anyhow!("Missing _type/_type_name on edge map"))?,
516        )?;
517        Ok(EdgeIdentity {
518            eid,
519            src,
520            dst,
521            edge_type_id,
522        })
523    }
524
525    /// Resolve edge type ID from a Value, supporting both Int and String representations.
526    /// DataFusion traverse stores _type as String("KNOWS"), while write operations need u32 ID.
527    ///
528    /// For String values, uses get_or_assign_edge_type_id to support schemaless edge types
529    /// (assigns new ID if not found). This is critical for MERGE ... ON CREATE SET scenarios
530    /// where the edge type was just created and may not be in the read-only lookup yet.
531    fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
532        match type_val {
533            Value::Int(i) => Ok(*i as u32),
534            Value::String(name) => {
535                if self.config.strict_schema {
536                    let schema = self.storage.schema_manager().schema();
537                    schema
538                        .edge_type_id_by_name_case_insensitive(name)
539                        .ok_or_else(|| {
540                            anyhow!(
541                                "Edge type '{}' is not defined in the schema \
542                                 (strict_schema is enabled). \
543                                 Declare it with db.schema().edge_type(...).apply() first.",
544                                name
545                            )
546                        })
547                } else {
548                    // Schemaless: assign new ID if not found in schema or registry.
549                    Ok(self
550                        .storage
551                        .schema_manager()
552                        .get_or_assign_edge_type_id(name))
553                }
554            }
555            _ => Err(anyhow!(
556                "Invalid _type value: expected Int or String, got {:?}",
557                type_val
558            )),
559        }
560    }
561
562    pub(crate) async fn execute_vacuum(&self) -> Result<()> {
563        if let Some(writer_arc) = &self.writer {
564            // Flush first while holding the lock
565            {
566                let writer: &uni_store::Writer = writer_arc.as_ref();
567                writer.flush_to_l1(None).await?;
568            } // Drop lock before compacting to avoid blocking reads/writes
569
570            // Compaction can run without holding the writer lock
571            let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
572            let compaction_results = compactor.compact_all().await?;
573
574            // Re-warm adjacency manager for compacted edge types to sync in-memory CSR with new L2 storage
575            let am = self.storage.adjacency_manager();
576            let schema = self.storage.schema_manager().schema();
577            for info in compaction_results {
578                // Convert string direction to Direction enum
579                let direction = match info.direction.as_str() {
580                    "fwd" => uni_store::storage::direction::Direction::Outgoing,
581                    "bwd" => uni_store::storage::direction::Direction::Incoming,
582                    _ => continue,
583                };
584
585                // Get edge_type_id
586                if let Some(edge_type_id) =
587                    schema.edge_type_id_unified_case_insensitive(&info.edge_type)
588                {
589                    // Re-warm from storage (clears old CSR, loads new L2 + L1 delta)
590                    let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
591                }
592            }
593        }
594        Ok(())
595    }
596
597    pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
598        if let Some(writer_arc) = &self.writer {
599            let writer: &uni_store::Writer = writer_arc.as_ref();
600            writer.flush_to_l1(Some("checkpoint".to_string())).await?;
601        }
602        Ok(())
603    }
604
605    pub(crate) async fn execute_copy_to(
606        &self,
607        identifier: &str,
608        path: &str,
609        format: &str,
610        options: &HashMap<String, Value>,
611    ) -> Result<usize> {
612        // Check schema to determine if identifier is an edge type or vertex label
613        let schema = self.storage.schema_manager().schema();
614
615        // Try as edge type first
616        if schema.get_edge_type_case_insensitive(identifier).is_some() {
617            return self
618                .export_edge_type_in_format(identifier, path, format)
619                .await;
620        }
621
622        // Try as vertex label
623        if schema.get_label_case_insensitive(identifier).is_some() {
624            return self
625                .export_vertex_label_in_format(identifier, path, format, options)
626                .await;
627        }
628
629        // Neither edge type nor vertex label found
630        Err(anyhow!("Unknown label or edge type: '{}'", identifier))
631    }
632
633    async fn export_vertex_label_in_format(
634        &self,
635        label: &str,
636        path: &str,
637        format: &str,
638        _options: &HashMap<String, Value>,
639    ) -> Result<usize> {
640        match format {
641            "parquet" => self.export_vertex_label(label, path).await,
642            "csv" => {
643                let mut stream = self
644                    .storage
645                    .scan_vertex_table_stream(label)
646                    .await?
647                    .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
648
649                // Collect all batches
650                let mut all_rows = Vec::new();
651                let mut column_names = Vec::new();
652
653                // Iterate stream using StreamExt
654                use futures::StreamExt;
655                while let Some(batch_result) = stream.next().await {
656                    let batch = batch_result?;
657
658                    // Get column names from first batch
659                    if column_names.is_empty() {
660                        column_names = batch
661                            .schema()
662                            .fields()
663                            .iter()
664                            .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
665                            .map(|f| f.name().clone())
666                            .collect();
667                    }
668
669                    // Convert batch to rows
670                    for row_idx in 0..batch.num_rows() {
671                        let mut row = Vec::new();
672                        for field in batch.schema().fields() {
673                            if field.name().starts_with('_') || field.name() == "ext_id" {
674                                continue;
675                            }
676
677                            let col_idx = batch.schema().index_of(field.name())?;
678                            let column = batch.column(col_idx);
679                            let value = self.arrow_value_to_json(column, row_idx)?;
680
681                            // Convert value to CSV string
682                            let csv_value = match value {
683                                Value::Null => String::new(),
684                                Value::Bool(b) => b.to_string(),
685                                Value::Int(i) => i.to_string(),
686                                Value::Float(f) => f.to_string(),
687                                Value::String(s) => s,
688                                _ => format!("{value}"),
689                            };
690                            row.push(csv_value);
691                        }
692                        all_rows.push(row);
693                    }
694                }
695
696                // Write CSV
697                let file = std::fs::File::create(path)?;
698                let mut wtr = csv::Writer::from_writer(file);
699
700                // Write headers
701                log::debug!("CSV export headers: {:?}", column_names);
702                wtr.write_record(&column_names)?;
703
704                // Write rows
705                for (i, row) in all_rows.iter().enumerate() {
706                    log::debug!("CSV export row {}: {:?}", i, row);
707                    wtr.write_record(row)?;
708                }
709
710                wtr.flush()?;
711                Ok(all_rows.len())
712            }
713            _ => Err(anyhow!(
714                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
715                format
716            )),
717        }
718    }
719
720    async fn export_edge_type_in_format(
721        &self,
722        edge_type: &str,
723        path: &str,
724        format: &str,
725    ) -> Result<usize> {
726        match format {
727            "parquet" => self.export_edge_type(edge_type, path).await,
728            "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
729            _ => Err(anyhow!(
730                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
731                format
732            )),
733        }
734    }
735
736    /// Write a stream of record batches to a Parquet file.
737    /// Returns the total number of rows written, or 0 if the stream is empty.
738    async fn write_batches_to_parquet(
739        mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
740        path: &str,
741        entity_description: &str,
742    ) -> Result<usize> {
743        use futures::TryStreamExt;
744
745        // Get first batch to determine schema and create writer
746        let first_batch = match stream.try_next().await? {
747            Some(batch) => batch,
748            None => {
749                log::info!("No data to export from {}", entity_description);
750                return Ok(0);
751            }
752        };
753
754        // Create Parquet writer using schema from first batch
755        let file = std::fs::File::create(path)?;
756        let arrow_schema = first_batch.schema();
757        let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
758
759        // Write first batch
760        let mut count = first_batch.num_rows();
761        writer.write(&first_batch)?;
762
763        // Write remaining batches
764        while let Some(batch) = stream.try_next().await? {
765            count += batch.num_rows();
766            writer.write(&batch)?;
767        }
768
769        writer.close()?;
770
771        log::info!(
772            "Exported {} rows from {} to '{}'",
773            count,
774            entity_description,
775            path
776        );
777        Ok(count)
778    }
779
780    /// Export vertices of a specific label to Parquet
781    async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
782        let stream = self
783            .storage
784            .scan_vertex_table_stream(label)
785            .await?
786            .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
787
788        Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
789    }
790
791    /// Export edges of a specific type to Parquet
792    async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
793        let schema = self.storage.schema_manager().schema();
794        if !schema.edge_types.contains_key(edge_type) {
795            return Err(anyhow!("Edge type '{}' not found", edge_type));
796        }
797
798        let filter = format!("type = '{}'", edge_type);
799        let stream = self
800            .storage
801            .scan_main_edge_table_stream(Some(&filter))
802            .await?
803            .ok_or_else(|| anyhow!("No edge data found"))?;
804
805        Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
806    }
807
808    pub(crate) async fn execute_copy_from(
809        &self,
810        label: &str,
811        path: &str,
812        format: &str,
813        options: &HashMap<String, Value>,
814    ) -> Result<usize> {
815        // Read data from file
816        let batches = match format {
817            "parquet" => self.read_parquet_file(path)?,
818            "csv" => self.read_csv_file(path, label, options)?,
819            _ => {
820                return Err(anyhow!(
821                    "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
822                    format
823                ));
824            }
825        };
826
827        // Get writer
828        let writer_arc = self
829            .writer
830            .as_ref()
831            .ok_or_else(|| anyhow!("No writer available"))?;
832
833        let db_schema = self.storage.schema_manager().schema();
834
835        // Check if this is a label (vertex) or edge type
836        let is_edge = db_schema.edge_type_id_by_name(label).is_some();
837
838        if is_edge {
839            // Import edges
840            let edge_type_id = db_schema
841                .edge_type_id_by_name(label)
842                .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
843
844            // Get src and dst column names from options
845            let src_col = options
846                .get("src_col")
847                .and_then(|v| v.as_str())
848                .unwrap_or("src");
849            let dst_col = options
850                .get("dst_col")
851                .and_then(|v| v.as_str())
852                .unwrap_or("dst");
853
854            // §5.7 of concurrent_writer.md: writer is hoisted above the row
855            // loop now that there is no per-row lock acquisition cost.
856            let writer: &uni_store::Writer = writer_arc.as_ref();
857            let mut total_rows = 0;
858            for batch in batches {
859                let num_rows = batch.num_rows();
860                // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
861                let eids = writer.allocate_eids(num_rows).await?;
862
863                for (row_idx, &eid) in eids.iter().enumerate().take(num_rows) {
864                    let mut properties = HashMap::new();
865                    let mut src_vid: Option<Vid> = None;
866                    let mut dst_vid: Option<Vid> = None;
867
868                    // Extract properties and VIDs from each column
869                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
870                        let col_name = field.name();
871                        let column = batch.column(col_idx);
872                        let value = self.arrow_value_to_json(column, row_idx)?;
873
874                        if col_name == src_col {
875                            let raw = value.as_u64().unwrap_or_else(|| {
876                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
877                            });
878                            src_vid = Some(Vid::new(raw));
879                        } else if col_name == dst_col {
880                            let raw = value.as_u64().unwrap_or_else(|| {
881                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
882                            });
883                            dst_vid = Some(Vid::new(raw));
884                        } else if !col_name.starts_with('_') && !value.is_null() {
885                            properties.insert(col_name.clone(), value);
886                        }
887                    }
888
889                    let src = src_vid
890                        .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
891                    let dst = dst_vid
892                        .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
893
894                    writer
895                        .insert_edge(
896                            src,
897                            dst,
898                            edge_type_id,
899                            eid,
900                            properties,
901                            Some(label.to_string()),
902                            None,
903                        )
904                        .await?;
905
906                    total_rows += 1;
907                }
908            }
909
910            log::info!(
911                "Imported {} edge rows from '{}' into edge type '{}'",
912                total_rows,
913                path,
914                label
915            );
916
917            // Flush to persist edges
918            if total_rows > 0 {
919                writer.flush_to_l1(None).await?;
920            }
921
922            Ok(total_rows)
923        } else {
924            // Import vertices
925            // Validate the label exists in schema
926            db_schema
927                .label_id_by_name_case_insensitive(label)
928                .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
929
930            // §5.7 of concurrent_writer.md: writer is hoisted above the row
931            // loop now that there is no per-row lock acquisition cost.
932            let writer: &uni_store::Writer = writer_arc.as_ref();
933            let mut total_rows = 0;
934            for batch in batches {
935                let num_rows = batch.num_rows();
936                // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
937                let vids = writer.allocate_vids(num_rows).await?;
938
939                // Convert Arrow batch to rows
940                for (row_idx, &vid) in vids.iter().enumerate().take(num_rows) {
941                    let mut properties = HashMap::new();
942
943                    // Extract properties from each column
944                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
945                        let col_name = field.name();
946
947                        // Skip internal columns
948                        if col_name.starts_with('_') {
949                            continue;
950                        }
951
952                        let column = batch.column(col_idx);
953                        let value = self.arrow_value_to_json(column, row_idx)?;
954
955                        if !value.is_null() {
956                            properties.insert(col_name.clone(), value);
957                        }
958                    }
959
960                    let _ = writer
961                        .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
962                        .await?;
963
964                    total_rows += 1;
965                }
966            }
967
968            log::info!(
969                "Imported {} rows from '{}' into label '{}'",
970                total_rows,
971                path,
972                label
973            );
974
975            // Flush to persist vertices
976            if total_rows > 0 {
977                writer.flush_to_l1(None).await?;
978            }
979
980            Ok(total_rows)
981        }
982    }
983
984    fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
985        use arrow_array::Array;
986        use arrow_schema::DataType as ArrowDataType;
987
988        if column.is_null(row_idx) {
989            return Ok(Value::Null);
990        }
991
992        match column.data_type() {
993            ArrowDataType::Utf8 => {
994                let array = column
995                    .as_any()
996                    .downcast_ref::<arrow_array::StringArray>()
997                    .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
998                Ok(Value::String(array.value(row_idx).to_string()))
999            }
1000            ArrowDataType::Int32 => {
1001                let array = column
1002                    .as_any()
1003                    .downcast_ref::<arrow_array::Int32Array>()
1004                    .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
1005                Ok(Value::Int(array.value(row_idx) as i64))
1006            }
1007            ArrowDataType::Int64 => {
1008                let array = column
1009                    .as_any()
1010                    .downcast_ref::<arrow_array::Int64Array>()
1011                    .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
1012                Ok(Value::Int(array.value(row_idx)))
1013            }
1014            ArrowDataType::Float32 => {
1015                let array = column
1016                    .as_any()
1017                    .downcast_ref::<arrow_array::Float32Array>()
1018                    .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
1019                Ok(Value::Float(array.value(row_idx) as f64))
1020            }
1021            ArrowDataType::Float64 => {
1022                let array = column
1023                    .as_any()
1024                    .downcast_ref::<arrow_array::Float64Array>()
1025                    .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
1026                Ok(Value::Float(array.value(row_idx)))
1027            }
1028            ArrowDataType::Boolean => {
1029                let array = column
1030                    .as_any()
1031                    .downcast_ref::<arrow_array::BooleanArray>()
1032                    .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
1033                Ok(Value::Bool(array.value(row_idx)))
1034            }
1035            ArrowDataType::UInt64 => {
1036                let array = column
1037                    .as_any()
1038                    .downcast_ref::<arrow_array::UInt64Array>()
1039                    .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
1040                Ok(Value::Int(array.value(row_idx) as i64))
1041            }
1042            _ => {
1043                // For other types, try to convert to string
1044                let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
1045                if let Some(arr) = array {
1046                    Ok(Value::String(arr.value(row_idx).to_string()))
1047                } else {
1048                    Ok(Value::Null)
1049                }
1050            }
1051        }
1052    }
1053
1054    fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
1055        let file = std::fs::File::open(path)?;
1056        let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
1057            .build()?;
1058        reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1059    }
1060
1061    fn read_csv_file(
1062        &self,
1063        path: &str,
1064        label: &str,
1065        options: &HashMap<String, Value>,
1066    ) -> Result<Vec<arrow_array::RecordBatch>> {
1067        use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1068        use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1069        use std::sync::Arc;
1070
1071        // Parse CSV options
1072        let has_headers = options
1073            .get("headers")
1074            .and_then(|v| v.as_bool())
1075            .unwrap_or(true);
1076
1077        // Read CSV file
1078        let file = std::fs::File::open(path)?;
1079        let mut rdr = csv::ReaderBuilder::new()
1080            .has_headers(has_headers)
1081            .from_reader(file);
1082
1083        // Get schema for type conversion
1084        let db_schema = self.storage.schema_manager().schema();
1085        let properties = db_schema.properties.get(label);
1086
1087        // Collect all rows first to determine schema
1088        let mut rows: Vec<Vec<String>> = Vec::new();
1089        let headers: Vec<String> = if has_headers {
1090            rdr.headers()?.iter().map(|s| s.to_string()).collect()
1091        } else {
1092            Vec::new()
1093        };
1094
1095        for result in rdr.records() {
1096            let record = result?;
1097            rows.push(record.iter().map(|s| s.to_string()).collect());
1098        }
1099
1100        if rows.is_empty() {
1101            return Ok(Vec::new());
1102        }
1103
1104        // Build Arrow schema with proper types based on DB schema
1105        let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
1106        let col_names: Vec<String> = if has_headers {
1107            headers
1108        } else {
1109            (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
1110        };
1111
1112        for name in &col_names {
1113            let arrow_type = if let Some(props) = properties {
1114                if let Some(prop_meta) = props.get(name) {
1115                    match prop_meta.r#type {
1116                        DataType::Int32 => ArrowDataType::Int32,
1117                        DataType::Int64 => ArrowDataType::Int64,
1118                        DataType::Float32 => ArrowDataType::Float32,
1119                        DataType::Float64 => ArrowDataType::Float64,
1120                        DataType::Bool => ArrowDataType::Boolean,
1121                        _ => ArrowDataType::Utf8,
1122                    }
1123                } else {
1124                    ArrowDataType::Utf8
1125                }
1126            } else {
1127                ArrowDataType::Utf8
1128            };
1129            arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
1130        }
1131
1132        let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
1133
1134        // Convert rows to Arrow arrays with proper types
1135        let mut columns: Vec<ArrayRef> = Vec::new();
1136        for (col_idx, field) in arrow_fields.iter().enumerate() {
1137            match field.data_type() {
1138                ArrowDataType::Int32 => {
1139                    let values: Vec<Option<i32>> = rows
1140                        .iter()
1141                        .map(|row| {
1142                            if col_idx < row.len() {
1143                                row[col_idx].parse().ok()
1144                            } else {
1145                                None
1146                            }
1147                        })
1148                        .collect();
1149                    columns.push(Arc::new(Int32Array::from(values)));
1150                }
1151                _ => {
1152                    // Default to string
1153                    let values: Vec<Option<String>> = rows
1154                        .iter()
1155                        .map(|row| {
1156                            if col_idx < row.len() {
1157                                Some(row[col_idx].clone())
1158                            } else {
1159                                None
1160                            }
1161                        })
1162                        .collect();
1163                    columns.push(Arc::new(StringArray::from(values)));
1164                }
1165            }
1166        }
1167
1168        let batch = RecordBatch::try_new(arrow_schema, columns)?;
1169        Ok(vec![batch])
1170    }
1171
1172    fn parse_data_type(type_str: &str) -> Result<DataType> {
1173        use uni_common::core::schema::{CrdtType, PointType};
1174        let type_str = type_str.to_lowercase();
1175        let type_str = type_str.trim();
1176        match type_str {
1177            "string" | "text" | "varchar" => Ok(DataType::String),
1178            "int" | "integer" | "int32" => Ok(DataType::Int32),
1179            "long" | "int64" | "bigint" => Ok(DataType::Int64),
1180            "float" | "float32" | "real" => Ok(DataType::Float32),
1181            "double" | "float64" => Ok(DataType::Float64),
1182            "bool" | "boolean" => Ok(DataType::Bool),
1183            "timestamp" => Ok(DataType::Timestamp),
1184            "date" => Ok(DataType::Date),
1185            "time" => Ok(DataType::Time),
1186            "datetime" => Ok(DataType::DateTime),
1187            "duration" => Ok(DataType::Duration),
1188            "btic" => Ok(DataType::Btic),
1189            "json" | "jsonb" => Ok(DataType::CypherValue),
1190            "bytes" | "blob" | "binary" => Ok(DataType::Bytes),
1191            "point" => Ok(DataType::Point(PointType::Cartesian2D)),
1192            "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
1193            "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1194            s if s.starts_with("sparse_vector(") && s.ends_with(')') => {
1195                let dims_str = &s["sparse_vector(".len()..s.len() - 1];
1196                let dimensions = dims_str
1197                    .parse::<usize>()
1198                    .map_err(|_| anyhow!("Invalid sparse_vector dimensions: {}", dims_str))?;
1199                Ok(DataType::SparseVector { dimensions })
1200            }
1201            s if s.starts_with("vector(") && s.ends_with(')') => {
1202                let dims_str = &s[7..s.len() - 1];
1203                let dimensions = dims_str
1204                    .parse::<usize>()
1205                    .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1206                Ok(DataType::Vector { dimensions })
1207            }
1208            s if s.starts_with("list<") && s.ends_with('>') => {
1209                let inner_type_str = &s[5..s.len() - 1];
1210                let inner_type = Self::parse_data_type(inner_type_str)?;
1211                Ok(DataType::List(Box::new(inner_type)))
1212            }
1213            s if s.starts_with("map<") && s.ends_with('>') => {
1214                let (k_str, v_str) = Self::split_map_kv(&s[4..s.len() - 1])?;
1215                let key_type = Self::parse_data_type(&k_str)?;
1216                if !matches!(key_type, DataType::String) {
1217                    return Err(anyhow!("MAP key type must be STRING, got: {k_str}"));
1218                }
1219                let value_type = Self::parse_data_type(&v_str)?;
1220                Ok(DataType::Map(Box::new(key_type), Box::new(value_type)))
1221            }
1222            "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1223            "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1224            _ => Err(anyhow!("Unknown data type: {}", type_str)),
1225        }
1226    }
1227
1228    /// Split a `MAP<K, V>` inner string on the top-level comma, respecting `<>`/`()` depth
1229    /// so nested value types (`STRING, LIST<INT>`, `STRING, MAP<STRING,INT>`) split at the
1230    /// right comma. Returns trimmed `(key, value)` type strings.
1231    fn split_map_kv(inner: &str) -> Result<(String, String)> {
1232        let mut depth = 0i32;
1233        for (i, c) in inner.char_indices() {
1234            match c {
1235                '<' | '(' => depth += 1,
1236                '>' | ')' => depth -= 1,
1237                ',' if depth == 0 => {
1238                    let k = inner[..i].trim();
1239                    let v = inner[i + 1..].trim();
1240                    if k.is_empty() || v.is_empty() {
1241                        return Err(anyhow!("MAP<K,V> requires both a key and a value type"));
1242                    }
1243                    return Ok((k.to_string(), v.to_string()));
1244                }
1245                _ => {}
1246            }
1247        }
1248        Err(anyhow!(
1249            "MAP<K,V> requires a comma separating key and value types"
1250        ))
1251    }
1252
1253    pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1254        let sm = self.storage.schema_manager_arc();
1255        if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1256            return Ok(());
1257        }
1258        sm.add_label_with_desc(&clause.name, clause.description)?;
1259        for prop in clause.properties {
1260            let dt = Self::parse_data_type(&prop.data_type)?;
1261            sm.add_property_with_desc(
1262                &clause.name,
1263                &prop.name,
1264                dt,
1265                prop.nullable,
1266                prop.description,
1267            )?;
1268            if prop.unique {
1269                let constraint = Constraint {
1270                    name: format!("{}_{}_unique", clause.name, prop.name),
1271                    constraint_type: ConstraintType::Unique {
1272                        properties: vec![prop.name],
1273                    },
1274                    target: ConstraintTarget::Label(clause.name.clone()),
1275                    enabled: true,
1276                };
1277                sm.add_constraint(constraint)?;
1278            }
1279        }
1280        sm.save().await?;
1281        Ok(())
1282    }
1283
1284    /// True if `key` is a generated property on any of the given labels.
1285    /// Used by the partial-write flush path (Round 12 §C) to decide
1286    /// whether the property should be added to `touched_keys` so that
1287    /// Lance MergeInsert sends the recomputed value.
1288    fn is_generated_key(&self, labels: &[String], key: &str) -> bool {
1289        let schema = self.storage.schema_manager().schema();
1290        for label in labels {
1291            if let Some(props_meta) = schema.properties.get(label)
1292                && let Some(meta) = props_meta.get(key)
1293                && meta.generation_expression.is_some()
1294            {
1295                return true;
1296            }
1297        }
1298        false
1299    }
1300
1301    pub(crate) async fn enrich_properties_with_generated_columns(
1302        &self,
1303        label_name: &str,
1304        properties: &mut HashMap<String, Value>,
1305        prop_manager: &PropertyManager,
1306        params: &HashMap<String, Value>,
1307        ctx: Option<&QueryContext>,
1308    ) -> Result<()> {
1309        let schema = self.storage.schema_manager().schema();
1310
1311        if let Some(props_meta) = schema.properties.get(label_name) {
1312            let mut generators = Vec::new();
1313            for (prop_name, meta) in props_meta {
1314                if let Some(expr_str) = &meta.generation_expression {
1315                    generators.push((prop_name.clone(), expr_str.clone()));
1316                }
1317            }
1318
1319            for (prop_name, expr_str) in generators {
1320                let cache_key = (label_name.to_string(), prop_name.clone());
1321                let expr = {
1322                    let cache = self.gen_expr_cache.read().await;
1323                    cache.get(&cache_key).cloned()
1324                };
1325
1326                let expr = match expr {
1327                    Some(e) => e,
1328                    None => {
1329                        let parsed = uni_cypher::parse_expression(&expr_str)
1330                            .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1331                        let mut cache = self.gen_expr_cache.write().await;
1332                        cache.insert(cache_key, parsed.clone());
1333                        parsed
1334                    }
1335                };
1336
1337                let mut scope = HashMap::new();
1338
1339                // If expression has an explicit variable, use it as an object
1340                if let Some(var) = expr.extract_variable() {
1341                    scope.insert(var, Value::Map(properties.clone()));
1342                } else {
1343                    // No explicit variable - add properties directly to scope for bare references
1344                    // e.g., "lower(email)" can reference "email" directly
1345                    for (k, v) in properties.iter() {
1346                        scope.insert(k.clone(), v.clone());
1347                    }
1348                }
1349
1350                let val = self
1351                    .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1352                    .await?;
1353                properties.insert(prop_name, val);
1354            }
1355        }
1356        Ok(())
1357    }
1358
1359    pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1360        let sm = self.storage.schema_manager_arc();
1361        if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1362            return Ok(());
1363        }
1364        sm.add_edge_type_with_desc(
1365            &clause.name,
1366            clause.src_labels,
1367            clause.dst_labels,
1368            clause.description,
1369        )?;
1370        for prop in clause.properties {
1371            let dt = Self::parse_data_type(&prop.data_type)?;
1372            sm.add_property_with_desc(
1373                &clause.name,
1374                &prop.name,
1375                dt,
1376                prop.nullable,
1377                prop.description,
1378            )?;
1379        }
1380        sm.save().await?;
1381        Ok(())
1382    }
1383
1384    /// Executes an ALTER action on a schema entity.
1385    ///
1386    /// This is a shared helper for both `execute_alter_label` and
1387    /// `execute_alter_edge_type` since they have identical logic.
1388    pub(crate) async fn execute_alter_entity(
1389        sm: &Arc<SchemaManager>,
1390        entity_name: &str,
1391        action: AlterAction,
1392    ) -> Result<()> {
1393        match action {
1394            AlterAction::AddProperty(prop) => {
1395                let dt = Self::parse_data_type(&prop.data_type)?;
1396                sm.add_property_with_desc(
1397                    entity_name,
1398                    &prop.name,
1399                    dt,
1400                    prop.nullable,
1401                    prop.description,
1402                )?;
1403            }
1404            AlterAction::DropProperty(prop_name) => {
1405                sm.drop_property(entity_name, &prop_name)?;
1406            }
1407            AlterAction::RenameProperty { old_name, new_name } => {
1408                sm.rename_property(entity_name, &old_name, &new_name)?;
1409            }
1410            AlterAction::SetDescription(desc) => {
1411                if sm.schema().labels.contains_key(entity_name) {
1412                    sm.set_label_description(entity_name, desc)?;
1413                } else {
1414                    sm.set_edge_type_description(entity_name, desc)?;
1415                }
1416            }
1417            AlterAction::SetPropertyDescription {
1418                property,
1419                description,
1420            } => {
1421                sm.set_property_description(entity_name, &property, description)?;
1422            }
1423        }
1424        sm.save().await?;
1425        Ok(())
1426    }
1427
1428    pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1429        Self::execute_alter_entity(
1430            &self.storage.schema_manager_arc(),
1431            &clause.name,
1432            clause.action,
1433        )
1434        .await
1435    }
1436
1437    pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1438        Self::execute_alter_entity(
1439            &self.storage.schema_manager_arc(),
1440            &clause.name,
1441            clause.action,
1442        )
1443        .await
1444    }
1445
1446    pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1447        let sm = self.storage.schema_manager_arc();
1448        sm.drop_label(&clause.name, clause.if_exists)?;
1449        sm.save().await?;
1450        Ok(())
1451    }
1452
1453    pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1454        let sm = self.storage.schema_manager_arc();
1455        sm.drop_edge_type(&clause.name, clause.if_exists)?;
1456        sm.save().await?;
1457        Ok(())
1458    }
1459
1460    pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1461        let sm = self.storage.schema_manager_arc();
1462        let target = ConstraintTarget::Label(clause.label);
1463        let c_type = match clause.constraint_type {
1464            AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1465                properties: clause.properties,
1466            },
1467            AstConstraintType::Exists => {
1468                let property = clause
1469                    .properties
1470                    .into_iter()
1471                    .next()
1472                    .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1473                ConstraintType::Exists { property }
1474            }
1475            AstConstraintType::Check => {
1476                let expression = clause
1477                    .expression
1478                    .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1479                ConstraintType::Check {
1480                    expression: expression.to_string_repr(),
1481                }
1482            }
1483        };
1484
1485        let constraint = Constraint {
1486            name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1487            constraint_type: c_type,
1488            target,
1489            enabled: true,
1490        };
1491
1492        sm.add_constraint(constraint)?;
1493        sm.save().await?;
1494        Ok(())
1495    }
1496
1497    pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1498        let sm = self.storage.schema_manager_arc();
1499        sm.drop_constraint(&clause.name, false)?;
1500        sm.save().await?;
1501        Ok(())
1502    }
1503
1504    /// Detects the single-node, single-label MERGE shape the fast path serves.
1505    ///
1506    /// Returns the node pattern and its label when `pattern` is one path with
1507    /// one node element, exactly one label, and a static map-literal property
1508    /// set — the shape [`Self::execute_merge_row_indexed`] can serve without
1509    /// per-row query planning. The keys do NOT need to be indexed: the persisted
1510    /// lookup degrades to a (single, filtered) label scan when no scalar index
1511    /// exists, which is still far cheaper than building a `LogicalPlan` per row.
1512    /// Any other shape (edges, multiple labels, non-literal properties) returns
1513    /// `None` so the caller uses the general per-row path.
1514    fn merge_single_node_fastpath<'p>(
1515        &self,
1516        pattern: &'p Pattern,
1517    ) -> Option<(&'p NodePattern, String)> {
1518        if pattern.paths.len() != 1 {
1519            return None;
1520        }
1521        let path = &pattern.paths[0];
1522        if path.elements.len() != 1 {
1523            return None;
1524        }
1525        let PatternElement::Node(n) = &path.elements[0] else {
1526            return None;
1527        };
1528        let labels = n.labels.names();
1529        if labels.len() != 1 {
1530            return None;
1531        }
1532        // The key must be a static map literal so the key names are known.
1533        let Some(Expr::Map(entries)) = n.properties.as_ref() else {
1534            return None;
1535        };
1536        if entries.is_empty() {
1537            return None;
1538        }
1539        // Resolve the label to its schema-canonical case so the fast path agrees
1540        // with the general MERGE path (which matches labels case-insensitively).
1541        // Without this, `MERGE (:person …)` after a `:Person` row was flushed
1542        // scans/keys a different label than the canonical one and creates a
1543        // duplicate (review #3a). Falls back to the as-written label when the
1544        // schema does not know it (schemaless).
1545        let canonical = self
1546            .storage
1547            .schema_manager()
1548            .schema()
1549            .canonical_label_name(&labels[0])
1550            .unwrap_or_else(|| labels[0].clone());
1551        Some((n, canonical))
1552    }
1553
1554    /// RC3: detect the bound-endpoints, anonymous-edge relationship MERGE shape
1555    /// `(a)-[:TYPE]->(b)` whose edge existence can be resolved with one O(1)
1556    /// adjacency probe instead of building and running a per-row traversal
1557    /// `LogicalPlan` (the general path is ~19x the bulk CREATE of the same edges).
1558    ///
1559    /// Returns `(source_var, target_var, edge_type_id, direction)` when the
1560    /// pattern is exactly one path of `[Node, Rel, Node]` where the relationship
1561    /// is a single concrete type, **anonymous** (no variable → no edge binding to
1562    /// reproduce), fixed-length, and unfiltered, and both endpoint nodes are plain
1563    /// variables with no re-specified MERGE properties or inline WHERE (those are
1564    /// filters only the general path applies). Any deviation returns `None` and
1565    /// the caller keeps the general path. The caller still verifies per row that
1566    /// both endpoints are actually bound to vids.
1567    fn merge_relationship_fastpath_shape(
1568        &self,
1569        pattern: &Pattern,
1570    ) -> Option<(
1571        String,
1572        String,
1573        u32,
1574        uni_store::storage::direction::Direction,
1575    )> {
1576        if pattern.paths.len() != 1 {
1577            return None;
1578        }
1579        let [
1580            PatternElement::Node(a),
1581            PatternElement::Relationship(r),
1582            PatternElement::Node(b),
1583        ] = pattern.paths[0].elements.as_slice()
1584        else {
1585            return None;
1586        };
1587        // Endpoints: plain variables, no extra MERGE-pattern properties / inline
1588        // WHERE (a re-specified property is a filter the general path must apply).
1589        let src_var = a.variable.as_ref()?;
1590        let dst_var = b.variable.as_ref()?;
1591        if a.properties.is_some()
1592            || a.where_clause.is_some()
1593            || b.properties.is_some()
1594            || b.where_clause.is_some()
1595        {
1596            return None;
1597        }
1598        // Relationship: single concrete type, anonymous, fixed-length, unfiltered.
1599        if r.variable.is_some()
1600            || r.range.is_some()
1601            || r.properties.is_some()
1602            || r.where_clause.is_some()
1603            || r.types.names().len() != 1
1604        {
1605            return None;
1606        }
1607        let type_name = &r.types.names()[0];
1608        let type_id = if self.config.strict_schema {
1609            self.storage
1610                .schema_manager()
1611                .schema()
1612                .edge_type_id_by_name_case_insensitive(type_name)?
1613        } else {
1614            self.storage
1615                .schema_manager()
1616                .get_or_assign_edge_type_id(type_name)
1617        };
1618        let dir = match r.direction {
1619            uni_cypher::ast::Direction::Outgoing => {
1620                uni_store::storage::direction::Direction::Outgoing
1621            }
1622            uni_cypher::ast::Direction::Incoming => {
1623                uni_store::storage::direction::Direction::Incoming
1624            }
1625            // Undirected existence is ambiguous to encode as one probe; the
1626            // general path handles it.
1627            uni_cypher::ast::Direction::Both => return None,
1628        };
1629        Some((src_var.clone(), dst_var.clone(), type_id, dir))
1630    }
1631
1632    /// Build the persisted-scan filter for a MERGE key, or `None` if any value
1633    /// is not a scalar this fast path can represent.
1634    ///
1635    /// Returning `None` makes the caller fall back to the general per-row path,
1636    /// so unusual key value types (lists, maps, temporals, nulls) are never
1637    /// silently mis-matched. The `_deleted = false` clause mirrors the
1638    /// persisted-read predicate used elsewhere; the version high-water-mark
1639    /// clause is added by [`uni_store::StorageManager::scan_vertex_table`].
1640    fn merge_key_filter(key_props: &HashMap<String, Value>) -> Option<String> {
1641        if key_props.is_empty() {
1642            return None;
1643        }
1644        let mut parts = Vec::with_capacity(key_props.len() + 1);
1645        for (k, v) in key_props {
1646            if !Self::is_safe_key_ident(k) {
1647                return None;
1648            }
1649            let lit = Self::render_key_literal(v)?;
1650            // Unquoted identifier: the Lance filter parser does not resolve a
1651            // double-quoted column name against the table here, so `"k" = v`
1652            // silently matches nothing. Keys are validated above to be safe
1653            // bare identifiers.
1654            parts.push(format!("{k} = {lit}"));
1655        }
1656        parts.push("_deleted = false".to_string());
1657        Some(parts.join(" AND "))
1658    }
1659
1660    /// True when a MERGE key name is a safe bare identifier for a Lance
1661    /// filter (issue #8). Keys come from a static map literal, but validate
1662    /// anyway.
1663    fn is_safe_key_ident(k: &str) -> bool {
1664        !k.is_empty() && k.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
1665    }
1666
1667    /// Render a scalar MERGE-key value as a Lance filter literal, or `None`
1668    /// for value types this fast path cannot represent (lists, maps,
1669    /// temporals, nulls) — the caller then falls back to the general path.
1670    fn render_key_literal(v: &Value) -> Option<String> {
1671        Some(match v {
1672            Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1673            Value::Int(i) => i.to_string(),
1674            Value::Float(f) => f.to_string(),
1675            Value::Bool(b) => b.to_string(),
1676            _ => return None,
1677        })
1678    }
1679
1680    /// Build ONE scan filter matching every key tuple in `keys` (all tuples
1681    /// sorted by `key_names` order, values canonicalized).
1682    ///
1683    /// Single-column keys render as type-grouped `k IN (…)` lists (a filter
1684    /// never compares mixed literal types against one column); composite keys
1685    /// render as an OR of per-tuple conjunctions. Both forms are wrapped with
1686    /// the same `_deleted = false` clause the per-row filter used.
1687    fn merge_batch_filter(key_names: &[String], keys: &[&MergeKey]) -> Option<String> {
1688        if keys.is_empty() || key_names.iter().any(|k| !Self::is_safe_key_ident(k)) {
1689            return None;
1690        }
1691        let disjunction = if let [key] = key_names {
1692            // Group literals by value variant so each IN list is homogeneous.
1693            let mut groups: HashMap<std::mem::Discriminant<Value>, Vec<String>> = HashMap::new();
1694            for tuple in keys {
1695                let (_, v) = tuple.first()?;
1696                groups
1697                    .entry(std::mem::discriminant(v))
1698                    .or_default()
1699                    .push(Self::render_key_literal(v)?);
1700            }
1701            groups
1702                .into_values()
1703                .map(|lits| {
1704                    if let [lit] = lits.as_slice() {
1705                        format!("{key} = {lit}")
1706                    } else {
1707                        format!("{key} IN ({})", lits.join(", "))
1708                    }
1709                })
1710                .collect::<Vec<_>>()
1711                .join(" OR ")
1712        } else {
1713            keys.iter()
1714                .map(|tuple| {
1715                    let conj = tuple
1716                        .iter()
1717                        .map(|(k, v)| Some(format!("{k} = {}", Self::render_key_literal(v)?)))
1718                        .collect::<Option<Vec<_>>>()?
1719                        .join(" AND ");
1720                    Some(format!("({conj})"))
1721                })
1722                .collect::<Option<Vec<_>>>()?
1723                .join(" OR ")
1724        };
1725        Some(format!("({disjunction}) AND _deleted = false"))
1726    }
1727
1728    /// Canonicalize a numeric MERGE-key value for *matching only*.
1729    ///
1730    /// A finite `Float` with an integral value (e.g. `1.0`) is mapped to the
1731    /// equivalent `Int`, so an `Int(1)` key matches a node stored with
1732    /// `Float(1.0)` and vice versa — the coercion the general (DataFusion) MERGE
1733    /// path already applies (review #3a). Non-numeric and non-integral values are
1734    /// returned unchanged. Used only to build match keys / comparisons, never the
1735    /// value written to a created node.
1736    fn canonical_key_value(v: &Value) -> Value {
1737        match v {
1738            Value::Float(f)
1739                if f.is_finite()
1740                    && f.fract() == 0.0
1741                    && *f >= i64::MIN as f64
1742                    && *f <= i64::MAX as f64 =>
1743            {
1744                Value::Int(*f as i64)
1745            }
1746            other => other.clone(),
1747        }
1748    }
1749
1750    /// Canonical sorted `(name, value)` key tuple for a MERGE row's key map.
1751    ///
1752    /// Numeric values are canonicalized ([`Self::canonical_key_value`]) so the
1753    /// tuple compares equal regardless of `Int`/`Float` spelling. This tuple is
1754    /// used purely as a match key (intra-batch dedup, L0 overlay lookup); the
1755    /// created node's properties come from the original, un-canonicalized map.
1756    fn merge_key_tuple(key_props: &HashMap<String, Value>) -> MergeKey {
1757        let mut tuple: MergeKey = key_props
1758            .iter()
1759            .map(|(k, v)| (k.clone(), Self::canonical_key_value(v)))
1760            .collect();
1761        tuple.sort_by(|a, b| a.0.cmp(&b.0));
1762        tuple
1763    }
1764
1765    /// Snapshot all live L0 vertices of `label`, grouped by their MERGE key.
1766    ///
1767    /// Walked once per MERGE statement (issue #69): the per-row fast path then
1768    /// resolves L0/uncommitted matches with an O(1) map lookup instead of
1769    /// re-enumerating L0 for every row. Captures committed-not-yet-persisted
1770    /// rows and rows created earlier in the same transaction; rows created by
1771    /// later rows of this same statement are folded in incrementally by
1772    /// [`Self::execute_merge_row_indexed`]. `key_names` must be sorted to match
1773    /// [`Self::merge_key_tuple`].
1774    fn merge_l0_existing(
1775        &self,
1776        label: &str,
1777        key_names: &[String],
1778        ctx: Option<&QueryContext>,
1779    ) -> HashMap<MergeKey, Vec<Vid>> {
1780        let mut candidates: Vec<Vid> = Vec::new();
1781        l0_visibility::visit_l0_buffers(ctx, |l0| {
1782            if let Some(vids) = l0.label_to_vids.get(label) {
1783                candidates.extend(vids.iter().copied());
1784            }
1785            false
1786        });
1787
1788        let mut map: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1789        let mut seen: HashSet<Vid> = HashSet::new();
1790        for vid in candidates {
1791            if !seen.insert(vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
1792                continue;
1793            }
1794            // `lookup_vertex_prop` merges across L0 layers (newest wins).
1795            let tuple: MergeKey = key_names
1796                .iter()
1797                .map(|k| {
1798                    let v = l0_visibility::lookup_vertex_prop(vid, k, ctx).unwrap_or(Value::Null);
1799                    (k.clone(), Self::canonical_key_value(&v))
1800                })
1801                .collect();
1802            map.entry(tuple).or_default().push(vid);
1803        }
1804        map
1805    }
1806
1807    /// Maximum key tuples per batched MERGE scan — bounds the filter-string
1808    /// size and Lance/DataFusion parse cost; chunks run sequentially.
1809    const MERGE_SCAN_CHUNK: usize = 1000;
1810
1811    /// Persisted (flushed) vertices of `label` for EVERY key tuple in `keys`,
1812    /// resolved with one scan per [`Self::MERGE_SCAN_CHUNK`] tuples instead of
1813    /// one scan per input row (review perf #4: `UNWIND … MERGE` issued N
1814    /// independent Lance scans).
1815    ///
1816    /// Scans via [`uni_store::StorageManager::scan_vertex_table`] — the same
1817    /// read path `MATCH` uses, so it honors the version high-water-mark and
1818    /// sees flushed rows. On the declared-label branch the key-filtered scan
1819    /// only NOMINATES candidate vids; a second, unfiltered `_vid IN (…)` pass
1820    /// picks each candidate's max-`_version` row and requires it to be live
1821    /// and still keyed as requested (per-label tables are MVCC-append, so a
1822    /// superseded version's row would otherwise stale-match a rewritten key).
1823    /// Matched rows are grouped by their CANONICAL key tuple (stored values
1824    /// run through [`Self::canonical_key_value`], so a stored `Float(1.0)`
1825    /// lands under a requested `Int(1)` — the coercion Lance's numeric filter
1826    /// equality applies). Liveness against L0 overlays (deletes, key rewrites
1827    /// by earlier rows of the same statement) is NOT checked here — the
1828    /// per-row consumer re-checks at row time, exactly as the old per-row
1829    /// scan did.
1830    ///
1831    /// The second returned map carries the FULL property maps the schemaless
1832    /// branch already decoded for each matched vid (empty on the declared-label
1833    /// branch, which projects only key columns) — the caller seeds the
1834    /// statement-level [`Prefetch`] from it at zero extra scans.
1835    ///
1836    /// # Errors
1837    /// Propagates persisted-scan and filter-build failures — fail-closed: a
1838    /// MERGE must never treat a failed lookup as "no match" and create
1839    /// duplicates.
1840    async fn merge_lookup_persisted_batch(
1841        &self,
1842        label: &str,
1843        key_names: &[String],
1844        keys: &HashSet<MergeKey>,
1845    ) -> Result<(
1846        HashMap<MergeKey, Vec<Vid>>,
1847        HashMap<Vid, uni_common::Properties>,
1848    )> {
1849        let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1850        if keys.is_empty() {
1851            return Ok((out, HashMap::new()));
1852        }
1853        // An undeclared (schemaless) label has no per-label table — its flushed
1854        // rows live only in the unified main vertex table. Route to the
1855        // main-table lookup, mirroring the planner's scan routing (a schemaless
1856        // MATCH plans `ScanMainByLabels` on the same schema predicate).
1857        if self
1858            .storage
1859            .schema_manager()
1860            .schema()
1861            .get_label_case_insensitive(label)
1862            .is_none()
1863        {
1864            return self
1865                .merge_lookup_persisted_batch_schemaless(label, key_names, keys)
1866                .await;
1867        }
1868        // Declared label — the per-label table is MVCC-append (an update
1869        // flush adds a higher-`_version` row for the same vid) and the key
1870        // predicate is pushed into the Lance filter, so a SUPERSEDED version
1871        // whose row still carries a requested key is returned while the vid's
1872        // current row (key rewritten, fails the filter) is invisible to the
1873        // scan. Version dedup among the returned rows cannot detect that, so
1874        // the lookup runs in two passes: the key-filtered scan only nominates
1875        // candidate vids, and an unfiltered `_vid IN (…)` scan then requires
1876        // each candidate's max-`_version` row to be live and still keyed as
1877        // requested.
1878        let mut columns: Vec<&str> = vec!["_vid"];
1879        columns.extend(key_names.iter().map(String::as_str));
1880
1881        let key_list: Vec<&MergeKey> = keys.iter().collect();
1882        let mut candidates: Vec<Vid> = Vec::new();
1883        let mut seen: HashSet<Vid> = HashSet::new();
1884        for chunk in key_list.chunks(Self::MERGE_SCAN_CHUNK) {
1885            let filter = Self::merge_batch_filter(key_names, chunk)
1886                .ok_or_else(|| anyhow!("MERGE fast path could not build a batched key filter"))?;
1887            let scanned = self
1888                .storage
1889                .scan_vertex_table(label, &columns, Some(&filter))
1890                .await?;
1891            let Some(batch) = scanned else { continue };
1892            let Some(vid_col) = batch
1893                .column_by_name("_vid")
1894                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
1895            else {
1896                continue;
1897            };
1898            for i in 0..vid_col.len() {
1899                let vid = Vid::from(vid_col.value(i));
1900                if seen.insert(vid) {
1901                    candidates.push(vid);
1902                }
1903            }
1904        }
1905
1906        // Verification pass — tombstones are NOT filtered Lance-side (the
1907        // max-version pick must see them so a deleted winner cannot let an
1908        // older live version resurrect the match), exactly like the
1909        // schemaless branch below.
1910        let mut verify_columns: Vec<&str> = vec!["_vid", "_deleted", "_version"];
1911        verify_columns.extend(key_names.iter().map(String::as_str));
1912        for chunk in candidates.chunks(Self::MERGE_SCAN_CHUNK) {
1913            let vid_list = chunk
1914                .iter()
1915                .map(|v| v.as_u64().to_string())
1916                .collect::<Vec<_>>()
1917                .join(", ");
1918            let filter = format!("_vid IN ({vid_list})");
1919            let scanned = self
1920                .storage
1921                .scan_vertex_table(label, &verify_columns, Some(&filter))
1922                .await?;
1923            let Some(batch) = scanned else { continue };
1924            let (Some(vid_col), Some(del_col), Some(ver_col)) = (
1925                batch
1926                    .column_by_name("_vid")
1927                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1928                batch
1929                    .column_by_name("_deleted")
1930                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
1931                batch
1932                    .column_by_name("_version")
1933                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1934            ) else {
1935                return Err(anyhow!(
1936                    "MERGE batched lookup: verification scan missing a required column"
1937                ));
1938            };
1939            let key_cols: Vec<_> = key_names
1940                .iter()
1941                .map(|k| batch.column_by_name(k))
1942                .collect::<Option<Vec<_>>>()
1943                .ok_or_else(|| {
1944                    anyhow!("MERGE batched lookup: projected key column missing from scan result")
1945                })?;
1946            // Per-vid MVCC dedup: keep the highest-version row for each vid.
1947            let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
1948            for i in 0..batch.num_rows() {
1949                let vid = Vid::from(vid_col.value(i));
1950                let ver = ver_col.value(i);
1951                let entry = winners.entry(vid).or_insert((ver, i));
1952                if ver > entry.0 {
1953                    *entry = (ver, i);
1954                }
1955            }
1956            for (vid, (_ver, row)) in winners {
1957                if del_col.value(row) {
1958                    continue;
1959                }
1960                let tuple: MergeKey = key_names
1961                    .iter()
1962                    .zip(&key_cols)
1963                    .map(|(k, col)| {
1964                        let v = uni_store::storage::arrow_convert::arrow_to_value(
1965                            col.as_ref(),
1966                            row,
1967                            None,
1968                        );
1969                        (k.clone(), Self::canonical_key_value(&v))
1970                    })
1971                    .collect();
1972                if keys.contains(&tuple) {
1973                    out.entry(tuple).or_default().push(vid);
1974                }
1975            }
1976        }
1977        Ok((out, HashMap::new()))
1978    }
1979
1980    /// Persisted-match lookup for an UNDECLARED (schemaless) label.
1981    ///
1982    /// Schemaless rows live only in the unified main vertex table (per-label
1983    /// tables exist only for declared labels), with all properties encoded in
1984    /// the `props_json` CypherValue blob — so key values cannot be pushed into
1985    /// the Lance filter; the key match happens in memory after decoding,
1986    /// exactly like the schemaless MATCH scan. One main-table scan regardless
1987    /// of key count.
1988    ///
1989    /// Mirrors `columnar_scan_schemaless_vertex_batch_static`: tombstones are
1990    /// NOT filtered Lance-side (MVCC dedup must see them to pick the winning
1991    /// version per vid); the per-vid max-`_version` dedup runs here, then
1992    /// deleted winners are dropped.
1993    ///
1994    /// Also returns the full decoded property map per matched vid — the blob
1995    /// is decoded here anyway, and the caller seeds the statement-level
1996    /// [`Prefetch`] from it instead of re-reading per row.
1997    ///
1998    /// # Errors
1999    /// Propagates scan and blob-decode failures — fail-closed: a MERGE must
2000    /// never treat a failed lookup as "no match" and create duplicates.
2001    async fn merge_lookup_persisted_batch_schemaless(
2002        &self,
2003        label: &str,
2004        key_names: &[String],
2005        keys: &HashSet<MergeKey>,
2006    ) -> Result<(
2007        HashMap<MergeKey, Vec<Vid>>,
2008        HashMap<Vid, uni_common::Properties>,
2009    )> {
2010        let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2011        let mut props_by_vid: HashMap<Vid, uni_common::Properties> = HashMap::new();
2012        let filter = format!("array_contains(labels, '{}')", label.replace('\'', "''"));
2013        let Some(batch) = self
2014            .storage
2015            .scan_main_vertex_table(
2016                &["_vid", "_deleted", "props_json", "_version"],
2017                Some(&filter),
2018            )
2019            .await?
2020        else {
2021            return Ok((out, props_by_vid));
2022        };
2023        let (Some(vid_col), Some(del_col), Some(ver_col)) = (
2024            batch
2025                .column_by_name("_vid")
2026                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
2027            batch
2028                .column_by_name("_deleted")
2029                .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
2030            batch
2031                .column_by_name("_version")
2032                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
2033        ) else {
2034            return Err(anyhow!(
2035                "schemaless MERGE lookup: main vertex table scan missing a required column"
2036            ));
2037        };
2038        let props_col = batch
2039            .column_by_name("props_json")
2040            .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2041
2042        // Per-vid MVCC dedup: keep the highest-version row for each vid.
2043        let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
2044        for i in 0..batch.num_rows() {
2045            let vid = Vid::from(vid_col.value(i));
2046            let ver = ver_col.value(i);
2047            let entry = winners.entry(vid).or_insert((ver, i));
2048            if ver > entry.0 {
2049                *entry = (ver, i);
2050            }
2051        }
2052        for (vid, (_ver, row)) in winners {
2053            // Drop deletion tombstones AFTER picking the winner — a deleted
2054            // winner must not let an older live version resurrect the match.
2055            if del_col.value(row) {
2056                continue;
2057            }
2058            // A row without properties matches only an all-Null key tuple.
2059            let props = match props_col {
2060                Some(arr) if !arrow_array::Array::is_null(arr, row) => {
2061                    match uni_common::cypher_value_codec::decode(arr.value(row))
2062                        .map_err(|e| anyhow!("schemaless MERGE lookup: props decode: {e}"))?
2063                    {
2064                        Value::Map(m) => m,
2065                        _ => HashMap::new(),
2066                    }
2067                }
2068                _ => HashMap::new(),
2069            };
2070            let tuple: MergeKey = key_names
2071                .iter()
2072                .map(|k| {
2073                    (
2074                        k.clone(),
2075                        Self::canonical_key_value(props.get(k).unwrap_or(&Value::Null)),
2076                    )
2077                })
2078                .collect();
2079            if keys.contains(&tuple) {
2080                out.entry(tuple).or_default().push(vid);
2081                props_by_vid.insert(vid, props);
2082            }
2083        }
2084        Ok((out, props_by_vid))
2085    }
2086
2087    /// True if the statement-level MERGE property prefetch is safe for `label`.
2088    ///
2089    /// False when the label declares any CRDT-typed property: a prefetch HIT in
2090    /// [`read_vertex_props_with_prefetch`] skips the `normalize_crdt_properties`
2091    /// pass that `get_all_vertex_props_with_ctx` applies, so CRDT-bearing
2092    /// labels keep the per-row read path. Undeclared labels are trivially safe
2093    /// (normalization is a no-op without schema CRDT entries).
2094    fn merge_label_prefetch_safe(&self, label: &str) -> bool {
2095        let schema = self.storage.schema_manager().schema();
2096        schema.properties.get(label).is_none_or(|props| {
2097            !props
2098                .values()
2099                .any(|pm| matches!(pm.r#type, DataType::Crdt(_)))
2100        })
2101    }
2102
2103    /// True if an L0 override rewrote any key column of a persisted match away
2104    /// from its requested value (so the persisted row no longer matches).
2105    fn vid_overrides_break_key(
2106        vid: Vid,
2107        key_props: &HashMap<String, Value>,
2108        ctx: Option<&QueryContext>,
2109    ) -> bool {
2110        key_props.iter().any(|(k, want)| {
2111            matches!(
2112                l0_visibility::lookup_vertex_prop(vid, k, ctx),
2113                Some(got) if Self::canonical_key_value(&got) != Self::canonical_key_value(want)
2114            )
2115        })
2116    }
2117
2118    /// Build a node Map value (`{_vid, _labels, ...props}`) for binding a MERGE
2119    /// node variable.
2120    ///
2121    /// Matches the binding shape produced by `execute_create_pattern` and the
2122    /// general MATCH path, so ON MATCH SET, RETURN, and downstream operators
2123    /// resolve the variable identically — a bare `Value::Int(vid)` is not a
2124    /// valid node binding for those consumers.
2125    fn build_node_map(vid: Vid, label: &str, props: uni_common::Properties) -> Value {
2126        let mut obj = HashMap::new();
2127        obj.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
2128        obj.insert(
2129            "_labels".to_string(),
2130            Value::List(vec![Value::String(label.to_string())]),
2131        );
2132        for (k, v) in props {
2133            obj.insert(k, v);
2134        }
2135        Value::Map(obj)
2136    }
2137
2138    /// True if an L0-only vertex has every key column set to the requested
2139    /// value. A missing column matches only a requested `Null`.
2140    fn l0_vid_matches_key(
2141        vid: Vid,
2142        key_props: &HashMap<String, Value>,
2143        ctx: Option<&QueryContext>,
2144    ) -> bool {
2145        key_props.iter().all(
2146            |(k, want)| match l0_visibility::lookup_vertex_prop(vid, k, ctx) {
2147                Some(got) => Self::canonical_key_value(&got) == Self::canonical_key_value(want),
2148                None => *want == Value::Null,
2149            },
2150        )
2151    }
2152
2153    /// Index fast-path execution for one MERGE row of the shape detected by
2154    /// [`Self::merge_single_node_fastpath`].
2155    ///
2156    /// Resolves matches from the per-batch L0 snapshot `existing` (O(1) lookup,
2157    /// no per-row L0 enumeration) plus the per-statement persisted prefetch
2158    /// (`persisted`, built once by [`Self::merge_lookup_persisted_batch`]);
2159    /// applies ON MATCH SET to every match, or creates the node and applies
2160    /// ON CREATE SET when there is none. A newly created vertex is folded into
2161    /// `existing` so a later row of the same batch with the same key matches it
2162    /// (intra-batch dedup). Returns the RETURN rows for this input row (one per
2163    /// match, or one for a create).
2164    ///
2165    /// `prefetched` is the statement-level property prefetch (`None` when the
2166    /// label is CRDT-bearing, see [`Self::merge_label_prefetch_safe`]): matched
2167    /// vids carry their persisted base row, freshly created vids are seeded
2168    /// with an empty base — per-row reads then resolve as base + L0 layering
2169    /// (every SET flush writes the full row to L0 before the next read, so a
2170    /// prefetch hit equals a fresh read) instead of one storage scan each.
2171    ///
2172    /// # Errors
2173    /// Propagates evaluation, create, and SET failures.
2174    #[expect(
2175        clippy::too_many_arguments,
2176        reason = "mirrors execute_merge's threaded execution state"
2177    )]
2178    async fn execute_merge_row_indexed(
2179        &self,
2180        label: &str,
2181        node: &NodePattern,
2182        path_pattern: &Pattern,
2183        temp_vars: &[String],
2184        mut row: HashMap<String, Value>,
2185        key_props: &HashMap<String, Value>,
2186        persisted: &HashMap<MergeKey, Vec<Vid>>,
2187        key_tuple: &MergeKey,
2188        existing: &mut HashMap<MergeKey, Vec<Vid>>,
2189        on_match: Option<&SetClause>,
2190        on_create: Option<&SetClause>,
2191        prop_manager: &PropertyManager,
2192        params: &HashMap<String, Value>,
2193        ctx: Option<&QueryContext>,
2194        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2195        writer: &Writer,
2196        mut prefetched: Option<&mut Prefetch>,
2197    ) -> Result<Vec<HashMap<String, Value>>> {
2198        let empty_prefetch = Prefetch::default();
2199        let mut seen: HashSet<Vid> = HashSet::new();
2200        let mut matches: Vec<Vid> = Vec::new();
2201        // Persisted (flushed) matches from the per-statement prefetch. The
2202        // prefetch is static for the statement, so re-verify liveness at row
2203        // time — an earlier row of this batch may have deleted the candidate
2204        // or rewritten its key (the old per-row scan saw those through its L0
2205        // overlay checks; these are the same checks, moved to row time).
2206        if let Some(vids) = persisted.get(key_tuple) {
2207            for &vid in vids {
2208                if l0_visibility::is_vertex_deleted(vid, ctx) {
2209                    continue;
2210                }
2211                if Self::vid_overrides_break_key(vid, key_props, ctx) {
2212                    continue;
2213                }
2214                if seen.insert(vid) {
2215                    matches.push(vid);
2216                }
2217            }
2218        }
2219        // L0 / intra-batch matches from the per-batch snapshot, re-verified live
2220        // in case a prior row of this batch mutated or deleted the candidate.
2221        if let Some(vids) = existing.get(key_tuple) {
2222            for &vid in vids {
2223                if seen.contains(&vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
2224                    continue;
2225                }
2226                if Self::l0_vid_matches_key(vid, key_props, ctx) && seen.insert(vid) {
2227                    matches.push(vid);
2228                }
2229            }
2230        }
2231
2232        let mut out = Vec::new();
2233        if matches.is_empty() {
2234            // No match: create the node, then apply ON CREATE SET. Fold the
2235            // ON CREATE SET property assignments into seed props first so a
2236            // NOT-NULL property supplied only by ON CREATE SET passes
2237            // create-time validation (RC4); the post-create SET below settles
2238            // the final values.
2239            let seed_props = self
2240                .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2241                .await?;
2242            self.execute_create_pattern(
2243                path_pattern,
2244                &mut row,
2245                writer,
2246                prop_manager,
2247                params,
2248                ctx,
2249                tx_l0_override,
2250                Some(&seed_props),
2251            )
2252            .await?;
2253            // Fold the new vertex into the batch snapshot for intra-batch
2254            // dedup, and seed the statement prefetch with an empty base: a
2255            // fresh vid has nothing in storage, so ON CREATE SET's lazy read
2256            // resolves from the L0 row the create just wrote instead of
2257            // issuing a per-row storage scan that finds nothing.
2258            if let Some(var) = &node.variable
2259                && let Some(val) = row.get(var)
2260                && let Ok(vid) = Self::vid_from_value(val)
2261            {
2262                existing.entry(key_tuple.clone()).or_default().push(vid);
2263                if let Some(p) = prefetched.as_deref_mut() {
2264                    p.vertex.entry(vid).or_default();
2265                }
2266                // Phantom guard (RC2): register this MERGE-create's key so a
2267                // concurrent MERGE of the same key aborts retriably at commit
2268                // (converging to one node) instead of silently duplicating —
2269                // even with no declared UNIQUE constraint. Only inside a
2270                // transaction, where commit re-probes the guard; a plain CREATE
2271                // never registers a key, so it is unaffected.
2272                if let Some(tx_l0) = tx_l0_override {
2273                    let key_values: Vec<(String, Value)> = key_props
2274                        .iter()
2275                        .map(|(k, v)| (k.clone(), v.clone()))
2276                        .collect();
2277                    let guard_key =
2278                        uni_store::runtime::l0::serialize_constraint_key(label, &key_values);
2279                    tx_l0.write().insert_merge_guard_key(guard_key, vid);
2280                }
2281            }
2282            if let Some(set) = on_create {
2283                self.execute_set_items_locked(
2284                    &set.items,
2285                    &mut row,
2286                    writer,
2287                    prop_manager,
2288                    params,
2289                    ctx,
2290                    tx_l0_override,
2291                    prefetched.as_deref().unwrap_or(&empty_prefetch),
2292                )
2293                .await?;
2294            }
2295            Self::bind_path_variables(path_pattern, &mut row, temp_vars);
2296            out.push(row);
2297        } else {
2298            // Apply ON MATCH SET to every matched node (multi-match semantics),
2299            // binding the node variable as a Map with _vid/_labels/props so
2300            // RETURN and downstream operators resolve it as they would for the
2301            // general MATCH and CREATE paths.
2302            for vid in matches {
2303                let mut m = row.clone();
2304                if let Some(var) = &node.variable {
2305                    // Minimal binding so ON MATCH SET resolves the node by _vid.
2306                    m.insert(
2307                        var.clone(),
2308                        Self::build_node_map(vid, label, HashMap::new()),
2309                    );
2310                }
2311                if let Some(set) = on_match {
2312                    self.execute_set_items_locked(
2313                        &set.items,
2314                        &mut m,
2315                        writer,
2316                        prop_manager,
2317                        params,
2318                        ctx,
2319                        tx_l0_override,
2320                        prefetched.as_deref().unwrap_or(&empty_prefetch),
2321                    )
2322                    .await?;
2323                }
2324                if let Some(var) = &node.variable {
2325                    // Rebind with full, post-SET properties for RETURN
2326                    // fidelity. The SET above flushed the full row to L0, so a
2327                    // prefetch hit (base + L0 layering) reproduces exactly
2328                    // what a fresh storage read would return.
2329                    let props = read_vertex_props_with_prefetch(
2330                        vid,
2331                        prefetched.as_deref().unwrap_or(&empty_prefetch),
2332                        prop_manager,
2333                        ctx,
2334                    )
2335                    .await?;
2336                    m.insert(var.clone(), Self::build_node_map(vid, label, props));
2337                }
2338                Self::bind_path_variables(path_pattern, &mut m, temp_vars);
2339                out.push(m);
2340            }
2341        }
2342        Ok(out)
2343    }
2344
2345    #[expect(clippy::too_many_arguments)]
2346    pub(crate) async fn execute_merge(
2347        &self,
2348        rows: Vec<HashMap<String, Value>>,
2349        pattern: &Pattern,
2350        on_match: Option<&SetClause>,
2351        on_create: Option<&SetClause>,
2352        prop_manager: &PropertyManager,
2353        params: &HashMap<String, Value>,
2354        ctx: Option<&QueryContext>,
2355        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2356    ) -> Result<Vec<HashMap<String, Value>>> {
2357        let writer_lock = self
2358            .writer
2359            .as_ref()
2360            .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;
2361
2362        // Prepare pattern for path variable binding: assign temp edge variable
2363        // names to unnamed relationships in paths that have path variables.
2364        let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);
2365
2366        // Issue #69: a single-node, single-label MERGE takes the fast path,
2367        // skipping the per-row query planning that made batched MERGE no faster
2368        // than a per-entity loop. Indexed keys get an index point-lookup;
2369        // un-indexed keys still skip planning (the lookup is a filtered scan).
2370        // The shape is the same for every row, so it is detected once.
2371        let fastpath = self.merge_single_node_fastpath(pattern);
2372
2373        // Build the per-batch L0 snapshot once (issue #69 Phase C): the per-row
2374        // fast path then resolves L0/intra-batch matches with an O(1) lookup
2375        // instead of re-walking L0 for every row. `key_names` is the sorted
2376        // static key set, matching `merge_key_tuple`.
2377        let mut fast_existing: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2378        // Per-row pre-evaluated fast-path keys (None = that row falls back to
2379        // the general path), and the per-statement persisted prefetch over the
2380        // deduped key tuples — ONE chunked scan instead of one scan per row.
2381        // Key expressions only see the row's own bindings + params, so
2382        // evaluating them ahead of any creates cannot observe earlier rows.
2383        let mut row_fast: Vec<Option<(HashMap<String, Value>, MergeKey)>> = Vec::new();
2384        let mut fast_persisted: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2385        // Statement-level property prefetch for the fast path (review perf
2386        // residual): every persisted match's full row is batch-read ONCE, so
2387        // the per-row ON MATCH SET read and the post-SET rebind resolve as
2388        // prefetch-base + L0 layering instead of one storage scan each.
2389        // `None` disables it for CRDT-bearing labels (the prefetch-hit read
2390        // skips CRDT normalization).
2391        let mut merge_prefetch: Option<Prefetch> = None;
2392        if let Some((node, label)) = &fastpath {
2393            let mut key_names: Vec<String> = match &node.properties {
2394                Some(Expr::Map(entries)) => entries.iter().map(|(k, _)| k.clone()).collect(),
2395                _ => Vec::new(),
2396            };
2397            key_names.sort();
2398            fast_existing = self.merge_l0_existing(label, &key_names, ctx);
2399
2400            row_fast.reserve(rows.len());
2401            for row in &rows {
2402                let mut key_props: HashMap<String, Value> = HashMap::new();
2403                if let Some(props_expr) = &node.properties
2404                    && let Value::Map(map) = self
2405                        .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2406                        .await?
2407                {
2408                    key_props = map;
2409                }
2410                // Only rows whose every key value is a scalar the persisted
2411                // scan can express take the fast path (same gate as before,
2412                // via the filter builder).
2413                if Self::merge_key_filter(&key_props).is_some() {
2414                    let tuple = Self::merge_key_tuple(&key_props);
2415                    row_fast.push(Some((key_props, tuple)));
2416                } else {
2417                    row_fast.push(None);
2418                }
2419            }
2420            let unique_keys: HashSet<MergeKey> = row_fast
2421                .iter()
2422                .flatten()
2423                .map(|(_, tuple)| tuple.clone())
2424                .collect();
2425            let (persisted, schemaless_props) = self
2426                .merge_lookup_persisted_batch(label, &key_names, &unique_keys)
2427                .await?;
2428            fast_persisted = persisted;
2429            if self.merge_label_prefetch_safe(label) {
2430                let mut pf = Prefetch::default();
2431                if !schemaless_props.is_empty() {
2432                    // The schemaless lookup already decoded each matched vid's
2433                    // full property map — zero extra scans.
2434                    pf.vertex.extend(schemaless_props);
2435                } else {
2436                    let vids: Vec<Vid> = fast_persisted
2437                        .values()
2438                        .flatten()
2439                        .copied()
2440                        .collect::<HashSet<Vid>>()
2441                        .into_iter()
2442                        .collect();
2443                    if !vids.is_empty()
2444                        && let Ok(batch_props) = prop_manager
2445                            .get_batch_vertex_props_for_label(&vids, label, ctx)
2446                            .await
2447                    {
2448                        // One `_vid IN (…)` scan for every matched row's base.
2449                        // On Err the map stays empty — every read falls back to
2450                        // the per-row path (fail-open, same posture as
2451                        // prefetch_set_targets).
2452                        pf.vertex.extend(batch_props);
2453                    }
2454                }
2455                merge_prefetch = Some(pf);
2456            }
2457        }
2458
2459        // RC3: relationship-MERGE existence fast-path. The single-node fast path
2460        // above does not cover `(a)-[:R]->(b)`; the general path rebuilds and runs
2461        // a per-row traversal `LogicalPlan` just to check whether the edge exists
2462        // (~19x the bulk CREATE of the same edges). For the bound-endpoints,
2463        // anonymous-edge shape (and no ON MATCH SET, whose match-row semantics the
2464        // general path materialises) we resolve existence with one MVCC-correct
2465        // adjacency probe — `GraphExecutionContext::get_neighbors` merges CSR + all
2466        // L0 buffers including the transaction's own writes, so intra-batch edges
2467        // are seen — and reuse the general create / ON CREATE handling unchanged.
2468        // An ON MATCH SET with actual items needs the general path's materialised
2469        // match rows; a plain MERGE carries an *empty* on_match, which the fast
2470        // path can serve (it emits the row directly, applying nothing on match).
2471        let on_match_empty = on_match.is_none_or(|s| s.items.is_empty());
2472        let rel_fast = if fastpath.is_none() && on_match_empty {
2473            self.merge_relationship_fastpath_shape(pattern)
2474        } else {
2475            None
2476        };
2477        let rel_graph_ctx = rel_fast.as_ref().map(|_| {
2478            let l0_context = match ctx {
2479                Some(c) => crate::query::df_graph::L0Context::from_query_context(c),
2480                None => crate::query::df_graph::L0Context::empty(),
2481            };
2482            let pm_arc = self.prop_manager_arc.clone().unwrap_or_else(|| {
2483                Arc::new(PropertyManager::new(
2484                    self.storage.clone(),
2485                    self.storage.schema_manager_arc(),
2486                    prop_manager.cache_size(),
2487                ))
2488            });
2489            crate::query::df_graph::GraphExecutionContext::with_l0_context(
2490                self.effective_storage(),
2491                l0_context,
2492                pm_arc,
2493            )
2494        });
2495
2496        let mut results = Vec::new();
2497        for (idx, mut row) in rows.into_iter().enumerate() {
2498            // Rows with a pre-evaluated scalar key take the fast path; rows
2499            // with a non-scalar key fall through to the general path below.
2500            if let Some((node, label)) = &fastpath
2501                && let Some((key_props, key_tuple)) = row_fast.get(idx).and_then(|rf| rf.as_ref())
2502            {
2503                let writer: &uni_store::Writer = writer_lock.as_ref();
2504                let row_out = self
2505                    .execute_merge_row_indexed(
2506                        label,
2507                        node,
2508                        &path_pattern,
2509                        &temp_vars,
2510                        row,
2511                        key_props,
2512                        &fast_persisted,
2513                        key_tuple,
2514                        &mut fast_existing,
2515                        on_match,
2516                        on_create,
2517                        prop_manager,
2518                        params,
2519                        ctx,
2520                        tx_l0_override,
2521                        writer,
2522                        merge_prefetch.as_mut(),
2523                    )
2524                    .await?;
2525                results.extend(row_out);
2526                continue;
2527            }
2528
2529            // RC3 relationship fast path: bound endpoints → resolve edge
2530            // existence with one adjacency probe and reuse the general
2531            // create / ON CREATE handling, skipping the per-row traversal plan.
2532            if let (Some((src_var, dst_var, type_id, dir)), Some(graph_ctx)) =
2533                (rel_fast.as_ref(), rel_graph_ctx.as_ref())
2534            {
2535                let src_vid = row.get(src_var).and_then(|v| Self::vid_from_value(v).ok());
2536                let dst_vid = row.get(dst_var).and_then(|v| Self::vid_from_value(v).ok());
2537                if let (Some(src_vid), Some(dst_vid)) = (src_vid, dst_vid) {
2538                    let exists = graph_ctx
2539                        .get_neighbors(src_vid, *type_id, *dir)
2540                        .into_iter()
2541                        .any(|(n, _eid)| n == dst_vid);
2542                    let writer: &uni_store::Writer = writer_lock.as_ref();
2543                    if !exists {
2544                        // Edge absent: create only the edge (endpoints are bound),
2545                        // then apply ON CREATE SET — identical to the general
2546                        // create branch below.
2547                        let seed_props = self
2548                            .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2549                            .await?;
2550                        self.execute_create_pattern(
2551                            &path_pattern,
2552                            &mut row,
2553                            writer,
2554                            prop_manager,
2555                            params,
2556                            ctx,
2557                            tx_l0_override,
2558                            Some(&seed_props),
2559                        )
2560                        .await?;
2561                        if let Some(set) = on_create {
2562                            self.execute_set_items_locked(
2563                                &set.items,
2564                                &mut row,
2565                                writer,
2566                                prop_manager,
2567                                params,
2568                                ctx,
2569                                tx_l0_override,
2570                                &Prefetch::default(),
2571                            )
2572                            .await?;
2573                        }
2574                    }
2575                    // Whether matched or just created, the edge now exists; bind
2576                    // path variables and emit the row (the edge is anonymous, so
2577                    // there is no edge binding to reproduce, and ON MATCH SET is
2578                    // excluded from this fast path).
2579                    Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
2580                    results.push(row);
2581                    continue;
2582                }
2583                // Endpoints not bound to vids → fall through to the general path.
2584            }
2585
2586            // General execution: match-or-create per row. (The index fast path
2587            // above already handles single-node, single-label, scalar-indexed
2588            // MERGE — including unique-constrained labels, whose keys are
2589            // indexed — so there is no separate constraint-only fast path.)
2590            let matches = self
2591                .execute_merge_match(pattern, &row, prop_manager, params, ctx)
2592                .await?;
2593            let writer: &uni_store::Writer = writer_lock.as_ref();
2594
2595            let result: Result<Vec<HashMap<String, Value>>> = async {
2596                let mut batch = Vec::new();
2597                if !matches.is_empty() {
2598                    for mut m in matches {
2599                        if let Some(set) = on_match {
2600                            self.execute_set_items_locked(
2601                                &set.items,
2602                                &mut m,
2603                                writer,
2604                                prop_manager,
2605                                params,
2606                                ctx,
2607                                tx_l0_override,
2608                                &Prefetch::default(),
2609                            )
2610                            .await?;
2611                        }
2612                        Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
2613                        batch.push(m);
2614                    }
2615                } else {
2616                    // Fold ON CREATE SET into seed props so a NOT-NULL property
2617                    // set only by ON CREATE SET passes create-time validation
2618                    // (RC4); the post-create SET below settles the final values.
2619                    let seed_props = self
2620                        .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2621                        .await?;
2622                    self.execute_create_pattern(
2623                        &path_pattern,
2624                        &mut row,
2625                        writer,
2626                        prop_manager,
2627                        params,
2628                        ctx,
2629                        tx_l0_override,
2630                        Some(&seed_props),
2631                    )
2632                    .await?;
2633                    if let Some(set) = on_create {
2634                        self.execute_set_items_locked(
2635                            &set.items,
2636                            &mut row,
2637                            writer,
2638                            prop_manager,
2639                            params,
2640                            ctx,
2641                            tx_l0_override,
2642                            &Prefetch::default(),
2643                        )
2644                        .await?;
2645                    }
2646                    Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
2647                    batch.push(row);
2648                }
2649                Ok(batch)
2650            }
2651            .await;
2652
2653            results.extend(result?);
2654        }
2655        Ok(results)
2656    }
2657
2658    /// Pre-evaluate `ON CREATE SET` property assignments into per-variable seeds.
2659    ///
2660    /// Folds `SET <var>.<prop> = <expr>` items so a NOT-NULL property supplied
2661    /// only by `ON CREATE SET` is present when the MERGE node is created and
2662    /// passes constraint validation (RC4). The right-hand side is evaluated
2663    /// against the current `row`.
2664    ///
2665    /// Items whose right-hand side references the target variable (e.g.
2666    /// `ON CREATE SET n.c = coalesce(n.c, 0) + 1`) are NOT folded: seeding would
2667    /// let the post-create SET read the seeded value and apply the assignment
2668    /// twice. Such items run only post-create, exactly once (unchanged behavior).
2669    ///
2670    /// # Errors
2671    /// Returns an error if evaluating an assignment's right-hand side fails.
2672    pub(crate) async fn on_create_seed_props(
2673        &self,
2674        on_create: Option<&SetClause>,
2675        row: &HashMap<String, Value>,
2676        prop_manager: &PropertyManager,
2677        params: &HashMap<String, Value>,
2678        ctx: Option<&QueryContext>,
2679    ) -> Result<HashMap<String, HashMap<String, Value>>> {
2680        let mut seed: HashMap<String, HashMap<String, Value>> = HashMap::new();
2681        let Some(set) = on_create else {
2682            return Ok(seed);
2683        };
2684        for item in &set.items {
2685            if let SetItem::Property { expr, value } = item
2686                && let Expr::Property(var_expr, prop_name) = expr
2687                && let Expr::Variable(var_name) = &**var_expr
2688                // Skip self-referential RHS so the post-create SET (which also
2689                // runs) applies it exactly once rather than reading the seed.
2690                && !crate::query::df_graph::locy_ast_builder::expr_references_var(
2691                    value, var_name,
2692                )
2693            {
2694                let val = self
2695                    .evaluate_expr(value, row, prop_manager, params, ctx)
2696                    .await?;
2697                seed.entry(var_name.clone())
2698                    .or_default()
2699                    .insert(prop_name.clone(), val);
2700            }
2701        }
2702        Ok(seed)
2703    }
2704
2705    /// Execute a CREATE pattern, inserting new vertices and edges into the graph.
2706    #[expect(clippy::too_many_arguments)]
2707    pub(crate) async fn execute_create_pattern(
2708        &self,
2709        pattern: &Pattern,
2710        row: &mut HashMap<String, Value>,
2711        writer: &Writer,
2712        prop_manager: &PropertyManager,
2713        params: &HashMap<String, Value>,
2714        ctx: Option<&QueryContext>,
2715        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2716        // Per-variable properties to gap-fill into newly-created nodes before
2717        // constraint validation. Used by MERGE to fold `ON CREATE SET` so a
2718        // NOT-NULL property supplied only by ON CREATE SET passes create-time
2719        // validation (RC4). `None` for plain CREATE.
2720        seed_props: Option<&HashMap<String, HashMap<String, Value>>>,
2721    ) -> Result<()> {
2722        for path in &pattern.paths {
2723            let mut prev_vid: Option<Vid> = None;
2724            // (rel_var, type_id, type_name, props_expr, direction)
2725            type PendingRel = (String, u32, String, Option<Expr>, Direction);
2726            let mut rel_pending: Option<PendingRel> = None;
2727
2728            for element in &path.elements {
2729                match element {
2730                    PatternElement::Node(n) => {
2731                        let mut vid = None;
2732
2733                        // Check if node variable already bound in row
2734                        if let Some(var) = &n.variable
2735                            && let Some(val) = row.get(var)
2736                            && let Ok(existing_vid) = Self::vid_from_value(val)
2737                        {
2738                            vid = Some(existing_vid);
2739                        }
2740
2741                        // If not bound, create it
2742                        if vid.is_none() {
2743                            let mut props = HashMap::new();
2744                            if let Some(props_expr) = &n.properties {
2745                                let props_val = self
2746                                    .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2747                                    .await?;
2748                                if let Value::Map(map) = props_val {
2749                                    for (k, v) in map {
2750                                        props.insert(k, v);
2751                                    }
2752                                } else {
2753                                    return Err(anyhow!("Properties must evaluate to a map"));
2754                                }
2755                            }
2756
2757                            // MERGE ON CREATE SET: gap-fill properties supplied
2758                            // only by ON CREATE SET so a NOT-NULL property absent
2759                            // from the merge key passes create-time validation
2760                            // (RC4). `or_insert` keeps the merge-key/pattern props
2761                            // authoritative; the post-create SET re-applies the
2762                            // real values, so the final state is unchanged.
2763                            if let Some(seed) = seed_props
2764                                && let Some(var) = &n.variable
2765                                && let Some(var_seed) = seed.get(var)
2766                            {
2767                                for (k, v) in var_seed {
2768                                    props.entry(k.clone()).or_insert_with(|| v.clone());
2769                                }
2770                            }
2771
2772                            let schema = self.storage.schema_manager().schema();
2773
2774                            // Strict schema: reject undeclared labels.
2775                            if self.config.strict_schema {
2776                                for label_name in &n.labels {
2777                                    if schema.get_label_case_insensitive(label_name).is_none() {
2778                                        return Err(anyhow!(
2779                                            "Label '{}' is not defined in the schema \
2780                                             (strict_schema is enabled). \
2781                                             Declare it with db.schema().label(...).apply() first.",
2782                                            label_name
2783                                        ));
2784                                    }
2785                                }
2786                            }
2787
2788                            // VID generation is label-independent. Pull from the
2789                            // per-tx reservoir if set (amortizes the global
2790                            // IdAllocator mutex), else fall back to the direct
2791                            // per-VID path.
2792                            let new_vid = match &self.id_reservoir {
2793                                Some(r) => r.next_vid().await?,
2794                                None => writer.next_vid().await?,
2795                            };
2796
2797                            // Enrich with generated columns only for known labels
2798                            for label_name in &n.labels {
2799                                if schema.get_label_case_insensitive(label_name).is_some() {
2800                                    self.enrich_properties_with_generated_columns(
2801                                        label_name,
2802                                        &mut props,
2803                                        prop_manager,
2804                                        params,
2805                                        ctx,
2806                                    )
2807                                    .await?;
2808                                }
2809                            }
2810
2811                            // Validate/coerce against declared types AFTER enrichment, so
2812                            // a type mismatch is rejected here rather than silently nulled
2813                            // (and the row dropped) at flush — issue #68.
2814                            let props = Self::coerce_and_validate_props(props, &schema, &n.labels)?;
2815
2816                            // Insert vertex and get back final properties (includes auto-generated embeddings)
2817                            let final_props = writer
2818                                .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
2819                                .await?;
2820
2821                            // Build node object with final properties (includes embeddings)
2822                            if let Some(var) = &n.variable {
2823                                let mut obj = HashMap::new();
2824                                obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
2825                                let labels_list: Vec<Value> =
2826                                    n.labels.iter().map(|l| Value::String(l.clone())).collect();
2827                                obj.insert("_labels".to_string(), Value::List(labels_list));
2828                                for (k, v) in &final_props {
2829                                    obj.insert(k.clone(), v.clone());
2830                                }
2831                                // Store node as a Map with _vid, matching MATCH behavior
2832                                row.insert(var.clone(), Value::Map(obj));
2833                            }
2834                            vid = Some(new_vid);
2835                        }
2836
2837                        let current_vid = vid.unwrap();
2838
2839                        if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
2840                            rel_pending.take()
2841                            && let Some(src) = prev_vid
2842                        {
2843                            let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
2844
2845                            if !is_rel_bound {
2846                                let mut rel_props = HashMap::new();
2847                                if let Some(expr) = rel_props_expr {
2848                                    let val = self
2849                                        .evaluate_expr(&expr, row, prop_manager, params, ctx)
2850                                        .await?;
2851                                    if let Value::Map(map) = val {
2852                                        rel_props.extend(map);
2853                                    }
2854                                }
2855                                // Validate/coerce edge properties against the declared
2856                                // edge-type schema before storing — issue #68.
2857                                let edge_schema = self.storage.schema_manager().schema();
2858                                let rel_props = Self::coerce_and_validate_props(
2859                                    rel_props,
2860                                    &edge_schema,
2861                                    std::slice::from_ref(&type_name),
2862                                )?;
2863                                let eid = match &self.id_reservoir {
2864                                    Some(r) => r.next_eid().await?,
2865                                    None => writer.next_eid(type_id).await?,
2866                                };
2867
2868                                // For incoming edges like (a)<-[:R]-(b), swap so the edge points b -> a
2869                                let (edge_src, edge_dst) = match dir {
2870                                    Direction::Incoming => (current_vid, src),
2871                                    _ => (src, current_vid),
2872                                };
2873
2874                                let store_props = !rel_var.is_empty();
2875                                let user_props = if store_props {
2876                                    rel_props.clone()
2877                                } else {
2878                                    HashMap::new()
2879                                };
2880
2881                                writer
2882                                    .insert_edge(
2883                                        edge_src,
2884                                        edge_dst,
2885                                        type_id,
2886                                        eid,
2887                                        rel_props,
2888                                        Some(type_name.clone()),
2889                                        tx_l0,
2890                                    )
2891                                    .await?;
2892
2893                                // Edge type name is now stored by insert_edge
2894
2895                                if store_props {
2896                                    let mut edge_map = HashMap::new();
2897                                    edge_map.insert(
2898                                        "_eid".to_string(),
2899                                        Value::Int(eid.as_u64() as i64),
2900                                    );
2901                                    edge_map.insert(
2902                                        "_src".to_string(),
2903                                        Value::Int(edge_src.as_u64() as i64),
2904                                    );
2905                                    edge_map.insert(
2906                                        "_dst".to_string(),
2907                                        Value::Int(edge_dst.as_u64() as i64),
2908                                    );
2909                                    edge_map
2910                                        .insert("_type".to_string(), Value::Int(type_id as i64));
2911                                    // Include user properties so downstream RETURN sees them
2912                                    for (k, v) in user_props {
2913                                        edge_map.insert(k, v);
2914                                    }
2915                                    row.insert(rel_var, Value::Map(edge_map));
2916                                }
2917                            }
2918                        }
2919                        prev_vid = Some(current_vid);
2920                    }
2921                    PatternElement::Relationship(r) => {
2922                        if r.types.len() != 1 {
2923                            return Err(anyhow!(
2924                                "CREATE relationship must specify exactly one type"
2925                            ));
2926                        }
2927                        let type_name = &r.types[0];
2928                        let type_id = if self.config.strict_schema {
2929                            let schema = self.storage.schema_manager().schema();
2930                            schema
2931                                .edge_type_id_by_name_case_insensitive(type_name)
2932                                .ok_or_else(|| {
2933                                    anyhow!(
2934                                        "Edge type '{}' is not defined in the schema \
2935                                         (strict_schema is enabled). \
2936                                         Declare it with db.schema().edge_type(...).apply() first.",
2937                                        type_name
2938                                    )
2939                                })?
2940                        } else {
2941                            // Schemaless: get or assign edge type ID (bit 31 = 1 for dynamic).
2942                            self.storage
2943                                .schema_manager()
2944                                .get_or_assign_edge_type_id(type_name)
2945                        };
2946
2947                        rel_pending = Some((
2948                            r.variable.clone().unwrap_or_default(),
2949                            type_id,
2950                            type_name.clone(),
2951                            r.properties.clone(),
2952                            r.direction.clone(),
2953                        ));
2954                    }
2955                    PatternElement::Parenthesized { .. } => {
2956                        return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
2957                    }
2958                }
2959            }
2960        }
2961        Ok(())
2962    }
2963
2964    /// Rejects structural values (maps, nodes, edges, paths, nested lists) in a property.
2965    ///
2966    /// These are never valid OpenCypher property values regardless of the declared column
2967    /// type. A `CypherValue` column is the sole exception and is handled by the caller
2968    /// before this is reached.
2969    ///
2970    /// # Errors
2971    /// Returns an error if `val` is a map/node/edge/path, or a list containing one.
2972    fn validate_structural_property_value(prop_name: &str, val: &Value) -> Result<()> {
2973        match val {
2974            Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
2975                anyhow::bail!(
2976                    "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2977                    prop_name
2978                );
2979            }
2980            Value::List(items) => {
2981                for item in items {
2982                    if matches!(
2983                        item,
2984                        Value::Map(_)
2985                            | Value::Node(_)
2986                            | Value::Edge(_)
2987                            | Value::Path(_)
2988                            | Value::List(_)
2989                    ) {
2990                        anyhow::bail!(
2991                            "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2992                            prop_name
2993                        );
2994                    }
2995                }
2996            }
2997            _ => {}
2998        }
2999        Ok(())
3000    }
3001
3002    /// Validates and coerces `val` against the declared schema type for `prop_name`.
3003    ///
3004    /// Returns the value to actually persist. Beyond the structural checks in
3005    /// [`Self::validate_structural_property_value`], this compares the value against the
3006    /// column's declared `DataType` and:
3007    ///
3008    /// - returns it unchanged when directly storable (including the intentional
3009    ///   `Int`→`Float`/`Int32` and `Temporal`→`Timestamp` widenings);
3010    /// - coerces a `Value::String` written into a `Date`/`Time`/`DateTime`/`Duration`
3011    ///   column into the proper `Temporal` value, using the same parser as the Cypher
3012    ///   `date()`/`time()`/`datetime()`/`duration()` constructors;
3013    /// - otherwise returns an error, so a type mismatch is surfaced at the call site
3014    ///   rather than silently nulled — and the row dropped at flush. See issue #68.
3015    ///
3016    /// Undeclared (schemaless) properties and `CypherValue` columns keep their permissive
3017    /// behavior.
3018    ///
3019    /// # Errors
3020    /// Returns an error if the value's type is incompatible with the declared column type,
3021    /// or if a string destined for a temporal column is not a valid temporal literal.
3022    fn coerce_and_validate_property_value(
3023        prop_name: &str,
3024        val: Value,
3025        schema: &uni_common::core::schema::Schema,
3026        labels: &[String],
3027    ) -> Result<Value> {
3028        use uni_common::core::schema::DataType;
3029
3030        // Resolve the declared type from the first label that declares this property.
3031        let declared = labels.iter().find_map(|label| {
3032            schema
3033                .properties
3034                .get(label)
3035                .and_then(|props| props.get(prop_name))
3036                .map(|meta| &meta.r#type)
3037        });
3038
3039        // CypherValue columns accept any value (including maps) — skip all checks.
3040        if matches!(declared, Some(DataType::CypherValue)) {
3041            return Ok(val);
3042        }
3043
3044        let Some(dt) = declared else {
3045            // Schemaless property: reject structural values (maps/nodes/edges/paths and
3046            // lists containing them), otherwise store as-is.
3047            Self::validate_structural_property_value(prop_name, &val)?;
3048            return Ok(val);
3049        };
3050
3051        // Sparse vectors carry invariants the type system cannot express (strictly
3052        // ascending unique term ids, finite weights, equal-length arrays, term ids
3053        // within the declared term space). Canonicalize and validate the native value
3054        // at ingest so a malformed sparse vector is a clean `TypeError` here, rather
3055        // than a panic deep in the durable WAL value codec — which reconstructs via
3056        // `SparseVector::new` and previously `.expect()`ed (issue #95). The degraded
3057        // `Value::Map` / `Null` forms fall through to `accepts` unchanged.
3058        if let (DataType::SparseVector { dimensions }, Value::SparseVector { .. }) = (dt, &val) {
3059            return Self::canonicalize_sparse_vector(prop_name, val, *dimensions);
3060        }
3061
3062        // Directly storable: scalars, the intentional `Int`→`Float`/`Int32` and
3063        // `Temporal`→`Timestamp` widenings, declared composite columns (`Map`/`List`/
3064        // `Vector`) receiving their matching value, and `Null` (always accepted).
3065        if dt.accepts(&val) {
3066            return Ok(val);
3067        }
3068
3069        // Known-safe coercion: a string into a temporal column is parsed as if it had
3070        // been wrapped in the matching Cypher temporal constructor.
3071        if matches!(val, Value::String(_)) {
3072            let ctor = match dt {
3073                DataType::DateTime => Some("DATETIME"),
3074                DataType::Date => Some("DATE"),
3075                DataType::Time => Some("TIME"),
3076                DataType::Duration => Some("DURATION"),
3077                _ => None,
3078            };
3079            if let Some(name) = ctor {
3080                return uni_query_functions::datetime::eval_datetime_function(
3081                    name,
3082                    std::slice::from_ref(&val),
3083                )
3084                .map_err(|e| {
3085                    anyhow!(
3086                        "TypeError: property '{}' is declared {:?} but the string value could \
3087                         not be parsed as a {} literal: {}",
3088                        prop_name,
3089                        dt,
3090                        name,
3091                        e
3092                    )
3093                });
3094            }
3095        }
3096
3097        // Not storable and not coercible. Prefer the structural message when the value
3098        // is itself structural (e.g. a map into a scalar column), preserving prior
3099        // behavior; otherwise report the scalar type mismatch.
3100        Self::validate_structural_property_value(prop_name, &val)?;
3101        anyhow::bail!(
3102            "TypeError: property '{}' is declared {:?} but got an incompatible value of type {}",
3103            prop_name,
3104            dt,
3105            value_type_name(&val)
3106        );
3107    }
3108
3109    /// Canonicalizes and validates a value destined for a `SparseVector` column.
3110    ///
3111    /// Sorts term ids ascending and sums the weights of duplicate term ids (via
3112    /// [`uni_sparse_vector::SparseVector::from_pairs`]), then rejects mismatched array
3113    /// lengths, non-finite weights, and term ids outside the declared `dimensions` term
3114    /// space. Running this at the write boundary keeps the durable WAL codec's
3115    /// `SparseVector::new(..)` reconstruction infallible and mirrors the auto-embed
3116    /// path, which canonicalizes through the same kernel constructor (issue #95).
3117    ///
3118    /// # Errors
3119    /// Returns a `TypeError` if the value violates a sparse-vector invariant or carries
3120    /// a term id at or beyond `dimensions`.
3121    fn canonicalize_sparse_vector(prop_name: &str, val: Value, dimensions: usize) -> Result<Value> {
3122        let Value::SparseVector { indices, values } = val else {
3123            // The caller only routes native `Value::SparseVector` values here.
3124            anyhow::bail!(
3125                "TypeError: property '{}' is declared SparseVector but got {}",
3126                prop_name,
3127                value_type_name(&val)
3128            );
3129        };
3130        if indices.len() != values.len() {
3131            anyhow::bail!(
3132                "TypeError: property '{}' sparse vector has {} term ids but {} weights",
3133                prop_name,
3134                indices.len(),
3135                values.len()
3136            );
3137        }
3138        let pairs: Vec<(u32, f32)> = indices.into_iter().zip(values).collect();
3139        let sv = uni_sparse_vector::SparseVector::from_pairs(pairs).map_err(|e| {
3140            anyhow!(
3141                "TypeError: property '{}' has an invalid sparse vector: {}",
3142                prop_name,
3143                e
3144            )
3145        })?;
3146        // `dimensions` is the term-space cardinality (max term id + 1); the largest
3147        // term id of a canonical (ascending) vector is its last index.
3148        if let Some(&max_term) = sv.indices().last()
3149            && max_term as usize >= dimensions
3150        {
3151            anyhow::bail!(
3152                "TypeError: property '{}' sparse vector term id {} is outside the declared \
3153                 term space (dimensions = {})",
3154                prop_name,
3155                max_term,
3156                dimensions
3157            );
3158        }
3159        let (indices, values) = sv.into_parts();
3160        Ok(Value::SparseVector { indices, values })
3161    }
3162
3163    /// Coerces and validates every property in `props` against the declared types for `labels`.
3164    ///
3165    /// Applies [`Self::coerce_and_validate_property_value`] to each entry, returning the map
3166    /// with known-safe coercions applied. Use this at every user-facing CREATE/SET write site
3167    /// before handing properties to the writer, so a type mismatch is rejected up front rather
3168    /// than silently nulled — and the row dropped — at flush (issue #68).
3169    ///
3170    /// # Errors
3171    /// Returns an error on the first property whose value is incompatible with its declared type.
3172    fn coerce_and_validate_props(
3173        props: HashMap<String, Value>,
3174        schema: &uni_common::core::schema::Schema,
3175        labels: &[String],
3176    ) -> Result<HashMap<String, Value>> {
3177        let mut out = HashMap::with_capacity(props.len());
3178        for (k, v) in props {
3179            let cv = Self::coerce_and_validate_property_value(&k, v, schema, labels)?;
3180            out.insert(k, cv);
3181        }
3182        Ok(out)
3183    }
3184
3185    #[expect(clippy::too_many_arguments)]
3186    pub(crate) async fn execute_set_items_locked(
3187        &self,
3188        items: &[SetItem],
3189        row: &mut HashMap<String, Value>,
3190        writer: &Writer,
3191        prop_manager: &PropertyManager,
3192        params: &HashMap<String, Value>,
3193        ctx: Option<&QueryContext>,
3194        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3195        prefetched: &Prefetch,
3196    ) -> Result<()> {
3197        // Coalesce SetItem::Property items by target so we do ONE read + ONE
3198        // write per (variable, target) instead of one read-modify-write cycle
3199        // per item. For an UPDATE that sets N properties on the same vertex
3200        // (e.g. the ingest hotpath `SET n.frequency = ..., n.last_seen = ...,
3201        // n.confidence = ...`), this collapses N redundant
3202        // `get_all_vertex_props_with_ctx` + `insert_vertex_with_labels` cycles
3203        // into one. See profile_test.rs `diag_72_set_data_scale_with_hnsw` for
3204        // the measurement, and the plan in
3205        // /home/rohit/.claude/plans/plan-and-implement-a-valiant-flame.md
3206        // for the rationale.
3207        //
3208        // RHS evaluation order is preserved: we evaluate each RHS inline and
3209        // update the row binding immediately, so a later SetItem on the same
3210        // variable that reads `n.<earlier-prop>` sees the new value.
3211        //
3212        // Non-Property variants (Labels, Variable, VariablePlus) are less
3213        // common and have lower payoff; before processing one, we flush any
3214        // pending updates for the same variable so it sees the latest L0
3215        // state and ordering semantics are preserved.
3216        let mut pending_v: HashMap<String, PendingVertexSet> = HashMap::new();
3217        let mut pending_e: HashMap<String, PendingEdgeSet> = HashMap::new();
3218
3219        for item in items {
3220            match item {
3221                SetItem::Property { expr, value } => {
3222                    if let Expr::Property(var_expr, prop_name) = expr
3223                        && let Expr::Variable(var_name) = &**var_expr
3224                        && let Some(node_val) = row.get(var_name)
3225                    {
3226                        if let Ok(vid) = Self::vid_from_value(node_val) {
3227                            reject_if_ephemeral_vid(vid)?;
3228                            let labels =
3229                                Self::extract_labels_from_node(node_val).unwrap_or_default();
3230                            let schema = self.storage.schema_manager().schema().clone();
3231
3232                            // Lazy one-time read. Always read the full row
3233                            // (preserves CRDT merge + constraint validation
3234                            // + scan-side L0 visibility). The
3235                            // partial-lance-writes optimization happens
3236                            // PURELY AT FLUSH TIME via the per-VID
3237                            // `vertex_partial_keys` set tracked in L0 — so
3238                            // L0 holds the full row, scans see the full
3239                            // row, and Lance only receives the touched
3240                            // columns. Generated-column-bearing labels
3241                            // ride the partial path too (Round 12 §C):
3242                            // `enrich_properties_with_generated_columns`
3243                            // runs at flush time over the merged-in-L0
3244                            // full row, and the produced generator keys
3245                            // are appended to `touched` so they land in
3246                            // the MergeInsert source.
3247                            if !pending_v.contains_key(var_name) {
3248                                let storage_cfg = &self.storage.config;
3249                                let partial = storage_cfg.partial_lance_writes;
3250                                let read = read_vertex_props_with_prefetch(
3251                                    vid,
3252                                    prefetched,
3253                                    prop_manager,
3254                                    ctx,
3255                                )
3256                                .await?;
3257                                pending_v.insert(
3258                                    var_name.clone(),
3259                                    PendingVertexSet {
3260                                        vid,
3261                                        labels: labels.clone(),
3262                                        props: read,
3263                                        partial,
3264                                        touched: HashSet::new(),
3265                                    },
3266                                );
3267                            }
3268
3269                            let val = self
3270                                .evaluate_expr(value, row, prop_manager, params, ctx)
3271                                .await?;
3272                            let val = Self::coerce_and_validate_property_value(
3273                                prop_name, val, &schema, &labels,
3274                            )?;
3275
3276                            let pv = pending_v
3277                                .get_mut(var_name)
3278                                .expect("inserted above when absent");
3279                            pv.props.insert(prop_name.clone(), val.clone());
3280                            // Record every SET-assigned key. For the partial path this
3281                            // drives the MergeInsert source; for the full path it is the
3282                            // signal `refresh_embed_targets` uses to re-embed when a
3283                            // source column changed (the full writer ignores it otherwise).
3284                            pv.touched.insert(prop_name.clone());
3285
3286                            // Update the row binding so subsequent RHS sees the new value.
3287                            if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3288                                node_map.insert(prop_name.clone(), val);
3289                            } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
3290                                node.properties.insert(prop_name.clone(), val);
3291                            }
3292                        } else if let Value::Map(map) = node_val
3293                            && map.get("_eid").is_some_and(|v| !v.is_null())
3294                            && map.get("_src").is_some_and(|v| !v.is_null())
3295                            && map.get("_dst").is_some_and(|v| !v.is_null())
3296                            && (map.get("_type").is_some_and(|v| !v.is_null())
3297                                || map.get("_type_name").is_some_and(|v| !v.is_null()))
3298                        {
3299                            let ei = self.extract_edge_identity(map)?;
3300                            reject_if_ephemeral_eid(ei.eid)?;
3301                            let schema = self.storage.schema_manager().schema().clone();
3302                            // Handle _type as either String or Int (Int from CREATE, String
3303                            // from queries). UNWIND on VLP edge lists emits `_type_name`
3304                            // instead of `_type`; accept either.
3305                            let type_val = map.get("_type").or_else(|| map.get("_type_name"));
3306                            let edge_type_name = match type_val {
3307                                Some(Value::String(s)) => s.clone(),
3308                                Some(Value::Int(id)) => schema
3309                                    .edge_type_name_by_id_unified(*id as u32)
3310                                    .unwrap_or_else(|| format!("EdgeType{}", id)),
3311                                _ => String::new(),
3312                            };
3313
3314                            if !pending_e.contains_key(var_name) {
3315                                let initial = read_edge_props_with_prefetch(
3316                                    ei.eid,
3317                                    prefetched,
3318                                    prop_manager,
3319                                    ctx,
3320                                )
3321                                .await?;
3322                                let partial = self.storage.config.partial_lance_writes;
3323                                pending_e.insert(
3324                                    var_name.clone(),
3325                                    PendingEdgeSet {
3326                                        src: ei.src,
3327                                        dst: ei.dst,
3328                                        edge_type_id: ei.edge_type_id,
3329                                        eid: ei.eid,
3330                                        edge_type_name: edge_type_name.clone(),
3331                                        props: initial,
3332                                        partial,
3333                                        touched: HashSet::new(),
3334                                    },
3335                                );
3336                            }
3337
3338                            let val = self
3339                                .evaluate_expr(value, row, prop_manager, params, ctx)
3340                                .await?;
3341                            let val = Self::coerce_and_validate_property_value(
3342                                prop_name,
3343                                val,
3344                                &schema,
3345                                std::slice::from_ref(&edge_type_name),
3346                            )?;
3347
3348                            let pe = pending_e
3349                                .get_mut(var_name)
3350                                .expect("inserted above when absent");
3351                            pe.props.insert(prop_name.clone(), val.clone());
3352                            if pe.partial {
3353                                pe.touched.insert(prop_name.clone());
3354                            }
3355
3356                            // Update the row object so subsequent RHS sees the new value.
3357                            if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3358                                edge_map.insert(prop_name.clone(), val);
3359                            } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3360                                edge.properties.insert(prop_name.clone(), val);
3361                            }
3362                        } else if let Value::Edge(edge) = node_val {
3363                            // Handle Value::Edge directly (when traverse returns Edge objects).
3364                            reject_if_ephemeral_eid(edge.eid)?;
3365                            let eid = edge.eid;
3366                            let src = edge.src;
3367                            let dst = edge.dst;
3368                            let edge_type_name = edge.edge_type.clone();
3369                            let etype =
3370                                self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
3371                            let schema = self.storage.schema_manager().schema().clone();
3372
3373                            if !pending_e.contains_key(var_name) {
3374                                let initial = read_edge_props_with_prefetch(
3375                                    eid,
3376                                    prefetched,
3377                                    prop_manager,
3378                                    ctx,
3379                                )
3380                                .await?;
3381                                let partial = self.storage.config.partial_lance_writes;
3382                                pending_e.insert(
3383                                    var_name.clone(),
3384                                    PendingEdgeSet {
3385                                        src,
3386                                        dst,
3387                                        edge_type_id: etype,
3388                                        eid,
3389                                        edge_type_name: edge_type_name.clone(),
3390                                        props: initial,
3391                                        partial,
3392                                        touched: HashSet::new(),
3393                                    },
3394                                );
3395                            }
3396
3397                            let val = self
3398                                .evaluate_expr(value, row, prop_manager, params, ctx)
3399                                .await?;
3400                            let val = Self::coerce_and_validate_property_value(
3401                                prop_name,
3402                                val,
3403                                &schema,
3404                                std::slice::from_ref(&edge_type_name),
3405                            )?;
3406
3407                            let pe = pending_e
3408                                .get_mut(var_name)
3409                                .expect("inserted above when absent");
3410                            pe.props.insert(prop_name.clone(), val.clone());
3411                            if pe.partial {
3412                                pe.touched.insert(prop_name.clone());
3413                            }
3414
3415                            // Update the row object so subsequent RHS sees the new value.
3416                            if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3417                                edge.properties.insert(prop_name.clone(), val);
3418                            }
3419                        }
3420                    }
3421                }
3422                SetItem::Labels { variable, labels } => {
3423                    // Flush any pending writes for this var so the Labels op
3424                    // sees latest L0 state. Other variables' pending writes
3425                    // can keep waiting (they're independent).
3426                    self.flush_pending_var(
3427                        variable,
3428                        &mut pending_v,
3429                        &mut pending_e,
3430                        writer,
3431                        prop_manager,
3432                        params,
3433                        ctx,
3434                        tx_l0,
3435                        prefetched,
3436                    )
3437                    .await?;
3438
3439                    if let Some(node_val) = row.get(variable)
3440                        && let Ok(vid) = Self::vid_from_value(node_val)
3441                    {
3442                        reject_if_ephemeral_vid(vid)?;
3443                        let registry = self
3444                            .procedure_registry
3445                            .as_ref()
3446                            .and_then(|pr| pr.plugin_registry());
3447                        reject_virtual_label_write(registry.as_ref(), labels, "SET")?;
3448
3449                        // Get current labels from node value
3450                        let current_labels =
3451                            Self::extract_labels_from_node(node_val).unwrap_or_default();
3452
3453                        // Determine new labels to add (skip duplicates)
3454                        let labels_to_add: Vec<_> = labels
3455                            .iter()
3456                            .filter(|l| !current_labels.contains(l))
3457                            .cloned()
3458                            .collect();
3459
3460                        if !labels_to_add.is_empty() {
3461                            // Resolve the FULL new label set and write it to the
3462                            // TRANSACTION buffer (so the change is transactional
3463                            // and OCC-conflictable), falling back to the context
3464                            // (main) L0 for non-transactional callers. Replace
3465                            // semantics via `set_vertex_labels`.
3466                            let mut new_labels = current_labels;
3467                            new_labels.extend(labels_to_add);
3468                            if let Some(ctx) = ctx {
3469                                let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3470                                l0.write().set_vertex_labels(vid, &new_labels);
3471                            }
3472
3473                            // Update the node value in the row with the new labels.
3474                            if let Some(Value::Map(obj)) = row.get_mut(variable) {
3475                                let labels_list =
3476                                    new_labels.into_iter().map(Value::String).collect();
3477                                obj.insert("_labels".to_string(), Value::List(labels_list));
3478                            }
3479                        }
3480                    }
3481                }
3482                SetItem::Variable { variable, value }
3483                | SetItem::VariablePlus { variable, value } => {
3484                    // Flush this var's pending writes first so the
3485                    // replace/merge op sees them as latest L0 state.
3486                    self.flush_pending_var(
3487                        variable,
3488                        &mut pending_v,
3489                        &mut pending_e,
3490                        writer,
3491                        prop_manager,
3492                        params,
3493                        ctx,
3494                        tx_l0,
3495                        prefetched,
3496                    )
3497                    .await?;
3498
3499                    let replace = matches!(item, SetItem::Variable { .. });
3500                    let op_str = if replace { "=" } else { "+=" };
3501
3502                    // SET n = expr / SET n += expr — null target from OPTIONAL MATCH is a silent no-op
3503                    if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
3504                        continue;
3505                    }
3506                    let rhs = self
3507                        .evaluate_expr(value, row, prop_manager, params, ctx)
3508                        .await?;
3509                    let new_props =
3510                        Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
3511                            anyhow!(
3512                                "SET {} {} expr: right-hand side must evaluate to a map, \
3513                                 node, or relationship",
3514                                variable,
3515                                op_str
3516                            )
3517                        })?;
3518                    self.apply_properties_to_entity(
3519                        variable,
3520                        new_props,
3521                        replace,
3522                        row,
3523                        writer,
3524                        prop_manager,
3525                        params,
3526                        ctx,
3527                        tx_l0,
3528                        prefetched,
3529                    )
3530                    .await?;
3531                }
3532            }
3533        }
3534
3535        // Flush all remaining coalesced writes — one writer call per target.
3536        // Partial entries (no generated columns) call
3537        // `Writer::insert_vertex_partial_full` so L0 holds the FULL row
3538        // but the touched-keys hint drives a MergeInsert at flush. Full
3539        // entries continue through the legacy
3540        // `insert_vertex_with_labels` (Append) path with
3541        // generated-column enrichment.
3542        for (_var_name, mut pv) in pending_v {
3543            // A SET that touches an auto-embed source column must refresh the target
3544            // embedding; both the partial MergeInsert and the full Append paths
3545            // otherwise re-write the stale vector. Applied before the branch so both
3546            // writer entry points get the refreshed props + touched keys.
3547            writer.refresh_embed_targets(&mut pv.props, &mut pv.touched, &pv.labels);
3548            if pv.partial {
3549                // Round 12 §C: run the generator enrichment over the
3550                // merged-in-L0 full row, then add the produced generator
3551                // keys to `touched` so they ride the MergeInsert source.
3552                // Idempotent — generators always recompute against the
3553                // post-merge property map.
3554                let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3555                for label_name in &pv.labels {
3556                    self.enrich_properties_with_generated_columns(
3557                        label_name,
3558                        &mut pv.props,
3559                        prop_manager,
3560                        params,
3561                        ctx,
3562                    )
3563                    .await?;
3564                }
3565                for k in pv.props.keys() {
3566                    if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3567                        pv.touched.insert(k.clone());
3568                    }
3569                }
3570                writer
3571                    .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3572                    .await?;
3573            } else {
3574                for label_name in &pv.labels {
3575                    self.enrich_properties_with_generated_columns(
3576                        label_name,
3577                        &mut pv.props,
3578                        prop_manager,
3579                        params,
3580                        ctx,
3581                    )
3582                    .await?;
3583                }
3584                let _ = writer
3585                    .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3586                    .await?;
3587            }
3588        }
3589        for (_var_name, pe) in pending_e {
3590            if pe.partial {
3591                writer
3592                    .insert_edge_partial_full(
3593                        pe.src,
3594                        pe.dst,
3595                        pe.edge_type_id,
3596                        pe.eid,
3597                        pe.props,
3598                        Some(pe.edge_type_name),
3599                        pe.touched,
3600                        tx_l0,
3601                    )
3602                    .await?;
3603            } else {
3604                writer
3605                    .insert_edge(
3606                        pe.src,
3607                        pe.dst,
3608                        pe.edge_type_id,
3609                        pe.eid,
3610                        pe.props,
3611                        Some(pe.edge_type_name),
3612                        tx_l0,
3613                    )
3614                    .await?;
3615            }
3616        }
3617
3618        Ok(())
3619    }
3620
3621    /// Flush pending SET state for a single variable to the writer.
3622    ///
3623    /// Called from the SET loop when about to process a Labels /
3624    /// Variable / VariablePlus item on `var`, so the subsequent op
3625    /// sees latest L0 state and ordering is preserved.
3626    #[expect(clippy::too_many_arguments)]
3627    async fn flush_pending_var(
3628        &self,
3629        var: &str,
3630        pending_v: &mut HashMap<String, PendingVertexSet>,
3631        pending_e: &mut HashMap<String, PendingEdgeSet>,
3632        writer: &Writer,
3633        prop_manager: &PropertyManager,
3634        _params: &HashMap<String, Value>,
3635        ctx: Option<&QueryContext>,
3636        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3637        _prefetched: &Prefetch,
3638    ) -> Result<()> {
3639        if let Some(mut pv) = pending_v.remove(var) {
3640            if pv.partial {
3641                let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3642                for label_name in &pv.labels {
3643                    self.enrich_properties_with_generated_columns(
3644                        label_name,
3645                        &mut pv.props,
3646                        prop_manager,
3647                        _params,
3648                        ctx,
3649                    )
3650                    .await?;
3651                }
3652                for k in pv.props.keys() {
3653                    if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3654                        pv.touched.insert(k.clone());
3655                    }
3656                }
3657                writer
3658                    .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3659                    .await?;
3660            } else {
3661                for label_name in &pv.labels {
3662                    self.enrich_properties_with_generated_columns(
3663                        label_name,
3664                        &mut pv.props,
3665                        prop_manager,
3666                        _params,
3667                        ctx,
3668                    )
3669                    .await?;
3670                }
3671                let _ = writer
3672                    .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3673                    .await?;
3674            }
3675        }
3676        if let Some(pe) = pending_e.remove(var) {
3677            if pe.partial {
3678                writer
3679                    .insert_edge_partial_full(
3680                        pe.src,
3681                        pe.dst,
3682                        pe.edge_type_id,
3683                        pe.eid,
3684                        pe.props,
3685                        Some(pe.edge_type_name),
3686                        pe.touched,
3687                        tx_l0,
3688                    )
3689                    .await?;
3690            } else {
3691                writer
3692                    .insert_edge(
3693                        pe.src,
3694                        pe.dst,
3695                        pe.edge_type_id,
3696                        pe.eid,
3697                        pe.props,
3698                        Some(pe.edge_type_name),
3699                        tx_l0,
3700                    )
3701                    .await?;
3702            }
3703        }
3704        Ok(())
3705    }
3706
3707    /// Execute REMOVE clause items (property removal or label removal).
3708    ///
3709    /// Property removals are batched per variable to avoid stale reads: when
3710    /// multiple properties of the same entity are removed in one REMOVE clause,
3711    /// we read from storage once, null all specified properties, and write back
3712    /// once. This prevents the second removal from reading stale data that
3713    /// doesn't reflect the first removal's L0 write.
3714    #[expect(clippy::too_many_arguments)]
3715    pub(crate) async fn execute_remove_items_locked(
3716        &self,
3717        items: &[RemoveItem],
3718        row: &mut HashMap<String, Value>,
3719        writer: &Writer,
3720        prop_manager: &PropertyManager,
3721        ctx: Option<&QueryContext>,
3722        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3723        prefetched: &Prefetch,
3724    ) -> Result<()> {
3725        // Collect property names to remove, grouped by variable.
3726        // Use Vec<(String, Vec<String>)> to preserve insertion order.
3727        let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
3728
3729        for item in items {
3730            match item {
3731                RemoveItem::Property(expr) => {
3732                    if let Expr::Property(var_expr, prop_name) = expr
3733                        && let Expr::Variable(var_name) = &**var_expr
3734                    {
3735                        if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
3736                            entry.1.push(prop_name.clone());
3737                        } else {
3738                            prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
3739                        }
3740                    }
3741                }
3742                RemoveItem::Labels { variable, labels } => {
3743                    self.execute_remove_labels(variable, labels, row, ctx)?;
3744                }
3745            }
3746        }
3747
3748        // Execute batched property removals per variable.
3749        for (var_name, prop_names) in &prop_removals {
3750            let Some(node_val) = row.get(var_name) else {
3751                continue;
3752            };
3753
3754            if let Ok(vid) = Self::vid_from_value(node_val) {
3755                // Vertex property removal
3756                let mut props =
3757                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
3758
3759                // Only write back if at least one property actually exists
3760                let removed_count = prop_names
3761                    .iter()
3762                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3763                    .count();
3764                let any_exist = removed_count > 0;
3765                if any_exist {
3766                    writer.track_properties_removed(removed_count, tx_l0);
3767                    for prop_name in prop_names {
3768                        props.insert(prop_name.clone(), Value::Null);
3769                    }
3770                }
3771                // Compute effective properties (post-removal) for _all_props
3772                let effective: HashMap<String, Value> = props
3773                    .iter()
3774                    .filter(|(_, v)| !v.is_null())
3775                    .map(|(k, v)| (k.clone(), v.clone()))
3776                    .collect();
3777                if any_exist {
3778                    let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3779                    let _ = writer
3780                        .insert_vertex_with_labels(vid, props, &labels, tx_l0)
3781                        .await?;
3782                }
3783
3784                // Update the row map: set removed props to Null
3785                if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3786                    for prop_name in prop_names {
3787                        node_map.insert(prop_name.clone(), Value::Null);
3788                    }
3789                    // Set _all_props to the complete effective property set
3790                    node_map.insert("_all_props".to_string(), Value::Map(effective));
3791                }
3792            } else if let Value::Map(map) = node_val {
3793                // Edge property removal (map-encoded)
3794                // Check for non-null _eid to skip OPTIONAL MATCH null edges
3795                let mut edge_effective: Option<HashMap<String, Value>> = None;
3796                if map.get("_eid").is_some_and(|v| !v.is_null()) {
3797                    let ei = self.extract_edge_identity(map)?;
3798                    let mut props =
3799                        read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx)
3800                            .await?;
3801
3802                    let removed_count = prop_names
3803                        .iter()
3804                        .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3805                        .count();
3806                    let any_exist = removed_count > 0;
3807                    if any_exist {
3808                        writer.track_properties_removed(removed_count, tx_l0);
3809                        for prop_name in prop_names {
3810                            props.insert(prop_name.to_string(), Value::Null);
3811                        }
3812                    }
3813                    // Compute effective properties (post-removal) for _all_props
3814                    edge_effective = Some(
3815                        props
3816                            .iter()
3817                            .filter(|(_, v)| !v.is_null())
3818                            .map(|(k, v)| (k.clone(), v.clone()))
3819                            .collect(),
3820                    );
3821                    if any_exist {
3822                        let edge_type_name = map
3823                            .get("_type")
3824                            .and_then(|v| v.as_str())
3825                            .map(|s| s.to_string())
3826                            .or_else(|| {
3827                                self.storage
3828                                    .schema_manager()
3829                                    .edge_type_name_by_id_unified(ei.edge_type_id)
3830                            });
3831                        writer
3832                            .insert_edge(
3833                                ei.src,
3834                                ei.dst,
3835                                ei.edge_type_id,
3836                                ei.eid,
3837                                props,
3838                                edge_type_name,
3839                                tx_l0,
3840                            )
3841                            .await?;
3842                    }
3843                }
3844
3845                if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3846                    for prop_name in prop_names {
3847                        edge_map.insert(prop_name.clone(), Value::Null);
3848                    }
3849                    if let Some(effective) = edge_effective {
3850                        edge_map.insert("_all_props".to_string(), Value::Map(effective));
3851                    }
3852                }
3853            } else if let Value::Edge(edge) = node_val {
3854                // Edge property removal (Value::Edge)
3855                let eid = edge.eid;
3856                let src = edge.src;
3857                let dst = edge.dst;
3858                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
3859
3860                let mut props =
3861                    read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
3862
3863                let removed_count = prop_names
3864                    .iter()
3865                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3866                    .count();
3867                if removed_count > 0 {
3868                    writer.track_properties_removed(removed_count, tx_l0);
3869                    for prop_name in prop_names {
3870                        props.insert(prop_name.to_string(), Value::Null);
3871                    }
3872                    writer
3873                        .insert_edge(
3874                            src,
3875                            dst,
3876                            etype,
3877                            eid,
3878                            props,
3879                            Some(edge.edge_type.clone()),
3880                            tx_l0,
3881                        )
3882                        .await?;
3883                }
3884
3885                if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3886                    for prop_name in prop_names {
3887                        edge.properties.insert(prop_name.to_string(), Value::Null);
3888                    }
3889                }
3890            }
3891        }
3892
3893        Ok(())
3894    }
3895
3896    /// Execute label removal.
3897    pub(crate) fn execute_remove_labels(
3898        &self,
3899        variable: &str,
3900        labels: &[String],
3901        row: &mut HashMap<String, Value>,
3902        ctx: Option<&QueryContext>,
3903    ) -> Result<()> {
3904        if let Some(node_val) = row.get(variable)
3905            && let Ok(vid) = Self::vid_from_value(node_val)
3906        {
3907            reject_if_ephemeral_vid(vid)?;
3908            let registry = self
3909                .procedure_registry
3910                .as_ref()
3911                .and_then(|pr| pr.plugin_registry());
3912            reject_virtual_label_write(registry.as_ref(), labels, "REMOVE")?;
3913
3914            // Get current labels from node value
3915            let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3916
3917            // Determine which labels to actually remove (only those currently present)
3918            let labels_to_remove: Vec<_> = labels
3919                .iter()
3920                .filter(|l| current_labels.contains(l))
3921                .collect();
3922
3923            if !labels_to_remove.is_empty() {
3924                // Resolve the FULL remaining label set and write it to the
3925                // TRANSACTION buffer (transactional + OCC-conflictable), falling
3926                // back to the context (main) L0 for non-transactional callers.
3927                let remaining_labels: Vec<String> = current_labels
3928                    .iter()
3929                    .filter(|l| !labels_to_remove.contains(l))
3930                    .cloned()
3931                    .collect();
3932                if let Some(ctx) = ctx {
3933                    let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3934                    l0.write().set_vertex_labels(vid, &remaining_labels);
3935                }
3936
3937                // Update the node value in the row with the remaining labels.
3938                if let Some(Value::Map(obj)) = row.get_mut(variable) {
3939                    let labels_list = remaining_labels.into_iter().map(Value::String).collect();
3940                    obj.insert("_labels".to_string(), Value::List(labels_list));
3941                }
3942            }
3943        }
3944        Ok(())
3945    }
3946
3947    /// Resolve edge type ID for a Value::Edge, handling empty edge_type strings
3948    /// by looking up the type from the L0 buffer's edge endpoints.
3949    fn resolve_edge_type_id_for_edge(
3950        &self,
3951        edge: &crate::types::Edge,
3952        writer: &Writer,
3953        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3954    ) -> Result<u32> {
3955        if !edge.edge_type.is_empty() {
3956            return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
3957        }
3958        // Edge type name is empty (e.g., from anonymous MATCH patterns).
3959        // Look up the edge type ID from the L0 buffer's edge endpoints.
3960        if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
3961            return Ok(etype);
3962        }
3963        Err(anyhow!(
3964            "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
3965            edge.eid
3966        ))
3967    }
3968
3969    /// Execute DELETE clause for a single item (vertex, edge, path, or null).
3970    pub(crate) async fn execute_delete_item_locked(
3971        &self,
3972        val: &Value,
3973        detach: bool,
3974        writer: &Writer,
3975        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3976    ) -> Result<()> {
3977        match val {
3978            Value::Null => {
3979                // DELETE null is a no-op per OpenCypher spec
3980            }
3981            Value::Path(path) => {
3982                // Delete path edges first, then nodes
3983                for edge in &path.edges {
3984                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3985                    writer
3986                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3987                        .await?;
3988                }
3989                for node in &path.nodes {
3990                    self.execute_delete_vertex(
3991                        node.vid,
3992                        detach,
3993                        Some(node.labels.clone()),
3994                        writer,
3995                        tx_l0,
3996                    )
3997                    .await?;
3998                }
3999            }
4000            _ => {
4001                // Try Path reconstruction from Map first (Arrow loses Path type)
4002                if let Ok(path) = Path::try_from(val) {
4003                    for edge in &path.edges {
4004                        let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
4005                        writer
4006                            .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
4007                            .await?;
4008                    }
4009                    for node in &path.nodes {
4010                        self.execute_delete_vertex(
4011                            node.vid,
4012                            detach,
4013                            Some(node.labels.clone()),
4014                            writer,
4015                            tx_l0,
4016                        )
4017                        .await?;
4018                    }
4019                } else if let Ok(vid) = Self::vid_from_value(val) {
4020                    let labels = Self::extract_labels_from_node(val);
4021                    self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
4022                        .await?;
4023                } else if let Value::Map(map) = val {
4024                    self.execute_delete_edge_from_map(map, writer, tx_l0)
4025                        .await?;
4026                } else if let Value::Edge(edge) = val {
4027                    reject_if_ephemeral_eid(edge.eid)?;
4028                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
4029                    let registry = self
4030                        .procedure_registry
4031                        .as_ref()
4032                        .and_then(|pr| pr.plugin_registry());
4033                    reject_virtual_edge_type_write(registry.as_ref(), etype, "DELETE")?;
4034                    writer
4035                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
4036                        .await?;
4037                }
4038            }
4039        }
4040        Ok(())
4041    }
4042
4043    /// Execute vertex deletion with optional detach.
4044    pub(crate) async fn execute_delete_vertex(
4045        &self,
4046        vid: Vid,
4047        detach: bool,
4048        labels: Option<Vec<String>>,
4049        writer: &Writer,
4050        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
4051    ) -> Result<()> {
4052        reject_if_ephemeral_vid(vid)?;
4053        if let Some(ls) = labels.as_deref() {
4054            let registry = self
4055                .procedure_registry
4056                .as_ref()
4057                .and_then(|pr| pr.plugin_registry());
4058            reject_virtual_label_write(registry.as_ref(), ls, "DELETE")?;
4059        }
4060        if detach {
4061            self.detach_delete_vertex(vid, writer, tx_l0).await?;
4062        } else {
4063            self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
4064        }
4065        writer.delete_vertex(vid, labels, tx_l0).await?;
4066        Ok(())
4067    }
4068
4069    /// Check that a vertex has no edges (required for non-DETACH DELETE).
4070    ///
4071    /// Loads the subgraph from storage, then excludes edges that have been
4072    /// tombstoned in the writer's L0 or the transaction's L0. This ensures
4073    /// edges deleted earlier in the same DELETE clause are properly excluded.
4074    pub(crate) async fn check_vertex_has_no_edges(
4075        &self,
4076        vid: Vid,
4077        writer: &Writer,
4078        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
4079    ) -> Result<()> {
4080        let schema = self.storage.schema_manager().schema();
4081        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
4082
4083        // Collect tombstoned edge IDs from both the writer L0 and tx L0.
4084        let mut tombstoned_eids = std::collections::HashSet::new();
4085        {
4086            let writer_l0 = writer.l0_manager.get_current();
4087            let guard = writer_l0.read();
4088            for &eid in guard.tombstones.keys() {
4089                tombstoned_eids.insert(eid);
4090            }
4091        }
4092        if let Some(tx) = tx_l0 {
4093            let guard = tx.read();
4094            for &eid in guard.tombstones.keys() {
4095                tombstoned_eids.insert(eid);
4096            }
4097        }
4098
4099        let out_graph = self
4100            .storage
4101            .load_subgraph_cached(
4102                &[vid],
4103                &edge_type_ids,
4104                1,
4105                uni_store::runtime::Direction::Outgoing,
4106                Some(writer.l0_manager.get_current()),
4107            )
4108            .await?;
4109        let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
4110
4111        let in_graph = self
4112            .storage
4113            .load_subgraph_cached(
4114                &[vid],
4115                &edge_type_ids,
4116                1,
4117                uni_store::runtime::Direction::Incoming,
4118                Some(writer.l0_manager.get_current()),
4119            )
4120            .await?;
4121        let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
4122
4123        if has_out || has_in {
4124            return Err(anyhow!(
4125                "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
4126                vid
4127            ));
4128        }
4129        Ok(())
4130    }
4131
4132    /// Execute edge deletion from a map representation.
4133    pub(crate) async fn execute_delete_edge_from_map(
4134        &self,
4135        map: &HashMap<String, Value>,
4136        writer: &Writer,
4137        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
4138    ) -> Result<()> {
4139        // Check for non-null _eid to skip OPTIONAL MATCH null edges
4140        if map.get("_eid").is_some_and(|v| !v.is_null()) {
4141            let ei = self.extract_edge_identity(map)?;
4142            reject_if_ephemeral_eid(ei.eid)?;
4143            let registry = self
4144                .procedure_registry
4145                .as_ref()
4146                .and_then(|pr| pr.plugin_registry());
4147            reject_virtual_edge_type_write(registry.as_ref(), ei.edge_type_id, "DELETE")?;
4148            writer
4149                .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
4150                .await?;
4151        }
4152        Ok(())
4153    }
4154
4155    /// Build a scan plan node.
4156    ///
4157    /// - `label_id > 0`: schema label → `Scan` (fast, label-specific storage)
4158    /// - `label_id == 0` with labels: schemaless → `ScanMainByLabels` (main table + L0, filtered by label name)
4159    /// - `label_id == 0` without labels: unlabeled → `ScanAll`
4160    fn make_scan_plan(
4161        label_id: u16,
4162        labels: Vec<String>,
4163        variable: String,
4164        filter: Option<Expr>,
4165    ) -> LogicalPlan {
4166        if label_id > 0 {
4167            LogicalPlan::Scan {
4168                label_id,
4169                labels,
4170                variable,
4171                filter,
4172                optional: false,
4173            }
4174        } else if !labels.is_empty() {
4175            // Schemaless label: use ScanMainByLabels to filter by label name
4176            LogicalPlan::ScanMainByLabels {
4177                labels,
4178                variable,
4179                filter,
4180                optional: false,
4181            }
4182        } else {
4183            LogicalPlan::ScanAll {
4184                variable,
4185                filter,
4186                optional: false,
4187            }
4188        }
4189    }
4190
4191    /// Attach a new scan node to the running plan, using `CrossJoin` when the plan
4192    /// already contains prior operators.
4193    fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
4194        if matches!(plan, LogicalPlan::Empty) {
4195            scan
4196        } else {
4197            LogicalPlan::CrossJoin {
4198                left: Box::new(plan),
4199                right: Box::new(scan),
4200            }
4201        }
4202    }
4203
4204    /// Resolve MERGE property map expressions against the current row context.
4205    ///
4206    /// MERGE patterns like `MERGE (city:City {name: person.bornIn})` contain
4207    /// property expressions that reference bound variables. These need to be
4208    /// evaluated to concrete literal values before being converted to filter
4209    /// expressions by `properties_to_expr()`.
4210    async fn resolve_merge_properties(
4211        &self,
4212        properties: &Option<Expr>,
4213        row: &HashMap<String, Value>,
4214        prop_manager: &PropertyManager,
4215        params: &HashMap<String, Value>,
4216        ctx: Option<&QueryContext>,
4217    ) -> Result<Option<Expr>> {
4218        let entries = match properties {
4219            Some(Expr::Map(entries)) => entries,
4220            other => return Ok(other.clone()),
4221        };
4222        let mut resolved = Vec::new();
4223        for (key, val_expr) in entries {
4224            if matches!(val_expr, Expr::Literal(_)) {
4225                resolved.push((key.clone(), val_expr.clone()));
4226            } else {
4227                let value = self
4228                    .evaluate_expr(val_expr, row, prop_manager, params, ctx)
4229                    .await?;
4230                resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
4231            }
4232        }
4233        Ok(Some(Expr::Map(resolved)))
4234    }
4235
4236    /// Convert a runtime Value back to an AST literal expression.
4237    fn value_to_literal_expr(value: &Value) -> Expr {
4238        match value {
4239            Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
4240            Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
4241            Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
4242            Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
4243            Value::Null => Expr::Literal(CypherLiteral::Null),
4244            Value::List(items) => {
4245                Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
4246            }
4247            Value::Map(entries) => Expr::Map(
4248                entries
4249                    .iter()
4250                    .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
4251                    .collect(),
4252            ),
4253            _ => Expr::Literal(CypherLiteral::Null),
4254        }
4255    }
4256
4257    pub(crate) async fn execute_merge_match(
4258        &self,
4259        pattern: &Pattern,
4260        row: &HashMap<String, Value>,
4261        prop_manager: &PropertyManager,
4262        params: &HashMap<String, Value>,
4263        ctx: Option<&QueryContext>,
4264    ) -> Result<Vec<HashMap<String, Value>>> {
4265        // Construct a LogicalPlan for the MATCH part of MERGE
4266        let planner =
4267            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
4268
4269        // We need to construct a CypherQuery to use the planner's plan() method,
4270        // or we can manually construct the LogicalPlan.
4271        // Manual construction is safer as we don't have to round-trip through AST.
4272
4273        let mut plan = LogicalPlan::Empty;
4274        let mut vars_in_scope = Vec::new();
4275
4276        // Add existing bound variables from row to scope
4277        for key in row.keys() {
4278            vars_in_scope.push(key.clone());
4279        }
4280
4281        // Reconstruct Match logic from Planner (simplified for MERGE pattern)
4282        for path in &pattern.paths {
4283            let elements = &path.elements;
4284            let mut i = 0;
4285            while i < elements.len() {
4286                let part = &elements[i];
4287                match part {
4288                    PatternElement::Node(n) => {
4289                        let variable = n.variable.clone().unwrap_or_default();
4290
4291                        // If variable is already bound in the input row, we filter
4292                        let is_bound = !variable.is_empty() && row.contains_key(&variable);
4293
4294                        if is_bound {
4295                            // If bound, we must Scan this specific VID to start the chain
4296                            // Extract VID from row
4297                            let val = row.get(&variable).unwrap();
4298                            let vid = Self::vid_from_value(val)?;
4299
4300                            // In the new storage model, VIDs don't embed label info.
4301                            // We get label from the node value if available, otherwise use 0 to scan all.
4302                            let extracted_labels =
4303                                Self::extract_labels_from_node(val).unwrap_or_default();
4304                            let label_id = {
4305                                let schema = self.storage.schema_manager().schema();
4306                                extracted_labels
4307                                    .first()
4308                                    .and_then(|l| schema.label_id_by_name(l))
4309                                    .unwrap_or(0)
4310                            };
4311
4312                            let resolved_props = self
4313                                .resolve_merge_properties(
4314                                    &n.properties,
4315                                    row,
4316                                    prop_manager,
4317                                    params,
4318                                    ctx,
4319                                )
4320                                .await?;
4321                            let prop_filter =
4322                                planner.properties_to_expr(&variable, &resolved_props);
4323
4324                            // Create a filter expression for VID: variable._vid = vid
4325                            // But our expression engine handles `Expr::Variable` as column.
4326                            // We can inject a filter `id(variable) = vid` if we had `id()` function.
4327                            // Or we use internal property `_vid`.
4328
4329                            // Note: Scan supports `filter`.
4330                            // We can manually construct an Expr::BinaryOp(Eq, Prop(var, _vid), Literal(vid))
4331
4332                            let vid_filter = Expr::BinaryOp {
4333                                left: Box::new(Expr::Property(
4334                                    Box::new(Expr::Variable(variable.clone())),
4335                                    "_vid".to_string(),
4336                                )),
4337                                op: BinaryOp::Eq,
4338                                right: Box::new(Expr::Literal(CypherLiteral::Integer(
4339                                    vid.as_u64() as i64,
4340                                ))),
4341                            };
4342
4343                            let combined_filter = if let Some(pf) = prop_filter {
4344                                Some(Expr::BinaryOp {
4345                                    left: Box::new(vid_filter),
4346                                    op: BinaryOp::And,
4347                                    right: Box::new(pf),
4348                                })
4349                            } else {
4350                                Some(vid_filter)
4351                            };
4352
4353                            let scan = Self::make_scan_plan(
4354                                label_id,
4355                                extracted_labels,
4356                                variable.clone(),
4357                                combined_filter,
4358                            );
4359                            plan = Self::attach_scan(plan, scan);
4360                        } else {
4361                            let label_id = if n.labels.is_empty() {
4362                                // Unlabeled MERGE node: scan all nodes (label_id 0 → ScanAll)
4363                                0
4364                            } else {
4365                                let label_name = &n.labels[0];
4366                                let schema = self.storage.schema_manager().schema();
4367                                if self.config.strict_schema {
4368                                    schema
4369                                        .get_label_case_insensitive(label_name)
4370                                        .map(|m| m.id)
4371                                        .ok_or_else(|| {
4372                                            anyhow!(
4373                                                "Label '{}' is not defined in the schema \
4374                                                 (strict_schema is enabled). \
4375                                                 Declare it with db.schema().label(...).apply() first.",
4376                                                label_name
4377                                            )
4378                                        })?
4379                                } else {
4380                                    // Fall back to label_id 0 (any/schemaless) when not in schema.
4381                                    schema
4382                                        .get_label_case_insensitive(label_name)
4383                                        .map(|m| m.id)
4384                                        .unwrap_or(0)
4385                                }
4386                            };
4387
4388                            let resolved_props = self
4389                                .resolve_merge_properties(
4390                                    &n.properties,
4391                                    row,
4392                                    prop_manager,
4393                                    params,
4394                                    ctx,
4395                                )
4396                                .await?;
4397                            let prop_filter =
4398                                planner.properties_to_expr(&variable, &resolved_props);
4399                            let scan = Self::make_scan_plan(
4400                                label_id,
4401                                n.labels.names().to_vec(),
4402                                variable.clone(),
4403                                prop_filter,
4404                            );
4405                            plan = Self::attach_scan(plan, scan);
4406
4407                            // Add label filters when:
4408                            // 1. Multiple labels with a known schema label: filter for
4409                            //    additional labels (Scan only scans by the first label).
4410                            // 2. Schemaless labels (label_id = 0): ScanAll finds ALL
4411                            //    nodes, so we must filter to only those with the
4412                            //    specified label(s).
4413                            if !n.labels.is_empty()
4414                                && !variable.is_empty()
4415                                && (label_id == 0 || n.labels.len() > 1)
4416                                && let Some(label_filter) =
4417                                    planner.node_filter_expr(&variable, &n.labels, &None)
4418                            {
4419                                plan = LogicalPlan::Filter {
4420                                    input: Box::new(plan),
4421                                    predicate: label_filter,
4422                                    optional_variables: std::collections::HashSet::new(),
4423                                };
4424                            }
4425
4426                            if !variable.is_empty() {
4427                                vars_in_scope.push(variable.clone());
4428                            }
4429                        }
4430
4431                        // Now look ahead for relationship
4432                        i += 1;
4433                        while i < elements.len() {
4434                            if let PatternElement::Relationship(r) = &elements[i] {
4435                                let target_node_part = &elements[i + 1];
4436                                if let PatternElement::Node(n_target) = target_node_part {
4437                                    let schema = self.storage.schema_manager().schema();
4438                                    let mut edge_type_ids = Vec::new();
4439
4440                                    if r.types.is_empty() {
4441                                        return Err(anyhow!("MERGE edge must have a type"));
4442                                    } else if r.types.len() > 1 {
4443                                        return Err(anyhow!(
4444                                            "MERGE does not support multiple edge types"
4445                                        ));
4446                                    } else {
4447                                        let type_name = &r.types[0];
4448                                        let type_id = if self.config.strict_schema {
4449                                            let s = self.storage.schema_manager().schema();
4450                                            s.edge_type_id_by_name_case_insensitive(type_name)
4451                                                .ok_or_else(|| {
4452                                                    anyhow!(
4453                                                        "Edge type '{}' is not defined in the schema \
4454                                                         (strict_schema is enabled).",
4455                                                        type_name
4456                                                    )
4457                                                })?
4458                                        } else {
4459                                            // Schemaless: assign new ID if not found.
4460                                            self.storage
4461                                                .schema_manager()
4462                                                .get_or_assign_edge_type_id(type_name)
4463                                        };
4464                                        edge_type_ids.push(type_id);
4465                                    }
4466
4467                                    // Resolve target label ID. For schemaless labels (not in the
4468                                    // schema), fall back to 0 which means "any label" in traversal.
4469                                    let target_label_id: u16 = if let Some(lbl) =
4470                                        n_target.labels.first()
4471                                    {
4472                                        schema
4473                                            .get_label_case_insensitive(lbl)
4474                                            .map(|m| m.id)
4475                                            .unwrap_or(0)
4476                                    } else if let Some(var) = &n_target.variable {
4477                                        if let Some(val) = row.get(var) {
4478                                            // In the new storage model, get labels from node value
4479                                            if let Some(labels) =
4480                                                Self::extract_labels_from_node(val)
4481                                            {
4482                                                if let Some(first_label) = labels.first() {
4483                                                    schema
4484                                                        .get_label_case_insensitive(first_label)
4485                                                        .map(|m| m.id)
4486                                                        .unwrap_or(0)
4487                                                } else {
4488                                                    // Bound node with no labels — schemaless, any
4489                                                    0
4490                                                }
4491                                            } else if Self::vid_from_value(val).is_ok() {
4492                                                // VID without label info — schemaless, any
4493                                                0
4494                                            } else {
4495                                                return Err(anyhow!(
4496                                                    "Variable {} is not a node",
4497                                                    var
4498                                                ));
4499                                            }
4500                                        } else {
4501                                            return Err(anyhow!(
4502                                                "MERGE pattern node must have a label or be a bound variable"
4503                                            ));
4504                                        }
4505                                    } else {
4506                                        return Err(anyhow!(
4507                                            "MERGE pattern node must have a label"
4508                                        ));
4509                                    };
4510
4511                                    let target_variable =
4512                                        n_target.variable.clone().unwrap_or_default();
4513                                    let source_variable = match &elements[i - 1] {
4514                                        PatternElement::Node(n) => {
4515                                            n.variable.clone().unwrap_or_default()
4516                                        }
4517                                        _ => String::new(),
4518                                    };
4519
4520                                    let is_variable_length = r.range.is_some();
4521                                    let type_name = &r.types[0];
4522
4523                                    // Use TraverseMainByType for schemaless edge types
4524                                    // (same as MATCH planner) so edge properties are loaded
4525                                    // correctly from storage + L0 via the adjacency map.
4526                                    // Regular Traverse only loads properties via
4527                                    // property_manager which doesn't handle schemaless types.
4528                                    let is_schemaless = edge_type_ids.iter().all(|id| {
4529                                        uni_common::core::edge_type::is_schemaless_edge_type(*id)
4530                                    });
4531
4532                                    if is_schemaless {
4533                                        plan = LogicalPlan::TraverseMainByType {
4534                                            type_names: vec![type_name.clone()],
4535                                            input: Box::new(plan),
4536                                            direction: r.direction.clone(),
4537                                            source_variable,
4538                                            target_variable: target_variable.clone(),
4539                                            step_variable: r.variable.clone(),
4540                                            min_hops: r
4541                                                .range
4542                                                .as_ref()
4543                                                .and_then(|r| r.min)
4544                                                .unwrap_or(1)
4545                                                as usize,
4546                                            max_hops: r
4547                                                .range
4548                                                .as_ref()
4549                                                .and_then(|r| r.max)
4550                                                .unwrap_or(1)
4551                                                as usize,
4552                                            optional: false,
4553                                            target_filter: None,
4554                                            path_variable: None,
4555                                            is_variable_length,
4556                                            optional_pattern_vars: std::collections::HashSet::new(),
4557                                            scope_match_variables: std::collections::HashSet::new(),
4558                                            edge_filter_expr: None,
4559                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4560                                        };
4561                                    } else {
4562                                        // Collect edge property names needed for MERGE filter
4563                                        let mut edge_props = std::collections::HashSet::new();
4564                                        if let Some(Expr::Map(entries)) = &r.properties {
4565                                            for (key, _) in entries {
4566                                                edge_props.insert(key.clone());
4567                                            }
4568                                        }
4569                                        plan = LogicalPlan::Traverse {
4570                                            input: Box::new(plan),
4571                                            edge_type_ids: edge_type_ids.clone(),
4572                                            direction: r.direction.clone(),
4573                                            source_variable,
4574                                            target_variable: target_variable.clone(),
4575                                            target_label_id,
4576                                            step_variable: r.variable.clone(),
4577                                            min_hops: r
4578                                                .range
4579                                                .as_ref()
4580                                                .and_then(|r| r.min)
4581                                                .unwrap_or(1)
4582                                                as usize,
4583                                            max_hops: r
4584                                                .range
4585                                                .as_ref()
4586                                                .and_then(|r| r.max)
4587                                                .unwrap_or(1)
4588                                                as usize,
4589                                            optional: false,
4590                                            target_filter: None,
4591                                            path_variable: None,
4592                                            edge_properties: edge_props,
4593                                            is_variable_length,
4594                                            optional_pattern_vars: std::collections::HashSet::new(),
4595                                            scope_match_variables: std::collections::HashSet::new(),
4596                                            edge_filter_expr: None,
4597                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4598                                            qpp_steps: None,
4599                                        };
4600                                    }
4601
4602                                    // Apply property filters for relationship
4603                                    if r.properties.is_some()
4604                                        && let Some(r_var) = &r.variable
4605                                    {
4606                                        let resolved_rel_props = self
4607                                            .resolve_merge_properties(
4608                                                &r.properties,
4609                                                row,
4610                                                prop_manager,
4611                                                params,
4612                                                ctx,
4613                                            )
4614                                            .await?;
4615                                        if let Some(prop_filter) =
4616                                            planner.properties_to_expr(r_var, &resolved_rel_props)
4617                                        {
4618                                            plan = LogicalPlan::Filter {
4619                                                input: Box::new(plan),
4620                                                predicate: prop_filter,
4621                                                optional_variables: std::collections::HashSet::new(
4622                                                ),
4623                                            };
4624                                        }
4625                                    }
4626
4627                                    // Apply property filters for target node if it was new
4628                                    if !target_variable.is_empty() {
4629                                        let resolved_target_props = self
4630                                            .resolve_merge_properties(
4631                                                &n_target.properties,
4632                                                row,
4633                                                prop_manager,
4634                                                params,
4635                                                ctx,
4636                                            )
4637                                            .await?;
4638                                        if let Some(prop_filter) = planner.properties_to_expr(
4639                                            &target_variable,
4640                                            &resolved_target_props,
4641                                        ) {
4642                                            plan = LogicalPlan::Filter {
4643                                                input: Box::new(plan),
4644                                                predicate: prop_filter,
4645                                                optional_variables: std::collections::HashSet::new(
4646                                                ),
4647                                            };
4648                                        }
4649                                        vars_in_scope.push(target_variable.clone());
4650                                    }
4651
4652                                    if let Some(sv) = &r.variable {
4653                                        vars_in_scope.push(sv.clone());
4654                                    }
4655                                    i += 2;
4656                                } else {
4657                                    break;
4658                                }
4659                            } else {
4660                                break;
4661                            }
4662                        }
4663                    }
4664                    _ => return Err(anyhow!("Pattern must start with a node")),
4665                }
4666            }
4667
4668            // Execute the plan to find all matches, then filter against bound variables in `row`.
4669        }
4670
4671        let db_matches = self
4672            .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
4673            .await?;
4674
4675        // Keep only DB results that are consistent with the input row bindings.
4676        // Skip internal keys (starting with "__") as they are implementation
4677        // artifacts (e.g. __used_edges) and not user-visible variable bindings.
4678        // Also skip the empty-string key (""), which is the placeholder variable
4679        // for unnamed MERGE nodes — it may carry over from a prior MERGE clause
4680        // and must not constrain the current pattern's match.
4681        let final_matches = db_matches
4682            .into_iter()
4683            .filter(|db_match| {
4684                row.iter().all(|(key, val)| {
4685                    if key.is_empty() || key.starts_with("__") {
4686                        return true;
4687                    }
4688                    let Some(db_val) = db_match.get(key) else {
4689                        return true;
4690                    };
4691                    if db_val == val {
4692                        return true;
4693                    }
4694                    // Values differ -- treat as consistent if they represent the same VID
4695                    matches!(
4696                        (Self::vid_from_value(val), Self::vid_from_value(db_val)),
4697                        (Ok(v1), Ok(v2)) if v1 == v2
4698                    )
4699                })
4700            })
4701            .map(|db_match| {
4702                let mut merged = row.clone();
4703                merged.extend(db_match);
4704                merged
4705            })
4706            .collect();
4707
4708        Ok(final_matches)
4709    }
4710
4711    /// Prepare a MERGE pattern for path variable binding.
4712    ///
4713    /// If any path in the pattern has a path variable (e.g., `MERGE p = (a)-[:R]->(b)`),
4714    /// unnamed relationships need internal variable names so that `execute_create_pattern`
4715    /// stores the edge data in the row for later path construction.
4716    ///
4717    /// Returns the (possibly modified) pattern and a list of temp variable names to clean up.
4718    fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
4719        let has_path_vars = pattern
4720            .paths
4721            .iter()
4722            .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
4723
4724        if !has_path_vars {
4725            return (pattern.clone(), Vec::new());
4726        }
4727
4728        let mut modified = pattern.clone();
4729        let mut temp_vars = Vec::new();
4730
4731        for path in &mut modified.paths {
4732            if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
4733                continue;
4734            }
4735            for (idx, element) in path.elements.iter_mut().enumerate() {
4736                if let PatternElement::Relationship(r) = element
4737                    && r.variable.as_ref().is_none_or(String::is_empty)
4738                {
4739                    let temp_var = format!("__path_r_{}", idx);
4740                    r.variable = Some(temp_var.clone());
4741                    temp_vars.push(temp_var);
4742                }
4743            }
4744        }
4745
4746        (modified, temp_vars)
4747    }
4748
4749    /// Bind path variables in the result row based on the MERGE pattern.
4750    ///
4751    /// Walks each path in the pattern, collects node/edge values from the row
4752    /// by variable name, and constructs a `Value::Path`.
4753    fn bind_path_variables(
4754        pattern: &Pattern,
4755        row: &mut HashMap<String, Value>,
4756        temp_vars: &[String],
4757    ) {
4758        for path in &pattern.paths {
4759            let Some(path_var) = path.variable.as_ref() else {
4760                continue;
4761            };
4762            if path_var.is_empty() {
4763                continue;
4764            }
4765
4766            let mut nodes = Vec::new();
4767            let mut edges = Vec::new();
4768
4769            for element in &path.elements {
4770                match element {
4771                    PatternElement::Node(n) => {
4772                        if let Some(var) = &n.variable
4773                            && let Some(val) = row.get(var)
4774                            && let Some(node) = Self::value_to_node_for_path(val)
4775                        {
4776                            nodes.push(node);
4777                        }
4778                    }
4779                    PatternElement::Relationship(r) => {
4780                        if let Some(var) = &r.variable
4781                            && let Some(val) = row.get(var)
4782                            && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
4783                        {
4784                            edges.push(edge);
4785                        }
4786                    }
4787                    _ => {}
4788                }
4789            }
4790
4791            if !nodes.is_empty() {
4792                use uni_common::value::Path;
4793                row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
4794            }
4795        }
4796
4797        // Clean up internal temp variables
4798        for var in temp_vars {
4799            row.remove(var);
4800        }
4801    }
4802
4803    /// Convert a Value (Map or Node) to a Node for path construction.
4804    fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
4805        match val {
4806            Value::Node(n) => Some(n.clone()),
4807            Value::Map(map) => {
4808                let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
4809                let labels = if let Some(Value::List(l)) = map.get("_labels") {
4810                    l.iter()
4811                        .filter_map(|v| {
4812                            if let Value::String(s) = v {
4813                                Some(s.clone())
4814                            } else {
4815                                None
4816                            }
4817                        })
4818                        .collect()
4819                } else {
4820                    vec![]
4821                };
4822                let properties: HashMap<String, Value> = map
4823                    .iter()
4824                    .filter(|(k, _)| !k.starts_with('_'))
4825                    .map(|(k, v)| (k.clone(), v.clone()))
4826                    .collect();
4827                Some(uni_common::value::Node {
4828                    vid,
4829                    labels,
4830                    properties,
4831                })
4832            }
4833            _ => None,
4834        }
4835    }
4836
4837    /// Convert a Value (Map or Edge) to an Edge for path construction.
4838    fn value_to_edge_for_path(
4839        val: &Value,
4840        type_names: &[String],
4841    ) -> Option<uni_common::value::Edge> {
4842        match val {
4843            Value::Edge(e) => Some(e.clone()),
4844            Value::Map(map) => {
4845                let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
4846                let edge_type = map
4847                    .get("_type_name")
4848                    .and_then(|v| {
4849                        if let Value::String(s) = v {
4850                            Some(s.clone())
4851                        } else {
4852                            None
4853                        }
4854                    })
4855                    .or_else(|| type_names.first().cloned())
4856                    .unwrap_or_default();
4857                let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
4858                let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
4859                let properties: HashMap<String, Value> = map
4860                    .iter()
4861                    .filter(|(k, _)| !k.starts_with('_'))
4862                    .map(|(k, v)| (k.clone(), v.clone()))
4863                    .collect();
4864                Some(uni_common::value::Edge {
4865                    eid,
4866                    edge_type,
4867                    src,
4868                    dst,
4869                    properties,
4870                })
4871            }
4872            _ => None,
4873        }
4874    }
4875}
4876
4877/// Read a vertex's full property map, preferring `prefetched` over a fresh
4878/// per-row `Backend::scan`.
4879///
4880/// `prefetched` is built once at the top of `apply_mutations` via
4881/// `prefetch_set_targets` / `prefetch_remove_targets` (mutation_common.rs).
4882/// On a hit, we layer in L0 from `ctx` so writes from earlier rows of the
4883/// same `apply_mutations` invocation (counter increments, same-VID
4884/// duplicates from UNWIND) take precedence — the prefetch only snapshots
4885/// storage state at SET entry. On a miss, fall back to the existing
4886/// per-row path; this preserves correctness for newly created VIDs,
4887/// schemaless rows, multi-label corner cases, and non-Mutation callers
4888/// that pass `&Prefetch::default()`.
4889pub(crate) async fn read_vertex_props_with_prefetch(
4890    vid: Vid,
4891    prefetched: &Prefetch,
4892    prop_manager: &PropertyManager,
4893    ctx: Option<&QueryContext>,
4894) -> Result<uni_common::Properties> {
4895    match prefetched.vertex.get(&vid).cloned() {
4896        Some(mut base) => {
4897            if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_vertex_props(vid, ctx) {
4898                for (k, v) in l0 {
4899                    base.insert(k, v);
4900                }
4901            }
4902            Ok(base)
4903        }
4904        None => Ok(prop_manager
4905            .get_all_vertex_props_with_ctx(vid, ctx)
4906            .await?
4907            .unwrap_or_default()),
4908    }
4909}
4910
4911/// Edge equivalent of [`read_vertex_props_with_prefetch`]. On a hit, layer
4912/// in L0 edge props so writes from earlier rows of the same
4913/// `apply_mutations` invocation take precedence. On a miss, fall back to
4914/// the per-EID storage path.
4915pub(crate) async fn read_edge_props_with_prefetch(
4916    eid: Eid,
4917    prefetched: &Prefetch,
4918    prop_manager: &PropertyManager,
4919    ctx: Option<&QueryContext>,
4920) -> Result<uni_common::Properties> {
4921    match prefetched.edge.get(&eid).cloned() {
4922        Some(mut base) => {
4923            if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_edge_props(eid, ctx) {
4924                for (k, v) in l0 {
4925                    base.insert(k, v);
4926                }
4927            }
4928            Ok(base)
4929        }
4930        None => Ok(prop_manager
4931            .get_all_edge_props_with_ctx(eid, ctx)
4932            .await?
4933            .unwrap_or_default()),
4934    }
4935}
4936
#[cfg(test)]
mod tests {
    use super::*;

    // ── fixtures ─────────────────────────────────────────────────────

    /// Shorthand for an owned string `Value`.
    fn text(s: &str) -> Value {
        Value::String(s.into())
    }

    /// Build a string-keyed property map from `(key, value)` pairs.
    fn props<const N: usize>(pairs: [(&str, Value); N]) -> HashMap<String, Value> {
        pairs
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect()
    }

    /// A `Person`-labelled node with VID 1 carrying `properties`.
    fn person_node(properties: HashMap<String, Value>) -> uni_common::Node {
        uni_common::Node {
            vid: uni_common::core::id::Vid::from(1u64),
            labels: vec!["Person".to_string()],
            properties,
        }
    }

    // ── merge_props tests ────────────────────────────────────────────

    #[test]
    fn test_merge_props_replace_tombstones_missing_keys() {
        let current = props([("name", text("Alice")), ("age", Value::Int(30))]);
        let incoming = props([("name", text("Bob"))]);

        let result = Executor::merge_props(current, incoming, true);

        assert_eq!(result.get("name"), Some(&text("Bob")));
        assert_eq!(
            result.get("age"),
            Some(&Value::Null),
            "Missing keys should be tombstoned in replace mode"
        );
    }

    #[test]
    fn test_merge_props_merge_preserves_existing() {
        let current = props([("name", text("Alice")), ("age", Value::Int(30))]);
        let incoming = props([("city", text("NYC"))]);

        let result = Executor::merge_props(current, incoming, false);

        assert_eq!(result.get("name"), Some(&text("Alice")));
        assert_eq!(result.get("age"), Some(&Value::Int(30)));
        assert_eq!(result.get("city"), Some(&text("NYC")));
    }

    #[test]
    fn test_merge_props_null_incoming_is_tombstone() {
        let current = props([("name", text("Alice"))]);
        let incoming = props([("name", Value::Null)]);

        // Merge mode: null overwrites
        let merged = Executor::merge_props(current.clone(), incoming.clone(), false);
        assert_eq!(merged.get("name"), Some(&Value::Null));

        // Replace mode: null is tombstone
        let replaced = Executor::merge_props(current, incoming, true);
        assert_eq!(replaced.get("name"), Some(&Value::Null));
    }

    #[test]
    fn test_merge_props_empty_current() {
        let current: HashMap<String, Value> = HashMap::new();
        let incoming = props([("name", text("Alice"))]);

        let result = Executor::merge_props(current, incoming, false);

        assert_eq!(result.get("name"), Some(&text("Alice")));
        assert_eq!(result.len(), 1);
    }

    #[test]
    fn test_merge_props_empty_incoming_replace_tombstones_all() {
        let current = props([("name", text("Alice")), ("age", Value::Int(30))]);
        let incoming: HashMap<String, Value> = HashMap::new();

        let result = Executor::merge_props(current, incoming, true);

        assert_eq!(result.get("name"), Some(&Value::Null));
        assert_eq!(result.get("age"), Some(&Value::Null));
    }

    // ── extract_labels_from_node tests ───────────────────────────────

    #[test]
    fn test_extract_labels_from_map() {
        let node_map = props([
            ("_vid", Value::Int(1)),
            (
                "_labels",
                Value::List(vec![text("Person"), text("Employee")]),
            ),
        ]);

        let labels = Executor::extract_labels_from_node(&Value::Map(node_map));

        assert_eq!(
            labels,
            Some(vec!["Person".to_string(), "Employee".to_string()])
        );
    }

    #[test]
    fn test_extract_labels_from_value_node() {
        let node = person_node(HashMap::new());

        let labels = Executor::extract_labels_from_node(&Value::Node(node));

        assert_eq!(labels, Some(vec!["Person".to_string()]));
    }

    #[test]
    fn test_extract_labels_non_node_returns_none() {
        for scalar in [Value::Int(42), text("hello")] {
            assert_eq!(Executor::extract_labels_from_node(&scalar), None);
        }
    }

    // ── extract_user_properties_from_value tests ─────────────────────

    #[test]
    fn test_extract_user_props_strips_internal_keys() {
        let node_map = props([
            ("_vid", Value::Int(1)),
            ("_labels", Value::List(vec![text("Person")])),
            ("name", text("Alice")),
            ("age", Value::Int(30)),
        ]);

        let extracted =
            Executor::extract_user_properties_from_value(&Value::Map(node_map)).unwrap();

        assert_eq!(extracted.get("name"), Some(&text("Alice")));
        assert_eq!(extracted.get("age"), Some(&Value::Int(30)));
        assert!(!extracted.contains_key("_vid"));
        assert!(!extracted.contains_key("_labels"));
    }

    #[test]
    fn test_extract_user_props_plain_map_returns_as_is() {
        let plain = props([("key", text("value"))]);

        let extracted =
            Executor::extract_user_properties_from_value(&Value::Map(plain.clone())).unwrap();

        assert_eq!(extracted, plain);
    }

    #[test]
    fn test_extract_user_props_from_value_node() {
        let node = person_node(props([("name", text("Alice"))]));

        let extracted = Executor::extract_user_properties_from_value(&Value::Node(node)).unwrap();

        assert_eq!(extracted.get("name"), Some(&text("Alice")));
    }
}