uni_query/query/executor/write.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use crate::query::df_graph::mutation_common::Prefetch;
6use crate::query::planner::LogicalPlan;
7use anyhow::{Result, anyhow};
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use uni_common::DataType;
11use uni_common::core::id::{Eid, Vid};
12use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
13use uni_common::{Path, Value};
14use uni_cypher::ast::{
15 AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
16 CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
17 DropEdgeType, DropLabel, Expr, NodePattern, Pattern, PatternElement, RemoveItem, SetClause,
18 SetItem,
19};
20use uni_store::QueryContext;
21use uni_store::runtime::l0_visibility;
22use uni_store::runtime::property_manager::PropertyManager;
23use uni_store::runtime::writer::Writer;
24
/// Canonical, hashable key identifying the target of a single-node MERGE.
///
/// The key is the list of key properties as `(name, value)` pairs, sorted by
/// property name. It is used to group existing vertices by key for the index
/// fast path (issue #69).
type MergeKey = Vec<(String, Value)>;
29
/// Identity fields extracted from a map-encoded edge.
///
/// Built by `Executor::extract_edge_identity` from the `_eid`, `_src`, `_dst`
/// and `_type` / `_type_name` entries of the edge map.
struct EdgeIdentity {
    /// Edge id, taken from the map's `_eid` entry.
    eid: Eid,
    /// Source vertex id, taken from the map's `_src` entry.
    src: Vid,
    /// Destination vertex id, taken from the map's `_dst` entry.
    dst: Vid,
    /// Numeric edge-type id resolved from the map's `_type` (or, failing
    /// that, `_type_name`) entry.
    edge_type_id: u32,
}
37
/// Per-variable accumulator for `SetItem::Property` items targeting a vertex.
///
/// Built lazily on the first `SetItem` touching each variable, then mutated
/// in place across subsequent items. Flushed once at the end of the SET
/// clause (or earlier if a non-Property `SetItem` on the same variable lands).
struct PendingVertexSet {
    /// Id of the vertex the accumulated SET items apply to.
    /// NOTE(review): inferred from the field name; the code that fills this
    /// struct is outside the visible part of the file.
    vid: Vid,
    /// Labels of the target vertex.
    /// NOTE(review): presumably the labels passed to the writer on flush —
    /// confirm against the flush code.
    labels: Vec<String>,
    /// Full property map: the union of storage and L0 state read via
    /// `get_all_vertex_props_with_ctx`, with the touched values applied
    /// in order. Flushed to L0 whole; L0's `vertex_partial_keys` set
    /// tells the flush which columns to send to Lance via MergeInsert.
    props: HashMap<String, Value>,
    /// `true` when the SET should flush via the partial-column MergeInsert
    /// path: set when `UniConfig::partial_lance_writes` is on AND the
    /// label has no generated columns. Generated-column labels still need
    /// the full-row Append so the regenerated values land.
    partial: bool,
    /// Set of property keys touched by this statement. Threaded into L0
    /// so the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
}
61
/// Per-variable accumulator for `SetItem::Property` items targeting an edge.
///
/// Edge counterpart of the vertex accumulator: it carries the edge's identity
/// together with the properties gathered for it.
struct PendingEdgeSet {
    /// Source vertex id of the target edge.
    src: Vid,
    /// Destination vertex id of the target edge.
    dst: Vid,
    /// Numeric id of the edge's type.
    edge_type_id: u32,
    /// Id of the edge being updated.
    eid: Eid,
    /// Name of the edge's type.
    /// NOTE(review): presumably the name matching `edge_type_id` — confirm
    /// against the code that builds this struct.
    edge_type_name: String,
    /// `true` when the SET should flush via the partial-column
    /// MergeInsert path on the per-edge-type delta tables (Round 12
    /// §A). Set when `UniConfig::partial_lance_writes` is on.
    partial: bool,
    /// Property keys touched by this statement. Threaded into L0 so
    /// the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
    /// Property map accumulated for the edge.
    /// NOTE(review): presumably the full map (current values plus touched
    /// ones), mirroring the vertex accumulator — confirm at the flush site.
    props: HashMap<String, Value>,
}
79
80/// Refuse to mutate an ephemeral node (M5g / proposal §4.13.1).
81/// Ephemeral entities are return-only — `Vid::EPHEMERAL_BIT` is set on
82/// any id minted by `host.allocate_transient_id()`.
83fn reject_if_ephemeral_vid(vid: Vid) -> Result<()> {
84 if vid.is_ephemeral() {
85 return Err(anyhow::Error::from(
86 uni_common::UniError::EphemeralWriteAttempt {
87 kind: "node",
88 id: vid.transient_id().unwrap_or(vid.as_u64()),
89 },
90 ));
91 }
92 Ok(())
93}
94
95/// Returns a short variant name for a `Value`, used in type-mismatch error messages.
96fn value_type_name(val: &Value) -> &'static str {
97 match val {
98 Value::Null => "Null",
99 Value::Bool(_) => "Bool",
100 Value::Int(_) => "Int",
101 Value::Float(_) => "Float",
102 Value::String(_) => "String",
103 Value::Bytes(_) => "Bytes",
104 Value::List(_) => "List",
105 Value::Map(_) => "Map",
106 Value::Node(_) => "Node",
107 Value::Edge(_) => "Edge",
108 Value::Path(_) => "Path",
109 Value::Vector(_) => "Vector",
110 Value::Temporal(_) => "Temporal",
111 _ => "value",
112 }
113}
114
115/// Refuse to mutate an ephemeral edge (M5g / proposal §4.13.1).
116fn reject_if_ephemeral_eid(eid: Eid) -> Result<()> {
117 if eid.is_ephemeral() {
118 return Err(anyhow::Error::from(
119 uni_common::UniError::EphemeralWriteAttempt {
120 kind: "edge",
121 id: eid.transient_id().unwrap_or(eid.as_u64()),
122 },
123 ));
124 }
125 Ok(())
126}
127
128/// Reject a write whose target label is currently allocated as a
129/// virtual (catalog-backed) label.
130///
131/// Catalog tables are read-only from the host's perspective — there is
132/// no write-back path through `CatalogTable::scan` to the originating
133/// provider, so silently allowing SET/DELETE would leave ghosted state
134/// on the host side that diverges from the external catalog. The
135/// planner already rejects CREATE/MERGE on virtual labels via
136/// `Planner::reject_virtual_label_writes`; this helper is the
137/// equivalent gate on the runtime write path for SET-label-add and
138/// DELETE.
139///
140/// `op` names the offending operation for the error message (e.g.
141/// `"SET"`, `"DELETE"`).
142///
143/// # Errors
144///
145/// Returns an error if `registry` is `Some` and any name in `labels`
146/// is currently registered as a virtual label. Returns `Ok(())` when
147/// no plugin registry is wired (low-level callers without plugins).
148fn reject_virtual_label_write(
149 registry: Option<&Arc<uni_plugin::PluginRegistry>>,
150 labels: &[String],
151 op: &str,
152) -> Result<()> {
153 let Some(registry) = registry else {
154 return Ok(());
155 };
156 for label in labels {
157 if registry.virtual_label_by_name(label).is_some() {
158 return Err(anyhow!(
159 "Cannot {op} on virtual (catalog-resolved) label `{label}` — virtual \
160 labels are read-only; write back via the originating catalog instead"
161 ));
162 }
163 }
164 Ok(())
165}
166
167/// Reject a write whose target edge-type ID is currently allocated as
168/// a virtual (catalog-backed) edge type. Runtime analog of
169/// [`reject_virtual_label_write`] for the edge path.
170///
171/// # Errors
172///
173/// Returns an error if `registry` is `Some` and `edge_type_id` resolves
174/// to a registered virtual edge type. Returns `Ok(())` when no plugin
175/// registry is wired.
176fn reject_virtual_edge_type_write(
177 registry: Option<&Arc<uni_plugin::PluginRegistry>>,
178 edge_type_id: u32,
179 op: &str,
180) -> Result<()> {
181 let Some(registry) = registry else {
182 return Ok(());
183 };
184 if let Some(entry) = registry.virtual_edge_type_by_id(edge_type_id) {
185 return Err(anyhow!(
186 "Cannot {op} on virtual (catalog-resolved) edge type `{}` — virtual edge \
187 types are read-only; write back via the originating catalog instead",
188 entry.name
189 ));
190 }
191 Ok(())
192}
193
194impl Executor {
195 /// Extracts labels from a node value.
196 ///
197 /// Handles both `Value::Map` (with a `_labels` list field) and
198 /// `Value::Node` (with a `labels` vec field).
199 ///
200 /// Returns `None` when the value is not a node or has no labels.
201 pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
202 match node_val {
203 Value::Map(map) => {
204 // Map-encoded node: look for _labels array
205 if let Some(Value::List(labels_arr)) = map.get("_labels") {
206 let labels: Vec<String> = labels_arr
207 .iter()
208 .filter_map(|v| v.as_str().map(|s| s.to_string()))
209 .collect();
210 if !labels.is_empty() {
211 return Some(labels);
212 }
213 }
214 None
215 }
216 Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
217 _ => None,
218 }
219 }
220
221 /// Extracts user-visible properties from a value that represents a node or edge.
222 ///
223 /// Strips internal bookkeeping keys (those prefixed with `_` or named
224 /// `ext_id`) from map-encoded entities and returns only the user-facing
225 /// property key-value pairs.
226 ///
227 /// Returns `None` when `val` is not a map, node, or edge.
228 pub(crate) fn extract_user_properties_from_value(
229 val: &Value,
230 ) -> Option<HashMap<String, Value>> {
231 match val {
232 Value::Map(map) => {
233 // Distinguish entity-encoded maps from plain map literals.
234 // A node map has both `_vid` and `_labels`.
235 // An edge map has `_eid`, `_src`, and `_dst`.
236 let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
237 let is_edge_map = map.contains_key("_eid")
238 && map.contains_key("_src")
239 && map.contains_key("_dst");
240
241 if is_node_map || is_edge_map {
242 // Filter out internal bookkeeping keys
243 let user_props: HashMap<String, Value> = map
244 .iter()
245 .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
246 .map(|(k, v)| (k.clone(), v.clone()))
247 .collect();
248 // When mutation output omits dotted property columns, user
249 // properties live inside `_all_props` rather than at the
250 // top level of the entity map.
251 if user_props.is_empty()
252 && let Some(Value::Map(all_props)) = map.get("_all_props")
253 {
254 return Some(all_props.clone());
255 }
256 Some(user_props)
257 } else {
258 // Plain map literal — return as-is
259 Some(map.clone())
260 }
261 }
262 Value::Node(node) => Some(node.properties.clone()),
263 Value::Edge(edge) => Some(edge.properties.clone()),
264 _ => None,
265 }
266 }
267
    /// Applies a property map to a vertex or edge entity bound to `variable` in `row`.
    ///
    /// When `replace` is `true` the entity's property set is replaced: keys absent
    /// from `new_props` are tombstoned (written as `Value::Null`) so the storage
    /// layer removes them. When `replace` is `false` the map is merged: keys in
    /// `new_props` are upserted, while keys absent from `new_props` are unchanged.
    /// A `Value::Null` entry in `new_props` acts as an explicit tombstone in both
    /// modes.
    ///
    /// Labels are never altered — the spec states that `SET n = map` replaces
    /// properties only.
    ///
    /// Four encodings of the bound value are handled, in this order:
    /// `Value::Node`, any other value `Self::vid_from_value` accepts (the
    /// map-encoded node), `Value::Edge`, and a `Value::Map` carrying `_eid`,
    /// `_src` and `_dst` (the map-encoded edge). Every arm follows the same
    /// steps: read the current properties (through `prefetched`), combine them
    /// with `new_props` via `Self::merge_props`, coerce/validate against the
    /// schema, write through `writer`, then refresh the binding in `row` with
    /// the non-null properties. Vertex arms additionally run generated-column
    /// enrichment for each label before validation. Any other bound value
    /// (including a missing binding) is a no-op.
    ///
    /// # Errors
    ///
    /// Returns an error if the entity cannot be found in the storage layer, or
    /// if the writer fails to persist the updated properties. Failures from
    /// generated-column enrichment, property coercion/validation, and edge
    /// identity / edge-type resolution are propagated as well.
    #[expect(clippy::too_many_arguments)]
    async fn apply_properties_to_entity(
        &self,
        variable: &str,
        new_props: HashMap<String, Value>,
        replace: bool,
        row: &mut HashMap<String, Value>,
        writer: &Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
        prefetched: &Prefetch,
    ) -> Result<()> {
        // Clone the target so we can hold &row references elsewhere.
        let target = row.get(variable).cloned();

        // Declared-type guard for the whole-entity `SET n = map` / `SET n += map`
        // forms, mirroring the per-property SET path (issue #68).
        let schema = self.storage.schema_manager().schema();

        match target {
            // Typed node binding.
            Some(Value::Node(ref node)) => {
                let vid = node.vid;
                let labels = node.labels.clone();
                let current =
                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
                let write_props = Self::merge_props(current, new_props, replace);
                // Generated columns are computed on a copy of the merged map,
                // once per label of the node.
                let mut enriched = write_props.clone();
                for label_name in &labels {
                    self.enrich_properties_with_generated_columns(
                        label_name,
                        &mut enriched,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
                let _ = writer
                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
                    .await?;
                // Update the in-memory row binding; tombstones (nulls) are
                // dropped so the binding only shows live properties.
                if let Some(Value::Node(n)) = row.get_mut(variable) {
                    n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
                }
            }
            // Any other value that yields a vid: the map-encoded node.
            Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
                let vid = Self::vid_from_value(node_val)?;
                // A node value without extractable labels is written with an
                // empty label list.
                let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
                let current =
                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
                let write_props = Self::merge_props(current, new_props, replace);
                let mut enriched = write_props.clone();
                for label_name in &labels {
                    self.enrich_properties_with_generated_columns(
                        label_name,
                        &mut enriched,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
                let _ = writer
                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
                    .await?;
                // Update the in-memory map-encoded node binding
                if let Some(Value::Map(node_map)) = row.get_mut(variable) {
                    // Remove old user property keys, keep internal fields
                    node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
                    // Build effective (non-null) properties
                    let effective: HashMap<String, Value> =
                        enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
                    for (k, v) in &effective {
                        node_map.insert(k.clone(), v.clone());
                    }
                    // Replace _all_props to reflect the complete property set
                    node_map.insert("_all_props".to_string(), Value::Map(effective));
                }
            }
            // Typed edge binding.
            Some(Value::Edge(ref edge)) => {
                let eid = edge.eid;
                let src = edge.src;
                let dst = edge.dst;
                // The typed edge carries its type by name; resolve it to the
                // numeric id the writer expects.
                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
                let current =
                    read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
                let write_props = Self::merge_props(current, new_props, replace);
                let write_props = Self::coerce_and_validate_props(
                    write_props,
                    &schema,
                    std::slice::from_ref(&edge.edge_type),
                )?;
                writer
                    .insert_edge(
                        src,
                        dst,
                        etype,
                        eid,
                        write_props.clone(),
                        Some(edge.edge_type.clone()),
                        tx_l0,
                    )
                    .await?;
                // Update the in-memory row binding
                if let Some(Value::Edge(e)) = row.get_mut(variable) {
                    e.properties = write_props
                        .into_iter()
                        .filter(|(_, v)| !v.is_null())
                        .collect();
                }
            }
            // Map-encoded edge binding, recognised by its identity keys.
            Some(Value::Map(ref map))
                if map.contains_key("_eid")
                    && map.contains_key("_src")
                    && map.contains_key("_dst") =>
            {
                let ei = self.extract_edge_identity(map)?;
                let current =
                    read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx).await?;
                let write_props = Self::merge_props(current, new_props, replace);
                // Prefer the type name stored as a string under `_type`; fall
                // back to a schema lookup by the resolved numeric id.
                let edge_type_name = map
                    .get("_type")
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string())
                    .or_else(|| {
                        self.storage
                            .schema_manager()
                            .edge_type_name_by_id_unified(ei.edge_type_id)
                    });
                // Without a type name, coercion/validation is skipped and the
                // merged map is written as-is.
                let write_props = match &edge_type_name {
                    Some(name) => Self::coerce_and_validate_props(
                        write_props,
                        &schema,
                        std::slice::from_ref(name),
                    )?,
                    None => write_props,
                };
                writer
                    .insert_edge(
                        ei.src,
                        ei.dst,
                        ei.edge_type_id,
                        ei.eid,
                        write_props.clone(),
                        edge_type_name,
                        tx_l0,
                    )
                    .await?;
                // Update the in-memory map-encoded edge binding
                if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
                    // Keep only `_`-prefixed internal fields; unlike the node
                    // case, `ext_id` is not retained here.
                    edge_map.retain(|k, _| k.starts_with('_'));
                    let effective: HashMap<String, Value> = write_props
                        .into_iter()
                        .filter(|(_, v)| !v.is_null())
                        .collect();
                    for (k, v) in &effective {
                        edge_map.insert(k.clone(), v.clone());
                    }
                    // Replace _all_props to reflect the complete property set
                    edge_map.insert("_all_props".to_string(), Value::Map(effective));
                }
            }
            _ => {
                // No matching entity — nothing to do (caller already guarded against Null)
            }
        }
        Ok(())
    }
456
457 /// Computes the property map to write given current storage state and the
458 /// incoming change map.
459 ///
460 /// When `replace` is `true`, keys present in `current` but absent from
461 /// `incoming` are tombstoned with `Value::Null`. Null values inside
462 /// `incoming` are always preserved as explicit tombstones.
463 ///
464 /// When `replace` is `false`, `current` is the base and `incoming` is
465 /// merged on top: each key in `incoming` overwrites or tombstones the
466 /// corresponding entry in `current`.
467 fn merge_props(
468 current: HashMap<String, Value>,
469 incoming: HashMap<String, Value>,
470 replace: bool,
471 ) -> HashMap<String, Value> {
472 if replace {
473 // Start from the non-null incoming entries only.
474 let mut result: HashMap<String, Value> = incoming
475 .iter()
476 .filter(|(_, v)| !v.is_null())
477 .map(|(k, v)| (k.clone(), v.clone()))
478 .collect();
479 // Tombstone every current key that is absent from incoming OR explicitly
480 // set to null in incoming (both mean "delete this property").
481 for k in current.keys() {
482 if incoming.get(k).is_none_or(|v| v.is_null()) {
483 result.insert(k.clone(), Value::Null);
484 }
485 }
486 result
487 } else {
488 // Merge: start from current and apply incoming on top
489 let mut result = current;
490 result.extend(incoming);
491 result
492 }
493 }
494
495 /// Extract edge identity fields (`_eid`, `_src`, `_dst`, `_type`) from a map.
496 fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
497 let eid = Eid::from(
498 map.get("_eid")
499 .and_then(|v| v.as_u64())
500 .ok_or_else(|| anyhow!("Invalid _eid"))?,
501 );
502 let src = Vid::from(
503 map.get("_src")
504 .and_then(|v| v.as_u64())
505 .ok_or_else(|| anyhow!("Invalid _src"))?,
506 );
507 let dst = Vid::from(
508 map.get("_dst")
509 .and_then(|v| v.as_u64())
510 .ok_or_else(|| anyhow!("Invalid _dst"))?,
511 );
512 let edge_type_id = self.resolve_edge_type_id(
513 map.get("_type")
514 .or_else(|| map.get("_type_name"))
515 .ok_or_else(|| anyhow!("Missing _type/_type_name on edge map"))?,
516 )?;
517 Ok(EdgeIdentity {
518 eid,
519 src,
520 dst,
521 edge_type_id,
522 })
523 }
524
525 /// Resolve edge type ID from a Value, supporting both Int and String representations.
526 /// DataFusion traverse stores _type as String("KNOWS"), while write operations need u32 ID.
527 ///
528 /// For String values, uses get_or_assign_edge_type_id to support schemaless edge types
529 /// (assigns new ID if not found). This is critical for MERGE ... ON CREATE SET scenarios
530 /// where the edge type was just created and may not be in the read-only lookup yet.
531 fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
532 match type_val {
533 Value::Int(i) => Ok(*i as u32),
534 Value::String(name) => {
535 if self.config.strict_schema {
536 let schema = self.storage.schema_manager().schema();
537 schema
538 .edge_type_id_by_name_case_insensitive(name)
539 .ok_or_else(|| {
540 anyhow!(
541 "Edge type '{}' is not defined in the schema \
542 (strict_schema is enabled). \
543 Declare it with db.schema().edge_type(...).apply() first.",
544 name
545 )
546 })
547 } else {
548 // Schemaless: assign new ID if not found in schema or registry.
549 Ok(self
550 .storage
551 .schema_manager()
552 .get_or_assign_edge_type_id(name))
553 }
554 }
555 _ => Err(anyhow!(
556 "Invalid _type value: expected Int or String, got {:?}",
557 type_val
558 )),
559 }
560 }
561
562 pub(crate) async fn execute_vacuum(&self) -> Result<()> {
563 if let Some(writer_arc) = &self.writer {
564 // Flush first while holding the lock
565 {
566 let writer: &uni_store::Writer = writer_arc.as_ref();
567 writer.flush_to_l1(None).await?;
568 } // Drop lock before compacting to avoid blocking reads/writes
569
570 // Compaction can run without holding the writer lock
571 let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
572 let compaction_results = compactor.compact_all().await?;
573
574 // Re-warm adjacency manager for compacted edge types to sync in-memory CSR with new L2 storage
575 let am = self.storage.adjacency_manager();
576 let schema = self.storage.schema_manager().schema();
577 for info in compaction_results {
578 // Convert string direction to Direction enum
579 let direction = match info.direction.as_str() {
580 "fwd" => uni_store::storage::direction::Direction::Outgoing,
581 "bwd" => uni_store::storage::direction::Direction::Incoming,
582 _ => continue,
583 };
584
585 // Get edge_type_id
586 if let Some(edge_type_id) =
587 schema.edge_type_id_unified_case_insensitive(&info.edge_type)
588 {
589 // Re-warm from storage (clears old CSR, loads new L2 + L1 delta)
590 let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
591 }
592 }
593 }
594 Ok(())
595 }
596
597 pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
598 if let Some(writer_arc) = &self.writer {
599 let writer: &uni_store::Writer = writer_arc.as_ref();
600 writer.flush_to_l1(Some("checkpoint".to_string())).await?;
601 }
602 Ok(())
603 }
604
605 pub(crate) async fn execute_copy_to(
606 &self,
607 identifier: &str,
608 path: &str,
609 format: &str,
610 options: &HashMap<String, Value>,
611 ) -> Result<usize> {
612 // Check schema to determine if identifier is an edge type or vertex label
613 let schema = self.storage.schema_manager().schema();
614
615 // Try as edge type first
616 if schema.get_edge_type_case_insensitive(identifier).is_some() {
617 return self
618 .export_edge_type_in_format(identifier, path, format)
619 .await;
620 }
621
622 // Try as vertex label
623 if schema.get_label_case_insensitive(identifier).is_some() {
624 return self
625 .export_vertex_label_in_format(identifier, path, format, options)
626 .await;
627 }
628
629 // Neither edge type nor vertex label found
630 Err(anyhow!("Unknown label or edge type: '{}'", identifier))
631 }
632
633 async fn export_vertex_label_in_format(
634 &self,
635 label: &str,
636 path: &str,
637 format: &str,
638 _options: &HashMap<String, Value>,
639 ) -> Result<usize> {
640 match format {
641 "parquet" => self.export_vertex_label(label, path).await,
642 "csv" => {
643 let mut stream = self
644 .storage
645 .scan_vertex_table_stream(label)
646 .await?
647 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
648
649 // Collect all batches
650 let mut all_rows = Vec::new();
651 let mut column_names = Vec::new();
652
653 // Iterate stream using StreamExt
654 use futures::StreamExt;
655 while let Some(batch_result) = stream.next().await {
656 let batch = batch_result?;
657
658 // Get column names from first batch
659 if column_names.is_empty() {
660 column_names = batch
661 .schema()
662 .fields()
663 .iter()
664 .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
665 .map(|f| f.name().clone())
666 .collect();
667 }
668
669 // Convert batch to rows
670 for row_idx in 0..batch.num_rows() {
671 let mut row = Vec::new();
672 for field in batch.schema().fields() {
673 if field.name().starts_with('_') || field.name() == "ext_id" {
674 continue;
675 }
676
677 let col_idx = batch.schema().index_of(field.name())?;
678 let column = batch.column(col_idx);
679 let value = self.arrow_value_to_json(column, row_idx)?;
680
681 // Convert value to CSV string
682 let csv_value = match value {
683 Value::Null => String::new(),
684 Value::Bool(b) => b.to_string(),
685 Value::Int(i) => i.to_string(),
686 Value::Float(f) => f.to_string(),
687 Value::String(s) => s,
688 _ => format!("{value}"),
689 };
690 row.push(csv_value);
691 }
692 all_rows.push(row);
693 }
694 }
695
696 // Write CSV
697 let file = std::fs::File::create(path)?;
698 let mut wtr = csv::Writer::from_writer(file);
699
700 // Write headers
701 log::debug!("CSV export headers: {:?}", column_names);
702 wtr.write_record(&column_names)?;
703
704 // Write rows
705 for (i, row) in all_rows.iter().enumerate() {
706 log::debug!("CSV export row {}: {:?}", i, row);
707 wtr.write_record(row)?;
708 }
709
710 wtr.flush()?;
711 Ok(all_rows.len())
712 }
713 _ => Err(anyhow!(
714 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
715 format
716 )),
717 }
718 }
719
720 async fn export_edge_type_in_format(
721 &self,
722 edge_type: &str,
723 path: &str,
724 format: &str,
725 ) -> Result<usize> {
726 match format {
727 "parquet" => self.export_edge_type(edge_type, path).await,
728 "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
729 _ => Err(anyhow!(
730 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
731 format
732 )),
733 }
734 }
735
736 /// Write a stream of record batches to a Parquet file.
737 /// Returns the total number of rows written, or 0 if the stream is empty.
738 async fn write_batches_to_parquet(
739 mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
740 path: &str,
741 entity_description: &str,
742 ) -> Result<usize> {
743 use futures::TryStreamExt;
744
745 // Get first batch to determine schema and create writer
746 let first_batch = match stream.try_next().await? {
747 Some(batch) => batch,
748 None => {
749 log::info!("No data to export from {}", entity_description);
750 return Ok(0);
751 }
752 };
753
754 // Create Parquet writer using schema from first batch
755 let file = std::fs::File::create(path)?;
756 let arrow_schema = first_batch.schema();
757 let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
758
759 // Write first batch
760 let mut count = first_batch.num_rows();
761 writer.write(&first_batch)?;
762
763 // Write remaining batches
764 while let Some(batch) = stream.try_next().await? {
765 count += batch.num_rows();
766 writer.write(&batch)?;
767 }
768
769 writer.close()?;
770
771 log::info!(
772 "Exported {} rows from {} to '{}'",
773 count,
774 entity_description,
775 path
776 );
777 Ok(count)
778 }
779
780 /// Export vertices of a specific label to Parquet
781 async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
782 let stream = self
783 .storage
784 .scan_vertex_table_stream(label)
785 .await?
786 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
787
788 Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
789 }
790
791 /// Export edges of a specific type to Parquet
792 async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
793 let schema = self.storage.schema_manager().schema();
794 if !schema.edge_types.contains_key(edge_type) {
795 return Err(anyhow!("Edge type '{}' not found", edge_type));
796 }
797
798 let filter = format!("type = '{}'", edge_type);
799 let stream = self
800 .storage
801 .scan_main_edge_table_stream(Some(&filter))
802 .await?
803 .ok_or_else(|| anyhow!("No edge data found"))?;
804
805 Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
806 }
807
808 pub(crate) async fn execute_copy_from(
809 &self,
810 label: &str,
811 path: &str,
812 format: &str,
813 options: &HashMap<String, Value>,
814 ) -> Result<usize> {
815 // Read data from file
816 let batches = match format {
817 "parquet" => self.read_parquet_file(path)?,
818 "csv" => self.read_csv_file(path, label, options)?,
819 _ => {
820 return Err(anyhow!(
821 "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
822 format
823 ));
824 }
825 };
826
827 // Get writer
828 let writer_arc = self
829 .writer
830 .as_ref()
831 .ok_or_else(|| anyhow!("No writer available"))?;
832
833 let db_schema = self.storage.schema_manager().schema();
834
835 // Check if this is a label (vertex) or edge type
836 let is_edge = db_schema.edge_type_id_by_name(label).is_some();
837
838 if is_edge {
839 // Import edges
840 let edge_type_id = db_schema
841 .edge_type_id_by_name(label)
842 .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
843
844 // Get src and dst column names from options
845 let src_col = options
846 .get("src_col")
847 .and_then(|v| v.as_str())
848 .unwrap_or("src");
849 let dst_col = options
850 .get("dst_col")
851 .and_then(|v| v.as_str())
852 .unwrap_or("dst");
853
854 // §5.7 of concurrent_writer.md: writer is hoisted above the row
855 // loop now that there is no per-row lock acquisition cost.
856 let writer: &uni_store::Writer = writer_arc.as_ref();
857 let mut total_rows = 0;
858 for batch in batches {
859 let num_rows = batch.num_rows();
860 // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
861 let eids = writer.allocate_eids(num_rows).await?;
862
863 for (row_idx, &eid) in eids.iter().enumerate().take(num_rows) {
864 let mut properties = HashMap::new();
865 let mut src_vid: Option<Vid> = None;
866 let mut dst_vid: Option<Vid> = None;
867
868 // Extract properties and VIDs from each column
869 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
870 let col_name = field.name();
871 let column = batch.column(col_idx);
872 let value = self.arrow_value_to_json(column, row_idx)?;
873
874 if col_name == src_col {
875 let raw = value.as_u64().unwrap_or_else(|| {
876 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
877 });
878 src_vid = Some(Vid::new(raw));
879 } else if col_name == dst_col {
880 let raw = value.as_u64().unwrap_or_else(|| {
881 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
882 });
883 dst_vid = Some(Vid::new(raw));
884 } else if !col_name.starts_with('_') && !value.is_null() {
885 properties.insert(col_name.clone(), value);
886 }
887 }
888
889 let src = src_vid
890 .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
891 let dst = dst_vid
892 .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
893
894 writer
895 .insert_edge(
896 src,
897 dst,
898 edge_type_id,
899 eid,
900 properties,
901 Some(label.to_string()),
902 None,
903 )
904 .await?;
905
906 total_rows += 1;
907 }
908 }
909
910 log::info!(
911 "Imported {} edge rows from '{}' into edge type '{}'",
912 total_rows,
913 path,
914 label
915 );
916
917 // Flush to persist edges
918 if total_rows > 0 {
919 writer.flush_to_l1(None).await?;
920 }
921
922 Ok(total_rows)
923 } else {
924 // Import vertices
925 // Validate the label exists in schema
926 db_schema
927 .label_id_by_name_case_insensitive(label)
928 .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
929
930 // §5.7 of concurrent_writer.md: writer is hoisted above the row
931 // loop now that there is no per-row lock acquisition cost.
932 let writer: &uni_store::Writer = writer_arc.as_ref();
933 let mut total_rows = 0;
934 for batch in batches {
935 let num_rows = batch.num_rows();
936 // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
937 let vids = writer.allocate_vids(num_rows).await?;
938
939 // Convert Arrow batch to rows
940 for (row_idx, &vid) in vids.iter().enumerate().take(num_rows) {
941 let mut properties = HashMap::new();
942
943 // Extract properties from each column
944 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
945 let col_name = field.name();
946
947 // Skip internal columns
948 if col_name.starts_with('_') {
949 continue;
950 }
951
952 let column = batch.column(col_idx);
953 let value = self.arrow_value_to_json(column, row_idx)?;
954
955 if !value.is_null() {
956 properties.insert(col_name.clone(), value);
957 }
958 }
959
960 let _ = writer
961 .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
962 .await?;
963
964 total_rows += 1;
965 }
966 }
967
968 log::info!(
969 "Imported {} rows from '{}' into label '{}'",
970 total_rows,
971 path,
972 label
973 );
974
975 // Flush to persist vertices
976 if total_rows > 0 {
977 writer.flush_to_l1(None).await?;
978 }
979
980 Ok(total_rows)
981 }
982 }
983
984 fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
985 use arrow_array::Array;
986 use arrow_schema::DataType as ArrowDataType;
987
988 if column.is_null(row_idx) {
989 return Ok(Value::Null);
990 }
991
992 match column.data_type() {
993 ArrowDataType::Utf8 => {
994 let array = column
995 .as_any()
996 .downcast_ref::<arrow_array::StringArray>()
997 .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
998 Ok(Value::String(array.value(row_idx).to_string()))
999 }
1000 ArrowDataType::Int32 => {
1001 let array = column
1002 .as_any()
1003 .downcast_ref::<arrow_array::Int32Array>()
1004 .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
1005 Ok(Value::Int(array.value(row_idx) as i64))
1006 }
1007 ArrowDataType::Int64 => {
1008 let array = column
1009 .as_any()
1010 .downcast_ref::<arrow_array::Int64Array>()
1011 .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
1012 Ok(Value::Int(array.value(row_idx)))
1013 }
1014 ArrowDataType::Float32 => {
1015 let array = column
1016 .as_any()
1017 .downcast_ref::<arrow_array::Float32Array>()
1018 .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
1019 Ok(Value::Float(array.value(row_idx) as f64))
1020 }
1021 ArrowDataType::Float64 => {
1022 let array = column
1023 .as_any()
1024 .downcast_ref::<arrow_array::Float64Array>()
1025 .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
1026 Ok(Value::Float(array.value(row_idx)))
1027 }
1028 ArrowDataType::Boolean => {
1029 let array = column
1030 .as_any()
1031 .downcast_ref::<arrow_array::BooleanArray>()
1032 .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
1033 Ok(Value::Bool(array.value(row_idx)))
1034 }
1035 ArrowDataType::UInt64 => {
1036 let array = column
1037 .as_any()
1038 .downcast_ref::<arrow_array::UInt64Array>()
1039 .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
1040 Ok(Value::Int(array.value(row_idx) as i64))
1041 }
1042 _ => {
1043 // For other types, try to convert to string
1044 let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
1045 if let Some(arr) = array {
1046 Ok(Value::String(arr.value(row_idx).to_string()))
1047 } else {
1048 Ok(Value::Null)
1049 }
1050 }
1051 }
1052 }
1053
1054 fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
1055 let file = std::fs::File::open(path)?;
1056 let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
1057 .build()?;
1058 reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1059 }
1060
1061 fn read_csv_file(
1062 &self,
1063 path: &str,
1064 label: &str,
1065 options: &HashMap<String, Value>,
1066 ) -> Result<Vec<arrow_array::RecordBatch>> {
1067 use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1068 use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1069 use std::sync::Arc;
1070
1071 // Parse CSV options
1072 let has_headers = options
1073 .get("headers")
1074 .and_then(|v| v.as_bool())
1075 .unwrap_or(true);
1076
1077 // Read CSV file
1078 let file = std::fs::File::open(path)?;
1079 let mut rdr = csv::ReaderBuilder::new()
1080 .has_headers(has_headers)
1081 .from_reader(file);
1082
1083 // Get schema for type conversion
1084 let db_schema = self.storage.schema_manager().schema();
1085 let properties = db_schema.properties.get(label);
1086
1087 // Collect all rows first to determine schema
1088 let mut rows: Vec<Vec<String>> = Vec::new();
1089 let headers: Vec<String> = if has_headers {
1090 rdr.headers()?.iter().map(|s| s.to_string()).collect()
1091 } else {
1092 Vec::new()
1093 };
1094
1095 for result in rdr.records() {
1096 let record = result?;
1097 rows.push(record.iter().map(|s| s.to_string()).collect());
1098 }
1099
1100 if rows.is_empty() {
1101 return Ok(Vec::new());
1102 }
1103
1104 // Build Arrow schema with proper types based on DB schema
1105 let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
1106 let col_names: Vec<String> = if has_headers {
1107 headers
1108 } else {
1109 (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
1110 };
1111
1112 for name in &col_names {
1113 let arrow_type = if let Some(props) = properties {
1114 if let Some(prop_meta) = props.get(name) {
1115 match prop_meta.r#type {
1116 DataType::Int32 => ArrowDataType::Int32,
1117 DataType::Int64 => ArrowDataType::Int64,
1118 DataType::Float32 => ArrowDataType::Float32,
1119 DataType::Float64 => ArrowDataType::Float64,
1120 DataType::Bool => ArrowDataType::Boolean,
1121 _ => ArrowDataType::Utf8,
1122 }
1123 } else {
1124 ArrowDataType::Utf8
1125 }
1126 } else {
1127 ArrowDataType::Utf8
1128 };
1129 arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
1130 }
1131
1132 let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
1133
1134 // Convert rows to Arrow arrays with proper types
1135 let mut columns: Vec<ArrayRef> = Vec::new();
1136 for (col_idx, field) in arrow_fields.iter().enumerate() {
1137 match field.data_type() {
1138 ArrowDataType::Int32 => {
1139 let values: Vec<Option<i32>> = rows
1140 .iter()
1141 .map(|row| {
1142 if col_idx < row.len() {
1143 row[col_idx].parse().ok()
1144 } else {
1145 None
1146 }
1147 })
1148 .collect();
1149 columns.push(Arc::new(Int32Array::from(values)));
1150 }
1151 _ => {
1152 // Default to string
1153 let values: Vec<Option<String>> = rows
1154 .iter()
1155 .map(|row| {
1156 if col_idx < row.len() {
1157 Some(row[col_idx].clone())
1158 } else {
1159 None
1160 }
1161 })
1162 .collect();
1163 columns.push(Arc::new(StringArray::from(values)));
1164 }
1165 }
1166 }
1167
1168 let batch = RecordBatch::try_new(arrow_schema, columns)?;
1169 Ok(vec![batch])
1170 }
1171
1172 fn parse_data_type(type_str: &str) -> Result<DataType> {
1173 use uni_common::core::schema::{CrdtType, PointType};
1174 let type_str = type_str.to_lowercase();
1175 let type_str = type_str.trim();
1176 match type_str {
1177 "string" | "text" | "varchar" => Ok(DataType::String),
1178 "int" | "integer" | "int32" => Ok(DataType::Int32),
1179 "long" | "int64" | "bigint" => Ok(DataType::Int64),
1180 "float" | "float32" | "real" => Ok(DataType::Float32),
1181 "double" | "float64" => Ok(DataType::Float64),
1182 "bool" | "boolean" => Ok(DataType::Bool),
1183 "timestamp" => Ok(DataType::Timestamp),
1184 "date" => Ok(DataType::Date),
1185 "time" => Ok(DataType::Time),
1186 "datetime" => Ok(DataType::DateTime),
1187 "duration" => Ok(DataType::Duration),
1188 "btic" => Ok(DataType::Btic),
1189 "json" | "jsonb" => Ok(DataType::CypherValue),
1190 "bytes" | "blob" | "binary" => Ok(DataType::Bytes),
1191 "point" => Ok(DataType::Point(PointType::Cartesian2D)),
1192 "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
1193 "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1194 s if s.starts_with("vector(") && s.ends_with(')') => {
1195 let dims_str = &s[7..s.len() - 1];
1196 let dimensions = dims_str
1197 .parse::<usize>()
1198 .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1199 Ok(DataType::Vector { dimensions })
1200 }
1201 s if s.starts_with("list<") && s.ends_with('>') => {
1202 let inner_type_str = &s[5..s.len() - 1];
1203 let inner_type = Self::parse_data_type(inner_type_str)?;
1204 Ok(DataType::List(Box::new(inner_type)))
1205 }
1206 s if s.starts_with("map<") && s.ends_with('>') => {
1207 let (k_str, v_str) = Self::split_map_kv(&s[4..s.len() - 1])?;
1208 let key_type = Self::parse_data_type(&k_str)?;
1209 if !matches!(key_type, DataType::String) {
1210 return Err(anyhow!("MAP key type must be STRING, got: {k_str}"));
1211 }
1212 let value_type = Self::parse_data_type(&v_str)?;
1213 Ok(DataType::Map(Box::new(key_type), Box::new(value_type)))
1214 }
1215 "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1216 "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1217 _ => Err(anyhow!("Unknown data type: {}", type_str)),
1218 }
1219 }
1220
1221 /// Split a `MAP<K, V>` inner string on the top-level comma, respecting `<>`/`()` depth
1222 /// so nested value types (`STRING, LIST<INT>`, `STRING, MAP<STRING,INT>`) split at the
1223 /// right comma. Returns trimmed `(key, value)` type strings.
1224 fn split_map_kv(inner: &str) -> Result<(String, String)> {
1225 let mut depth = 0i32;
1226 for (i, c) in inner.char_indices() {
1227 match c {
1228 '<' | '(' => depth += 1,
1229 '>' | ')' => depth -= 1,
1230 ',' if depth == 0 => {
1231 let k = inner[..i].trim();
1232 let v = inner[i + 1..].trim();
1233 if k.is_empty() || v.is_empty() {
1234 return Err(anyhow!("MAP<K,V> requires both a key and a value type"));
1235 }
1236 return Ok((k.to_string(), v.to_string()));
1237 }
1238 _ => {}
1239 }
1240 }
1241 Err(anyhow!(
1242 "MAP<K,V> requires a comma separating key and value types"
1243 ))
1244 }
1245
1246 pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1247 let sm = self.storage.schema_manager_arc();
1248 if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1249 return Ok(());
1250 }
1251 sm.add_label_with_desc(&clause.name, clause.description)?;
1252 for prop in clause.properties {
1253 let dt = Self::parse_data_type(&prop.data_type)?;
1254 sm.add_property_with_desc(
1255 &clause.name,
1256 &prop.name,
1257 dt,
1258 prop.nullable,
1259 prop.description,
1260 )?;
1261 if prop.unique {
1262 let constraint = Constraint {
1263 name: format!("{}_{}_unique", clause.name, prop.name),
1264 constraint_type: ConstraintType::Unique {
1265 properties: vec![prop.name],
1266 },
1267 target: ConstraintTarget::Label(clause.name.clone()),
1268 enabled: true,
1269 };
1270 sm.add_constraint(constraint)?;
1271 }
1272 }
1273 sm.save().await?;
1274 Ok(())
1275 }
1276
1277 /// True if `key` is a generated property on any of the given labels.
1278 /// Used by the partial-write flush path (Round 12 §C) to decide
1279 /// whether the property should be added to `touched_keys` so that
1280 /// Lance MergeInsert sends the recomputed value.
1281 fn is_generated_key(&self, labels: &[String], key: &str) -> bool {
1282 let schema = self.storage.schema_manager().schema();
1283 for label in labels {
1284 if let Some(props_meta) = schema.properties.get(label)
1285 && let Some(meta) = props_meta.get(key)
1286 && meta.generation_expression.is_some()
1287 {
1288 return true;
1289 }
1290 }
1291 false
1292 }
1293
1294 pub(crate) async fn enrich_properties_with_generated_columns(
1295 &self,
1296 label_name: &str,
1297 properties: &mut HashMap<String, Value>,
1298 prop_manager: &PropertyManager,
1299 params: &HashMap<String, Value>,
1300 ctx: Option<&QueryContext>,
1301 ) -> Result<()> {
1302 let schema = self.storage.schema_manager().schema();
1303
1304 if let Some(props_meta) = schema.properties.get(label_name) {
1305 let mut generators = Vec::new();
1306 for (prop_name, meta) in props_meta {
1307 if let Some(expr_str) = &meta.generation_expression {
1308 generators.push((prop_name.clone(), expr_str.clone()));
1309 }
1310 }
1311
1312 for (prop_name, expr_str) in generators {
1313 let cache_key = (label_name.to_string(), prop_name.clone());
1314 let expr = {
1315 let cache = self.gen_expr_cache.read().await;
1316 cache.get(&cache_key).cloned()
1317 };
1318
1319 let expr = match expr {
1320 Some(e) => e,
1321 None => {
1322 let parsed = uni_cypher::parse_expression(&expr_str)
1323 .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1324 let mut cache = self.gen_expr_cache.write().await;
1325 cache.insert(cache_key, parsed.clone());
1326 parsed
1327 }
1328 };
1329
1330 let mut scope = HashMap::new();
1331
1332 // If expression has an explicit variable, use it as an object
1333 if let Some(var) = expr.extract_variable() {
1334 scope.insert(var, Value::Map(properties.clone()));
1335 } else {
1336 // No explicit variable - add properties directly to scope for bare references
1337 // e.g., "lower(email)" can reference "email" directly
1338 for (k, v) in properties.iter() {
1339 scope.insert(k.clone(), v.clone());
1340 }
1341 }
1342
1343 let val = self
1344 .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1345 .await?;
1346 properties.insert(prop_name, val);
1347 }
1348 }
1349 Ok(())
1350 }
1351
1352 pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1353 let sm = self.storage.schema_manager_arc();
1354 if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1355 return Ok(());
1356 }
1357 sm.add_edge_type_with_desc(
1358 &clause.name,
1359 clause.src_labels,
1360 clause.dst_labels,
1361 clause.description,
1362 )?;
1363 for prop in clause.properties {
1364 let dt = Self::parse_data_type(&prop.data_type)?;
1365 sm.add_property_with_desc(
1366 &clause.name,
1367 &prop.name,
1368 dt,
1369 prop.nullable,
1370 prop.description,
1371 )?;
1372 }
1373 sm.save().await?;
1374 Ok(())
1375 }
1376
1377 /// Executes an ALTER action on a schema entity.
1378 ///
1379 /// This is a shared helper for both `execute_alter_label` and
1380 /// `execute_alter_edge_type` since they have identical logic.
1381 pub(crate) async fn execute_alter_entity(
1382 sm: &Arc<SchemaManager>,
1383 entity_name: &str,
1384 action: AlterAction,
1385 ) -> Result<()> {
1386 match action {
1387 AlterAction::AddProperty(prop) => {
1388 let dt = Self::parse_data_type(&prop.data_type)?;
1389 sm.add_property_with_desc(
1390 entity_name,
1391 &prop.name,
1392 dt,
1393 prop.nullable,
1394 prop.description,
1395 )?;
1396 }
1397 AlterAction::DropProperty(prop_name) => {
1398 sm.drop_property(entity_name, &prop_name)?;
1399 }
1400 AlterAction::RenameProperty { old_name, new_name } => {
1401 sm.rename_property(entity_name, &old_name, &new_name)?;
1402 }
1403 AlterAction::SetDescription(desc) => {
1404 if sm.schema().labels.contains_key(entity_name) {
1405 sm.set_label_description(entity_name, desc)?;
1406 } else {
1407 sm.set_edge_type_description(entity_name, desc)?;
1408 }
1409 }
1410 AlterAction::SetPropertyDescription {
1411 property,
1412 description,
1413 } => {
1414 sm.set_property_description(entity_name, &property, description)?;
1415 }
1416 }
1417 sm.save().await?;
1418 Ok(())
1419 }
1420
1421 pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1422 Self::execute_alter_entity(
1423 &self.storage.schema_manager_arc(),
1424 &clause.name,
1425 clause.action,
1426 )
1427 .await
1428 }
1429
1430 pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1431 Self::execute_alter_entity(
1432 &self.storage.schema_manager_arc(),
1433 &clause.name,
1434 clause.action,
1435 )
1436 .await
1437 }
1438
1439 pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1440 let sm = self.storage.schema_manager_arc();
1441 sm.drop_label(&clause.name, clause.if_exists)?;
1442 sm.save().await?;
1443 Ok(())
1444 }
1445
1446 pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1447 let sm = self.storage.schema_manager_arc();
1448 sm.drop_edge_type(&clause.name, clause.if_exists)?;
1449 sm.save().await?;
1450 Ok(())
1451 }
1452
1453 pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1454 let sm = self.storage.schema_manager_arc();
1455 let target = ConstraintTarget::Label(clause.label);
1456 let c_type = match clause.constraint_type {
1457 AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1458 properties: clause.properties,
1459 },
1460 AstConstraintType::Exists => {
1461 let property = clause
1462 .properties
1463 .into_iter()
1464 .next()
1465 .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1466 ConstraintType::Exists { property }
1467 }
1468 AstConstraintType::Check => {
1469 let expression = clause
1470 .expression
1471 .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1472 ConstraintType::Check {
1473 expression: expression.to_string_repr(),
1474 }
1475 }
1476 };
1477
1478 let constraint = Constraint {
1479 name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1480 constraint_type: c_type,
1481 target,
1482 enabled: true,
1483 };
1484
1485 sm.add_constraint(constraint)?;
1486 sm.save().await?;
1487 Ok(())
1488 }
1489
1490 pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1491 let sm = self.storage.schema_manager_arc();
1492 sm.drop_constraint(&clause.name, false)?;
1493 sm.save().await?;
1494 Ok(())
1495 }
1496
1497 /// Detects the single-node, single-label MERGE shape the fast path serves.
1498 ///
1499 /// Returns the node pattern and its label when `pattern` is one path with
1500 /// one node element, exactly one label, and a static map-literal property
1501 /// set — the shape [`Self::execute_merge_row_indexed`] can serve without
1502 /// per-row query planning. The keys do NOT need to be indexed: the persisted
1503 /// lookup degrades to a (single, filtered) label scan when no scalar index
1504 /// exists, which is still far cheaper than building a `LogicalPlan` per row.
1505 /// Any other shape (edges, multiple labels, non-literal properties) returns
1506 /// `None` so the caller uses the general per-row path.
1507 fn merge_single_node_fastpath<'p>(
1508 &self,
1509 pattern: &'p Pattern,
1510 ) -> Option<(&'p NodePattern, String)> {
1511 if pattern.paths.len() != 1 {
1512 return None;
1513 }
1514 let path = &pattern.paths[0];
1515 if path.elements.len() != 1 {
1516 return None;
1517 }
1518 let PatternElement::Node(n) = &path.elements[0] else {
1519 return None;
1520 };
1521 let labels = n.labels.names();
1522 if labels.len() != 1 {
1523 return None;
1524 }
1525 // The key must be a static map literal so the key names are known.
1526 let Some(Expr::Map(entries)) = n.properties.as_ref() else {
1527 return None;
1528 };
1529 if entries.is_empty() {
1530 return None;
1531 }
1532 // Resolve the label to its schema-canonical case so the fast path agrees
1533 // with the general MERGE path (which matches labels case-insensitively).
1534 // Without this, `MERGE (:person …)` after a `:Person` row was flushed
1535 // scans/keys a different label than the canonical one and creates a
1536 // duplicate (review #3a). Falls back to the as-written label when the
1537 // schema does not know it (schemaless).
1538 let canonical = self
1539 .storage
1540 .schema_manager()
1541 .schema()
1542 .canonical_label_name(&labels[0])
1543 .unwrap_or_else(|| labels[0].clone());
1544 Some((n, canonical))
1545 }
1546
1547 /// RC3: detect the bound-endpoints, anonymous-edge relationship MERGE shape
1548 /// `(a)-[:TYPE]->(b)` whose edge existence can be resolved with one O(1)
1549 /// adjacency probe instead of building and running a per-row traversal
1550 /// `LogicalPlan` (the general path is ~19x the bulk CREATE of the same edges).
1551 ///
1552 /// Returns `(source_var, target_var, edge_type_id, direction)` when the
1553 /// pattern is exactly one path of `[Node, Rel, Node]` where the relationship
1554 /// is a single concrete type, **anonymous** (no variable → no edge binding to
1555 /// reproduce), fixed-length, and unfiltered, and both endpoint nodes are plain
1556 /// variables with no re-specified MERGE properties or inline WHERE (those are
1557 /// filters only the general path applies). Any deviation returns `None` and
1558 /// the caller keeps the general path. The caller still verifies per row that
1559 /// both endpoints are actually bound to vids.
1560 fn merge_relationship_fastpath_shape(
1561 &self,
1562 pattern: &Pattern,
1563 ) -> Option<(
1564 String,
1565 String,
1566 u32,
1567 uni_store::storage::direction::Direction,
1568 )> {
1569 if pattern.paths.len() != 1 {
1570 return None;
1571 }
1572 let [
1573 PatternElement::Node(a),
1574 PatternElement::Relationship(r),
1575 PatternElement::Node(b),
1576 ] = pattern.paths[0].elements.as_slice()
1577 else {
1578 return None;
1579 };
1580 // Endpoints: plain variables, no extra MERGE-pattern properties / inline
1581 // WHERE (a re-specified property is a filter the general path must apply).
1582 let src_var = a.variable.as_ref()?;
1583 let dst_var = b.variable.as_ref()?;
1584 if a.properties.is_some()
1585 || a.where_clause.is_some()
1586 || b.properties.is_some()
1587 || b.where_clause.is_some()
1588 {
1589 return None;
1590 }
1591 // Relationship: single concrete type, anonymous, fixed-length, unfiltered.
1592 if r.variable.is_some()
1593 || r.range.is_some()
1594 || r.properties.is_some()
1595 || r.where_clause.is_some()
1596 || r.types.names().len() != 1
1597 {
1598 return None;
1599 }
1600 let type_name = &r.types.names()[0];
1601 let type_id = if self.config.strict_schema {
1602 self.storage
1603 .schema_manager()
1604 .schema()
1605 .edge_type_id_by_name_case_insensitive(type_name)?
1606 } else {
1607 self.storage
1608 .schema_manager()
1609 .get_or_assign_edge_type_id(type_name)
1610 };
1611 let dir = match r.direction {
1612 uni_cypher::ast::Direction::Outgoing => {
1613 uni_store::storage::direction::Direction::Outgoing
1614 }
1615 uni_cypher::ast::Direction::Incoming => {
1616 uni_store::storage::direction::Direction::Incoming
1617 }
1618 // Undirected existence is ambiguous to encode as one probe; the
1619 // general path handles it.
1620 uni_cypher::ast::Direction::Both => return None,
1621 };
1622 Some((src_var.clone(), dst_var.clone(), type_id, dir))
1623 }
1624
1625 /// Build the persisted-scan filter for a MERGE key, or `None` if any value
1626 /// is not a scalar this fast path can represent.
1627 ///
1628 /// Returning `None` makes the caller fall back to the general per-row path,
1629 /// so unusual key value types (lists, maps, temporals, nulls) are never
1630 /// silently mis-matched. The `_deleted = false` clause mirrors the
1631 /// persisted-read predicate used elsewhere; the version high-water-mark
1632 /// clause is added by [`uni_store::StorageManager::scan_vertex_table`].
1633 fn merge_key_filter(key_props: &HashMap<String, Value>) -> Option<String> {
1634 if key_props.is_empty() {
1635 return None;
1636 }
1637 let mut parts = Vec::with_capacity(key_props.len() + 1);
1638 for (k, v) in key_props {
1639 if !Self::is_safe_key_ident(k) {
1640 return None;
1641 }
1642 let lit = Self::render_key_literal(v)?;
1643 // Unquoted identifier: the Lance filter parser does not resolve a
1644 // double-quoted column name against the table here, so `"k" = v`
1645 // silently matches nothing. Keys are validated above to be safe
1646 // bare identifiers.
1647 parts.push(format!("{k} = {lit}"));
1648 }
1649 parts.push("_deleted = false".to_string());
1650 Some(parts.join(" AND "))
1651 }
1652
/// True when a MERGE key name is a safe bare identifier for a Lance filter
/// (issue #8): non-empty, starting with an ASCII letter or `_`, and otherwise
/// made only of ASCII letters, digits and `_`.
///
/// Keys are spliced into the filter unquoted, so a name that starts with a
/// digit must be rejected: `123 = 5` would compare two literals instead of a
/// column, and `1abc = 5` is not a valid expression at all. Keys come from a
/// static map literal, but are validated anyway.
fn is_safe_key_ident(k: &str) -> bool {
    let mut chars = k.chars();
    match chars.next() {
        Some(first) if first.is_ascii_alphabetic() || first == '_' => {
            chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
        }
        _ => false,
    }
}
1659
1660 /// Render a scalar MERGE-key value as a Lance filter literal, or `None`
1661 /// for value types this fast path cannot represent (lists, maps,
1662 /// temporals, nulls) — the caller then falls back to the general path.
1663 fn render_key_literal(v: &Value) -> Option<String> {
1664 Some(match v {
1665 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1666 Value::Int(i) => i.to_string(),
1667 Value::Float(f) => f.to_string(),
1668 Value::Bool(b) => b.to_string(),
1669 _ => return None,
1670 })
1671 }
1672
1673 /// Build ONE scan filter matching every key tuple in `keys` (all tuples
1674 /// sorted by `key_names` order, values canonicalized).
1675 ///
1676 /// Single-column keys render as type-grouped `k IN (…)` lists (a filter
1677 /// never compares mixed literal types against one column); composite keys
1678 /// render as an OR of per-tuple conjunctions. Both forms are wrapped with
1679 /// the same `_deleted = false` clause the per-row filter used.
1680 fn merge_batch_filter(key_names: &[String], keys: &[&MergeKey]) -> Option<String> {
1681 if keys.is_empty() || key_names.iter().any(|k| !Self::is_safe_key_ident(k)) {
1682 return None;
1683 }
1684 let disjunction = if let [key] = key_names {
1685 // Group literals by value variant so each IN list is homogeneous.
1686 let mut groups: HashMap<std::mem::Discriminant<Value>, Vec<String>> = HashMap::new();
1687 for tuple in keys {
1688 let (_, v) = tuple.first()?;
1689 groups
1690 .entry(std::mem::discriminant(v))
1691 .or_default()
1692 .push(Self::render_key_literal(v)?);
1693 }
1694 groups
1695 .into_values()
1696 .map(|lits| {
1697 if let [lit] = lits.as_slice() {
1698 format!("{key} = {lit}")
1699 } else {
1700 format!("{key} IN ({})", lits.join(", "))
1701 }
1702 })
1703 .collect::<Vec<_>>()
1704 .join(" OR ")
1705 } else {
1706 keys.iter()
1707 .map(|tuple| {
1708 let conj = tuple
1709 .iter()
1710 .map(|(k, v)| Some(format!("{k} = {}", Self::render_key_literal(v)?)))
1711 .collect::<Option<Vec<_>>>()?
1712 .join(" AND ");
1713 Some(format!("({conj})"))
1714 })
1715 .collect::<Option<Vec<_>>>()?
1716 .join(" OR ")
1717 };
1718 Some(format!("({disjunction}) AND _deleted = false"))
1719 }
1720
1721 /// Canonicalize a numeric MERGE-key value for *matching only*.
1722 ///
1723 /// A finite `Float` with an integral value (e.g. `1.0`) is mapped to the
1724 /// equivalent `Int`, so an `Int(1)` key matches a node stored with
1725 /// `Float(1.0)` and vice versa — the coercion the general (DataFusion) MERGE
1726 /// path already applies (review #3a). Non-numeric and non-integral values are
1727 /// returned unchanged. Used only to build match keys / comparisons, never the
1728 /// value written to a created node.
1729 fn canonical_key_value(v: &Value) -> Value {
1730 match v {
1731 Value::Float(f)
1732 if f.is_finite()
1733 && f.fract() == 0.0
1734 && *f >= i64::MIN as f64
1735 && *f <= i64::MAX as f64 =>
1736 {
1737 Value::Int(*f as i64)
1738 }
1739 other => other.clone(),
1740 }
1741 }
1742
1743 /// Canonical sorted `(name, value)` key tuple for a MERGE row's key map.
1744 ///
1745 /// Numeric values are canonicalized ([`Self::canonical_key_value`]) so the
1746 /// tuple compares equal regardless of `Int`/`Float` spelling. This tuple is
1747 /// used purely as a match key (intra-batch dedup, L0 overlay lookup); the
1748 /// created node's properties come from the original, un-canonicalized map.
1749 fn merge_key_tuple(key_props: &HashMap<String, Value>) -> MergeKey {
1750 let mut tuple: MergeKey = key_props
1751 .iter()
1752 .map(|(k, v)| (k.clone(), Self::canonical_key_value(v)))
1753 .collect();
1754 tuple.sort_by(|a, b| a.0.cmp(&b.0));
1755 tuple
1756 }
1757
1758 /// Snapshot all live L0 vertices of `label`, grouped by their MERGE key.
1759 ///
1760 /// Walked once per MERGE statement (issue #69): the per-row fast path then
1761 /// resolves L0/uncommitted matches with an O(1) map lookup instead of
1762 /// re-enumerating L0 for every row. Captures committed-not-yet-persisted
1763 /// rows and rows created earlier in the same transaction; rows created by
1764 /// later rows of this same statement are folded in incrementally by
1765 /// [`Self::execute_merge_row_indexed`]. `key_names` must be sorted to match
1766 /// [`Self::merge_key_tuple`].
1767 fn merge_l0_existing(
1768 &self,
1769 label: &str,
1770 key_names: &[String],
1771 ctx: Option<&QueryContext>,
1772 ) -> HashMap<MergeKey, Vec<Vid>> {
1773 let mut candidates: Vec<Vid> = Vec::new();
1774 l0_visibility::visit_l0_buffers(ctx, |l0| {
1775 if let Some(vids) = l0.label_to_vids.get(label) {
1776 candidates.extend(vids.iter().copied());
1777 }
1778 false
1779 });
1780
1781 let mut map: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1782 let mut seen: HashSet<Vid> = HashSet::new();
1783 for vid in candidates {
1784 if !seen.insert(vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
1785 continue;
1786 }
1787 // `lookup_vertex_prop` merges across L0 layers (newest wins).
1788 let tuple: MergeKey = key_names
1789 .iter()
1790 .map(|k| {
1791 let v = l0_visibility::lookup_vertex_prop(vid, k, ctx).unwrap_or(Value::Null);
1792 (k.clone(), Self::canonical_key_value(&v))
1793 })
1794 .collect();
1795 map.entry(tuple).or_default().push(vid);
1796 }
1797 map
1798 }
1799
/// Upper bound on the number of key tuples sent in one batched MERGE scan.
///
/// Caps the filter-string size and the Lance/DataFusion parse cost; larger
/// key sets are split into chunks that run sequentially.
const MERGE_SCAN_CHUNK: usize = 1_000;
1803
1804 /// Persisted (flushed) vertices of `label` for EVERY key tuple in `keys`,
1805 /// resolved with one scan per [`Self::MERGE_SCAN_CHUNK`] tuples instead of
1806 /// one scan per input row (review perf #4: `UNWIND … MERGE` issued N
1807 /// independent Lance scans).
1808 ///
1809 /// Scans via [`uni_store::StorageManager::scan_vertex_table`] — the same
1810 /// read path `MATCH` uses, so it honors the version high-water-mark and
1811 /// sees flushed rows. On the declared-label branch the key-filtered scan
1812 /// only NOMINATES candidate vids; a second, unfiltered `_vid IN (…)` pass
1813 /// picks each candidate's max-`_version` row and requires it to be live
1814 /// and still keyed as requested (per-label tables are MVCC-append, so a
1815 /// superseded version's row would otherwise stale-match a rewritten key).
1816 /// Matched rows are grouped by their CANONICAL key tuple (stored values
1817 /// run through [`Self::canonical_key_value`], so a stored `Float(1.0)`
1818 /// lands under a requested `Int(1)` — the coercion Lance's numeric filter
1819 /// equality applies). Liveness against L0 overlays (deletes, key rewrites
1820 /// by earlier rows of the same statement) is NOT checked here — the
1821 /// per-row consumer re-checks at row time, exactly as the old per-row
1822 /// scan did.
1823 ///
1824 /// The second returned map carries the FULL property maps the schemaless
1825 /// branch already decoded for each matched vid (empty on the declared-label
1826 /// branch, which projects only key columns) — the caller seeds the
1827 /// statement-level [`Prefetch`] from it at zero extra scans.
1828 ///
1829 /// # Errors
1830 /// Propagates persisted-scan and filter-build failures — fail-closed: a
1831 /// MERGE must never treat a failed lookup as "no match" and create
1832 /// duplicates.
1833 async fn merge_lookup_persisted_batch(
1834 &self,
1835 label: &str,
1836 key_names: &[String],
1837 keys: &HashSet<MergeKey>,
1838 ) -> Result<(
1839 HashMap<MergeKey, Vec<Vid>>,
1840 HashMap<Vid, uni_common::Properties>,
1841 )> {
1842 let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1843 if keys.is_empty() {
1844 return Ok((out, HashMap::new()));
1845 }
1846 // An undeclared (schemaless) label has no per-label table — its flushed
1847 // rows live only in the unified main vertex table. Route to the
1848 // main-table lookup, mirroring the planner's scan routing (a schemaless
1849 // MATCH plans `ScanMainByLabels` on the same schema predicate).
1850 if self
1851 .storage
1852 .schema_manager()
1853 .schema()
1854 .get_label_case_insensitive(label)
1855 .is_none()
1856 {
1857 return self
1858 .merge_lookup_persisted_batch_schemaless(label, key_names, keys)
1859 .await;
1860 }
1861 // Declared label — the per-label table is MVCC-append (an update
1862 // flush adds a higher-`_version` row for the same vid) and the key
1863 // predicate is pushed into the Lance filter, so a SUPERSEDED version
1864 // whose row still carries a requested key is returned while the vid's
1865 // current row (key rewritten, fails the filter) is invisible to the
1866 // scan. Version dedup among the returned rows cannot detect that, so
1867 // the lookup runs in two passes: the key-filtered scan only nominates
1868 // candidate vids, and an unfiltered `_vid IN (…)` scan then requires
1869 // each candidate's max-`_version` row to be live and still keyed as
1870 // requested.
1871 let mut columns: Vec<&str> = vec!["_vid"];
1872 columns.extend(key_names.iter().map(String::as_str));
1873
1874 let key_list: Vec<&MergeKey> = keys.iter().collect();
1875 let mut candidates: Vec<Vid> = Vec::new();
1876 let mut seen: HashSet<Vid> = HashSet::new();
1877 for chunk in key_list.chunks(Self::MERGE_SCAN_CHUNK) {
1878 let filter = Self::merge_batch_filter(key_names, chunk)
1879 .ok_or_else(|| anyhow!("MERGE fast path could not build a batched key filter"))?;
1880 let scanned = self
1881 .storage
1882 .scan_vertex_table(label, &columns, Some(&filter))
1883 .await?;
1884 let Some(batch) = scanned else { continue };
1885 let Some(vid_col) = batch
1886 .column_by_name("_vid")
1887 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
1888 else {
1889 continue;
1890 };
1891 for i in 0..vid_col.len() {
1892 let vid = Vid::from(vid_col.value(i));
1893 if seen.insert(vid) {
1894 candidates.push(vid);
1895 }
1896 }
1897 }
1898
1899 // Verification pass — tombstones are NOT filtered Lance-side (the
1900 // max-version pick must see them so a deleted winner cannot let an
1901 // older live version resurrect the match), exactly like the
1902 // schemaless branch below.
1903 let mut verify_columns: Vec<&str> = vec!["_vid", "_deleted", "_version"];
1904 verify_columns.extend(key_names.iter().map(String::as_str));
1905 for chunk in candidates.chunks(Self::MERGE_SCAN_CHUNK) {
1906 let vid_list = chunk
1907 .iter()
1908 .map(|v| v.as_u64().to_string())
1909 .collect::<Vec<_>>()
1910 .join(", ");
1911 let filter = format!("_vid IN ({vid_list})");
1912 let scanned = self
1913 .storage
1914 .scan_vertex_table(label, &verify_columns, Some(&filter))
1915 .await?;
1916 let Some(batch) = scanned else { continue };
1917 let (Some(vid_col), Some(del_col), Some(ver_col)) = (
1918 batch
1919 .column_by_name("_vid")
1920 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1921 batch
1922 .column_by_name("_deleted")
1923 .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
1924 batch
1925 .column_by_name("_version")
1926 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1927 ) else {
1928 return Err(anyhow!(
1929 "MERGE batched lookup: verification scan missing a required column"
1930 ));
1931 };
1932 let key_cols: Vec<_> = key_names
1933 .iter()
1934 .map(|k| batch.column_by_name(k))
1935 .collect::<Option<Vec<_>>>()
1936 .ok_or_else(|| {
1937 anyhow!("MERGE batched lookup: projected key column missing from scan result")
1938 })?;
1939 // Per-vid MVCC dedup: keep the highest-version row for each vid.
1940 let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
1941 for i in 0..batch.num_rows() {
1942 let vid = Vid::from(vid_col.value(i));
1943 let ver = ver_col.value(i);
1944 let entry = winners.entry(vid).or_insert((ver, i));
1945 if ver > entry.0 {
1946 *entry = (ver, i);
1947 }
1948 }
1949 for (vid, (_ver, row)) in winners {
1950 if del_col.value(row) {
1951 continue;
1952 }
1953 let tuple: MergeKey = key_names
1954 .iter()
1955 .zip(&key_cols)
1956 .map(|(k, col)| {
1957 let v = uni_store::storage::arrow_convert::arrow_to_value(
1958 col.as_ref(),
1959 row,
1960 None,
1961 );
1962 (k.clone(), Self::canonical_key_value(&v))
1963 })
1964 .collect();
1965 if keys.contains(&tuple) {
1966 out.entry(tuple).or_default().push(vid);
1967 }
1968 }
1969 }
1970 Ok((out, HashMap::new()))
1971 }
1972
1973 /// Persisted-match lookup for an UNDECLARED (schemaless) label.
1974 ///
1975 /// Schemaless rows live only in the unified main vertex table (per-label
1976 /// tables exist only for declared labels), with all properties encoded in
1977 /// the `props_json` CypherValue blob — so key values cannot be pushed into
1978 /// the Lance filter; the key match happens in memory after decoding,
1979 /// exactly like the schemaless MATCH scan. One main-table scan regardless
1980 /// of key count.
1981 ///
1982 /// Mirrors `columnar_scan_schemaless_vertex_batch_static`: tombstones are
1983 /// NOT filtered Lance-side (MVCC dedup must see them to pick the winning
1984 /// version per vid); the per-vid max-`_version` dedup runs here, then
1985 /// deleted winners are dropped.
1986 ///
1987 /// Also returns the full decoded property map per matched vid — the blob
1988 /// is decoded here anyway, and the caller seeds the statement-level
1989 /// [`Prefetch`] from it instead of re-reading per row.
1990 ///
1991 /// # Errors
1992 /// Propagates scan and blob-decode failures — fail-closed: a MERGE must
1993 /// never treat a failed lookup as "no match" and create duplicates.
1994 async fn merge_lookup_persisted_batch_schemaless(
1995 &self,
1996 label: &str,
1997 key_names: &[String],
1998 keys: &HashSet<MergeKey>,
1999 ) -> Result<(
2000 HashMap<MergeKey, Vec<Vid>>,
2001 HashMap<Vid, uni_common::Properties>,
2002 )> {
2003 let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2004 let mut props_by_vid: HashMap<Vid, uni_common::Properties> = HashMap::new();
2005 let filter = format!("array_contains(labels, '{}')", label.replace('\'', "''"));
2006 let Some(batch) = self
2007 .storage
2008 .scan_main_vertex_table(
2009 &["_vid", "_deleted", "props_json", "_version"],
2010 Some(&filter),
2011 )
2012 .await?
2013 else {
2014 return Ok((out, props_by_vid));
2015 };
2016 let (Some(vid_col), Some(del_col), Some(ver_col)) = (
2017 batch
2018 .column_by_name("_vid")
2019 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
2020 batch
2021 .column_by_name("_deleted")
2022 .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
2023 batch
2024 .column_by_name("_version")
2025 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
2026 ) else {
2027 return Err(anyhow!(
2028 "schemaless MERGE lookup: main vertex table scan missing a required column"
2029 ));
2030 };
2031 let props_col = batch
2032 .column_by_name("props_json")
2033 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2034
2035 // Per-vid MVCC dedup: keep the highest-version row for each vid.
2036 let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
2037 for i in 0..batch.num_rows() {
2038 let vid = Vid::from(vid_col.value(i));
2039 let ver = ver_col.value(i);
2040 let entry = winners.entry(vid).or_insert((ver, i));
2041 if ver > entry.0 {
2042 *entry = (ver, i);
2043 }
2044 }
2045 for (vid, (_ver, row)) in winners {
2046 // Drop deletion tombstones AFTER picking the winner — a deleted
2047 // winner must not let an older live version resurrect the match.
2048 if del_col.value(row) {
2049 continue;
2050 }
2051 // A row without properties matches only an all-Null key tuple.
2052 let props = match props_col {
2053 Some(arr) if !arrow_array::Array::is_null(arr, row) => {
2054 match uni_common::cypher_value_codec::decode(arr.value(row))
2055 .map_err(|e| anyhow!("schemaless MERGE lookup: props decode: {e}"))?
2056 {
2057 Value::Map(m) => m,
2058 _ => HashMap::new(),
2059 }
2060 }
2061 _ => HashMap::new(),
2062 };
2063 let tuple: MergeKey = key_names
2064 .iter()
2065 .map(|k| {
2066 (
2067 k.clone(),
2068 Self::canonical_key_value(props.get(k).unwrap_or(&Value::Null)),
2069 )
2070 })
2071 .collect();
2072 if keys.contains(&tuple) {
2073 out.entry(tuple).or_default().push(vid);
2074 props_by_vid.insert(vid, props);
2075 }
2076 }
2077 Ok((out, props_by_vid))
2078 }
2079
2080 /// True if the statement-level MERGE property prefetch is safe for `label`.
2081 ///
2082 /// False when the label declares any CRDT-typed property: a prefetch HIT in
2083 /// [`read_vertex_props_with_prefetch`] skips the `normalize_crdt_properties`
2084 /// pass that `get_all_vertex_props_with_ctx` applies, so CRDT-bearing
2085 /// labels keep the per-row read path. Undeclared labels are trivially safe
2086 /// (normalization is a no-op without schema CRDT entries).
2087 fn merge_label_prefetch_safe(&self, label: &str) -> bool {
2088 let schema = self.storage.schema_manager().schema();
2089 schema.properties.get(label).is_none_or(|props| {
2090 !props
2091 .values()
2092 .any(|pm| matches!(pm.r#type, DataType::Crdt(_)))
2093 })
2094 }
2095
2096 /// True if an L0 override rewrote any key column of a persisted match away
2097 /// from its requested value (so the persisted row no longer matches).
2098 fn vid_overrides_break_key(
2099 vid: Vid,
2100 key_props: &HashMap<String, Value>,
2101 ctx: Option<&QueryContext>,
2102 ) -> bool {
2103 key_props.iter().any(|(k, want)| {
2104 matches!(
2105 l0_visibility::lookup_vertex_prop(vid, k, ctx),
2106 Some(got) if Self::canonical_key_value(&got) != Self::canonical_key_value(want)
2107 )
2108 })
2109 }
2110
2111 /// Build a node Map value (`{_vid, _labels, ...props}`) for binding a MERGE
2112 /// node variable.
2113 ///
2114 /// Matches the binding shape produced by `execute_create_pattern` and the
2115 /// general MATCH path, so ON MATCH SET, RETURN, and downstream operators
2116 /// resolve the variable identically — a bare `Value::Int(vid)` is not a
2117 /// valid node binding for those consumers.
2118 fn build_node_map(vid: Vid, label: &str, props: uni_common::Properties) -> Value {
2119 let mut obj = HashMap::new();
2120 obj.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
2121 obj.insert(
2122 "_labels".to_string(),
2123 Value::List(vec![Value::String(label.to_string())]),
2124 );
2125 for (k, v) in props {
2126 obj.insert(k, v);
2127 }
2128 Value::Map(obj)
2129 }
2130
2131 /// True if an L0-only vertex has every key column set to the requested
2132 /// value. A missing column matches only a requested `Null`.
2133 fn l0_vid_matches_key(
2134 vid: Vid,
2135 key_props: &HashMap<String, Value>,
2136 ctx: Option<&QueryContext>,
2137 ) -> bool {
2138 key_props.iter().all(
2139 |(k, want)| match l0_visibility::lookup_vertex_prop(vid, k, ctx) {
2140 Some(got) => Self::canonical_key_value(&got) == Self::canonical_key_value(want),
2141 None => *want == Value::Null,
2142 },
2143 )
2144 }
2145
2146 /// Index fast-path execution for one MERGE row of the shape detected by
2147 /// [`Self::merge_single_node_fastpath`].
2148 ///
2149 /// Resolves matches from the per-batch L0 snapshot `existing` (O(1) lookup,
2150 /// no per-row L0 enumeration) plus the per-statement persisted prefetch
2151 /// (`persisted`, built once by [`Self::merge_lookup_persisted_batch`]);
2152 /// applies ON MATCH SET to every match, or creates the node and applies
2153 /// ON CREATE SET when there is none. A newly created vertex is folded into
2154 /// `existing` so a later row of the same batch with the same key matches it
2155 /// (intra-batch dedup). Returns the RETURN rows for this input row (one per
2156 /// match, or one for a create).
2157 ///
2158 /// `prefetched` is the statement-level property prefetch (`None` when the
2159 /// label is CRDT-bearing, see [`Self::merge_label_prefetch_safe`]): matched
2160 /// vids carry their persisted base row, freshly created vids are seeded
2161 /// with an empty base — per-row reads then resolve as base + L0 layering
2162 /// (every SET flush writes the full row to L0 before the next read, so a
2163 /// prefetch hit equals a fresh read) instead of one storage scan each.
2164 ///
2165 /// # Errors
2166 /// Propagates evaluation, create, and SET failures.
2167 #[expect(
2168 clippy::too_many_arguments,
2169 reason = "mirrors execute_merge's threaded execution state"
2170 )]
2171 async fn execute_merge_row_indexed(
2172 &self,
2173 label: &str,
2174 node: &NodePattern,
2175 path_pattern: &Pattern,
2176 temp_vars: &[String],
2177 mut row: HashMap<String, Value>,
2178 key_props: &HashMap<String, Value>,
2179 persisted: &HashMap<MergeKey, Vec<Vid>>,
2180 key_tuple: &MergeKey,
2181 existing: &mut HashMap<MergeKey, Vec<Vid>>,
2182 on_match: Option<&SetClause>,
2183 on_create: Option<&SetClause>,
2184 prop_manager: &PropertyManager,
2185 params: &HashMap<String, Value>,
2186 ctx: Option<&QueryContext>,
2187 tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2188 writer: &Writer,
2189 mut prefetched: Option<&mut Prefetch>,
2190 ) -> Result<Vec<HashMap<String, Value>>> {
2191 let empty_prefetch = Prefetch::default();
2192 let mut seen: HashSet<Vid> = HashSet::new();
2193 let mut matches: Vec<Vid> = Vec::new();
2194 // Persisted (flushed) matches from the per-statement prefetch. The
2195 // prefetch is static for the statement, so re-verify liveness at row
2196 // time — an earlier row of this batch may have deleted the candidate
2197 // or rewritten its key (the old per-row scan saw those through its L0
2198 // overlay checks; these are the same checks, moved to row time).
2199 if let Some(vids) = persisted.get(key_tuple) {
2200 for &vid in vids {
2201 if l0_visibility::is_vertex_deleted(vid, ctx) {
2202 continue;
2203 }
2204 if Self::vid_overrides_break_key(vid, key_props, ctx) {
2205 continue;
2206 }
2207 if seen.insert(vid) {
2208 matches.push(vid);
2209 }
2210 }
2211 }
2212 // L0 / intra-batch matches from the per-batch snapshot, re-verified live
2213 // in case a prior row of this batch mutated or deleted the candidate.
2214 if let Some(vids) = existing.get(key_tuple) {
2215 for &vid in vids {
2216 if seen.contains(&vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
2217 continue;
2218 }
2219 if Self::l0_vid_matches_key(vid, key_props, ctx) && seen.insert(vid) {
2220 matches.push(vid);
2221 }
2222 }
2223 }
2224
2225 let mut out = Vec::new();
2226 if matches.is_empty() {
2227 // No match: create the node, then apply ON CREATE SET. Fold the
2228 // ON CREATE SET property assignments into seed props first so a
2229 // NOT-NULL property supplied only by ON CREATE SET passes
2230 // create-time validation (RC4); the post-create SET below settles
2231 // the final values.
2232 let seed_props = self
2233 .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2234 .await?;
2235 self.execute_create_pattern(
2236 path_pattern,
2237 &mut row,
2238 writer,
2239 prop_manager,
2240 params,
2241 ctx,
2242 tx_l0_override,
2243 Some(&seed_props),
2244 )
2245 .await?;
2246 // Fold the new vertex into the batch snapshot for intra-batch
2247 // dedup, and seed the statement prefetch with an empty base: a
2248 // fresh vid has nothing in storage, so ON CREATE SET's lazy read
2249 // resolves from the L0 row the create just wrote instead of
2250 // issuing a per-row storage scan that finds nothing.
2251 if let Some(var) = &node.variable
2252 && let Some(val) = row.get(var)
2253 && let Ok(vid) = Self::vid_from_value(val)
2254 {
2255 existing.entry(key_tuple.clone()).or_default().push(vid);
2256 if let Some(p) = prefetched.as_deref_mut() {
2257 p.vertex.entry(vid).or_default();
2258 }
2259 // Phantom guard (RC2): register this MERGE-create's key so a
2260 // concurrent MERGE of the same key aborts retriably at commit
2261 // (converging to one node) instead of silently duplicating —
2262 // even with no declared UNIQUE constraint. Only inside a
2263 // transaction, where commit re-probes the guard; a plain CREATE
2264 // never registers a key, so it is unaffected.
2265 if let Some(tx_l0) = tx_l0_override {
2266 let key_values: Vec<(String, Value)> = key_props
2267 .iter()
2268 .map(|(k, v)| (k.clone(), v.clone()))
2269 .collect();
2270 let guard_key =
2271 uni_store::runtime::l0::serialize_constraint_key(label, &key_values);
2272 tx_l0.write().insert_merge_guard_key(guard_key, vid);
2273 }
2274 }
2275 if let Some(set) = on_create {
2276 self.execute_set_items_locked(
2277 &set.items,
2278 &mut row,
2279 writer,
2280 prop_manager,
2281 params,
2282 ctx,
2283 tx_l0_override,
2284 prefetched.as_deref().unwrap_or(&empty_prefetch),
2285 )
2286 .await?;
2287 }
2288 Self::bind_path_variables(path_pattern, &mut row, temp_vars);
2289 out.push(row);
2290 } else {
2291 // Apply ON MATCH SET to every matched node (multi-match semantics),
2292 // binding the node variable as a Map with _vid/_labels/props so
2293 // RETURN and downstream operators resolve it as they would for the
2294 // general MATCH and CREATE paths.
2295 for vid in matches {
2296 let mut m = row.clone();
2297 if let Some(var) = &node.variable {
2298 // Minimal binding so ON MATCH SET resolves the node by _vid.
2299 m.insert(
2300 var.clone(),
2301 Self::build_node_map(vid, label, HashMap::new()),
2302 );
2303 }
2304 if let Some(set) = on_match {
2305 self.execute_set_items_locked(
2306 &set.items,
2307 &mut m,
2308 writer,
2309 prop_manager,
2310 params,
2311 ctx,
2312 tx_l0_override,
2313 prefetched.as_deref().unwrap_or(&empty_prefetch),
2314 )
2315 .await?;
2316 }
2317 if let Some(var) = &node.variable {
2318 // Rebind with full, post-SET properties for RETURN
2319 // fidelity. The SET above flushed the full row to L0, so a
2320 // prefetch hit (base + L0 layering) reproduces exactly
2321 // what a fresh storage read would return.
2322 let props = read_vertex_props_with_prefetch(
2323 vid,
2324 prefetched.as_deref().unwrap_or(&empty_prefetch),
2325 prop_manager,
2326 ctx,
2327 )
2328 .await?;
2329 m.insert(var.clone(), Self::build_node_map(vid, label, props));
2330 }
2331 Self::bind_path_variables(path_pattern, &mut m, temp_vars);
2332 out.push(m);
2333 }
2334 }
2335 Ok(out)
2336 }
2337
2338 #[expect(clippy::too_many_arguments)]
2339 pub(crate) async fn execute_merge(
2340 &self,
2341 rows: Vec<HashMap<String, Value>>,
2342 pattern: &Pattern,
2343 on_match: Option<&SetClause>,
2344 on_create: Option<&SetClause>,
2345 prop_manager: &PropertyManager,
2346 params: &HashMap<String, Value>,
2347 ctx: Option<&QueryContext>,
2348 tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2349 ) -> Result<Vec<HashMap<String, Value>>> {
2350 let writer_lock = self
2351 .writer
2352 .as_ref()
2353 .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;
2354
2355 // Prepare pattern for path variable binding: assign temp edge variable
2356 // names to unnamed relationships in paths that have path variables.
2357 let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);
2358
2359 // Issue #69: a single-node, single-label MERGE takes the fast path,
2360 // skipping the per-row query planning that made batched MERGE no faster
2361 // than a per-entity loop. Indexed keys get an index point-lookup;
2362 // un-indexed keys still skip planning (the lookup is a filtered scan).
2363 // The shape is the same for every row, so it is detected once.
2364 let fastpath = self.merge_single_node_fastpath(pattern);
2365
2366 // Build the per-batch L0 snapshot once (issue #69 Phase C): the per-row
2367 // fast path then resolves L0/intra-batch matches with an O(1) lookup
2368 // instead of re-walking L0 for every row. `key_names` is the sorted
2369 // static key set, matching `merge_key_tuple`.
2370 let mut fast_existing: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2371 // Per-row pre-evaluated fast-path keys (None = that row falls back to
2372 // the general path), and the per-statement persisted prefetch over the
2373 // deduped key tuples — ONE chunked scan instead of one scan per row.
2374 // Key expressions only see the row's own bindings + params, so
2375 // evaluating them ahead of any creates cannot observe earlier rows.
2376 let mut row_fast: Vec<Option<(HashMap<String, Value>, MergeKey)>> = Vec::new();
2377 let mut fast_persisted: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2378 // Statement-level property prefetch for the fast path (review perf
2379 // residual): every persisted match's full row is batch-read ONCE, so
2380 // the per-row ON MATCH SET read and the post-SET rebind resolve as
2381 // prefetch-base + L0 layering instead of one storage scan each.
2382 // `None` disables it for CRDT-bearing labels (the prefetch-hit read
2383 // skips CRDT normalization).
2384 let mut merge_prefetch: Option<Prefetch> = None;
2385 if let Some((node, label)) = &fastpath {
2386 let mut key_names: Vec<String> = match &node.properties {
2387 Some(Expr::Map(entries)) => entries.iter().map(|(k, _)| k.clone()).collect(),
2388 _ => Vec::new(),
2389 };
2390 key_names.sort();
2391 fast_existing = self.merge_l0_existing(label, &key_names, ctx);
2392
2393 row_fast.reserve(rows.len());
2394 for row in &rows {
2395 let mut key_props: HashMap<String, Value> = HashMap::new();
2396 if let Some(props_expr) = &node.properties
2397 && let Value::Map(map) = self
2398 .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2399 .await?
2400 {
2401 key_props = map;
2402 }
2403 // Only rows whose every key value is a scalar the persisted
2404 // scan can express take the fast path (same gate as before,
2405 // via the filter builder).
2406 if Self::merge_key_filter(&key_props).is_some() {
2407 let tuple = Self::merge_key_tuple(&key_props);
2408 row_fast.push(Some((key_props, tuple)));
2409 } else {
2410 row_fast.push(None);
2411 }
2412 }
2413 let unique_keys: HashSet<MergeKey> = row_fast
2414 .iter()
2415 .flatten()
2416 .map(|(_, tuple)| tuple.clone())
2417 .collect();
2418 let (persisted, schemaless_props) = self
2419 .merge_lookup_persisted_batch(label, &key_names, &unique_keys)
2420 .await?;
2421 fast_persisted = persisted;
2422 if self.merge_label_prefetch_safe(label) {
2423 let mut pf = Prefetch::default();
2424 if !schemaless_props.is_empty() {
2425 // The schemaless lookup already decoded each matched vid's
2426 // full property map — zero extra scans.
2427 pf.vertex.extend(schemaless_props);
2428 } else {
2429 let vids: Vec<Vid> = fast_persisted
2430 .values()
2431 .flatten()
2432 .copied()
2433 .collect::<HashSet<Vid>>()
2434 .into_iter()
2435 .collect();
2436 if !vids.is_empty()
2437 && let Ok(batch_props) = prop_manager
2438 .get_batch_vertex_props_for_label(&vids, label, ctx)
2439 .await
2440 {
2441 // One `_vid IN (…)` scan for every matched row's base.
2442 // On Err the map stays empty — every read falls back to
2443 // the per-row path (fail-open, same posture as
2444 // prefetch_set_targets).
2445 pf.vertex.extend(batch_props);
2446 }
2447 }
2448 merge_prefetch = Some(pf);
2449 }
2450 }
2451
2452 // RC3: relationship-MERGE existence fast-path. The single-node fast path
2453 // above does not cover `(a)-[:R]->(b)`; the general path rebuilds and runs
2454 // a per-row traversal `LogicalPlan` just to check whether the edge exists
2455 // (~19x the bulk CREATE of the same edges). For the bound-endpoints,
2456 // anonymous-edge shape (and no ON MATCH SET, whose match-row semantics the
2457 // general path materialises) we resolve existence with one MVCC-correct
2458 // adjacency probe — `GraphExecutionContext::get_neighbors` merges CSR + all
2459 // L0 buffers including the transaction's own writes, so intra-batch edges
2460 // are seen — and reuse the general create / ON CREATE handling unchanged.
2461 // An ON MATCH SET with actual items needs the general path's materialised
2462 // match rows; a plain MERGE carries an *empty* on_match, which the fast
2463 // path can serve (it emits the row directly, applying nothing on match).
2464 let on_match_empty = on_match.is_none_or(|s| s.items.is_empty());
2465 let rel_fast = if fastpath.is_none() && on_match_empty {
2466 self.merge_relationship_fastpath_shape(pattern)
2467 } else {
2468 None
2469 };
2470 let rel_graph_ctx = rel_fast.as_ref().map(|_| {
2471 let l0_context = match ctx {
2472 Some(c) => crate::query::df_graph::L0Context::from_query_context(c),
2473 None => crate::query::df_graph::L0Context::empty(),
2474 };
2475 let pm_arc = self.prop_manager_arc.clone().unwrap_or_else(|| {
2476 Arc::new(PropertyManager::new(
2477 self.storage.clone(),
2478 self.storage.schema_manager_arc(),
2479 prop_manager.cache_size(),
2480 ))
2481 });
2482 crate::query::df_graph::GraphExecutionContext::with_l0_context(
2483 self.effective_storage(),
2484 l0_context,
2485 pm_arc,
2486 )
2487 });
2488
2489 let mut results = Vec::new();
2490 for (idx, mut row) in rows.into_iter().enumerate() {
2491 // Rows with a pre-evaluated scalar key take the fast path; rows
2492 // with a non-scalar key fall through to the general path below.
2493 if let Some((node, label)) = &fastpath
2494 && let Some((key_props, key_tuple)) = row_fast.get(idx).and_then(|rf| rf.as_ref())
2495 {
2496 let writer: &uni_store::Writer = writer_lock.as_ref();
2497 let row_out = self
2498 .execute_merge_row_indexed(
2499 label,
2500 node,
2501 &path_pattern,
2502 &temp_vars,
2503 row,
2504 key_props,
2505 &fast_persisted,
2506 key_tuple,
2507 &mut fast_existing,
2508 on_match,
2509 on_create,
2510 prop_manager,
2511 params,
2512 ctx,
2513 tx_l0_override,
2514 writer,
2515 merge_prefetch.as_mut(),
2516 )
2517 .await?;
2518 results.extend(row_out);
2519 continue;
2520 }
2521
2522 // RC3 relationship fast path: bound endpoints → resolve edge
2523 // existence with one adjacency probe and reuse the general
2524 // create / ON CREATE handling, skipping the per-row traversal plan.
2525 if let (Some((src_var, dst_var, type_id, dir)), Some(graph_ctx)) =
2526 (rel_fast.as_ref(), rel_graph_ctx.as_ref())
2527 {
2528 let src_vid = row.get(src_var).and_then(|v| Self::vid_from_value(v).ok());
2529 let dst_vid = row.get(dst_var).and_then(|v| Self::vid_from_value(v).ok());
2530 if let (Some(src_vid), Some(dst_vid)) = (src_vid, dst_vid) {
2531 let exists = graph_ctx
2532 .get_neighbors(src_vid, *type_id, *dir)
2533 .into_iter()
2534 .any(|(n, _eid)| n == dst_vid);
2535 let writer: &uni_store::Writer = writer_lock.as_ref();
2536 if !exists {
2537 // Edge absent: create only the edge (endpoints are bound),
2538 // then apply ON CREATE SET — identical to the general
2539 // create branch below.
2540 let seed_props = self
2541 .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2542 .await?;
2543 self.execute_create_pattern(
2544 &path_pattern,
2545 &mut row,
2546 writer,
2547 prop_manager,
2548 params,
2549 ctx,
2550 tx_l0_override,
2551 Some(&seed_props),
2552 )
2553 .await?;
2554 if let Some(set) = on_create {
2555 self.execute_set_items_locked(
2556 &set.items,
2557 &mut row,
2558 writer,
2559 prop_manager,
2560 params,
2561 ctx,
2562 tx_l0_override,
2563 &Prefetch::default(),
2564 )
2565 .await?;
2566 }
2567 }
2568 // Whether matched or just created, the edge now exists; bind
2569 // path variables and emit the row (the edge is anonymous, so
2570 // there is no edge binding to reproduce, and ON MATCH SET is
2571 // excluded from this fast path).
2572 Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
2573 results.push(row);
2574 continue;
2575 }
2576 // Endpoints not bound to vids → fall through to the general path.
2577 }
2578
2579 // General execution: match-or-create per row. (The index fast path
2580 // above already handles single-node, single-label, scalar-indexed
2581 // MERGE — including unique-constrained labels, whose keys are
2582 // indexed — so there is no separate constraint-only fast path.)
2583 let matches = self
2584 .execute_merge_match(pattern, &row, prop_manager, params, ctx)
2585 .await?;
2586 let writer: &uni_store::Writer = writer_lock.as_ref();
2587
2588 let result: Result<Vec<HashMap<String, Value>>> = async {
2589 let mut batch = Vec::new();
2590 if !matches.is_empty() {
2591 for mut m in matches {
2592 if let Some(set) = on_match {
2593 self.execute_set_items_locked(
2594 &set.items,
2595 &mut m,
2596 writer,
2597 prop_manager,
2598 params,
2599 ctx,
2600 tx_l0_override,
2601 &Prefetch::default(),
2602 )
2603 .await?;
2604 }
2605 Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
2606 batch.push(m);
2607 }
2608 } else {
2609 // Fold ON CREATE SET into seed props so a NOT-NULL property
2610 // set only by ON CREATE SET passes create-time validation
2611 // (RC4); the post-create SET below settles the final values.
2612 let seed_props = self
2613 .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2614 .await?;
2615 self.execute_create_pattern(
2616 &path_pattern,
2617 &mut row,
2618 writer,
2619 prop_manager,
2620 params,
2621 ctx,
2622 tx_l0_override,
2623 Some(&seed_props),
2624 )
2625 .await?;
2626 if let Some(set) = on_create {
2627 self.execute_set_items_locked(
2628 &set.items,
2629 &mut row,
2630 writer,
2631 prop_manager,
2632 params,
2633 ctx,
2634 tx_l0_override,
2635 &Prefetch::default(),
2636 )
2637 .await?;
2638 }
2639 Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
2640 batch.push(row);
2641 }
2642 Ok(batch)
2643 }
2644 .await;
2645
2646 results.extend(result?);
2647 }
2648 Ok(results)
2649 }
2650
2651 /// Pre-evaluate `ON CREATE SET` property assignments into per-variable seeds.
2652 ///
2653 /// Folds `SET <var>.<prop> = <expr>` items so a NOT-NULL property supplied
2654 /// only by `ON CREATE SET` is present when the MERGE node is created and
2655 /// passes constraint validation (RC4). The right-hand side is evaluated
2656 /// against the current `row`.
2657 ///
2658 /// Items whose right-hand side references the target variable (e.g.
2659 /// `ON CREATE SET n.c = coalesce(n.c, 0) + 1`) are NOT folded: seeding would
2660 /// let the post-create SET read the seeded value and apply the assignment
2661 /// twice. Such items run only post-create, exactly once (unchanged behavior).
2662 ///
2663 /// # Errors
2664 /// Returns an error if evaluating an assignment's right-hand side fails.
2665 pub(crate) async fn on_create_seed_props(
2666 &self,
2667 on_create: Option<&SetClause>,
2668 row: &HashMap<String, Value>,
2669 prop_manager: &PropertyManager,
2670 params: &HashMap<String, Value>,
2671 ctx: Option<&QueryContext>,
2672 ) -> Result<HashMap<String, HashMap<String, Value>>> {
2673 let mut seed: HashMap<String, HashMap<String, Value>> = HashMap::new();
2674 let Some(set) = on_create else {
2675 return Ok(seed);
2676 };
2677 for item in &set.items {
2678 if let SetItem::Property { expr, value } = item
2679 && let Expr::Property(var_expr, prop_name) = expr
2680 && let Expr::Variable(var_name) = &**var_expr
2681 // Skip self-referential RHS so the post-create SET (which also
2682 // runs) applies it exactly once rather than reading the seed.
2683 && !crate::query::df_graph::locy_ast_builder::expr_references_var(
2684 value, var_name,
2685 )
2686 {
2687 let val = self
2688 .evaluate_expr(value, row, prop_manager, params, ctx)
2689 .await?;
2690 seed.entry(var_name.clone())
2691 .or_default()
2692 .insert(prop_name.clone(), val);
2693 }
2694 }
2695 Ok(seed)
2696 }
2697
2698 /// Execute a CREATE pattern, inserting new vertices and edges into the graph.
2699 #[expect(clippy::too_many_arguments)]
2700 pub(crate) async fn execute_create_pattern(
2701 &self,
2702 pattern: &Pattern,
2703 row: &mut HashMap<String, Value>,
2704 writer: &Writer,
2705 prop_manager: &PropertyManager,
2706 params: &HashMap<String, Value>,
2707 ctx: Option<&QueryContext>,
2708 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2709 // Per-variable properties to gap-fill into newly-created nodes before
2710 // constraint validation. Used by MERGE to fold `ON CREATE SET` so a
2711 // NOT-NULL property supplied only by ON CREATE SET passes create-time
2712 // validation (RC4). `None` for plain CREATE.
2713 seed_props: Option<&HashMap<String, HashMap<String, Value>>>,
2714 ) -> Result<()> {
2715 for path in &pattern.paths {
2716 let mut prev_vid: Option<Vid> = None;
2717 // (rel_var, type_id, type_name, props_expr, direction)
2718 type PendingRel = (String, u32, String, Option<Expr>, Direction);
2719 let mut rel_pending: Option<PendingRel> = None;
2720
2721 for element in &path.elements {
2722 match element {
2723 PatternElement::Node(n) => {
2724 let mut vid = None;
2725
2726 // Check if node variable already bound in row
2727 if let Some(var) = &n.variable
2728 && let Some(val) = row.get(var)
2729 && let Ok(existing_vid) = Self::vid_from_value(val)
2730 {
2731 vid = Some(existing_vid);
2732 }
2733
2734 // If not bound, create it
2735 if vid.is_none() {
2736 let mut props = HashMap::new();
2737 if let Some(props_expr) = &n.properties {
2738 let props_val = self
2739 .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2740 .await?;
2741 if let Value::Map(map) = props_val {
2742 for (k, v) in map {
2743 props.insert(k, v);
2744 }
2745 } else {
2746 return Err(anyhow!("Properties must evaluate to a map"));
2747 }
2748 }
2749
2750 // MERGE ON CREATE SET: gap-fill properties supplied
2751 // only by ON CREATE SET so a NOT-NULL property absent
2752 // from the merge key passes create-time validation
2753 // (RC4). `or_insert` keeps the merge-key/pattern props
2754 // authoritative; the post-create SET re-applies the
2755 // real values, so the final state is unchanged.
2756 if let Some(seed) = seed_props
2757 && let Some(var) = &n.variable
2758 && let Some(var_seed) = seed.get(var)
2759 {
2760 for (k, v) in var_seed {
2761 props.entry(k.clone()).or_insert_with(|| v.clone());
2762 }
2763 }
2764
2765 let schema = self.storage.schema_manager().schema();
2766
2767 // Strict schema: reject undeclared labels.
2768 if self.config.strict_schema {
2769 for label_name in &n.labels {
2770 if schema.get_label_case_insensitive(label_name).is_none() {
2771 return Err(anyhow!(
2772 "Label '{}' is not defined in the schema \
2773 (strict_schema is enabled). \
2774 Declare it with db.schema().label(...).apply() first.",
2775 label_name
2776 ));
2777 }
2778 }
2779 }
2780
2781 // VID generation is label-independent. Pull from the
2782 // per-tx reservoir if set (amortizes the global
2783 // IdAllocator mutex), else fall back to the direct
2784 // per-VID path.
2785 let new_vid = match &self.id_reservoir {
2786 Some(r) => r.next_vid().await?,
2787 None => writer.next_vid().await?,
2788 };
2789
2790 // Enrich with generated columns only for known labels
2791 for label_name in &n.labels {
2792 if schema.get_label_case_insensitive(label_name).is_some() {
2793 self.enrich_properties_with_generated_columns(
2794 label_name,
2795 &mut props,
2796 prop_manager,
2797 params,
2798 ctx,
2799 )
2800 .await?;
2801 }
2802 }
2803
2804 // Validate/coerce against declared types AFTER enrichment, so
2805 // a type mismatch is rejected here rather than silently nulled
2806 // (and the row dropped) at flush — issue #68.
2807 let props = Self::coerce_and_validate_props(props, &schema, &n.labels)?;
2808
2809 // Insert vertex and get back final properties (includes auto-generated embeddings)
2810 let final_props = writer
2811 .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
2812 .await?;
2813
2814 // Build node object with final properties (includes embeddings)
2815 if let Some(var) = &n.variable {
2816 let mut obj = HashMap::new();
2817 obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
2818 let labels_list: Vec<Value> =
2819 n.labels.iter().map(|l| Value::String(l.clone())).collect();
2820 obj.insert("_labels".to_string(), Value::List(labels_list));
2821 for (k, v) in &final_props {
2822 obj.insert(k.clone(), v.clone());
2823 }
2824 // Store node as a Map with _vid, matching MATCH behavior
2825 row.insert(var.clone(), Value::Map(obj));
2826 }
2827 vid = Some(new_vid);
2828 }
2829
2830 let current_vid = vid.unwrap();
2831
2832 if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
2833 rel_pending.take()
2834 && let Some(src) = prev_vid
2835 {
2836 let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
2837
2838 if !is_rel_bound {
2839 let mut rel_props = HashMap::new();
2840 if let Some(expr) = rel_props_expr {
2841 let val = self
2842 .evaluate_expr(&expr, row, prop_manager, params, ctx)
2843 .await?;
2844 if let Value::Map(map) = val {
2845 rel_props.extend(map);
2846 }
2847 }
2848 // Validate/coerce edge properties against the declared
2849 // edge-type schema before storing — issue #68.
2850 let edge_schema = self.storage.schema_manager().schema();
2851 let rel_props = Self::coerce_and_validate_props(
2852 rel_props,
2853 &edge_schema,
2854 std::slice::from_ref(&type_name),
2855 )?;
2856 let eid = match &self.id_reservoir {
2857 Some(r) => r.next_eid().await?,
2858 None => writer.next_eid(type_id).await?,
2859 };
2860
2861 // For incoming edges like (a)<-[:R]-(b), swap so the edge points b -> a
2862 let (edge_src, edge_dst) = match dir {
2863 Direction::Incoming => (current_vid, src),
2864 _ => (src, current_vid),
2865 };
2866
2867 let store_props = !rel_var.is_empty();
2868 let user_props = if store_props {
2869 rel_props.clone()
2870 } else {
2871 HashMap::new()
2872 };
2873
2874 writer
2875 .insert_edge(
2876 edge_src,
2877 edge_dst,
2878 type_id,
2879 eid,
2880 rel_props,
2881 Some(type_name.clone()),
2882 tx_l0,
2883 )
2884 .await?;
2885
2886 // Edge type name is now stored by insert_edge
2887
2888 if store_props {
2889 let mut edge_map = HashMap::new();
2890 edge_map.insert(
2891 "_eid".to_string(),
2892 Value::Int(eid.as_u64() as i64),
2893 );
2894 edge_map.insert(
2895 "_src".to_string(),
2896 Value::Int(edge_src.as_u64() as i64),
2897 );
2898 edge_map.insert(
2899 "_dst".to_string(),
2900 Value::Int(edge_dst.as_u64() as i64),
2901 );
2902 edge_map
2903 .insert("_type".to_string(), Value::Int(type_id as i64));
2904 // Include user properties so downstream RETURN sees them
2905 for (k, v) in user_props {
2906 edge_map.insert(k, v);
2907 }
2908 row.insert(rel_var, Value::Map(edge_map));
2909 }
2910 }
2911 }
2912 prev_vid = Some(current_vid);
2913 }
2914 PatternElement::Relationship(r) => {
2915 if r.types.len() != 1 {
2916 return Err(anyhow!(
2917 "CREATE relationship must specify exactly one type"
2918 ));
2919 }
2920 let type_name = &r.types[0];
2921 let type_id = if self.config.strict_schema {
2922 let schema = self.storage.schema_manager().schema();
2923 schema
2924 .edge_type_id_by_name_case_insensitive(type_name)
2925 .ok_or_else(|| {
2926 anyhow!(
2927 "Edge type '{}' is not defined in the schema \
2928 (strict_schema is enabled). \
2929 Declare it with db.schema().edge_type(...).apply() first.",
2930 type_name
2931 )
2932 })?
2933 } else {
2934 // Schemaless: get or assign edge type ID (bit 31 = 1 for dynamic).
2935 self.storage
2936 .schema_manager()
2937 .get_or_assign_edge_type_id(type_name)
2938 };
2939
2940 rel_pending = Some((
2941 r.variable.clone().unwrap_or_default(),
2942 type_id,
2943 type_name.clone(),
2944 r.properties.clone(),
2945 r.direction.clone(),
2946 ));
2947 }
2948 PatternElement::Parenthesized { .. } => {
2949 return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
2950 }
2951 }
2952 }
2953 }
2954 Ok(())
2955 }
2956
2957 /// Rejects structural values (maps, nodes, edges, paths, nested lists) in a property.
2958 ///
2959 /// These are never valid OpenCypher property values regardless of the declared column
2960 /// type. A `CypherValue` column is the sole exception and is handled by the caller
2961 /// before this is reached.
2962 ///
2963 /// # Errors
2964 /// Returns an error if `val` is a map/node/edge/path, or a list containing one.
2965 fn validate_structural_property_value(prop_name: &str, val: &Value) -> Result<()> {
2966 match val {
2967 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
2968 anyhow::bail!(
2969 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2970 prop_name
2971 );
2972 }
2973 Value::List(items) => {
2974 for item in items {
2975 if matches!(
2976 item,
2977 Value::Map(_)
2978 | Value::Node(_)
2979 | Value::Edge(_)
2980 | Value::Path(_)
2981 | Value::List(_)
2982 ) {
2983 anyhow::bail!(
2984 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2985 prop_name
2986 );
2987 }
2988 }
2989 }
2990 _ => {}
2991 }
2992 Ok(())
2993 }
2994
2995 /// Validates and coerces `val` against the declared schema type for `prop_name`.
2996 ///
2997 /// Returns the value to actually persist. Beyond the structural checks in
2998 /// [`Self::validate_structural_property_value`], this compares the value against the
2999 /// column's declared `DataType` and:
3000 ///
3001 /// - returns it unchanged when directly storable (including the intentional
3002 /// `Int`→`Float`/`Int32` and `Temporal`→`Timestamp` widenings);
3003 /// - coerces a `Value::String` written into a `Date`/`Time`/`DateTime`/`Duration`
3004 /// column into the proper `Temporal` value, using the same parser as the Cypher
3005 /// `date()`/`time()`/`datetime()`/`duration()` constructors;
3006 /// - otherwise returns an error, so a type mismatch is surfaced at the call site
3007 /// rather than silently nulled — and the row dropped at flush. See issue #68.
3008 ///
3009 /// Undeclared (schemaless) properties and `CypherValue` columns keep their permissive
3010 /// behavior.
3011 ///
3012 /// # Errors
3013 /// Returns an error if the value's type is incompatible with the declared column type,
3014 /// or if a string destined for a temporal column is not a valid temporal literal.
3015 fn coerce_and_validate_property_value(
3016 prop_name: &str,
3017 val: Value,
3018 schema: &uni_common::core::schema::Schema,
3019 labels: &[String],
3020 ) -> Result<Value> {
3021 use uni_common::core::schema::DataType;
3022
3023 // Resolve the declared type from the first label that declares this property.
3024 let declared = labels.iter().find_map(|label| {
3025 schema
3026 .properties
3027 .get(label)
3028 .and_then(|props| props.get(prop_name))
3029 .map(|meta| &meta.r#type)
3030 });
3031
3032 // CypherValue columns accept any value (including maps) — skip all checks.
3033 if matches!(declared, Some(DataType::CypherValue)) {
3034 return Ok(val);
3035 }
3036
3037 let Some(dt) = declared else {
3038 // Schemaless property: reject structural values (maps/nodes/edges/paths and
3039 // lists containing them), otherwise store as-is.
3040 Self::validate_structural_property_value(prop_name, &val)?;
3041 return Ok(val);
3042 };
3043
3044 // Directly storable: scalars, the intentional `Int`→`Float`/`Int32` and
3045 // `Temporal`→`Timestamp` widenings, declared composite columns (`Map`/`List`/
3046 // `Vector`) receiving their matching value, and `Null` (always accepted).
3047 if dt.accepts(&val) {
3048 return Ok(val);
3049 }
3050
3051 // Known-safe coercion: a string into a temporal column is parsed as if it had
3052 // been wrapped in the matching Cypher temporal constructor.
3053 if matches!(val, Value::String(_)) {
3054 let ctor = match dt {
3055 DataType::DateTime => Some("DATETIME"),
3056 DataType::Date => Some("DATE"),
3057 DataType::Time => Some("TIME"),
3058 DataType::Duration => Some("DURATION"),
3059 _ => None,
3060 };
3061 if let Some(name) = ctor {
3062 return uni_query_functions::datetime::eval_datetime_function(
3063 name,
3064 std::slice::from_ref(&val),
3065 )
3066 .map_err(|e| {
3067 anyhow!(
3068 "TypeError: property '{}' is declared {:?} but the string value could \
3069 not be parsed as a {} literal: {}",
3070 prop_name,
3071 dt,
3072 name,
3073 e
3074 )
3075 });
3076 }
3077 }
3078
3079 // Not storable and not coercible. Prefer the structural message when the value
3080 // is itself structural (e.g. a map into a scalar column), preserving prior
3081 // behavior; otherwise report the scalar type mismatch.
3082 Self::validate_structural_property_value(prop_name, &val)?;
3083 anyhow::bail!(
3084 "TypeError: property '{}' is declared {:?} but got an incompatible value of type {}",
3085 prop_name,
3086 dt,
3087 value_type_name(&val)
3088 );
3089 }
3090
3091 /// Coerces and validates every property in `props` against the declared types for `labels`.
3092 ///
3093 /// Applies [`Self::coerce_and_validate_property_value`] to each entry, returning the map
3094 /// with known-safe coercions applied. Use this at every user-facing CREATE/SET write site
3095 /// before handing properties to the writer, so a type mismatch is rejected up front rather
3096 /// than silently nulled — and the row dropped — at flush (issue #68).
3097 ///
3098 /// # Errors
3099 /// Returns an error on the first property whose value is incompatible with its declared type.
3100 fn coerce_and_validate_props(
3101 props: HashMap<String, Value>,
3102 schema: &uni_common::core::schema::Schema,
3103 labels: &[String],
3104 ) -> Result<HashMap<String, Value>> {
3105 let mut out = HashMap::with_capacity(props.len());
3106 for (k, v) in props {
3107 let cv = Self::coerce_and_validate_property_value(&k, v, schema, labels)?;
3108 out.insert(k, cv);
3109 }
3110 Ok(out)
3111 }
3112
3113 #[expect(clippy::too_many_arguments)]
3114 pub(crate) async fn execute_set_items_locked(
3115 &self,
3116 items: &[SetItem],
3117 row: &mut HashMap<String, Value>,
3118 writer: &Writer,
3119 prop_manager: &PropertyManager,
3120 params: &HashMap<String, Value>,
3121 ctx: Option<&QueryContext>,
3122 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3123 prefetched: &Prefetch,
3124 ) -> Result<()> {
3125 // Coalesce SetItem::Property items by target so we do ONE read + ONE
3126 // write per (variable, target) instead of one read-modify-write cycle
3127 // per item. For an UPDATE that sets N properties on the same vertex
3128 // (e.g. the ingest hotpath `SET n.frequency = ..., n.last_seen = ...,
3129 // n.confidence = ...`), this collapses N redundant
3130 // `get_all_vertex_props_with_ctx` + `insert_vertex_with_labels` cycles
3131 // into one. See profile_test.rs `diag_72_set_data_scale_with_hnsw` for
3132 // the measurement, and the plan in
3133 // /home/rohit/.claude/plans/plan-and-implement-a-valiant-flame.md
3134 // for the rationale.
3135 //
3136 // RHS evaluation order is preserved: we evaluate each RHS inline and
3137 // update the row binding immediately, so a later SetItem on the same
3138 // variable that reads `n.<earlier-prop>` sees the new value.
3139 //
3140 // Non-Property variants (Labels, Variable, VariablePlus) are less
3141 // common and have lower payoff; before processing one, we flush any
3142 // pending updates for the same variable so it sees the latest L0
3143 // state and ordering semantics are preserved.
3144 let mut pending_v: HashMap<String, PendingVertexSet> = HashMap::new();
3145 let mut pending_e: HashMap<String, PendingEdgeSet> = HashMap::new();
3146
3147 for item in items {
3148 match item {
3149 SetItem::Property { expr, value } => {
3150 if let Expr::Property(var_expr, prop_name) = expr
3151 && let Expr::Variable(var_name) = &**var_expr
3152 && let Some(node_val) = row.get(var_name)
3153 {
3154 if let Ok(vid) = Self::vid_from_value(node_val) {
3155 reject_if_ephemeral_vid(vid)?;
3156 let labels =
3157 Self::extract_labels_from_node(node_val).unwrap_or_default();
3158 let schema = self.storage.schema_manager().schema().clone();
3159
3160 // Lazy one-time read. Always read the full row
3161 // (preserves CRDT merge + constraint validation
3162 // + scan-side L0 visibility). The
3163 // partial-lance-writes optimization happens
3164 // PURELY AT FLUSH TIME via the per-VID
3165 // `vertex_partial_keys` set tracked in L0 — so
3166 // L0 holds the full row, scans see the full
3167 // row, and Lance only receives the touched
3168 // columns. Generated-column-bearing labels
3169 // ride the partial path too (Round 12 §C):
3170 // `enrich_properties_with_generated_columns`
3171 // runs at flush time over the merged-in-L0
3172 // full row, and the produced generator keys
3173 // are appended to `touched` so they land in
3174 // the MergeInsert source.
3175 if !pending_v.contains_key(var_name) {
3176 let storage_cfg = &self.storage.config;
3177 let partial = storage_cfg.partial_lance_writes;
3178 let read = read_vertex_props_with_prefetch(
3179 vid,
3180 prefetched,
3181 prop_manager,
3182 ctx,
3183 )
3184 .await?;
3185 pending_v.insert(
3186 var_name.clone(),
3187 PendingVertexSet {
3188 vid,
3189 labels: labels.clone(),
3190 props: read,
3191 partial,
3192 touched: HashSet::new(),
3193 },
3194 );
3195 }
3196
3197 let val = self
3198 .evaluate_expr(value, row, prop_manager, params, ctx)
3199 .await?;
3200 let val = Self::coerce_and_validate_property_value(
3201 prop_name, val, &schema, &labels,
3202 )?;
3203
3204 let pv = pending_v
3205 .get_mut(var_name)
3206 .expect("inserted above when absent");
3207 pv.props.insert(prop_name.clone(), val.clone());
3208 if pv.partial {
3209 pv.touched.insert(prop_name.clone());
3210 }
3211
3212 // Update the row binding so subsequent RHS sees the new value.
3213 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3214 node_map.insert(prop_name.clone(), val);
3215 } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
3216 node.properties.insert(prop_name.clone(), val);
3217 }
3218 } else if let Value::Map(map) = node_val
3219 && map.get("_eid").is_some_and(|v| !v.is_null())
3220 && map.get("_src").is_some_and(|v| !v.is_null())
3221 && map.get("_dst").is_some_and(|v| !v.is_null())
3222 && (map.get("_type").is_some_and(|v| !v.is_null())
3223 || map.get("_type_name").is_some_and(|v| !v.is_null()))
3224 {
3225 let ei = self.extract_edge_identity(map)?;
3226 reject_if_ephemeral_eid(ei.eid)?;
3227 let schema = self.storage.schema_manager().schema().clone();
3228 // Handle _type as either String or Int (Int from CREATE, String
3229 // from queries). UNWIND on VLP edge lists emits `_type_name`
3230 // instead of `_type`; accept either.
3231 let type_val = map.get("_type").or_else(|| map.get("_type_name"));
3232 let edge_type_name = match type_val {
3233 Some(Value::String(s)) => s.clone(),
3234 Some(Value::Int(id)) => schema
3235 .edge_type_name_by_id_unified(*id as u32)
3236 .unwrap_or_else(|| format!("EdgeType{}", id)),
3237 _ => String::new(),
3238 };
3239
3240 if !pending_e.contains_key(var_name) {
3241 let initial = read_edge_props_with_prefetch(
3242 ei.eid,
3243 prefetched,
3244 prop_manager,
3245 ctx,
3246 )
3247 .await?;
3248 let partial = self.storage.config.partial_lance_writes;
3249 pending_e.insert(
3250 var_name.clone(),
3251 PendingEdgeSet {
3252 src: ei.src,
3253 dst: ei.dst,
3254 edge_type_id: ei.edge_type_id,
3255 eid: ei.eid,
3256 edge_type_name: edge_type_name.clone(),
3257 props: initial,
3258 partial,
3259 touched: HashSet::new(),
3260 },
3261 );
3262 }
3263
3264 let val = self
3265 .evaluate_expr(value, row, prop_manager, params, ctx)
3266 .await?;
3267 let val = Self::coerce_and_validate_property_value(
3268 prop_name,
3269 val,
3270 &schema,
3271 std::slice::from_ref(&edge_type_name),
3272 )?;
3273
3274 let pe = pending_e
3275 .get_mut(var_name)
3276 .expect("inserted above when absent");
3277 pe.props.insert(prop_name.clone(), val.clone());
3278 if pe.partial {
3279 pe.touched.insert(prop_name.clone());
3280 }
3281
3282 // Update the row object so subsequent RHS sees the new value.
3283 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3284 edge_map.insert(prop_name.clone(), val);
3285 } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3286 edge.properties.insert(prop_name.clone(), val);
3287 }
3288 } else if let Value::Edge(edge) = node_val {
3289 // Handle Value::Edge directly (when traverse returns Edge objects).
3290 reject_if_ephemeral_eid(edge.eid)?;
3291 let eid = edge.eid;
3292 let src = edge.src;
3293 let dst = edge.dst;
3294 let edge_type_name = edge.edge_type.clone();
3295 let etype =
3296 self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
3297 let schema = self.storage.schema_manager().schema().clone();
3298
3299 if !pending_e.contains_key(var_name) {
3300 let initial = read_edge_props_with_prefetch(
3301 eid,
3302 prefetched,
3303 prop_manager,
3304 ctx,
3305 )
3306 .await?;
3307 let partial = self.storage.config.partial_lance_writes;
3308 pending_e.insert(
3309 var_name.clone(),
3310 PendingEdgeSet {
3311 src,
3312 dst,
3313 edge_type_id: etype,
3314 eid,
3315 edge_type_name: edge_type_name.clone(),
3316 props: initial,
3317 partial,
3318 touched: HashSet::new(),
3319 },
3320 );
3321 }
3322
3323 let val = self
3324 .evaluate_expr(value, row, prop_manager, params, ctx)
3325 .await?;
3326 let val = Self::coerce_and_validate_property_value(
3327 prop_name,
3328 val,
3329 &schema,
3330 std::slice::from_ref(&edge_type_name),
3331 )?;
3332
3333 let pe = pending_e
3334 .get_mut(var_name)
3335 .expect("inserted above when absent");
3336 pe.props.insert(prop_name.clone(), val.clone());
3337 if pe.partial {
3338 pe.touched.insert(prop_name.clone());
3339 }
3340
3341 // Update the row object so subsequent RHS sees the new value.
3342 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3343 edge.properties.insert(prop_name.clone(), val);
3344 }
3345 }
3346 }
3347 }
3348 SetItem::Labels { variable, labels } => {
3349 // Flush any pending writes for this var so the Labels op
3350 // sees latest L0 state. Other variables' pending writes
3351 // can keep waiting (they're independent).
3352 self.flush_pending_var(
3353 variable,
3354 &mut pending_v,
3355 &mut pending_e,
3356 writer,
3357 prop_manager,
3358 params,
3359 ctx,
3360 tx_l0,
3361 prefetched,
3362 )
3363 .await?;
3364
3365 if let Some(node_val) = row.get(variable)
3366 && let Ok(vid) = Self::vid_from_value(node_val)
3367 {
3368 reject_if_ephemeral_vid(vid)?;
3369 let registry = self
3370 .procedure_registry
3371 .as_ref()
3372 .and_then(|pr| pr.plugin_registry());
3373 reject_virtual_label_write(registry.as_ref(), labels, "SET")?;
3374
3375 // Get current labels from node value
3376 let current_labels =
3377 Self::extract_labels_from_node(node_val).unwrap_or_default();
3378
3379 // Determine new labels to add (skip duplicates)
3380 let labels_to_add: Vec<_> = labels
3381 .iter()
3382 .filter(|l| !current_labels.contains(l))
3383 .cloned()
3384 .collect();
3385
3386 if !labels_to_add.is_empty() {
3387 // Resolve the FULL new label set and write it to the
3388 // TRANSACTION buffer (so the change is transactional
3389 // and OCC-conflictable), falling back to the context
3390 // (main) L0 for non-transactional callers. Replace
3391 // semantics via `set_vertex_labels`.
3392 let mut new_labels = current_labels;
3393 new_labels.extend(labels_to_add);
3394 if let Some(ctx) = ctx {
3395 let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3396 l0.write().set_vertex_labels(vid, &new_labels);
3397 }
3398
3399 // Update the node value in the row with the new labels.
3400 if let Some(Value::Map(obj)) = row.get_mut(variable) {
3401 let labels_list =
3402 new_labels.into_iter().map(Value::String).collect();
3403 obj.insert("_labels".to_string(), Value::List(labels_list));
3404 }
3405 }
3406 }
3407 }
3408 SetItem::Variable { variable, value }
3409 | SetItem::VariablePlus { variable, value } => {
3410 // Flush this var's pending writes first so the
3411 // replace/merge op sees them as latest L0 state.
3412 self.flush_pending_var(
3413 variable,
3414 &mut pending_v,
3415 &mut pending_e,
3416 writer,
3417 prop_manager,
3418 params,
3419 ctx,
3420 tx_l0,
3421 prefetched,
3422 )
3423 .await?;
3424
3425 let replace = matches!(item, SetItem::Variable { .. });
3426 let op_str = if replace { "=" } else { "+=" };
3427
3428 // SET n = expr / SET n += expr — null target from OPTIONAL MATCH is a silent no-op
3429 if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
3430 continue;
3431 }
3432 let rhs = self
3433 .evaluate_expr(value, row, prop_manager, params, ctx)
3434 .await?;
3435 let new_props =
3436 Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
3437 anyhow!(
3438 "SET {} {} expr: right-hand side must evaluate to a map, \
3439 node, or relationship",
3440 variable,
3441 op_str
3442 )
3443 })?;
3444 self.apply_properties_to_entity(
3445 variable,
3446 new_props,
3447 replace,
3448 row,
3449 writer,
3450 prop_manager,
3451 params,
3452 ctx,
3453 tx_l0,
3454 prefetched,
3455 )
3456 .await?;
3457 }
3458 }
3459 }
3460
3461 // Flush all remaining coalesced writes — one writer call per target.
3462 // Partial entries (no generated columns) call
3463 // `Writer::insert_vertex_partial_full` so L0 holds the FULL row
3464 // but the touched-keys hint drives a MergeInsert at flush. Full
3465 // entries continue through the legacy
3466 // `insert_vertex_with_labels` (Append) path with
3467 // generated-column enrichment.
3468 for (_var_name, mut pv) in pending_v {
3469 if pv.partial {
3470 // Round 12 §C: run the generator enrichment over the
3471 // merged-in-L0 full row, then add the produced generator
3472 // keys to `touched` so they ride the MergeInsert source.
3473 // Idempotent — generators always recompute against the
3474 // post-merge property map.
3475 let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3476 for label_name in &pv.labels {
3477 self.enrich_properties_with_generated_columns(
3478 label_name,
3479 &mut pv.props,
3480 prop_manager,
3481 params,
3482 ctx,
3483 )
3484 .await?;
3485 }
3486 for k in pv.props.keys() {
3487 if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3488 pv.touched.insert(k.clone());
3489 }
3490 }
3491 writer
3492 .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3493 .await?;
3494 } else {
3495 for label_name in &pv.labels {
3496 self.enrich_properties_with_generated_columns(
3497 label_name,
3498 &mut pv.props,
3499 prop_manager,
3500 params,
3501 ctx,
3502 )
3503 .await?;
3504 }
3505 let _ = writer
3506 .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3507 .await?;
3508 }
3509 }
3510 for (_var_name, pe) in pending_e {
3511 if pe.partial {
3512 writer
3513 .insert_edge_partial_full(
3514 pe.src,
3515 pe.dst,
3516 pe.edge_type_id,
3517 pe.eid,
3518 pe.props,
3519 Some(pe.edge_type_name),
3520 pe.touched,
3521 tx_l0,
3522 )
3523 .await?;
3524 } else {
3525 writer
3526 .insert_edge(
3527 pe.src,
3528 pe.dst,
3529 pe.edge_type_id,
3530 pe.eid,
3531 pe.props,
3532 Some(pe.edge_type_name),
3533 tx_l0,
3534 )
3535 .await?;
3536 }
3537 }
3538
3539 Ok(())
3540 }
3541
3542 /// Flush pending SET state for a single variable to the writer.
3543 ///
3544 /// Called from the SET loop when about to process a Labels /
3545 /// Variable / VariablePlus item on `var`, so the subsequent op
3546 /// sees latest L0 state and ordering is preserved.
3547 #[expect(clippy::too_many_arguments)]
3548 async fn flush_pending_var(
3549 &self,
3550 var: &str,
3551 pending_v: &mut HashMap<String, PendingVertexSet>,
3552 pending_e: &mut HashMap<String, PendingEdgeSet>,
3553 writer: &Writer,
3554 prop_manager: &PropertyManager,
3555 _params: &HashMap<String, Value>,
3556 ctx: Option<&QueryContext>,
3557 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3558 _prefetched: &Prefetch,
3559 ) -> Result<()> {
3560 if let Some(mut pv) = pending_v.remove(var) {
3561 if pv.partial {
3562 let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3563 for label_name in &pv.labels {
3564 self.enrich_properties_with_generated_columns(
3565 label_name,
3566 &mut pv.props,
3567 prop_manager,
3568 _params,
3569 ctx,
3570 )
3571 .await?;
3572 }
3573 for k in pv.props.keys() {
3574 if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3575 pv.touched.insert(k.clone());
3576 }
3577 }
3578 writer
3579 .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3580 .await?;
3581 } else {
3582 for label_name in &pv.labels {
3583 self.enrich_properties_with_generated_columns(
3584 label_name,
3585 &mut pv.props,
3586 prop_manager,
3587 _params,
3588 ctx,
3589 )
3590 .await?;
3591 }
3592 let _ = writer
3593 .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3594 .await?;
3595 }
3596 }
3597 if let Some(pe) = pending_e.remove(var) {
3598 if pe.partial {
3599 writer
3600 .insert_edge_partial_full(
3601 pe.src,
3602 pe.dst,
3603 pe.edge_type_id,
3604 pe.eid,
3605 pe.props,
3606 Some(pe.edge_type_name),
3607 pe.touched,
3608 tx_l0,
3609 )
3610 .await?;
3611 } else {
3612 writer
3613 .insert_edge(
3614 pe.src,
3615 pe.dst,
3616 pe.edge_type_id,
3617 pe.eid,
3618 pe.props,
3619 Some(pe.edge_type_name),
3620 tx_l0,
3621 )
3622 .await?;
3623 }
3624 }
3625 Ok(())
3626 }
3627
3628 /// Execute REMOVE clause items (property removal or label removal).
3629 ///
3630 /// Property removals are batched per variable to avoid stale reads: when
3631 /// multiple properties of the same entity are removed in one REMOVE clause,
3632 /// we read from storage once, null all specified properties, and write back
3633 /// once. This prevents the second removal from reading stale data that
3634 /// doesn't reflect the first removal's L0 write.
3635 #[expect(clippy::too_many_arguments)]
3636 pub(crate) async fn execute_remove_items_locked(
3637 &self,
3638 items: &[RemoveItem],
3639 row: &mut HashMap<String, Value>,
3640 writer: &Writer,
3641 prop_manager: &PropertyManager,
3642 ctx: Option<&QueryContext>,
3643 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3644 prefetched: &Prefetch,
3645 ) -> Result<()> {
3646 // Collect property names to remove, grouped by variable.
3647 // Use Vec<(String, Vec<String>)> to preserve insertion order.
3648 let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
3649
3650 for item in items {
3651 match item {
3652 RemoveItem::Property(expr) => {
3653 if let Expr::Property(var_expr, prop_name) = expr
3654 && let Expr::Variable(var_name) = &**var_expr
3655 {
3656 if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
3657 entry.1.push(prop_name.clone());
3658 } else {
3659 prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
3660 }
3661 }
3662 }
3663 RemoveItem::Labels { variable, labels } => {
3664 self.execute_remove_labels(variable, labels, row, ctx)?;
3665 }
3666 }
3667 }
3668
3669 // Execute batched property removals per variable.
3670 for (var_name, prop_names) in &prop_removals {
3671 let Some(node_val) = row.get(var_name) else {
3672 continue;
3673 };
3674
3675 if let Ok(vid) = Self::vid_from_value(node_val) {
3676 // Vertex property removal
3677 let mut props =
3678 read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
3679
3680 // Only write back if at least one property actually exists
3681 let removed_count = prop_names
3682 .iter()
3683 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3684 .count();
3685 let any_exist = removed_count > 0;
3686 if any_exist {
3687 writer.track_properties_removed(removed_count, tx_l0);
3688 for prop_name in prop_names {
3689 props.insert(prop_name.clone(), Value::Null);
3690 }
3691 }
3692 // Compute effective properties (post-removal) for _all_props
3693 let effective: HashMap<String, Value> = props
3694 .iter()
3695 .filter(|(_, v)| !v.is_null())
3696 .map(|(k, v)| (k.clone(), v.clone()))
3697 .collect();
3698 if any_exist {
3699 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3700 let _ = writer
3701 .insert_vertex_with_labels(vid, props, &labels, tx_l0)
3702 .await?;
3703 }
3704
3705 // Update the row map: set removed props to Null
3706 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3707 for prop_name in prop_names {
3708 node_map.insert(prop_name.clone(), Value::Null);
3709 }
3710 // Set _all_props to the complete effective property set
3711 node_map.insert("_all_props".to_string(), Value::Map(effective));
3712 }
3713 } else if let Value::Map(map) = node_val {
3714 // Edge property removal (map-encoded)
3715 // Check for non-null _eid to skip OPTIONAL MATCH null edges
3716 let mut edge_effective: Option<HashMap<String, Value>> = None;
3717 if map.get("_eid").is_some_and(|v| !v.is_null()) {
3718 let ei = self.extract_edge_identity(map)?;
3719 let mut props =
3720 read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx)
3721 .await?;
3722
3723 let removed_count = prop_names
3724 .iter()
3725 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3726 .count();
3727 let any_exist = removed_count > 0;
3728 if any_exist {
3729 writer.track_properties_removed(removed_count, tx_l0);
3730 for prop_name in prop_names {
3731 props.insert(prop_name.to_string(), Value::Null);
3732 }
3733 }
3734 // Compute effective properties (post-removal) for _all_props
3735 edge_effective = Some(
3736 props
3737 .iter()
3738 .filter(|(_, v)| !v.is_null())
3739 .map(|(k, v)| (k.clone(), v.clone()))
3740 .collect(),
3741 );
3742 if any_exist {
3743 let edge_type_name = map
3744 .get("_type")
3745 .and_then(|v| v.as_str())
3746 .map(|s| s.to_string())
3747 .or_else(|| {
3748 self.storage
3749 .schema_manager()
3750 .edge_type_name_by_id_unified(ei.edge_type_id)
3751 });
3752 writer
3753 .insert_edge(
3754 ei.src,
3755 ei.dst,
3756 ei.edge_type_id,
3757 ei.eid,
3758 props,
3759 edge_type_name,
3760 tx_l0,
3761 )
3762 .await?;
3763 }
3764 }
3765
3766 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3767 for prop_name in prop_names {
3768 edge_map.insert(prop_name.clone(), Value::Null);
3769 }
3770 if let Some(effective) = edge_effective {
3771 edge_map.insert("_all_props".to_string(), Value::Map(effective));
3772 }
3773 }
3774 } else if let Value::Edge(edge) = node_val {
3775 // Edge property removal (Value::Edge)
3776 let eid = edge.eid;
3777 let src = edge.src;
3778 let dst = edge.dst;
3779 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
3780
3781 let mut props =
3782 read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
3783
3784 let removed_count = prop_names
3785 .iter()
3786 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3787 .count();
3788 if removed_count > 0 {
3789 writer.track_properties_removed(removed_count, tx_l0);
3790 for prop_name in prop_names {
3791 props.insert(prop_name.to_string(), Value::Null);
3792 }
3793 writer
3794 .insert_edge(
3795 src,
3796 dst,
3797 etype,
3798 eid,
3799 props,
3800 Some(edge.edge_type.clone()),
3801 tx_l0,
3802 )
3803 .await?;
3804 }
3805
3806 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3807 for prop_name in prop_names {
3808 edge.properties.insert(prop_name.to_string(), Value::Null);
3809 }
3810 }
3811 }
3812 }
3813
3814 Ok(())
3815 }
3816
3817 /// Execute label removal.
3818 pub(crate) fn execute_remove_labels(
3819 &self,
3820 variable: &str,
3821 labels: &[String],
3822 row: &mut HashMap<String, Value>,
3823 ctx: Option<&QueryContext>,
3824 ) -> Result<()> {
3825 if let Some(node_val) = row.get(variable)
3826 && let Ok(vid) = Self::vid_from_value(node_val)
3827 {
3828 reject_if_ephemeral_vid(vid)?;
3829 let registry = self
3830 .procedure_registry
3831 .as_ref()
3832 .and_then(|pr| pr.plugin_registry());
3833 reject_virtual_label_write(registry.as_ref(), labels, "REMOVE")?;
3834
3835 // Get current labels from node value
3836 let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3837
3838 // Determine which labels to actually remove (only those currently present)
3839 let labels_to_remove: Vec<_> = labels
3840 .iter()
3841 .filter(|l| current_labels.contains(l))
3842 .collect();
3843
3844 if !labels_to_remove.is_empty() {
3845 // Resolve the FULL remaining label set and write it to the
3846 // TRANSACTION buffer (transactional + OCC-conflictable), falling
3847 // back to the context (main) L0 for non-transactional callers.
3848 let remaining_labels: Vec<String> = current_labels
3849 .iter()
3850 .filter(|l| !labels_to_remove.contains(l))
3851 .cloned()
3852 .collect();
3853 if let Some(ctx) = ctx {
3854 let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3855 l0.write().set_vertex_labels(vid, &remaining_labels);
3856 }
3857
3858 // Update the node value in the row with the remaining labels.
3859 if let Some(Value::Map(obj)) = row.get_mut(variable) {
3860 let labels_list = remaining_labels.into_iter().map(Value::String).collect();
3861 obj.insert("_labels".to_string(), Value::List(labels_list));
3862 }
3863 }
3864 }
3865 Ok(())
3866 }
3867
3868 /// Resolve edge type ID for a Value::Edge, handling empty edge_type strings
3869 /// by looking up the type from the L0 buffer's edge endpoints.
3870 fn resolve_edge_type_id_for_edge(
3871 &self,
3872 edge: &crate::types::Edge,
3873 writer: &Writer,
3874 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3875 ) -> Result<u32> {
3876 if !edge.edge_type.is_empty() {
3877 return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
3878 }
3879 // Edge type name is empty (e.g., from anonymous MATCH patterns).
3880 // Look up the edge type ID from the L0 buffer's edge endpoints.
3881 if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
3882 return Ok(etype);
3883 }
3884 Err(anyhow!(
3885 "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
3886 edge.eid
3887 ))
3888 }
3889
3890 /// Execute DELETE clause for a single item (vertex, edge, path, or null).
3891 pub(crate) async fn execute_delete_item_locked(
3892 &self,
3893 val: &Value,
3894 detach: bool,
3895 writer: &Writer,
3896 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3897 ) -> Result<()> {
3898 match val {
3899 Value::Null => {
3900 // DELETE null is a no-op per OpenCypher spec
3901 }
3902 Value::Path(path) => {
3903 // Delete path edges first, then nodes
3904 for edge in &path.edges {
3905 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3906 writer
3907 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3908 .await?;
3909 }
3910 for node in &path.nodes {
3911 self.execute_delete_vertex(
3912 node.vid,
3913 detach,
3914 Some(node.labels.clone()),
3915 writer,
3916 tx_l0,
3917 )
3918 .await?;
3919 }
3920 }
3921 _ => {
3922 // Try Path reconstruction from Map first (Arrow loses Path type)
3923 if let Ok(path) = Path::try_from(val) {
3924 for edge in &path.edges {
3925 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3926 writer
3927 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3928 .await?;
3929 }
3930 for node in &path.nodes {
3931 self.execute_delete_vertex(
3932 node.vid,
3933 detach,
3934 Some(node.labels.clone()),
3935 writer,
3936 tx_l0,
3937 )
3938 .await?;
3939 }
3940 } else if let Ok(vid) = Self::vid_from_value(val) {
3941 let labels = Self::extract_labels_from_node(val);
3942 self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
3943 .await?;
3944 } else if let Value::Map(map) = val {
3945 self.execute_delete_edge_from_map(map, writer, tx_l0)
3946 .await?;
3947 } else if let Value::Edge(edge) = val {
3948 reject_if_ephemeral_eid(edge.eid)?;
3949 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3950 let registry = self
3951 .procedure_registry
3952 .as_ref()
3953 .and_then(|pr| pr.plugin_registry());
3954 reject_virtual_edge_type_write(registry.as_ref(), etype, "DELETE")?;
3955 writer
3956 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3957 .await?;
3958 }
3959 }
3960 }
3961 Ok(())
3962 }
3963
3964 /// Execute vertex deletion with optional detach.
3965 pub(crate) async fn execute_delete_vertex(
3966 &self,
3967 vid: Vid,
3968 detach: bool,
3969 labels: Option<Vec<String>>,
3970 writer: &Writer,
3971 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3972 ) -> Result<()> {
3973 reject_if_ephemeral_vid(vid)?;
3974 if let Some(ls) = labels.as_deref() {
3975 let registry = self
3976 .procedure_registry
3977 .as_ref()
3978 .and_then(|pr| pr.plugin_registry());
3979 reject_virtual_label_write(registry.as_ref(), ls, "DELETE")?;
3980 }
3981 if detach {
3982 self.detach_delete_vertex(vid, writer, tx_l0).await?;
3983 } else {
3984 self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
3985 }
3986 writer.delete_vertex(vid, labels, tx_l0).await?;
3987 Ok(())
3988 }
3989
3990 /// Check that a vertex has no edges (required for non-DETACH DELETE).
3991 ///
3992 /// Loads the subgraph from storage, then excludes edges that have been
3993 /// tombstoned in the writer's L0 or the transaction's L0. This ensures
3994 /// edges deleted earlier in the same DELETE clause are properly excluded.
3995 pub(crate) async fn check_vertex_has_no_edges(
3996 &self,
3997 vid: Vid,
3998 writer: &Writer,
3999 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
4000 ) -> Result<()> {
4001 let schema = self.storage.schema_manager().schema();
4002 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
4003
4004 // Collect tombstoned edge IDs from both the writer L0 and tx L0.
4005 let mut tombstoned_eids = std::collections::HashSet::new();
4006 {
4007 let writer_l0 = writer.l0_manager.get_current();
4008 let guard = writer_l0.read();
4009 for &eid in guard.tombstones.keys() {
4010 tombstoned_eids.insert(eid);
4011 }
4012 }
4013 if let Some(tx) = tx_l0 {
4014 let guard = tx.read();
4015 for &eid in guard.tombstones.keys() {
4016 tombstoned_eids.insert(eid);
4017 }
4018 }
4019
4020 let out_graph = self
4021 .storage
4022 .load_subgraph_cached(
4023 &[vid],
4024 &edge_type_ids,
4025 1,
4026 uni_store::runtime::Direction::Outgoing,
4027 Some(writer.l0_manager.get_current()),
4028 )
4029 .await?;
4030 let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
4031
4032 let in_graph = self
4033 .storage
4034 .load_subgraph_cached(
4035 &[vid],
4036 &edge_type_ids,
4037 1,
4038 uni_store::runtime::Direction::Incoming,
4039 Some(writer.l0_manager.get_current()),
4040 )
4041 .await?;
4042 let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
4043
4044 if has_out || has_in {
4045 return Err(anyhow!(
4046 "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
4047 vid
4048 ));
4049 }
4050 Ok(())
4051 }
4052
4053 /// Execute edge deletion from a map representation.
4054 pub(crate) async fn execute_delete_edge_from_map(
4055 &self,
4056 map: &HashMap<String, Value>,
4057 writer: &Writer,
4058 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
4059 ) -> Result<()> {
4060 // Check for non-null _eid to skip OPTIONAL MATCH null edges
4061 if map.get("_eid").is_some_and(|v| !v.is_null()) {
4062 let ei = self.extract_edge_identity(map)?;
4063 reject_if_ephemeral_eid(ei.eid)?;
4064 let registry = self
4065 .procedure_registry
4066 .as_ref()
4067 .and_then(|pr| pr.plugin_registry());
4068 reject_virtual_edge_type_write(registry.as_ref(), ei.edge_type_id, "DELETE")?;
4069 writer
4070 .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
4071 .await?;
4072 }
4073 Ok(())
4074 }
4075
4076 /// Build a scan plan node.
4077 ///
4078 /// - `label_id > 0`: schema label → `Scan` (fast, label-specific storage)
4079 /// - `label_id == 0` with labels: schemaless → `ScanMainByLabels` (main table + L0, filtered by label name)
4080 /// - `label_id == 0` without labels: unlabeled → `ScanAll`
4081 fn make_scan_plan(
4082 label_id: u16,
4083 labels: Vec<String>,
4084 variable: String,
4085 filter: Option<Expr>,
4086 ) -> LogicalPlan {
4087 if label_id > 0 {
4088 LogicalPlan::Scan {
4089 label_id,
4090 labels,
4091 variable,
4092 filter,
4093 optional: false,
4094 }
4095 } else if !labels.is_empty() {
4096 // Schemaless label: use ScanMainByLabels to filter by label name
4097 LogicalPlan::ScanMainByLabels {
4098 labels,
4099 variable,
4100 filter,
4101 optional: false,
4102 }
4103 } else {
4104 LogicalPlan::ScanAll {
4105 variable,
4106 filter,
4107 optional: false,
4108 }
4109 }
4110 }
4111
4112 /// Attach a new scan node to the running plan, using `CrossJoin` when the plan
4113 /// already contains prior operators.
4114 fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
4115 if matches!(plan, LogicalPlan::Empty) {
4116 scan
4117 } else {
4118 LogicalPlan::CrossJoin {
4119 left: Box::new(plan),
4120 right: Box::new(scan),
4121 }
4122 }
4123 }
4124
4125 /// Resolve MERGE property map expressions against the current row context.
4126 ///
4127 /// MERGE patterns like `MERGE (city:City {name: person.bornIn})` contain
4128 /// property expressions that reference bound variables. These need to be
4129 /// evaluated to concrete literal values before being converted to filter
4130 /// expressions by `properties_to_expr()`.
4131 async fn resolve_merge_properties(
4132 &self,
4133 properties: &Option<Expr>,
4134 row: &HashMap<String, Value>,
4135 prop_manager: &PropertyManager,
4136 params: &HashMap<String, Value>,
4137 ctx: Option<&QueryContext>,
4138 ) -> Result<Option<Expr>> {
4139 let entries = match properties {
4140 Some(Expr::Map(entries)) => entries,
4141 other => return Ok(other.clone()),
4142 };
4143 let mut resolved = Vec::new();
4144 for (key, val_expr) in entries {
4145 if matches!(val_expr, Expr::Literal(_)) {
4146 resolved.push((key.clone(), val_expr.clone()));
4147 } else {
4148 let value = self
4149 .evaluate_expr(val_expr, row, prop_manager, params, ctx)
4150 .await?;
4151 resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
4152 }
4153 }
4154 Ok(Some(Expr::Map(resolved)))
4155 }
4156
4157 /// Convert a runtime Value back to an AST literal expression.
4158 fn value_to_literal_expr(value: &Value) -> Expr {
4159 match value {
4160 Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
4161 Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
4162 Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
4163 Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
4164 Value::Null => Expr::Literal(CypherLiteral::Null),
4165 Value::List(items) => {
4166 Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
4167 }
4168 Value::Map(entries) => Expr::Map(
4169 entries
4170 .iter()
4171 .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
4172 .collect(),
4173 ),
4174 _ => Expr::Literal(CypherLiteral::Null),
4175 }
4176 }
4177
    /// Run the read (match) half of a MERGE for one input row.
    ///
    /// Builds a `LogicalPlan` for `pattern` by hand (scan for each leading
    /// node, then one traverse per relationship/target-node pair, with
    /// property and label filters layered on top), executes it through
    /// `execute_merge_read_plan`, and returns one row per database match that
    /// is consistent with the bindings already present in `row`. Each returned
    /// row is `row` extended with the match's bindings.
    ///
    /// Property maps in the pattern are resolved against `row` first (see
    /// `resolve_merge_properties`), so they may reference bound variables.
    ///
    /// # Errors
    /// Fails when a bound pattern variable does not yield a VID, when
    /// `strict_schema` is enabled and a label or edge type is unknown, when a
    /// relationship has zero or more than one type, when a target node has
    /// neither a label nor a bound variable, when a path does not start with a
    /// node, or when evaluation/execution of the plan fails.
    pub(crate) async fn execute_merge_match(
        &self,
        pattern: &Pattern,
        row: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Construct a LogicalPlan for the MATCH part of MERGE
        let planner =
            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());

        // The plan is assembled manually rather than going through the
        // planner's plan() method, which avoids a round-trip through a
        // CypherQuery AST.

        let mut plan = LogicalPlan::Empty;
        let mut vars_in_scope = Vec::new();

        // Add existing bound variables from row to scope
        for key in row.keys() {
            vars_in_scope.push(key.clone());
        }

        // Reconstruct Match logic from Planner (simplified for MERGE pattern)
        for path in &pattern.paths {
            let elements = &path.elements;
            let mut i = 0;
            while i < elements.len() {
                let part = &elements[i];
                match part {
                    PatternElement::Node(n) => {
                        // Unnamed nodes use the empty string as their variable.
                        let variable = n.variable.clone().unwrap_or_default();

                        // If variable is already bound in the input row, we filter
                        let is_bound = !variable.is_empty() && row.contains_key(&variable);

                        if is_bound {
                            // If bound, we must Scan this specific VID to start the chain
                            // Extract VID from row
                            // (`unwrap` cannot fail: `is_bound` checked `contains_key` above.)
                            let val = row.get(&variable).unwrap();
                            let vid = Self::vid_from_value(val)?;

                            // In the new storage model, VIDs don't embed label info.
                            // We get label from the node value if available, otherwise use 0 to scan all.
                            let extracted_labels =
                                Self::extract_labels_from_node(val).unwrap_or_default();
                            let label_id = {
                                let schema = self.storage.schema_manager().schema();
                                extracted_labels
                                    .first()
                                    .and_then(|l| schema.label_id_by_name(l))
                                    .unwrap_or(0)
                            };

                            let resolved_props = self
                                .resolve_merge_properties(
                                    &n.properties,
                                    row,
                                    prop_manager,
                                    params,
                                    ctx,
                                )
                                .await?;
                            let prop_filter =
                                planner.properties_to_expr(&variable, &resolved_props);

                            // Pin the scan to the bound vertex with the filter
                            // `variable._vid = <vid>`, built directly as an
                            // Expr::BinaryOp over the internal `_vid` property.

                            let vid_filter = Expr::BinaryOp {
                                left: Box::new(Expr::Property(
                                    Box::new(Expr::Variable(variable.clone())),
                                    "_vid".to_string(),
                                )),
                                op: BinaryOp::Eq,
                                right: Box::new(Expr::Literal(CypherLiteral::Integer(
                                    vid.as_u64() as i64,
                                ))),
                            };

                            // VID filter AND property filter (when the node has properties).
                            let combined_filter = if let Some(pf) = prop_filter {
                                Some(Expr::BinaryOp {
                                    left: Box::new(vid_filter),
                                    op: BinaryOp::And,
                                    right: Box::new(pf),
                                })
                            } else {
                                Some(vid_filter)
                            };

                            let scan = Self::make_scan_plan(
                                label_id,
                                extracted_labels,
                                variable.clone(),
                                combined_filter,
                            );
                            plan = Self::attach_scan(plan, scan);
                        } else {
                            let label_id = if n.labels.is_empty() {
                                // Unlabeled MERGE node: scan all nodes (label_id 0 → ScanAll)
                                0
                            } else {
                                // Only the first label selects the scan; additional
                                // labels are enforced by the filter added below.
                                let label_name = &n.labels[0];
                                let schema = self.storage.schema_manager().schema();
                                if self.config.strict_schema {
                                    schema
                                        .get_label_case_insensitive(label_name)
                                        .map(|m| m.id)
                                        .ok_or_else(|| {
                                            anyhow!(
                                                "Label '{}' is not defined in the schema \
                                                 (strict_schema is enabled). \
                                                 Declare it with db.schema().label(...).apply() first.",
                                                label_name
                                            )
                                        })?
                                } else {
                                    // Fall back to label_id 0 (any/schemaless) when not in schema.
                                    schema
                                        .get_label_case_insensitive(label_name)
                                        .map(|m| m.id)
                                        .unwrap_or(0)
                                }
                            };

                            let resolved_props = self
                                .resolve_merge_properties(
                                    &n.properties,
                                    row,
                                    prop_manager,
                                    params,
                                    ctx,
                                )
                                .await?;
                            let prop_filter =
                                planner.properties_to_expr(&variable, &resolved_props);
                            let scan = Self::make_scan_plan(
                                label_id,
                                n.labels.names().to_vec(),
                                variable.clone(),
                                prop_filter,
                            );
                            plan = Self::attach_scan(plan, scan);

                            // Add label filters when:
                            // 1. Multiple labels with a known schema label: filter for
                            //    additional labels (Scan only scans by the first label).
                            // 2. Schemaless labels (label_id = 0): an extra filter on
                            //    the specified label(s) is applied on top of the scan.
                            // Unnamed nodes (empty variable) get no label filter.
                            if !n.labels.is_empty()
                                && !variable.is_empty()
                                && (label_id == 0 || n.labels.len() > 1)
                                && let Some(label_filter) =
                                    planner.node_filter_expr(&variable, &n.labels, &None)
                            {
                                plan = LogicalPlan::Filter {
                                    input: Box::new(plan),
                                    predicate: label_filter,
                                    optional_variables: std::collections::HashSet::new(),
                                };
                            }

                            if !variable.is_empty() {
                                vars_in_scope.push(variable.clone());
                            }
                        }

                        // Now look ahead for relationship
                        i += 1;
                        while i < elements.len() {
                            if let PatternElement::Relationship(r) = &elements[i] {
                                // NOTE(review): indexes `i + 1` without a bounds check, so a
                                // path ending in a relationship would panic here — presumably
                                // the parser never produces one; confirm.
                                let target_node_part = &elements[i + 1];
                                if let PatternElement::Node(n_target) = target_node_part {
                                    let schema = self.storage.schema_manager().schema();
                                    let mut edge_type_ids = Vec::new();

                                    // MERGE requires exactly one relationship type.
                                    if r.types.is_empty() {
                                        return Err(anyhow!("MERGE edge must have a type"));
                                    } else if r.types.len() > 1 {
                                        return Err(anyhow!(
                                            "MERGE does not support multiple edge types"
                                        ));
                                    } else {
                                        let type_name = &r.types[0];
                                        let type_id = if self.config.strict_schema {
                                            let s = self.storage.schema_manager().schema();
                                            s.edge_type_id_by_name_case_insensitive(type_name)
                                                .ok_or_else(|| {
                                                    anyhow!(
                                                        "Edge type '{}' is not defined in the schema \
                                                         (strict_schema is enabled).",
                                                        type_name
                                                    )
                                                })?
                                        } else {
                                            // Schemaless: assign new ID if not found.
                                            self.storage
                                                .schema_manager()
                                                .get_or_assign_edge_type_id(type_name)
                                        };
                                        edge_type_ids.push(type_id);
                                    }

                                    // Resolve target label ID. For schemaless labels (not in the
                                    // schema), fall back to 0 which means "any label" in traversal.
                                    let target_label_id: u16 = if let Some(lbl) =
                                        n_target.labels.first()
                                    {
                                        schema
                                            .get_label_case_insensitive(lbl)
                                            .map(|m| m.id)
                                            .unwrap_or(0)
                                    } else if let Some(var) = &n_target.variable {
                                        if let Some(val) = row.get(var) {
                                            // In the new storage model, get labels from node value
                                            if let Some(labels) =
                                                Self::extract_labels_from_node(val)
                                            {
                                                if let Some(first_label) = labels.first() {
                                                    schema
                                                        .get_label_case_insensitive(first_label)
                                                        .map(|m| m.id)
                                                        .unwrap_or(0)
                                                } else {
                                                    // Bound node with no labels — schemaless, any
                                                    0
                                                }
                                            } else if Self::vid_from_value(val).is_ok() {
                                                // VID without label info — schemaless, any
                                                0
                                            } else {
                                                return Err(anyhow!(
                                                    "Variable {} is not a node",
                                                    var
                                                ));
                                            }
                                        } else {
                                            // Named but unbound target without a label.
                                            return Err(anyhow!(
                                                "MERGE pattern node must have a label or be a bound variable"
                                            ));
                                        }
                                    } else {
                                        // Anonymous target without a label.
                                        return Err(anyhow!(
                                            "MERGE pattern node must have a label"
                                        ));
                                    };

                                    let target_variable =
                                        n_target.variable.clone().unwrap_or_default();
                                    // The traversal starts from the element just before
                                    // this relationship.
                                    let source_variable = match &elements[i - 1] {
                                        PatternElement::Node(n) => {
                                            n.variable.clone().unwrap_or_default()
                                        }
                                        _ => String::new(),
                                    };

                                    let is_variable_length = r.range.is_some();
                                    let type_name = &r.types[0];

                                    // Use TraverseMainByType for schemaless edge types
                                    // (same as MATCH planner) so edge properties are loaded
                                    // correctly from storage + L0 via the adjacency map.
                                    // Regular Traverse only loads properties via
                                    // property_manager which doesn't handle schemaless types.
                                    let is_schemaless = edge_type_ids.iter().all(|id| {
                                        uni_common::core::edge_type::is_schemaless_edge_type(*id)
                                    });

                                    // Hop bounds default to 1 when the relationship has no
                                    // range or the bound is absent.
                                    if is_schemaless {
                                        plan = LogicalPlan::TraverseMainByType {
                                            type_names: vec![type_name.clone()],
                                            input: Box::new(plan),
                                            direction: r.direction.clone(),
                                            source_variable,
                                            target_variable: target_variable.clone(),
                                            step_variable: r.variable.clone(),
                                            min_hops: r
                                                .range
                                                .as_ref()
                                                .and_then(|r| r.min)
                                                .unwrap_or(1)
                                                as usize,
                                            max_hops: r
                                                .range
                                                .as_ref()
                                                .and_then(|r| r.max)
                                                .unwrap_or(1)
                                                as usize,
                                            optional: false,
                                            target_filter: None,
                                            path_variable: None,
                                            is_variable_length,
                                            optional_pattern_vars: std::collections::HashSet::new(),
                                            scope_match_variables: std::collections::HashSet::new(),
                                            edge_filter_expr: None,
                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
                                        };
                                    } else {
                                        // Collect edge property names needed for MERGE filter
                                        let mut edge_props = std::collections::HashSet::new();
                                        if let Some(Expr::Map(entries)) = &r.properties {
                                            for (key, _) in entries {
                                                edge_props.insert(key.clone());
                                            }
                                        }
                                        plan = LogicalPlan::Traverse {
                                            input: Box::new(plan),
                                            edge_type_ids: edge_type_ids.clone(),
                                            direction: r.direction.clone(),
                                            source_variable,
                                            target_variable: target_variable.clone(),
                                            target_label_id,
                                            step_variable: r.variable.clone(),
                                            min_hops: r
                                                .range
                                                .as_ref()
                                                .and_then(|r| r.min)
                                                .unwrap_or(1)
                                                as usize,
                                            max_hops: r
                                                .range
                                                .as_ref()
                                                .and_then(|r| r.max)
                                                .unwrap_or(1)
                                                as usize,
                                            optional: false,
                                            target_filter: None,
                                            path_variable: None,
                                            edge_properties: edge_props,
                                            is_variable_length,
                                            optional_pattern_vars: std::collections::HashSet::new(),
                                            scope_match_variables: std::collections::HashSet::new(),
                                            edge_filter_expr: None,
                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
                                            qpp_steps: None,
                                        };
                                    }

                                    // Apply property filters for relationship
                                    // (only when the relationship has a variable to filter on).
                                    if r.properties.is_some()
                                        && let Some(r_var) = &r.variable
                                    {
                                        let resolved_rel_props = self
                                            .resolve_merge_properties(
                                                &r.properties,
                                                row,
                                                prop_manager,
                                                params,
                                                ctx,
                                            )
                                            .await?;
                                        if let Some(prop_filter) =
                                            planner.properties_to_expr(r_var, &resolved_rel_props)
                                        {
                                            plan = LogicalPlan::Filter {
                                                input: Box::new(plan),
                                                predicate: prop_filter,
                                                optional_variables: std::collections::HashSet::new(
                                                ),
                                            };
                                        }
                                    }

                                    // Apply property filters for the target node when it
                                    // has a variable name.
                                    if !target_variable.is_empty() {
                                        let resolved_target_props = self
                                            .resolve_merge_properties(
                                                &n_target.properties,
                                                row,
                                                prop_manager,
                                                params,
                                                ctx,
                                            )
                                            .await?;
                                        if let Some(prop_filter) = planner.properties_to_expr(
                                            &target_variable,
                                            &resolved_target_props,
                                        ) {
                                            plan = LogicalPlan::Filter {
                                                input: Box::new(plan),
                                                predicate: prop_filter,
                                                optional_variables: std::collections::HashSet::new(
                                                ),
                                            };
                                        }
                                        vars_in_scope.push(target_variable.clone());
                                    }

                                    if let Some(sv) = &r.variable {
                                        vars_in_scope.push(sv.clone());
                                    }
                                    // Step past the relationship and its target node.
                                    i += 2;
                                } else {
                                    break;
                                }
                            } else {
                                break;
                            }
                        }
                    }
                    _ => return Err(anyhow!("Pattern must start with a node")),
                }
            }
        }

        // Execute the plan to find all matches, then filter against bound variables in `row`.
        let db_matches = self
            .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
            .await?;

        // Keep only DB results that are consistent with the input row bindings.
        // Skip internal keys (starting with "__") as they are implementation
        // artifacts (e.g. __used_edges) and not user-visible variable bindings.
        // Also skip the empty-string key (""), which is the placeholder variable
        // for unnamed MERGE nodes — it may carry over from a prior MERGE clause
        // and must not constrain the current pattern's match.
        let final_matches = db_matches
            .into_iter()
            .filter(|db_match| {
                row.iter().all(|(key, val)| {
                    if key.is_empty() || key.starts_with("__") {
                        return true;
                    }
                    // A row key the match does not bind imposes no constraint.
                    let Some(db_val) = db_match.get(key) else {
                        return true;
                    };
                    if db_val == val {
                        return true;
                    }
                    // Values differ -- treat as consistent if they represent the same VID
                    matches!(
                        (Self::vid_from_value(val), Self::vid_from_value(db_val)),
                        (Ok(v1), Ok(v2)) if v1 == v2
                    )
                })
            })
            .map(|db_match| {
                // Match bindings override same-named entries from the input row.
                let mut merged = row.clone();
                merged.extend(db_match);
                merged
            })
            .collect();

        Ok(final_matches)
    }
4631
4632 /// Prepare a MERGE pattern for path variable binding.
4633 ///
4634 /// If any path in the pattern has a path variable (e.g., `MERGE p = (a)-[:R]->(b)`),
4635 /// unnamed relationships need internal variable names so that `execute_create_pattern`
4636 /// stores the edge data in the row for later path construction.
4637 ///
4638 /// Returns the (possibly modified) pattern and a list of temp variable names to clean up.
4639 fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
4640 let has_path_vars = pattern
4641 .paths
4642 .iter()
4643 .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
4644
4645 if !has_path_vars {
4646 return (pattern.clone(), Vec::new());
4647 }
4648
4649 let mut modified = pattern.clone();
4650 let mut temp_vars = Vec::new();
4651
4652 for path in &mut modified.paths {
4653 if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
4654 continue;
4655 }
4656 for (idx, element) in path.elements.iter_mut().enumerate() {
4657 if let PatternElement::Relationship(r) = element
4658 && r.variable.as_ref().is_none_or(String::is_empty)
4659 {
4660 let temp_var = format!("__path_r_{}", idx);
4661 r.variable = Some(temp_var.clone());
4662 temp_vars.push(temp_var);
4663 }
4664 }
4665 }
4666
4667 (modified, temp_vars)
4668 }
4669
4670 /// Bind path variables in the result row based on the MERGE pattern.
4671 ///
4672 /// Walks each path in the pattern, collects node/edge values from the row
4673 /// by variable name, and constructs a `Value::Path`.
4674 fn bind_path_variables(
4675 pattern: &Pattern,
4676 row: &mut HashMap<String, Value>,
4677 temp_vars: &[String],
4678 ) {
4679 for path in &pattern.paths {
4680 let Some(path_var) = path.variable.as_ref() else {
4681 continue;
4682 };
4683 if path_var.is_empty() {
4684 continue;
4685 }
4686
4687 let mut nodes = Vec::new();
4688 let mut edges = Vec::new();
4689
4690 for element in &path.elements {
4691 match element {
4692 PatternElement::Node(n) => {
4693 if let Some(var) = &n.variable
4694 && let Some(val) = row.get(var)
4695 && let Some(node) = Self::value_to_node_for_path(val)
4696 {
4697 nodes.push(node);
4698 }
4699 }
4700 PatternElement::Relationship(r) => {
4701 if let Some(var) = &r.variable
4702 && let Some(val) = row.get(var)
4703 && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
4704 {
4705 edges.push(edge);
4706 }
4707 }
4708 _ => {}
4709 }
4710 }
4711
4712 if !nodes.is_empty() {
4713 use uni_common::value::Path;
4714 row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
4715 }
4716 }
4717
4718 // Clean up internal temp variables
4719 for var in temp_vars {
4720 row.remove(var);
4721 }
4722 }
4723
4724 /// Convert a Value (Map or Node) to a Node for path construction.
4725 fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
4726 match val {
4727 Value::Node(n) => Some(n.clone()),
4728 Value::Map(map) => {
4729 let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
4730 let labels = if let Some(Value::List(l)) = map.get("_labels") {
4731 l.iter()
4732 .filter_map(|v| {
4733 if let Value::String(s) = v {
4734 Some(s.clone())
4735 } else {
4736 None
4737 }
4738 })
4739 .collect()
4740 } else {
4741 vec![]
4742 };
4743 let properties: HashMap<String, Value> = map
4744 .iter()
4745 .filter(|(k, _)| !k.starts_with('_'))
4746 .map(|(k, v)| (k.clone(), v.clone()))
4747 .collect();
4748 Some(uni_common::value::Node {
4749 vid,
4750 labels,
4751 properties,
4752 })
4753 }
4754 _ => None,
4755 }
4756 }
4757
4758 /// Convert a Value (Map or Edge) to an Edge for path construction.
4759 fn value_to_edge_for_path(
4760 val: &Value,
4761 type_names: &[String],
4762 ) -> Option<uni_common::value::Edge> {
4763 match val {
4764 Value::Edge(e) => Some(e.clone()),
4765 Value::Map(map) => {
4766 let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
4767 let edge_type = map
4768 .get("_type_name")
4769 .and_then(|v| {
4770 if let Value::String(s) = v {
4771 Some(s.clone())
4772 } else {
4773 None
4774 }
4775 })
4776 .or_else(|| type_names.first().cloned())
4777 .unwrap_or_default();
4778 let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
4779 let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
4780 let properties: HashMap<String, Value> = map
4781 .iter()
4782 .filter(|(k, _)| !k.starts_with('_'))
4783 .map(|(k, v)| (k.clone(), v.clone()))
4784 .collect();
4785 Some(uni_common::value::Edge {
4786 eid,
4787 edge_type,
4788 src,
4789 dst,
4790 properties,
4791 })
4792 }
4793 _ => None,
4794 }
4795 }
4796}
4797
4798/// Read a vertex's full property map, preferring `prefetched` over a fresh
4799/// per-row `Backend::scan`.
4800///
4801/// `prefetched` is built once at the top of `apply_mutations` via
4802/// `prefetch_set_targets` / `prefetch_remove_targets` (mutation_common.rs).
4803/// On a hit, we layer in L0 from `ctx` so writes from earlier rows of the
4804/// same `apply_mutations` invocation (counter increments, same-VID
4805/// duplicates from UNWIND) take precedence — the prefetch only snapshots
4806/// storage state at SET entry. On a miss, fall back to the existing
4807/// per-row path; this preserves correctness for newly created VIDs,
4808/// schemaless rows, multi-label corner cases, and non-Mutation callers
4809/// that pass `&Prefetch::default()`.
4810pub(crate) async fn read_vertex_props_with_prefetch(
4811 vid: Vid,
4812 prefetched: &Prefetch,
4813 prop_manager: &PropertyManager,
4814 ctx: Option<&QueryContext>,
4815) -> Result<uni_common::Properties> {
4816 match prefetched.vertex.get(&vid).cloned() {
4817 Some(mut base) => {
4818 if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_vertex_props(vid, ctx) {
4819 for (k, v) in l0 {
4820 base.insert(k, v);
4821 }
4822 }
4823 Ok(base)
4824 }
4825 None => Ok(prop_manager
4826 .get_all_vertex_props_with_ctx(vid, ctx)
4827 .await?
4828 .unwrap_or_default()),
4829 }
4830}
4831
4832/// Edge equivalent of [`read_vertex_props_with_prefetch`]. On a hit, layer
4833/// in L0 edge props so writes from earlier rows of the same
4834/// `apply_mutations` invocation take precedence. On a miss, fall back to
4835/// the per-EID storage path.
4836pub(crate) async fn read_edge_props_with_prefetch(
4837 eid: Eid,
4838 prefetched: &Prefetch,
4839 prop_manager: &PropertyManager,
4840 ctx: Option<&QueryContext>,
4841) -> Result<uni_common::Properties> {
4842 match prefetched.edge.get(&eid).cloned() {
4843 Some(mut base) => {
4844 if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_edge_props(eid, ctx) {
4845 for (k, v) in l0 {
4846 base.insert(k, v);
4847 }
4848 }
4849 Ok(base)
4850 }
4851 None => Ok(prop_manager
4852 .get_all_edge_props_with_ctx(eid, ctx)
4853 .await?
4854 .unwrap_or_default()),
4855 }
4856}
4857
#[cfg(test)]
mod tests {
    use super::*;

    // ── fixture helpers ──────────────────────────────────────────────

    /// Wraps a string literal in [`Value::String`].
    fn text(literal: &'static str) -> Value {
        Value::String(literal.into())
    }

    /// Collects `(key, value)` pairs into an owned property map.
    fn prop_map<const N: usize>(entries: [(&'static str, Value); N]) -> HashMap<String, Value> {
        HashMap::from(entries.map(|(key, value)| (String::from(key), value)))
    }

    // ── merge_props tests ────────────────────────────────────────────

    /// Replace mode: keys absent from the incoming map come back as `Null`.
    #[test]
    fn test_merge_props_replace_tombstones_missing_keys() {
        let existing = prop_map([("name", text("Alice")), ("age", Value::Int(30))]);
        let update = prop_map([("name", text("Bob"))]);

        let merged = Executor::merge_props(existing, update, true);

        assert_eq!(merged.get("name"), Some(&text("Bob")));
        assert_eq!(
            merged.get("age"),
            Some(&Value::Null),
            "Missing keys should be tombstoned in replace mode"
        );
    }

    /// Merge mode: existing keys are kept and new keys are added.
    #[test]
    fn test_merge_props_merge_preserves_existing() {
        let existing = prop_map([("name", text("Alice")), ("age", Value::Int(30))]);
        let update = prop_map([("city", text("NYC"))]);

        let merged = Executor::merge_props(existing, update, false);

        let expected = [
            ("name", text("Alice")),
            ("age", Value::Int(30)),
            ("city", text("NYC")),
        ];
        for (key, value) in expected {
            assert_eq!(merged.get(key), Some(&value));
        }
    }

    /// An explicit `Null` in the incoming map yields `Null` in both modes.
    #[test]
    fn test_merge_props_null_incoming_is_tombstone() {
        let existing = prop_map([("name", text("Alice"))]);
        let update = prop_map([("name", Value::Null)]);

        // `false` = merge mode (null overwrites), `true` = replace mode
        // (null is a tombstone); the observable result is the same.
        for replace in [false, true] {
            let merged = Executor::merge_props(existing.clone(), update.clone(), replace);
            assert_eq!(merged.get("name"), Some(&Value::Null));
        }
    }

    /// Merging into an empty map yields exactly the incoming entries.
    #[test]
    fn test_merge_props_empty_current() {
        let existing: HashMap<String, Value> = HashMap::new();
        let update = prop_map([("name", text("Alice"))]);

        let merged = Executor::merge_props(existing, update, false);

        assert_eq!(merged.get("name"), Some(&text("Alice")));
        assert_eq!(merged.len(), 1);
    }

    /// Replace mode with an empty incoming map nulls every existing key.
    #[test]
    fn test_merge_props_empty_incoming_replace_tombstones_all() {
        let existing = prop_map([("name", text("Alice")), ("age", Value::Int(30))]);
        let update: HashMap<String, Value> = HashMap::new();

        let merged = Executor::merge_props(existing, update, true);

        for key in ["name", "age"] {
            assert_eq!(merged.get(key), Some(&Value::Null));
        }
    }

    // ── extract_labels_from_node tests ───────────────────────────────

    /// Labels are read from the `_labels` list of a map-encoded node.
    #[test]
    fn test_extract_labels_from_map() {
        let encoded = Value::Map(prop_map([
            ("_vid", Value::Int(1)),
            (
                "_labels",
                Value::List(vec![text("Person"), text("Employee")]),
            ),
        ]));

        let labels = Executor::extract_labels_from_node(&encoded);

        assert_eq!(
            labels,
            Some(vec![String::from("Person"), String::from("Employee")])
        );
    }

    /// Labels are read from the `labels` field of a `Value::Node`.
    #[test]
    fn test_extract_labels_from_value_node() {
        let vertex = Value::Node(uni_common::Node {
            vid: uni_common::core::id::Vid::from(1u64),
            labels: vec![String::from("Person")],
            properties: HashMap::new(),
        });

        let labels = Executor::extract_labels_from_node(&vertex);

        assert_eq!(labels, Some(vec![String::from("Person")]));
    }

    /// Scalar values carry no labels.
    #[test]
    fn test_extract_labels_non_node_returns_none() {
        for scalar in [Value::Int(42), text("hello")] {
            assert_eq!(Executor::extract_labels_from_node(&scalar), None);
        }
    }

    // ── extract_user_properties_from_value tests ─────────────────────

    /// Underscore-prefixed bookkeeping keys are removed; user keys remain.
    #[test]
    fn test_extract_user_props_strips_internal_keys() {
        let encoded = Value::Map(prop_map([
            ("_vid", Value::Int(1)),
            ("_labels", Value::List(vec![text("Person")])),
            ("name", text("Alice")),
            ("age", Value::Int(30)),
        ]));

        let user = Executor::extract_user_properties_from_value(&encoded).unwrap();

        assert_eq!(user.get("name"), Some(&text("Alice")));
        assert_eq!(user.get("age"), Some(&Value::Int(30)));
        for internal in ["_vid", "_labels"] {
            assert!(!user.contains_key(internal));
        }
    }

    /// A map with no internal keys is returned unchanged.
    #[test]
    fn test_extract_user_props_plain_map_returns_as_is() {
        let plain = prop_map([("key", text("value"))]);

        let user = Executor::extract_user_properties_from_value(&Value::Map(plain.clone())).unwrap();

        assert_eq!(user, plain);
    }

    /// Properties are taken from the `properties` field of a `Value::Node`.
    #[test]
    fn test_extract_user_props_from_value_node() {
        let vertex = Value::Node(uni_common::Node {
            vid: uni_common::core::id::Vid::from(1u64),
            labels: vec![String::from("Person")],
            properties: prop_map([("name", text("Alice"))]),
        });

        let user = Executor::extract_user_properties_from_value(&vertex).unwrap();

        assert_eq!(user.get("name"), Some(&text("Alice")));
    }
}