uni_query/query/executor/write.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use crate::query::df_graph::mutation_common::Prefetch;
6use crate::query::planner::LogicalPlan;
7use anyhow::{Result, anyhow};
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use uni_common::DataType;
11use uni_common::core::id::{Eid, Vid};
12use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
13use uni_common::{Path, Value};
14use uni_cypher::ast::{
15 AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
16 CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
17 DropEdgeType, DropLabel, Expr, NodePattern, Pattern, PatternElement, RemoveItem, SetClause,
18 SetItem,
19};
20use uni_store::QueryContext;
21use uni_store::runtime::l0_visibility;
22use uni_store::runtime::property_manager::PropertyManager;
23use uni_store::runtime::writer::Writer;
24
/// Canonical, hashable key for a single-node MERGE: the key properties as a
/// `(name, value)` list sorted by name, so that two keys built from the same
/// properties compare equal regardless of the order they were supplied in.
/// Used to group existing vertices by key for the index fast path (issue #69).
type MergeKey = Vec<(String, Value)>;
29
/// Identity fields extracted from a map-encoded edge.
///
/// Built by `Executor::extract_edge_identity` from the `_eid`, `_src`,
/// `_dst` and `_type` / `_type_name` entries of the edge map.
struct EdgeIdentity {
    /// Edge id, read from the map's `_eid` entry.
    eid: Eid,
    /// Source vertex id, read from the map's `_src` entry.
    src: Vid,
    /// Destination vertex id, read from the map's `_dst` entry.
    dst: Vid,
    /// Numeric edge-type id resolved from `_type` (falling back to
    /// `_type_name`), which may be stored as either an integer or a name.
    edge_type_id: u32,
}
37
/// Per-variable accumulator for SetItem::Property items targeting a vertex.
///
/// Built lazily on the first SetItem touching each variable, then mutated
/// in place across subsequent items. Flushed once at end of the SET
/// clause (or earlier if a non-Property SetItem on the same var lands).
struct PendingVertexSet {
    /// Id of the vertex the accumulated properties belong to.
    vid: Vid,
    /// Labels carried along with the pending write — presumably the target
    /// vertex's labels at the time the accumulator was built; confirm
    /// against the code that constructs this struct.
    labels: Vec<String>,
    /// Full property map: the combined storage + L0 view read via
    /// `get_all_vertex_props_with_ctx`, with the touched values applied
    /// in order on top. Flushed to L0 whole; L0's `vertex_partial_keys` set
    /// tells the flush which columns to send to Lance via MergeInsert.
    props: HashMap<String, Value>,
    /// `true` when the SET should flush via the partial-column MergeInsert
    /// path: set when `UniConfig::partial_lance_writes` is on AND the
    /// label has no generated columns. Generated-column labels still need
    /// the full-row Append so the regenerated values land.
    partial: bool,
    /// Set of property keys touched by this statement. Threaded into L0
    /// so the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
}
61
/// Per-variable accumulator for SetItem::Property items targeting an edge.
///
/// Carries the edge's identity (endpoints, type, id) together with the
/// property values collected for it.
struct PendingEdgeSet {
    /// Source vertex of the edge being updated.
    src: Vid,
    /// Destination vertex of the edge being updated.
    dst: Vid,
    /// Numeric id of the edge's type.
    edge_type_id: u32,
    /// Id of the edge being updated.
    eid: Eid,
    /// Name of the edge's type — presumably the name matching
    /// `edge_type_id`; confirm against the code that builds this struct.
    edge_type_name: String,
    /// `true` when the SET should flush via the partial-column
    /// MergeInsert path on the per-edge-type delta tables (Round 12
    /// §A). Set when `UniConfig::partial_lance_writes` is on.
    partial: bool,
    /// Property keys touched by this statement. Threaded into L0 so
    /// the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
    /// Property values accumulated for the edge. NOTE(review): whether this
    /// holds the full property map (as on the vertex accumulator) or only
    /// the touched keys is not visible here — confirm at the build site.
    props: HashMap<String, Value>,
}
79
80/// Refuse to mutate an ephemeral node (M5g / proposal §4.13.1).
81/// Ephemeral entities are return-only — `Vid::EPHEMERAL_BIT` is set on
82/// any id minted by `host.allocate_transient_id()`.
83fn reject_if_ephemeral_vid(vid: Vid) -> Result<()> {
84 if vid.is_ephemeral() {
85 return Err(anyhow::Error::from(
86 uni_common::UniError::EphemeralWriteAttempt {
87 kind: "node",
88 id: vid.transient_id().unwrap_or(vid.as_u64()),
89 },
90 ));
91 }
92 Ok(())
93}
94
95/// Returns a short variant name for a `Value`, used in type-mismatch error messages.
96fn value_type_name(val: &Value) -> &'static str {
97 match val {
98 Value::Null => "Null",
99 Value::Bool(_) => "Bool",
100 Value::Int(_) => "Int",
101 Value::Float(_) => "Float",
102 Value::String(_) => "String",
103 Value::Bytes(_) => "Bytes",
104 Value::List(_) => "List",
105 Value::Map(_) => "Map",
106 Value::Node(_) => "Node",
107 Value::Edge(_) => "Edge",
108 Value::Path(_) => "Path",
109 Value::Vector(_) => "Vector",
110 Value::Temporal(_) => "Temporal",
111 _ => "value",
112 }
113}
114
115/// Refuse to mutate an ephemeral edge (M5g / proposal §4.13.1).
116fn reject_if_ephemeral_eid(eid: Eid) -> Result<()> {
117 if eid.is_ephemeral() {
118 return Err(anyhow::Error::from(
119 uni_common::UniError::EphemeralWriteAttempt {
120 kind: "edge",
121 id: eid.transient_id().unwrap_or(eid.as_u64()),
122 },
123 ));
124 }
125 Ok(())
126}
127
128/// Reject a write whose target label is currently allocated as a
129/// virtual (catalog-backed) label.
130///
131/// Catalog tables are read-only from the host's perspective — there is
132/// no write-back path through `CatalogTable::scan` to the originating
133/// provider, so silently allowing SET/DELETE would leave ghosted state
134/// on the host side that diverges from the external catalog. The
135/// planner already rejects CREATE/MERGE on virtual labels via
136/// `Planner::reject_virtual_label_writes`; this helper is the
137/// equivalent gate on the runtime write path for SET-label-add and
138/// DELETE.
139///
140/// `op` names the offending operation for the error message (e.g.
141/// `"SET"`, `"DELETE"`).
142///
143/// # Errors
144///
145/// Returns an error if `registry` is `Some` and any name in `labels`
146/// is currently registered as a virtual label. Returns `Ok(())` when
147/// no plugin registry is wired (low-level callers without plugins).
148fn reject_virtual_label_write(
149 registry: Option<&Arc<uni_plugin::PluginRegistry>>,
150 labels: &[String],
151 op: &str,
152) -> Result<()> {
153 let Some(registry) = registry else {
154 return Ok(());
155 };
156 for label in labels {
157 if registry.virtual_label_by_name(label).is_some() {
158 return Err(anyhow!(
159 "Cannot {op} on virtual (catalog-resolved) label `{label}` — virtual \
160 labels are read-only; write back via the originating catalog instead"
161 ));
162 }
163 }
164 Ok(())
165}
166
167/// Reject a write whose target edge-type ID is currently allocated as
168/// a virtual (catalog-backed) edge type. Runtime analog of
169/// [`reject_virtual_label_write`] for the edge path.
170///
171/// # Errors
172///
173/// Returns an error if `registry` is `Some` and `edge_type_id` resolves
174/// to a registered virtual edge type. Returns `Ok(())` when no plugin
175/// registry is wired.
176fn reject_virtual_edge_type_write(
177 registry: Option<&Arc<uni_plugin::PluginRegistry>>,
178 edge_type_id: u32,
179 op: &str,
180) -> Result<()> {
181 let Some(registry) = registry else {
182 return Ok(());
183 };
184 if let Some(entry) = registry.virtual_edge_type_by_id(edge_type_id) {
185 return Err(anyhow!(
186 "Cannot {op} on virtual (catalog-resolved) edge type `{}` — virtual edge \
187 types are read-only; write back via the originating catalog instead",
188 entry.name
189 ));
190 }
191 Ok(())
192}
193
194impl Executor {
195 /// Extracts labels from a node value.
196 ///
197 /// Handles both `Value::Map` (with a `_labels` list field) and
198 /// `Value::Node` (with a `labels` vec field).
199 ///
200 /// Returns `None` when the value is not a node or has no labels.
201 pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
202 match node_val {
203 Value::Map(map) => {
204 // Map-encoded node: look for _labels array
205 if let Some(Value::List(labels_arr)) = map.get("_labels") {
206 let labels: Vec<String> = labels_arr
207 .iter()
208 .filter_map(|v| v.as_str().map(|s| s.to_string()))
209 .collect();
210 if !labels.is_empty() {
211 return Some(labels);
212 }
213 }
214 None
215 }
216 Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
217 _ => None,
218 }
219 }
220
221 /// Extracts user-visible properties from a value that represents a node or edge.
222 ///
223 /// Strips internal bookkeeping keys (those prefixed with `_` or named
224 /// `ext_id`) from map-encoded entities and returns only the user-facing
225 /// property key-value pairs.
226 ///
227 /// Returns `None` when `val` is not a map, node, or edge.
228 pub(crate) fn extract_user_properties_from_value(
229 val: &Value,
230 ) -> Option<HashMap<String, Value>> {
231 match val {
232 Value::Map(map) => {
233 // Distinguish entity-encoded maps from plain map literals.
234 // A node map has both `_vid` and `_labels`.
235 // An edge map has `_eid`, `_src`, and `_dst`.
236 let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
237 let is_edge_map = map.contains_key("_eid")
238 && map.contains_key("_src")
239 && map.contains_key("_dst");
240
241 if is_node_map || is_edge_map {
242 // Filter out internal bookkeeping keys
243 let user_props: HashMap<String, Value> = map
244 .iter()
245 .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
246 .map(|(k, v)| (k.clone(), v.clone()))
247 .collect();
248 // When mutation output omits dotted property columns, user
249 // properties live inside `_all_props` rather than at the
250 // top level of the entity map.
251 if user_props.is_empty()
252 && let Some(Value::Map(all_props)) = map.get("_all_props")
253 {
254 return Some(all_props.clone());
255 }
256 Some(user_props)
257 } else {
258 // Plain map literal — return as-is
259 Some(map.clone())
260 }
261 }
262 Value::Node(node) => Some(node.properties.clone()),
263 Value::Edge(edge) => Some(edge.properties.clone()),
264 _ => None,
265 }
266 }
267
268 /// Applies a property map to a vertex or edge entity bound to `variable` in `row`.
269 ///
270 /// When `replace` is `true` the entity's property set is replaced: keys absent
271 /// from `new_props` are tombstoned (written as `Value::Null`) so the storage
272 /// layer removes them. When `replace` is `false` the map is merged: keys in
273 /// `new_props` are upserted, while keys absent from `new_props` are unchanged.
274 /// A `Value::Null` entry in `new_props` acts as an explicit tombstone in both
275 /// modes.
276 ///
277 /// Labels are never altered — the spec states that `SET n = map` replaces
278 /// properties only.
279 ///
280 /// # Errors
281 ///
282 /// Returns an error if the entity cannot be found in the storage layer, or
283 /// if the writer fails to persist the updated properties.
284 #[expect(clippy::too_many_arguments)]
285 async fn apply_properties_to_entity(
286 &self,
287 variable: &str,
288 new_props: HashMap<String, Value>,
289 replace: bool,
290 row: &mut HashMap<String, Value>,
291 writer: &Writer,
292 prop_manager: &PropertyManager,
293 params: &HashMap<String, Value>,
294 ctx: Option<&QueryContext>,
295 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
296 prefetched: &Prefetch,
297 ) -> Result<()> {
298 // Clone the target so we can hold &row references elsewhere.
299 let target = row.get(variable).cloned();
300
301 // Declared-type guard for the whole-entity `SET n = map` / `SET n += map`
302 // forms, mirroring the per-property SET path (issue #68).
303 let schema = self.storage.schema_manager().schema();
304
305 match target {
306 Some(Value::Node(ref node)) => {
307 let vid = node.vid;
308 let labels = node.labels.clone();
309 let current =
310 read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
311 let write_props = Self::merge_props(current, new_props, replace);
312 let mut enriched = write_props.clone();
313 for label_name in &labels {
314 self.enrich_properties_with_generated_columns(
315 label_name,
316 &mut enriched,
317 prop_manager,
318 params,
319 ctx,
320 )
321 .await?;
322 }
323 let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
324 let _ = writer
325 .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
326 .await?;
327 // Update the in-memory row binding
328 if let Some(Value::Node(n)) = row.get_mut(variable) {
329 n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
330 }
331 }
332 Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
333 let vid = Self::vid_from_value(node_val)?;
334 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
335 let current =
336 read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
337 let write_props = Self::merge_props(current, new_props, replace);
338 let mut enriched = write_props.clone();
339 for label_name in &labels {
340 self.enrich_properties_with_generated_columns(
341 label_name,
342 &mut enriched,
343 prop_manager,
344 params,
345 ctx,
346 )
347 .await?;
348 }
349 let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
350 let _ = writer
351 .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
352 .await?;
353 // Update the in-memory map-encoded node binding
354 if let Some(Value::Map(node_map)) = row.get_mut(variable) {
355 // Remove old user property keys, keep internal fields
356 node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
357 // Build effective (non-null) properties
358 let effective: HashMap<String, Value> =
359 enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
360 for (k, v) in &effective {
361 node_map.insert(k.clone(), v.clone());
362 }
363 // Replace _all_props to reflect the complete property set
364 node_map.insert("_all_props".to_string(), Value::Map(effective));
365 }
366 }
367 Some(Value::Edge(ref edge)) => {
368 let eid = edge.eid;
369 let src = edge.src;
370 let dst = edge.dst;
371 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
372 let current =
373 read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
374 let write_props = Self::merge_props(current, new_props, replace);
375 let write_props = Self::coerce_and_validate_props(
376 write_props,
377 &schema,
378 std::slice::from_ref(&edge.edge_type),
379 )?;
380 writer
381 .insert_edge(
382 src,
383 dst,
384 etype,
385 eid,
386 write_props.clone(),
387 Some(edge.edge_type.clone()),
388 tx_l0,
389 )
390 .await?;
391 // Update the in-memory row binding
392 if let Some(Value::Edge(e)) = row.get_mut(variable) {
393 e.properties = write_props
394 .into_iter()
395 .filter(|(_, v)| !v.is_null())
396 .collect();
397 }
398 }
399 Some(Value::Map(ref map))
400 if map.contains_key("_eid")
401 && map.contains_key("_src")
402 && map.contains_key("_dst") =>
403 {
404 let ei = self.extract_edge_identity(map)?;
405 let current =
406 read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx).await?;
407 let write_props = Self::merge_props(current, new_props, replace);
408 let edge_type_name = map
409 .get("_type")
410 .and_then(|v| v.as_str())
411 .map(|s| s.to_string())
412 .or_else(|| {
413 self.storage
414 .schema_manager()
415 .edge_type_name_by_id_unified(ei.edge_type_id)
416 });
417 let write_props = match &edge_type_name {
418 Some(name) => Self::coerce_and_validate_props(
419 write_props,
420 &schema,
421 std::slice::from_ref(name),
422 )?,
423 None => write_props,
424 };
425 writer
426 .insert_edge(
427 ei.src,
428 ei.dst,
429 ei.edge_type_id,
430 ei.eid,
431 write_props.clone(),
432 edge_type_name,
433 tx_l0,
434 )
435 .await?;
436 // Update the in-memory map-encoded edge binding
437 if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
438 edge_map.retain(|k, _| k.starts_with('_'));
439 let effective: HashMap<String, Value> = write_props
440 .into_iter()
441 .filter(|(_, v)| !v.is_null())
442 .collect();
443 for (k, v) in &effective {
444 edge_map.insert(k.clone(), v.clone());
445 }
446 // Replace _all_props to reflect the complete property set
447 edge_map.insert("_all_props".to_string(), Value::Map(effective));
448 }
449 }
450 _ => {
451 // No matching entity — nothing to do (caller already guarded against Null)
452 }
453 }
454 Ok(())
455 }
456
457 /// Computes the property map to write given current storage state and the
458 /// incoming change map.
459 ///
460 /// When `replace` is `true`, keys present in `current` but absent from
461 /// `incoming` are tombstoned with `Value::Null`. Null values inside
462 /// `incoming` are always preserved as explicit tombstones.
463 ///
464 /// When `replace` is `false`, `current` is the base and `incoming` is
465 /// merged on top: each key in `incoming` overwrites or tombstones the
466 /// corresponding entry in `current`.
467 fn merge_props(
468 current: HashMap<String, Value>,
469 incoming: HashMap<String, Value>,
470 replace: bool,
471 ) -> HashMap<String, Value> {
472 if replace {
473 // Start from the non-null incoming entries only.
474 let mut result: HashMap<String, Value> = incoming
475 .iter()
476 .filter(|(_, v)| !v.is_null())
477 .map(|(k, v)| (k.clone(), v.clone()))
478 .collect();
479 // Tombstone every current key that is absent from incoming OR explicitly
480 // set to null in incoming (both mean "delete this property").
481 for k in current.keys() {
482 if incoming.get(k).is_none_or(|v| v.is_null()) {
483 result.insert(k.clone(), Value::Null);
484 }
485 }
486 result
487 } else {
488 // Merge: start from current and apply incoming on top
489 let mut result = current;
490 result.extend(incoming);
491 result
492 }
493 }
494
495 /// Extract edge identity fields (`_eid`, `_src`, `_dst`, `_type`) from a map.
496 fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
497 let eid = Eid::from(
498 map.get("_eid")
499 .and_then(|v| v.as_u64())
500 .ok_or_else(|| anyhow!("Invalid _eid"))?,
501 );
502 let src = Vid::from(
503 map.get("_src")
504 .and_then(|v| v.as_u64())
505 .ok_or_else(|| anyhow!("Invalid _src"))?,
506 );
507 let dst = Vid::from(
508 map.get("_dst")
509 .and_then(|v| v.as_u64())
510 .ok_or_else(|| anyhow!("Invalid _dst"))?,
511 );
512 let edge_type_id = self.resolve_edge_type_id(
513 map.get("_type")
514 .or_else(|| map.get("_type_name"))
515 .ok_or_else(|| anyhow!("Missing _type/_type_name on edge map"))?,
516 )?;
517 Ok(EdgeIdentity {
518 eid,
519 src,
520 dst,
521 edge_type_id,
522 })
523 }
524
525 /// Resolve edge type ID from a Value, supporting both Int and String representations.
526 /// DataFusion traverse stores _type as String("KNOWS"), while write operations need u32 ID.
527 ///
528 /// For String values, uses get_or_assign_edge_type_id to support schemaless edge types
529 /// (assigns new ID if not found). This is critical for MERGE ... ON CREATE SET scenarios
530 /// where the edge type was just created and may not be in the read-only lookup yet.
531 fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
532 match type_val {
533 Value::Int(i) => Ok(*i as u32),
534 Value::String(name) => {
535 if self.config.strict_schema {
536 let schema = self.storage.schema_manager().schema();
537 schema
538 .edge_type_id_by_name_case_insensitive(name)
539 .ok_or_else(|| {
540 anyhow!(
541 "Edge type '{}' is not defined in the schema \
542 (strict_schema is enabled). \
543 Declare it with db.schema().edge_type(...).apply() first.",
544 name
545 )
546 })
547 } else {
548 // Schemaless: assign new ID if not found in schema or registry.
549 Ok(self
550 .storage
551 .schema_manager()
552 .get_or_assign_edge_type_id(name))
553 }
554 }
555 _ => Err(anyhow!(
556 "Invalid _type value: expected Int or String, got {:?}",
557 type_val
558 )),
559 }
560 }
561
562 pub(crate) async fn execute_vacuum(&self) -> Result<()> {
563 if let Some(writer_arc) = &self.writer {
564 // Flush first while holding the lock
565 {
566 let writer: &uni_store::Writer = writer_arc.as_ref();
567 writer.flush_to_l1(None).await?;
568 } // Drop lock before compacting to avoid blocking reads/writes
569
570 // Compaction can run without holding the writer lock
571 let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
572 let compaction_results = compactor.compact_all().await?;
573
574 // Re-warm adjacency manager for compacted edge types to sync in-memory CSR with new L2 storage
575 let am = self.storage.adjacency_manager();
576 let schema = self.storage.schema_manager().schema();
577 for info in compaction_results {
578 // Convert string direction to Direction enum
579 let direction = match info.direction.as_str() {
580 "fwd" => uni_store::storage::direction::Direction::Outgoing,
581 "bwd" => uni_store::storage::direction::Direction::Incoming,
582 _ => continue,
583 };
584
585 // Get edge_type_id
586 if let Some(edge_type_id) =
587 schema.edge_type_id_unified_case_insensitive(&info.edge_type)
588 {
589 // Re-warm from storage (clears old CSR, loads new L2 + L1 delta)
590 let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
591 }
592 }
593 }
594 Ok(())
595 }
596
597 pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
598 if let Some(writer_arc) = &self.writer {
599 let writer: &uni_store::Writer = writer_arc.as_ref();
600 writer.flush_to_l1(Some("checkpoint".to_string())).await?;
601 }
602 Ok(())
603 }
604
605 pub(crate) async fn execute_copy_to(
606 &self,
607 identifier: &str,
608 path: &str,
609 format: &str,
610 options: &HashMap<String, Value>,
611 ) -> Result<usize> {
612 // Check schema to determine if identifier is an edge type or vertex label
613 let schema = self.storage.schema_manager().schema();
614
615 // Try as edge type first
616 if schema.get_edge_type_case_insensitive(identifier).is_some() {
617 return self
618 .export_edge_type_in_format(identifier, path, format)
619 .await;
620 }
621
622 // Try as vertex label
623 if schema.get_label_case_insensitive(identifier).is_some() {
624 return self
625 .export_vertex_label_in_format(identifier, path, format, options)
626 .await;
627 }
628
629 // Neither edge type nor vertex label found
630 Err(anyhow!("Unknown label or edge type: '{}'", identifier))
631 }
632
633 async fn export_vertex_label_in_format(
634 &self,
635 label: &str,
636 path: &str,
637 format: &str,
638 _options: &HashMap<String, Value>,
639 ) -> Result<usize> {
640 match format {
641 "parquet" => self.export_vertex_label(label, path).await,
642 "csv" => {
643 let mut stream = self
644 .storage
645 .scan_vertex_table_stream(label)
646 .await?
647 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
648
649 // Collect all batches
650 let mut all_rows = Vec::new();
651 let mut column_names = Vec::new();
652
653 // Iterate stream using StreamExt
654 use futures::StreamExt;
655 while let Some(batch_result) = stream.next().await {
656 let batch = batch_result?;
657
658 // Get column names from first batch
659 if column_names.is_empty() {
660 column_names = batch
661 .schema()
662 .fields()
663 .iter()
664 .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
665 .map(|f| f.name().clone())
666 .collect();
667 }
668
669 // Convert batch to rows
670 for row_idx in 0..batch.num_rows() {
671 let mut row = Vec::new();
672 for field in batch.schema().fields() {
673 if field.name().starts_with('_') || field.name() == "ext_id" {
674 continue;
675 }
676
677 let col_idx = batch.schema().index_of(field.name())?;
678 let column = batch.column(col_idx);
679 let value = self.arrow_value_to_json(column, row_idx)?;
680
681 // Convert value to CSV string
682 let csv_value = match value {
683 Value::Null => String::new(),
684 Value::Bool(b) => b.to_string(),
685 Value::Int(i) => i.to_string(),
686 Value::Float(f) => f.to_string(),
687 Value::String(s) => s,
688 _ => format!("{value}"),
689 };
690 row.push(csv_value);
691 }
692 all_rows.push(row);
693 }
694 }
695
696 // Write CSV
697 let file = std::fs::File::create(path)?;
698 let mut wtr = csv::Writer::from_writer(file);
699
700 // Write headers
701 log::debug!("CSV export headers: {:?}", column_names);
702 wtr.write_record(&column_names)?;
703
704 // Write rows
705 for (i, row) in all_rows.iter().enumerate() {
706 log::debug!("CSV export row {}: {:?}", i, row);
707 wtr.write_record(row)?;
708 }
709
710 wtr.flush()?;
711 Ok(all_rows.len())
712 }
713 _ => Err(anyhow!(
714 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
715 format
716 )),
717 }
718 }
719
720 async fn export_edge_type_in_format(
721 &self,
722 edge_type: &str,
723 path: &str,
724 format: &str,
725 ) -> Result<usize> {
726 match format {
727 "parquet" => self.export_edge_type(edge_type, path).await,
728 "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
729 _ => Err(anyhow!(
730 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
731 format
732 )),
733 }
734 }
735
736 /// Write a stream of record batches to a Parquet file.
737 /// Returns the total number of rows written, or 0 if the stream is empty.
738 async fn write_batches_to_parquet(
739 mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
740 path: &str,
741 entity_description: &str,
742 ) -> Result<usize> {
743 use futures::TryStreamExt;
744
745 // Get first batch to determine schema and create writer
746 let first_batch = match stream.try_next().await? {
747 Some(batch) => batch,
748 None => {
749 log::info!("No data to export from {}", entity_description);
750 return Ok(0);
751 }
752 };
753
754 // Create Parquet writer using schema from first batch
755 let file = std::fs::File::create(path)?;
756 let arrow_schema = first_batch.schema();
757 let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
758
759 // Write first batch
760 let mut count = first_batch.num_rows();
761 writer.write(&first_batch)?;
762
763 // Write remaining batches
764 while let Some(batch) = stream.try_next().await? {
765 count += batch.num_rows();
766 writer.write(&batch)?;
767 }
768
769 writer.close()?;
770
771 log::info!(
772 "Exported {} rows from {} to '{}'",
773 count,
774 entity_description,
775 path
776 );
777 Ok(count)
778 }
779
780 /// Export vertices of a specific label to Parquet
781 async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
782 let stream = self
783 .storage
784 .scan_vertex_table_stream(label)
785 .await?
786 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
787
788 Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
789 }
790
791 /// Export edges of a specific type to Parquet
792 async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
793 let schema = self.storage.schema_manager().schema();
794 if !schema.edge_types.contains_key(edge_type) {
795 return Err(anyhow!("Edge type '{}' not found", edge_type));
796 }
797
798 let filter = format!("type = '{}'", edge_type);
799 let stream = self
800 .storage
801 .scan_main_edge_table_stream(Some(&filter))
802 .await?
803 .ok_or_else(|| anyhow!("No edge data found"))?;
804
805 Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
806 }
807
808 pub(crate) async fn execute_copy_from(
809 &self,
810 label: &str,
811 path: &str,
812 format: &str,
813 options: &HashMap<String, Value>,
814 ) -> Result<usize> {
815 // Read data from file
816 let batches = match format {
817 "parquet" => self.read_parquet_file(path)?,
818 "csv" => self.read_csv_file(path, label, options)?,
819 _ => {
820 return Err(anyhow!(
821 "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
822 format
823 ));
824 }
825 };
826
827 // Get writer
828 let writer_arc = self
829 .writer
830 .as_ref()
831 .ok_or_else(|| anyhow!("No writer available"))?;
832
833 let db_schema = self.storage.schema_manager().schema();
834
835 // Check if this is a label (vertex) or edge type
836 let is_edge = db_schema.edge_type_id_by_name(label).is_some();
837
838 if is_edge {
839 // Import edges
840 let edge_type_id = db_schema
841 .edge_type_id_by_name(label)
842 .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
843
844 // Get src and dst column names from options
845 let src_col = options
846 .get("src_col")
847 .and_then(|v| v.as_str())
848 .unwrap_or("src");
849 let dst_col = options
850 .get("dst_col")
851 .and_then(|v| v.as_str())
852 .unwrap_or("dst");
853
854 // §5.7 of concurrent_writer.md: writer is hoisted above the row
855 // loop now that there is no per-row lock acquisition cost.
856 let writer: &uni_store::Writer = writer_arc.as_ref();
857 let mut total_rows = 0;
858 for batch in batches {
859 let num_rows = batch.num_rows();
860 // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
861 let eids = writer.allocate_eids(num_rows).await?;
862
863 for (row_idx, &eid) in eids.iter().enumerate().take(num_rows) {
864 let mut properties = HashMap::new();
865 let mut src_vid: Option<Vid> = None;
866 let mut dst_vid: Option<Vid> = None;
867
868 // Extract properties and VIDs from each column
869 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
870 let col_name = field.name();
871 let column = batch.column(col_idx);
872 let value = self.arrow_value_to_json(column, row_idx)?;
873
874 if col_name == src_col {
875 let raw = value.as_u64().unwrap_or_else(|| {
876 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
877 });
878 src_vid = Some(Vid::new(raw));
879 } else if col_name == dst_col {
880 let raw = value.as_u64().unwrap_or_else(|| {
881 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
882 });
883 dst_vid = Some(Vid::new(raw));
884 } else if !col_name.starts_with('_') && !value.is_null() {
885 properties.insert(col_name.clone(), value);
886 }
887 }
888
889 let src = src_vid
890 .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
891 let dst = dst_vid
892 .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
893
894 writer
895 .insert_edge(
896 src,
897 dst,
898 edge_type_id,
899 eid,
900 properties,
901 Some(label.to_string()),
902 None,
903 )
904 .await?;
905
906 total_rows += 1;
907 }
908 }
909
910 log::info!(
911 "Imported {} edge rows from '{}' into edge type '{}'",
912 total_rows,
913 path,
914 label
915 );
916
917 // Flush to persist edges
918 if total_rows > 0 {
919 writer.flush_to_l1(None).await?;
920 }
921
922 Ok(total_rows)
923 } else {
924 // Import vertices
925 // Validate the label exists in schema
926 db_schema
927 .label_id_by_name_case_insensitive(label)
928 .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
929
930 // §5.7 of concurrent_writer.md: writer is hoisted above the row
931 // loop now that there is no per-row lock acquisition cost.
932 let writer: &uni_store::Writer = writer_arc.as_ref();
933 let mut total_rows = 0;
934 for batch in batches {
935 let num_rows = batch.num_rows();
936 // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
937 let vids = writer.allocate_vids(num_rows).await?;
938
939 // Convert Arrow batch to rows
940 for (row_idx, &vid) in vids.iter().enumerate().take(num_rows) {
941 let mut properties = HashMap::new();
942
943 // Extract properties from each column
944 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
945 let col_name = field.name();
946
947 // Skip internal columns
948 if col_name.starts_with('_') {
949 continue;
950 }
951
952 let column = batch.column(col_idx);
953 let value = self.arrow_value_to_json(column, row_idx)?;
954
955 if !value.is_null() {
956 properties.insert(col_name.clone(), value);
957 }
958 }
959
960 let _ = writer
961 .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
962 .await?;
963
964 total_rows += 1;
965 }
966 }
967
968 log::info!(
969 "Imported {} rows from '{}' into label '{}'",
970 total_rows,
971 path,
972 label
973 );
974
975 // Flush to persist vertices
976 if total_rows > 0 {
977 writer.flush_to_l1(None).await?;
978 }
979
980 Ok(total_rows)
981 }
982 }
983
984 fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
985 use arrow_array::Array;
986 use arrow_schema::DataType as ArrowDataType;
987
988 if column.is_null(row_idx) {
989 return Ok(Value::Null);
990 }
991
992 match column.data_type() {
993 ArrowDataType::Utf8 => {
994 let array = column
995 .as_any()
996 .downcast_ref::<arrow_array::StringArray>()
997 .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
998 Ok(Value::String(array.value(row_idx).to_string()))
999 }
1000 ArrowDataType::Int32 => {
1001 let array = column
1002 .as_any()
1003 .downcast_ref::<arrow_array::Int32Array>()
1004 .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
1005 Ok(Value::Int(array.value(row_idx) as i64))
1006 }
1007 ArrowDataType::Int64 => {
1008 let array = column
1009 .as_any()
1010 .downcast_ref::<arrow_array::Int64Array>()
1011 .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
1012 Ok(Value::Int(array.value(row_idx)))
1013 }
1014 ArrowDataType::Float32 => {
1015 let array = column
1016 .as_any()
1017 .downcast_ref::<arrow_array::Float32Array>()
1018 .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
1019 Ok(Value::Float(array.value(row_idx) as f64))
1020 }
1021 ArrowDataType::Float64 => {
1022 let array = column
1023 .as_any()
1024 .downcast_ref::<arrow_array::Float64Array>()
1025 .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
1026 Ok(Value::Float(array.value(row_idx)))
1027 }
1028 ArrowDataType::Boolean => {
1029 let array = column
1030 .as_any()
1031 .downcast_ref::<arrow_array::BooleanArray>()
1032 .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
1033 Ok(Value::Bool(array.value(row_idx)))
1034 }
1035 ArrowDataType::UInt64 => {
1036 let array = column
1037 .as_any()
1038 .downcast_ref::<arrow_array::UInt64Array>()
1039 .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
1040 Ok(Value::Int(array.value(row_idx) as i64))
1041 }
1042 _ => {
1043 // For other types, try to convert to string
1044 let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
1045 if let Some(arr) = array {
1046 Ok(Value::String(arr.value(row_idx).to_string()))
1047 } else {
1048 Ok(Value::Null)
1049 }
1050 }
1051 }
1052 }
1053
1054 fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
1055 let file = std::fs::File::open(path)?;
1056 let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
1057 .build()?;
1058 reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1059 }
1060
1061 fn read_csv_file(
1062 &self,
1063 path: &str,
1064 label: &str,
1065 options: &HashMap<String, Value>,
1066 ) -> Result<Vec<arrow_array::RecordBatch>> {
1067 use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1068 use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1069 use std::sync::Arc;
1070
1071 // Parse CSV options
1072 let has_headers = options
1073 .get("headers")
1074 .and_then(|v| v.as_bool())
1075 .unwrap_or(true);
1076
1077 // Read CSV file
1078 let file = std::fs::File::open(path)?;
1079 let mut rdr = csv::ReaderBuilder::new()
1080 .has_headers(has_headers)
1081 .from_reader(file);
1082
1083 // Get schema for type conversion
1084 let db_schema = self.storage.schema_manager().schema();
1085 let properties = db_schema.properties.get(label);
1086
1087 // Collect all rows first to determine schema
1088 let mut rows: Vec<Vec<String>> = Vec::new();
1089 let headers: Vec<String> = if has_headers {
1090 rdr.headers()?.iter().map(|s| s.to_string()).collect()
1091 } else {
1092 Vec::new()
1093 };
1094
1095 for result in rdr.records() {
1096 let record = result?;
1097 rows.push(record.iter().map(|s| s.to_string()).collect());
1098 }
1099
1100 if rows.is_empty() {
1101 return Ok(Vec::new());
1102 }
1103
1104 // Build Arrow schema with proper types based on DB schema
1105 let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
1106 let col_names: Vec<String> = if has_headers {
1107 headers
1108 } else {
1109 (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
1110 };
1111
1112 for name in &col_names {
1113 let arrow_type = if let Some(props) = properties {
1114 if let Some(prop_meta) = props.get(name) {
1115 match prop_meta.r#type {
1116 DataType::Int32 => ArrowDataType::Int32,
1117 DataType::Int64 => ArrowDataType::Int64,
1118 DataType::Float32 => ArrowDataType::Float32,
1119 DataType::Float64 => ArrowDataType::Float64,
1120 DataType::Bool => ArrowDataType::Boolean,
1121 _ => ArrowDataType::Utf8,
1122 }
1123 } else {
1124 ArrowDataType::Utf8
1125 }
1126 } else {
1127 ArrowDataType::Utf8
1128 };
1129 arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
1130 }
1131
1132 let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
1133
1134 // Convert rows to Arrow arrays with proper types
1135 let mut columns: Vec<ArrayRef> = Vec::new();
1136 for (col_idx, field) in arrow_fields.iter().enumerate() {
1137 match field.data_type() {
1138 ArrowDataType::Int32 => {
1139 let values: Vec<Option<i32>> = rows
1140 .iter()
1141 .map(|row| {
1142 if col_idx < row.len() {
1143 row[col_idx].parse().ok()
1144 } else {
1145 None
1146 }
1147 })
1148 .collect();
1149 columns.push(Arc::new(Int32Array::from(values)));
1150 }
1151 _ => {
1152 // Default to string
1153 let values: Vec<Option<String>> = rows
1154 .iter()
1155 .map(|row| {
1156 if col_idx < row.len() {
1157 Some(row[col_idx].clone())
1158 } else {
1159 None
1160 }
1161 })
1162 .collect();
1163 columns.push(Arc::new(StringArray::from(values)));
1164 }
1165 }
1166 }
1167
1168 let batch = RecordBatch::try_new(arrow_schema, columns)?;
1169 Ok(vec![batch])
1170 }
1171
1172 fn parse_data_type(type_str: &str) -> Result<DataType> {
1173 use uni_common::core::schema::{CrdtType, PointType};
1174 let type_str = type_str.to_lowercase();
1175 let type_str = type_str.trim();
1176 match type_str {
1177 "string" | "text" | "varchar" => Ok(DataType::String),
1178 "int" | "integer" | "int32" => Ok(DataType::Int32),
1179 "long" | "int64" | "bigint" => Ok(DataType::Int64),
1180 "float" | "float32" | "real" => Ok(DataType::Float32),
1181 "double" | "float64" => Ok(DataType::Float64),
1182 "bool" | "boolean" => Ok(DataType::Bool),
1183 "timestamp" => Ok(DataType::Timestamp),
1184 "date" => Ok(DataType::Date),
1185 "time" => Ok(DataType::Time),
1186 "datetime" => Ok(DataType::DateTime),
1187 "duration" => Ok(DataType::Duration),
1188 "btic" => Ok(DataType::Btic),
1189 "json" | "jsonb" => Ok(DataType::CypherValue),
1190 "bytes" | "blob" | "binary" => Ok(DataType::Bytes),
1191 "point" => Ok(DataType::Point(PointType::Cartesian2D)),
1192 "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
1193 "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1194 s if s.starts_with("vector(") && s.ends_with(')') => {
1195 let dims_str = &s[7..s.len() - 1];
1196 let dimensions = dims_str
1197 .parse::<usize>()
1198 .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1199 Ok(DataType::Vector { dimensions })
1200 }
1201 s if s.starts_with("list<") && s.ends_with('>') => {
1202 let inner_type_str = &s[5..s.len() - 1];
1203 let inner_type = Self::parse_data_type(inner_type_str)?;
1204 Ok(DataType::List(Box::new(inner_type)))
1205 }
1206 "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1207 "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1208 _ => Err(anyhow!("Unknown data type: {}", type_str)),
1209 }
1210 }
1211
1212 pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1213 let sm = self.storage.schema_manager_arc();
1214 if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1215 return Ok(());
1216 }
1217 sm.add_label_with_desc(&clause.name, clause.description)?;
1218 for prop in clause.properties {
1219 let dt = Self::parse_data_type(&prop.data_type)?;
1220 sm.add_property_with_desc(
1221 &clause.name,
1222 &prop.name,
1223 dt,
1224 prop.nullable,
1225 prop.description,
1226 )?;
1227 if prop.unique {
1228 let constraint = Constraint {
1229 name: format!("{}_{}_unique", clause.name, prop.name),
1230 constraint_type: ConstraintType::Unique {
1231 properties: vec![prop.name],
1232 },
1233 target: ConstraintTarget::Label(clause.name.clone()),
1234 enabled: true,
1235 };
1236 sm.add_constraint(constraint)?;
1237 }
1238 }
1239 sm.save().await?;
1240 Ok(())
1241 }
1242
1243 /// True if `key` is a generated property on any of the given labels.
1244 /// Used by the partial-write flush path (Round 12 §C) to decide
1245 /// whether the property should be added to `touched_keys` so that
1246 /// Lance MergeInsert sends the recomputed value.
1247 fn is_generated_key(&self, labels: &[String], key: &str) -> bool {
1248 let schema = self.storage.schema_manager().schema();
1249 for label in labels {
1250 if let Some(props_meta) = schema.properties.get(label)
1251 && let Some(meta) = props_meta.get(key)
1252 && meta.generation_expression.is_some()
1253 {
1254 return true;
1255 }
1256 }
1257 false
1258 }
1259
1260 pub(crate) async fn enrich_properties_with_generated_columns(
1261 &self,
1262 label_name: &str,
1263 properties: &mut HashMap<String, Value>,
1264 prop_manager: &PropertyManager,
1265 params: &HashMap<String, Value>,
1266 ctx: Option<&QueryContext>,
1267 ) -> Result<()> {
1268 let schema = self.storage.schema_manager().schema();
1269
1270 if let Some(props_meta) = schema.properties.get(label_name) {
1271 let mut generators = Vec::new();
1272 for (prop_name, meta) in props_meta {
1273 if let Some(expr_str) = &meta.generation_expression {
1274 generators.push((prop_name.clone(), expr_str.clone()));
1275 }
1276 }
1277
1278 for (prop_name, expr_str) in generators {
1279 let cache_key = (label_name.to_string(), prop_name.clone());
1280 let expr = {
1281 let cache = self.gen_expr_cache.read().await;
1282 cache.get(&cache_key).cloned()
1283 };
1284
1285 let expr = match expr {
1286 Some(e) => e,
1287 None => {
1288 let parsed = uni_cypher::parse_expression(&expr_str)
1289 .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1290 let mut cache = self.gen_expr_cache.write().await;
1291 cache.insert(cache_key, parsed.clone());
1292 parsed
1293 }
1294 };
1295
1296 let mut scope = HashMap::new();
1297
1298 // If expression has an explicit variable, use it as an object
1299 if let Some(var) = expr.extract_variable() {
1300 scope.insert(var, Value::Map(properties.clone()));
1301 } else {
1302 // No explicit variable - add properties directly to scope for bare references
1303 // e.g., "lower(email)" can reference "email" directly
1304 for (k, v) in properties.iter() {
1305 scope.insert(k.clone(), v.clone());
1306 }
1307 }
1308
1309 let val = self
1310 .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1311 .await?;
1312 properties.insert(prop_name, val);
1313 }
1314 }
1315 Ok(())
1316 }
1317
1318 pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1319 let sm = self.storage.schema_manager_arc();
1320 if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1321 return Ok(());
1322 }
1323 sm.add_edge_type_with_desc(
1324 &clause.name,
1325 clause.src_labels,
1326 clause.dst_labels,
1327 clause.description,
1328 )?;
1329 for prop in clause.properties {
1330 let dt = Self::parse_data_type(&prop.data_type)?;
1331 sm.add_property_with_desc(
1332 &clause.name,
1333 &prop.name,
1334 dt,
1335 prop.nullable,
1336 prop.description,
1337 )?;
1338 }
1339 sm.save().await?;
1340 Ok(())
1341 }
1342
1343 /// Executes an ALTER action on a schema entity.
1344 ///
1345 /// This is a shared helper for both `execute_alter_label` and
1346 /// `execute_alter_edge_type` since they have identical logic.
1347 pub(crate) async fn execute_alter_entity(
1348 sm: &Arc<SchemaManager>,
1349 entity_name: &str,
1350 action: AlterAction,
1351 ) -> Result<()> {
1352 match action {
1353 AlterAction::AddProperty(prop) => {
1354 let dt = Self::parse_data_type(&prop.data_type)?;
1355 sm.add_property_with_desc(
1356 entity_name,
1357 &prop.name,
1358 dt,
1359 prop.nullable,
1360 prop.description,
1361 )?;
1362 }
1363 AlterAction::DropProperty(prop_name) => {
1364 sm.drop_property(entity_name, &prop_name)?;
1365 }
1366 AlterAction::RenameProperty { old_name, new_name } => {
1367 sm.rename_property(entity_name, &old_name, &new_name)?;
1368 }
1369 AlterAction::SetDescription(desc) => {
1370 if sm.schema().labels.contains_key(entity_name) {
1371 sm.set_label_description(entity_name, desc)?;
1372 } else {
1373 sm.set_edge_type_description(entity_name, desc)?;
1374 }
1375 }
1376 AlterAction::SetPropertyDescription {
1377 property,
1378 description,
1379 } => {
1380 sm.set_property_description(entity_name, &property, description)?;
1381 }
1382 }
1383 sm.save().await?;
1384 Ok(())
1385 }
1386
1387 pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1388 Self::execute_alter_entity(
1389 &self.storage.schema_manager_arc(),
1390 &clause.name,
1391 clause.action,
1392 )
1393 .await
1394 }
1395
1396 pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1397 Self::execute_alter_entity(
1398 &self.storage.schema_manager_arc(),
1399 &clause.name,
1400 clause.action,
1401 )
1402 .await
1403 }
1404
1405 pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1406 let sm = self.storage.schema_manager_arc();
1407 sm.drop_label(&clause.name, clause.if_exists)?;
1408 sm.save().await?;
1409 Ok(())
1410 }
1411
1412 pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1413 let sm = self.storage.schema_manager_arc();
1414 sm.drop_edge_type(&clause.name, clause.if_exists)?;
1415 sm.save().await?;
1416 Ok(())
1417 }
1418
1419 pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1420 let sm = self.storage.schema_manager_arc();
1421 let target = ConstraintTarget::Label(clause.label);
1422 let c_type = match clause.constraint_type {
1423 AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1424 properties: clause.properties,
1425 },
1426 AstConstraintType::Exists => {
1427 let property = clause
1428 .properties
1429 .into_iter()
1430 .next()
1431 .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1432 ConstraintType::Exists { property }
1433 }
1434 AstConstraintType::Check => {
1435 let expression = clause
1436 .expression
1437 .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1438 ConstraintType::Check {
1439 expression: expression.to_string_repr(),
1440 }
1441 }
1442 };
1443
1444 let constraint = Constraint {
1445 name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1446 constraint_type: c_type,
1447 target,
1448 enabled: true,
1449 };
1450
1451 sm.add_constraint(constraint)?;
1452 sm.save().await?;
1453 Ok(())
1454 }
1455
1456 pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1457 let sm = self.storage.schema_manager_arc();
1458 sm.drop_constraint(&clause.name, false)?;
1459 sm.save().await?;
1460 Ok(())
1461 }
1462
1463 /// Detects the single-node, single-label MERGE shape the fast path serves.
1464 ///
1465 /// Returns the node pattern and its label when `pattern` is one path with
1466 /// one node element, exactly one label, and a static map-literal property
1467 /// set — the shape [`Self::execute_merge_row_indexed`] can serve without
1468 /// per-row query planning. The keys do NOT need to be indexed: the persisted
1469 /// lookup degrades to a (single, filtered) label scan when no scalar index
1470 /// exists, which is still far cheaper than building a `LogicalPlan` per row.
1471 /// Any other shape (edges, multiple labels, non-literal properties) returns
1472 /// `None` so the caller uses the general per-row path.
1473 fn merge_single_node_fastpath<'p>(
1474 &self,
1475 pattern: &'p Pattern,
1476 ) -> Option<(&'p NodePattern, String)> {
1477 if pattern.paths.len() != 1 {
1478 return None;
1479 }
1480 let path = &pattern.paths[0];
1481 if path.elements.len() != 1 {
1482 return None;
1483 }
1484 let PatternElement::Node(n) = &path.elements[0] else {
1485 return None;
1486 };
1487 let labels = n.labels.names();
1488 if labels.len() != 1 {
1489 return None;
1490 }
1491 // The key must be a static map literal so the key names are known.
1492 let Some(Expr::Map(entries)) = n.properties.as_ref() else {
1493 return None;
1494 };
1495 if entries.is_empty() {
1496 return None;
1497 }
1498 // Resolve the label to its schema-canonical case so the fast path agrees
1499 // with the general MERGE path (which matches labels case-insensitively).
1500 // Without this, `MERGE (:person …)` after a `:Person` row was flushed
1501 // scans/keys a different label than the canonical one and creates a
1502 // duplicate (review #3a). Falls back to the as-written label when the
1503 // schema does not know it (schemaless).
1504 let canonical = self
1505 .storage
1506 .schema_manager()
1507 .schema()
1508 .canonical_label_name(&labels[0])
1509 .unwrap_or_else(|| labels[0].clone());
1510 Some((n, canonical))
1511 }
1512
1513 /// RC3: detect the bound-endpoints, anonymous-edge relationship MERGE shape
1514 /// `(a)-[:TYPE]->(b)` whose edge existence can be resolved with one O(1)
1515 /// adjacency probe instead of building and running a per-row traversal
1516 /// `LogicalPlan` (the general path is ~19x the bulk CREATE of the same edges).
1517 ///
1518 /// Returns `(source_var, target_var, edge_type_id, direction)` when the
1519 /// pattern is exactly one path of `[Node, Rel, Node]` where the relationship
1520 /// is a single concrete type, **anonymous** (no variable → no edge binding to
1521 /// reproduce), fixed-length, and unfiltered, and both endpoint nodes are plain
1522 /// variables with no re-specified MERGE properties or inline WHERE (those are
1523 /// filters only the general path applies). Any deviation returns `None` and
1524 /// the caller keeps the general path. The caller still verifies per row that
1525 /// both endpoints are actually bound to vids.
1526 fn merge_relationship_fastpath_shape(
1527 &self,
1528 pattern: &Pattern,
1529 ) -> Option<(
1530 String,
1531 String,
1532 u32,
1533 uni_store::storage::direction::Direction,
1534 )> {
1535 if pattern.paths.len() != 1 {
1536 return None;
1537 }
1538 let [
1539 PatternElement::Node(a),
1540 PatternElement::Relationship(r),
1541 PatternElement::Node(b),
1542 ] = pattern.paths[0].elements.as_slice()
1543 else {
1544 return None;
1545 };
1546 // Endpoints: plain variables, no extra MERGE-pattern properties / inline
1547 // WHERE (a re-specified property is a filter the general path must apply).
1548 let src_var = a.variable.as_ref()?;
1549 let dst_var = b.variable.as_ref()?;
1550 if a.properties.is_some()
1551 || a.where_clause.is_some()
1552 || b.properties.is_some()
1553 || b.where_clause.is_some()
1554 {
1555 return None;
1556 }
1557 // Relationship: single concrete type, anonymous, fixed-length, unfiltered.
1558 if r.variable.is_some()
1559 || r.range.is_some()
1560 || r.properties.is_some()
1561 || r.where_clause.is_some()
1562 || r.types.names().len() != 1
1563 {
1564 return None;
1565 }
1566 let type_name = &r.types.names()[0];
1567 let type_id = if self.config.strict_schema {
1568 self.storage
1569 .schema_manager()
1570 .schema()
1571 .edge_type_id_by_name_case_insensitive(type_name)?
1572 } else {
1573 self.storage
1574 .schema_manager()
1575 .get_or_assign_edge_type_id(type_name)
1576 };
1577 let dir = match r.direction {
1578 uni_cypher::ast::Direction::Outgoing => {
1579 uni_store::storage::direction::Direction::Outgoing
1580 }
1581 uni_cypher::ast::Direction::Incoming => {
1582 uni_store::storage::direction::Direction::Incoming
1583 }
1584 // Undirected existence is ambiguous to encode as one probe; the
1585 // general path handles it.
1586 uni_cypher::ast::Direction::Both => return None,
1587 };
1588 Some((src_var.clone(), dst_var.clone(), type_id, dir))
1589 }
1590
1591 /// Build the persisted-scan filter for a MERGE key, or `None` if any value
1592 /// is not a scalar this fast path can represent.
1593 ///
1594 /// Returning `None` makes the caller fall back to the general per-row path,
1595 /// so unusual key value types (lists, maps, temporals, nulls) are never
1596 /// silently mis-matched. The `_deleted = false` clause mirrors the
1597 /// persisted-read predicate used elsewhere; the version high-water-mark
1598 /// clause is added by [`uni_store::StorageManager::scan_vertex_table`].
1599 fn merge_key_filter(key_props: &HashMap<String, Value>) -> Option<String> {
1600 if key_props.is_empty() {
1601 return None;
1602 }
1603 let mut parts = Vec::with_capacity(key_props.len() + 1);
1604 for (k, v) in key_props {
1605 if !Self::is_safe_key_ident(k) {
1606 return None;
1607 }
1608 let lit = Self::render_key_literal(v)?;
1609 // Unquoted identifier: the Lance filter parser does not resolve a
1610 // double-quoted column name against the table here, so `"k" = v`
1611 // silently matches nothing. Keys are validated above to be safe
1612 // bare identifiers.
1613 parts.push(format!("{k} = {lit}"));
1614 }
1615 parts.push("_deleted = false".to_string());
1616 Some(parts.join(" AND "))
1617 }
1618
/// True when a MERGE key name is a safe bare identifier for a Lance filter
/// (issue #8): non-empty, starting with an ASCII letter or `_`, and
/// containing only ASCII letters, digits and `_`.
///
/// A leading digit is rejected because the name is spliced into the filter
/// unquoted, where `1` or `2fa` would read as a numeric literal (or a
/// malformed token) rather than a column reference. Keys come from a static
/// map literal, but validate anyway.
fn is_safe_key_ident(k: &str) -> bool {
    let mut chars = k.chars();
    match chars.next() {
        Some(first) if first.is_ascii_alphabetic() || first == '_' => {
            chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
        }
        // Empty name, or a first character that cannot start an identifier.
        _ => false,
    }
}
1625
1626 /// Render a scalar MERGE-key value as a Lance filter literal, or `None`
1627 /// for value types this fast path cannot represent (lists, maps,
1628 /// temporals, nulls) — the caller then falls back to the general path.
1629 fn render_key_literal(v: &Value) -> Option<String> {
1630 Some(match v {
1631 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1632 Value::Int(i) => i.to_string(),
1633 Value::Float(f) => f.to_string(),
1634 Value::Bool(b) => b.to_string(),
1635 _ => return None,
1636 })
1637 }
1638
1639 /// Build ONE scan filter matching every key tuple in `keys` (all tuples
1640 /// sorted by `key_names` order, values canonicalized).
1641 ///
1642 /// Single-column keys render as type-grouped `k IN (…)` lists (a filter
1643 /// never compares mixed literal types against one column); composite keys
1644 /// render as an OR of per-tuple conjunctions. Both forms are wrapped with
1645 /// the same `_deleted = false` clause the per-row filter used.
1646 fn merge_batch_filter(key_names: &[String], keys: &[&MergeKey]) -> Option<String> {
1647 if keys.is_empty() || key_names.iter().any(|k| !Self::is_safe_key_ident(k)) {
1648 return None;
1649 }
1650 let disjunction = if let [key] = key_names {
1651 // Group literals by value variant so each IN list is homogeneous.
1652 let mut groups: HashMap<std::mem::Discriminant<Value>, Vec<String>> = HashMap::new();
1653 for tuple in keys {
1654 let (_, v) = tuple.first()?;
1655 groups
1656 .entry(std::mem::discriminant(v))
1657 .or_default()
1658 .push(Self::render_key_literal(v)?);
1659 }
1660 groups
1661 .into_values()
1662 .map(|lits| {
1663 if let [lit] = lits.as_slice() {
1664 format!("{key} = {lit}")
1665 } else {
1666 format!("{key} IN ({})", lits.join(", "))
1667 }
1668 })
1669 .collect::<Vec<_>>()
1670 .join(" OR ")
1671 } else {
1672 keys.iter()
1673 .map(|tuple| {
1674 let conj = tuple
1675 .iter()
1676 .map(|(k, v)| Some(format!("{k} = {}", Self::render_key_literal(v)?)))
1677 .collect::<Option<Vec<_>>>()?
1678 .join(" AND ");
1679 Some(format!("({conj})"))
1680 })
1681 .collect::<Option<Vec<_>>>()?
1682 .join(" OR ")
1683 };
1684 Some(format!("({disjunction}) AND _deleted = false"))
1685 }
1686
1687 /// Canonicalize a numeric MERGE-key value for *matching only*.
1688 ///
1689 /// A finite `Float` with an integral value (e.g. `1.0`) is mapped to the
1690 /// equivalent `Int`, so an `Int(1)` key matches a node stored with
1691 /// `Float(1.0)` and vice versa — the coercion the general (DataFusion) MERGE
1692 /// path already applies (review #3a). Non-numeric and non-integral values are
1693 /// returned unchanged. Used only to build match keys / comparisons, never the
1694 /// value written to a created node.
1695 fn canonical_key_value(v: &Value) -> Value {
1696 match v {
1697 Value::Float(f)
1698 if f.is_finite()
1699 && f.fract() == 0.0
1700 && *f >= i64::MIN as f64
1701 && *f <= i64::MAX as f64 =>
1702 {
1703 Value::Int(*f as i64)
1704 }
1705 other => other.clone(),
1706 }
1707 }
1708
1709 /// Canonical sorted `(name, value)` key tuple for a MERGE row's key map.
1710 ///
1711 /// Numeric values are canonicalized ([`Self::canonical_key_value`]) so the
1712 /// tuple compares equal regardless of `Int`/`Float` spelling. This tuple is
1713 /// used purely as a match key (intra-batch dedup, L0 overlay lookup); the
1714 /// created node's properties come from the original, un-canonicalized map.
1715 fn merge_key_tuple(key_props: &HashMap<String, Value>) -> MergeKey {
1716 let mut tuple: MergeKey = key_props
1717 .iter()
1718 .map(|(k, v)| (k.clone(), Self::canonical_key_value(v)))
1719 .collect();
1720 tuple.sort_by(|a, b| a.0.cmp(&b.0));
1721 tuple
1722 }
1723
1724 /// Snapshot all live L0 vertices of `label`, grouped by their MERGE key.
1725 ///
1726 /// Walked once per MERGE statement (issue #69): the per-row fast path then
1727 /// resolves L0/uncommitted matches with an O(1) map lookup instead of
1728 /// re-enumerating L0 for every row. Captures committed-not-yet-persisted
1729 /// rows and rows created earlier in the same transaction; rows created by
1730 /// later rows of this same statement are folded in incrementally by
1731 /// [`Self::execute_merge_row_indexed`]. `key_names` must be sorted to match
1732 /// [`Self::merge_key_tuple`].
1733 fn merge_l0_existing(
1734 &self,
1735 label: &str,
1736 key_names: &[String],
1737 ctx: Option<&QueryContext>,
1738 ) -> HashMap<MergeKey, Vec<Vid>> {
1739 let mut candidates: Vec<Vid> = Vec::new();
1740 l0_visibility::visit_l0_buffers(ctx, |l0| {
1741 if let Some(vids) = l0.label_to_vids.get(label) {
1742 candidates.extend(vids.iter().copied());
1743 }
1744 false
1745 });
1746
1747 let mut map: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1748 let mut seen: HashSet<Vid> = HashSet::new();
1749 for vid in candidates {
1750 if !seen.insert(vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
1751 continue;
1752 }
1753 // `lookup_vertex_prop` merges across L0 layers (newest wins).
1754 let tuple: MergeKey = key_names
1755 .iter()
1756 .map(|k| {
1757 let v = l0_visibility::lookup_vertex_prop(vid, k, ctx).unwrap_or(Value::Null);
1758 (k.clone(), Self::canonical_key_value(&v))
1759 })
1760 .collect();
1761 map.entry(tuple).or_default().push(vid);
1762 }
1763 map
1764 }
1765
/// Maximum number of entries rendered into one batched MERGE scan filter.
///
/// Bounds the filter-string size and the Lance/DataFusion parse cost. The
/// batched persisted lookup uses it both for the key tuples of the
/// nomination scan and for the `_vid IN (…)` list of the verification
/// scan; chunks run sequentially.
const MERGE_SCAN_CHUNK: usize = 1000;
1769
1770 /// Persisted (flushed) vertices of `label` for EVERY key tuple in `keys`,
1771 /// resolved with one scan per [`Self::MERGE_SCAN_CHUNK`] tuples instead of
1772 /// one scan per input row (review perf #4: `UNWIND … MERGE` issued N
1773 /// independent Lance scans).
1774 ///
1775 /// Scans via [`uni_store::StorageManager::scan_vertex_table`] — the same
1776 /// read path `MATCH` uses, so it honors the version high-water-mark and
1777 /// sees flushed rows. On the declared-label branch the key-filtered scan
1778 /// only NOMINATES candidate vids; a second, unfiltered `_vid IN (…)` pass
1779 /// picks each candidate's max-`_version` row and requires it to be live
1780 /// and still keyed as requested (per-label tables are MVCC-append, so a
1781 /// superseded version's row would otherwise stale-match a rewritten key).
1782 /// Matched rows are grouped by their CANONICAL key tuple (stored values
1783 /// run through [`Self::canonical_key_value`], so a stored `Float(1.0)`
1784 /// lands under a requested `Int(1)` — the coercion Lance's numeric filter
1785 /// equality applies). Liveness against L0 overlays (deletes, key rewrites
1786 /// by earlier rows of the same statement) is NOT checked here — the
1787 /// per-row consumer re-checks at row time, exactly as the old per-row
1788 /// scan did.
1789 ///
1790 /// The second returned map carries the FULL property maps the schemaless
1791 /// branch already decoded for each matched vid (empty on the declared-label
1792 /// branch, which projects only key columns) — the caller seeds the
1793 /// statement-level [`Prefetch`] from it at zero extra scans.
1794 ///
1795 /// # Errors
1796 /// Propagates persisted-scan and filter-build failures — fail-closed: a
1797 /// MERGE must never treat a failed lookup as "no match" and create
1798 /// duplicates.
1799 async fn merge_lookup_persisted_batch(
1800 &self,
1801 label: &str,
1802 key_names: &[String],
1803 keys: &HashSet<MergeKey>,
1804 ) -> Result<(
1805 HashMap<MergeKey, Vec<Vid>>,
1806 HashMap<Vid, uni_common::Properties>,
1807 )> {
1808 let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1809 if keys.is_empty() {
1810 return Ok((out, HashMap::new()));
1811 }
1812 // An undeclared (schemaless) label has no per-label table — its flushed
1813 // rows live only in the unified main vertex table. Route to the
1814 // main-table lookup, mirroring the planner's scan routing (a schemaless
1815 // MATCH plans `ScanMainByLabels` on the same schema predicate).
1816 if self
1817 .storage
1818 .schema_manager()
1819 .schema()
1820 .get_label_case_insensitive(label)
1821 .is_none()
1822 {
1823 return self
1824 .merge_lookup_persisted_batch_schemaless(label, key_names, keys)
1825 .await;
1826 }
1827 // Declared label — the per-label table is MVCC-append (an update
1828 // flush adds a higher-`_version` row for the same vid) and the key
1829 // predicate is pushed into the Lance filter, so a SUPERSEDED version
1830 // whose row still carries a requested key is returned while the vid's
1831 // current row (key rewritten, fails the filter) is invisible to the
1832 // scan. Version dedup among the returned rows cannot detect that, so
1833 // the lookup runs in two passes: the key-filtered scan only nominates
1834 // candidate vids, and an unfiltered `_vid IN (…)` scan then requires
1835 // each candidate's max-`_version` row to be live and still keyed as
1836 // requested.
1837 let mut columns: Vec<&str> = vec!["_vid"];
1838 columns.extend(key_names.iter().map(String::as_str));
1839
1840 let key_list: Vec<&MergeKey> = keys.iter().collect();
1841 let mut candidates: Vec<Vid> = Vec::new();
1842 let mut seen: HashSet<Vid> = HashSet::new();
1843 for chunk in key_list.chunks(Self::MERGE_SCAN_CHUNK) {
1844 let filter = Self::merge_batch_filter(key_names, chunk)
1845 .ok_or_else(|| anyhow!("MERGE fast path could not build a batched key filter"))?;
1846 let scanned = self
1847 .storage
1848 .scan_vertex_table(label, &columns, Some(&filter))
1849 .await?;
1850 let Some(batch) = scanned else { continue };
1851 let Some(vid_col) = batch
1852 .column_by_name("_vid")
1853 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
1854 else {
1855 continue;
1856 };
1857 for i in 0..vid_col.len() {
1858 let vid = Vid::from(vid_col.value(i));
1859 if seen.insert(vid) {
1860 candidates.push(vid);
1861 }
1862 }
1863 }
1864
1865 // Verification pass — tombstones are NOT filtered Lance-side (the
1866 // max-version pick must see them so a deleted winner cannot let an
1867 // older live version resurrect the match), exactly like the
1868 // schemaless branch below.
1869 let mut verify_columns: Vec<&str> = vec!["_vid", "_deleted", "_version"];
1870 verify_columns.extend(key_names.iter().map(String::as_str));
1871 for chunk in candidates.chunks(Self::MERGE_SCAN_CHUNK) {
1872 let vid_list = chunk
1873 .iter()
1874 .map(|v| v.as_u64().to_string())
1875 .collect::<Vec<_>>()
1876 .join(", ");
1877 let filter = format!("_vid IN ({vid_list})");
1878 let scanned = self
1879 .storage
1880 .scan_vertex_table(label, &verify_columns, Some(&filter))
1881 .await?;
1882 let Some(batch) = scanned else { continue };
1883 let (Some(vid_col), Some(del_col), Some(ver_col)) = (
1884 batch
1885 .column_by_name("_vid")
1886 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1887 batch
1888 .column_by_name("_deleted")
1889 .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
1890 batch
1891 .column_by_name("_version")
1892 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1893 ) else {
1894 return Err(anyhow!(
1895 "MERGE batched lookup: verification scan missing a required column"
1896 ));
1897 };
1898 let key_cols: Vec<_> = key_names
1899 .iter()
1900 .map(|k| batch.column_by_name(k))
1901 .collect::<Option<Vec<_>>>()
1902 .ok_or_else(|| {
1903 anyhow!("MERGE batched lookup: projected key column missing from scan result")
1904 })?;
1905 // Per-vid MVCC dedup: keep the highest-version row for each vid.
1906 let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
1907 for i in 0..batch.num_rows() {
1908 let vid = Vid::from(vid_col.value(i));
1909 let ver = ver_col.value(i);
1910 let entry = winners.entry(vid).or_insert((ver, i));
1911 if ver > entry.0 {
1912 *entry = (ver, i);
1913 }
1914 }
1915 for (vid, (_ver, row)) in winners {
1916 if del_col.value(row) {
1917 continue;
1918 }
1919 let tuple: MergeKey = key_names
1920 .iter()
1921 .zip(&key_cols)
1922 .map(|(k, col)| {
1923 let v = uni_store::storage::arrow_convert::arrow_to_value(
1924 col.as_ref(),
1925 row,
1926 None,
1927 );
1928 (k.clone(), Self::canonical_key_value(&v))
1929 })
1930 .collect();
1931 if keys.contains(&tuple) {
1932 out.entry(tuple).or_default().push(vid);
1933 }
1934 }
1935 }
1936 Ok((out, HashMap::new()))
1937 }
1938
1939 /// Persisted-match lookup for an UNDECLARED (schemaless) label.
1940 ///
1941 /// Schemaless rows live only in the unified main vertex table (per-label
1942 /// tables exist only for declared labels), with all properties encoded in
1943 /// the `props_json` CypherValue blob — so key values cannot be pushed into
1944 /// the Lance filter; the key match happens in memory after decoding,
1945 /// exactly like the schemaless MATCH scan. One main-table scan regardless
1946 /// of key count.
1947 ///
1948 /// Mirrors `columnar_scan_schemaless_vertex_batch_static`: tombstones are
1949 /// NOT filtered Lance-side (MVCC dedup must see them to pick the winning
1950 /// version per vid); the per-vid max-`_version` dedup runs here, then
1951 /// deleted winners are dropped.
1952 ///
1953 /// Also returns the full decoded property map per matched vid — the blob
1954 /// is decoded here anyway, and the caller seeds the statement-level
1955 /// [`Prefetch`] from it instead of re-reading per row.
1956 ///
1957 /// # Errors
1958 /// Propagates scan and blob-decode failures — fail-closed: a MERGE must
1959 /// never treat a failed lookup as "no match" and create duplicates.
1960 async fn merge_lookup_persisted_batch_schemaless(
1961 &self,
1962 label: &str,
1963 key_names: &[String],
1964 keys: &HashSet<MergeKey>,
1965 ) -> Result<(
1966 HashMap<MergeKey, Vec<Vid>>,
1967 HashMap<Vid, uni_common::Properties>,
1968 )> {
1969 let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1970 let mut props_by_vid: HashMap<Vid, uni_common::Properties> = HashMap::new();
1971 let filter = format!("array_contains(labels, '{}')", label.replace('\'', "''"));
1972 let Some(batch) = self
1973 .storage
1974 .scan_main_vertex_table(
1975 &["_vid", "_deleted", "props_json", "_version"],
1976 Some(&filter),
1977 )
1978 .await?
1979 else {
1980 return Ok((out, props_by_vid));
1981 };
1982 let (Some(vid_col), Some(del_col), Some(ver_col)) = (
1983 batch
1984 .column_by_name("_vid")
1985 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1986 batch
1987 .column_by_name("_deleted")
1988 .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
1989 batch
1990 .column_by_name("_version")
1991 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1992 ) else {
1993 return Err(anyhow!(
1994 "schemaless MERGE lookup: main vertex table scan missing a required column"
1995 ));
1996 };
1997 let props_col = batch
1998 .column_by_name("props_json")
1999 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2000
2001 // Per-vid MVCC dedup: keep the highest-version row for each vid.
2002 let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
2003 for i in 0..batch.num_rows() {
2004 let vid = Vid::from(vid_col.value(i));
2005 let ver = ver_col.value(i);
2006 let entry = winners.entry(vid).or_insert((ver, i));
2007 if ver > entry.0 {
2008 *entry = (ver, i);
2009 }
2010 }
2011 for (vid, (_ver, row)) in winners {
2012 // Drop deletion tombstones AFTER picking the winner — a deleted
2013 // winner must not let an older live version resurrect the match.
2014 if del_col.value(row) {
2015 continue;
2016 }
2017 // A row without properties matches only an all-Null key tuple.
2018 let props = match props_col {
2019 Some(arr) if !arrow_array::Array::is_null(arr, row) => {
2020 match uni_common::cypher_value_codec::decode(arr.value(row))
2021 .map_err(|e| anyhow!("schemaless MERGE lookup: props decode: {e}"))?
2022 {
2023 Value::Map(m) => m,
2024 _ => HashMap::new(),
2025 }
2026 }
2027 _ => HashMap::new(),
2028 };
2029 let tuple: MergeKey = key_names
2030 .iter()
2031 .map(|k| {
2032 (
2033 k.clone(),
2034 Self::canonical_key_value(props.get(k).unwrap_or(&Value::Null)),
2035 )
2036 })
2037 .collect();
2038 if keys.contains(&tuple) {
2039 out.entry(tuple).or_default().push(vid);
2040 props_by_vid.insert(vid, props);
2041 }
2042 }
2043 Ok((out, props_by_vid))
2044 }
2045
2046 /// True if the statement-level MERGE property prefetch is safe for `label`.
2047 ///
2048 /// False when the label declares any CRDT-typed property: a prefetch HIT in
2049 /// [`read_vertex_props_with_prefetch`] skips the `normalize_crdt_properties`
2050 /// pass that `get_all_vertex_props_with_ctx` applies, so CRDT-bearing
2051 /// labels keep the per-row read path. Undeclared labels are trivially safe
2052 /// (normalization is a no-op without schema CRDT entries).
2053 fn merge_label_prefetch_safe(&self, label: &str) -> bool {
2054 let schema = self.storage.schema_manager().schema();
2055 schema.properties.get(label).is_none_or(|props| {
2056 !props
2057 .values()
2058 .any(|pm| matches!(pm.r#type, DataType::Crdt(_)))
2059 })
2060 }
2061
2062 /// True if an L0 override rewrote any key column of a persisted match away
2063 /// from its requested value (so the persisted row no longer matches).
2064 fn vid_overrides_break_key(
2065 vid: Vid,
2066 key_props: &HashMap<String, Value>,
2067 ctx: Option<&QueryContext>,
2068 ) -> bool {
2069 key_props.iter().any(|(k, want)| {
2070 matches!(
2071 l0_visibility::lookup_vertex_prop(vid, k, ctx),
2072 Some(got) if Self::canonical_key_value(&got) != Self::canonical_key_value(want)
2073 )
2074 })
2075 }
2076
2077 /// Build a node Map value (`{_vid, _labels, ...props}`) for binding a MERGE
2078 /// node variable.
2079 ///
2080 /// Matches the binding shape produced by `execute_create_pattern` and the
2081 /// general MATCH path, so ON MATCH SET, RETURN, and downstream operators
2082 /// resolve the variable identically — a bare `Value::Int(vid)` is not a
2083 /// valid node binding for those consumers.
2084 fn build_node_map(vid: Vid, label: &str, props: uni_common::Properties) -> Value {
2085 let mut obj = HashMap::new();
2086 obj.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
2087 obj.insert(
2088 "_labels".to_string(),
2089 Value::List(vec![Value::String(label.to_string())]),
2090 );
2091 for (k, v) in props {
2092 obj.insert(k, v);
2093 }
2094 Value::Map(obj)
2095 }
2096
2097 /// True if an L0-only vertex has every key column set to the requested
2098 /// value. A missing column matches only a requested `Null`.
2099 fn l0_vid_matches_key(
2100 vid: Vid,
2101 key_props: &HashMap<String, Value>,
2102 ctx: Option<&QueryContext>,
2103 ) -> bool {
2104 key_props.iter().all(
2105 |(k, want)| match l0_visibility::lookup_vertex_prop(vid, k, ctx) {
2106 Some(got) => Self::canonical_key_value(&got) == Self::canonical_key_value(want),
2107 None => *want == Value::Null,
2108 },
2109 )
2110 }
2111
2112 /// Index fast-path execution for one MERGE row of the shape detected by
2113 /// [`Self::merge_single_node_fastpath`].
2114 ///
2115 /// Resolves matches from the per-batch L0 snapshot `existing` (O(1) lookup,
2116 /// no per-row L0 enumeration) plus the per-statement persisted prefetch
2117 /// (`persisted`, built once by [`Self::merge_lookup_persisted_batch`]);
2118 /// applies ON MATCH SET to every match, or creates the node and applies
2119 /// ON CREATE SET when there is none. A newly created vertex is folded into
2120 /// `existing` so a later row of the same batch with the same key matches it
2121 /// (intra-batch dedup). Returns the RETURN rows for this input row (one per
2122 /// match, or one for a create).
2123 ///
2124 /// `prefetched` is the statement-level property prefetch (`None` when the
2125 /// label is CRDT-bearing, see [`Self::merge_label_prefetch_safe`]): matched
2126 /// vids carry their persisted base row, freshly created vids are seeded
2127 /// with an empty base — per-row reads then resolve as base + L0 layering
2128 /// (every SET flush writes the full row to L0 before the next read, so a
2129 /// prefetch hit equals a fresh read) instead of one storage scan each.
2130 ///
2131 /// # Errors
2132 /// Propagates evaluation, create, and SET failures.
2133 #[expect(
2134 clippy::too_many_arguments,
2135 reason = "mirrors execute_merge's threaded execution state"
2136 )]
2137 async fn execute_merge_row_indexed(
2138 &self,
2139 label: &str,
2140 node: &NodePattern,
2141 path_pattern: &Pattern,
2142 temp_vars: &[String],
2143 mut row: HashMap<String, Value>,
2144 key_props: &HashMap<String, Value>,
2145 persisted: &HashMap<MergeKey, Vec<Vid>>,
2146 key_tuple: &MergeKey,
2147 existing: &mut HashMap<MergeKey, Vec<Vid>>,
2148 on_match: Option<&SetClause>,
2149 on_create: Option<&SetClause>,
2150 prop_manager: &PropertyManager,
2151 params: &HashMap<String, Value>,
2152 ctx: Option<&QueryContext>,
2153 tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2154 writer: &Writer,
2155 mut prefetched: Option<&mut Prefetch>,
2156 ) -> Result<Vec<HashMap<String, Value>>> {
2157 let empty_prefetch = Prefetch::default();
2158 let mut seen: HashSet<Vid> = HashSet::new();
2159 let mut matches: Vec<Vid> = Vec::new();
2160 // Persisted (flushed) matches from the per-statement prefetch. The
2161 // prefetch is static for the statement, so re-verify liveness at row
2162 // time — an earlier row of this batch may have deleted the candidate
2163 // or rewritten its key (the old per-row scan saw those through its L0
2164 // overlay checks; these are the same checks, moved to row time).
2165 if let Some(vids) = persisted.get(key_tuple) {
2166 for &vid in vids {
2167 if l0_visibility::is_vertex_deleted(vid, ctx) {
2168 continue;
2169 }
2170 if Self::vid_overrides_break_key(vid, key_props, ctx) {
2171 continue;
2172 }
2173 if seen.insert(vid) {
2174 matches.push(vid);
2175 }
2176 }
2177 }
2178 // L0 / intra-batch matches from the per-batch snapshot, re-verified live
2179 // in case a prior row of this batch mutated or deleted the candidate.
2180 if let Some(vids) = existing.get(key_tuple) {
2181 for &vid in vids {
2182 if seen.contains(&vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
2183 continue;
2184 }
2185 if Self::l0_vid_matches_key(vid, key_props, ctx) && seen.insert(vid) {
2186 matches.push(vid);
2187 }
2188 }
2189 }
2190
2191 let mut out = Vec::new();
2192 if matches.is_empty() {
2193 // No match: create the node, then apply ON CREATE SET. Fold the
2194 // ON CREATE SET property assignments into seed props first so a
2195 // NOT-NULL property supplied only by ON CREATE SET passes
2196 // create-time validation (RC4); the post-create SET below settles
2197 // the final values.
2198 let seed_props = self
2199 .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2200 .await?;
2201 self.execute_create_pattern(
2202 path_pattern,
2203 &mut row,
2204 writer,
2205 prop_manager,
2206 params,
2207 ctx,
2208 tx_l0_override,
2209 Some(&seed_props),
2210 )
2211 .await?;
2212 // Fold the new vertex into the batch snapshot for intra-batch
2213 // dedup, and seed the statement prefetch with an empty base: a
2214 // fresh vid has nothing in storage, so ON CREATE SET's lazy read
2215 // resolves from the L0 row the create just wrote instead of
2216 // issuing a per-row storage scan that finds nothing.
2217 if let Some(var) = &node.variable
2218 && let Some(val) = row.get(var)
2219 && let Ok(vid) = Self::vid_from_value(val)
2220 {
2221 existing.entry(key_tuple.clone()).or_default().push(vid);
2222 if let Some(p) = prefetched.as_deref_mut() {
2223 p.vertex.entry(vid).or_default();
2224 }
2225 // Phantom guard (RC2): register this MERGE-create's key so a
2226 // concurrent MERGE of the same key aborts retriably at commit
2227 // (converging to one node) instead of silently duplicating —
2228 // even with no declared UNIQUE constraint. Only inside a
2229 // transaction, where commit re-probes the guard; a plain CREATE
2230 // never registers a key, so it is unaffected.
2231 if let Some(tx_l0) = tx_l0_override {
2232 let key_values: Vec<(String, Value)> = key_props
2233 .iter()
2234 .map(|(k, v)| (k.clone(), v.clone()))
2235 .collect();
2236 let guard_key =
2237 uni_store::runtime::l0::serialize_constraint_key(label, &key_values);
2238 tx_l0.write().insert_merge_guard_key(guard_key, vid);
2239 }
2240 }
2241 if let Some(set) = on_create {
2242 self.execute_set_items_locked(
2243 &set.items,
2244 &mut row,
2245 writer,
2246 prop_manager,
2247 params,
2248 ctx,
2249 tx_l0_override,
2250 prefetched.as_deref().unwrap_or(&empty_prefetch),
2251 )
2252 .await?;
2253 }
2254 Self::bind_path_variables(path_pattern, &mut row, temp_vars);
2255 out.push(row);
2256 } else {
2257 // Apply ON MATCH SET to every matched node (multi-match semantics),
2258 // binding the node variable as a Map with _vid/_labels/props so
2259 // RETURN and downstream operators resolve it as they would for the
2260 // general MATCH and CREATE paths.
2261 for vid in matches {
2262 let mut m = row.clone();
2263 if let Some(var) = &node.variable {
2264 // Minimal binding so ON MATCH SET resolves the node by _vid.
2265 m.insert(
2266 var.clone(),
2267 Self::build_node_map(vid, label, HashMap::new()),
2268 );
2269 }
2270 if let Some(set) = on_match {
2271 self.execute_set_items_locked(
2272 &set.items,
2273 &mut m,
2274 writer,
2275 prop_manager,
2276 params,
2277 ctx,
2278 tx_l0_override,
2279 prefetched.as_deref().unwrap_or(&empty_prefetch),
2280 )
2281 .await?;
2282 }
2283 if let Some(var) = &node.variable {
2284 // Rebind with full, post-SET properties for RETURN
2285 // fidelity. The SET above flushed the full row to L0, so a
2286 // prefetch hit (base + L0 layering) reproduces exactly
2287 // what a fresh storage read would return.
2288 let props = read_vertex_props_with_prefetch(
2289 vid,
2290 prefetched.as_deref().unwrap_or(&empty_prefetch),
2291 prop_manager,
2292 ctx,
2293 )
2294 .await?;
2295 m.insert(var.clone(), Self::build_node_map(vid, label, props));
2296 }
2297 Self::bind_path_variables(path_pattern, &mut m, temp_vars);
2298 out.push(m);
2299 }
2300 }
2301 Ok(out)
2302 }
2303
/// Executes a `MERGE` of `pattern` once per input row and returns the
/// resulting rows.
///
/// Each row is served by the first strategy that applies:
///
/// 1. **Single-node fast path** — taken when
///    [`Self::merge_single_node_fastpath`] recognises the pattern and the
///    row's evaluated key passes `merge_key_filter`. The per-batch L0
///    snapshot, the persisted lookup over the deduplicated key tuples and
///    (when [`Self::merge_label_prefetch_safe`] allows it) the property
///    prefetch are all built once before the row loop; the row itself is
///    handled by [`Self::execute_merge_row_indexed`].
/// 2. **Relationship fast path** — considered only when the single-node
///    shape was not detected, `on_match` is absent or has no items, and
///    `merge_relationship_fastpath_shape` recognises the pattern. When both
///    endpoint variables of the row resolve to vids, edge existence is
///    decided by one `get_neighbors` probe; a missing edge is created and
///    ON CREATE SET applied. Otherwise the row falls through.
/// 3. **General path** — `execute_merge_match` is run for the row; ON MATCH
///    SET is applied to every match, or, with no match, the pattern is
///    created and ON CREATE SET applied.
///
/// Path variables are bound on every emitted row via `bind_path_variables`.
///
/// # Errors
/// Returns an error when no `Writer` is configured, and propagates key
/// evaluation, persisted-lookup, match, create and SET failures. A failed
/// batch property prefetch is not an error: the prefetch is left empty.
#[expect(clippy::too_many_arguments)]
pub(crate) async fn execute_merge(
    &self,
    rows: Vec<HashMap<String, Value>>,
    pattern: &Pattern,
    on_match: Option<&SetClause>,
    on_create: Option<&SetClause>,
    prop_manager: &PropertyManager,
    params: &HashMap<String, Value>,
    ctx: Option<&QueryContext>,
    tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
) -> Result<Vec<HashMap<String, Value>>> {
    let writer_lock = self
        .writer
        .as_ref()
        .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;

    // Prepare pattern for path variable binding: assign temp edge variable
    // names to unnamed relationships in paths that have path variables.
    let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);

    // Issue #69: a single-node, single-label MERGE takes the fast path,
    // skipping the per-row query planning that made batched MERGE no faster
    // than a per-entity loop. Indexed keys get an index point-lookup;
    // un-indexed keys still skip planning (the lookup is a filtered scan).
    // The shape is the same for every row, so it is detected once.
    let fastpath = self.merge_single_node_fastpath(pattern);

    // Build the per-batch L0 snapshot once (issue #69 Phase C): the per-row
    // fast path then resolves L0/intra-batch matches with an O(1) lookup
    // instead of re-walking L0 for every row. `key_names` is the sorted
    // static key set, matching `merge_key_tuple`.
    let mut fast_existing: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
    // Per-row pre-evaluated fast-path keys (None = that row falls back to
    // the general path), and the per-statement persisted prefetch over the
    // deduped key tuples — ONE chunked scan instead of one scan per row.
    // Key expressions only see the row's own bindings + params, so
    // evaluating them ahead of any creates cannot observe earlier rows.
    let mut row_fast: Vec<Option<(HashMap<String, Value>, MergeKey)>> = Vec::new();
    let mut fast_persisted: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
    // Statement-level property prefetch for the fast path (review perf
    // residual): every persisted match's full row is batch-read ONCE, so
    // the per-row ON MATCH SET read and the post-SET rebind resolve as
    // prefetch-base + L0 layering instead of one storage scan each.
    // `None` disables it for CRDT-bearing labels (the prefetch-hit read
    // skips CRDT normalization).
    let mut merge_prefetch: Option<Prefetch> = None;
    if let Some((node, label)) = &fastpath {
        // Static key names come from the pattern's literal property map; any
        // other property expression yields no names.
        let mut key_names: Vec<String> = match &node.properties {
            Some(Expr::Map(entries)) => entries.iter().map(|(k, _)| k.clone()).collect(),
            _ => Vec::new(),
        };
        key_names.sort();
        fast_existing = self.merge_l0_existing(label, &key_names, ctx);

        row_fast.reserve(rows.len());
        for row in &rows {
            let mut key_props: HashMap<String, Value> = HashMap::new();
            if let Some(props_expr) = &node.properties
                && let Value::Map(map) = self
                    .evaluate_expr(props_expr, row, prop_manager, params, ctx)
                    .await?
            {
                key_props = map;
            }
            // Only rows whose every key value is a scalar the persisted
            // scan can express take the fast path (same gate as before,
            // via the filter builder).
            if Self::merge_key_filter(&key_props).is_some() {
                let tuple = Self::merge_key_tuple(&key_props);
                row_fast.push(Some((key_props, tuple)));
            } else {
                row_fast.push(None);
            }
        }
        // Deduplicate the key tuples so each distinct key is looked up once.
        let unique_keys: HashSet<MergeKey> = row_fast
            .iter()
            .flatten()
            .map(|(_, tuple)| tuple.clone())
            .collect();
        let (persisted, schemaless_props) = self
            .merge_lookup_persisted_batch(label, &key_names, &unique_keys)
            .await?;
        fast_persisted = persisted;
        if self.merge_label_prefetch_safe(label) {
            let mut pf = Prefetch::default();
            if !schemaless_props.is_empty() {
                // The schemaless lookup already decoded each matched vid's
                // full property map — zero extra scans.
                pf.vertex.extend(schemaless_props);
            } else {
                // Collect the distinct matched vids across all key tuples.
                let vids: Vec<Vid> = fast_persisted
                    .values()
                    .flatten()
                    .copied()
                    .collect::<HashSet<Vid>>()
                    .into_iter()
                    .collect();
                if !vids.is_empty()
                    && let Ok(batch_props) = prop_manager
                        .get_batch_vertex_props_for_label(&vids, label, ctx)
                        .await
                {
                    // One `_vid IN (…)` scan for every matched row's base.
                    // On Err the map stays empty — every read falls back to
                    // the per-row path (fail-open, same posture as
                    // prefetch_set_targets).
                    pf.vertex.extend(batch_props);
                }
            }
            merge_prefetch = Some(pf);
        }
    }

    // RC3: relationship-MERGE existence fast-path. The single-node fast path
    // above does not cover `(a)-[:R]->(b)`; the general path rebuilds and runs
    // a per-row traversal `LogicalPlan` just to check whether the edge exists
    // (~19x the bulk CREATE of the same edges). For the bound-endpoints,
    // anonymous-edge shape (and no ON MATCH SET, whose match-row semantics the
    // general path materialises) we resolve existence with one MVCC-correct
    // adjacency probe — `GraphExecutionContext::get_neighbors` merges CSR + all
    // L0 buffers including the transaction's own writes, so intra-batch edges
    // are seen — and reuse the general create / ON CREATE handling unchanged.
    // An ON MATCH SET with actual items needs the general path's materialised
    // match rows; a plain MERGE carries an *empty* on_match, which the fast
    // path can serve (it emits the row directly, applying nothing on match).
    let on_match_empty = on_match.is_none_or(|s| s.items.is_empty());
    let rel_fast = if fastpath.is_none() && on_match_empty {
        self.merge_relationship_fastpath_shape(pattern)
    } else {
        None
    };
    // The graph execution context is only built when the relationship fast
    // path is in play; it is shared by every row of the statement.
    let rel_graph_ctx = rel_fast.as_ref().map(|_| {
        let l0_context = match ctx {
            Some(c) => crate::query::df_graph::L0Context::from_query_context(c),
            None => crate::query::df_graph::L0Context::empty(),
        };
        // Reuse the executor's shared property manager when it has one,
        // otherwise build one with the caller's cache size.
        let pm_arc = self.prop_manager_arc.clone().unwrap_or_else(|| {
            Arc::new(PropertyManager::new(
                self.storage.clone(),
                self.storage.schema_manager_arc(),
                prop_manager.cache_size(),
            ))
        });
        crate::query::df_graph::GraphExecutionContext::with_l0_context(
            self.effective_storage(),
            l0_context,
            pm_arc,
        )
    });

    let mut results = Vec::new();
    for (idx, mut row) in rows.into_iter().enumerate() {
        // Rows with a pre-evaluated scalar key take the fast path; rows
        // with a non-scalar key fall through to the general path below.
        if let Some((node, label)) = &fastpath
            && let Some((key_props, key_tuple)) = row_fast.get(idx).and_then(|rf| rf.as_ref())
        {
            let writer: &uni_store::Writer = writer_lock.as_ref();
            let row_out = self
                .execute_merge_row_indexed(
                    label,
                    node,
                    &path_pattern,
                    &temp_vars,
                    row,
                    key_props,
                    &fast_persisted,
                    key_tuple,
                    &mut fast_existing,
                    on_match,
                    on_create,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0_override,
                    writer,
                    merge_prefetch.as_mut(),
                )
                .await?;
            results.extend(row_out);
            continue;
        }

        // RC3 relationship fast path: bound endpoints → resolve edge
        // existence with one adjacency probe and reuse the general
        // create / ON CREATE handling, skipping the per-row traversal plan.
        if let (Some((src_var, dst_var, type_id, dir)), Some(graph_ctx)) =
            (rel_fast.as_ref(), rel_graph_ctx.as_ref())
        {
            let src_vid = row.get(src_var).and_then(|v| Self::vid_from_value(v).ok());
            let dst_vid = row.get(dst_var).and_then(|v| Self::vid_from_value(v).ok());
            if let (Some(src_vid), Some(dst_vid)) = (src_vid, dst_vid) {
                let exists = graph_ctx
                    .get_neighbors(src_vid, *type_id, *dir)
                    .into_iter()
                    .any(|(n, _eid)| n == dst_vid);
                let writer: &uni_store::Writer = writer_lock.as_ref();
                if !exists {
                    // Edge absent: create only the edge (endpoints are bound),
                    // then apply ON CREATE SET — identical to the general
                    // create branch below.
                    let seed_props = self
                        .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
                        .await?;
                    self.execute_create_pattern(
                        &path_pattern,
                        &mut row,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0_override,
                        Some(&seed_props),
                    )
                    .await?;
                    if let Some(set) = on_create {
                        self.execute_set_items_locked(
                            &set.items,
                            &mut row,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0_override,
                            &Prefetch::default(),
                        )
                        .await?;
                    }
                }
                // Whether matched or just created, the edge now exists; bind
                // path variables and emit the row (the edge is anonymous, so
                // there is no edge binding to reproduce, and ON MATCH SET is
                // excluded from this fast path).
                Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
                results.push(row);
                continue;
            }
            // Endpoints not bound to vids → fall through to the general path.
        }

        // General execution: match-or-create per row. (The index fast path
        // above already handles single-node, single-label, scalar-indexed
        // MERGE — including unique-constrained labels, whose keys are
        // indexed — so there is no separate constraint-only fast path.)
        let matches = self
            .execute_merge_match(pattern, &row, prop_manager, params, ctx)
            .await?;
        let writer: &uni_store::Writer = writer_lock.as_ref();

        // The match-or-create work for this row is collected into one
        // `Result`, which is unwrapped right after the block.
        let result: Result<Vec<HashMap<String, Value>>> = async {
            let mut batch = Vec::new();
            if !matches.is_empty() {
                for mut m in matches {
                    if let Some(set) = on_match {
                        self.execute_set_items_locked(
                            &set.items,
                            &mut m,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0_override,
                            &Prefetch::default(),
                        )
                        .await?;
                    }
                    Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
                    batch.push(m);
                }
            } else {
                // Fold ON CREATE SET into seed props so a NOT-NULL property
                // set only by ON CREATE SET passes create-time validation
                // (RC4); the post-create SET below settles the final values.
                let seed_props = self
                    .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
                    .await?;
                self.execute_create_pattern(
                    &path_pattern,
                    &mut row,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0_override,
                    Some(&seed_props),
                )
                .await?;
                if let Some(set) = on_create {
                    self.execute_set_items_locked(
                        &set.items,
                        &mut row,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0_override,
                        &Prefetch::default(),
                    )
                    .await?;
                }
                Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
                batch.push(row);
            }
            Ok(batch)
        }
        .await;

        results.extend(result?);
    }
    Ok(results)
}
2616
2617 /// Pre-evaluate `ON CREATE SET` property assignments into per-variable seeds.
2618 ///
2619 /// Folds `SET <var>.<prop> = <expr>` items so a NOT-NULL property supplied
2620 /// only by `ON CREATE SET` is present when the MERGE node is created and
2621 /// passes constraint validation (RC4). The right-hand side is evaluated
2622 /// against the current `row`.
2623 ///
2624 /// Items whose right-hand side references the target variable (e.g.
2625 /// `ON CREATE SET n.c = coalesce(n.c, 0) + 1`) are NOT folded: seeding would
2626 /// let the post-create SET read the seeded value and apply the assignment
2627 /// twice. Such items run only post-create, exactly once (unchanged behavior).
2628 ///
2629 /// # Errors
2630 /// Returns an error if evaluating an assignment's right-hand side fails.
2631 pub(crate) async fn on_create_seed_props(
2632 &self,
2633 on_create: Option<&SetClause>,
2634 row: &HashMap<String, Value>,
2635 prop_manager: &PropertyManager,
2636 params: &HashMap<String, Value>,
2637 ctx: Option<&QueryContext>,
2638 ) -> Result<HashMap<String, HashMap<String, Value>>> {
2639 let mut seed: HashMap<String, HashMap<String, Value>> = HashMap::new();
2640 let Some(set) = on_create else {
2641 return Ok(seed);
2642 };
2643 for item in &set.items {
2644 if let SetItem::Property { expr, value } = item
2645 && let Expr::Property(var_expr, prop_name) = expr
2646 && let Expr::Variable(var_name) = &**var_expr
2647 // Skip self-referential RHS so the post-create SET (which also
2648 // runs) applies it exactly once rather than reading the seed.
2649 && !crate::query::df_graph::locy_ast_builder::expr_references_var(
2650 value, var_name,
2651 )
2652 {
2653 let val = self
2654 .evaluate_expr(value, row, prop_manager, params, ctx)
2655 .await?;
2656 seed.entry(var_name.clone())
2657 .or_default()
2658 .insert(prop_name.clone(), val);
2659 }
2660 }
2661 Ok(seed)
2662 }
2663
2664 /// Execute a CREATE pattern, inserting new vertices and edges into the graph.
2665 #[expect(clippy::too_many_arguments)]
2666 pub(crate) async fn execute_create_pattern(
2667 &self,
2668 pattern: &Pattern,
2669 row: &mut HashMap<String, Value>,
2670 writer: &Writer,
2671 prop_manager: &PropertyManager,
2672 params: &HashMap<String, Value>,
2673 ctx: Option<&QueryContext>,
2674 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2675 // Per-variable properties to gap-fill into newly-created nodes before
2676 // constraint validation. Used by MERGE to fold `ON CREATE SET` so a
2677 // NOT-NULL property supplied only by ON CREATE SET passes create-time
2678 // validation (RC4). `None` for plain CREATE.
2679 seed_props: Option<&HashMap<String, HashMap<String, Value>>>,
2680 ) -> Result<()> {
2681 for path in &pattern.paths {
2682 let mut prev_vid: Option<Vid> = None;
2683 // (rel_var, type_id, type_name, props_expr, direction)
2684 type PendingRel = (String, u32, String, Option<Expr>, Direction);
2685 let mut rel_pending: Option<PendingRel> = None;
2686
2687 for element in &path.elements {
2688 match element {
2689 PatternElement::Node(n) => {
2690 let mut vid = None;
2691
2692 // Check if node variable already bound in row
2693 if let Some(var) = &n.variable
2694 && let Some(val) = row.get(var)
2695 && let Ok(existing_vid) = Self::vid_from_value(val)
2696 {
2697 vid = Some(existing_vid);
2698 }
2699
2700 // If not bound, create it
2701 if vid.is_none() {
2702 let mut props = HashMap::new();
2703 if let Some(props_expr) = &n.properties {
2704 let props_val = self
2705 .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2706 .await?;
2707 if let Value::Map(map) = props_val {
2708 for (k, v) in map {
2709 props.insert(k, v);
2710 }
2711 } else {
2712 return Err(anyhow!("Properties must evaluate to a map"));
2713 }
2714 }
2715
2716 // MERGE ON CREATE SET: gap-fill properties supplied
2717 // only by ON CREATE SET so a NOT-NULL property absent
2718 // from the merge key passes create-time validation
2719 // (RC4). `or_insert` keeps the merge-key/pattern props
2720 // authoritative; the post-create SET re-applies the
2721 // real values, so the final state is unchanged.
2722 if let Some(seed) = seed_props
2723 && let Some(var) = &n.variable
2724 && let Some(var_seed) = seed.get(var)
2725 {
2726 for (k, v) in var_seed {
2727 props.entry(k.clone()).or_insert_with(|| v.clone());
2728 }
2729 }
2730
2731 let schema = self.storage.schema_manager().schema();
2732
2733 // Strict schema: reject undeclared labels.
2734 if self.config.strict_schema {
2735 for label_name in &n.labels {
2736 if schema.get_label_case_insensitive(label_name).is_none() {
2737 return Err(anyhow!(
2738 "Label '{}' is not defined in the schema \
2739 (strict_schema is enabled). \
2740 Declare it with db.schema().label(...).apply() first.",
2741 label_name
2742 ));
2743 }
2744 }
2745 }
2746
2747 // VID generation is label-independent. Pull from the
2748 // per-tx reservoir if set (amortizes the global
2749 // IdAllocator mutex), else fall back to the direct
2750 // per-VID path.
2751 let new_vid = match &self.id_reservoir {
2752 Some(r) => r.next_vid().await?,
2753 None => writer.next_vid().await?,
2754 };
2755
2756 // Enrich with generated columns only for known labels
2757 for label_name in &n.labels {
2758 if schema.get_label_case_insensitive(label_name).is_some() {
2759 self.enrich_properties_with_generated_columns(
2760 label_name,
2761 &mut props,
2762 prop_manager,
2763 params,
2764 ctx,
2765 )
2766 .await?;
2767 }
2768 }
2769
2770 // Validate/coerce against declared types AFTER enrichment, so
2771 // a type mismatch is rejected here rather than silently nulled
2772 // (and the row dropped) at flush — issue #68.
2773 let props = Self::coerce_and_validate_props(props, &schema, &n.labels)?;
2774
2775 // Insert vertex and get back final properties (includes auto-generated embeddings)
2776 let final_props = writer
2777 .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
2778 .await?;
2779
2780 // Build node object with final properties (includes embeddings)
2781 if let Some(var) = &n.variable {
2782 let mut obj = HashMap::new();
2783 obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
2784 let labels_list: Vec<Value> =
2785 n.labels.iter().map(|l| Value::String(l.clone())).collect();
2786 obj.insert("_labels".to_string(), Value::List(labels_list));
2787 for (k, v) in &final_props {
2788 obj.insert(k.clone(), v.clone());
2789 }
2790 // Store node as a Map with _vid, matching MATCH behavior
2791 row.insert(var.clone(), Value::Map(obj));
2792 }
2793 vid = Some(new_vid);
2794 }
2795
2796 let current_vid = vid.unwrap();
2797
2798 if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
2799 rel_pending.take()
2800 && let Some(src) = prev_vid
2801 {
2802 let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
2803
2804 if !is_rel_bound {
2805 let mut rel_props = HashMap::new();
2806 if let Some(expr) = rel_props_expr {
2807 let val = self
2808 .evaluate_expr(&expr, row, prop_manager, params, ctx)
2809 .await?;
2810 if let Value::Map(map) = val {
2811 rel_props.extend(map);
2812 }
2813 }
2814 // Validate/coerce edge properties against the declared
2815 // edge-type schema before storing — issue #68.
2816 let edge_schema = self.storage.schema_manager().schema();
2817 let rel_props = Self::coerce_and_validate_props(
2818 rel_props,
2819 &edge_schema,
2820 std::slice::from_ref(&type_name),
2821 )?;
2822 let eid = match &self.id_reservoir {
2823 Some(r) => r.next_eid().await?,
2824 None => writer.next_eid(type_id).await?,
2825 };
2826
2827 // For incoming edges like (a)<-[:R]-(b), swap so the edge points b -> a
2828 let (edge_src, edge_dst) = match dir {
2829 Direction::Incoming => (current_vid, src),
2830 _ => (src, current_vid),
2831 };
2832
2833 let store_props = !rel_var.is_empty();
2834 let user_props = if store_props {
2835 rel_props.clone()
2836 } else {
2837 HashMap::new()
2838 };
2839
2840 writer
2841 .insert_edge(
2842 edge_src,
2843 edge_dst,
2844 type_id,
2845 eid,
2846 rel_props,
2847 Some(type_name.clone()),
2848 tx_l0,
2849 )
2850 .await?;
2851
2852 // Edge type name is now stored by insert_edge
2853
2854 if store_props {
2855 let mut edge_map = HashMap::new();
2856 edge_map.insert(
2857 "_eid".to_string(),
2858 Value::Int(eid.as_u64() as i64),
2859 );
2860 edge_map.insert(
2861 "_src".to_string(),
2862 Value::Int(edge_src.as_u64() as i64),
2863 );
2864 edge_map.insert(
2865 "_dst".to_string(),
2866 Value::Int(edge_dst.as_u64() as i64),
2867 );
2868 edge_map
2869 .insert("_type".to_string(), Value::Int(type_id as i64));
2870 // Include user properties so downstream RETURN sees them
2871 for (k, v) in user_props {
2872 edge_map.insert(k, v);
2873 }
2874 row.insert(rel_var, Value::Map(edge_map));
2875 }
2876 }
2877 }
2878 prev_vid = Some(current_vid);
2879 }
2880 PatternElement::Relationship(r) => {
2881 if r.types.len() != 1 {
2882 return Err(anyhow!(
2883 "CREATE relationship must specify exactly one type"
2884 ));
2885 }
2886 let type_name = &r.types[0];
2887 let type_id = if self.config.strict_schema {
2888 let schema = self.storage.schema_manager().schema();
2889 schema
2890 .edge_type_id_by_name_case_insensitive(type_name)
2891 .ok_or_else(|| {
2892 anyhow!(
2893 "Edge type '{}' is not defined in the schema \
2894 (strict_schema is enabled). \
2895 Declare it with db.schema().edge_type(...).apply() first.",
2896 type_name
2897 )
2898 })?
2899 } else {
2900 // Schemaless: get or assign edge type ID (bit 31 = 1 for dynamic).
2901 self.storage
2902 .schema_manager()
2903 .get_or_assign_edge_type_id(type_name)
2904 };
2905
2906 rel_pending = Some((
2907 r.variable.clone().unwrap_or_default(),
2908 type_id,
2909 type_name.clone(),
2910 r.properties.clone(),
2911 r.direction.clone(),
2912 ));
2913 }
2914 PatternElement::Parenthesized { .. } => {
2915 return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
2916 }
2917 }
2918 }
2919 }
2920 Ok(())
2921 }
2922
2923 /// Rejects structural values (maps, nodes, edges, paths, nested lists) in a property.
2924 ///
2925 /// These are never valid OpenCypher property values regardless of the declared column
2926 /// type. A `CypherValue` column is the sole exception and is handled by the caller
2927 /// before this is reached.
2928 ///
2929 /// # Errors
2930 /// Returns an error if `val` is a map/node/edge/path, or a list containing one.
2931 fn validate_structural_property_value(prop_name: &str, val: &Value) -> Result<()> {
2932 match val {
2933 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
2934 anyhow::bail!(
2935 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2936 prop_name
2937 );
2938 }
2939 Value::List(items) => {
2940 for item in items {
2941 if matches!(
2942 item,
2943 Value::Map(_)
2944 | Value::Node(_)
2945 | Value::Edge(_)
2946 | Value::Path(_)
2947 | Value::List(_)
2948 ) {
2949 anyhow::bail!(
2950 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2951 prop_name
2952 );
2953 }
2954 }
2955 }
2956 _ => {}
2957 }
2958 Ok(())
2959 }
2960
2961 /// Validates and coerces `val` against the declared schema type for `prop_name`.
2962 ///
2963 /// Returns the value to actually persist. Beyond the structural checks in
2964 /// [`Self::validate_structural_property_value`], this compares the value against the
2965 /// column's declared `DataType` and:
2966 ///
2967 /// - returns it unchanged when directly storable (including the intentional
2968 /// `Int`→`Float`/`Int32` and `Temporal`→`Timestamp` widenings);
2969 /// - coerces a `Value::String` written into a `Date`/`Time`/`DateTime`/`Duration`
2970 /// column into the proper `Temporal` value, using the same parser as the Cypher
2971 /// `date()`/`time()`/`datetime()`/`duration()` constructors;
2972 /// - otherwise returns an error, so a type mismatch is surfaced at the call site
2973 /// rather than silently nulled — and the row dropped at flush. See issue #68.
2974 ///
2975 /// Undeclared (schemaless) properties and `CypherValue` columns keep their permissive
2976 /// behavior.
2977 ///
2978 /// # Errors
2979 /// Returns an error if the value's type is incompatible with the declared column type,
2980 /// or if a string destined for a temporal column is not a valid temporal literal.
2981 fn coerce_and_validate_property_value(
2982 prop_name: &str,
2983 val: Value,
2984 schema: &uni_common::core::schema::Schema,
2985 labels: &[String],
2986 ) -> Result<Value> {
2987 use uni_common::core::schema::DataType;
2988
2989 // Resolve the declared type from the first label that declares this property.
2990 let declared = labels.iter().find_map(|label| {
2991 schema
2992 .properties
2993 .get(label)
2994 .and_then(|props| props.get(prop_name))
2995 .map(|meta| &meta.r#type)
2996 });
2997
2998 // CypherValue columns accept any value (including maps) — skip all checks.
2999 if matches!(declared, Some(DataType::CypherValue)) {
3000 return Ok(val);
3001 }
3002
3003 let Some(dt) = declared else {
3004 // Schemaless property: reject structural values (maps/nodes/edges/paths and
3005 // lists containing them), otherwise store as-is.
3006 Self::validate_structural_property_value(prop_name, &val)?;
3007 return Ok(val);
3008 };
3009
3010 // Directly storable: scalars, the intentional `Int`→`Float`/`Int32` and
3011 // `Temporal`→`Timestamp` widenings, declared composite columns (`Map`/`List`/
3012 // `Vector`) receiving their matching value, and `Null` (always accepted).
3013 if dt.accepts(&val) {
3014 return Ok(val);
3015 }
3016
3017 // Known-safe coercion: a string into a temporal column is parsed as if it had
3018 // been wrapped in the matching Cypher temporal constructor.
3019 if matches!(val, Value::String(_)) {
3020 let ctor = match dt {
3021 DataType::DateTime => Some("DATETIME"),
3022 DataType::Date => Some("DATE"),
3023 DataType::Time => Some("TIME"),
3024 DataType::Duration => Some("DURATION"),
3025 _ => None,
3026 };
3027 if let Some(name) = ctor {
3028 return uni_query_functions::datetime::eval_datetime_function(
3029 name,
3030 std::slice::from_ref(&val),
3031 )
3032 .map_err(|e| {
3033 anyhow!(
3034 "TypeError: property '{}' is declared {:?} but the string value could \
3035 not be parsed as a {} literal: {}",
3036 prop_name,
3037 dt,
3038 name,
3039 e
3040 )
3041 });
3042 }
3043 }
3044
3045 // Not storable and not coercible. Prefer the structural message when the value
3046 // is itself structural (e.g. a map into a scalar column), preserving prior
3047 // behavior; otherwise report the scalar type mismatch.
3048 Self::validate_structural_property_value(prop_name, &val)?;
3049 anyhow::bail!(
3050 "TypeError: property '{}' is declared {:?} but got an incompatible value of type {}",
3051 prop_name,
3052 dt,
3053 value_type_name(&val)
3054 );
3055 }
3056
3057 /// Coerces and validates every property in `props` against the declared types for `labels`.
3058 ///
3059 /// Applies [`Self::coerce_and_validate_property_value`] to each entry, returning the map
3060 /// with known-safe coercions applied. Use this at every user-facing CREATE/SET write site
3061 /// before handing properties to the writer, so a type mismatch is rejected up front rather
3062 /// than silently nulled — and the row dropped — at flush (issue #68).
3063 ///
3064 /// # Errors
3065 /// Returns an error on the first property whose value is incompatible with its declared type.
3066 fn coerce_and_validate_props(
3067 props: HashMap<String, Value>,
3068 schema: &uni_common::core::schema::Schema,
3069 labels: &[String],
3070 ) -> Result<HashMap<String, Value>> {
3071 let mut out = HashMap::with_capacity(props.len());
3072 for (k, v) in props {
3073 let cv = Self::coerce_and_validate_property_value(&k, v, schema, labels)?;
3074 out.insert(k, cv);
3075 }
3076 Ok(out)
3077 }
3078
3079 #[expect(clippy::too_many_arguments)]
3080 pub(crate) async fn execute_set_items_locked(
3081 &self,
3082 items: &[SetItem],
3083 row: &mut HashMap<String, Value>,
3084 writer: &Writer,
3085 prop_manager: &PropertyManager,
3086 params: &HashMap<String, Value>,
3087 ctx: Option<&QueryContext>,
3088 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3089 prefetched: &Prefetch,
3090 ) -> Result<()> {
3091 // Coalesce SetItem::Property items by target so we do ONE read + ONE
3092 // write per (variable, target) instead of one read-modify-write cycle
3093 // per item. For an UPDATE that sets N properties on the same vertex
3094 // (e.g. the ingest hotpath `SET n.frequency = ..., n.last_seen = ...,
3095 // n.confidence = ...`), this collapses N redundant
3096 // `get_all_vertex_props_with_ctx` + `insert_vertex_with_labels` cycles
3097 // into one. See profile_test.rs `diag_72_set_data_scale_with_hnsw` for
3098 // the measurement, and the plan in
3099 // /home/rohit/.claude/plans/plan-and-implement-a-valiant-flame.md
3100 // for the rationale.
3101 //
3102 // RHS evaluation order is preserved: we evaluate each RHS inline and
3103 // update the row binding immediately, so a later SetItem on the same
3104 // variable that reads `n.<earlier-prop>` sees the new value.
3105 //
3106 // Non-Property variants (Labels, Variable, VariablePlus) are less
3107 // common and have lower payoff; before processing one, we flush any
3108 // pending updates for the same variable so it sees the latest L0
3109 // state and ordering semantics are preserved.
3110 let mut pending_v: HashMap<String, PendingVertexSet> = HashMap::new();
3111 let mut pending_e: HashMap<String, PendingEdgeSet> = HashMap::new();
3112
3113 for item in items {
3114 match item {
3115 SetItem::Property { expr, value } => {
3116 if let Expr::Property(var_expr, prop_name) = expr
3117 && let Expr::Variable(var_name) = &**var_expr
3118 && let Some(node_val) = row.get(var_name)
3119 {
3120 if let Ok(vid) = Self::vid_from_value(node_val) {
3121 reject_if_ephemeral_vid(vid)?;
3122 let labels =
3123 Self::extract_labels_from_node(node_val).unwrap_or_default();
3124 let schema = self.storage.schema_manager().schema().clone();
3125
3126 // Lazy one-time read. Always read the full row
3127 // (preserves CRDT merge + constraint validation
3128 // + scan-side L0 visibility). The
3129 // partial-lance-writes optimization happens
3130 // PURELY AT FLUSH TIME via the per-VID
3131 // `vertex_partial_keys` set tracked in L0 — so
3132 // L0 holds the full row, scans see the full
3133 // row, and Lance only receives the touched
3134 // columns. Generated-column-bearing labels
3135 // ride the partial path too (Round 12 §C):
3136 // `enrich_properties_with_generated_columns`
3137 // runs at flush time over the merged-in-L0
3138 // full row, and the produced generator keys
3139 // are appended to `touched` so they land in
3140 // the MergeInsert source.
3141 if !pending_v.contains_key(var_name) {
3142 let storage_cfg = &self.storage.config;
3143 let partial = storage_cfg.partial_lance_writes;
3144 let read = read_vertex_props_with_prefetch(
3145 vid,
3146 prefetched,
3147 prop_manager,
3148 ctx,
3149 )
3150 .await?;
3151 pending_v.insert(
3152 var_name.clone(),
3153 PendingVertexSet {
3154 vid,
3155 labels: labels.clone(),
3156 props: read,
3157 partial,
3158 touched: HashSet::new(),
3159 },
3160 );
3161 }
3162
3163 let val = self
3164 .evaluate_expr(value, row, prop_manager, params, ctx)
3165 .await?;
3166 let val = Self::coerce_and_validate_property_value(
3167 prop_name, val, &schema, &labels,
3168 )?;
3169
3170 let pv = pending_v
3171 .get_mut(var_name)
3172 .expect("inserted above when absent");
3173 pv.props.insert(prop_name.clone(), val.clone());
3174 if pv.partial {
3175 pv.touched.insert(prop_name.clone());
3176 }
3177
3178 // Update the row binding so subsequent RHS sees the new value.
3179 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3180 node_map.insert(prop_name.clone(), val);
3181 } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
3182 node.properties.insert(prop_name.clone(), val);
3183 }
3184 } else if let Value::Map(map) = node_val
3185 && map.get("_eid").is_some_and(|v| !v.is_null())
3186 && map.get("_src").is_some_and(|v| !v.is_null())
3187 && map.get("_dst").is_some_and(|v| !v.is_null())
3188 && (map.get("_type").is_some_and(|v| !v.is_null())
3189 || map.get("_type_name").is_some_and(|v| !v.is_null()))
3190 {
3191 let ei = self.extract_edge_identity(map)?;
3192 reject_if_ephemeral_eid(ei.eid)?;
3193 let schema = self.storage.schema_manager().schema().clone();
3194 // Handle _type as either String or Int (Int from CREATE, String
3195 // from queries). UNWIND on VLP edge lists emits `_type_name`
3196 // instead of `_type`; accept either.
3197 let type_val = map.get("_type").or_else(|| map.get("_type_name"));
3198 let edge_type_name = match type_val {
3199 Some(Value::String(s)) => s.clone(),
3200 Some(Value::Int(id)) => schema
3201 .edge_type_name_by_id_unified(*id as u32)
3202 .unwrap_or_else(|| format!("EdgeType{}", id)),
3203 _ => String::new(),
3204 };
3205
3206 if !pending_e.contains_key(var_name) {
3207 let initial = read_edge_props_with_prefetch(
3208 ei.eid,
3209 prefetched,
3210 prop_manager,
3211 ctx,
3212 )
3213 .await?;
3214 let partial = self.storage.config.partial_lance_writes;
3215 pending_e.insert(
3216 var_name.clone(),
3217 PendingEdgeSet {
3218 src: ei.src,
3219 dst: ei.dst,
3220 edge_type_id: ei.edge_type_id,
3221 eid: ei.eid,
3222 edge_type_name: edge_type_name.clone(),
3223 props: initial,
3224 partial,
3225 touched: HashSet::new(),
3226 },
3227 );
3228 }
3229
3230 let val = self
3231 .evaluate_expr(value, row, prop_manager, params, ctx)
3232 .await?;
3233 let val = Self::coerce_and_validate_property_value(
3234 prop_name,
3235 val,
3236 &schema,
3237 std::slice::from_ref(&edge_type_name),
3238 )?;
3239
3240 let pe = pending_e
3241 .get_mut(var_name)
3242 .expect("inserted above when absent");
3243 pe.props.insert(prop_name.clone(), val.clone());
3244 if pe.partial {
3245 pe.touched.insert(prop_name.clone());
3246 }
3247
3248 // Update the row object so subsequent RHS sees the new value.
3249 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3250 edge_map.insert(prop_name.clone(), val);
3251 } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3252 edge.properties.insert(prop_name.clone(), val);
3253 }
3254 } else if let Value::Edge(edge) = node_val {
3255 // Handle Value::Edge directly (when traverse returns Edge objects).
3256 reject_if_ephemeral_eid(edge.eid)?;
3257 let eid = edge.eid;
3258 let src = edge.src;
3259 let dst = edge.dst;
3260 let edge_type_name = edge.edge_type.clone();
3261 let etype =
3262 self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
3263 let schema = self.storage.schema_manager().schema().clone();
3264
3265 if !pending_e.contains_key(var_name) {
3266 let initial = read_edge_props_with_prefetch(
3267 eid,
3268 prefetched,
3269 prop_manager,
3270 ctx,
3271 )
3272 .await?;
3273 let partial = self.storage.config.partial_lance_writes;
3274 pending_e.insert(
3275 var_name.clone(),
3276 PendingEdgeSet {
3277 src,
3278 dst,
3279 edge_type_id: etype,
3280 eid,
3281 edge_type_name: edge_type_name.clone(),
3282 props: initial,
3283 partial,
3284 touched: HashSet::new(),
3285 },
3286 );
3287 }
3288
3289 let val = self
3290 .evaluate_expr(value, row, prop_manager, params, ctx)
3291 .await?;
3292 let val = Self::coerce_and_validate_property_value(
3293 prop_name,
3294 val,
3295 &schema,
3296 std::slice::from_ref(&edge_type_name),
3297 )?;
3298
3299 let pe = pending_e
3300 .get_mut(var_name)
3301 .expect("inserted above when absent");
3302 pe.props.insert(prop_name.clone(), val.clone());
3303 if pe.partial {
3304 pe.touched.insert(prop_name.clone());
3305 }
3306
3307 // Update the row object so subsequent RHS sees the new value.
3308 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3309 edge.properties.insert(prop_name.clone(), val);
3310 }
3311 }
3312 }
3313 }
3314 SetItem::Labels { variable, labels } => {
3315 // Flush any pending writes for this var so the Labels op
3316 // sees latest L0 state. Other variables' pending writes
3317 // can keep waiting (they're independent).
3318 self.flush_pending_var(
3319 variable,
3320 &mut pending_v,
3321 &mut pending_e,
3322 writer,
3323 prop_manager,
3324 params,
3325 ctx,
3326 tx_l0,
3327 prefetched,
3328 )
3329 .await?;
3330
3331 if let Some(node_val) = row.get(variable)
3332 && let Ok(vid) = Self::vid_from_value(node_val)
3333 {
3334 reject_if_ephemeral_vid(vid)?;
3335 let registry = self
3336 .procedure_registry
3337 .as_ref()
3338 .and_then(|pr| pr.plugin_registry());
3339 reject_virtual_label_write(registry.as_ref(), labels, "SET")?;
3340
3341 // Get current labels from node value
3342 let current_labels =
3343 Self::extract_labels_from_node(node_val).unwrap_or_default();
3344
3345 // Determine new labels to add (skip duplicates)
3346 let labels_to_add: Vec<_> = labels
3347 .iter()
3348 .filter(|l| !current_labels.contains(l))
3349 .cloned()
3350 .collect();
3351
3352 if !labels_to_add.is_empty() {
3353 // Resolve the FULL new label set and write it to the
3354 // TRANSACTION buffer (so the change is transactional
3355 // and OCC-conflictable), falling back to the context
3356 // (main) L0 for non-transactional callers. Replace
3357 // semantics via `set_vertex_labels`.
3358 let mut new_labels = current_labels;
3359 new_labels.extend(labels_to_add);
3360 if let Some(ctx) = ctx {
3361 let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3362 l0.write().set_vertex_labels(vid, &new_labels);
3363 }
3364
3365 // Update the node value in the row with the new labels.
3366 if let Some(Value::Map(obj)) = row.get_mut(variable) {
3367 let labels_list =
3368 new_labels.into_iter().map(Value::String).collect();
3369 obj.insert("_labels".to_string(), Value::List(labels_list));
3370 }
3371 }
3372 }
3373 }
3374 SetItem::Variable { variable, value }
3375 | SetItem::VariablePlus { variable, value } => {
3376 // Flush this var's pending writes first so the
3377 // replace/merge op sees them as latest L0 state.
3378 self.flush_pending_var(
3379 variable,
3380 &mut pending_v,
3381 &mut pending_e,
3382 writer,
3383 prop_manager,
3384 params,
3385 ctx,
3386 tx_l0,
3387 prefetched,
3388 )
3389 .await?;
3390
3391 let replace = matches!(item, SetItem::Variable { .. });
3392 let op_str = if replace { "=" } else { "+=" };
3393
3394 // SET n = expr / SET n += expr — null target from OPTIONAL MATCH is a silent no-op
3395 if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
3396 continue;
3397 }
3398 let rhs = self
3399 .evaluate_expr(value, row, prop_manager, params, ctx)
3400 .await?;
3401 let new_props =
3402 Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
3403 anyhow!(
3404 "SET {} {} expr: right-hand side must evaluate to a map, \
3405 node, or relationship",
3406 variable,
3407 op_str
3408 )
3409 })?;
3410 self.apply_properties_to_entity(
3411 variable,
3412 new_props,
3413 replace,
3414 row,
3415 writer,
3416 prop_manager,
3417 params,
3418 ctx,
3419 tx_l0,
3420 prefetched,
3421 )
3422 .await?;
3423 }
3424 }
3425 }
3426
3427 // Flush all remaining coalesced writes — one writer call per target.
3428 // Partial entries (no generated columns) call
3429 // `Writer::insert_vertex_partial_full` so L0 holds the FULL row
3430 // but the touched-keys hint drives a MergeInsert at flush. Full
3431 // entries continue through the legacy
3432 // `insert_vertex_with_labels` (Append) path with
3433 // generated-column enrichment.
3434 for (_var_name, mut pv) in pending_v {
3435 if pv.partial {
3436 // Round 12 §C: run the generator enrichment over the
3437 // merged-in-L0 full row, then add the produced generator
3438 // keys to `touched` so they ride the MergeInsert source.
3439 // Idempotent — generators always recompute against the
3440 // post-merge property map.
3441 let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3442 for label_name in &pv.labels {
3443 self.enrich_properties_with_generated_columns(
3444 label_name,
3445 &mut pv.props,
3446 prop_manager,
3447 params,
3448 ctx,
3449 )
3450 .await?;
3451 }
3452 for k in pv.props.keys() {
3453 if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3454 pv.touched.insert(k.clone());
3455 }
3456 }
3457 writer
3458 .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3459 .await?;
3460 } else {
3461 for label_name in &pv.labels {
3462 self.enrich_properties_with_generated_columns(
3463 label_name,
3464 &mut pv.props,
3465 prop_manager,
3466 params,
3467 ctx,
3468 )
3469 .await?;
3470 }
3471 let _ = writer
3472 .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3473 .await?;
3474 }
3475 }
3476 for (_var_name, pe) in pending_e {
3477 if pe.partial {
3478 writer
3479 .insert_edge_partial_full(
3480 pe.src,
3481 pe.dst,
3482 pe.edge_type_id,
3483 pe.eid,
3484 pe.props,
3485 Some(pe.edge_type_name),
3486 pe.touched,
3487 tx_l0,
3488 )
3489 .await?;
3490 } else {
3491 writer
3492 .insert_edge(
3493 pe.src,
3494 pe.dst,
3495 pe.edge_type_id,
3496 pe.eid,
3497 pe.props,
3498 Some(pe.edge_type_name),
3499 tx_l0,
3500 )
3501 .await?;
3502 }
3503 }
3504
3505 Ok(())
3506 }
3507
3508 /// Flush pending SET state for a single variable to the writer.
3509 ///
3510 /// Called from the SET loop when about to process a Labels /
3511 /// Variable / VariablePlus item on `var`, so the subsequent op
3512 /// sees latest L0 state and ordering is preserved.
3513 #[expect(clippy::too_many_arguments)]
3514 async fn flush_pending_var(
3515 &self,
3516 var: &str,
3517 pending_v: &mut HashMap<String, PendingVertexSet>,
3518 pending_e: &mut HashMap<String, PendingEdgeSet>,
3519 writer: &Writer,
3520 prop_manager: &PropertyManager,
3521 _params: &HashMap<String, Value>,
3522 ctx: Option<&QueryContext>,
3523 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3524 _prefetched: &Prefetch,
3525 ) -> Result<()> {
3526 if let Some(mut pv) = pending_v.remove(var) {
3527 if pv.partial {
3528 let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3529 for label_name in &pv.labels {
3530 self.enrich_properties_with_generated_columns(
3531 label_name,
3532 &mut pv.props,
3533 prop_manager,
3534 _params,
3535 ctx,
3536 )
3537 .await?;
3538 }
3539 for k in pv.props.keys() {
3540 if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3541 pv.touched.insert(k.clone());
3542 }
3543 }
3544 writer
3545 .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3546 .await?;
3547 } else {
3548 for label_name in &pv.labels {
3549 self.enrich_properties_with_generated_columns(
3550 label_name,
3551 &mut pv.props,
3552 prop_manager,
3553 _params,
3554 ctx,
3555 )
3556 .await?;
3557 }
3558 let _ = writer
3559 .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3560 .await?;
3561 }
3562 }
3563 if let Some(pe) = pending_e.remove(var) {
3564 if pe.partial {
3565 writer
3566 .insert_edge_partial_full(
3567 pe.src,
3568 pe.dst,
3569 pe.edge_type_id,
3570 pe.eid,
3571 pe.props,
3572 Some(pe.edge_type_name),
3573 pe.touched,
3574 tx_l0,
3575 )
3576 .await?;
3577 } else {
3578 writer
3579 .insert_edge(
3580 pe.src,
3581 pe.dst,
3582 pe.edge_type_id,
3583 pe.eid,
3584 pe.props,
3585 Some(pe.edge_type_name),
3586 tx_l0,
3587 )
3588 .await?;
3589 }
3590 }
3591 Ok(())
3592 }
3593
3594 /// Execute REMOVE clause items (property removal or label removal).
3595 ///
3596 /// Property removals are batched per variable to avoid stale reads: when
3597 /// multiple properties of the same entity are removed in one REMOVE clause,
3598 /// we read from storage once, null all specified properties, and write back
3599 /// once. This prevents the second removal from reading stale data that
3600 /// doesn't reflect the first removal's L0 write.
3601 #[expect(clippy::too_many_arguments)]
3602 pub(crate) async fn execute_remove_items_locked(
3603 &self,
3604 items: &[RemoveItem],
3605 row: &mut HashMap<String, Value>,
3606 writer: &Writer,
3607 prop_manager: &PropertyManager,
3608 ctx: Option<&QueryContext>,
3609 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3610 prefetched: &Prefetch,
3611 ) -> Result<()> {
3612 // Collect property names to remove, grouped by variable.
3613 // Use Vec<(String, Vec<String>)> to preserve insertion order.
3614 let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
3615
3616 for item in items {
3617 match item {
3618 RemoveItem::Property(expr) => {
3619 if let Expr::Property(var_expr, prop_name) = expr
3620 && let Expr::Variable(var_name) = &**var_expr
3621 {
3622 if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
3623 entry.1.push(prop_name.clone());
3624 } else {
3625 prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
3626 }
3627 }
3628 }
3629 RemoveItem::Labels { variable, labels } => {
3630 self.execute_remove_labels(variable, labels, row, ctx)?;
3631 }
3632 }
3633 }
3634
3635 // Execute batched property removals per variable.
3636 for (var_name, prop_names) in &prop_removals {
3637 let Some(node_val) = row.get(var_name) else {
3638 continue;
3639 };
3640
3641 if let Ok(vid) = Self::vid_from_value(node_val) {
3642 // Vertex property removal
3643 let mut props =
3644 read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
3645
3646 // Only write back if at least one property actually exists
3647 let removed_count = prop_names
3648 .iter()
3649 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3650 .count();
3651 let any_exist = removed_count > 0;
3652 if any_exist {
3653 writer.track_properties_removed(removed_count, tx_l0);
3654 for prop_name in prop_names {
3655 props.insert(prop_name.clone(), Value::Null);
3656 }
3657 }
3658 // Compute effective properties (post-removal) for _all_props
3659 let effective: HashMap<String, Value> = props
3660 .iter()
3661 .filter(|(_, v)| !v.is_null())
3662 .map(|(k, v)| (k.clone(), v.clone()))
3663 .collect();
3664 if any_exist {
3665 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3666 let _ = writer
3667 .insert_vertex_with_labels(vid, props, &labels, tx_l0)
3668 .await?;
3669 }
3670
3671 // Update the row map: set removed props to Null
3672 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3673 for prop_name in prop_names {
3674 node_map.insert(prop_name.clone(), Value::Null);
3675 }
3676 // Set _all_props to the complete effective property set
3677 node_map.insert("_all_props".to_string(), Value::Map(effective));
3678 }
3679 } else if let Value::Map(map) = node_val {
3680 // Edge property removal (map-encoded)
3681 // Check for non-null _eid to skip OPTIONAL MATCH null edges
3682 let mut edge_effective: Option<HashMap<String, Value>> = None;
3683 if map.get("_eid").is_some_and(|v| !v.is_null()) {
3684 let ei = self.extract_edge_identity(map)?;
3685 let mut props =
3686 read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx)
3687 .await?;
3688
3689 let removed_count = prop_names
3690 .iter()
3691 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3692 .count();
3693 let any_exist = removed_count > 0;
3694 if any_exist {
3695 writer.track_properties_removed(removed_count, tx_l0);
3696 for prop_name in prop_names {
3697 props.insert(prop_name.to_string(), Value::Null);
3698 }
3699 }
3700 // Compute effective properties (post-removal) for _all_props
3701 edge_effective = Some(
3702 props
3703 .iter()
3704 .filter(|(_, v)| !v.is_null())
3705 .map(|(k, v)| (k.clone(), v.clone()))
3706 .collect(),
3707 );
3708 if any_exist {
3709 let edge_type_name = map
3710 .get("_type")
3711 .and_then(|v| v.as_str())
3712 .map(|s| s.to_string())
3713 .or_else(|| {
3714 self.storage
3715 .schema_manager()
3716 .edge_type_name_by_id_unified(ei.edge_type_id)
3717 });
3718 writer
3719 .insert_edge(
3720 ei.src,
3721 ei.dst,
3722 ei.edge_type_id,
3723 ei.eid,
3724 props,
3725 edge_type_name,
3726 tx_l0,
3727 )
3728 .await?;
3729 }
3730 }
3731
3732 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3733 for prop_name in prop_names {
3734 edge_map.insert(prop_name.clone(), Value::Null);
3735 }
3736 if let Some(effective) = edge_effective {
3737 edge_map.insert("_all_props".to_string(), Value::Map(effective));
3738 }
3739 }
3740 } else if let Value::Edge(edge) = node_val {
3741 // Edge property removal (Value::Edge)
3742 let eid = edge.eid;
3743 let src = edge.src;
3744 let dst = edge.dst;
3745 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
3746
3747 let mut props =
3748 read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
3749
3750 let removed_count = prop_names
3751 .iter()
3752 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3753 .count();
3754 if removed_count > 0 {
3755 writer.track_properties_removed(removed_count, tx_l0);
3756 for prop_name in prop_names {
3757 props.insert(prop_name.to_string(), Value::Null);
3758 }
3759 writer
3760 .insert_edge(
3761 src,
3762 dst,
3763 etype,
3764 eid,
3765 props,
3766 Some(edge.edge_type.clone()),
3767 tx_l0,
3768 )
3769 .await?;
3770 }
3771
3772 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3773 for prop_name in prop_names {
3774 edge.properties.insert(prop_name.to_string(), Value::Null);
3775 }
3776 }
3777 }
3778 }
3779
3780 Ok(())
3781 }
3782
3783 /// Execute label removal.
3784 pub(crate) fn execute_remove_labels(
3785 &self,
3786 variable: &str,
3787 labels: &[String],
3788 row: &mut HashMap<String, Value>,
3789 ctx: Option<&QueryContext>,
3790 ) -> Result<()> {
3791 if let Some(node_val) = row.get(variable)
3792 && let Ok(vid) = Self::vid_from_value(node_val)
3793 {
3794 reject_if_ephemeral_vid(vid)?;
3795 let registry = self
3796 .procedure_registry
3797 .as_ref()
3798 .and_then(|pr| pr.plugin_registry());
3799 reject_virtual_label_write(registry.as_ref(), labels, "REMOVE")?;
3800
3801 // Get current labels from node value
3802 let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3803
3804 // Determine which labels to actually remove (only those currently present)
3805 let labels_to_remove: Vec<_> = labels
3806 .iter()
3807 .filter(|l| current_labels.contains(l))
3808 .collect();
3809
3810 if !labels_to_remove.is_empty() {
3811 // Resolve the FULL remaining label set and write it to the
3812 // TRANSACTION buffer (transactional + OCC-conflictable), falling
3813 // back to the context (main) L0 for non-transactional callers.
3814 let remaining_labels: Vec<String> = current_labels
3815 .iter()
3816 .filter(|l| !labels_to_remove.contains(l))
3817 .cloned()
3818 .collect();
3819 if let Some(ctx) = ctx {
3820 let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3821 l0.write().set_vertex_labels(vid, &remaining_labels);
3822 }
3823
3824 // Update the node value in the row with the remaining labels.
3825 if let Some(Value::Map(obj)) = row.get_mut(variable) {
3826 let labels_list = remaining_labels.into_iter().map(Value::String).collect();
3827 obj.insert("_labels".to_string(), Value::List(labels_list));
3828 }
3829 }
3830 }
3831 Ok(())
3832 }
3833
3834 /// Resolve edge type ID for a Value::Edge, handling empty edge_type strings
3835 /// by looking up the type from the L0 buffer's edge endpoints.
3836 fn resolve_edge_type_id_for_edge(
3837 &self,
3838 edge: &crate::types::Edge,
3839 writer: &Writer,
3840 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3841 ) -> Result<u32> {
3842 if !edge.edge_type.is_empty() {
3843 return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
3844 }
3845 // Edge type name is empty (e.g., from anonymous MATCH patterns).
3846 // Look up the edge type ID from the L0 buffer's edge endpoints.
3847 if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
3848 return Ok(etype);
3849 }
3850 Err(anyhow!(
3851 "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
3852 edge.eid
3853 ))
3854 }
3855
3856 /// Execute DELETE clause for a single item (vertex, edge, path, or null).
3857 pub(crate) async fn execute_delete_item_locked(
3858 &self,
3859 val: &Value,
3860 detach: bool,
3861 writer: &Writer,
3862 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3863 ) -> Result<()> {
3864 match val {
3865 Value::Null => {
3866 // DELETE null is a no-op per OpenCypher spec
3867 }
3868 Value::Path(path) => {
3869 // Delete path edges first, then nodes
3870 for edge in &path.edges {
3871 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3872 writer
3873 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3874 .await?;
3875 }
3876 for node in &path.nodes {
3877 self.execute_delete_vertex(
3878 node.vid,
3879 detach,
3880 Some(node.labels.clone()),
3881 writer,
3882 tx_l0,
3883 )
3884 .await?;
3885 }
3886 }
3887 _ => {
3888 // Try Path reconstruction from Map first (Arrow loses Path type)
3889 if let Ok(path) = Path::try_from(val) {
3890 for edge in &path.edges {
3891 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3892 writer
3893 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3894 .await?;
3895 }
3896 for node in &path.nodes {
3897 self.execute_delete_vertex(
3898 node.vid,
3899 detach,
3900 Some(node.labels.clone()),
3901 writer,
3902 tx_l0,
3903 )
3904 .await?;
3905 }
3906 } else if let Ok(vid) = Self::vid_from_value(val) {
3907 let labels = Self::extract_labels_from_node(val);
3908 self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
3909 .await?;
3910 } else if let Value::Map(map) = val {
3911 self.execute_delete_edge_from_map(map, writer, tx_l0)
3912 .await?;
3913 } else if let Value::Edge(edge) = val {
3914 reject_if_ephemeral_eid(edge.eid)?;
3915 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3916 let registry = self
3917 .procedure_registry
3918 .as_ref()
3919 .and_then(|pr| pr.plugin_registry());
3920 reject_virtual_edge_type_write(registry.as_ref(), etype, "DELETE")?;
3921 writer
3922 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3923 .await?;
3924 }
3925 }
3926 }
3927 Ok(())
3928 }
3929
3930 /// Execute vertex deletion with optional detach.
3931 pub(crate) async fn execute_delete_vertex(
3932 &self,
3933 vid: Vid,
3934 detach: bool,
3935 labels: Option<Vec<String>>,
3936 writer: &Writer,
3937 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3938 ) -> Result<()> {
3939 reject_if_ephemeral_vid(vid)?;
3940 if let Some(ls) = labels.as_deref() {
3941 let registry = self
3942 .procedure_registry
3943 .as_ref()
3944 .and_then(|pr| pr.plugin_registry());
3945 reject_virtual_label_write(registry.as_ref(), ls, "DELETE")?;
3946 }
3947 if detach {
3948 self.detach_delete_vertex(vid, writer, tx_l0).await?;
3949 } else {
3950 self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
3951 }
3952 writer.delete_vertex(vid, labels, tx_l0).await?;
3953 Ok(())
3954 }
3955
3956 /// Check that a vertex has no edges (required for non-DETACH DELETE).
3957 ///
3958 /// Loads the subgraph from storage, then excludes edges that have been
3959 /// tombstoned in the writer's L0 or the transaction's L0. This ensures
3960 /// edges deleted earlier in the same DELETE clause are properly excluded.
3961 pub(crate) async fn check_vertex_has_no_edges(
3962 &self,
3963 vid: Vid,
3964 writer: &Writer,
3965 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3966 ) -> Result<()> {
3967 let schema = self.storage.schema_manager().schema();
3968 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
3969
3970 // Collect tombstoned edge IDs from both the writer L0 and tx L0.
3971 let mut tombstoned_eids = std::collections::HashSet::new();
3972 {
3973 let writer_l0 = writer.l0_manager.get_current();
3974 let guard = writer_l0.read();
3975 for &eid in guard.tombstones.keys() {
3976 tombstoned_eids.insert(eid);
3977 }
3978 }
3979 if let Some(tx) = tx_l0 {
3980 let guard = tx.read();
3981 for &eid in guard.tombstones.keys() {
3982 tombstoned_eids.insert(eid);
3983 }
3984 }
3985
3986 let out_graph = self
3987 .storage
3988 .load_subgraph_cached(
3989 &[vid],
3990 &edge_type_ids,
3991 1,
3992 uni_store::runtime::Direction::Outgoing,
3993 Some(writer.l0_manager.get_current()),
3994 )
3995 .await?;
3996 let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
3997
3998 let in_graph = self
3999 .storage
4000 .load_subgraph_cached(
4001 &[vid],
4002 &edge_type_ids,
4003 1,
4004 uni_store::runtime::Direction::Incoming,
4005 Some(writer.l0_manager.get_current()),
4006 )
4007 .await?;
4008 let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
4009
4010 if has_out || has_in {
4011 return Err(anyhow!(
4012 "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
4013 vid
4014 ));
4015 }
4016 Ok(())
4017 }
4018
4019 /// Execute edge deletion from a map representation.
4020 pub(crate) async fn execute_delete_edge_from_map(
4021 &self,
4022 map: &HashMap<String, Value>,
4023 writer: &Writer,
4024 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
4025 ) -> Result<()> {
4026 // Check for non-null _eid to skip OPTIONAL MATCH null edges
4027 if map.get("_eid").is_some_and(|v| !v.is_null()) {
4028 let ei = self.extract_edge_identity(map)?;
4029 reject_if_ephemeral_eid(ei.eid)?;
4030 let registry = self
4031 .procedure_registry
4032 .as_ref()
4033 .and_then(|pr| pr.plugin_registry());
4034 reject_virtual_edge_type_write(registry.as_ref(), ei.edge_type_id, "DELETE")?;
4035 writer
4036 .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
4037 .await?;
4038 }
4039 Ok(())
4040 }
4041
4042 /// Build a scan plan node.
4043 ///
4044 /// - `label_id > 0`: schema label → `Scan` (fast, label-specific storage)
4045 /// - `label_id == 0` with labels: schemaless → `ScanMainByLabels` (main table + L0, filtered by label name)
4046 /// - `label_id == 0` without labels: unlabeled → `ScanAll`
4047 fn make_scan_plan(
4048 label_id: u16,
4049 labels: Vec<String>,
4050 variable: String,
4051 filter: Option<Expr>,
4052 ) -> LogicalPlan {
4053 if label_id > 0 {
4054 LogicalPlan::Scan {
4055 label_id,
4056 labels,
4057 variable,
4058 filter,
4059 optional: false,
4060 }
4061 } else if !labels.is_empty() {
4062 // Schemaless label: use ScanMainByLabels to filter by label name
4063 LogicalPlan::ScanMainByLabels {
4064 labels,
4065 variable,
4066 filter,
4067 optional: false,
4068 }
4069 } else {
4070 LogicalPlan::ScanAll {
4071 variable,
4072 filter,
4073 optional: false,
4074 }
4075 }
4076 }
4077
4078 /// Attach a new scan node to the running plan, using `CrossJoin` when the plan
4079 /// already contains prior operators.
4080 fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
4081 if matches!(plan, LogicalPlan::Empty) {
4082 scan
4083 } else {
4084 LogicalPlan::CrossJoin {
4085 left: Box::new(plan),
4086 right: Box::new(scan),
4087 }
4088 }
4089 }
4090
4091 /// Resolve MERGE property map expressions against the current row context.
4092 ///
4093 /// MERGE patterns like `MERGE (city:City {name: person.bornIn})` contain
4094 /// property expressions that reference bound variables. These need to be
4095 /// evaluated to concrete literal values before being converted to filter
4096 /// expressions by `properties_to_expr()`.
4097 async fn resolve_merge_properties(
4098 &self,
4099 properties: &Option<Expr>,
4100 row: &HashMap<String, Value>,
4101 prop_manager: &PropertyManager,
4102 params: &HashMap<String, Value>,
4103 ctx: Option<&QueryContext>,
4104 ) -> Result<Option<Expr>> {
4105 let entries = match properties {
4106 Some(Expr::Map(entries)) => entries,
4107 other => return Ok(other.clone()),
4108 };
4109 let mut resolved = Vec::new();
4110 for (key, val_expr) in entries {
4111 if matches!(val_expr, Expr::Literal(_)) {
4112 resolved.push((key.clone(), val_expr.clone()));
4113 } else {
4114 let value = self
4115 .evaluate_expr(val_expr, row, prop_manager, params, ctx)
4116 .await?;
4117 resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
4118 }
4119 }
4120 Ok(Some(Expr::Map(resolved)))
4121 }
4122
4123 /// Convert a runtime Value back to an AST literal expression.
4124 fn value_to_literal_expr(value: &Value) -> Expr {
4125 match value {
4126 Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
4127 Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
4128 Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
4129 Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
4130 Value::Null => Expr::Literal(CypherLiteral::Null),
4131 Value::List(items) => {
4132 Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
4133 }
4134 Value::Map(entries) => Expr::Map(
4135 entries
4136 .iter()
4137 .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
4138 .collect(),
4139 ),
4140 _ => Expr::Literal(CypherLiteral::Null),
4141 }
4142 }
4143
    /// Run the MATCH half of a MERGE: find existing graph data that satisfies
    /// `pattern` given the bindings already present in `row`.
    ///
    /// A `LogicalPlan` is assembled by hand for every path of the pattern: a
    /// scan for the leading node (pinned to its VID when the variable is
    /// already bound in `row`), followed by one traversal per relationship
    /// hop, with property filters resolved against `row`. The plan is executed
    /// through `execute_merge_read_plan`, results that contradict `row` are
    /// dropped, and each surviving match is returned as `row` extended with
    /// the match's bindings. An empty vector means nothing matched.
    ///
    /// # Errors
    ///
    /// Fails when:
    /// - a bound variable's value does not resolve to a vertex id;
    /// - `strict_schema` is enabled and a label or edge type is not in the schema;
    /// - a relationship has no type or more than one type;
    /// - a target node has neither a label nor a variable bound in `row`;
    /// - a path does not start with a node;
    /// - property evaluation or plan execution fails.
    pub(crate) async fn execute_merge_match(
        &self,
        pattern: &Pattern,
        row: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Construct a LogicalPlan for the MATCH part of MERGE
        let planner =
            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());

        // The plan is constructed manually rather than by building a
        // CypherQuery and calling the planner's plan() method, so we don't
        // have to round-trip through the AST.

        let mut plan = LogicalPlan::Empty;
        let mut vars_in_scope = Vec::new();

        // Add existing bound variables from row to scope
        for key in row.keys() {
            vars_in_scope.push(key.clone());
        }

        // Reconstruct Match logic from Planner (simplified for MERGE pattern)
        for path in &pattern.paths {
            let elements = &path.elements;
            let mut i = 0;
            while i < elements.len() {
                let part = &elements[i];
                match part {
                    PatternElement::Node(n) => {
                        // Unnamed nodes use the empty string as placeholder variable.
                        let variable = n.variable.clone().unwrap_or_default();

                        // If variable is already bound in the input row, we filter
                        let is_bound = !variable.is_empty() && row.contains_key(&variable);

                        if is_bound {
                            // If bound, we must Scan this specific VID to start the chain.
                            // `unwrap` cannot fail: `is_bound` just checked `contains_key`.
                            let val = row.get(&variable).unwrap();
                            let vid = Self::vid_from_value(val)?;

                            // In the new storage model, VIDs don't embed label info.
                            // We get label from the node value if available, otherwise use 0 to scan all.
                            let extracted_labels =
                                Self::extract_labels_from_node(val).unwrap_or_default();
                            let label_id = {
                                let schema = self.storage.schema_manager().schema();
                                extracted_labels
                                    .first()
                                    .and_then(|l| schema.label_id_by_name(l))
                                    .unwrap_or(0)
                            };

                            let resolved_props = self
                                .resolve_merge_properties(
                                    &n.properties,
                                    row,
                                    prop_manager,
                                    params,
                                    ctx,
                                )
                                .await?;
                            let prop_filter =
                                planner.properties_to_expr(&variable, &resolved_props);

                            // Pin the scan to the bound vertex with a filter on the
                            // internal `_vid` property: `variable._vid = <vid>`.
                            let vid_filter = Expr::BinaryOp {
                                left: Box::new(Expr::Property(
                                    Box::new(Expr::Variable(variable.clone())),
                                    "_vid".to_string(),
                                )),
                                op: BinaryOp::Eq,
                                right: Box::new(Expr::Literal(CypherLiteral::Integer(
                                    vid.as_u64() as i64,
                                ))),
                            };

                            // AND the VID filter with the pattern's property filter, if any.
                            let combined_filter = if let Some(pf) = prop_filter {
                                Some(Expr::BinaryOp {
                                    left: Box::new(vid_filter),
                                    op: BinaryOp::And,
                                    right: Box::new(pf),
                                })
                            } else {
                                Some(vid_filter)
                            };

                            let scan = Self::make_scan_plan(
                                label_id,
                                extracted_labels,
                                variable.clone(),
                                combined_filter,
                            );
                            plan = Self::attach_scan(plan, scan);
                        } else {
                            let label_id = if n.labels.is_empty() {
                                // Unlabeled MERGE node: scan all nodes (label_id 0 → ScanAll)
                                0
                            } else {
                                // Only the first label selects the scan; additional
                                // labels are enforced by the Filter added below.
                                let label_name = &n.labels[0];
                                let schema = self.storage.schema_manager().schema();
                                if self.config.strict_schema {
                                    schema
                                        .get_label_case_insensitive(label_name)
                                        .map(|m| m.id)
                                        .ok_or_else(|| {
                                            anyhow!(
                                                "Label '{}' is not defined in the schema \
                                                (strict_schema is enabled). \
                                                Declare it with db.schema().label(...).apply() first.",
                                                label_name
                                            )
                                        })?
                                } else {
                                    // Fall back to label_id 0 (any/schemaless) when not in schema.
                                    schema
                                        .get_label_case_insensitive(label_name)
                                        .map(|m| m.id)
                                        .unwrap_or(0)
                                }
                            };

                            let resolved_props = self
                                .resolve_merge_properties(
                                    &n.properties,
                                    row,
                                    prop_manager,
                                    params,
                                    ctx,
                                )
                                .await?;
                            let prop_filter =
                                planner.properties_to_expr(&variable, &resolved_props);
                            let scan = Self::make_scan_plan(
                                label_id,
                                n.labels.names().to_vec(),
                                variable.clone(),
                                prop_filter,
                            );
                            plan = Self::attach_scan(plan, scan);

                            // Add label filters when:
                            // 1. Multiple labels with a known schema label: filter for
                            //    additional labels (Scan only scans by the first label).
                            // 2. Schemaless labels (label_id = 0): ScanAll finds ALL
                            //    nodes, so we must filter to only those with the
                            //    specified label(s).
                            if !n.labels.is_empty()
                                && !variable.is_empty()
                                && (label_id == 0 || n.labels.len() > 1)
                                && let Some(label_filter) =
                                    planner.node_filter_expr(&variable, &n.labels, &None)
                            {
                                plan = LogicalPlan::Filter {
                                    input: Box::new(plan),
                                    predicate: label_filter,
                                    optional_variables: std::collections::HashSet::new(),
                                };
                            }

                            if !variable.is_empty() {
                                vars_in_scope.push(variable.clone());
                            }
                        }

                        // Now look ahead for relationship: consume consecutive
                        // (relationship, node) pairs that follow this node.
                        i += 1;
                        while i < elements.len() {
                            if let PatternElement::Relationship(r) = &elements[i] {
                                // NOTE(review): indexing `i + 1` panics if a relationship
                                // is the last element; presumably the parser guarantees a
                                // node follows every relationship — confirm.
                                let target_node_part = &elements[i + 1];
                                if let PatternElement::Node(n_target) = target_node_part {
                                    let schema = self.storage.schema_manager().schema();
                                    let mut edge_type_ids = Vec::new();

                                    // MERGE requires exactly one relationship type.
                                    if r.types.is_empty() {
                                        return Err(anyhow!("MERGE edge must have a type"));
                                    } else if r.types.len() > 1 {
                                        return Err(anyhow!(
                                            "MERGE does not support multiple edge types"
                                        ));
                                    } else {
                                        let type_name = &r.types[0];
                                        let type_id = if self.config.strict_schema {
                                            let s = self.storage.schema_manager().schema();
                                            s.edge_type_id_by_name_case_insensitive(type_name)
                                                .ok_or_else(|| {
                                                    anyhow!(
                                                        "Edge type '{}' is not defined in the schema \
                                                        (strict_schema is enabled).",
                                                        type_name
                                                    )
                                                })?
                                        } else {
                                            // Schemaless: assign new ID if not found.
                                            self.storage
                                                .schema_manager()
                                                .get_or_assign_edge_type_id(type_name)
                                        };
                                        edge_type_ids.push(type_id);
                                    }

                                    // Resolve target label ID. For schemaless labels (not in the
                                    // schema), fall back to 0 which means "any label" in traversal.
                                    let target_label_id: u16 = if let Some(lbl) =
                                        n_target.labels.first()
                                    {
                                        schema
                                            .get_label_case_insensitive(lbl)
                                            .map(|m| m.id)
                                            .unwrap_or(0)
                                    } else if let Some(var) = &n_target.variable {
                                        if let Some(val) = row.get(var) {
                                            // In the new storage model, get labels from node value
                                            if let Some(labels) =
                                                Self::extract_labels_from_node(val)
                                            {
                                                if let Some(first_label) = labels.first() {
                                                    schema
                                                        .get_label_case_insensitive(first_label)
                                                        .map(|m| m.id)
                                                        .unwrap_or(0)
                                                } else {
                                                    // Bound node with no labels — schemaless, any
                                                    0
                                                }
                                            } else if Self::vid_from_value(val).is_ok() {
                                                // VID without label info — schemaless, any
                                                0
                                            } else {
                                                return Err(anyhow!(
                                                    "Variable {} is not a node",
                                                    var
                                                ));
                                            }
                                        } else {
                                            return Err(anyhow!(
                                                "MERGE pattern node must have a label or be a bound variable"
                                            ));
                                        }
                                    } else {
                                        return Err(anyhow!(
                                            "MERGE pattern node must have a label"
                                        ));
                                    };

                                    let target_variable =
                                        n_target.variable.clone().unwrap_or_default();
                                    // The traversal starts from the node element just
                                    // before this relationship.
                                    let source_variable = match &elements[i - 1] {
                                        PatternElement::Node(n) => {
                                            n.variable.clone().unwrap_or_default()
                                        }
                                        _ => String::new(),
                                    };

                                    let is_variable_length = r.range.is_some();
                                    let type_name = &r.types[0];

                                    // Use TraverseMainByType for schemaless edge types
                                    // (same as MATCH planner) so edge properties are loaded
                                    // correctly from storage + L0 via the adjacency map.
                                    // Regular Traverse only loads properties via
                                    // property_manager which doesn't handle schemaless types.
                                    let is_schemaless = edge_type_ids.iter().all(|id| {
                                        uni_common::core::edge_type::is_schemaless_edge_type(*id)
                                    });

                                    // In both variants a missing hop bound defaults to 1.
                                    if is_schemaless {
                                        plan = LogicalPlan::TraverseMainByType {
                                            type_names: vec![type_name.clone()],
                                            input: Box::new(plan),
                                            direction: r.direction.clone(),
                                            source_variable,
                                            target_variable: target_variable.clone(),
                                            step_variable: r.variable.clone(),
                                            min_hops: r
                                                .range
                                                .as_ref()
                                                .and_then(|r| r.min)
                                                .unwrap_or(1)
                                                as usize,
                                            max_hops: r
                                                .range
                                                .as_ref()
                                                .and_then(|r| r.max)
                                                .unwrap_or(1)
                                                as usize,
                                            optional: false,
                                            target_filter: None,
                                            path_variable: None,
                                            is_variable_length,
                                            optional_pattern_vars: std::collections::HashSet::new(),
                                            scope_match_variables: std::collections::HashSet::new(),
                                            edge_filter_expr: None,
                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
                                        };
                                    } else {
                                        // Collect edge property names needed for MERGE filter
                                        let mut edge_props = std::collections::HashSet::new();
                                        if let Some(Expr::Map(entries)) = &r.properties {
                                            for (key, _) in entries {
                                                edge_props.insert(key.clone());
                                            }
                                        }
                                        plan = LogicalPlan::Traverse {
                                            input: Box::new(plan),
                                            edge_type_ids: edge_type_ids.clone(),
                                            direction: r.direction.clone(),
                                            source_variable,
                                            target_variable: target_variable.clone(),
                                            target_label_id,
                                            step_variable: r.variable.clone(),
                                            min_hops: r
                                                .range
                                                .as_ref()
                                                .and_then(|r| r.min)
                                                .unwrap_or(1)
                                                as usize,
                                            max_hops: r
                                                .range
                                                .as_ref()
                                                .and_then(|r| r.max)
                                                .unwrap_or(1)
                                                as usize,
                                            optional: false,
                                            target_filter: None,
                                            path_variable: None,
                                            edge_properties: edge_props,
                                            is_variable_length,
                                            optional_pattern_vars: std::collections::HashSet::new(),
                                            scope_match_variables: std::collections::HashSet::new(),
                                            edge_filter_expr: None,
                                            path_mode: crate::query::df_graph::nfa::PathMode::Trail,
                                            qpp_steps: None,
                                        };
                                    }

                                    // Apply property filters for relationship.
                                    // NOTE(review): the filter is skipped when the
                                    // relationship has properties but no variable —
                                    // presumably callers name every relationship that
                                    // carries properties; confirm.
                                    if r.properties.is_some()
                                        && let Some(r_var) = &r.variable
                                    {
                                        let resolved_rel_props = self
                                            .resolve_merge_properties(
                                                &r.properties,
                                                row,
                                                prop_manager,
                                                params,
                                                ctx,
                                            )
                                            .await?;
                                        if let Some(prop_filter) =
                                            planner.properties_to_expr(r_var, &resolved_rel_props)
                                        {
                                            plan = LogicalPlan::Filter {
                                                input: Box::new(plan),
                                                predicate: prop_filter,
                                                optional_variables: std::collections::HashSet::new(),
                                            };
                                        }
                                    }

                                    // Apply property filters for the target node whenever
                                    // it has a variable name.
                                    if !target_variable.is_empty() {
                                        let resolved_target_props = self
                                            .resolve_merge_properties(
                                                &n_target.properties,
                                                row,
                                                prop_manager,
                                                params,
                                                ctx,
                                            )
                                            .await?;
                                        if let Some(prop_filter) = planner.properties_to_expr(
                                            &target_variable,
                                            &resolved_target_props,
                                        ) {
                                            plan = LogicalPlan::Filter {
                                                input: Box::new(plan),
                                                predicate: prop_filter,
                                                optional_variables: std::collections::HashSet::new(),
                                            };
                                        }
                                        vars_in_scope.push(target_variable.clone());
                                    }

                                    if let Some(sv) = &r.variable {
                                        vars_in_scope.push(sv.clone());
                                    }
                                    // Skip past the relationship and its target node.
                                    i += 2;
                                } else {
                                    break;
                                }
                            } else {
                                break;
                            }
                        }
                    }
                    _ => return Err(anyhow!("Pattern must start with a node")),
                }
            }

            // The plan is executed below to find all matches, which are then
            // filtered against the bound variables in `row`.
        }

        let db_matches = self
            .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
            .await?;

        // Keep only DB results that are consistent with the input row bindings.
        // Skip internal keys (starting with "__") as they are implementation
        // artifacts (e.g. __used_edges) and not user-visible variable bindings.
        // Also skip the empty-string key (""), which is the placeholder variable
        // for unnamed MERGE nodes — it may carry over from a prior MERGE clause
        // and must not constrain the current pattern's match.
        let final_matches = db_matches
            .into_iter()
            .filter(|db_match| {
                row.iter().all(|(key, val)| {
                    if key.is_empty() || key.starts_with("__") {
                        return true;
                    }
                    // A row key the match does not bind cannot conflict.
                    let Some(db_val) = db_match.get(key) else {
                        return true;
                    };
                    if db_val == val {
                        return true;
                    }
                    // Values differ -- treat as consistent if they represent the same VID
                    matches!(
                        (Self::vid_from_value(val), Self::vid_from_value(db_val)),
                        (Ok(v1), Ok(v2)) if v1 == v2
                    )
                })
            })
            .map(|db_match| {
                // Start from the input row; bindings from the match win on
                // overlapping keys.
                let mut merged = row.clone();
                merged.extend(db_match);
                merged
            })
            .collect();

        Ok(final_matches)
    }
4597
4598 /// Prepare a MERGE pattern for path variable binding.
4599 ///
4600 /// If any path in the pattern has a path variable (e.g., `MERGE p = (a)-[:R]->(b)`),
4601 /// unnamed relationships need internal variable names so that `execute_create_pattern`
4602 /// stores the edge data in the row for later path construction.
4603 ///
4604 /// Returns the (possibly modified) pattern and a list of temp variable names to clean up.
4605 fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
4606 let has_path_vars = pattern
4607 .paths
4608 .iter()
4609 .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
4610
4611 if !has_path_vars {
4612 return (pattern.clone(), Vec::new());
4613 }
4614
4615 let mut modified = pattern.clone();
4616 let mut temp_vars = Vec::new();
4617
4618 for path in &mut modified.paths {
4619 if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
4620 continue;
4621 }
4622 for (idx, element) in path.elements.iter_mut().enumerate() {
4623 if let PatternElement::Relationship(r) = element
4624 && r.variable.as_ref().is_none_or(String::is_empty)
4625 {
4626 let temp_var = format!("__path_r_{}", idx);
4627 r.variable = Some(temp_var.clone());
4628 temp_vars.push(temp_var);
4629 }
4630 }
4631 }
4632
4633 (modified, temp_vars)
4634 }
4635
4636 /// Bind path variables in the result row based on the MERGE pattern.
4637 ///
4638 /// Walks each path in the pattern, collects node/edge values from the row
4639 /// by variable name, and constructs a `Value::Path`.
4640 fn bind_path_variables(
4641 pattern: &Pattern,
4642 row: &mut HashMap<String, Value>,
4643 temp_vars: &[String],
4644 ) {
4645 for path in &pattern.paths {
4646 let Some(path_var) = path.variable.as_ref() else {
4647 continue;
4648 };
4649 if path_var.is_empty() {
4650 continue;
4651 }
4652
4653 let mut nodes = Vec::new();
4654 let mut edges = Vec::new();
4655
4656 for element in &path.elements {
4657 match element {
4658 PatternElement::Node(n) => {
4659 if let Some(var) = &n.variable
4660 && let Some(val) = row.get(var)
4661 && let Some(node) = Self::value_to_node_for_path(val)
4662 {
4663 nodes.push(node);
4664 }
4665 }
4666 PatternElement::Relationship(r) => {
4667 if let Some(var) = &r.variable
4668 && let Some(val) = row.get(var)
4669 && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
4670 {
4671 edges.push(edge);
4672 }
4673 }
4674 _ => {}
4675 }
4676 }
4677
4678 if !nodes.is_empty() {
4679 use uni_common::value::Path;
4680 row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
4681 }
4682 }
4683
4684 // Clean up internal temp variables
4685 for var in temp_vars {
4686 row.remove(var);
4687 }
4688 }
4689
4690 /// Convert a Value (Map or Node) to a Node for path construction.
4691 fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
4692 match val {
4693 Value::Node(n) => Some(n.clone()),
4694 Value::Map(map) => {
4695 let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
4696 let labels = if let Some(Value::List(l)) = map.get("_labels") {
4697 l.iter()
4698 .filter_map(|v| {
4699 if let Value::String(s) = v {
4700 Some(s.clone())
4701 } else {
4702 None
4703 }
4704 })
4705 .collect()
4706 } else {
4707 vec![]
4708 };
4709 let properties: HashMap<String, Value> = map
4710 .iter()
4711 .filter(|(k, _)| !k.starts_with('_'))
4712 .map(|(k, v)| (k.clone(), v.clone()))
4713 .collect();
4714 Some(uni_common::value::Node {
4715 vid,
4716 labels,
4717 properties,
4718 })
4719 }
4720 _ => None,
4721 }
4722 }
4723
4724 /// Convert a Value (Map or Edge) to an Edge for path construction.
4725 fn value_to_edge_for_path(
4726 val: &Value,
4727 type_names: &[String],
4728 ) -> Option<uni_common::value::Edge> {
4729 match val {
4730 Value::Edge(e) => Some(e.clone()),
4731 Value::Map(map) => {
4732 let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
4733 let edge_type = map
4734 .get("_type_name")
4735 .and_then(|v| {
4736 if let Value::String(s) = v {
4737 Some(s.clone())
4738 } else {
4739 None
4740 }
4741 })
4742 .or_else(|| type_names.first().cloned())
4743 .unwrap_or_default();
4744 let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
4745 let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
4746 let properties: HashMap<String, Value> = map
4747 .iter()
4748 .filter(|(k, _)| !k.starts_with('_'))
4749 .map(|(k, v)| (k.clone(), v.clone()))
4750 .collect();
4751 Some(uni_common::value::Edge {
4752 eid,
4753 edge_type,
4754 src,
4755 dst,
4756 properties,
4757 })
4758 }
4759 _ => None,
4760 }
4761 }
4762}
4763
4764/// Read a vertex's full property map, preferring `prefetched` over a fresh
4765/// per-row `Backend::scan`.
4766///
4767/// `prefetched` is built once at the top of `apply_mutations` via
4768/// `prefetch_set_targets` / `prefetch_remove_targets` (mutation_common.rs).
4769/// On a hit, we layer in L0 from `ctx` so writes from earlier rows of the
4770/// same `apply_mutations` invocation (counter increments, same-VID
4771/// duplicates from UNWIND) take precedence — the prefetch only snapshots
4772/// storage state at SET entry. On a miss, fall back to the existing
4773/// per-row path; this preserves correctness for newly created VIDs,
4774/// schemaless rows, multi-label corner cases, and non-Mutation callers
4775/// that pass `&Prefetch::default()`.
4776pub(crate) async fn read_vertex_props_with_prefetch(
4777 vid: Vid,
4778 prefetched: &Prefetch,
4779 prop_manager: &PropertyManager,
4780 ctx: Option<&QueryContext>,
4781) -> Result<uni_common::Properties> {
4782 match prefetched.vertex.get(&vid).cloned() {
4783 Some(mut base) => {
4784 if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_vertex_props(vid, ctx) {
4785 for (k, v) in l0 {
4786 base.insert(k, v);
4787 }
4788 }
4789 Ok(base)
4790 }
4791 None => Ok(prop_manager
4792 .get_all_vertex_props_with_ctx(vid, ctx)
4793 .await?
4794 .unwrap_or_default()),
4795 }
4796}
4797
4798/// Edge equivalent of [`read_vertex_props_with_prefetch`]. On a hit, layer
4799/// in L0 edge props so writes from earlier rows of the same
4800/// `apply_mutations` invocation take precedence. On a miss, fall back to
4801/// the per-EID storage path.
4802pub(crate) async fn read_edge_props_with_prefetch(
4803 eid: Eid,
4804 prefetched: &Prefetch,
4805 prop_manager: &PropertyManager,
4806 ctx: Option<&QueryContext>,
4807) -> Result<uni_common::Properties> {
4808 match prefetched.edge.get(&eid).cloned() {
4809 Some(mut base) => {
4810 if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_edge_props(eid, ctx) {
4811 for (k, v) in l0 {
4812 base.insert(k, v);
4813 }
4814 }
4815 Ok(base)
4816 }
4817 None => Ok(prop_manager
4818 .get_all_edge_props_with_ctx(eid, ctx)
4819 .await?
4820 .unwrap_or_default()),
4821 }
4822}
4823
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a string-valued [`Value`].
    fn text(s: &'static str) -> Value {
        Value::String(s.into())
    }

    /// Builds a `HashMap<String, Value>` from literal `(key, value)` pairs.
    fn props_of<const N: usize>(pairs: [(&'static str, Value); N]) -> HashMap<String, Value> {
        HashMap::from(pairs.map(|(key, value)| (key.to_string(), value)))
    }

    // ── merge_props ──────────────────────────────────────────────────

    /// Replace mode: keys absent from the incoming map end up as `Null`.
    #[test]
    fn test_merge_props_replace_tombstones_missing_keys() {
        let current = props_of([("name", text("Alice")), ("age", Value::Int(30))]);
        let incoming = props_of([("name", text("Bob"))]);

        let merged = Executor::merge_props(current, incoming, true);

        assert_eq!(merged.get("name"), Some(&text("Bob")));
        assert_eq!(
            merged.get("age"),
            Some(&Value::Null),
            "Missing keys should be tombstoned in replace mode"
        );
    }

    /// Merge mode: existing keys survive and new keys are added.
    #[test]
    fn test_merge_props_merge_preserves_existing() {
        let current = props_of([("name", text("Alice")), ("age", Value::Int(30))]);
        let incoming = props_of([("city", text("NYC"))]);

        let merged = Executor::merge_props(current, incoming, false);

        assert_eq!(merged.get("name"), Some(&text("Alice")));
        assert_eq!(merged.get("age"), Some(&Value::Int(30)));
        assert_eq!(merged.get("city"), Some(&text("NYC")));
    }

    /// An explicit `Null` in the incoming map is kept as `Null` in both modes.
    #[test]
    fn test_merge_props_null_incoming_is_tombstone() {
        let current = props_of([("name", text("Alice"))]);
        let incoming = props_of([("name", Value::Null)]);

        // `false` = merge mode (null overwrites), `true` = replace mode
        // (null is a tombstone); the observable result is the same.
        for replace in [false, true] {
            let merged = Executor::merge_props(current.clone(), incoming.clone(), replace);
            assert_eq!(merged.get("name"), Some(&Value::Null));
        }
    }

    /// Merging into an empty map yields exactly the incoming entries.
    #[test]
    fn test_merge_props_empty_current() {
        let current = HashMap::<String, Value>::new();
        let incoming = props_of([("name", text("Alice"))]);

        let merged = Executor::merge_props(current, incoming, false);

        assert_eq!(merged.get("name"), Some(&text("Alice")));
        assert_eq!(merged.len(), 1);
    }

    /// Replace mode with an empty incoming map nulls every existing key.
    #[test]
    fn test_merge_props_empty_incoming_replace_tombstones_all() {
        let current = props_of([("name", text("Alice")), ("age", Value::Int(30))]);
        let incoming = HashMap::<String, Value>::new();

        let merged = Executor::merge_props(current, incoming, true);

        for key in ["name", "age"] {
            assert_eq!(merged.get(key), Some(&Value::Null));
        }
    }

    // ── extract_labels_from_node ─────────────────────────────────────

    /// Labels are read from the `_labels` list of a map-encoded node.
    #[test]
    fn test_extract_labels_from_map() {
        let encoded = Value::Map(HashMap::from([
            ("_vid".into(), Value::Int(1)),
            (
                "_labels".into(),
                Value::List(vec![text("Person"), text("Employee")]),
            ),
        ]));

        let expected = vec![String::from("Person"), String::from("Employee")];
        assert_eq!(Executor::extract_labels_from_node(&encoded), Some(expected));
    }

    /// Labels are read from the `labels` field of a `Value::Node`.
    #[test]
    fn test_extract_labels_from_value_node() {
        let wrapped = Value::Node(uni_common::Node {
            vid: 1u64.into(),
            labels: vec![String::from("Person")],
            properties: HashMap::new(),
        });

        assert_eq!(
            Executor::extract_labels_from_node(&wrapped),
            Some(vec![String::from("Person")])
        );
    }

    /// Scalar values carry no labels.
    #[test]
    fn test_extract_labels_non_node_returns_none() {
        for scalar in [Value::Int(42), Value::String("hello".into())] {
            assert_eq!(Executor::extract_labels_from_node(&scalar), None);
        }
    }

    // ── extract_user_properties_from_value ───────────────────────────

    /// Internal `_vid` / `_labels` keys are dropped; user keys are kept.
    #[test]
    fn test_extract_user_props_strips_internal_keys() {
        let encoded = Value::Map(HashMap::from([
            ("_vid".into(), Value::Int(1)),
            ("_labels".into(), Value::List(vec![text("Person")])),
            ("name".into(), text("Alice")),
            ("age".into(), Value::Int(30)),
        ]));

        let extracted = Executor::extract_user_properties_from_value(&encoded).unwrap();

        assert_eq!(extracted.get("name"), Some(&text("Alice")));
        assert_eq!(extracted.get("age"), Some(&Value::Int(30)));
        for internal in ["_vid", "_labels"] {
            assert!(!extracted.contains_key(internal));
        }
    }

    /// A map without internal keys comes back unchanged.
    #[test]
    fn test_extract_user_props_plain_map_returns_as_is() {
        let plain = HashMap::from([("key".into(), Value::String("value".into()))]);

        let extracted =
            Executor::extract_user_properties_from_value(&Value::Map(plain.clone())).unwrap();

        assert_eq!(extracted, plain);
    }

    /// Properties are read from the `properties` field of a `Value::Node`.
    #[test]
    fn test_extract_user_props_from_value_node() {
        let wrapped = Value::Node(uni_common::Node {
            vid: 1u64.into(),
            labels: vec![String::from("Person")],
            properties: HashMap::from([("name".into(), text("Alice"))]),
        });

        let extracted = Executor::extract_user_properties_from_value(&wrapped).unwrap();

        assert_eq!(extracted.get("name"), Some(&text("Alice")));
    }
}