Skip to main content

uni_query/query/executor/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use crate::query::df_graph::mutation_common::Prefetch;
6use crate::query::planner::LogicalPlan;
7use anyhow::{Result, anyhow};
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use uni_common::DataType;
11use uni_common::core::id::{Eid, Vid};
12use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
13use uni_common::{Path, Value};
14use uni_cypher::ast::{
15    AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
16    CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
17    DropEdgeType, DropLabel, Expr, NodePattern, Pattern, PatternElement, RemoveItem, SetClause,
18    SetItem,
19};
20use uni_store::QueryContext;
21use uni_store::runtime::l0_visibility;
22use uni_store::runtime::property_manager::PropertyManager;
23use uni_store::runtime::writer::Writer;
24
/// Canonical, hashable key for a single-node MERGE: the key properties as a
/// `(name, value)` list sorted by name. Used to group existing vertices by key
/// for the index fast path (issue #69).
///
/// The alias is a plain `Vec`, so the sorted-by-name ordering is a convention
/// that whoever builds the key must uphold; the type does not enforce it.
type MergeKey = Vec<(String, Value)>;
29
/// Identity fields extracted from a map-encoded edge.
///
/// Built by `Executor::extract_edge_identity` from the `_eid`, `_src`,
/// `_dst` and `_type` / `_type_name` entries of the edge map.
struct EdgeIdentity {
    /// Edge id, read from the `_eid` entry.
    eid: Eid,
    /// Source vertex id, read from the `_src` entry.
    src: Vid,
    /// Destination vertex id, read from the `_dst` entry.
    dst: Vid,
    /// Numeric edge-type id resolved from `_type` (falling back to
    /// `_type_name`) via `Executor::resolve_edge_type_id`.
    edge_type_id: u32,
}
37
/// Per-variable accumulator for SetItem::Property items targeting a vertex.
///
/// Built lazily on the first SetItem touching each variable, then mutated
/// in place across subsequent items. Flushed once at end of the SET
/// clause (or earlier if a non-Property SetItem on the same var lands).
struct PendingVertexSet {
    /// Id of the vertex the accumulated SET items apply to.
    vid: Vid,
    /// Labels recorded for that vertex.
    /// NOTE(review): presumably handed to the writer at flush time — the
    /// flush site is outside this view; confirm.
    labels: Vec<String>,
    /// Full property map: the union of storage and L0 state as read via
    /// `get_all_vertex_props_with_ctx`, with the touched values applied
    /// in order on top. Flushed to L0 whole; L0's `vertex_partial_keys` set
    /// tells the flush which columns to send to Lance via MergeInsert.
    props: HashMap<String, Value>,
    /// `true` when the SET should flush via the partial-column MergeInsert
    /// path: set when `UniConfig::partial_lance_writes` is on AND the
    /// label has no generated columns. Generated-column labels still need
    /// the full-row Append so the regenerated values land.
    partial: bool,
    /// Set of property keys touched by this statement. Threaded into L0
    /// so the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
}
61
/// Per-variable accumulator for SetItem::Property items targeting an edge.
///
/// Edge counterpart of `PendingVertexSet`: carries the edge's identity
/// together with the property state collected for one SET clause.
struct PendingEdgeSet {
    /// Source vertex of the edge.
    src: Vid,
    /// Destination vertex of the edge.
    dst: Vid,
    /// Numeric id of the edge's type.
    edge_type_id: u32,
    /// Id of the edge being updated.
    eid: Eid,
    /// Name of the edge's type.
    /// NOTE(review): presumably the name matching `edge_type_id` — the code
    /// that fills this struct is outside this view; confirm.
    edge_type_name: String,
    /// `true` when the SET should flush via the partial-column
    /// MergeInsert path on the per-edge-type delta tables (Round 12
    /// §A). Set when `UniConfig::partial_lance_writes` is on.
    partial: bool,
    /// Property keys touched by this statement. Threaded into L0 so
    /// the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
    /// Property map accumulated for the edge.
    /// NOTE(review): presumably the full current map with the touched values
    /// applied, mirroring `PendingVertexSet::props` — confirm at the fill site.
    props: HashMap<String, Value>,
}
79
80/// Refuse to mutate an ephemeral node (M5g / proposal §4.13.1).
81/// Ephemeral entities are return-only — `Vid::EPHEMERAL_BIT` is set on
82/// any id minted by `host.allocate_transient_id()`.
83fn reject_if_ephemeral_vid(vid: Vid) -> Result<()> {
84    if vid.is_ephemeral() {
85        return Err(anyhow::Error::from(
86            uni_common::UniError::EphemeralWriteAttempt {
87                kind: "node",
88                id: vid.transient_id().unwrap_or(vid.as_u64()),
89            },
90        ));
91    }
92    Ok(())
93}
94
95/// Returns a short variant name for a `Value`, used in type-mismatch error messages.
96fn value_type_name(val: &Value) -> &'static str {
97    match val {
98        Value::Null => "Null",
99        Value::Bool(_) => "Bool",
100        Value::Int(_) => "Int",
101        Value::Float(_) => "Float",
102        Value::String(_) => "String",
103        Value::Bytes(_) => "Bytes",
104        Value::List(_) => "List",
105        Value::Map(_) => "Map",
106        Value::Node(_) => "Node",
107        Value::Edge(_) => "Edge",
108        Value::Path(_) => "Path",
109        Value::Vector(_) => "Vector",
110        Value::Temporal(_) => "Temporal",
111        _ => "value",
112    }
113}
114
115/// Refuse to mutate an ephemeral edge (M5g / proposal §4.13.1).
116fn reject_if_ephemeral_eid(eid: Eid) -> Result<()> {
117    if eid.is_ephemeral() {
118        return Err(anyhow::Error::from(
119            uni_common::UniError::EphemeralWriteAttempt {
120                kind: "edge",
121                id: eid.transient_id().unwrap_or(eid.as_u64()),
122            },
123        ));
124    }
125    Ok(())
126}
127
128/// Reject a write whose target label is currently allocated as a
129/// virtual (catalog-backed) label.
130///
131/// Catalog tables are read-only from the host's perspective — there is
132/// no write-back path through `CatalogTable::scan` to the originating
133/// provider, so silently allowing SET/DELETE would leave ghosted state
134/// on the host side that diverges from the external catalog. The
135/// planner already rejects CREATE/MERGE on virtual labels via
136/// `Planner::reject_virtual_label_writes`; this helper is the
137/// equivalent gate on the runtime write path for SET-label-add and
138/// DELETE.
139///
140/// `op` names the offending operation for the error message (e.g.
141/// `"SET"`, `"DELETE"`).
142///
143/// # Errors
144///
145/// Returns an error if `registry` is `Some` and any name in `labels`
146/// is currently registered as a virtual label. Returns `Ok(())` when
147/// no plugin registry is wired (low-level callers without plugins).
148fn reject_virtual_label_write(
149    registry: Option<&Arc<uni_plugin::PluginRegistry>>,
150    labels: &[String],
151    op: &str,
152) -> Result<()> {
153    let Some(registry) = registry else {
154        return Ok(());
155    };
156    for label in labels {
157        if registry.virtual_label_by_name(label).is_some() {
158            return Err(anyhow!(
159                "Cannot {op} on virtual (catalog-resolved) label `{label}` — virtual \
160                 labels are read-only; write back via the originating catalog instead"
161            ));
162        }
163    }
164    Ok(())
165}
166
167/// Reject a write whose target edge-type ID is currently allocated as
168/// a virtual (catalog-backed) edge type. Runtime analog of
169/// [`reject_virtual_label_write`] for the edge path.
170///
171/// # Errors
172///
173/// Returns an error if `registry` is `Some` and `edge_type_id` resolves
174/// to a registered virtual edge type. Returns `Ok(())` when no plugin
175/// registry is wired.
176fn reject_virtual_edge_type_write(
177    registry: Option<&Arc<uni_plugin::PluginRegistry>>,
178    edge_type_id: u32,
179    op: &str,
180) -> Result<()> {
181    let Some(registry) = registry else {
182        return Ok(());
183    };
184    if let Some(entry) = registry.virtual_edge_type_by_id(edge_type_id) {
185        return Err(anyhow!(
186            "Cannot {op} on virtual (catalog-resolved) edge type `{}` — virtual edge \
187             types are read-only; write back via the originating catalog instead",
188            entry.name
189        ));
190    }
191    Ok(())
192}
193
194impl Executor {
195    /// Extracts labels from a node value.
196    ///
197    /// Handles both `Value::Map` (with a `_labels` list field) and
198    /// `Value::Node` (with a `labels` vec field).
199    ///
200    /// Returns `None` when the value is not a node or has no labels.
201    pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
202        match node_val {
203            Value::Map(map) => {
204                // Map-encoded node: look for _labels array
205                if let Some(Value::List(labels_arr)) = map.get("_labels") {
206                    let labels: Vec<String> = labels_arr
207                        .iter()
208                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
209                        .collect();
210                    if !labels.is_empty() {
211                        return Some(labels);
212                    }
213                }
214                None
215            }
216            Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
217            _ => None,
218        }
219    }
220
221    /// Extracts user-visible properties from a value that represents a node or edge.
222    ///
223    /// Strips internal bookkeeping keys (those prefixed with `_` or named
224    /// `ext_id`) from map-encoded entities and returns only the user-facing
225    /// property key-value pairs.
226    ///
227    /// Returns `None` when `val` is not a map, node, or edge.
228    pub(crate) fn extract_user_properties_from_value(
229        val: &Value,
230    ) -> Option<HashMap<String, Value>> {
231        match val {
232            Value::Map(map) => {
233                // Distinguish entity-encoded maps from plain map literals.
234                // A node map has both `_vid` and `_labels`.
235                // An edge map has `_eid`, `_src`, and `_dst`.
236                let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
237                let is_edge_map = map.contains_key("_eid")
238                    && map.contains_key("_src")
239                    && map.contains_key("_dst");
240
241                if is_node_map || is_edge_map {
242                    // Filter out internal bookkeeping keys
243                    let user_props: HashMap<String, Value> = map
244                        .iter()
245                        .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
246                        .map(|(k, v)| (k.clone(), v.clone()))
247                        .collect();
248                    // When mutation output omits dotted property columns, user
249                    // properties live inside `_all_props` rather than at the
250                    // top level of the entity map.
251                    if user_props.is_empty()
252                        && let Some(Value::Map(all_props)) = map.get("_all_props")
253                    {
254                        return Some(all_props.clone());
255                    }
256                    Some(user_props)
257                } else {
258                    // Plain map literal — return as-is
259                    Some(map.clone())
260                }
261            }
262            Value::Node(node) => Some(node.properties.clone()),
263            Value::Edge(edge) => Some(edge.properties.clone()),
264            _ => None,
265        }
266    }
267
    /// Applies a property map to a vertex or edge entity bound to `variable` in `row`.
    ///
    /// When `replace` is `true` the entity's property set is replaced: keys absent
    /// from `new_props` are tombstoned (written as `Value::Null`) so the storage
    /// layer removes them.  When `replace` is `false` the map is merged: keys in
    /// `new_props` are upserted, while keys absent from `new_props` are unchanged.
    /// A `Value::Null` entry in `new_props` acts as an explicit tombstone in both
    /// modes.
    ///
    /// Labels are never altered — the spec states that `SET n = map` replaces
    /// properties only.
    ///
    /// Four encodings of the bound value are handled, tried in this order:
    /// `Value::Node`, any value `Self::vid_from_value` accepts (map-encoded
    /// node), `Value::Edge`, and a `Value::Map` carrying `_eid`, `_src` and
    /// `_dst` (map-encoded edge). Each path reads the current properties,
    /// merges via `Self::merge_props`, validates against the schema, writes
    /// through `writer`, and finally refreshes the binding in `row` with the
    /// non-null properties. Vertex paths additionally run
    /// `enrich_properties_with_generated_columns` for every label before
    /// validation.
    ///
    /// If `variable` is unbound, or bound to a value matching none of the
    /// encodings above, the call does nothing and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the entity cannot be found in the storage layer, or
    /// if the writer fails to persist the updated properties. Errors from
    /// generated-column enrichment, schema coercion/validation, and edge
    /// identity / edge-type resolution are propagated as well.
    #[expect(clippy::too_many_arguments)]
    async fn apply_properties_to_entity(
        &self,
        variable: &str,
        new_props: HashMap<String, Value>,
        replace: bool,
        row: &mut HashMap<String, Value>,
        writer: &Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
        prefetched: &Prefetch,
    ) -> Result<()> {
        // Clone the target so we can hold &row references elsewhere.
        let target = row.get(variable).cloned();

        // Declared-type guard for the whole-entity `SET n = map` / `SET n += map`
        // forms, mirroring the per-property SET path (issue #68).
        let schema = self.storage.schema_manager().schema();

        match target {
            // Structured node value.
            Some(Value::Node(ref node)) => {
                let vid = node.vid;
                let labels = node.labels.clone();
                let current =
                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
                let write_props = Self::merge_props(current, new_props, replace);
                let mut enriched = write_props.clone();
                // Generated columns are recomputed once per label on the merged map.
                for label_name in &labels {
                    self.enrich_properties_with_generated_columns(
                        label_name,
                        &mut enriched,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
                // The writer takes its own copy; `enriched` is still needed below.
                let _ = writer
                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
                    .await?;
                // Update the in-memory row binding
                // (null entries are tombstones and are not exposed on the binding).
                if let Some(Value::Node(n)) = row.get_mut(variable) {
                    n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
                }
            }
            // Any other value a vertex id can be extracted from (map-encoded node).
            Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
                let vid = Self::vid_from_value(node_val)?;
                // A map without usable `_labels` is written with an empty label list.
                let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
                let current =
                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
                let write_props = Self::merge_props(current, new_props, replace);
                let mut enriched = write_props.clone();
                for label_name in &labels {
                    self.enrich_properties_with_generated_columns(
                        label_name,
                        &mut enriched,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
                let _ = writer
                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
                    .await?;
                // Update the in-memory map-encoded node binding
                if let Some(Value::Map(node_map)) = row.get_mut(variable) {
                    // Remove old user property keys, keep internal fields
                    node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
                    // Build effective (non-null) properties
                    let effective: HashMap<String, Value> =
                        enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
                    for (k, v) in &effective {
                        node_map.insert(k.clone(), v.clone());
                    }
                    // Replace _all_props to reflect the complete property set
                    node_map.insert("_all_props".to_string(), Value::Map(effective));
                }
            }
            // Structured edge value.
            Some(Value::Edge(ref edge)) => {
                let eid = edge.eid;
                let src = edge.src;
                let dst = edge.dst;
                // The edge carries its type by name; the writer needs the numeric id.
                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
                let current =
                    read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
                let write_props = Self::merge_props(current, new_props, replace);
                // The edge-type name is passed in the same slot the vertex paths
                // use for labels.
                let write_props = Self::coerce_and_validate_props(
                    write_props,
                    &schema,
                    std::slice::from_ref(&edge.edge_type),
                )?;
                writer
                    .insert_edge(
                        src,
                        dst,
                        etype,
                        eid,
                        write_props.clone(),
                        Some(edge.edge_type.clone()),
                        tx_l0,
                    )
                    .await?;
                // Update the in-memory row binding
                if let Some(Value::Edge(e)) = row.get_mut(variable) {
                    e.properties = write_props
                        .into_iter()
                        .filter(|(_, v)| !v.is_null())
                        .collect();
                }
            }
            // Map-encoded edge: recognised by its three identity keys.
            Some(Value::Map(ref map))
                if map.contains_key("_eid")
                    && map.contains_key("_src")
                    && map.contains_key("_dst") =>
            {
                let ei = self.extract_edge_identity(map)?;
                let current =
                    read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx).await?;
                let write_props = Self::merge_props(current, new_props, replace);
                // Prefer a string `_type` on the map; otherwise look the name up
                // from the resolved id.
                let edge_type_name = map
                    .get("_type")
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string())
                    .or_else(|| {
                        self.storage
                            .schema_manager()
                            .edge_type_name_by_id_unified(ei.edge_type_id)
                    });
                // Without a resolvable type name the properties are written
                // without schema coercion/validation.
                let write_props = match &edge_type_name {
                    Some(name) => Self::coerce_and_validate_props(
                        write_props,
                        &schema,
                        std::slice::from_ref(name),
                    )?,
                    None => write_props,
                };
                writer
                    .insert_edge(
                        ei.src,
                        ei.dst,
                        ei.edge_type_id,
                        ei.eid,
                        write_props.clone(),
                        edge_type_name,
                        tx_l0,
                    )
                    .await?;
                // Update the in-memory map-encoded edge binding
                if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
                    // Keep only the internal (`_`-prefixed) fields before re-adding
                    // the effective properties.
                    edge_map.retain(|k, _| k.starts_with('_'));
                    let effective: HashMap<String, Value> = write_props
                        .into_iter()
                        .filter(|(_, v)| !v.is_null())
                        .collect();
                    for (k, v) in &effective {
                        edge_map.insert(k.clone(), v.clone());
                    }
                    // Replace _all_props to reflect the complete property set
                    edge_map.insert("_all_props".to_string(), Value::Map(effective));
                }
            }
            _ => {
                // No matching entity — nothing to do (caller already guarded against Null)
            }
        }
        Ok(())
    }
456
457    /// Computes the property map to write given current storage state and the
458    /// incoming change map.
459    ///
460    /// When `replace` is `true`, keys present in `current` but absent from
461    /// `incoming` are tombstoned with `Value::Null`.  Null values inside
462    /// `incoming` are always preserved as explicit tombstones.
463    ///
464    /// When `replace` is `false`, `current` is the base and `incoming` is
465    /// merged on top: each key in `incoming` overwrites or tombstones the
466    /// corresponding entry in `current`.
467    fn merge_props(
468        current: HashMap<String, Value>,
469        incoming: HashMap<String, Value>,
470        replace: bool,
471    ) -> HashMap<String, Value> {
472        if replace {
473            // Start from the non-null incoming entries only.
474            let mut result: HashMap<String, Value> = incoming
475                .iter()
476                .filter(|(_, v)| !v.is_null())
477                .map(|(k, v)| (k.clone(), v.clone()))
478                .collect();
479            // Tombstone every current key that is absent from incoming OR explicitly
480            // set to null in incoming (both mean "delete this property").
481            for k in current.keys() {
482                if incoming.get(k).is_none_or(|v| v.is_null()) {
483                    result.insert(k.clone(), Value::Null);
484                }
485            }
486            result
487        } else {
488            // Merge: start from current and apply incoming on top
489            let mut result = current;
490            result.extend(incoming);
491            result
492        }
493    }
494
495    /// Extract edge identity fields (`_eid`, `_src`, `_dst`, `_type`) from a map.
496    fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
497        let eid = Eid::from(
498            map.get("_eid")
499                .and_then(|v| v.as_u64())
500                .ok_or_else(|| anyhow!("Invalid _eid"))?,
501        );
502        let src = Vid::from(
503            map.get("_src")
504                .and_then(|v| v.as_u64())
505                .ok_or_else(|| anyhow!("Invalid _src"))?,
506        );
507        let dst = Vid::from(
508            map.get("_dst")
509                .and_then(|v| v.as_u64())
510                .ok_or_else(|| anyhow!("Invalid _dst"))?,
511        );
512        let edge_type_id = self.resolve_edge_type_id(
513            map.get("_type")
514                .or_else(|| map.get("_type_name"))
515                .ok_or_else(|| anyhow!("Missing _type/_type_name on edge map"))?,
516        )?;
517        Ok(EdgeIdentity {
518            eid,
519            src,
520            dst,
521            edge_type_id,
522        })
523    }
524
525    /// Resolve edge type ID from a Value, supporting both Int and String representations.
526    /// DataFusion traverse stores _type as String("KNOWS"), while write operations need u32 ID.
527    ///
528    /// For String values, uses get_or_assign_edge_type_id to support schemaless edge types
529    /// (assigns new ID if not found). This is critical for MERGE ... ON CREATE SET scenarios
530    /// where the edge type was just created and may not be in the read-only lookup yet.
531    fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
532        match type_val {
533            Value::Int(i) => Ok(*i as u32),
534            Value::String(name) => {
535                if self.config.strict_schema {
536                    let schema = self.storage.schema_manager().schema();
537                    schema
538                        .edge_type_id_by_name_case_insensitive(name)
539                        .ok_or_else(|| {
540                            anyhow!(
541                                "Edge type '{}' is not defined in the schema \
542                                 (strict_schema is enabled). \
543                                 Declare it with db.schema().edge_type(...).apply() first.",
544                                name
545                            )
546                        })
547                } else {
548                    // Schemaless: assign new ID if not found in schema or registry.
549                    Ok(self
550                        .storage
551                        .schema_manager()
552                        .get_or_assign_edge_type_id(name))
553                }
554            }
555            _ => Err(anyhow!(
556                "Invalid _type value: expected Int or String, got {:?}",
557                type_val
558            )),
559        }
560    }
561
562    pub(crate) async fn execute_vacuum(&self) -> Result<()> {
563        if let Some(writer_arc) = &self.writer {
564            // Flush first while holding the lock
565            {
566                let writer: &uni_store::Writer = writer_arc.as_ref();
567                writer.flush_to_l1(None).await?;
568            } // Drop lock before compacting to avoid blocking reads/writes
569
570            // Compaction can run without holding the writer lock
571            let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
572            let compaction_results = compactor.compact_all().await?;
573
574            // Re-warm adjacency manager for compacted edge types to sync in-memory CSR with new L2 storage
575            let am = self.storage.adjacency_manager();
576            let schema = self.storage.schema_manager().schema();
577            for info in compaction_results {
578                // Convert string direction to Direction enum
579                let direction = match info.direction.as_str() {
580                    "fwd" => uni_store::storage::direction::Direction::Outgoing,
581                    "bwd" => uni_store::storage::direction::Direction::Incoming,
582                    _ => continue,
583                };
584
585                // Get edge_type_id
586                if let Some(edge_type_id) =
587                    schema.edge_type_id_unified_case_insensitive(&info.edge_type)
588                {
589                    // Re-warm from storage (clears old CSR, loads new L2 + L1 delta)
590                    let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
591                }
592            }
593        }
594        Ok(())
595    }
596
597    pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
598        if let Some(writer_arc) = &self.writer {
599            let writer: &uni_store::Writer = writer_arc.as_ref();
600            writer.flush_to_l1(Some("checkpoint".to_string())).await?;
601        }
602        Ok(())
603    }
604
605    pub(crate) async fn execute_copy_to(
606        &self,
607        identifier: &str,
608        path: &str,
609        format: &str,
610        options: &HashMap<String, Value>,
611    ) -> Result<usize> {
612        // Check schema to determine if identifier is an edge type or vertex label
613        let schema = self.storage.schema_manager().schema();
614
615        // Try as edge type first
616        if schema.get_edge_type_case_insensitive(identifier).is_some() {
617            return self
618                .export_edge_type_in_format(identifier, path, format)
619                .await;
620        }
621
622        // Try as vertex label
623        if schema.get_label_case_insensitive(identifier).is_some() {
624            return self
625                .export_vertex_label_in_format(identifier, path, format, options)
626                .await;
627        }
628
629        // Neither edge type nor vertex label found
630        Err(anyhow!("Unknown label or edge type: '{}'", identifier))
631    }
632
633    async fn export_vertex_label_in_format(
634        &self,
635        label: &str,
636        path: &str,
637        format: &str,
638        _options: &HashMap<String, Value>,
639    ) -> Result<usize> {
640        match format {
641            "parquet" => self.export_vertex_label(label, path).await,
642            "csv" => {
643                let mut stream = self
644                    .storage
645                    .scan_vertex_table_stream(label)
646                    .await?
647                    .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
648
649                // Collect all batches
650                let mut all_rows = Vec::new();
651                let mut column_names = Vec::new();
652
653                // Iterate stream using StreamExt
654                use futures::StreamExt;
655                while let Some(batch_result) = stream.next().await {
656                    let batch = batch_result?;
657
658                    // Get column names from first batch
659                    if column_names.is_empty() {
660                        column_names = batch
661                            .schema()
662                            .fields()
663                            .iter()
664                            .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
665                            .map(|f| f.name().clone())
666                            .collect();
667                    }
668
669                    // Convert batch to rows
670                    for row_idx in 0..batch.num_rows() {
671                        let mut row = Vec::new();
672                        for field in batch.schema().fields() {
673                            if field.name().starts_with('_') || field.name() == "ext_id" {
674                                continue;
675                            }
676
677                            let col_idx = batch.schema().index_of(field.name())?;
678                            let column = batch.column(col_idx);
679                            let value = self.arrow_value_to_json(column, row_idx)?;
680
681                            // Convert value to CSV string
682                            let csv_value = match value {
683                                Value::Null => String::new(),
684                                Value::Bool(b) => b.to_string(),
685                                Value::Int(i) => i.to_string(),
686                                Value::Float(f) => f.to_string(),
687                                Value::String(s) => s,
688                                _ => format!("{value}"),
689                            };
690                            row.push(csv_value);
691                        }
692                        all_rows.push(row);
693                    }
694                }
695
696                // Write CSV
697                let file = std::fs::File::create(path)?;
698                let mut wtr = csv::Writer::from_writer(file);
699
700                // Write headers
701                log::debug!("CSV export headers: {:?}", column_names);
702                wtr.write_record(&column_names)?;
703
704                // Write rows
705                for (i, row) in all_rows.iter().enumerate() {
706                    log::debug!("CSV export row {}: {:?}", i, row);
707                    wtr.write_record(row)?;
708                }
709
710                wtr.flush()?;
711                Ok(all_rows.len())
712            }
713            _ => Err(anyhow!(
714                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
715                format
716            )),
717        }
718    }
719
720    async fn export_edge_type_in_format(
721        &self,
722        edge_type: &str,
723        path: &str,
724        format: &str,
725    ) -> Result<usize> {
726        match format {
727            "parquet" => self.export_edge_type(edge_type, path).await,
728            "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
729            _ => Err(anyhow!(
730                "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
731                format
732            )),
733        }
734    }
735
736    /// Write a stream of record batches to a Parquet file.
737    /// Returns the total number of rows written, or 0 if the stream is empty.
738    async fn write_batches_to_parquet(
739        mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
740        path: &str,
741        entity_description: &str,
742    ) -> Result<usize> {
743        use futures::TryStreamExt;
744
745        // Get first batch to determine schema and create writer
746        let first_batch = match stream.try_next().await? {
747            Some(batch) => batch,
748            None => {
749                log::info!("No data to export from {}", entity_description);
750                return Ok(0);
751            }
752        };
753
754        // Create Parquet writer using schema from first batch
755        let file = std::fs::File::create(path)?;
756        let arrow_schema = first_batch.schema();
757        let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
758
759        // Write first batch
760        let mut count = first_batch.num_rows();
761        writer.write(&first_batch)?;
762
763        // Write remaining batches
764        while let Some(batch) = stream.try_next().await? {
765            count += batch.num_rows();
766            writer.write(&batch)?;
767        }
768
769        writer.close()?;
770
771        log::info!(
772            "Exported {} rows from {} to '{}'",
773            count,
774            entity_description,
775            path
776        );
777        Ok(count)
778    }
779
780    /// Export vertices of a specific label to Parquet
781    async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
782        let stream = self
783            .storage
784            .scan_vertex_table_stream(label)
785            .await?
786            .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
787
788        Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
789    }
790
791    /// Export edges of a specific type to Parquet
792    async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
793        let schema = self.storage.schema_manager().schema();
794        if !schema.edge_types.contains_key(edge_type) {
795            return Err(anyhow!("Edge type '{}' not found", edge_type));
796        }
797
798        let filter = format!("type = '{}'", edge_type);
799        let stream = self
800            .storage
801            .scan_main_edge_table_stream(Some(&filter))
802            .await?
803            .ok_or_else(|| anyhow!("No edge data found"))?;
804
805        Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
806    }
807
808    pub(crate) async fn execute_copy_from(
809        &self,
810        label: &str,
811        path: &str,
812        format: &str,
813        options: &HashMap<String, Value>,
814    ) -> Result<usize> {
815        // Read data from file
816        let batches = match format {
817            "parquet" => self.read_parquet_file(path)?,
818            "csv" => self.read_csv_file(path, label, options)?,
819            _ => {
820                return Err(anyhow!(
821                    "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
822                    format
823                ));
824            }
825        };
826
827        // Get writer
828        let writer_arc = self
829            .writer
830            .as_ref()
831            .ok_or_else(|| anyhow!("No writer available"))?;
832
833        let db_schema = self.storage.schema_manager().schema();
834
835        // Check if this is a label (vertex) or edge type
836        let is_edge = db_schema.edge_type_id_by_name(label).is_some();
837
838        if is_edge {
839            // Import edges
840            let edge_type_id = db_schema
841                .edge_type_id_by_name(label)
842                .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
843
844            // Get src and dst column names from options
845            let src_col = options
846                .get("src_col")
847                .and_then(|v| v.as_str())
848                .unwrap_or("src");
849            let dst_col = options
850                .get("dst_col")
851                .and_then(|v| v.as_str())
852                .unwrap_or("dst");
853
854            // §5.7 of concurrent_writer.md: writer is hoisted above the row
855            // loop now that there is no per-row lock acquisition cost.
856            let writer: &uni_store::Writer = writer_arc.as_ref();
857            let mut total_rows = 0;
858            for batch in batches {
859                let num_rows = batch.num_rows();
860                // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
861                let eids = writer.allocate_eids(num_rows).await?;
862
863                for (row_idx, &eid) in eids.iter().enumerate().take(num_rows) {
864                    let mut properties = HashMap::new();
865                    let mut src_vid: Option<Vid> = None;
866                    let mut dst_vid: Option<Vid> = None;
867
868                    // Extract properties and VIDs from each column
869                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
870                        let col_name = field.name();
871                        let column = batch.column(col_idx);
872                        let value = self.arrow_value_to_json(column, row_idx)?;
873
874                        if col_name == src_col {
875                            let raw = value.as_u64().unwrap_or_else(|| {
876                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
877                            });
878                            src_vid = Some(Vid::new(raw));
879                        } else if col_name == dst_col {
880                            let raw = value.as_u64().unwrap_or_else(|| {
881                                value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
882                            });
883                            dst_vid = Some(Vid::new(raw));
884                        } else if !col_name.starts_with('_') && !value.is_null() {
885                            properties.insert(col_name.clone(), value);
886                        }
887                    }
888
889                    let src = src_vid
890                        .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
891                    let dst = dst_vid
892                        .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
893
894                    writer
895                        .insert_edge(
896                            src,
897                            dst,
898                            edge_type_id,
899                            eid,
900                            properties,
901                            Some(label.to_string()),
902                            None,
903                        )
904                        .await?;
905
906                    total_rows += 1;
907                }
908            }
909
910            log::info!(
911                "Imported {} edge rows from '{}' into edge type '{}'",
912                total_rows,
913                path,
914                label
915            );
916
917            // Flush to persist edges
918            if total_rows > 0 {
919                writer.flush_to_l1(None).await?;
920            }
921
922            Ok(total_rows)
923        } else {
924            // Import vertices
925            // Validate the label exists in schema
926            db_schema
927                .label_id_by_name_case_insensitive(label)
928                .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
929
930            // §5.7 of concurrent_writer.md: writer is hoisted above the row
931            // loop now that there is no per-row lock acquisition cost.
932            let writer: &uni_store::Writer = writer_arc.as_ref();
933            let mut total_rows = 0;
934            for batch in batches {
935                let num_rows = batch.num_rows();
936                // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
937                let vids = writer.allocate_vids(num_rows).await?;
938
939                // Convert Arrow batch to rows
940                for (row_idx, &vid) in vids.iter().enumerate().take(num_rows) {
941                    let mut properties = HashMap::new();
942
943                    // Extract properties from each column
944                    for (col_idx, field) in batch.schema().fields().iter().enumerate() {
945                        let col_name = field.name();
946
947                        // Skip internal columns
948                        if col_name.starts_with('_') {
949                            continue;
950                        }
951
952                        let column = batch.column(col_idx);
953                        let value = self.arrow_value_to_json(column, row_idx)?;
954
955                        if !value.is_null() {
956                            properties.insert(col_name.clone(), value);
957                        }
958                    }
959
960                    let _ = writer
961                        .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
962                        .await?;
963
964                    total_rows += 1;
965                }
966            }
967
968            log::info!(
969                "Imported {} rows from '{}' into label '{}'",
970                total_rows,
971                path,
972                label
973            );
974
975            // Flush to persist vertices
976            if total_rows > 0 {
977                writer.flush_to_l1(None).await?;
978            }
979
980            Ok(total_rows)
981        }
982    }
983
984    fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
985        use arrow_array::Array;
986        use arrow_schema::DataType as ArrowDataType;
987
988        if column.is_null(row_idx) {
989            return Ok(Value::Null);
990        }
991
992        match column.data_type() {
993            ArrowDataType::Utf8 => {
994                let array = column
995                    .as_any()
996                    .downcast_ref::<arrow_array::StringArray>()
997                    .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
998                Ok(Value::String(array.value(row_idx).to_string()))
999            }
1000            ArrowDataType::Int32 => {
1001                let array = column
1002                    .as_any()
1003                    .downcast_ref::<arrow_array::Int32Array>()
1004                    .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
1005                Ok(Value::Int(array.value(row_idx) as i64))
1006            }
1007            ArrowDataType::Int64 => {
1008                let array = column
1009                    .as_any()
1010                    .downcast_ref::<arrow_array::Int64Array>()
1011                    .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
1012                Ok(Value::Int(array.value(row_idx)))
1013            }
1014            ArrowDataType::Float32 => {
1015                let array = column
1016                    .as_any()
1017                    .downcast_ref::<arrow_array::Float32Array>()
1018                    .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
1019                Ok(Value::Float(array.value(row_idx) as f64))
1020            }
1021            ArrowDataType::Float64 => {
1022                let array = column
1023                    .as_any()
1024                    .downcast_ref::<arrow_array::Float64Array>()
1025                    .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
1026                Ok(Value::Float(array.value(row_idx)))
1027            }
1028            ArrowDataType::Boolean => {
1029                let array = column
1030                    .as_any()
1031                    .downcast_ref::<arrow_array::BooleanArray>()
1032                    .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
1033                Ok(Value::Bool(array.value(row_idx)))
1034            }
1035            ArrowDataType::UInt64 => {
1036                let array = column
1037                    .as_any()
1038                    .downcast_ref::<arrow_array::UInt64Array>()
1039                    .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
1040                Ok(Value::Int(array.value(row_idx) as i64))
1041            }
1042            _ => {
1043                // For other types, try to convert to string
1044                let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
1045                if let Some(arr) = array {
1046                    Ok(Value::String(arr.value(row_idx).to_string()))
1047                } else {
1048                    Ok(Value::Null)
1049                }
1050            }
1051        }
1052    }
1053
1054    fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
1055        let file = std::fs::File::open(path)?;
1056        let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
1057            .build()?;
1058        reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1059    }
1060
1061    fn read_csv_file(
1062        &self,
1063        path: &str,
1064        label: &str,
1065        options: &HashMap<String, Value>,
1066    ) -> Result<Vec<arrow_array::RecordBatch>> {
1067        use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1068        use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1069        use std::sync::Arc;
1070
1071        // Parse CSV options
1072        let has_headers = options
1073            .get("headers")
1074            .and_then(|v| v.as_bool())
1075            .unwrap_or(true);
1076
1077        // Read CSV file
1078        let file = std::fs::File::open(path)?;
1079        let mut rdr = csv::ReaderBuilder::new()
1080            .has_headers(has_headers)
1081            .from_reader(file);
1082
1083        // Get schema for type conversion
1084        let db_schema = self.storage.schema_manager().schema();
1085        let properties = db_schema.properties.get(label);
1086
1087        // Collect all rows first to determine schema
1088        let mut rows: Vec<Vec<String>> = Vec::new();
1089        let headers: Vec<String> = if has_headers {
1090            rdr.headers()?.iter().map(|s| s.to_string()).collect()
1091        } else {
1092            Vec::new()
1093        };
1094
1095        for result in rdr.records() {
1096            let record = result?;
1097            rows.push(record.iter().map(|s| s.to_string()).collect());
1098        }
1099
1100        if rows.is_empty() {
1101            return Ok(Vec::new());
1102        }
1103
1104        // Build Arrow schema with proper types based on DB schema
1105        let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
1106        let col_names: Vec<String> = if has_headers {
1107            headers
1108        } else {
1109            (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
1110        };
1111
1112        for name in &col_names {
1113            let arrow_type = if let Some(props) = properties {
1114                if let Some(prop_meta) = props.get(name) {
1115                    match prop_meta.r#type {
1116                        DataType::Int32 => ArrowDataType::Int32,
1117                        DataType::Int64 => ArrowDataType::Int64,
1118                        DataType::Float32 => ArrowDataType::Float32,
1119                        DataType::Float64 => ArrowDataType::Float64,
1120                        DataType::Bool => ArrowDataType::Boolean,
1121                        _ => ArrowDataType::Utf8,
1122                    }
1123                } else {
1124                    ArrowDataType::Utf8
1125                }
1126            } else {
1127                ArrowDataType::Utf8
1128            };
1129            arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
1130        }
1131
1132        let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
1133
1134        // Convert rows to Arrow arrays with proper types
1135        let mut columns: Vec<ArrayRef> = Vec::new();
1136        for (col_idx, field) in arrow_fields.iter().enumerate() {
1137            match field.data_type() {
1138                ArrowDataType::Int32 => {
1139                    let values: Vec<Option<i32>> = rows
1140                        .iter()
1141                        .map(|row| {
1142                            if col_idx < row.len() {
1143                                row[col_idx].parse().ok()
1144                            } else {
1145                                None
1146                            }
1147                        })
1148                        .collect();
1149                    columns.push(Arc::new(Int32Array::from(values)));
1150                }
1151                _ => {
1152                    // Default to string
1153                    let values: Vec<Option<String>> = rows
1154                        .iter()
1155                        .map(|row| {
1156                            if col_idx < row.len() {
1157                                Some(row[col_idx].clone())
1158                            } else {
1159                                None
1160                            }
1161                        })
1162                        .collect();
1163                    columns.push(Arc::new(StringArray::from(values)));
1164                }
1165            }
1166        }
1167
1168        let batch = RecordBatch::try_new(arrow_schema, columns)?;
1169        Ok(vec![batch])
1170    }
1171
1172    fn parse_data_type(type_str: &str) -> Result<DataType> {
1173        use uni_common::core::schema::{CrdtType, PointType};
1174        let type_str = type_str.to_lowercase();
1175        let type_str = type_str.trim();
1176        match type_str {
1177            "string" | "text" | "varchar" => Ok(DataType::String),
1178            "int" | "integer" | "int32" => Ok(DataType::Int32),
1179            "long" | "int64" | "bigint" => Ok(DataType::Int64),
1180            "float" | "float32" | "real" => Ok(DataType::Float32),
1181            "double" | "float64" => Ok(DataType::Float64),
1182            "bool" | "boolean" => Ok(DataType::Bool),
1183            "timestamp" => Ok(DataType::Timestamp),
1184            "date" => Ok(DataType::Date),
1185            "time" => Ok(DataType::Time),
1186            "datetime" => Ok(DataType::DateTime),
1187            "duration" => Ok(DataType::Duration),
1188            "btic" => Ok(DataType::Btic),
1189            "json" | "jsonb" => Ok(DataType::CypherValue),
1190            "bytes" | "blob" | "binary" => Ok(DataType::Bytes),
1191            "point" => Ok(DataType::Point(PointType::Cartesian2D)),
1192            "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
1193            "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1194            s if s.starts_with("vector(") && s.ends_with(')') => {
1195                let dims_str = &s[7..s.len() - 1];
1196                let dimensions = dims_str
1197                    .parse::<usize>()
1198                    .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1199                Ok(DataType::Vector { dimensions })
1200            }
1201            s if s.starts_with("list<") && s.ends_with('>') => {
1202                let inner_type_str = &s[5..s.len() - 1];
1203                let inner_type = Self::parse_data_type(inner_type_str)?;
1204                Ok(DataType::List(Box::new(inner_type)))
1205            }
1206            "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1207            "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1208            _ => Err(anyhow!("Unknown data type: {}", type_str)),
1209        }
1210    }
1211
1212    pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1213        let sm = self.storage.schema_manager_arc();
1214        if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1215            return Ok(());
1216        }
1217        sm.add_label_with_desc(&clause.name, clause.description)?;
1218        for prop in clause.properties {
1219            let dt = Self::parse_data_type(&prop.data_type)?;
1220            sm.add_property_with_desc(
1221                &clause.name,
1222                &prop.name,
1223                dt,
1224                prop.nullable,
1225                prop.description,
1226            )?;
1227            if prop.unique {
1228                let constraint = Constraint {
1229                    name: format!("{}_{}_unique", clause.name, prop.name),
1230                    constraint_type: ConstraintType::Unique {
1231                        properties: vec![prop.name],
1232                    },
1233                    target: ConstraintTarget::Label(clause.name.clone()),
1234                    enabled: true,
1235                };
1236                sm.add_constraint(constraint)?;
1237            }
1238        }
1239        sm.save().await?;
1240        Ok(())
1241    }
1242
1243    /// True if `key` is a generated property on any of the given labels.
1244    /// Used by the partial-write flush path (Round 12 §C) to decide
1245    /// whether the property should be added to `touched_keys` so that
1246    /// Lance MergeInsert sends the recomputed value.
1247    fn is_generated_key(&self, labels: &[String], key: &str) -> bool {
1248        let schema = self.storage.schema_manager().schema();
1249        for label in labels {
1250            if let Some(props_meta) = schema.properties.get(label)
1251                && let Some(meta) = props_meta.get(key)
1252                && meta.generation_expression.is_some()
1253            {
1254                return true;
1255            }
1256        }
1257        false
1258    }
1259
1260    pub(crate) async fn enrich_properties_with_generated_columns(
1261        &self,
1262        label_name: &str,
1263        properties: &mut HashMap<String, Value>,
1264        prop_manager: &PropertyManager,
1265        params: &HashMap<String, Value>,
1266        ctx: Option<&QueryContext>,
1267    ) -> Result<()> {
1268        let schema = self.storage.schema_manager().schema();
1269
1270        if let Some(props_meta) = schema.properties.get(label_name) {
1271            let mut generators = Vec::new();
1272            for (prop_name, meta) in props_meta {
1273                if let Some(expr_str) = &meta.generation_expression {
1274                    generators.push((prop_name.clone(), expr_str.clone()));
1275                }
1276            }
1277
1278            for (prop_name, expr_str) in generators {
1279                let cache_key = (label_name.to_string(), prop_name.clone());
1280                let expr = {
1281                    let cache = self.gen_expr_cache.read().await;
1282                    cache.get(&cache_key).cloned()
1283                };
1284
1285                let expr = match expr {
1286                    Some(e) => e,
1287                    None => {
1288                        let parsed = uni_cypher::parse_expression(&expr_str)
1289                            .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1290                        let mut cache = self.gen_expr_cache.write().await;
1291                        cache.insert(cache_key, parsed.clone());
1292                        parsed
1293                    }
1294                };
1295
1296                let mut scope = HashMap::new();
1297
1298                // If expression has an explicit variable, use it as an object
1299                if let Some(var) = expr.extract_variable() {
1300                    scope.insert(var, Value::Map(properties.clone()));
1301                } else {
1302                    // No explicit variable - add properties directly to scope for bare references
1303                    // e.g., "lower(email)" can reference "email" directly
1304                    for (k, v) in properties.iter() {
1305                        scope.insert(k.clone(), v.clone());
1306                    }
1307                }
1308
1309                let val = self
1310                    .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1311                    .await?;
1312                properties.insert(prop_name, val);
1313            }
1314        }
1315        Ok(())
1316    }
1317
1318    pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1319        let sm = self.storage.schema_manager_arc();
1320        if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1321            return Ok(());
1322        }
1323        sm.add_edge_type_with_desc(
1324            &clause.name,
1325            clause.src_labels,
1326            clause.dst_labels,
1327            clause.description,
1328        )?;
1329        for prop in clause.properties {
1330            let dt = Self::parse_data_type(&prop.data_type)?;
1331            sm.add_property_with_desc(
1332                &clause.name,
1333                &prop.name,
1334                dt,
1335                prop.nullable,
1336                prop.description,
1337            )?;
1338        }
1339        sm.save().await?;
1340        Ok(())
1341    }
1342
1343    /// Executes an ALTER action on a schema entity.
1344    ///
1345    /// This is a shared helper for both `execute_alter_label` and
1346    /// `execute_alter_edge_type` since they have identical logic.
1347    pub(crate) async fn execute_alter_entity(
1348        sm: &Arc<SchemaManager>,
1349        entity_name: &str,
1350        action: AlterAction,
1351    ) -> Result<()> {
1352        match action {
1353            AlterAction::AddProperty(prop) => {
1354                let dt = Self::parse_data_type(&prop.data_type)?;
1355                sm.add_property_with_desc(
1356                    entity_name,
1357                    &prop.name,
1358                    dt,
1359                    prop.nullable,
1360                    prop.description,
1361                )?;
1362            }
1363            AlterAction::DropProperty(prop_name) => {
1364                sm.drop_property(entity_name, &prop_name)?;
1365            }
1366            AlterAction::RenameProperty { old_name, new_name } => {
1367                sm.rename_property(entity_name, &old_name, &new_name)?;
1368            }
1369            AlterAction::SetDescription(desc) => {
1370                if sm.schema().labels.contains_key(entity_name) {
1371                    sm.set_label_description(entity_name, desc)?;
1372                } else {
1373                    sm.set_edge_type_description(entity_name, desc)?;
1374                }
1375            }
1376            AlterAction::SetPropertyDescription {
1377                property,
1378                description,
1379            } => {
1380                sm.set_property_description(entity_name, &property, description)?;
1381            }
1382        }
1383        sm.save().await?;
1384        Ok(())
1385    }
1386
1387    pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1388        Self::execute_alter_entity(
1389            &self.storage.schema_manager_arc(),
1390            &clause.name,
1391            clause.action,
1392        )
1393        .await
1394    }
1395
1396    pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1397        Self::execute_alter_entity(
1398            &self.storage.schema_manager_arc(),
1399            &clause.name,
1400            clause.action,
1401        )
1402        .await
1403    }
1404
1405    pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1406        let sm = self.storage.schema_manager_arc();
1407        sm.drop_label(&clause.name, clause.if_exists)?;
1408        sm.save().await?;
1409        Ok(())
1410    }
1411
1412    pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1413        let sm = self.storage.schema_manager_arc();
1414        sm.drop_edge_type(&clause.name, clause.if_exists)?;
1415        sm.save().await?;
1416        Ok(())
1417    }
1418
1419    pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1420        let sm = self.storage.schema_manager_arc();
1421        let target = ConstraintTarget::Label(clause.label);
1422        let c_type = match clause.constraint_type {
1423            AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1424                properties: clause.properties,
1425            },
1426            AstConstraintType::Exists => {
1427                let property = clause
1428                    .properties
1429                    .into_iter()
1430                    .next()
1431                    .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1432                ConstraintType::Exists { property }
1433            }
1434            AstConstraintType::Check => {
1435                let expression = clause
1436                    .expression
1437                    .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1438                ConstraintType::Check {
1439                    expression: expression.to_string_repr(),
1440                }
1441            }
1442        };
1443
1444        let constraint = Constraint {
1445            name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1446            constraint_type: c_type,
1447            target,
1448            enabled: true,
1449        };
1450
1451        sm.add_constraint(constraint)?;
1452        sm.save().await?;
1453        Ok(())
1454    }
1455
1456    pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1457        let sm = self.storage.schema_manager_arc();
1458        sm.drop_constraint(&clause.name, false)?;
1459        sm.save().await?;
1460        Ok(())
1461    }
1462
1463    /// Detects the single-node, single-label MERGE shape the fast path serves.
1464    ///
1465    /// Returns the node pattern and its label when `pattern` is one path with
1466    /// one node element, exactly one label, and a static map-literal property
1467    /// set — the shape [`Self::execute_merge_row_indexed`] can serve without
1468    /// per-row query planning. The keys do NOT need to be indexed: the persisted
1469    /// lookup degrades to a (single, filtered) label scan when no scalar index
1470    /// exists, which is still far cheaper than building a `LogicalPlan` per row.
1471    /// Any other shape (edges, multiple labels, non-literal properties) returns
1472    /// `None` so the caller uses the general per-row path.
1473    fn merge_single_node_fastpath<'p>(
1474        &self,
1475        pattern: &'p Pattern,
1476    ) -> Option<(&'p NodePattern, String)> {
1477        if pattern.paths.len() != 1 {
1478            return None;
1479        }
1480        let path = &pattern.paths[0];
1481        if path.elements.len() != 1 {
1482            return None;
1483        }
1484        let PatternElement::Node(n) = &path.elements[0] else {
1485            return None;
1486        };
1487        let labels = n.labels.names();
1488        if labels.len() != 1 {
1489            return None;
1490        }
1491        // The key must be a static map literal so the key names are known.
1492        let Some(Expr::Map(entries)) = n.properties.as_ref() else {
1493            return None;
1494        };
1495        if entries.is_empty() {
1496            return None;
1497        }
1498        Some((n, labels[0].clone()))
1499    }
1500
1501    /// Build the persisted-scan filter for a MERGE key, or `None` if any value
1502    /// is not a scalar this fast path can represent.
1503    ///
1504    /// Returning `None` makes the caller fall back to the general per-row path,
1505    /// so unusual key value types (lists, maps, temporals, nulls) are never
1506    /// silently mis-matched. The `_deleted = false` clause mirrors the
1507    /// persisted-read predicate used elsewhere; the version high-water-mark
1508    /// clause is added by [`uni_store::StorageManager::scan_vertex_table`].
1509    fn merge_key_filter(key_props: &HashMap<String, Value>) -> Option<String> {
1510        if key_props.is_empty() {
1511            return None;
1512        }
1513        let mut parts = Vec::with_capacity(key_props.len() + 1);
1514        for (k, v) in key_props {
1515            // Keys come from a static map literal, but validate anyway (issue #8).
1516            if k.is_empty() || !k.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
1517                return None;
1518            }
1519            let lit = match v {
1520                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1521                Value::Int(i) => i.to_string(),
1522                Value::Float(f) => f.to_string(),
1523                Value::Bool(b) => b.to_string(),
1524                _ => return None,
1525            };
1526            // Unquoted identifier: the Lance filter parser does not resolve a
1527            // double-quoted column name against the table here, so `"k" = v`
1528            // silently matches nothing. Keys are validated above to be safe
1529            // bare identifiers.
1530            parts.push(format!("{k} = {lit}"));
1531        }
1532        parts.push("_deleted = false".to_string());
1533        Some(parts.join(" AND "))
1534    }
1535
1536    /// Canonical sorted `(name, value)` key tuple for a MERGE row's key map.
1537    fn merge_key_tuple(key_props: &HashMap<String, Value>) -> MergeKey {
1538        let mut tuple: MergeKey = key_props
1539            .iter()
1540            .map(|(k, v)| (k.clone(), v.clone()))
1541            .collect();
1542        tuple.sort_by(|a, b| a.0.cmp(&b.0));
1543        tuple
1544    }
1545
1546    /// Snapshot all live L0 vertices of `label`, grouped by their MERGE key.
1547    ///
1548    /// Walked once per MERGE statement (issue #69): the per-row fast path then
1549    /// resolves L0/uncommitted matches with an O(1) map lookup instead of
1550    /// re-enumerating L0 for every row. Captures committed-not-yet-persisted
1551    /// rows and rows created earlier in the same transaction; rows created by
1552    /// later rows of this same statement are folded in incrementally by
1553    /// [`Self::execute_merge_row_indexed`]. `key_names` must be sorted to match
1554    /// [`Self::merge_key_tuple`].
1555    fn merge_l0_existing(
1556        &self,
1557        label: &str,
1558        key_names: &[String],
1559        ctx: Option<&QueryContext>,
1560    ) -> HashMap<MergeKey, Vec<Vid>> {
1561        let mut candidates: Vec<Vid> = Vec::new();
1562        l0_visibility::visit_l0_buffers(ctx, |l0| {
1563            if let Some(vids) = l0.label_to_vids.get(label) {
1564                candidates.extend(vids.iter().copied());
1565            }
1566            false
1567        });
1568
1569        let mut map: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1570        let mut seen: HashSet<Vid> = HashSet::new();
1571        for vid in candidates {
1572            if !seen.insert(vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
1573                continue;
1574            }
1575            // `lookup_vertex_prop` merges across L0 layers (newest wins).
1576            let tuple: MergeKey = key_names
1577                .iter()
1578                .map(|k| {
1579                    let v = l0_visibility::lookup_vertex_prop(vid, k, ctx).unwrap_or(Value::Null);
1580                    (k.clone(), v)
1581                })
1582                .collect();
1583            map.entry(tuple).or_default().push(vid);
1584        }
1585        map
1586    }
1587
1588    /// Persisted (flushed) vertices of `label` matching `key_props`.
1589    ///
1590    /// Scans via [`uni_store::StorageManager::scan_vertex_table`] — the same
1591    /// read path `MATCH` uses, so it honors the version high-water-mark and sees
1592    /// flushed rows — then drops rows an L0 overlay deleted or whose key an L0
1593    /// overlay rewrote. L0-only matches are supplied separately by the per-batch
1594    /// snapshot ([`Self::merge_l0_existing`]).
1595    ///
1596    /// # Errors
1597    /// Propagates persisted-scan failures.
1598    async fn merge_lookup_persisted(
1599        &self,
1600        label: &str,
1601        key_props: &HashMap<String, Value>,
1602        filter: &str,
1603        ctx: Option<&QueryContext>,
1604    ) -> Result<Vec<Vid>> {
1605        let mut matches: Vec<Vid> = Vec::new();
1606        let scanned = self
1607            .storage
1608            .scan_vertex_table(label, &["_vid"], Some(filter))
1609            .await?;
1610        if let Some(batch) = scanned
1611            && let Some(col) = batch
1612                .column_by_name("_vid")
1613                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
1614        {
1615            for i in 0..col.len() {
1616                let vid = Vid::from(col.value(i));
1617                if l0_visibility::is_vertex_deleted(vid, ctx) {
1618                    continue;
1619                }
1620                if Self::vid_overrides_break_key(vid, key_props, ctx) {
1621                    continue;
1622                }
1623                matches.push(vid);
1624            }
1625        }
1626        Ok(matches)
1627    }
1628
1629    /// True if an L0 override rewrote any key column of a persisted match away
1630    /// from its requested value (so the persisted row no longer matches).
1631    fn vid_overrides_break_key(
1632        vid: Vid,
1633        key_props: &HashMap<String, Value>,
1634        ctx: Option<&QueryContext>,
1635    ) -> bool {
1636        key_props.iter().any(|(k, want)| {
1637            matches!(l0_visibility::lookup_vertex_prop(vid, k, ctx), Some(got) if &got != want)
1638        })
1639    }
1640
1641    /// Build a node Map value (`{_vid, _labels, ...props}`) for binding a MERGE
1642    /// node variable.
1643    ///
1644    /// Matches the binding shape produced by `execute_create_pattern` and the
1645    /// general MATCH path, so ON MATCH SET, RETURN, and downstream operators
1646    /// resolve the variable identically — a bare `Value::Int(vid)` is not a
1647    /// valid node binding for those consumers.
1648    fn build_node_map(vid: Vid, label: &str, props: uni_common::Properties) -> Value {
1649        let mut obj = HashMap::new();
1650        obj.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
1651        obj.insert(
1652            "_labels".to_string(),
1653            Value::List(vec![Value::String(label.to_string())]),
1654        );
1655        for (k, v) in props {
1656            obj.insert(k, v);
1657        }
1658        Value::Map(obj)
1659    }
1660
1661    /// True if an L0-only vertex has every key column set to the requested
1662    /// value. A missing column matches only a requested `Null`.
1663    fn l0_vid_matches_key(
1664        vid: Vid,
1665        key_props: &HashMap<String, Value>,
1666        ctx: Option<&QueryContext>,
1667    ) -> bool {
1668        key_props.iter().all(
1669            |(k, want)| match l0_visibility::lookup_vertex_prop(vid, k, ctx) {
1670                Some(got) => &got == want,
1671                None => *want == Value::Null,
1672            },
1673        )
1674    }
1675
1676    /// Index fast-path execution for one MERGE row of the shape detected by
1677    /// [`Self::merge_indexed_fastpath`].
1678    ///
1679    /// Resolves matches from the per-batch L0 snapshot `existing` (O(1) lookup,
1680    /// no per-row L0 enumeration) plus a persisted index scan
1681    /// ([`Self::merge_lookup_persisted`]); applies ON MATCH SET to every match,
1682    /// or creates the node and applies ON CREATE SET when there is none. A newly
1683    /// created vertex is folded into `existing` so a later row of the same batch
1684    /// with the same key matches it (intra-batch dedup). Returns the RETURN rows
1685    /// for this input row (one per match, or one for a create).
1686    ///
1687    /// # Errors
1688    /// Propagates evaluation, lookup, create, and SET failures.
1689    #[expect(
1690        clippy::too_many_arguments,
1691        reason = "mirrors execute_merge's threaded execution state"
1692    )]
1693    async fn execute_merge_row_indexed(
1694        &self,
1695        label: &str,
1696        node: &NodePattern,
1697        path_pattern: &Pattern,
1698        temp_vars: &[String],
1699        mut row: HashMap<String, Value>,
1700        key_props: &HashMap<String, Value>,
1701        filter: &str,
1702        key_tuple: &MergeKey,
1703        existing: &mut HashMap<MergeKey, Vec<Vid>>,
1704        on_match: Option<&SetClause>,
1705        on_create: Option<&SetClause>,
1706        prop_manager: &PropertyManager,
1707        params: &HashMap<String, Value>,
1708        ctx: Option<&QueryContext>,
1709        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1710        writer: &Writer,
1711    ) -> Result<Vec<HashMap<String, Value>>> {
1712        let mut seen: HashSet<Vid> = HashSet::new();
1713        let mut matches: Vec<Vid> = Vec::new();
1714        // Persisted (flushed) matches via the index scan.
1715        for vid in self
1716            .merge_lookup_persisted(label, key_props, filter, ctx)
1717            .await?
1718        {
1719            if seen.insert(vid) {
1720                matches.push(vid);
1721            }
1722        }
1723        // L0 / intra-batch matches from the per-batch snapshot, re-verified live
1724        // in case a prior row of this batch mutated or deleted the candidate.
1725        if let Some(vids) = existing.get(key_tuple) {
1726            for &vid in vids {
1727                if seen.contains(&vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
1728                    continue;
1729                }
1730                if Self::l0_vid_matches_key(vid, key_props, ctx) && seen.insert(vid) {
1731                    matches.push(vid);
1732                }
1733            }
1734        }
1735
1736        let mut out = Vec::new();
1737        if matches.is_empty() {
1738            // No match: create the node, then apply ON CREATE SET.
1739            self.execute_create_pattern(
1740                path_pattern,
1741                &mut row,
1742                writer,
1743                prop_manager,
1744                params,
1745                ctx,
1746                tx_l0_override,
1747            )
1748            .await?;
1749            if let Some(set) = on_create {
1750                self.execute_set_items_locked(
1751                    &set.items,
1752                    &mut row,
1753                    writer,
1754                    prop_manager,
1755                    params,
1756                    ctx,
1757                    tx_l0_override,
1758                    &Prefetch::default(),
1759                )
1760                .await?;
1761            }
1762            // Fold the new vertex into the batch snapshot for intra-batch dedup.
1763            if let Some(var) = &node.variable
1764                && let Some(val) = row.get(var)
1765                && let Ok(vid) = Self::vid_from_value(val)
1766            {
1767                existing.entry(key_tuple.clone()).or_default().push(vid);
1768            }
1769            Self::bind_path_variables(path_pattern, &mut row, temp_vars);
1770            out.push(row);
1771        } else {
1772            // Apply ON MATCH SET to every matched node (multi-match semantics),
1773            // binding the node variable as a Map with _vid/_labels/props so
1774            // RETURN and downstream operators resolve it as they would for the
1775            // general MATCH and CREATE paths.
1776            for vid in matches {
1777                let mut m = row.clone();
1778                if let Some(var) = &node.variable {
1779                    // Minimal binding so ON MATCH SET resolves the node by _vid.
1780                    m.insert(
1781                        var.clone(),
1782                        Self::build_node_map(vid, label, HashMap::new()),
1783                    );
1784                }
1785                if let Some(set) = on_match {
1786                    self.execute_set_items_locked(
1787                        &set.items,
1788                        &mut m,
1789                        writer,
1790                        prop_manager,
1791                        params,
1792                        ctx,
1793                        tx_l0_override,
1794                        &Prefetch::default(),
1795                    )
1796                    .await?;
1797                }
1798                if let Some(var) = &node.variable {
1799                    // Rebind with full, post-SET properties for RETURN fidelity.
1800                    let props = read_vertex_props_with_prefetch(
1801                        vid,
1802                        &Prefetch::default(),
1803                        prop_manager,
1804                        ctx,
1805                    )
1806                    .await?;
1807                    m.insert(var.clone(), Self::build_node_map(vid, label, props));
1808                }
1809                Self::bind_path_variables(path_pattern, &mut m, temp_vars);
1810                out.push(m);
1811            }
1812        }
1813        Ok(out)
1814    }
1815
1816    #[expect(clippy::too_many_arguments)]
1817    pub(crate) async fn execute_merge(
1818        &self,
1819        rows: Vec<HashMap<String, Value>>,
1820        pattern: &Pattern,
1821        on_match: Option<&SetClause>,
1822        on_create: Option<&SetClause>,
1823        prop_manager: &PropertyManager,
1824        params: &HashMap<String, Value>,
1825        ctx: Option<&QueryContext>,
1826        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1827    ) -> Result<Vec<HashMap<String, Value>>> {
1828        let writer_lock = self
1829            .writer
1830            .as_ref()
1831            .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;
1832
1833        // Prepare pattern for path variable binding: assign temp edge variable
1834        // names to unnamed relationships in paths that have path variables.
1835        let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);
1836
1837        // Issue #69: a single-node, single-label MERGE takes the fast path,
1838        // skipping the per-row query planning that made batched MERGE no faster
1839        // than a per-entity loop. Indexed keys get an index point-lookup;
1840        // un-indexed keys still skip planning (the lookup is a filtered scan).
1841        // The shape is the same for every row, so it is detected once.
1842        let fastpath = self.merge_single_node_fastpath(pattern);
1843
1844        // Build the per-batch L0 snapshot once (issue #69 Phase C): the per-row
1845        // fast path then resolves L0/intra-batch matches with an O(1) lookup
1846        // instead of re-walking L0 for every row. `key_names` is the sorted
1847        // static key set, matching `merge_key_tuple`.
1848        let mut fast_existing: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1849        if let Some((node, label)) = &fastpath {
1850            let mut key_names: Vec<String> = match &node.properties {
1851                Some(Expr::Map(entries)) => entries.iter().map(|(k, _)| k.clone()).collect(),
1852                _ => Vec::new(),
1853            };
1854            key_names.sort();
1855            fast_existing = self.merge_l0_existing(label, &key_names, ctx);
1856        }
1857
1858        let mut results = Vec::new();
1859        for mut row in rows {
1860            if let Some((node, label)) = &fastpath {
1861                // Evaluate the MERGE key for this row. Only take the fast path
1862                // when every key value is a scalar the persisted scan can
1863                // express; otherwise fall through to the general per-row path.
1864                let mut key_props: HashMap<String, Value> = HashMap::new();
1865                if let Some(props_expr) = &node.properties
1866                    && let Value::Map(map) = self
1867                        .evaluate_expr(props_expr, &row, prop_manager, params, ctx)
1868                        .await?
1869                {
1870                    key_props = map;
1871                }
1872                if let Some(filter) = Self::merge_key_filter(&key_props) {
1873                    let key_tuple = Self::merge_key_tuple(&key_props);
1874                    let writer: &uni_store::Writer = writer_lock.as_ref();
1875                    let row_out = self
1876                        .execute_merge_row_indexed(
1877                            label,
1878                            node,
1879                            &path_pattern,
1880                            &temp_vars,
1881                            row,
1882                            &key_props,
1883                            &filter,
1884                            &key_tuple,
1885                            &mut fast_existing,
1886                            on_match,
1887                            on_create,
1888                            prop_manager,
1889                            params,
1890                            ctx,
1891                            tx_l0_override,
1892                            writer,
1893                        )
1894                        .await?;
1895                    results.extend(row_out);
1896                    continue;
1897                }
1898                // Non-scalar key value: fall through to the general path below.
1899            }
1900
1901            // General execution: match-or-create per row. (The index fast path
1902            // above already handles single-node, single-label, scalar-indexed
1903            // MERGE — including unique-constrained labels, whose keys are
1904            // indexed — so there is no separate constraint-only fast path.)
1905            let matches = self
1906                .execute_merge_match(pattern, &row, prop_manager, params, ctx)
1907                .await?;
1908            let writer: &uni_store::Writer = writer_lock.as_ref();
1909
1910            let result: Result<Vec<HashMap<String, Value>>> = async {
1911                let mut batch = Vec::new();
1912                if !matches.is_empty() {
1913                    for mut m in matches {
1914                        if let Some(set) = on_match {
1915                            self.execute_set_items_locked(
1916                                &set.items,
1917                                &mut m,
1918                                writer,
1919                                prop_manager,
1920                                params,
1921                                ctx,
1922                                tx_l0_override,
1923                                &Prefetch::default(),
1924                            )
1925                            .await?;
1926                        }
1927                        Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
1928                        batch.push(m);
1929                    }
1930                } else {
1931                    self.execute_create_pattern(
1932                        &path_pattern,
1933                        &mut row,
1934                        writer,
1935                        prop_manager,
1936                        params,
1937                        ctx,
1938                        tx_l0_override,
1939                    )
1940                    .await?;
1941                    if let Some(set) = on_create {
1942                        self.execute_set_items_locked(
1943                            &set.items,
1944                            &mut row,
1945                            writer,
1946                            prop_manager,
1947                            params,
1948                            ctx,
1949                            tx_l0_override,
1950                            &Prefetch::default(),
1951                        )
1952                        .await?;
1953                    }
1954                    Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
1955                    batch.push(row);
1956                }
1957                Ok(batch)
1958            }
1959            .await;
1960
1961            results.extend(result?);
1962        }
1963        Ok(results)
1964    }
1965
1966    /// Execute a CREATE pattern, inserting new vertices and edges into the graph.
1967    #[expect(clippy::too_many_arguments)]
1968    pub(crate) async fn execute_create_pattern(
1969        &self,
1970        pattern: &Pattern,
1971        row: &mut HashMap<String, Value>,
1972        writer: &Writer,
1973        prop_manager: &PropertyManager,
1974        params: &HashMap<String, Value>,
1975        ctx: Option<&QueryContext>,
1976        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1977    ) -> Result<()> {
1978        for path in &pattern.paths {
1979            let mut prev_vid: Option<Vid> = None;
1980            // (rel_var, type_id, type_name, props_expr, direction)
1981            type PendingRel = (String, u32, String, Option<Expr>, Direction);
1982            let mut rel_pending: Option<PendingRel> = None;
1983
1984            for element in &path.elements {
1985                match element {
1986                    PatternElement::Node(n) => {
1987                        let mut vid = None;
1988
1989                        // Check if node variable already bound in row
1990                        if let Some(var) = &n.variable
1991                            && let Some(val) = row.get(var)
1992                            && let Ok(existing_vid) = Self::vid_from_value(val)
1993                        {
1994                            vid = Some(existing_vid);
1995                        }
1996
1997                        // If not bound, create it
1998                        if vid.is_none() {
1999                            let mut props = HashMap::new();
2000                            if let Some(props_expr) = &n.properties {
2001                                let props_val = self
2002                                    .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2003                                    .await?;
2004                                if let Value::Map(map) = props_val {
2005                                    for (k, v) in map {
2006                                        props.insert(k, v);
2007                                    }
2008                                } else {
2009                                    return Err(anyhow!("Properties must evaluate to a map"));
2010                                }
2011                            }
2012
2013                            let schema = self.storage.schema_manager().schema();
2014
2015                            // Strict schema: reject undeclared labels.
2016                            if self.config.strict_schema {
2017                                for label_name in &n.labels {
2018                                    if schema.get_label_case_insensitive(label_name).is_none() {
2019                                        return Err(anyhow!(
2020                                            "Label '{}' is not defined in the schema \
2021                                             (strict_schema is enabled). \
2022                                             Declare it with db.schema().label(...).apply() first.",
2023                                            label_name
2024                                        ));
2025                                    }
2026                                }
2027                            }
2028
2029                            // VID generation is label-independent. Pull from the
2030                            // per-tx reservoir if set (amortizes the global
2031                            // IdAllocator mutex), else fall back to the direct
2032                            // per-VID path.
2033                            let new_vid = match &self.id_reservoir {
2034                                Some(r) => r.next_vid().await?,
2035                                None => writer.next_vid().await?,
2036                            };
2037
2038                            // Enrich with generated columns only for known labels
2039                            for label_name in &n.labels {
2040                                if schema.get_label_case_insensitive(label_name).is_some() {
2041                                    self.enrich_properties_with_generated_columns(
2042                                        label_name,
2043                                        &mut props,
2044                                        prop_manager,
2045                                        params,
2046                                        ctx,
2047                                    )
2048                                    .await?;
2049                                }
2050                            }
2051
2052                            // Validate/coerce against declared types AFTER enrichment, so
2053                            // a type mismatch is rejected here rather than silently nulled
2054                            // (and the row dropped) at flush — issue #68.
2055                            let props = Self::coerce_and_validate_props(props, &schema, &n.labels)?;
2056
2057                            // Insert vertex and get back final properties (includes auto-generated embeddings)
2058                            let final_props = writer
2059                                .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
2060                                .await?;
2061
2062                            // Build node object with final properties (includes embeddings)
2063                            if let Some(var) = &n.variable {
2064                                let mut obj = HashMap::new();
2065                                obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
2066                                let labels_list: Vec<Value> =
2067                                    n.labels.iter().map(|l| Value::String(l.clone())).collect();
2068                                obj.insert("_labels".to_string(), Value::List(labels_list));
2069                                for (k, v) in &final_props {
2070                                    obj.insert(k.clone(), v.clone());
2071                                }
2072                                // Store node as a Map with _vid, matching MATCH behavior
2073                                row.insert(var.clone(), Value::Map(obj));
2074                            }
2075                            vid = Some(new_vid);
2076                        }
2077
2078                        let current_vid = vid.unwrap();
2079
2080                        if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
2081                            rel_pending.take()
2082                            && let Some(src) = prev_vid
2083                        {
2084                            let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
2085
2086                            if !is_rel_bound {
2087                                let mut rel_props = HashMap::new();
2088                                if let Some(expr) = rel_props_expr {
2089                                    let val = self
2090                                        .evaluate_expr(&expr, row, prop_manager, params, ctx)
2091                                        .await?;
2092                                    if let Value::Map(map) = val {
2093                                        rel_props.extend(map);
2094                                    }
2095                                }
2096                                // Validate/coerce edge properties against the declared
2097                                // edge-type schema before storing — issue #68.
2098                                let edge_schema = self.storage.schema_manager().schema();
2099                                let rel_props = Self::coerce_and_validate_props(
2100                                    rel_props,
2101                                    &edge_schema,
2102                                    std::slice::from_ref(&type_name),
2103                                )?;
2104                                let eid = match &self.id_reservoir {
2105                                    Some(r) => r.next_eid().await?,
2106                                    None => writer.next_eid(type_id).await?,
2107                                };
2108
2109                                // For incoming edges like (a)<-[:R]-(b), swap so the edge points b -> a
2110                                let (edge_src, edge_dst) = match dir {
2111                                    Direction::Incoming => (current_vid, src),
2112                                    _ => (src, current_vid),
2113                                };
2114
2115                                let store_props = !rel_var.is_empty();
2116                                let user_props = if store_props {
2117                                    rel_props.clone()
2118                                } else {
2119                                    HashMap::new()
2120                                };
2121
2122                                writer
2123                                    .insert_edge(
2124                                        edge_src,
2125                                        edge_dst,
2126                                        type_id,
2127                                        eid,
2128                                        rel_props,
2129                                        Some(type_name.clone()),
2130                                        tx_l0,
2131                                    )
2132                                    .await?;
2133
2134                                // Edge type name is now stored by insert_edge
2135
2136                                if store_props {
2137                                    let mut edge_map = HashMap::new();
2138                                    edge_map.insert(
2139                                        "_eid".to_string(),
2140                                        Value::Int(eid.as_u64() as i64),
2141                                    );
2142                                    edge_map.insert(
2143                                        "_src".to_string(),
2144                                        Value::Int(edge_src.as_u64() as i64),
2145                                    );
2146                                    edge_map.insert(
2147                                        "_dst".to_string(),
2148                                        Value::Int(edge_dst.as_u64() as i64),
2149                                    );
2150                                    edge_map
2151                                        .insert("_type".to_string(), Value::Int(type_id as i64));
2152                                    // Include user properties so downstream RETURN sees them
2153                                    for (k, v) in user_props {
2154                                        edge_map.insert(k, v);
2155                                    }
2156                                    row.insert(rel_var, Value::Map(edge_map));
2157                                }
2158                            }
2159                        }
2160                        prev_vid = Some(current_vid);
2161                    }
2162                    PatternElement::Relationship(r) => {
2163                        if r.types.len() != 1 {
2164                            return Err(anyhow!(
2165                                "CREATE relationship must specify exactly one type"
2166                            ));
2167                        }
2168                        let type_name = &r.types[0];
2169                        let type_id = if self.config.strict_schema {
2170                            let schema = self.storage.schema_manager().schema();
2171                            schema
2172                                .edge_type_id_by_name_case_insensitive(type_name)
2173                                .ok_or_else(|| {
2174                                    anyhow!(
2175                                        "Edge type '{}' is not defined in the schema \
2176                                         (strict_schema is enabled). \
2177                                         Declare it with db.schema().edge_type(...).apply() first.",
2178                                        type_name
2179                                    )
2180                                })?
2181                        } else {
2182                            // Schemaless: get or assign edge type ID (bit 31 = 1 for dynamic).
2183                            self.storage
2184                                .schema_manager()
2185                                .get_or_assign_edge_type_id(type_name)
2186                        };
2187
2188                        rel_pending = Some((
2189                            r.variable.clone().unwrap_or_default(),
2190                            type_id,
2191                            type_name.clone(),
2192                            r.properties.clone(),
2193                            r.direction.clone(),
2194                        ));
2195                    }
2196                    PatternElement::Parenthesized { .. } => {
2197                        return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
2198                    }
2199                }
2200            }
2201        }
2202        Ok(())
2203    }
2204
2205    /// Rejects structural values (maps, nodes, edges, paths, nested lists) in a property.
2206    ///
2207    /// These are never valid OpenCypher property values regardless of the declared column
2208    /// type. A `CypherValue` column is the sole exception and is handled by the caller
2209    /// before this is reached.
2210    ///
2211    /// # Errors
2212    /// Returns an error if `val` is a map/node/edge/path, or a list containing one.
2213    fn validate_structural_property_value(prop_name: &str, val: &Value) -> Result<()> {
2214        match val {
2215            Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
2216                anyhow::bail!(
2217                    "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2218                    prop_name
2219                );
2220            }
2221            Value::List(items) => {
2222                for item in items {
2223                    if matches!(
2224                        item,
2225                        Value::Map(_)
2226                            | Value::Node(_)
2227                            | Value::Edge(_)
2228                            | Value::Path(_)
2229                            | Value::List(_)
2230                    ) {
2231                        anyhow::bail!(
2232                            "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2233                            prop_name
2234                        );
2235                    }
2236                }
2237            }
2238            _ => {}
2239        }
2240        Ok(())
2241    }
2242
2243    /// Validates and coerces `val` against the declared schema type for `prop_name`.
2244    ///
2245    /// Returns the value to actually persist. Beyond the structural checks in
2246    /// [`Self::validate_structural_property_value`], this compares the value against the
2247    /// column's declared `DataType` and:
2248    ///
2249    /// - returns it unchanged when directly storable (including the intentional
2250    ///   `Int`→`Float`/`Int32` and `Temporal`→`Timestamp` widenings);
2251    /// - coerces a `Value::String` written into a `Date`/`Time`/`DateTime`/`Duration`
2252    ///   column into the proper `Temporal` value, using the same parser as the Cypher
2253    ///   `date()`/`time()`/`datetime()`/`duration()` constructors;
2254    /// - otherwise returns an error, so a type mismatch is surfaced at the call site
2255    ///   rather than silently nulled — and the row dropped at flush. See issue #68.
2256    ///
2257    /// Undeclared (schemaless) properties and `CypherValue` columns keep their permissive
2258    /// behavior.
2259    ///
2260    /// # Errors
2261    /// Returns an error if the value's type is incompatible with the declared column type,
2262    /// or if a string destined for a temporal column is not a valid temporal literal.
2263    fn coerce_and_validate_property_value(
2264        prop_name: &str,
2265        val: Value,
2266        schema: &uni_common::core::schema::Schema,
2267        labels: &[String],
2268    ) -> Result<Value> {
2269        use uni_common::core::schema::DataType;
2270
2271        // Resolve the declared type from the first label that declares this property.
2272        let declared = labels.iter().find_map(|label| {
2273            schema
2274                .properties
2275                .get(label)
2276                .and_then(|props| props.get(prop_name))
2277                .map(|meta| &meta.r#type)
2278        });
2279
2280        // CypherValue columns accept any value (including maps) — skip all checks.
2281        if matches!(declared, Some(DataType::CypherValue)) {
2282            return Ok(val);
2283        }
2284
2285        let Some(dt) = declared else {
2286            // Schemaless property: reject structural values (maps/nodes/edges/paths and
2287            // lists containing them), otherwise store as-is.
2288            Self::validate_structural_property_value(prop_name, &val)?;
2289            return Ok(val);
2290        };
2291
2292        // Directly storable: scalars, the intentional `Int`→`Float`/`Int32` and
2293        // `Temporal`→`Timestamp` widenings, declared composite columns (`Map`/`List`/
2294        // `Vector`) receiving their matching value, and `Null` (always accepted).
2295        if dt.accepts(&val) {
2296            return Ok(val);
2297        }
2298
2299        // Known-safe coercion: a string into a temporal column is parsed as if it had
2300        // been wrapped in the matching Cypher temporal constructor.
2301        if matches!(val, Value::String(_)) {
2302            let ctor = match dt {
2303                DataType::DateTime => Some("DATETIME"),
2304                DataType::Date => Some("DATE"),
2305                DataType::Time => Some("TIME"),
2306                DataType::Duration => Some("DURATION"),
2307                _ => None,
2308            };
2309            if let Some(name) = ctor {
2310                return uni_query_functions::datetime::eval_datetime_function(
2311                    name,
2312                    std::slice::from_ref(&val),
2313                )
2314                .map_err(|e| {
2315                    anyhow!(
2316                        "TypeError: property '{}' is declared {:?} but the string value could \
2317                         not be parsed as a {} literal: {}",
2318                        prop_name,
2319                        dt,
2320                        name,
2321                        e
2322                    )
2323                });
2324            }
2325        }
2326
2327        // Not storable and not coercible. Prefer the structural message when the value
2328        // is itself structural (e.g. a map into a scalar column), preserving prior
2329        // behavior; otherwise report the scalar type mismatch.
2330        Self::validate_structural_property_value(prop_name, &val)?;
2331        anyhow::bail!(
2332            "TypeError: property '{}' is declared {:?} but got an incompatible value of type {}",
2333            prop_name,
2334            dt,
2335            value_type_name(&val)
2336        );
2337    }
2338
2339    /// Coerces and validates every property in `props` against the declared types for `labels`.
2340    ///
2341    /// Applies [`Self::coerce_and_validate_property_value`] to each entry, returning the map
2342    /// with known-safe coercions applied. Use this at every user-facing CREATE/SET write site
2343    /// before handing properties to the writer, so a type mismatch is rejected up front rather
2344    /// than silently nulled — and the row dropped — at flush (issue #68).
2345    ///
2346    /// # Errors
2347    /// Returns an error on the first property whose value is incompatible with its declared type.
2348    fn coerce_and_validate_props(
2349        props: HashMap<String, Value>,
2350        schema: &uni_common::core::schema::Schema,
2351        labels: &[String],
2352    ) -> Result<HashMap<String, Value>> {
2353        let mut out = HashMap::with_capacity(props.len());
2354        for (k, v) in props {
2355            let cv = Self::coerce_and_validate_property_value(&k, v, schema, labels)?;
2356            out.insert(k, cv);
2357        }
2358        Ok(out)
2359    }
2360
2361    #[expect(clippy::too_many_arguments)]
2362    pub(crate) async fn execute_set_items_locked(
2363        &self,
2364        items: &[SetItem],
2365        row: &mut HashMap<String, Value>,
2366        writer: &Writer,
2367        prop_manager: &PropertyManager,
2368        params: &HashMap<String, Value>,
2369        ctx: Option<&QueryContext>,
2370        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2371        prefetched: &Prefetch,
2372    ) -> Result<()> {
2373        // Coalesce SetItem::Property items by target so we do ONE read + ONE
2374        // write per (variable, target) instead of one read-modify-write cycle
2375        // per item. For an UPDATE that sets N properties on the same vertex
2376        // (e.g. the ingest hotpath `SET n.frequency = ..., n.last_seen = ...,
2377        // n.confidence = ...`), this collapses N redundant
2378        // `get_all_vertex_props_with_ctx` + `insert_vertex_with_labels` cycles
2379        // into one. See profile_test.rs `diag_72_set_data_scale_with_hnsw` for
2380        // the measurement, and the plan in
2381        // /home/rohit/.claude/plans/plan-and-implement-a-valiant-flame.md
2382        // for the rationale.
2383        //
2384        // RHS evaluation order is preserved: we evaluate each RHS inline and
2385        // update the row binding immediately, so a later SetItem on the same
2386        // variable that reads `n.<earlier-prop>` sees the new value.
2387        //
2388        // Non-Property variants (Labels, Variable, VariablePlus) are less
2389        // common and have lower payoff; before processing one, we flush any
2390        // pending updates for the same variable so it sees the latest L0
2391        // state and ordering semantics are preserved.
2392        let mut pending_v: HashMap<String, PendingVertexSet> = HashMap::new();
2393        let mut pending_e: HashMap<String, PendingEdgeSet> = HashMap::new();
2394
2395        for item in items {
2396            match item {
2397                SetItem::Property { expr, value } => {
2398                    if let Expr::Property(var_expr, prop_name) = expr
2399                        && let Expr::Variable(var_name) = &**var_expr
2400                        && let Some(node_val) = row.get(var_name)
2401                    {
2402                        if let Ok(vid) = Self::vid_from_value(node_val) {
2403                            reject_if_ephemeral_vid(vid)?;
2404                            let labels =
2405                                Self::extract_labels_from_node(node_val).unwrap_or_default();
2406                            let schema = self.storage.schema_manager().schema().clone();
2407
2408                            // Lazy one-time read. Always read the full row
2409                            // (preserves CRDT merge + constraint validation
2410                            // + scan-side L0 visibility). The
2411                            // partial-lance-writes optimization happens
2412                            // PURELY AT FLUSH TIME via the per-VID
2413                            // `vertex_partial_keys` set tracked in L0 — so
2414                            // L0 holds the full row, scans see the full
2415                            // row, and Lance only receives the touched
2416                            // columns. Generated-column-bearing labels
2417                            // ride the partial path too (Round 12 §C):
2418                            // `enrich_properties_with_generated_columns`
2419                            // runs at flush time over the merged-in-L0
2420                            // full row, and the produced generator keys
2421                            // are appended to `touched` so they land in
2422                            // the MergeInsert source.
2423                            if !pending_v.contains_key(var_name) {
2424                                let storage_cfg = &self.storage.config;
2425                                let partial = storage_cfg.partial_lance_writes;
2426                                let read = read_vertex_props_with_prefetch(
2427                                    vid,
2428                                    prefetched,
2429                                    prop_manager,
2430                                    ctx,
2431                                )
2432                                .await?;
2433                                pending_v.insert(
2434                                    var_name.clone(),
2435                                    PendingVertexSet {
2436                                        vid,
2437                                        labels: labels.clone(),
2438                                        props: read,
2439                                        partial,
2440                                        touched: HashSet::new(),
2441                                    },
2442                                );
2443                            }
2444
2445                            let val = self
2446                                .evaluate_expr(value, row, prop_manager, params, ctx)
2447                                .await?;
2448                            let val = Self::coerce_and_validate_property_value(
2449                                prop_name, val, &schema, &labels,
2450                            )?;
2451
2452                            let pv = pending_v
2453                                .get_mut(var_name)
2454                                .expect("inserted above when absent");
2455                            pv.props.insert(prop_name.clone(), val.clone());
2456                            if pv.partial {
2457                                pv.touched.insert(prop_name.clone());
2458                            }
2459
2460                            // Update the row binding so subsequent RHS sees the new value.
2461                            if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
2462                                node_map.insert(prop_name.clone(), val);
2463                            } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
2464                                node.properties.insert(prop_name.clone(), val);
2465                            }
2466                        } else if let Value::Map(map) = node_val
2467                            && map.get("_eid").is_some_and(|v| !v.is_null())
2468                            && map.get("_src").is_some_and(|v| !v.is_null())
2469                            && map.get("_dst").is_some_and(|v| !v.is_null())
2470                            && (map.get("_type").is_some_and(|v| !v.is_null())
2471                                || map.get("_type_name").is_some_and(|v| !v.is_null()))
2472                        {
2473                            let ei = self.extract_edge_identity(map)?;
2474                            reject_if_ephemeral_eid(ei.eid)?;
2475                            let schema = self.storage.schema_manager().schema().clone();
2476                            // Handle _type as either String or Int (Int from CREATE, String
2477                            // from queries). UNWIND on VLP edge lists emits `_type_name`
2478                            // instead of `_type`; accept either.
2479                            let type_val = map.get("_type").or_else(|| map.get("_type_name"));
2480                            let edge_type_name = match type_val {
2481                                Some(Value::String(s)) => s.clone(),
2482                                Some(Value::Int(id)) => schema
2483                                    .edge_type_name_by_id_unified(*id as u32)
2484                                    .unwrap_or_else(|| format!("EdgeType{}", id)),
2485                                _ => String::new(),
2486                            };
2487
2488                            if !pending_e.contains_key(var_name) {
2489                                let initial = read_edge_props_with_prefetch(
2490                                    ei.eid,
2491                                    prefetched,
2492                                    prop_manager,
2493                                    ctx,
2494                                )
2495                                .await?;
2496                                let partial = self.storage.config.partial_lance_writes;
2497                                pending_e.insert(
2498                                    var_name.clone(),
2499                                    PendingEdgeSet {
2500                                        src: ei.src,
2501                                        dst: ei.dst,
2502                                        edge_type_id: ei.edge_type_id,
2503                                        eid: ei.eid,
2504                                        edge_type_name: edge_type_name.clone(),
2505                                        props: initial,
2506                                        partial,
2507                                        touched: HashSet::new(),
2508                                    },
2509                                );
2510                            }
2511
2512                            let val = self
2513                                .evaluate_expr(value, row, prop_manager, params, ctx)
2514                                .await?;
2515                            let val = Self::coerce_and_validate_property_value(
2516                                prop_name,
2517                                val,
2518                                &schema,
2519                                std::slice::from_ref(&edge_type_name),
2520                            )?;
2521
2522                            let pe = pending_e
2523                                .get_mut(var_name)
2524                                .expect("inserted above when absent");
2525                            pe.props.insert(prop_name.clone(), val.clone());
2526                            if pe.partial {
2527                                pe.touched.insert(prop_name.clone());
2528                            }
2529
2530                            // Update the row object so subsequent RHS sees the new value.
2531                            if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
2532                                edge_map.insert(prop_name.clone(), val);
2533                            } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
2534                                edge.properties.insert(prop_name.clone(), val);
2535                            }
2536                        } else if let Value::Edge(edge) = node_val {
2537                            // Handle Value::Edge directly (when traverse returns Edge objects).
2538                            reject_if_ephemeral_eid(edge.eid)?;
2539                            let eid = edge.eid;
2540                            let src = edge.src;
2541                            let dst = edge.dst;
2542                            let edge_type_name = edge.edge_type.clone();
2543                            let etype =
2544                                self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
2545                            let schema = self.storage.schema_manager().schema().clone();
2546
2547                            if !pending_e.contains_key(var_name) {
2548                                let initial = read_edge_props_with_prefetch(
2549                                    eid,
2550                                    prefetched,
2551                                    prop_manager,
2552                                    ctx,
2553                                )
2554                                .await?;
2555                                let partial = self.storage.config.partial_lance_writes;
2556                                pending_e.insert(
2557                                    var_name.clone(),
2558                                    PendingEdgeSet {
2559                                        src,
2560                                        dst,
2561                                        edge_type_id: etype,
2562                                        eid,
2563                                        edge_type_name: edge_type_name.clone(),
2564                                        props: initial,
2565                                        partial,
2566                                        touched: HashSet::new(),
2567                                    },
2568                                );
2569                            }
2570
2571                            let val = self
2572                                .evaluate_expr(value, row, prop_manager, params, ctx)
2573                                .await?;
2574                            let val = Self::coerce_and_validate_property_value(
2575                                prop_name,
2576                                val,
2577                                &schema,
2578                                std::slice::from_ref(&edge_type_name),
2579                            )?;
2580
2581                            let pe = pending_e
2582                                .get_mut(var_name)
2583                                .expect("inserted above when absent");
2584                            pe.props.insert(prop_name.clone(), val.clone());
2585                            if pe.partial {
2586                                pe.touched.insert(prop_name.clone());
2587                            }
2588
2589                            // Update the row object so subsequent RHS sees the new value.
2590                            if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
2591                                edge.properties.insert(prop_name.clone(), val);
2592                            }
2593                        }
2594                    }
2595                }
2596                SetItem::Labels { variable, labels } => {
2597                    // Flush any pending writes for this var so the Labels op
2598                    // sees latest L0 state. Other variables' pending writes
2599                    // can keep waiting (they're independent).
2600                    self.flush_pending_var(
2601                        variable,
2602                        &mut pending_v,
2603                        &mut pending_e,
2604                        writer,
2605                        prop_manager,
2606                        params,
2607                        ctx,
2608                        tx_l0,
2609                        prefetched,
2610                    )
2611                    .await?;
2612
2613                    if let Some(node_val) = row.get(variable)
2614                        && let Ok(vid) = Self::vid_from_value(node_val)
2615                    {
2616                        reject_if_ephemeral_vid(vid)?;
2617                        let registry = self
2618                            .procedure_registry
2619                            .as_ref()
2620                            .and_then(|pr| pr.plugin_registry());
2621                        reject_virtual_label_write(registry.as_ref(), labels, "SET")?;
2622
2623                        // Get current labels from node value
2624                        let current_labels =
2625                            Self::extract_labels_from_node(node_val).unwrap_or_default();
2626
2627                        // Determine new labels to add (skip duplicates)
2628                        let labels_to_add: Vec<_> = labels
2629                            .iter()
2630                            .filter(|l| !current_labels.contains(l))
2631                            .cloned()
2632                            .collect();
2633
2634                        if !labels_to_add.is_empty() {
2635                            // Resolve the FULL new label set and write it to the
2636                            // TRANSACTION buffer (so the change is transactional
2637                            // and OCC-conflictable), falling back to the context
2638                            // (main) L0 for non-transactional callers. Replace
2639                            // semantics via `set_vertex_labels`.
2640                            let mut new_labels = current_labels;
2641                            new_labels.extend(labels_to_add);
2642                            if let Some(ctx) = ctx {
2643                                let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
2644                                l0.write().set_vertex_labels(vid, &new_labels);
2645                            }
2646
2647                            // Update the node value in the row with the new labels.
2648                            if let Some(Value::Map(obj)) = row.get_mut(variable) {
2649                                let labels_list =
2650                                    new_labels.into_iter().map(Value::String).collect();
2651                                obj.insert("_labels".to_string(), Value::List(labels_list));
2652                            }
2653                        }
2654                    }
2655                }
2656                SetItem::Variable { variable, value }
2657                | SetItem::VariablePlus { variable, value } => {
2658                    // Flush this var's pending writes first so the
2659                    // replace/merge op sees them as latest L0 state.
2660                    self.flush_pending_var(
2661                        variable,
2662                        &mut pending_v,
2663                        &mut pending_e,
2664                        writer,
2665                        prop_manager,
2666                        params,
2667                        ctx,
2668                        tx_l0,
2669                        prefetched,
2670                    )
2671                    .await?;
2672
2673                    let replace = matches!(item, SetItem::Variable { .. });
2674                    let op_str = if replace { "=" } else { "+=" };
2675
2676                    // SET n = expr / SET n += expr — null target from OPTIONAL MATCH is a silent no-op
2677                    if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
2678                        continue;
2679                    }
2680                    let rhs = self
2681                        .evaluate_expr(value, row, prop_manager, params, ctx)
2682                        .await?;
2683                    let new_props =
2684                        Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
2685                            anyhow!(
2686                                "SET {} {} expr: right-hand side must evaluate to a map, \
2687                                 node, or relationship",
2688                                variable,
2689                                op_str
2690                            )
2691                        })?;
2692                    self.apply_properties_to_entity(
2693                        variable,
2694                        new_props,
2695                        replace,
2696                        row,
2697                        writer,
2698                        prop_manager,
2699                        params,
2700                        ctx,
2701                        tx_l0,
2702                        prefetched,
2703                    )
2704                    .await?;
2705                }
2706            }
2707        }
2708
2709        // Flush all remaining coalesced writes — one writer call per target.
2710        // Partial entries (no generated columns) call
2711        // `Writer::insert_vertex_partial_full` so L0 holds the FULL row
2712        // but the touched-keys hint drives a MergeInsert at flush. Full
2713        // entries continue through the legacy
2714        // `insert_vertex_with_labels` (Append) path with
2715        // generated-column enrichment.
2716        for (_var_name, mut pv) in pending_v {
2717            if pv.partial {
2718                // Round 12 §C: run the generator enrichment over the
2719                // merged-in-L0 full row, then add the produced generator
2720                // keys to `touched` so they ride the MergeInsert source.
2721                // Idempotent — generators always recompute against the
2722                // post-merge property map.
2723                let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
2724                for label_name in &pv.labels {
2725                    self.enrich_properties_with_generated_columns(
2726                        label_name,
2727                        &mut pv.props,
2728                        prop_manager,
2729                        params,
2730                        ctx,
2731                    )
2732                    .await?;
2733                }
2734                for k in pv.props.keys() {
2735                    if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
2736                        pv.touched.insert(k.clone());
2737                    }
2738                }
2739                writer
2740                    .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
2741                    .await?;
2742            } else {
2743                for label_name in &pv.labels {
2744                    self.enrich_properties_with_generated_columns(
2745                        label_name,
2746                        &mut pv.props,
2747                        prop_manager,
2748                        params,
2749                        ctx,
2750                    )
2751                    .await?;
2752                }
2753                let _ = writer
2754                    .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
2755                    .await?;
2756            }
2757        }
2758        for (_var_name, pe) in pending_e {
2759            if pe.partial {
2760                writer
2761                    .insert_edge_partial_full(
2762                        pe.src,
2763                        pe.dst,
2764                        pe.edge_type_id,
2765                        pe.eid,
2766                        pe.props,
2767                        Some(pe.edge_type_name),
2768                        pe.touched,
2769                        tx_l0,
2770                    )
2771                    .await?;
2772            } else {
2773                writer
2774                    .insert_edge(
2775                        pe.src,
2776                        pe.dst,
2777                        pe.edge_type_id,
2778                        pe.eid,
2779                        pe.props,
2780                        Some(pe.edge_type_name),
2781                        tx_l0,
2782                    )
2783                    .await?;
2784            }
2785        }
2786
2787        Ok(())
2788    }
2789
2790    /// Flush pending SET state for a single variable to the writer.
2791    ///
2792    /// Called from the SET loop when about to process a Labels /
2793    /// Variable / VariablePlus item on `var`, so the subsequent op
2794    /// sees latest L0 state and ordering is preserved.
2795    #[expect(clippy::too_many_arguments)]
2796    async fn flush_pending_var(
2797        &self,
2798        var: &str,
2799        pending_v: &mut HashMap<String, PendingVertexSet>,
2800        pending_e: &mut HashMap<String, PendingEdgeSet>,
2801        writer: &Writer,
2802        prop_manager: &PropertyManager,
2803        _params: &HashMap<String, Value>,
2804        ctx: Option<&QueryContext>,
2805        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2806        _prefetched: &Prefetch,
2807    ) -> Result<()> {
2808        if let Some(mut pv) = pending_v.remove(var) {
2809            if pv.partial {
2810                let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
2811                for label_name in &pv.labels {
2812                    self.enrich_properties_with_generated_columns(
2813                        label_name,
2814                        &mut pv.props,
2815                        prop_manager,
2816                        _params,
2817                        ctx,
2818                    )
2819                    .await?;
2820                }
2821                for k in pv.props.keys() {
2822                    if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
2823                        pv.touched.insert(k.clone());
2824                    }
2825                }
2826                writer
2827                    .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
2828                    .await?;
2829            } else {
2830                for label_name in &pv.labels {
2831                    self.enrich_properties_with_generated_columns(
2832                        label_name,
2833                        &mut pv.props,
2834                        prop_manager,
2835                        _params,
2836                        ctx,
2837                    )
2838                    .await?;
2839                }
2840                let _ = writer
2841                    .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
2842                    .await?;
2843            }
2844        }
2845        if let Some(pe) = pending_e.remove(var) {
2846            if pe.partial {
2847                writer
2848                    .insert_edge_partial_full(
2849                        pe.src,
2850                        pe.dst,
2851                        pe.edge_type_id,
2852                        pe.eid,
2853                        pe.props,
2854                        Some(pe.edge_type_name),
2855                        pe.touched,
2856                        tx_l0,
2857                    )
2858                    .await?;
2859            } else {
2860                writer
2861                    .insert_edge(
2862                        pe.src,
2863                        pe.dst,
2864                        pe.edge_type_id,
2865                        pe.eid,
2866                        pe.props,
2867                        Some(pe.edge_type_name),
2868                        tx_l0,
2869                    )
2870                    .await?;
2871            }
2872        }
2873        Ok(())
2874    }
2875
2876    /// Execute REMOVE clause items (property removal or label removal).
2877    ///
2878    /// Property removals are batched per variable to avoid stale reads: when
2879    /// multiple properties of the same entity are removed in one REMOVE clause,
2880    /// we read from storage once, null all specified properties, and write back
2881    /// once. This prevents the second removal from reading stale data that
2882    /// doesn't reflect the first removal's L0 write.
2883    #[expect(clippy::too_many_arguments)]
2884    pub(crate) async fn execute_remove_items_locked(
2885        &self,
2886        items: &[RemoveItem],
2887        row: &mut HashMap<String, Value>,
2888        writer: &Writer,
2889        prop_manager: &PropertyManager,
2890        ctx: Option<&QueryContext>,
2891        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2892        prefetched: &Prefetch,
2893    ) -> Result<()> {
2894        // Collect property names to remove, grouped by variable.
2895        // Use Vec<(String, Vec<String>)> to preserve insertion order.
2896        let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
2897
2898        for item in items {
2899            match item {
2900                RemoveItem::Property(expr) => {
2901                    if let Expr::Property(var_expr, prop_name) = expr
2902                        && let Expr::Variable(var_name) = &**var_expr
2903                    {
2904                        if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
2905                            entry.1.push(prop_name.clone());
2906                        } else {
2907                            prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
2908                        }
2909                    }
2910                }
2911                RemoveItem::Labels { variable, labels } => {
2912                    self.execute_remove_labels(variable, labels, row, ctx)?;
2913                }
2914            }
2915        }
2916
2917        // Execute batched property removals per variable.
2918        for (var_name, prop_names) in &prop_removals {
2919            let Some(node_val) = row.get(var_name) else {
2920                continue;
2921            };
2922
2923            if let Ok(vid) = Self::vid_from_value(node_val) {
2924                // Vertex property removal
2925                let mut props =
2926                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
2927
2928                // Only write back if at least one property actually exists
2929                let removed_count = prop_names
2930                    .iter()
2931                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
2932                    .count();
2933                let any_exist = removed_count > 0;
2934                if any_exist {
2935                    writer.track_properties_removed(removed_count, tx_l0);
2936                    for prop_name in prop_names {
2937                        props.insert(prop_name.clone(), Value::Null);
2938                    }
2939                }
2940                // Compute effective properties (post-removal) for _all_props
2941                let effective: HashMap<String, Value> = props
2942                    .iter()
2943                    .filter(|(_, v)| !v.is_null())
2944                    .map(|(k, v)| (k.clone(), v.clone()))
2945                    .collect();
2946                if any_exist {
2947                    let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
2948                    let _ = writer
2949                        .insert_vertex_with_labels(vid, props, &labels, tx_l0)
2950                        .await?;
2951                }
2952
2953                // Update the row map: set removed props to Null
2954                if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
2955                    for prop_name in prop_names {
2956                        node_map.insert(prop_name.clone(), Value::Null);
2957                    }
2958                    // Set _all_props to the complete effective property set
2959                    node_map.insert("_all_props".to_string(), Value::Map(effective));
2960                }
2961            } else if let Value::Map(map) = node_val {
2962                // Edge property removal (map-encoded)
2963                // Check for non-null _eid to skip OPTIONAL MATCH null edges
2964                let mut edge_effective: Option<HashMap<String, Value>> = None;
2965                if map.get("_eid").is_some_and(|v| !v.is_null()) {
2966                    let ei = self.extract_edge_identity(map)?;
2967                    let mut props =
2968                        read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx)
2969                            .await?;
2970
2971                    let removed_count = prop_names
2972                        .iter()
2973                        .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
2974                        .count();
2975                    let any_exist = removed_count > 0;
2976                    if any_exist {
2977                        writer.track_properties_removed(removed_count, tx_l0);
2978                        for prop_name in prop_names {
2979                            props.insert(prop_name.to_string(), Value::Null);
2980                        }
2981                    }
2982                    // Compute effective properties (post-removal) for _all_props
2983                    edge_effective = Some(
2984                        props
2985                            .iter()
2986                            .filter(|(_, v)| !v.is_null())
2987                            .map(|(k, v)| (k.clone(), v.clone()))
2988                            .collect(),
2989                    );
2990                    if any_exist {
2991                        let edge_type_name = map
2992                            .get("_type")
2993                            .and_then(|v| v.as_str())
2994                            .map(|s| s.to_string())
2995                            .or_else(|| {
2996                                self.storage
2997                                    .schema_manager()
2998                                    .edge_type_name_by_id_unified(ei.edge_type_id)
2999                            });
3000                        writer
3001                            .insert_edge(
3002                                ei.src,
3003                                ei.dst,
3004                                ei.edge_type_id,
3005                                ei.eid,
3006                                props,
3007                                edge_type_name,
3008                                tx_l0,
3009                            )
3010                            .await?;
3011                    }
3012                }
3013
3014                if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3015                    for prop_name in prop_names {
3016                        edge_map.insert(prop_name.clone(), Value::Null);
3017                    }
3018                    if let Some(effective) = edge_effective {
3019                        edge_map.insert("_all_props".to_string(), Value::Map(effective));
3020                    }
3021                }
3022            } else if let Value::Edge(edge) = node_val {
3023                // Edge property removal (Value::Edge)
3024                let eid = edge.eid;
3025                let src = edge.src;
3026                let dst = edge.dst;
3027                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
3028
3029                let mut props =
3030                    read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
3031
3032                let removed_count = prop_names
3033                    .iter()
3034                    .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3035                    .count();
3036                if removed_count > 0 {
3037                    writer.track_properties_removed(removed_count, tx_l0);
3038                    for prop_name in prop_names {
3039                        props.insert(prop_name.to_string(), Value::Null);
3040                    }
3041                    writer
3042                        .insert_edge(
3043                            src,
3044                            dst,
3045                            etype,
3046                            eid,
3047                            props,
3048                            Some(edge.edge_type.clone()),
3049                            tx_l0,
3050                        )
3051                        .await?;
3052                }
3053
3054                if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3055                    for prop_name in prop_names {
3056                        edge.properties.insert(prop_name.to_string(), Value::Null);
3057                    }
3058                }
3059            }
3060        }
3061
3062        Ok(())
3063    }
3064
3065    /// Execute label removal.
3066    pub(crate) fn execute_remove_labels(
3067        &self,
3068        variable: &str,
3069        labels: &[String],
3070        row: &mut HashMap<String, Value>,
3071        ctx: Option<&QueryContext>,
3072    ) -> Result<()> {
3073        if let Some(node_val) = row.get(variable)
3074            && let Ok(vid) = Self::vid_from_value(node_val)
3075        {
3076            reject_if_ephemeral_vid(vid)?;
3077            let registry = self
3078                .procedure_registry
3079                .as_ref()
3080                .and_then(|pr| pr.plugin_registry());
3081            reject_virtual_label_write(registry.as_ref(), labels, "REMOVE")?;
3082
3083            // Get current labels from node value
3084            let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3085
3086            // Determine which labels to actually remove (only those currently present)
3087            let labels_to_remove: Vec<_> = labels
3088                .iter()
3089                .filter(|l| current_labels.contains(l))
3090                .collect();
3091
3092            if !labels_to_remove.is_empty() {
3093                // Resolve the FULL remaining label set and write it to the
3094                // TRANSACTION buffer (transactional + OCC-conflictable), falling
3095                // back to the context (main) L0 for non-transactional callers.
3096                let remaining_labels: Vec<String> = current_labels
3097                    .iter()
3098                    .filter(|l| !labels_to_remove.contains(l))
3099                    .cloned()
3100                    .collect();
3101                if let Some(ctx) = ctx {
3102                    let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3103                    l0.write().set_vertex_labels(vid, &remaining_labels);
3104                }
3105
3106                // Update the node value in the row with the remaining labels.
3107                if let Some(Value::Map(obj)) = row.get_mut(variable) {
3108                    let labels_list = remaining_labels.into_iter().map(Value::String).collect();
3109                    obj.insert("_labels".to_string(), Value::List(labels_list));
3110                }
3111            }
3112        }
3113        Ok(())
3114    }
3115
3116    /// Resolve edge type ID for a Value::Edge, handling empty edge_type strings
3117    /// by looking up the type from the L0 buffer's edge endpoints.
3118    fn resolve_edge_type_id_for_edge(
3119        &self,
3120        edge: &crate::types::Edge,
3121        writer: &Writer,
3122        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3123    ) -> Result<u32> {
3124        if !edge.edge_type.is_empty() {
3125            return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
3126        }
3127        // Edge type name is empty (e.g., from anonymous MATCH patterns).
3128        // Look up the edge type ID from the L0 buffer's edge endpoints.
3129        if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
3130            return Ok(etype);
3131        }
3132        Err(anyhow!(
3133            "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
3134            edge.eid
3135        ))
3136    }
3137
3138    /// Execute DELETE clause for a single item (vertex, edge, path, or null).
3139    pub(crate) async fn execute_delete_item_locked(
3140        &self,
3141        val: &Value,
3142        detach: bool,
3143        writer: &Writer,
3144        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3145    ) -> Result<()> {
3146        match val {
3147            Value::Null => {
3148                // DELETE null is a no-op per OpenCypher spec
3149            }
3150            Value::Path(path) => {
3151                // Delete path edges first, then nodes
3152                for edge in &path.edges {
3153                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3154                    writer
3155                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3156                        .await?;
3157                }
3158                for node in &path.nodes {
3159                    self.execute_delete_vertex(
3160                        node.vid,
3161                        detach,
3162                        Some(node.labels.clone()),
3163                        writer,
3164                        tx_l0,
3165                    )
3166                    .await?;
3167                }
3168            }
3169            _ => {
3170                // Try Path reconstruction from Map first (Arrow loses Path type)
3171                if let Ok(path) = Path::try_from(val) {
3172                    for edge in &path.edges {
3173                        let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3174                        writer
3175                            .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3176                            .await?;
3177                    }
3178                    for node in &path.nodes {
3179                        self.execute_delete_vertex(
3180                            node.vid,
3181                            detach,
3182                            Some(node.labels.clone()),
3183                            writer,
3184                            tx_l0,
3185                        )
3186                        .await?;
3187                    }
3188                } else if let Ok(vid) = Self::vid_from_value(val) {
3189                    let labels = Self::extract_labels_from_node(val);
3190                    self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
3191                        .await?;
3192                } else if let Value::Map(map) = val {
3193                    self.execute_delete_edge_from_map(map, writer, tx_l0)
3194                        .await?;
3195                } else if let Value::Edge(edge) = val {
3196                    reject_if_ephemeral_eid(edge.eid)?;
3197                    let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3198                    let registry = self
3199                        .procedure_registry
3200                        .as_ref()
3201                        .and_then(|pr| pr.plugin_registry());
3202                    reject_virtual_edge_type_write(registry.as_ref(), etype, "DELETE")?;
3203                    writer
3204                        .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3205                        .await?;
3206                }
3207            }
3208        }
3209        Ok(())
3210    }
3211
3212    /// Execute vertex deletion with optional detach.
3213    pub(crate) async fn execute_delete_vertex(
3214        &self,
3215        vid: Vid,
3216        detach: bool,
3217        labels: Option<Vec<String>>,
3218        writer: &Writer,
3219        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3220    ) -> Result<()> {
3221        reject_if_ephemeral_vid(vid)?;
3222        if let Some(ls) = labels.as_deref() {
3223            let registry = self
3224                .procedure_registry
3225                .as_ref()
3226                .and_then(|pr| pr.plugin_registry());
3227            reject_virtual_label_write(registry.as_ref(), ls, "DELETE")?;
3228        }
3229        if detach {
3230            self.detach_delete_vertex(vid, writer, tx_l0).await?;
3231        } else {
3232            self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
3233        }
3234        writer.delete_vertex(vid, labels, tx_l0).await?;
3235        Ok(())
3236    }
3237
3238    /// Check that a vertex has no edges (required for non-DETACH DELETE).
3239    ///
3240    /// Loads the subgraph from storage, then excludes edges that have been
3241    /// tombstoned in the writer's L0 or the transaction's L0. This ensures
3242    /// edges deleted earlier in the same DELETE clause are properly excluded.
3243    pub(crate) async fn check_vertex_has_no_edges(
3244        &self,
3245        vid: Vid,
3246        writer: &Writer,
3247        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3248    ) -> Result<()> {
3249        let schema = self.storage.schema_manager().schema();
3250        let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
3251
3252        // Collect tombstoned edge IDs from both the writer L0 and tx L0.
3253        let mut tombstoned_eids = std::collections::HashSet::new();
3254        {
3255            let writer_l0 = writer.l0_manager.get_current();
3256            let guard = writer_l0.read();
3257            for &eid in guard.tombstones.keys() {
3258                tombstoned_eids.insert(eid);
3259            }
3260        }
3261        if let Some(tx) = tx_l0 {
3262            let guard = tx.read();
3263            for &eid in guard.tombstones.keys() {
3264                tombstoned_eids.insert(eid);
3265            }
3266        }
3267
3268        let out_graph = self
3269            .storage
3270            .load_subgraph_cached(
3271                &[vid],
3272                &edge_type_ids,
3273                1,
3274                uni_store::runtime::Direction::Outgoing,
3275                Some(writer.l0_manager.get_current()),
3276            )
3277            .await?;
3278        let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
3279
3280        let in_graph = self
3281            .storage
3282            .load_subgraph_cached(
3283                &[vid],
3284                &edge_type_ids,
3285                1,
3286                uni_store::runtime::Direction::Incoming,
3287                Some(writer.l0_manager.get_current()),
3288            )
3289            .await?;
3290        let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
3291
3292        if has_out || has_in {
3293            return Err(anyhow!(
3294                "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
3295                vid
3296            ));
3297        }
3298        Ok(())
3299    }
3300
3301    /// Execute edge deletion from a map representation.
3302    pub(crate) async fn execute_delete_edge_from_map(
3303        &self,
3304        map: &HashMap<String, Value>,
3305        writer: &Writer,
3306        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3307    ) -> Result<()> {
3308        // Check for non-null _eid to skip OPTIONAL MATCH null edges
3309        if map.get("_eid").is_some_and(|v| !v.is_null()) {
3310            let ei = self.extract_edge_identity(map)?;
3311            reject_if_ephemeral_eid(ei.eid)?;
3312            let registry = self
3313                .procedure_registry
3314                .as_ref()
3315                .and_then(|pr| pr.plugin_registry());
3316            reject_virtual_edge_type_write(registry.as_ref(), ei.edge_type_id, "DELETE")?;
3317            writer
3318                .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
3319                .await?;
3320        }
3321        Ok(())
3322    }
3323
3324    /// Build a scan plan node.
3325    ///
3326    /// - `label_id > 0`: schema label → `Scan` (fast, label-specific storage)
3327    /// - `label_id == 0` with labels: schemaless → `ScanMainByLabels` (main table + L0, filtered by label name)
3328    /// - `label_id == 0` without labels: unlabeled → `ScanAll`
3329    fn make_scan_plan(
3330        label_id: u16,
3331        labels: Vec<String>,
3332        variable: String,
3333        filter: Option<Expr>,
3334    ) -> LogicalPlan {
3335        if label_id > 0 {
3336            LogicalPlan::Scan {
3337                label_id,
3338                labels,
3339                variable,
3340                filter,
3341                optional: false,
3342            }
3343        } else if !labels.is_empty() {
3344            // Schemaless label: use ScanMainByLabels to filter by label name
3345            LogicalPlan::ScanMainByLabels {
3346                labels,
3347                variable,
3348                filter,
3349                optional: false,
3350            }
3351        } else {
3352            LogicalPlan::ScanAll {
3353                variable,
3354                filter,
3355                optional: false,
3356            }
3357        }
3358    }
3359
3360    /// Attach a new scan node to the running plan, using `CrossJoin` when the plan
3361    /// already contains prior operators.
3362    fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
3363        if matches!(plan, LogicalPlan::Empty) {
3364            scan
3365        } else {
3366            LogicalPlan::CrossJoin {
3367                left: Box::new(plan),
3368                right: Box::new(scan),
3369            }
3370        }
3371    }
3372
3373    /// Resolve MERGE property map expressions against the current row context.
3374    ///
3375    /// MERGE patterns like `MERGE (city:City {name: person.bornIn})` contain
3376    /// property expressions that reference bound variables. These need to be
3377    /// evaluated to concrete literal values before being converted to filter
3378    /// expressions by `properties_to_expr()`.
3379    async fn resolve_merge_properties(
3380        &self,
3381        properties: &Option<Expr>,
3382        row: &HashMap<String, Value>,
3383        prop_manager: &PropertyManager,
3384        params: &HashMap<String, Value>,
3385        ctx: Option<&QueryContext>,
3386    ) -> Result<Option<Expr>> {
3387        let entries = match properties {
3388            Some(Expr::Map(entries)) => entries,
3389            other => return Ok(other.clone()),
3390        };
3391        let mut resolved = Vec::new();
3392        for (key, val_expr) in entries {
3393            if matches!(val_expr, Expr::Literal(_)) {
3394                resolved.push((key.clone(), val_expr.clone()));
3395            } else {
3396                let value = self
3397                    .evaluate_expr(val_expr, row, prop_manager, params, ctx)
3398                    .await?;
3399                resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
3400            }
3401        }
3402        Ok(Some(Expr::Map(resolved)))
3403    }
3404
3405    /// Convert a runtime Value back to an AST literal expression.
3406    fn value_to_literal_expr(value: &Value) -> Expr {
3407        match value {
3408            Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
3409            Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
3410            Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
3411            Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
3412            Value::Null => Expr::Literal(CypherLiteral::Null),
3413            Value::List(items) => {
3414                Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
3415            }
3416            Value::Map(entries) => Expr::Map(
3417                entries
3418                    .iter()
3419                    .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
3420                    .collect(),
3421            ),
3422            _ => Expr::Literal(CypherLiteral::Null),
3423        }
3424    }
3425
3426    pub(crate) async fn execute_merge_match(
3427        &self,
3428        pattern: &Pattern,
3429        row: &HashMap<String, Value>,
3430        prop_manager: &PropertyManager,
3431        params: &HashMap<String, Value>,
3432        ctx: Option<&QueryContext>,
3433    ) -> Result<Vec<HashMap<String, Value>>> {
3434        // Construct a LogicalPlan for the MATCH part of MERGE
3435        let planner =
3436            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
3437
3438        // We need to construct a CypherQuery to use the planner's plan() method,
3439        // or we can manually construct the LogicalPlan.
3440        // Manual construction is safer as we don't have to round-trip through AST.
3441
3442        let mut plan = LogicalPlan::Empty;
3443        let mut vars_in_scope = Vec::new();
3444
3445        // Add existing bound variables from row to scope
3446        for key in row.keys() {
3447            vars_in_scope.push(key.clone());
3448        }
3449
3450        // Reconstruct Match logic from Planner (simplified for MERGE pattern)
3451        for path in &pattern.paths {
3452            let elements = &path.elements;
3453            let mut i = 0;
3454            while i < elements.len() {
3455                let part = &elements[i];
3456                match part {
3457                    PatternElement::Node(n) => {
3458                        let variable = n.variable.clone().unwrap_or_default();
3459
3460                        // If variable is already bound in the input row, we filter
3461                        let is_bound = !variable.is_empty() && row.contains_key(&variable);
3462
3463                        if is_bound {
3464                            // If bound, we must Scan this specific VID to start the chain
3465                            // Extract VID from row
3466                            let val = row.get(&variable).unwrap();
3467                            let vid = Self::vid_from_value(val)?;
3468
3469                            // In the new storage model, VIDs don't embed label info.
3470                            // We get label from the node value if available, otherwise use 0 to scan all.
3471                            let extracted_labels =
3472                                Self::extract_labels_from_node(val).unwrap_or_default();
3473                            let label_id = {
3474                                let schema = self.storage.schema_manager().schema();
3475                                extracted_labels
3476                                    .first()
3477                                    .and_then(|l| schema.label_id_by_name(l))
3478                                    .unwrap_or(0)
3479                            };
3480
3481                            let resolved_props = self
3482                                .resolve_merge_properties(
3483                                    &n.properties,
3484                                    row,
3485                                    prop_manager,
3486                                    params,
3487                                    ctx,
3488                                )
3489                                .await?;
3490                            let prop_filter =
3491                                planner.properties_to_expr(&variable, &resolved_props);
3492
3493                            // Create a filter expression for VID: variable._vid = vid
3494                            // But our expression engine handles `Expr::Variable` as column.
3495                            // We can inject a filter `id(variable) = vid` if we had `id()` function.
3496                            // Or we use internal property `_vid`.
3497
3498                            // Note: Scan supports `filter`.
3499                            // We can manually construct an Expr::BinaryOp(Eq, Prop(var, _vid), Literal(vid))
3500
3501                            let vid_filter = Expr::BinaryOp {
3502                                left: Box::new(Expr::Property(
3503                                    Box::new(Expr::Variable(variable.clone())),
3504                                    "_vid".to_string(),
3505                                )),
3506                                op: BinaryOp::Eq,
3507                                right: Box::new(Expr::Literal(CypherLiteral::Integer(
3508                                    vid.as_u64() as i64,
3509                                ))),
3510                            };
3511
3512                            let combined_filter = if let Some(pf) = prop_filter {
3513                                Some(Expr::BinaryOp {
3514                                    left: Box::new(vid_filter),
3515                                    op: BinaryOp::And,
3516                                    right: Box::new(pf),
3517                                })
3518                            } else {
3519                                Some(vid_filter)
3520                            };
3521
3522                            let scan = Self::make_scan_plan(
3523                                label_id,
3524                                extracted_labels,
3525                                variable.clone(),
3526                                combined_filter,
3527                            );
3528                            plan = Self::attach_scan(plan, scan);
3529                        } else {
3530                            let label_id = if n.labels.is_empty() {
3531                                // Unlabeled MERGE node: scan all nodes (label_id 0 → ScanAll)
3532                                0
3533                            } else {
3534                                let label_name = &n.labels[0];
3535                                let schema = self.storage.schema_manager().schema();
3536                                if self.config.strict_schema {
3537                                    schema
3538                                        .get_label_case_insensitive(label_name)
3539                                        .map(|m| m.id)
3540                                        .ok_or_else(|| {
3541                                            anyhow!(
3542                                                "Label '{}' is not defined in the schema \
3543                                                 (strict_schema is enabled). \
3544                                                 Declare it with db.schema().label(...).apply() first.",
3545                                                label_name
3546                                            )
3547                                        })?
3548                                } else {
3549                                    // Fall back to label_id 0 (any/schemaless) when not in schema.
3550                                    schema
3551                                        .get_label_case_insensitive(label_name)
3552                                        .map(|m| m.id)
3553                                        .unwrap_or(0)
3554                                }
3555                            };
3556
3557                            let resolved_props = self
3558                                .resolve_merge_properties(
3559                                    &n.properties,
3560                                    row,
3561                                    prop_manager,
3562                                    params,
3563                                    ctx,
3564                                )
3565                                .await?;
3566                            let prop_filter =
3567                                planner.properties_to_expr(&variable, &resolved_props);
3568                            let scan = Self::make_scan_plan(
3569                                label_id,
3570                                n.labels.names().to_vec(),
3571                                variable.clone(),
3572                                prop_filter,
3573                            );
3574                            plan = Self::attach_scan(plan, scan);
3575
3576                            // Add label filters when:
3577                            // 1. Multiple labels with a known schema label: filter for
3578                            //    additional labels (Scan only scans by the first label).
3579                            // 2. Schemaless labels (label_id = 0): ScanAll finds ALL
3580                            //    nodes, so we must filter to only those with the
3581                            //    specified label(s).
3582                            if !n.labels.is_empty()
3583                                && !variable.is_empty()
3584                                && (label_id == 0 || n.labels.len() > 1)
3585                                && let Some(label_filter) =
3586                                    planner.node_filter_expr(&variable, &n.labels, &None)
3587                            {
3588                                plan = LogicalPlan::Filter {
3589                                    input: Box::new(plan),
3590                                    predicate: label_filter,
3591                                    optional_variables: std::collections::HashSet::new(),
3592                                };
3593                            }
3594
3595                            if !variable.is_empty() {
3596                                vars_in_scope.push(variable.clone());
3597                            }
3598                        }
3599
3600                        // Now look ahead for relationship
3601                        i += 1;
3602                        while i < elements.len() {
3603                            if let PatternElement::Relationship(r) = &elements[i] {
3604                                let target_node_part = &elements[i + 1];
3605                                if let PatternElement::Node(n_target) = target_node_part {
3606                                    let schema = self.storage.schema_manager().schema();
3607                                    let mut edge_type_ids = Vec::new();
3608
3609                                    if r.types.is_empty() {
3610                                        return Err(anyhow!("MERGE edge must have a type"));
3611                                    } else if r.types.len() > 1 {
3612                                        return Err(anyhow!(
3613                                            "MERGE does not support multiple edge types"
3614                                        ));
3615                                    } else {
3616                                        let type_name = &r.types[0];
3617                                        let type_id = if self.config.strict_schema {
3618                                            let s = self.storage.schema_manager().schema();
3619                                            s.edge_type_id_by_name_case_insensitive(type_name)
3620                                                .ok_or_else(|| {
3621                                                    anyhow!(
3622                                                        "Edge type '{}' is not defined in the schema \
3623                                                         (strict_schema is enabled).",
3624                                                        type_name
3625                                                    )
3626                                                })?
3627                                        } else {
3628                                            // Schemaless: assign new ID if not found.
3629                                            self.storage
3630                                                .schema_manager()
3631                                                .get_or_assign_edge_type_id(type_name)
3632                                        };
3633                                        edge_type_ids.push(type_id);
3634                                    }
3635
3636                                    // Resolve target label ID. For schemaless labels (not in the
3637                                    // schema), fall back to 0 which means "any label" in traversal.
3638                                    let target_label_id: u16 = if let Some(lbl) =
3639                                        n_target.labels.first()
3640                                    {
3641                                        schema
3642                                            .get_label_case_insensitive(lbl)
3643                                            .map(|m| m.id)
3644                                            .unwrap_or(0)
3645                                    } else if let Some(var) = &n_target.variable {
3646                                        if let Some(val) = row.get(var) {
3647                                            // In the new storage model, get labels from node value
3648                                            if let Some(labels) =
3649                                                Self::extract_labels_from_node(val)
3650                                            {
3651                                                if let Some(first_label) = labels.first() {
3652                                                    schema
3653                                                        .get_label_case_insensitive(first_label)
3654                                                        .map(|m| m.id)
3655                                                        .unwrap_or(0)
3656                                                } else {
3657                                                    // Bound node with no labels — schemaless, any
3658                                                    0
3659                                                }
3660                                            } else if Self::vid_from_value(val).is_ok() {
3661                                                // VID without label info — schemaless, any
3662                                                0
3663                                            } else {
3664                                                return Err(anyhow!(
3665                                                    "Variable {} is not a node",
3666                                                    var
3667                                                ));
3668                                            }
3669                                        } else {
3670                                            return Err(anyhow!(
3671                                                "MERGE pattern node must have a label or be a bound variable"
3672                                            ));
3673                                        }
3674                                    } else {
3675                                        return Err(anyhow!(
3676                                            "MERGE pattern node must have a label"
3677                                        ));
3678                                    };
3679
3680                                    let target_variable =
3681                                        n_target.variable.clone().unwrap_or_default();
3682                                    let source_variable = match &elements[i - 1] {
3683                                        PatternElement::Node(n) => {
3684                                            n.variable.clone().unwrap_or_default()
3685                                        }
3686                                        _ => String::new(),
3687                                    };
3688
3689                                    let is_variable_length = r.range.is_some();
3690                                    let type_name = &r.types[0];
3691
3692                                    // Use TraverseMainByType for schemaless edge types
3693                                    // (same as MATCH planner) so edge properties are loaded
3694                                    // correctly from storage + L0 via the adjacency map.
3695                                    // Regular Traverse only loads properties via
3696                                    // property_manager which doesn't handle schemaless types.
3697                                    let is_schemaless = edge_type_ids.iter().all(|id| {
3698                                        uni_common::core::edge_type::is_schemaless_edge_type(*id)
3699                                    });
3700
3701                                    if is_schemaless {
3702                                        plan = LogicalPlan::TraverseMainByType {
3703                                            type_names: vec![type_name.clone()],
3704                                            input: Box::new(plan),
3705                                            direction: r.direction.clone(),
3706                                            source_variable,
3707                                            target_variable: target_variable.clone(),
3708                                            step_variable: r.variable.clone(),
3709                                            min_hops: r
3710                                                .range
3711                                                .as_ref()
3712                                                .and_then(|r| r.min)
3713                                                .unwrap_or(1)
3714                                                as usize,
3715                                            max_hops: r
3716                                                .range
3717                                                .as_ref()
3718                                                .and_then(|r| r.max)
3719                                                .unwrap_or(1)
3720                                                as usize,
3721                                            optional: false,
3722                                            target_filter: None,
3723                                            path_variable: None,
3724                                            is_variable_length,
3725                                            optional_pattern_vars: std::collections::HashSet::new(),
3726                                            scope_match_variables: std::collections::HashSet::new(),
3727                                            edge_filter_expr: None,
3728                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
3729                                        };
3730                                    } else {
3731                                        // Collect edge property names needed for MERGE filter
3732                                        let mut edge_props = std::collections::HashSet::new();
3733                                        if let Some(Expr::Map(entries)) = &r.properties {
3734                                            for (key, _) in entries {
3735                                                edge_props.insert(key.clone());
3736                                            }
3737                                        }
3738                                        plan = LogicalPlan::Traverse {
3739                                            input: Box::new(plan),
3740                                            edge_type_ids: edge_type_ids.clone(),
3741                                            direction: r.direction.clone(),
3742                                            source_variable,
3743                                            target_variable: target_variable.clone(),
3744                                            target_label_id,
3745                                            step_variable: r.variable.clone(),
3746                                            min_hops: r
3747                                                .range
3748                                                .as_ref()
3749                                                .and_then(|r| r.min)
3750                                                .unwrap_or(1)
3751                                                as usize,
3752                                            max_hops: r
3753                                                .range
3754                                                .as_ref()
3755                                                .and_then(|r| r.max)
3756                                                .unwrap_or(1)
3757                                                as usize,
3758                                            optional: false,
3759                                            target_filter: None,
3760                                            path_variable: None,
3761                                            edge_properties: edge_props,
3762                                            is_variable_length,
3763                                            optional_pattern_vars: std::collections::HashSet::new(),
3764                                            scope_match_variables: std::collections::HashSet::new(),
3765                                            edge_filter_expr: None,
3766                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
3767                                            qpp_steps: None,
3768                                        };
3769                                    }
3770
3771                                    // Apply property filters for relationship
3772                                    if r.properties.is_some()
3773                                        && let Some(r_var) = &r.variable
3774                                    {
3775                                        let resolved_rel_props = self
3776                                            .resolve_merge_properties(
3777                                                &r.properties,
3778                                                row,
3779                                                prop_manager,
3780                                                params,
3781                                                ctx,
3782                                            )
3783                                            .await?;
3784                                        if let Some(prop_filter) =
3785                                            planner.properties_to_expr(r_var, &resolved_rel_props)
3786                                        {
3787                                            plan = LogicalPlan::Filter {
3788                                                input: Box::new(plan),
3789                                                predicate: prop_filter,
3790                                                optional_variables: std::collections::HashSet::new(
3791                                                ),
3792                                            };
3793                                        }
3794                                    }
3795
3796                                    // Apply property filters for target node if it was new
3797                                    if !target_variable.is_empty() {
3798                                        let resolved_target_props = self
3799                                            .resolve_merge_properties(
3800                                                &n_target.properties,
3801                                                row,
3802                                                prop_manager,
3803                                                params,
3804                                                ctx,
3805                                            )
3806                                            .await?;
3807                                        if let Some(prop_filter) = planner.properties_to_expr(
3808                                            &target_variable,
3809                                            &resolved_target_props,
3810                                        ) {
3811                                            plan = LogicalPlan::Filter {
3812                                                input: Box::new(plan),
3813                                                predicate: prop_filter,
3814                                                optional_variables: std::collections::HashSet::new(
3815                                                ),
3816                                            };
3817                                        }
3818                                        vars_in_scope.push(target_variable.clone());
3819                                    }
3820
3821                                    if let Some(sv) = &r.variable {
3822                                        vars_in_scope.push(sv.clone());
3823                                    }
3824                                    i += 2;
3825                                } else {
3826                                    break;
3827                                }
3828                            } else {
3829                                break;
3830                            }
3831                        }
3832                    }
3833                    _ => return Err(anyhow!("Pattern must start with a node")),
3834                }
3835            }
3836
3837            // Execute the plan to find all matches, then filter against bound variables in `row`.
3838        }
3839
3840        let db_matches = self
3841            .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
3842            .await?;
3843
3844        // Keep only DB results that are consistent with the input row bindings.
3845        // Skip internal keys (starting with "__") as they are implementation
3846        // artifacts (e.g. __used_edges) and not user-visible variable bindings.
3847        // Also skip the empty-string key (""), which is the placeholder variable
3848        // for unnamed MERGE nodes — it may carry over from a prior MERGE clause
3849        // and must not constrain the current pattern's match.
3850        let final_matches = db_matches
3851            .into_iter()
3852            .filter(|db_match| {
3853                row.iter().all(|(key, val)| {
3854                    if key.is_empty() || key.starts_with("__") {
3855                        return true;
3856                    }
3857                    let Some(db_val) = db_match.get(key) else {
3858                        return true;
3859                    };
3860                    if db_val == val {
3861                        return true;
3862                    }
3863                    // Values differ -- treat as consistent if they represent the same VID
3864                    matches!(
3865                        (Self::vid_from_value(val), Self::vid_from_value(db_val)),
3866                        (Ok(v1), Ok(v2)) if v1 == v2
3867                    )
3868                })
3869            })
3870            .map(|db_match| {
3871                let mut merged = row.clone();
3872                merged.extend(db_match);
3873                merged
3874            })
3875            .collect();
3876
3877        Ok(final_matches)
3878    }
3879
3880    /// Prepare a MERGE pattern for path variable binding.
3881    ///
3882    /// If any path in the pattern has a path variable (e.g., `MERGE p = (a)-[:R]->(b)`),
3883    /// unnamed relationships need internal variable names so that `execute_create_pattern`
3884    /// stores the edge data in the row for later path construction.
3885    ///
3886    /// Returns the (possibly modified) pattern and a list of temp variable names to clean up.
3887    fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
3888        let has_path_vars = pattern
3889            .paths
3890            .iter()
3891            .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
3892
3893        if !has_path_vars {
3894            return (pattern.clone(), Vec::new());
3895        }
3896
3897        let mut modified = pattern.clone();
3898        let mut temp_vars = Vec::new();
3899
3900        for path in &mut modified.paths {
3901            if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
3902                continue;
3903            }
3904            for (idx, element) in path.elements.iter_mut().enumerate() {
3905                if let PatternElement::Relationship(r) = element
3906                    && r.variable.as_ref().is_none_or(String::is_empty)
3907                {
3908                    let temp_var = format!("__path_r_{}", idx);
3909                    r.variable = Some(temp_var.clone());
3910                    temp_vars.push(temp_var);
3911                }
3912            }
3913        }
3914
3915        (modified, temp_vars)
3916    }
3917
3918    /// Bind path variables in the result row based on the MERGE pattern.
3919    ///
3920    /// Walks each path in the pattern, collects node/edge values from the row
3921    /// by variable name, and constructs a `Value::Path`.
3922    fn bind_path_variables(
3923        pattern: &Pattern,
3924        row: &mut HashMap<String, Value>,
3925        temp_vars: &[String],
3926    ) {
3927        for path in &pattern.paths {
3928            let Some(path_var) = path.variable.as_ref() else {
3929                continue;
3930            };
3931            if path_var.is_empty() {
3932                continue;
3933            }
3934
3935            let mut nodes = Vec::new();
3936            let mut edges = Vec::new();
3937
3938            for element in &path.elements {
3939                match element {
3940                    PatternElement::Node(n) => {
3941                        if let Some(var) = &n.variable
3942                            && let Some(val) = row.get(var)
3943                            && let Some(node) = Self::value_to_node_for_path(val)
3944                        {
3945                            nodes.push(node);
3946                        }
3947                    }
3948                    PatternElement::Relationship(r) => {
3949                        if let Some(var) = &r.variable
3950                            && let Some(val) = row.get(var)
3951                            && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
3952                        {
3953                            edges.push(edge);
3954                        }
3955                    }
3956                    _ => {}
3957                }
3958            }
3959
3960            if !nodes.is_empty() {
3961                use uni_common::value::Path;
3962                row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
3963            }
3964        }
3965
3966        // Clean up internal temp variables
3967        for var in temp_vars {
3968            row.remove(var);
3969        }
3970    }
3971
3972    /// Convert a Value (Map or Node) to a Node for path construction.
3973    fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
3974        match val {
3975            Value::Node(n) => Some(n.clone()),
3976            Value::Map(map) => {
3977                let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
3978                let labels = if let Some(Value::List(l)) = map.get("_labels") {
3979                    l.iter()
3980                        .filter_map(|v| {
3981                            if let Value::String(s) = v {
3982                                Some(s.clone())
3983                            } else {
3984                                None
3985                            }
3986                        })
3987                        .collect()
3988                } else {
3989                    vec![]
3990                };
3991                let properties: HashMap<String, Value> = map
3992                    .iter()
3993                    .filter(|(k, _)| !k.starts_with('_'))
3994                    .map(|(k, v)| (k.clone(), v.clone()))
3995                    .collect();
3996                Some(uni_common::value::Node {
3997                    vid,
3998                    labels,
3999                    properties,
4000                })
4001            }
4002            _ => None,
4003        }
4004    }
4005
4006    /// Convert a Value (Map or Edge) to an Edge for path construction.
4007    fn value_to_edge_for_path(
4008        val: &Value,
4009        type_names: &[String],
4010    ) -> Option<uni_common::value::Edge> {
4011        match val {
4012            Value::Edge(e) => Some(e.clone()),
4013            Value::Map(map) => {
4014                let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
4015                let edge_type = map
4016                    .get("_type_name")
4017                    .and_then(|v| {
4018                        if let Value::String(s) = v {
4019                            Some(s.clone())
4020                        } else {
4021                            None
4022                        }
4023                    })
4024                    .or_else(|| type_names.first().cloned())
4025                    .unwrap_or_default();
4026                let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
4027                let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
4028                let properties: HashMap<String, Value> = map
4029                    .iter()
4030                    .filter(|(k, _)| !k.starts_with('_'))
4031                    .map(|(k, v)| (k.clone(), v.clone()))
4032                    .collect();
4033                Some(uni_common::value::Edge {
4034                    eid,
4035                    edge_type,
4036                    src,
4037                    dst,
4038                    properties,
4039                })
4040            }
4041            _ => None,
4042        }
4043    }
4044}
4045
4046/// Read a vertex's full property map, preferring `prefetched` over a fresh
4047/// per-row `Backend::scan`.
4048///
4049/// `prefetched` is built once at the top of `apply_mutations` via
4050/// `prefetch_set_targets` / `prefetch_remove_targets` (mutation_common.rs).
4051/// On a hit, we layer in L0 from `ctx` so writes from earlier rows of the
4052/// same `apply_mutations` invocation (counter increments, same-VID
4053/// duplicates from UNWIND) take precedence — the prefetch only snapshots
4054/// storage state at SET entry. On a miss, fall back to the existing
4055/// per-row path; this preserves correctness for newly created VIDs,
4056/// schemaless rows, multi-label corner cases, and non-Mutation callers
4057/// that pass `&Prefetch::default()`.
4058pub(crate) async fn read_vertex_props_with_prefetch(
4059    vid: Vid,
4060    prefetched: &Prefetch,
4061    prop_manager: &PropertyManager,
4062    ctx: Option<&QueryContext>,
4063) -> Result<uni_common::Properties> {
4064    match prefetched.vertex.get(&vid).cloned() {
4065        Some(mut base) => {
4066            if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_vertex_props(vid, ctx) {
4067                for (k, v) in l0 {
4068                    base.insert(k, v);
4069                }
4070            }
4071            Ok(base)
4072        }
4073        None => Ok(prop_manager
4074            .get_all_vertex_props_with_ctx(vid, ctx)
4075            .await?
4076            .unwrap_or_default()),
4077    }
4078}
4079
4080/// Edge equivalent of [`read_vertex_props_with_prefetch`]. On a hit, layer
4081/// in L0 edge props so writes from earlier rows of the same
4082/// `apply_mutations` invocation take precedence. On a miss, fall back to
4083/// the per-EID storage path.
4084pub(crate) async fn read_edge_props_with_prefetch(
4085    eid: Eid,
4086    prefetched: &Prefetch,
4087    prop_manager: &PropertyManager,
4088    ctx: Option<&QueryContext>,
4089) -> Result<uni_common::Properties> {
4090    match prefetched.edge.get(&eid).cloned() {
4091        Some(mut base) => {
4092            if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_edge_props(eid, ctx) {
4093                for (k, v) in l0 {
4094                    base.insert(k, v);
4095                }
4096            }
4097            Ok(base)
4098        }
4099        None => Ok(prop_manager
4100            .get_all_edge_props_with_ctx(eid, ctx)
4101            .await?
4102            .unwrap_or_default()),
4103    }
4104}
4105
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `Value::String` built from a string literal.
    fn text(s: &'static str) -> Value {
        Value::String(s.into())
    }

    /// Fixture shared by the `merge_props` tests: `name` = "Alice", `age` = 30.
    fn alice_props() -> HashMap<String, Value> {
        HashMap::from([
            ("name".to_string(), text("Alice")),
            ("age".to_string(), Value::Int(30)),
        ])
    }

    // ── merge_props tests ────────────────────────────────────────────

    /// Replace mode: a key absent from `incoming` ends up as `Value::Null`.
    #[test]
    fn test_merge_props_replace_tombstones_missing_keys() {
        let incoming = HashMap::from([("name".to_string(), text("Bob"))]);

        let merged = Executor::merge_props(alice_props(), incoming, true);

        assert_eq!(merged.get("name"), Some(&text("Bob")));
        assert_eq!(
            merged.get("age"),
            Some(&Value::Null),
            "Missing keys should be tombstoned in replace mode"
        );
    }

    /// Merge mode: existing keys survive and new keys are added.
    #[test]
    fn test_merge_props_merge_preserves_existing() {
        let incoming = HashMap::from([("city".to_string(), text("NYC"))]);

        let merged = Executor::merge_props(alice_props(), incoming, false);

        assert_eq!(merged.get("name"), Some(&text("Alice")));
        assert_eq!(merged.get("age"), Some(&Value::Int(30)));
        assert_eq!(merged.get("city"), Some(&text("NYC")));
    }

    /// An explicit `Value::Null` in `incoming` lands as `Value::Null` in
    /// both modes.
    #[test]
    fn test_merge_props_null_incoming_is_tombstone() {
        let current = HashMap::from([("name".to_string(), text("Alice"))]);
        let incoming = HashMap::from([("name".to_string(), Value::Null)]);

        // Merge mode: null overwrites
        let merged = Executor::merge_props(current.clone(), incoming.clone(), false);
        assert_eq!(merged.get("name"), Some(&Value::Null));

        // Replace mode: null is tombstone
        let replaced = Executor::merge_props(current, incoming, true);
        assert_eq!(replaced.get("name"), Some(&Value::Null));
    }

    /// Merging into an empty map yields exactly the incoming entries.
    #[test]
    fn test_merge_props_empty_current() {
        let current = HashMap::<String, Value>::new();
        let incoming = HashMap::from([("name".to_string(), text("Alice"))]);

        let merged = Executor::merge_props(current, incoming, false);

        assert_eq!(merged.get("name"), Some(&text("Alice")));
        assert_eq!(merged.len(), 1);
    }

    /// Replace mode with nothing incoming: every existing key becomes
    /// `Value::Null`.
    #[test]
    fn test_merge_props_empty_incoming_replace_tombstones_all() {
        let incoming = HashMap::<String, Value>::new();

        let merged = Executor::merge_props(alice_props(), incoming, true);

        for key in ["name", "age"] {
            assert_eq!(merged.get(key), Some(&Value::Null));
        }
    }

    // ── extract_labels_from_node tests ───────────────────────────────

    /// A map-encoded node exposes its labels through the `_labels` list.
    #[test]
    fn test_extract_labels_from_map() {
        let encoded = Value::Map(HashMap::from([
            ("_vid".to_string(), Value::Int(1)),
            (
                "_labels".to_string(),
                Value::List(vec![text("Person"), text("Employee")]),
            ),
        ]));

        let expected = vec!["Person".to_string(), "Employee".to_string()];
        assert_eq!(Executor::extract_labels_from_node(&encoded), Some(expected));
    }

    /// A `Value::Node` exposes the labels stored on the node itself.
    #[test]
    fn test_extract_labels_from_value_node() {
        let wrapped = Value::Node(uni_common::Node {
            vid: uni_common::core::id::Vid::from(1u64),
            labels: vec!["Person".to_string()],
            properties: HashMap::new(),
        });

        let labels = Executor::extract_labels_from_node(&wrapped);
        assert_eq!(labels, Some(vec!["Person".to_string()]));
    }

    /// Scalar values are not nodes, so no labels are reported.
    #[test]
    fn test_extract_labels_non_node_returns_none() {
        let from_int = Executor::extract_labels_from_node(&Value::Int(42));
        assert_eq!(from_int, None);

        let from_string = Executor::extract_labels_from_node(&text("hello"));
        assert_eq!(from_string, None);
    }

    // ── extract_user_properties_from_value tests ─────────────────────

    /// `_vid` / `_labels` are dropped; user keys are kept.
    #[test]
    fn test_extract_user_props_strips_internal_keys() {
        let encoded = Value::Map(HashMap::from([
            ("_vid".to_string(), Value::Int(1)),
            ("_labels".to_string(), Value::List(vec![text("Person")])),
            ("name".to_string(), text("Alice")),
            ("age".to_string(), Value::Int(30)),
        ]));

        let props = Executor::extract_user_properties_from_value(&encoded).unwrap();

        assert_eq!(props.get("name"), Some(&text("Alice")));
        assert_eq!(props.get("age"), Some(&Value::Int(30)));
        for internal in ["_vid", "_labels"] {
            assert!(!props.contains_key(internal));
        }
    }

    /// A map without internal keys comes back unchanged.
    #[test]
    fn test_extract_user_props_plain_map_returns_as_is() {
        let plain = HashMap::from([("key".to_string(), text("value"))]);

        let props =
            Executor::extract_user_properties_from_value(&Value::Map(plain.clone())).unwrap();

        assert_eq!(props, plain);
    }

    /// A `Value::Node` yields the properties stored on the node.
    #[test]
    fn test_extract_user_props_from_value_node() {
        let wrapped = Value::Node(uni_common::Node {
            vid: uni_common::core::id::Vid::from(1u64),
            labels: vec!["Person".to_string()],
            properties: HashMap::from([("name".to_string(), text("Alice"))]),
        });

        let props = Executor::extract_user_properties_from_value(&wrapped).unwrap();
        assert_eq!(props.get("name"), Some(&text("Alice")));
    }
}