uni_query/query/executor/write.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use crate::query::df_graph::mutation_common::Prefetch;
6use crate::query::planner::LogicalPlan;
7use anyhow::{Result, anyhow};
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use uni_common::DataType;
11use uni_common::core::id::{Eid, Vid};
12use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
13use uni_common::{Path, Value};
14use uni_cypher::ast::{
15 AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
16 CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
17 DropEdgeType, DropLabel, Expr, NodePattern, Pattern, PatternElement, RemoveItem, SetClause,
18 SetItem,
19};
20use uni_store::QueryContext;
21use uni_store::runtime::l0_visibility;
22use uni_store::runtime::property_manager::PropertyManager;
23use uni_store::runtime::writer::Writer;
24
/// Canonical, hashable key for a single-node MERGE: the key properties as a
/// `(name, value)` list sorted by name. Used to group existing vertices by key
/// for the index fast path (issue #69).
///
/// NOTE(review): the alias itself does not enforce the sort order; producers
/// are presumably responsible for sorting before using the key — confirm.
type MergeKey = Vec<(String, Value)>;
29
30/// Identity fields extracted from a map-encoded edge.
31struct EdgeIdentity {
32 eid: Eid,
33 src: Vid,
34 dst: Vid,
35 edge_type_id: u32,
36}
37
38/// Per-variable accumulator for SetItem::Property items targeting a vertex.
39///
40/// Built lazily on the first SetItem touching each variable, then mutated
41/// in place across subsequent items. Flushed once at end of the SET
42/// clause (or earlier if a non-Property SetItem on the same var lands).
43struct PendingVertexSet {
44 vid: Vid,
45 labels: Vec<String>,
46 /// Full property map (storage union L0 from
47 /// `get_all_vertex_props_with_ctx` plus the touched values applied
48 /// in-order). Flushed to L0 whole; L0's `vertex_partial_keys` set
49 /// tells the flush which columns to send to Lance via MergeInsert.
50 props: HashMap<String, Value>,
51 /// `true` when the SET should flush via the partial-column MergeInsert
52 /// path: set when `UniConfig::partial_lance_writes` is on AND the
53 /// label has no generated columns. Generated-column labels still need
54 /// the full-row Append so the regenerated values land.
55 partial: bool,
56 /// Set of property keys touched by this statement. Threaded into L0
57 /// so the flush emits a `MergeInsertBuilder` source with exactly
58 /// these columns. Empty when `partial == false`.
59 touched: HashSet<String>,
60}
61
62/// Per-variable accumulator for SetItem::Property items targeting an edge.
63struct PendingEdgeSet {
64 src: Vid,
65 dst: Vid,
66 edge_type_id: u32,
67 eid: Eid,
68 edge_type_name: String,
69 /// `true` when the SET should flush via the partial-column
70 /// MergeInsert path on the per-edge-type delta tables (Round 12
71 /// §A). Set when `UniConfig::partial_lance_writes` is on.
72 partial: bool,
73 /// Property keys touched by this statement. Threaded into L0 so
74 /// the flush emits a `MergeInsertBuilder` source with exactly
75 /// these columns. Empty when `partial == false`.
76 touched: HashSet<String>,
77 props: HashMap<String, Value>,
78}
79
80/// Refuse to mutate an ephemeral node (M5g / proposal §4.13.1).
81/// Ephemeral entities are return-only — `Vid::EPHEMERAL_BIT` is set on
82/// any id minted by `host.allocate_transient_id()`.
83fn reject_if_ephemeral_vid(vid: Vid) -> Result<()> {
84 if vid.is_ephemeral() {
85 return Err(anyhow::Error::from(
86 uni_common::UniError::EphemeralWriteAttempt {
87 kind: "node",
88 id: vid.transient_id().unwrap_or(vid.as_u64()),
89 },
90 ));
91 }
92 Ok(())
93}
94
95/// Returns a short variant name for a `Value`, used in type-mismatch error messages.
96fn value_type_name(val: &Value) -> &'static str {
97 match val {
98 Value::Null => "Null",
99 Value::Bool(_) => "Bool",
100 Value::Int(_) => "Int",
101 Value::Float(_) => "Float",
102 Value::String(_) => "String",
103 Value::Bytes(_) => "Bytes",
104 Value::List(_) => "List",
105 Value::Map(_) => "Map",
106 Value::Node(_) => "Node",
107 Value::Edge(_) => "Edge",
108 Value::Path(_) => "Path",
109 Value::Vector(_) => "Vector",
110 Value::Temporal(_) => "Temporal",
111 _ => "value",
112 }
113}
114
115/// Refuse to mutate an ephemeral edge (M5g / proposal §4.13.1).
116fn reject_if_ephemeral_eid(eid: Eid) -> Result<()> {
117 if eid.is_ephemeral() {
118 return Err(anyhow::Error::from(
119 uni_common::UniError::EphemeralWriteAttempt {
120 kind: "edge",
121 id: eid.transient_id().unwrap_or(eid.as_u64()),
122 },
123 ));
124 }
125 Ok(())
126}
127
128/// Reject a write whose target label is currently allocated as a
129/// virtual (catalog-backed) label.
130///
131/// Catalog tables are read-only from the host's perspective — there is
132/// no write-back path through `CatalogTable::scan` to the originating
133/// provider, so silently allowing SET/DELETE would leave ghosted state
134/// on the host side that diverges from the external catalog. The
135/// planner already rejects CREATE/MERGE on virtual labels via
136/// `Planner::reject_virtual_label_writes`; this helper is the
137/// equivalent gate on the runtime write path for SET-label-add and
138/// DELETE.
139///
140/// `op` names the offending operation for the error message (e.g.
141/// `"SET"`, `"DELETE"`).
142///
143/// # Errors
144///
145/// Returns an error if `registry` is `Some` and any name in `labels`
146/// is currently registered as a virtual label. Returns `Ok(())` when
147/// no plugin registry is wired (low-level callers without plugins).
148fn reject_virtual_label_write(
149 registry: Option<&Arc<uni_plugin::PluginRegistry>>,
150 labels: &[String],
151 op: &str,
152) -> Result<()> {
153 let Some(registry) = registry else {
154 return Ok(());
155 };
156 for label in labels {
157 if registry.virtual_label_by_name(label).is_some() {
158 return Err(anyhow!(
159 "Cannot {op} on virtual (catalog-resolved) label `{label}` — virtual \
160 labels are read-only; write back via the originating catalog instead"
161 ));
162 }
163 }
164 Ok(())
165}
166
167/// Reject a write whose target edge-type ID is currently allocated as
168/// a virtual (catalog-backed) edge type. Runtime analog of
169/// [`reject_virtual_label_write`] for the edge path.
170///
171/// # Errors
172///
173/// Returns an error if `registry` is `Some` and `edge_type_id` resolves
174/// to a registered virtual edge type. Returns `Ok(())` when no plugin
175/// registry is wired.
176fn reject_virtual_edge_type_write(
177 registry: Option<&Arc<uni_plugin::PluginRegistry>>,
178 edge_type_id: u32,
179 op: &str,
180) -> Result<()> {
181 let Some(registry) = registry else {
182 return Ok(());
183 };
184 if let Some(entry) = registry.virtual_edge_type_by_id(edge_type_id) {
185 return Err(anyhow!(
186 "Cannot {op} on virtual (catalog-resolved) edge type `{}` — virtual edge \
187 types are read-only; write back via the originating catalog instead",
188 entry.name
189 ));
190 }
191 Ok(())
192}
193
194impl Executor {
195 /// Extracts labels from a node value.
196 ///
197 /// Handles both `Value::Map` (with a `_labels` list field) and
198 /// `Value::Node` (with a `labels` vec field).
199 ///
200 /// Returns `None` when the value is not a node or has no labels.
201 pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
202 match node_val {
203 Value::Map(map) => {
204 // Map-encoded node: look for _labels array
205 if let Some(Value::List(labels_arr)) = map.get("_labels") {
206 let labels: Vec<String> = labels_arr
207 .iter()
208 .filter_map(|v| v.as_str().map(|s| s.to_string()))
209 .collect();
210 if !labels.is_empty() {
211 return Some(labels);
212 }
213 }
214 None
215 }
216 Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
217 _ => None,
218 }
219 }
220
221 /// Extracts user-visible properties from a value that represents a node or edge.
222 ///
223 /// Strips internal bookkeeping keys (those prefixed with `_` or named
224 /// `ext_id`) from map-encoded entities and returns only the user-facing
225 /// property key-value pairs.
226 ///
227 /// Returns `None` when `val` is not a map, node, or edge.
228 pub(crate) fn extract_user_properties_from_value(
229 val: &Value,
230 ) -> Option<HashMap<String, Value>> {
231 match val {
232 Value::Map(map) => {
233 // Distinguish entity-encoded maps from plain map literals.
234 // A node map has both `_vid` and `_labels`.
235 // An edge map has `_eid`, `_src`, and `_dst`.
236 let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
237 let is_edge_map = map.contains_key("_eid")
238 && map.contains_key("_src")
239 && map.contains_key("_dst");
240
241 if is_node_map || is_edge_map {
242 // Filter out internal bookkeeping keys
243 let user_props: HashMap<String, Value> = map
244 .iter()
245 .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
246 .map(|(k, v)| (k.clone(), v.clone()))
247 .collect();
248 // When mutation output omits dotted property columns, user
249 // properties live inside `_all_props` rather than at the
250 // top level of the entity map.
251 if user_props.is_empty()
252 && let Some(Value::Map(all_props)) = map.get("_all_props")
253 {
254 return Some(all_props.clone());
255 }
256 Some(user_props)
257 } else {
258 // Plain map literal — return as-is
259 Some(map.clone())
260 }
261 }
262 Value::Node(node) => Some(node.properties.clone()),
263 Value::Edge(edge) => Some(edge.properties.clone()),
264 _ => None,
265 }
266 }
267
268 /// Applies a property map to a vertex or edge entity bound to `variable` in `row`.
269 ///
270 /// When `replace` is `true` the entity's property set is replaced: keys absent
271 /// from `new_props` are tombstoned (written as `Value::Null`) so the storage
272 /// layer removes them. When `replace` is `false` the map is merged: keys in
273 /// `new_props` are upserted, while keys absent from `new_props` are unchanged.
274 /// A `Value::Null` entry in `new_props` acts as an explicit tombstone in both
275 /// modes.
276 ///
277 /// Labels are never altered — the spec states that `SET n = map` replaces
278 /// properties only.
279 ///
280 /// # Errors
281 ///
282 /// Returns an error if the entity cannot be found in the storage layer, or
283 /// if the writer fails to persist the updated properties.
284 #[expect(clippy::too_many_arguments)]
285 async fn apply_properties_to_entity(
286 &self,
287 variable: &str,
288 new_props: HashMap<String, Value>,
289 replace: bool,
290 row: &mut HashMap<String, Value>,
291 writer: &Writer,
292 prop_manager: &PropertyManager,
293 params: &HashMap<String, Value>,
294 ctx: Option<&QueryContext>,
295 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
296 prefetched: &Prefetch,
297 ) -> Result<()> {
298 // Clone the target so we can hold &row references elsewhere.
299 let target = row.get(variable).cloned();
300
301 // Declared-type guard for the whole-entity `SET n = map` / `SET n += map`
302 // forms, mirroring the per-property SET path (issue #68).
303 let schema = self.storage.schema_manager().schema();
304
305 match target {
306 Some(Value::Node(ref node)) => {
307 let vid = node.vid;
308 let labels = node.labels.clone();
309 let current =
310 read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
311 let write_props = Self::merge_props(current, new_props, replace);
312 let mut enriched = write_props.clone();
313 for label_name in &labels {
314 self.enrich_properties_with_generated_columns(
315 label_name,
316 &mut enriched,
317 prop_manager,
318 params,
319 ctx,
320 )
321 .await?;
322 }
323 let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
324 let _ = writer
325 .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
326 .await?;
327 // Update the in-memory row binding
328 if let Some(Value::Node(n)) = row.get_mut(variable) {
329 n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
330 }
331 }
332 Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
333 let vid = Self::vid_from_value(node_val)?;
334 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
335 let current =
336 read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
337 let write_props = Self::merge_props(current, new_props, replace);
338 let mut enriched = write_props.clone();
339 for label_name in &labels {
340 self.enrich_properties_with_generated_columns(
341 label_name,
342 &mut enriched,
343 prop_manager,
344 params,
345 ctx,
346 )
347 .await?;
348 }
349 let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
350 let _ = writer
351 .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
352 .await?;
353 // Update the in-memory map-encoded node binding
354 if let Some(Value::Map(node_map)) = row.get_mut(variable) {
355 // Remove old user property keys, keep internal fields
356 node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
357 // Build effective (non-null) properties
358 let effective: HashMap<String, Value> =
359 enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
360 for (k, v) in &effective {
361 node_map.insert(k.clone(), v.clone());
362 }
363 // Replace _all_props to reflect the complete property set
364 node_map.insert("_all_props".to_string(), Value::Map(effective));
365 }
366 }
367 Some(Value::Edge(ref edge)) => {
368 let eid = edge.eid;
369 let src = edge.src;
370 let dst = edge.dst;
371 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
372 let current =
373 read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
374 let write_props = Self::merge_props(current, new_props, replace);
375 let write_props = Self::coerce_and_validate_props(
376 write_props,
377 &schema,
378 std::slice::from_ref(&edge.edge_type),
379 )?;
380 writer
381 .insert_edge(
382 src,
383 dst,
384 etype,
385 eid,
386 write_props.clone(),
387 Some(edge.edge_type.clone()),
388 tx_l0,
389 )
390 .await?;
391 // Update the in-memory row binding
392 if let Some(Value::Edge(e)) = row.get_mut(variable) {
393 e.properties = write_props
394 .into_iter()
395 .filter(|(_, v)| !v.is_null())
396 .collect();
397 }
398 }
399 Some(Value::Map(ref map))
400 if map.contains_key("_eid")
401 && map.contains_key("_src")
402 && map.contains_key("_dst") =>
403 {
404 let ei = self.extract_edge_identity(map)?;
405 let current =
406 read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx).await?;
407 let write_props = Self::merge_props(current, new_props, replace);
408 let edge_type_name = map
409 .get("_type")
410 .and_then(|v| v.as_str())
411 .map(|s| s.to_string())
412 .or_else(|| {
413 self.storage
414 .schema_manager()
415 .edge_type_name_by_id_unified(ei.edge_type_id)
416 });
417 let write_props = match &edge_type_name {
418 Some(name) => Self::coerce_and_validate_props(
419 write_props,
420 &schema,
421 std::slice::from_ref(name),
422 )?,
423 None => write_props,
424 };
425 writer
426 .insert_edge(
427 ei.src,
428 ei.dst,
429 ei.edge_type_id,
430 ei.eid,
431 write_props.clone(),
432 edge_type_name,
433 tx_l0,
434 )
435 .await?;
436 // Update the in-memory map-encoded edge binding
437 if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
438 edge_map.retain(|k, _| k.starts_with('_'));
439 let effective: HashMap<String, Value> = write_props
440 .into_iter()
441 .filter(|(_, v)| !v.is_null())
442 .collect();
443 for (k, v) in &effective {
444 edge_map.insert(k.clone(), v.clone());
445 }
446 // Replace _all_props to reflect the complete property set
447 edge_map.insert("_all_props".to_string(), Value::Map(effective));
448 }
449 }
450 _ => {
451 // No matching entity — nothing to do (caller already guarded against Null)
452 }
453 }
454 Ok(())
455 }
456
457 /// Computes the property map to write given current storage state and the
458 /// incoming change map.
459 ///
460 /// When `replace` is `true`, keys present in `current` but absent from
461 /// `incoming` are tombstoned with `Value::Null`. Null values inside
462 /// `incoming` are always preserved as explicit tombstones.
463 ///
464 /// When `replace` is `false`, `current` is the base and `incoming` is
465 /// merged on top: each key in `incoming` overwrites or tombstones the
466 /// corresponding entry in `current`.
467 fn merge_props(
468 current: HashMap<String, Value>,
469 incoming: HashMap<String, Value>,
470 replace: bool,
471 ) -> HashMap<String, Value> {
472 if replace {
473 // Start from the non-null incoming entries only.
474 let mut result: HashMap<String, Value> = incoming
475 .iter()
476 .filter(|(_, v)| !v.is_null())
477 .map(|(k, v)| (k.clone(), v.clone()))
478 .collect();
479 // Tombstone every current key that is absent from incoming OR explicitly
480 // set to null in incoming (both mean "delete this property").
481 for k in current.keys() {
482 if incoming.get(k).is_none_or(|v| v.is_null()) {
483 result.insert(k.clone(), Value::Null);
484 }
485 }
486 result
487 } else {
488 // Merge: start from current and apply incoming on top
489 let mut result = current;
490 result.extend(incoming);
491 result
492 }
493 }
494
495 /// Extract edge identity fields (`_eid`, `_src`, `_dst`, `_type`) from a map.
496 fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
497 let eid = Eid::from(
498 map.get("_eid")
499 .and_then(|v| v.as_u64())
500 .ok_or_else(|| anyhow!("Invalid _eid"))?,
501 );
502 let src = Vid::from(
503 map.get("_src")
504 .and_then(|v| v.as_u64())
505 .ok_or_else(|| anyhow!("Invalid _src"))?,
506 );
507 let dst = Vid::from(
508 map.get("_dst")
509 .and_then(|v| v.as_u64())
510 .ok_or_else(|| anyhow!("Invalid _dst"))?,
511 );
512 let edge_type_id = self.resolve_edge_type_id(
513 map.get("_type")
514 .or_else(|| map.get("_type_name"))
515 .ok_or_else(|| anyhow!("Missing _type/_type_name on edge map"))?,
516 )?;
517 Ok(EdgeIdentity {
518 eid,
519 src,
520 dst,
521 edge_type_id,
522 })
523 }
524
525 /// Resolve edge type ID from a Value, supporting both Int and String representations.
526 /// DataFusion traverse stores _type as String("KNOWS"), while write operations need u32 ID.
527 ///
528 /// For String values, uses get_or_assign_edge_type_id to support schemaless edge types
529 /// (assigns new ID if not found). This is critical for MERGE ... ON CREATE SET scenarios
530 /// where the edge type was just created and may not be in the read-only lookup yet.
531 fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
532 match type_val {
533 Value::Int(i) => Ok(*i as u32),
534 Value::String(name) => {
535 if self.config.strict_schema {
536 let schema = self.storage.schema_manager().schema();
537 schema
538 .edge_type_id_by_name_case_insensitive(name)
539 .ok_or_else(|| {
540 anyhow!(
541 "Edge type '{}' is not defined in the schema \
542 (strict_schema is enabled). \
543 Declare it with db.schema().edge_type(...).apply() first.",
544 name
545 )
546 })
547 } else {
548 // Schemaless: assign new ID if not found in schema or registry.
549 Ok(self
550 .storage
551 .schema_manager()
552 .get_or_assign_edge_type_id(name))
553 }
554 }
555 _ => Err(anyhow!(
556 "Invalid _type value: expected Int or String, got {:?}",
557 type_val
558 )),
559 }
560 }
561
562 pub(crate) async fn execute_vacuum(&self) -> Result<()> {
563 if let Some(writer_arc) = &self.writer {
564 // Flush first while holding the lock
565 {
566 let writer: &uni_store::Writer = writer_arc.as_ref();
567 writer.flush_to_l1(None).await?;
568 } // Drop lock before compacting to avoid blocking reads/writes
569
570 // Compaction can run without holding the writer lock
571 let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
572 let compaction_results = compactor.compact_all().await?;
573
574 // Re-warm adjacency manager for compacted edge types to sync in-memory CSR with new L2 storage
575 let am = self.storage.adjacency_manager();
576 let schema = self.storage.schema_manager().schema();
577 for info in compaction_results {
578 // Convert string direction to Direction enum
579 let direction = match info.direction.as_str() {
580 "fwd" => uni_store::storage::direction::Direction::Outgoing,
581 "bwd" => uni_store::storage::direction::Direction::Incoming,
582 _ => continue,
583 };
584
585 // Get edge_type_id
586 if let Some(edge_type_id) =
587 schema.edge_type_id_unified_case_insensitive(&info.edge_type)
588 {
589 // Re-warm from storage (clears old CSR, loads new L2 + L1 delta)
590 let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
591 }
592 }
593 }
594 Ok(())
595 }
596
597 pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
598 if let Some(writer_arc) = &self.writer {
599 let writer: &uni_store::Writer = writer_arc.as_ref();
600 writer.flush_to_l1(Some("checkpoint".to_string())).await?;
601 }
602 Ok(())
603 }
604
605 pub(crate) async fn execute_copy_to(
606 &self,
607 identifier: &str,
608 path: &str,
609 format: &str,
610 options: &HashMap<String, Value>,
611 ) -> Result<usize> {
612 // Check schema to determine if identifier is an edge type or vertex label
613 let schema = self.storage.schema_manager().schema();
614
615 // Try as edge type first
616 if schema.get_edge_type_case_insensitive(identifier).is_some() {
617 return self
618 .export_edge_type_in_format(identifier, path, format)
619 .await;
620 }
621
622 // Try as vertex label
623 if schema.get_label_case_insensitive(identifier).is_some() {
624 return self
625 .export_vertex_label_in_format(identifier, path, format, options)
626 .await;
627 }
628
629 // Neither edge type nor vertex label found
630 Err(anyhow!("Unknown label or edge type: '{}'", identifier))
631 }
632
633 async fn export_vertex_label_in_format(
634 &self,
635 label: &str,
636 path: &str,
637 format: &str,
638 _options: &HashMap<String, Value>,
639 ) -> Result<usize> {
640 match format {
641 "parquet" => self.export_vertex_label(label, path).await,
642 "csv" => {
643 let mut stream = self
644 .storage
645 .scan_vertex_table_stream(label)
646 .await?
647 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
648
649 // Collect all batches
650 let mut all_rows = Vec::new();
651 let mut column_names = Vec::new();
652
653 // Iterate stream using StreamExt
654 use futures::StreamExt;
655 while let Some(batch_result) = stream.next().await {
656 let batch = batch_result?;
657
658 // Get column names from first batch
659 if column_names.is_empty() {
660 column_names = batch
661 .schema()
662 .fields()
663 .iter()
664 .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
665 .map(|f| f.name().clone())
666 .collect();
667 }
668
669 // Convert batch to rows
670 for row_idx in 0..batch.num_rows() {
671 let mut row = Vec::new();
672 for field in batch.schema().fields() {
673 if field.name().starts_with('_') || field.name() == "ext_id" {
674 continue;
675 }
676
677 let col_idx = batch.schema().index_of(field.name())?;
678 let column = batch.column(col_idx);
679 let value = self.arrow_value_to_json(column, row_idx)?;
680
681 // Convert value to CSV string
682 let csv_value = match value {
683 Value::Null => String::new(),
684 Value::Bool(b) => b.to_string(),
685 Value::Int(i) => i.to_string(),
686 Value::Float(f) => f.to_string(),
687 Value::String(s) => s,
688 _ => format!("{value}"),
689 };
690 row.push(csv_value);
691 }
692 all_rows.push(row);
693 }
694 }
695
696 // Write CSV
697 let file = std::fs::File::create(path)?;
698 let mut wtr = csv::Writer::from_writer(file);
699
700 // Write headers
701 log::debug!("CSV export headers: {:?}", column_names);
702 wtr.write_record(&column_names)?;
703
704 // Write rows
705 for (i, row) in all_rows.iter().enumerate() {
706 log::debug!("CSV export row {}: {:?}", i, row);
707 wtr.write_record(row)?;
708 }
709
710 wtr.flush()?;
711 Ok(all_rows.len())
712 }
713 _ => Err(anyhow!(
714 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
715 format
716 )),
717 }
718 }
719
720 async fn export_edge_type_in_format(
721 &self,
722 edge_type: &str,
723 path: &str,
724 format: &str,
725 ) -> Result<usize> {
726 match format {
727 "parquet" => self.export_edge_type(edge_type, path).await,
728 "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
729 _ => Err(anyhow!(
730 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
731 format
732 )),
733 }
734 }
735
736 /// Write a stream of record batches to a Parquet file.
737 /// Returns the total number of rows written, or 0 if the stream is empty.
738 async fn write_batches_to_parquet(
739 mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
740 path: &str,
741 entity_description: &str,
742 ) -> Result<usize> {
743 use futures::TryStreamExt;
744
745 // Get first batch to determine schema and create writer
746 let first_batch = match stream.try_next().await? {
747 Some(batch) => batch,
748 None => {
749 log::info!("No data to export from {}", entity_description);
750 return Ok(0);
751 }
752 };
753
754 // Create Parquet writer using schema from first batch
755 let file = std::fs::File::create(path)?;
756 let arrow_schema = first_batch.schema();
757 let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
758
759 // Write first batch
760 let mut count = first_batch.num_rows();
761 writer.write(&first_batch)?;
762
763 // Write remaining batches
764 while let Some(batch) = stream.try_next().await? {
765 count += batch.num_rows();
766 writer.write(&batch)?;
767 }
768
769 writer.close()?;
770
771 log::info!(
772 "Exported {} rows from {} to '{}'",
773 count,
774 entity_description,
775 path
776 );
777 Ok(count)
778 }
779
780 /// Export vertices of a specific label to Parquet
781 async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
782 let stream = self
783 .storage
784 .scan_vertex_table_stream(label)
785 .await?
786 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
787
788 Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
789 }
790
791 /// Export edges of a specific type to Parquet
792 async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
793 let schema = self.storage.schema_manager().schema();
794 if !schema.edge_types.contains_key(edge_type) {
795 return Err(anyhow!("Edge type '{}' not found", edge_type));
796 }
797
798 let filter = format!("type = '{}'", edge_type);
799 let stream = self
800 .storage
801 .scan_main_edge_table_stream(Some(&filter))
802 .await?
803 .ok_or_else(|| anyhow!("No edge data found"))?;
804
805 Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
806 }
807
808 pub(crate) async fn execute_copy_from(
809 &self,
810 label: &str,
811 path: &str,
812 format: &str,
813 options: &HashMap<String, Value>,
814 ) -> Result<usize> {
815 // Read data from file
816 let batches = match format {
817 "parquet" => self.read_parquet_file(path)?,
818 "csv" => self.read_csv_file(path, label, options)?,
819 _ => {
820 return Err(anyhow!(
821 "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
822 format
823 ));
824 }
825 };
826
827 // Get writer
828 let writer_arc = self
829 .writer
830 .as_ref()
831 .ok_or_else(|| anyhow!("No writer available"))?;
832
833 let db_schema = self.storage.schema_manager().schema();
834
835 // Check if this is a label (vertex) or edge type
836 let is_edge = db_schema.edge_type_id_by_name(label).is_some();
837
838 if is_edge {
839 // Import edges
840 let edge_type_id = db_schema
841 .edge_type_id_by_name(label)
842 .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
843
844 // Get src and dst column names from options
845 let src_col = options
846 .get("src_col")
847 .and_then(|v| v.as_str())
848 .unwrap_or("src");
849 let dst_col = options
850 .get("dst_col")
851 .and_then(|v| v.as_str())
852 .unwrap_or("dst");
853
854 // §5.7 of concurrent_writer.md: writer is hoisted above the row
855 // loop now that there is no per-row lock acquisition cost.
856 let writer: &uni_store::Writer = writer_arc.as_ref();
857 let mut total_rows = 0;
858 for batch in batches {
859 let num_rows = batch.num_rows();
860 // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
861 let eids = writer.allocate_eids(num_rows).await?;
862
863 for (row_idx, &eid) in eids.iter().enumerate().take(num_rows) {
864 let mut properties = HashMap::new();
865 let mut src_vid: Option<Vid> = None;
866 let mut dst_vid: Option<Vid> = None;
867
868 // Extract properties and VIDs from each column
869 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
870 let col_name = field.name();
871 let column = batch.column(col_idx);
872 let value = self.arrow_value_to_json(column, row_idx)?;
873
874 if col_name == src_col {
875 let raw = value.as_u64().unwrap_or_else(|| {
876 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
877 });
878 src_vid = Some(Vid::new(raw));
879 } else if col_name == dst_col {
880 let raw = value.as_u64().unwrap_or_else(|| {
881 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
882 });
883 dst_vid = Some(Vid::new(raw));
884 } else if !col_name.starts_with('_') && !value.is_null() {
885 properties.insert(col_name.clone(), value);
886 }
887 }
888
889 let src = src_vid
890 .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
891 let dst = dst_vid
892 .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
893
894 writer
895 .insert_edge(
896 src,
897 dst,
898 edge_type_id,
899 eid,
900 properties,
901 Some(label.to_string()),
902 None,
903 )
904 .await?;
905
906 total_rows += 1;
907 }
908 }
909
910 log::info!(
911 "Imported {} edge rows from '{}' into edge type '{}'",
912 total_rows,
913 path,
914 label
915 );
916
917 // Flush to persist edges
918 if total_rows > 0 {
919 writer.flush_to_l1(None).await?;
920 }
921
922 Ok(total_rows)
923 } else {
924 // Import vertices
925 // Validate the label exists in schema
926 db_schema
927 .label_id_by_name_case_insensitive(label)
928 .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
929
930 // §5.7 of concurrent_writer.md: writer is hoisted above the row
931 // loop now that there is no per-row lock acquisition cost.
932 let writer: &uni_store::Writer = writer_arc.as_ref();
933 let mut total_rows = 0;
934 for batch in batches {
935 let num_rows = batch.num_rows();
936 // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
937 let vids = writer.allocate_vids(num_rows).await?;
938
939 // Convert Arrow batch to rows
940 for (row_idx, &vid) in vids.iter().enumerate().take(num_rows) {
941 let mut properties = HashMap::new();
942
943 // Extract properties from each column
944 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
945 let col_name = field.name();
946
947 // Skip internal columns
948 if col_name.starts_with('_') {
949 continue;
950 }
951
952 let column = batch.column(col_idx);
953 let value = self.arrow_value_to_json(column, row_idx)?;
954
955 if !value.is_null() {
956 properties.insert(col_name.clone(), value);
957 }
958 }
959
960 let _ = writer
961 .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
962 .await?;
963
964 total_rows += 1;
965 }
966 }
967
968 log::info!(
969 "Imported {} rows from '{}' into label '{}'",
970 total_rows,
971 path,
972 label
973 );
974
975 // Flush to persist vertices
976 if total_rows > 0 {
977 writer.flush_to_l1(None).await?;
978 }
979
980 Ok(total_rows)
981 }
982 }
983
984 fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
985 use arrow_array::Array;
986 use arrow_schema::DataType as ArrowDataType;
987
988 if column.is_null(row_idx) {
989 return Ok(Value::Null);
990 }
991
992 match column.data_type() {
993 ArrowDataType::Utf8 => {
994 let array = column
995 .as_any()
996 .downcast_ref::<arrow_array::StringArray>()
997 .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
998 Ok(Value::String(array.value(row_idx).to_string()))
999 }
1000 ArrowDataType::Int32 => {
1001 let array = column
1002 .as_any()
1003 .downcast_ref::<arrow_array::Int32Array>()
1004 .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
1005 Ok(Value::Int(array.value(row_idx) as i64))
1006 }
1007 ArrowDataType::Int64 => {
1008 let array = column
1009 .as_any()
1010 .downcast_ref::<arrow_array::Int64Array>()
1011 .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
1012 Ok(Value::Int(array.value(row_idx)))
1013 }
1014 ArrowDataType::Float32 => {
1015 let array = column
1016 .as_any()
1017 .downcast_ref::<arrow_array::Float32Array>()
1018 .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
1019 Ok(Value::Float(array.value(row_idx) as f64))
1020 }
1021 ArrowDataType::Float64 => {
1022 let array = column
1023 .as_any()
1024 .downcast_ref::<arrow_array::Float64Array>()
1025 .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
1026 Ok(Value::Float(array.value(row_idx)))
1027 }
1028 ArrowDataType::Boolean => {
1029 let array = column
1030 .as_any()
1031 .downcast_ref::<arrow_array::BooleanArray>()
1032 .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
1033 Ok(Value::Bool(array.value(row_idx)))
1034 }
1035 ArrowDataType::UInt64 => {
1036 let array = column
1037 .as_any()
1038 .downcast_ref::<arrow_array::UInt64Array>()
1039 .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
1040 Ok(Value::Int(array.value(row_idx) as i64))
1041 }
1042 _ => {
1043 // For other types, try to convert to string
1044 let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
1045 if let Some(arr) = array {
1046 Ok(Value::String(arr.value(row_idx).to_string()))
1047 } else {
1048 Ok(Value::Null)
1049 }
1050 }
1051 }
1052 }
1053
1054 fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
1055 let file = std::fs::File::open(path)?;
1056 let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
1057 .build()?;
1058 reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1059 }
1060
1061 fn read_csv_file(
1062 &self,
1063 path: &str,
1064 label: &str,
1065 options: &HashMap<String, Value>,
1066 ) -> Result<Vec<arrow_array::RecordBatch>> {
1067 use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1068 use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1069 use std::sync::Arc;
1070
1071 // Parse CSV options
1072 let has_headers = options
1073 .get("headers")
1074 .and_then(|v| v.as_bool())
1075 .unwrap_or(true);
1076
1077 // Read CSV file
1078 let file = std::fs::File::open(path)?;
1079 let mut rdr = csv::ReaderBuilder::new()
1080 .has_headers(has_headers)
1081 .from_reader(file);
1082
1083 // Get schema for type conversion
1084 let db_schema = self.storage.schema_manager().schema();
1085 let properties = db_schema.properties.get(label);
1086
1087 // Collect all rows first to determine schema
1088 let mut rows: Vec<Vec<String>> = Vec::new();
1089 let headers: Vec<String> = if has_headers {
1090 rdr.headers()?.iter().map(|s| s.to_string()).collect()
1091 } else {
1092 Vec::new()
1093 };
1094
1095 for result in rdr.records() {
1096 let record = result?;
1097 rows.push(record.iter().map(|s| s.to_string()).collect());
1098 }
1099
1100 if rows.is_empty() {
1101 return Ok(Vec::new());
1102 }
1103
1104 // Build Arrow schema with proper types based on DB schema
1105 let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
1106 let col_names: Vec<String> = if has_headers {
1107 headers
1108 } else {
1109 (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
1110 };
1111
1112 for name in &col_names {
1113 let arrow_type = if let Some(props) = properties {
1114 if let Some(prop_meta) = props.get(name) {
1115 match prop_meta.r#type {
1116 DataType::Int32 => ArrowDataType::Int32,
1117 DataType::Int64 => ArrowDataType::Int64,
1118 DataType::Float32 => ArrowDataType::Float32,
1119 DataType::Float64 => ArrowDataType::Float64,
1120 DataType::Bool => ArrowDataType::Boolean,
1121 _ => ArrowDataType::Utf8,
1122 }
1123 } else {
1124 ArrowDataType::Utf8
1125 }
1126 } else {
1127 ArrowDataType::Utf8
1128 };
1129 arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
1130 }
1131
1132 let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
1133
1134 // Convert rows to Arrow arrays with proper types
1135 let mut columns: Vec<ArrayRef> = Vec::new();
1136 for (col_idx, field) in arrow_fields.iter().enumerate() {
1137 match field.data_type() {
1138 ArrowDataType::Int32 => {
1139 let values: Vec<Option<i32>> = rows
1140 .iter()
1141 .map(|row| {
1142 if col_idx < row.len() {
1143 row[col_idx].parse().ok()
1144 } else {
1145 None
1146 }
1147 })
1148 .collect();
1149 columns.push(Arc::new(Int32Array::from(values)));
1150 }
1151 _ => {
1152 // Default to string
1153 let values: Vec<Option<String>> = rows
1154 .iter()
1155 .map(|row| {
1156 if col_idx < row.len() {
1157 Some(row[col_idx].clone())
1158 } else {
1159 None
1160 }
1161 })
1162 .collect();
1163 columns.push(Arc::new(StringArray::from(values)));
1164 }
1165 }
1166 }
1167
1168 let batch = RecordBatch::try_new(arrow_schema, columns)?;
1169 Ok(vec![batch])
1170 }
1171
1172 fn parse_data_type(type_str: &str) -> Result<DataType> {
1173 use uni_common::core::schema::{CrdtType, PointType};
1174 let type_str = type_str.to_lowercase();
1175 let type_str = type_str.trim();
1176 match type_str {
1177 "string" | "text" | "varchar" => Ok(DataType::String),
1178 "int" | "integer" | "int32" => Ok(DataType::Int32),
1179 "long" | "int64" | "bigint" => Ok(DataType::Int64),
1180 "float" | "float32" | "real" => Ok(DataType::Float32),
1181 "double" | "float64" => Ok(DataType::Float64),
1182 "bool" | "boolean" => Ok(DataType::Bool),
1183 "timestamp" => Ok(DataType::Timestamp),
1184 "date" => Ok(DataType::Date),
1185 "time" => Ok(DataType::Time),
1186 "datetime" => Ok(DataType::DateTime),
1187 "duration" => Ok(DataType::Duration),
1188 "btic" => Ok(DataType::Btic),
1189 "json" | "jsonb" => Ok(DataType::CypherValue),
1190 "bytes" | "blob" | "binary" => Ok(DataType::Bytes),
1191 "point" => Ok(DataType::Point(PointType::Cartesian2D)),
1192 "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
1193 "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1194 s if s.starts_with("sparse_vector(") && s.ends_with(')') => {
1195 let dims_str = &s["sparse_vector(".len()..s.len() - 1];
1196 let dimensions = dims_str
1197 .parse::<usize>()
1198 .map_err(|_| anyhow!("Invalid sparse_vector dimensions: {}", dims_str))?;
1199 Ok(DataType::SparseVector { dimensions })
1200 }
1201 s if s.starts_with("vector(") && s.ends_with(')') => {
1202 let dims_str = &s[7..s.len() - 1];
1203 let dimensions = dims_str
1204 .parse::<usize>()
1205 .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1206 Ok(DataType::Vector { dimensions })
1207 }
1208 s if s.starts_with("list<") && s.ends_with('>') => {
1209 let inner_type_str = &s[5..s.len() - 1];
1210 let inner_type = Self::parse_data_type(inner_type_str)?;
1211 Ok(DataType::List(Box::new(inner_type)))
1212 }
1213 s if s.starts_with("map<") && s.ends_with('>') => {
1214 let (k_str, v_str) = Self::split_map_kv(&s[4..s.len() - 1])?;
1215 let key_type = Self::parse_data_type(&k_str)?;
1216 if !matches!(key_type, DataType::String) {
1217 return Err(anyhow!("MAP key type must be STRING, got: {k_str}"));
1218 }
1219 let value_type = Self::parse_data_type(&v_str)?;
1220 Ok(DataType::Map(Box::new(key_type), Box::new(value_type)))
1221 }
1222 "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1223 "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1224 _ => Err(anyhow!("Unknown data type: {}", type_str)),
1225 }
1226 }
1227
1228 /// Split a `MAP<K, V>` inner string on the top-level comma, respecting `<>`/`()` depth
1229 /// so nested value types (`STRING, LIST<INT>`, `STRING, MAP<STRING,INT>`) split at the
1230 /// right comma. Returns trimmed `(key, value)` type strings.
1231 fn split_map_kv(inner: &str) -> Result<(String, String)> {
1232 let mut depth = 0i32;
1233 for (i, c) in inner.char_indices() {
1234 match c {
1235 '<' | '(' => depth += 1,
1236 '>' | ')' => depth -= 1,
1237 ',' if depth == 0 => {
1238 let k = inner[..i].trim();
1239 let v = inner[i + 1..].trim();
1240 if k.is_empty() || v.is_empty() {
1241 return Err(anyhow!("MAP<K,V> requires both a key and a value type"));
1242 }
1243 return Ok((k.to_string(), v.to_string()));
1244 }
1245 _ => {}
1246 }
1247 }
1248 Err(anyhow!(
1249 "MAP<K,V> requires a comma separating key and value types"
1250 ))
1251 }
1252
1253 pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1254 let sm = self.storage.schema_manager_arc();
1255 if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1256 return Ok(());
1257 }
1258 sm.add_label_with_desc(&clause.name, clause.description)?;
1259 for prop in clause.properties {
1260 let dt = Self::parse_data_type(&prop.data_type)?;
1261 sm.add_property_with_desc(
1262 &clause.name,
1263 &prop.name,
1264 dt,
1265 prop.nullable,
1266 prop.description,
1267 )?;
1268 if prop.unique {
1269 let constraint = Constraint {
1270 name: format!("{}_{}_unique", clause.name, prop.name),
1271 constraint_type: ConstraintType::Unique {
1272 properties: vec![prop.name],
1273 },
1274 target: ConstraintTarget::Label(clause.name.clone()),
1275 enabled: true,
1276 };
1277 sm.add_constraint(constraint)?;
1278 }
1279 }
1280 sm.save().await?;
1281 Ok(())
1282 }
1283
1284 /// True if `key` is a generated property on any of the given labels.
1285 /// Used by the partial-write flush path (Round 12 §C) to decide
1286 /// whether the property should be added to `touched_keys` so that
1287 /// Lance MergeInsert sends the recomputed value.
1288 fn is_generated_key(&self, labels: &[String], key: &str) -> bool {
1289 let schema = self.storage.schema_manager().schema();
1290 for label in labels {
1291 if let Some(props_meta) = schema.properties.get(label)
1292 && let Some(meta) = props_meta.get(key)
1293 && meta.generation_expression.is_some()
1294 {
1295 return true;
1296 }
1297 }
1298 false
1299 }
1300
1301 pub(crate) async fn enrich_properties_with_generated_columns(
1302 &self,
1303 label_name: &str,
1304 properties: &mut HashMap<String, Value>,
1305 prop_manager: &PropertyManager,
1306 params: &HashMap<String, Value>,
1307 ctx: Option<&QueryContext>,
1308 ) -> Result<()> {
1309 let schema = self.storage.schema_manager().schema();
1310
1311 if let Some(props_meta) = schema.properties.get(label_name) {
1312 let mut generators = Vec::new();
1313 for (prop_name, meta) in props_meta {
1314 if let Some(expr_str) = &meta.generation_expression {
1315 generators.push((prop_name.clone(), expr_str.clone()));
1316 }
1317 }
1318
1319 for (prop_name, expr_str) in generators {
1320 let cache_key = (label_name.to_string(), prop_name.clone());
1321 let expr = {
1322 let cache = self.gen_expr_cache.read().await;
1323 cache.get(&cache_key).cloned()
1324 };
1325
1326 let expr = match expr {
1327 Some(e) => e,
1328 None => {
1329 let parsed = uni_cypher::parse_expression(&expr_str)
1330 .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1331 let mut cache = self.gen_expr_cache.write().await;
1332 cache.insert(cache_key, parsed.clone());
1333 parsed
1334 }
1335 };
1336
1337 let mut scope = HashMap::new();
1338
1339 // If expression has an explicit variable, use it as an object
1340 if let Some(var) = expr.extract_variable() {
1341 scope.insert(var, Value::Map(properties.clone()));
1342 } else {
1343 // No explicit variable - add properties directly to scope for bare references
1344 // e.g., "lower(email)" can reference "email" directly
1345 for (k, v) in properties.iter() {
1346 scope.insert(k.clone(), v.clone());
1347 }
1348 }
1349
1350 let val = self
1351 .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1352 .await?;
1353 properties.insert(prop_name, val);
1354 }
1355 }
1356 Ok(())
1357 }
1358
1359 pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1360 let sm = self.storage.schema_manager_arc();
1361 if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1362 return Ok(());
1363 }
1364 sm.add_edge_type_with_desc(
1365 &clause.name,
1366 clause.src_labels,
1367 clause.dst_labels,
1368 clause.description,
1369 )?;
1370 for prop in clause.properties {
1371 let dt = Self::parse_data_type(&prop.data_type)?;
1372 sm.add_property_with_desc(
1373 &clause.name,
1374 &prop.name,
1375 dt,
1376 prop.nullable,
1377 prop.description,
1378 )?;
1379 }
1380 sm.save().await?;
1381 Ok(())
1382 }
1383
1384 /// Executes an ALTER action on a schema entity.
1385 ///
1386 /// This is a shared helper for both `execute_alter_label` and
1387 /// `execute_alter_edge_type` since they have identical logic.
1388 pub(crate) async fn execute_alter_entity(
1389 sm: &Arc<SchemaManager>,
1390 entity_name: &str,
1391 action: AlterAction,
1392 ) -> Result<()> {
1393 match action {
1394 AlterAction::AddProperty(prop) => {
1395 let dt = Self::parse_data_type(&prop.data_type)?;
1396 sm.add_property_with_desc(
1397 entity_name,
1398 &prop.name,
1399 dt,
1400 prop.nullable,
1401 prop.description,
1402 )?;
1403 }
1404 AlterAction::DropProperty(prop_name) => {
1405 sm.drop_property(entity_name, &prop_name)?;
1406 }
1407 AlterAction::RenameProperty { old_name, new_name } => {
1408 sm.rename_property(entity_name, &old_name, &new_name)?;
1409 }
1410 AlterAction::SetDescription(desc) => {
1411 if sm.schema().labels.contains_key(entity_name) {
1412 sm.set_label_description(entity_name, desc)?;
1413 } else {
1414 sm.set_edge_type_description(entity_name, desc)?;
1415 }
1416 }
1417 AlterAction::SetPropertyDescription {
1418 property,
1419 description,
1420 } => {
1421 sm.set_property_description(entity_name, &property, description)?;
1422 }
1423 }
1424 sm.save().await?;
1425 Ok(())
1426 }
1427
1428 pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1429 Self::execute_alter_entity(
1430 &self.storage.schema_manager_arc(),
1431 &clause.name,
1432 clause.action,
1433 )
1434 .await
1435 }
1436
1437 pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1438 Self::execute_alter_entity(
1439 &self.storage.schema_manager_arc(),
1440 &clause.name,
1441 clause.action,
1442 )
1443 .await
1444 }
1445
1446 pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1447 let sm = self.storage.schema_manager_arc();
1448 sm.drop_label(&clause.name, clause.if_exists)?;
1449 sm.save().await?;
1450 Ok(())
1451 }
1452
1453 pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1454 let sm = self.storage.schema_manager_arc();
1455 sm.drop_edge_type(&clause.name, clause.if_exists)?;
1456 sm.save().await?;
1457 Ok(())
1458 }
1459
1460 pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1461 let sm = self.storage.schema_manager_arc();
1462 let target = ConstraintTarget::Label(clause.label);
1463 let c_type = match clause.constraint_type {
1464 AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1465 properties: clause.properties,
1466 },
1467 AstConstraintType::Exists => {
1468 let property = clause
1469 .properties
1470 .into_iter()
1471 .next()
1472 .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1473 ConstraintType::Exists { property }
1474 }
1475 AstConstraintType::Check => {
1476 let expression = clause
1477 .expression
1478 .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1479 ConstraintType::Check {
1480 expression: expression.to_string_repr(),
1481 }
1482 }
1483 };
1484
1485 let constraint = Constraint {
1486 name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1487 constraint_type: c_type,
1488 target,
1489 enabled: true,
1490 };
1491
1492 sm.add_constraint(constraint)?;
1493 sm.save().await?;
1494 Ok(())
1495 }
1496
1497 pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1498 let sm = self.storage.schema_manager_arc();
1499 sm.drop_constraint(&clause.name, false)?;
1500 sm.save().await?;
1501 Ok(())
1502 }
1503
1504 /// Detects the single-node, single-label MERGE shape the fast path serves.
1505 ///
1506 /// Returns the node pattern and its label when `pattern` is one path with
1507 /// one node element, exactly one label, and a static map-literal property
1508 /// set — the shape [`Self::execute_merge_row_indexed`] can serve without
1509 /// per-row query planning. The keys do NOT need to be indexed: the persisted
1510 /// lookup degrades to a (single, filtered) label scan when no scalar index
1511 /// exists, which is still far cheaper than building a `LogicalPlan` per row.
1512 /// Any other shape (edges, multiple labels, non-literal properties) returns
1513 /// `None` so the caller uses the general per-row path.
1514 fn merge_single_node_fastpath<'p>(
1515 &self,
1516 pattern: &'p Pattern,
1517 ) -> Option<(&'p NodePattern, String)> {
1518 if pattern.paths.len() != 1 {
1519 return None;
1520 }
1521 let path = &pattern.paths[0];
1522 if path.elements.len() != 1 {
1523 return None;
1524 }
1525 let PatternElement::Node(n) = &path.elements[0] else {
1526 return None;
1527 };
1528 let labels = n.labels.names();
1529 if labels.len() != 1 {
1530 return None;
1531 }
1532 // The key must be a static map literal so the key names are known.
1533 let Some(Expr::Map(entries)) = n.properties.as_ref() else {
1534 return None;
1535 };
1536 if entries.is_empty() {
1537 return None;
1538 }
1539 // Resolve the label to its schema-canonical case so the fast path agrees
1540 // with the general MERGE path (which matches labels case-insensitively).
1541 // Without this, `MERGE (:person …)` after a `:Person` row was flushed
1542 // scans/keys a different label than the canonical one and creates a
1543 // duplicate (review #3a). Falls back to the as-written label when the
1544 // schema does not know it (schemaless).
1545 let canonical = self
1546 .storage
1547 .schema_manager()
1548 .schema()
1549 .canonical_label_name(&labels[0])
1550 .unwrap_or_else(|| labels[0].clone());
1551 Some((n, canonical))
1552 }
1553
1554 /// RC3: detect the bound-endpoints, anonymous-edge relationship MERGE shape
1555 /// `(a)-[:TYPE]->(b)` whose edge existence can be resolved with one O(1)
1556 /// adjacency probe instead of building and running a per-row traversal
1557 /// `LogicalPlan` (the general path is ~19x the bulk CREATE of the same edges).
1558 ///
1559 /// Returns `(source_var, target_var, edge_type_id, direction)` when the
1560 /// pattern is exactly one path of `[Node, Rel, Node]` where the relationship
1561 /// is a single concrete type, **anonymous** (no variable → no edge binding to
1562 /// reproduce), fixed-length, and unfiltered, and both endpoint nodes are plain
1563 /// variables with no re-specified MERGE properties or inline WHERE (those are
1564 /// filters only the general path applies). Any deviation returns `None` and
1565 /// the caller keeps the general path. The caller still verifies per row that
1566 /// both endpoints are actually bound to vids.
1567 fn merge_relationship_fastpath_shape(
1568 &self,
1569 pattern: &Pattern,
1570 ) -> Option<(
1571 String,
1572 String,
1573 u32,
1574 uni_store::storage::direction::Direction,
1575 )> {
1576 if pattern.paths.len() != 1 {
1577 return None;
1578 }
1579 let [
1580 PatternElement::Node(a),
1581 PatternElement::Relationship(r),
1582 PatternElement::Node(b),
1583 ] = pattern.paths[0].elements.as_slice()
1584 else {
1585 return None;
1586 };
1587 // Endpoints: plain variables, no extra MERGE-pattern properties / inline
1588 // WHERE (a re-specified property is a filter the general path must apply).
1589 let src_var = a.variable.as_ref()?;
1590 let dst_var = b.variable.as_ref()?;
1591 if a.properties.is_some()
1592 || a.where_clause.is_some()
1593 || b.properties.is_some()
1594 || b.where_clause.is_some()
1595 {
1596 return None;
1597 }
1598 // Relationship: single concrete type, anonymous, fixed-length, unfiltered.
1599 if r.variable.is_some()
1600 || r.range.is_some()
1601 || r.properties.is_some()
1602 || r.where_clause.is_some()
1603 || r.types.names().len() != 1
1604 {
1605 return None;
1606 }
1607 let type_name = &r.types.names()[0];
1608 let type_id = if self.config.strict_schema {
1609 self.storage
1610 .schema_manager()
1611 .schema()
1612 .edge_type_id_by_name_case_insensitive(type_name)?
1613 } else {
1614 self.storage
1615 .schema_manager()
1616 .get_or_assign_edge_type_id(type_name)
1617 };
1618 let dir = match r.direction {
1619 uni_cypher::ast::Direction::Outgoing => {
1620 uni_store::storage::direction::Direction::Outgoing
1621 }
1622 uni_cypher::ast::Direction::Incoming => {
1623 uni_store::storage::direction::Direction::Incoming
1624 }
1625 // Undirected existence is ambiguous to encode as one probe; the
1626 // general path handles it.
1627 uni_cypher::ast::Direction::Both => return None,
1628 };
1629 Some((src_var.clone(), dst_var.clone(), type_id, dir))
1630 }
1631
1632 /// Build the persisted-scan filter for a MERGE key, or `None` if any value
1633 /// is not a scalar this fast path can represent.
1634 ///
1635 /// Returning `None` makes the caller fall back to the general per-row path,
1636 /// so unusual key value types (lists, maps, temporals, nulls) are never
1637 /// silently mis-matched. The `_deleted = false` clause mirrors the
1638 /// persisted-read predicate used elsewhere; the version high-water-mark
1639 /// clause is added by [`uni_store::StorageManager::scan_vertex_table`].
1640 fn merge_key_filter(key_props: &HashMap<String, Value>) -> Option<String> {
1641 if key_props.is_empty() {
1642 return None;
1643 }
1644 let mut parts = Vec::with_capacity(key_props.len() + 1);
1645 for (k, v) in key_props {
1646 if !Self::is_safe_key_ident(k) {
1647 return None;
1648 }
1649 let lit = Self::render_key_literal(v)?;
1650 // Unquoted identifier: the Lance filter parser does not resolve a
1651 // double-quoted column name against the table here, so `"k" = v`
1652 // silently matches nothing. Keys are validated above to be safe
1653 // bare identifiers.
1654 parts.push(format!("{k} = {lit}"));
1655 }
1656 parts.push("_deleted = false".to_string());
1657 Some(parts.join(" AND "))
1658 }
1659
/// True when a MERGE key name is a safe bare identifier for a Lance filter
/// (issue #8): non-empty, starting with an ASCII letter or `_`, and
/// continuing with ASCII letters, digits or `_` only.
///
/// Keys come from a static map literal, but they are validated anyway
/// because the name is spliced unquoted into the filter text. A leading
/// digit is rejected: an all-digit name such as `1` would be read as a
/// numeric literal (`1 = 1`) rather than as a column reference. A rejected
/// name makes the caller fall back to the general path.
fn is_safe_key_ident(k: &str) -> bool {
    let mut chars = k.chars();
    chars
        .next()
        .is_some_and(|first| first.is_ascii_alphabetic() || first == '_')
        && chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}
1666
1667 /// Render a scalar MERGE-key value as a Lance filter literal, or `None`
1668 /// for value types this fast path cannot represent (lists, maps,
1669 /// temporals, nulls) — the caller then falls back to the general path.
1670 fn render_key_literal(v: &Value) -> Option<String> {
1671 Some(match v {
1672 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1673 Value::Int(i) => i.to_string(),
1674 Value::Float(f) => f.to_string(),
1675 Value::Bool(b) => b.to_string(),
1676 _ => return None,
1677 })
1678 }
1679
1680 /// Build ONE scan filter matching every key tuple in `keys` (all tuples
1681 /// sorted by `key_names` order, values canonicalized).
1682 ///
1683 /// Single-column keys render as type-grouped `k IN (…)` lists (a filter
1684 /// never compares mixed literal types against one column); composite keys
1685 /// render as an OR of per-tuple conjunctions. Both forms are wrapped with
1686 /// the same `_deleted = false` clause the per-row filter used.
1687 fn merge_batch_filter(key_names: &[String], keys: &[&MergeKey]) -> Option<String> {
1688 if keys.is_empty() || key_names.iter().any(|k| !Self::is_safe_key_ident(k)) {
1689 return None;
1690 }
1691 let disjunction = if let [key] = key_names {
1692 // Group literals by value variant so each IN list is homogeneous.
1693 let mut groups: HashMap<std::mem::Discriminant<Value>, Vec<String>> = HashMap::new();
1694 for tuple in keys {
1695 let (_, v) = tuple.first()?;
1696 groups
1697 .entry(std::mem::discriminant(v))
1698 .or_default()
1699 .push(Self::render_key_literal(v)?);
1700 }
1701 groups
1702 .into_values()
1703 .map(|lits| {
1704 if let [lit] = lits.as_slice() {
1705 format!("{key} = {lit}")
1706 } else {
1707 format!("{key} IN ({})", lits.join(", "))
1708 }
1709 })
1710 .collect::<Vec<_>>()
1711 .join(" OR ")
1712 } else {
1713 keys.iter()
1714 .map(|tuple| {
1715 let conj = tuple
1716 .iter()
1717 .map(|(k, v)| Some(format!("{k} = {}", Self::render_key_literal(v)?)))
1718 .collect::<Option<Vec<_>>>()?
1719 .join(" AND ");
1720 Some(format!("({conj})"))
1721 })
1722 .collect::<Option<Vec<_>>>()?
1723 .join(" OR ")
1724 };
1725 Some(format!("({disjunction}) AND _deleted = false"))
1726 }
1727
1728 /// Canonicalize a numeric MERGE-key value for *matching only*.
1729 ///
1730 /// A finite `Float` with an integral value (e.g. `1.0`) is mapped to the
1731 /// equivalent `Int`, so an `Int(1)` key matches a node stored with
1732 /// `Float(1.0)` and vice versa — the coercion the general (DataFusion) MERGE
1733 /// path already applies (review #3a). Non-numeric and non-integral values are
1734 /// returned unchanged. Used only to build match keys / comparisons, never the
1735 /// value written to a created node.
1736 fn canonical_key_value(v: &Value) -> Value {
1737 match v {
1738 Value::Float(f)
1739 if f.is_finite()
1740 && f.fract() == 0.0
1741 && *f >= i64::MIN as f64
1742 && *f <= i64::MAX as f64 =>
1743 {
1744 Value::Int(*f as i64)
1745 }
1746 other => other.clone(),
1747 }
1748 }
1749
1750 /// Canonical sorted `(name, value)` key tuple for a MERGE row's key map.
1751 ///
1752 /// Numeric values are canonicalized ([`Self::canonical_key_value`]) so the
1753 /// tuple compares equal regardless of `Int`/`Float` spelling. This tuple is
1754 /// used purely as a match key (intra-batch dedup, L0 overlay lookup); the
1755 /// created node's properties come from the original, un-canonicalized map.
1756 fn merge_key_tuple(key_props: &HashMap<String, Value>) -> MergeKey {
1757 let mut tuple: MergeKey = key_props
1758 .iter()
1759 .map(|(k, v)| (k.clone(), Self::canonical_key_value(v)))
1760 .collect();
1761 tuple.sort_by(|a, b| a.0.cmp(&b.0));
1762 tuple
1763 }
1764
1765 /// Snapshot all live L0 vertices of `label`, grouped by their MERGE key.
1766 ///
1767 /// Walked once per MERGE statement (issue #69): the per-row fast path then
1768 /// resolves L0/uncommitted matches with an O(1) map lookup instead of
1769 /// re-enumerating L0 for every row. Captures committed-not-yet-persisted
1770 /// rows and rows created earlier in the same transaction; rows created by
1771 /// later rows of this same statement are folded in incrementally by
1772 /// [`Self::execute_merge_row_indexed`]. `key_names` must be sorted to match
1773 /// [`Self::merge_key_tuple`].
1774 fn merge_l0_existing(
1775 &self,
1776 label: &str,
1777 key_names: &[String],
1778 ctx: Option<&QueryContext>,
1779 ) -> HashMap<MergeKey, Vec<Vid>> {
1780 let mut candidates: Vec<Vid> = Vec::new();
1781 l0_visibility::visit_l0_buffers(ctx, |l0| {
1782 if let Some(vids) = l0.label_to_vids.get(label) {
1783 candidates.extend(vids.iter().copied());
1784 }
1785 false
1786 });
1787
1788 let mut map: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1789 let mut seen: HashSet<Vid> = HashSet::new();
1790 for vid in candidates {
1791 if !seen.insert(vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
1792 continue;
1793 }
1794 // `lookup_vertex_prop` merges across L0 layers (newest wins).
1795 let tuple: MergeKey = key_names
1796 .iter()
1797 .map(|k| {
1798 let v = l0_visibility::lookup_vertex_prop(vid, k, ctx).unwrap_or(Value::Null);
1799 (k.clone(), Self::canonical_key_value(&v))
1800 })
1801 .collect();
1802 map.entry(tuple).or_default().push(vid);
1803 }
1804 map
1805 }
1806
/// Maximum number of items per batched MERGE scan. The batched persisted
/// lookup splits both its key tuples and its candidate vids into chunks of
/// this size, which bounds the filter-string size and the Lance/DataFusion
/// parse cost; chunks run sequentially.
const MERGE_SCAN_CHUNK: usize = 1000;
1810
1811 /// Persisted (flushed) vertices of `label` for EVERY key tuple in `keys`,
1812 /// resolved with one scan per [`Self::MERGE_SCAN_CHUNK`] tuples instead of
1813 /// one scan per input row (review perf #4: `UNWIND … MERGE` issued N
1814 /// independent Lance scans).
1815 ///
1816 /// Scans via [`uni_store::StorageManager::scan_vertex_table`] — the same
1817 /// read path `MATCH` uses, so it honors the version high-water-mark and
1818 /// sees flushed rows. On the declared-label branch the key-filtered scan
1819 /// only NOMINATES candidate vids; a second, unfiltered `_vid IN (…)` pass
1820 /// picks each candidate's max-`_version` row and requires it to be live
1821 /// and still keyed as requested (per-label tables are MVCC-append, so a
1822 /// superseded version's row would otherwise stale-match a rewritten key).
1823 /// Matched rows are grouped by their CANONICAL key tuple (stored values
1824 /// run through [`Self::canonical_key_value`], so a stored `Float(1.0)`
1825 /// lands under a requested `Int(1)` — the coercion Lance's numeric filter
1826 /// equality applies). Liveness against L0 overlays (deletes, key rewrites
1827 /// by earlier rows of the same statement) is NOT checked here — the
1828 /// per-row consumer re-checks at row time, exactly as the old per-row
1829 /// scan did.
1830 ///
1831 /// The second returned map carries the FULL property maps the schemaless
1832 /// branch already decoded for each matched vid (empty on the declared-label
1833 /// branch, which projects only key columns) — the caller seeds the
1834 /// statement-level [`Prefetch`] from it at zero extra scans.
1835 ///
1836 /// # Errors
1837 /// Propagates persisted-scan and filter-build failures — fail-closed: a
1838 /// MERGE must never treat a failed lookup as "no match" and create
1839 /// duplicates.
1840 async fn merge_lookup_persisted_batch(
1841 &self,
1842 label: &str,
1843 key_names: &[String],
1844 keys: &HashSet<MergeKey>,
1845 ) -> Result<(
1846 HashMap<MergeKey, Vec<Vid>>,
1847 HashMap<Vid, uni_common::Properties>,
1848 )> {
1849 let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1850 if keys.is_empty() {
1851 return Ok((out, HashMap::new()));
1852 }
1853 // An undeclared (schemaless) label has no per-label table — its flushed
1854 // rows live only in the unified main vertex table. Route to the
1855 // main-table lookup, mirroring the planner's scan routing (a schemaless
1856 // MATCH plans `ScanMainByLabels` on the same schema predicate).
1857 if self
1858 .storage
1859 .schema_manager()
1860 .schema()
1861 .get_label_case_insensitive(label)
1862 .is_none()
1863 {
1864 return self
1865 .merge_lookup_persisted_batch_schemaless(label, key_names, keys)
1866 .await;
1867 }
1868 // Declared label — the per-label table is MVCC-append (an update
1869 // flush adds a higher-`_version` row for the same vid) and the key
1870 // predicate is pushed into the Lance filter, so a SUPERSEDED version
1871 // whose row still carries a requested key is returned while the vid's
1872 // current row (key rewritten, fails the filter) is invisible to the
1873 // scan. Version dedup among the returned rows cannot detect that, so
1874 // the lookup runs in two passes: the key-filtered scan only nominates
1875 // candidate vids, and an unfiltered `_vid IN (…)` scan then requires
1876 // each candidate's max-`_version` row to be live and still keyed as
1877 // requested.
1878 let mut columns: Vec<&str> = vec!["_vid"];
1879 columns.extend(key_names.iter().map(String::as_str));
1880
1881 let key_list: Vec<&MergeKey> = keys.iter().collect();
1882 let mut candidates: Vec<Vid> = Vec::new();
1883 let mut seen: HashSet<Vid> = HashSet::new();
1884 for chunk in key_list.chunks(Self::MERGE_SCAN_CHUNK) {
1885 let filter = Self::merge_batch_filter(key_names, chunk)
1886 .ok_or_else(|| anyhow!("MERGE fast path could not build a batched key filter"))?;
1887 let scanned = self
1888 .storage
1889 .scan_vertex_table(label, &columns, Some(&filter))
1890 .await?;
1891 let Some(batch) = scanned else { continue };
1892 let Some(vid_col) = batch
1893 .column_by_name("_vid")
1894 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
1895 else {
1896 continue;
1897 };
1898 for i in 0..vid_col.len() {
1899 let vid = Vid::from(vid_col.value(i));
1900 if seen.insert(vid) {
1901 candidates.push(vid);
1902 }
1903 }
1904 }
1905
1906 // Verification pass — tombstones are NOT filtered Lance-side (the
1907 // max-version pick must see them so a deleted winner cannot let an
1908 // older live version resurrect the match), exactly like the
1909 // schemaless branch below.
1910 let mut verify_columns: Vec<&str> = vec!["_vid", "_deleted", "_version"];
1911 verify_columns.extend(key_names.iter().map(String::as_str));
1912 for chunk in candidates.chunks(Self::MERGE_SCAN_CHUNK) {
1913 let vid_list = chunk
1914 .iter()
1915 .map(|v| v.as_u64().to_string())
1916 .collect::<Vec<_>>()
1917 .join(", ");
1918 let filter = format!("_vid IN ({vid_list})");
1919 let scanned = self
1920 .storage
1921 .scan_vertex_table(label, &verify_columns, Some(&filter))
1922 .await?;
1923 let Some(batch) = scanned else { continue };
1924 let (Some(vid_col), Some(del_col), Some(ver_col)) = (
1925 batch
1926 .column_by_name("_vid")
1927 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1928 batch
1929 .column_by_name("_deleted")
1930 .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
1931 batch
1932 .column_by_name("_version")
1933 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1934 ) else {
1935 return Err(anyhow!(
1936 "MERGE batched lookup: verification scan missing a required column"
1937 ));
1938 };
1939 let key_cols: Vec<_> = key_names
1940 .iter()
1941 .map(|k| batch.column_by_name(k))
1942 .collect::<Option<Vec<_>>>()
1943 .ok_or_else(|| {
1944 anyhow!("MERGE batched lookup: projected key column missing from scan result")
1945 })?;
1946 // Per-vid MVCC dedup: keep the highest-version row for each vid.
1947 let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
1948 for i in 0..batch.num_rows() {
1949 let vid = Vid::from(vid_col.value(i));
1950 let ver = ver_col.value(i);
1951 let entry = winners.entry(vid).or_insert((ver, i));
1952 if ver > entry.0 {
1953 *entry = (ver, i);
1954 }
1955 }
1956 for (vid, (_ver, row)) in winners {
1957 if del_col.value(row) {
1958 continue;
1959 }
1960 let tuple: MergeKey = key_names
1961 .iter()
1962 .zip(&key_cols)
1963 .map(|(k, col)| {
1964 let v = uni_store::storage::arrow_convert::arrow_to_value(
1965 col.as_ref(),
1966 row,
1967 None,
1968 );
1969 (k.clone(), Self::canonical_key_value(&v))
1970 })
1971 .collect();
1972 if keys.contains(&tuple) {
1973 out.entry(tuple).or_default().push(vid);
1974 }
1975 }
1976 }
1977 Ok((out, HashMap::new()))
1978 }
1979
1980 /// Persisted-match lookup for an UNDECLARED (schemaless) label.
1981 ///
1982 /// Schemaless rows live only in the unified main vertex table (per-label
1983 /// tables exist only for declared labels), with all properties encoded in
1984 /// the `props_json` CypherValue blob — so key values cannot be pushed into
1985 /// the Lance filter; the key match happens in memory after decoding,
1986 /// exactly like the schemaless MATCH scan. One main-table scan regardless
1987 /// of key count.
1988 ///
1989 /// Mirrors `columnar_scan_schemaless_vertex_batch_static`: tombstones are
1990 /// NOT filtered Lance-side (MVCC dedup must see them to pick the winning
1991 /// version per vid); the per-vid max-`_version` dedup runs here, then
1992 /// deleted winners are dropped.
1993 ///
1994 /// Also returns the full decoded property map per matched vid — the blob
1995 /// is decoded here anyway, and the caller seeds the statement-level
1996 /// [`Prefetch`] from it instead of re-reading per row.
1997 ///
1998 /// # Errors
1999 /// Propagates scan and blob-decode failures — fail-closed: a MERGE must
2000 /// never treat a failed lookup as "no match" and create duplicates.
2001 async fn merge_lookup_persisted_batch_schemaless(
2002 &self,
2003 label: &str,
2004 key_names: &[String],
2005 keys: &HashSet<MergeKey>,
2006 ) -> Result<(
2007 HashMap<MergeKey, Vec<Vid>>,
2008 HashMap<Vid, uni_common::Properties>,
2009 )> {
2010 let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2011 let mut props_by_vid: HashMap<Vid, uni_common::Properties> = HashMap::new();
2012 let filter = format!("array_contains(labels, '{}')", label.replace('\'', "''"));
2013 let Some(batch) = self
2014 .storage
2015 .scan_main_vertex_table(
2016 &["_vid", "_deleted", "props_json", "_version"],
2017 Some(&filter),
2018 )
2019 .await?
2020 else {
2021 return Ok((out, props_by_vid));
2022 };
2023 let (Some(vid_col), Some(del_col), Some(ver_col)) = (
2024 batch
2025 .column_by_name("_vid")
2026 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
2027 batch
2028 .column_by_name("_deleted")
2029 .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
2030 batch
2031 .column_by_name("_version")
2032 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
2033 ) else {
2034 return Err(anyhow!(
2035 "schemaless MERGE lookup: main vertex table scan missing a required column"
2036 ));
2037 };
2038 let props_col = batch
2039 .column_by_name("props_json")
2040 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2041
2042 // Per-vid MVCC dedup: keep the highest-version row for each vid.
2043 let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
2044 for i in 0..batch.num_rows() {
2045 let vid = Vid::from(vid_col.value(i));
2046 let ver = ver_col.value(i);
2047 let entry = winners.entry(vid).or_insert((ver, i));
2048 if ver > entry.0 {
2049 *entry = (ver, i);
2050 }
2051 }
2052 for (vid, (_ver, row)) in winners {
2053 // Drop deletion tombstones AFTER picking the winner — a deleted
2054 // winner must not let an older live version resurrect the match.
2055 if del_col.value(row) {
2056 continue;
2057 }
2058 // A row without properties matches only an all-Null key tuple.
2059 let props = match props_col {
2060 Some(arr) if !arrow_array::Array::is_null(arr, row) => {
2061 match uni_common::cypher_value_codec::decode(arr.value(row))
2062 .map_err(|e| anyhow!("schemaless MERGE lookup: props decode: {e}"))?
2063 {
2064 Value::Map(m) => m,
2065 _ => HashMap::new(),
2066 }
2067 }
2068 _ => HashMap::new(),
2069 };
2070 let tuple: MergeKey = key_names
2071 .iter()
2072 .map(|k| {
2073 (
2074 k.clone(),
2075 Self::canonical_key_value(props.get(k).unwrap_or(&Value::Null)),
2076 )
2077 })
2078 .collect();
2079 if keys.contains(&tuple) {
2080 out.entry(tuple).or_default().push(vid);
2081 props_by_vid.insert(vid, props);
2082 }
2083 }
2084 Ok((out, props_by_vid))
2085 }
2086
2087 /// True if the statement-level MERGE property prefetch is safe for `label`.
2088 ///
2089 /// False when the label declares any CRDT-typed property: a prefetch HIT in
2090 /// [`read_vertex_props_with_prefetch`] skips the `normalize_crdt_properties`
2091 /// pass that `get_all_vertex_props_with_ctx` applies, so CRDT-bearing
2092 /// labels keep the per-row read path. Undeclared labels are trivially safe
2093 /// (normalization is a no-op without schema CRDT entries).
2094 fn merge_label_prefetch_safe(&self, label: &str) -> bool {
2095 let schema = self.storage.schema_manager().schema();
2096 schema.properties.get(label).is_none_or(|props| {
2097 !props
2098 .values()
2099 .any(|pm| matches!(pm.r#type, DataType::Crdt(_)))
2100 })
2101 }
2102
2103 /// True if an L0 override rewrote any key column of a persisted match away
2104 /// from its requested value (so the persisted row no longer matches).
2105 fn vid_overrides_break_key(
2106 vid: Vid,
2107 key_props: &HashMap<String, Value>,
2108 ctx: Option<&QueryContext>,
2109 ) -> bool {
2110 key_props.iter().any(|(k, want)| {
2111 matches!(
2112 l0_visibility::lookup_vertex_prop(vid, k, ctx),
2113 Some(got) if Self::canonical_key_value(&got) != Self::canonical_key_value(want)
2114 )
2115 })
2116 }
2117
2118 /// Build a node Map value (`{_vid, _labels, ...props}`) for binding a MERGE
2119 /// node variable.
2120 ///
2121 /// Matches the binding shape produced by `execute_create_pattern` and the
2122 /// general MATCH path, so ON MATCH SET, RETURN, and downstream operators
2123 /// resolve the variable identically — a bare `Value::Int(vid)` is not a
2124 /// valid node binding for those consumers.
2125 fn build_node_map(vid: Vid, label: &str, props: uni_common::Properties) -> Value {
2126 let mut obj = HashMap::new();
2127 obj.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
2128 obj.insert(
2129 "_labels".to_string(),
2130 Value::List(vec![Value::String(label.to_string())]),
2131 );
2132 for (k, v) in props {
2133 obj.insert(k, v);
2134 }
2135 Value::Map(obj)
2136 }
2137
2138 /// True if an L0-only vertex has every key column set to the requested
2139 /// value. A missing column matches only a requested `Null`.
2140 fn l0_vid_matches_key(
2141 vid: Vid,
2142 key_props: &HashMap<String, Value>,
2143 ctx: Option<&QueryContext>,
2144 ) -> bool {
2145 key_props.iter().all(
2146 |(k, want)| match l0_visibility::lookup_vertex_prop(vid, k, ctx) {
2147 Some(got) => Self::canonical_key_value(&got) == Self::canonical_key_value(want),
2148 None => *want == Value::Null,
2149 },
2150 )
2151 }
2152
2153 /// Index fast-path execution for one MERGE row of the shape detected by
2154 /// [`Self::merge_single_node_fastpath`].
2155 ///
2156 /// Resolves matches from the per-batch L0 snapshot `existing` (O(1) lookup,
2157 /// no per-row L0 enumeration) plus the per-statement persisted prefetch
2158 /// (`persisted`, built once by [`Self::merge_lookup_persisted_batch`]);
2159 /// applies ON MATCH SET to every match, or creates the node and applies
2160 /// ON CREATE SET when there is none. A newly created vertex is folded into
2161 /// `existing` so a later row of the same batch with the same key matches it
2162 /// (intra-batch dedup). Returns the RETURN rows for this input row (one per
2163 /// match, or one for a create).
2164 ///
2165 /// `prefetched` is the statement-level property prefetch (`None` when the
2166 /// label is CRDT-bearing, see [`Self::merge_label_prefetch_safe`]): matched
2167 /// vids carry their persisted base row, freshly created vids are seeded
2168 /// with an empty base — per-row reads then resolve as base + L0 layering
2169 /// (every SET flush writes the full row to L0 before the next read, so a
2170 /// prefetch hit equals a fresh read) instead of one storage scan each.
2171 ///
2172 /// # Errors
2173 /// Propagates evaluation, create, and SET failures.
2174 #[expect(
2175 clippy::too_many_arguments,
2176 reason = "mirrors execute_merge's threaded execution state"
2177 )]
2178 async fn execute_merge_row_indexed(
2179 &self,
2180 label: &str,
2181 node: &NodePattern,
2182 path_pattern: &Pattern,
2183 temp_vars: &[String],
2184 mut row: HashMap<String, Value>,
2185 key_props: &HashMap<String, Value>,
2186 persisted: &HashMap<MergeKey, Vec<Vid>>,
2187 key_tuple: &MergeKey,
2188 existing: &mut HashMap<MergeKey, Vec<Vid>>,
2189 on_match: Option<&SetClause>,
2190 on_create: Option<&SetClause>,
2191 prop_manager: &PropertyManager,
2192 params: &HashMap<String, Value>,
2193 ctx: Option<&QueryContext>,
2194 tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2195 writer: &Writer,
2196 mut prefetched: Option<&mut Prefetch>,
2197 ) -> Result<Vec<HashMap<String, Value>>> {
2198 let empty_prefetch = Prefetch::default();
2199 let mut seen: HashSet<Vid> = HashSet::new();
2200 let mut matches: Vec<Vid> = Vec::new();
2201 // Persisted (flushed) matches from the per-statement prefetch. The
2202 // prefetch is static for the statement, so re-verify liveness at row
2203 // time — an earlier row of this batch may have deleted the candidate
2204 // or rewritten its key (the old per-row scan saw those through its L0
2205 // overlay checks; these are the same checks, moved to row time).
2206 if let Some(vids) = persisted.get(key_tuple) {
2207 for &vid in vids {
2208 if l0_visibility::is_vertex_deleted(vid, ctx) {
2209 continue;
2210 }
2211 if Self::vid_overrides_break_key(vid, key_props, ctx) {
2212 continue;
2213 }
2214 if seen.insert(vid) {
2215 matches.push(vid);
2216 }
2217 }
2218 }
2219 // L0 / intra-batch matches from the per-batch snapshot, re-verified live
2220 // in case a prior row of this batch mutated or deleted the candidate.
2221 if let Some(vids) = existing.get(key_tuple) {
2222 for &vid in vids {
2223 if seen.contains(&vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
2224 continue;
2225 }
2226 if Self::l0_vid_matches_key(vid, key_props, ctx) && seen.insert(vid) {
2227 matches.push(vid);
2228 }
2229 }
2230 }
2231
2232 let mut out = Vec::new();
2233 if matches.is_empty() {
2234 // No match: create the node, then apply ON CREATE SET. Fold the
2235 // ON CREATE SET property assignments into seed props first so a
2236 // NOT-NULL property supplied only by ON CREATE SET passes
2237 // create-time validation (RC4); the post-create SET below settles
2238 // the final values.
2239 let seed_props = self
2240 .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2241 .await?;
2242 self.execute_create_pattern(
2243 path_pattern,
2244 &mut row,
2245 writer,
2246 prop_manager,
2247 params,
2248 ctx,
2249 tx_l0_override,
2250 Some(&seed_props),
2251 )
2252 .await?;
2253 // Fold the new vertex into the batch snapshot for intra-batch
2254 // dedup, and seed the statement prefetch with an empty base: a
2255 // fresh vid has nothing in storage, so ON CREATE SET's lazy read
2256 // resolves from the L0 row the create just wrote instead of
2257 // issuing a per-row storage scan that finds nothing.
2258 if let Some(var) = &node.variable
2259 && let Some(val) = row.get(var)
2260 && let Ok(vid) = Self::vid_from_value(val)
2261 {
2262 existing.entry(key_tuple.clone()).or_default().push(vid);
2263 if let Some(p) = prefetched.as_deref_mut() {
2264 p.vertex.entry(vid).or_default();
2265 }
2266 // Phantom guard (RC2): register this MERGE-create's key so a
2267 // concurrent MERGE of the same key aborts retriably at commit
2268 // (converging to one node) instead of silently duplicating —
2269 // even with no declared UNIQUE constraint. Only inside a
2270 // transaction, where commit re-probes the guard; a plain CREATE
2271 // never registers a key, so it is unaffected.
2272 if let Some(tx_l0) = tx_l0_override {
2273 let key_values: Vec<(String, Value)> = key_props
2274 .iter()
2275 .map(|(k, v)| (k.clone(), v.clone()))
2276 .collect();
2277 let guard_key =
2278 uni_store::runtime::l0::serialize_constraint_key(label, &key_values);
2279 tx_l0.write().insert_merge_guard_key(guard_key, vid);
2280 }
2281 }
2282 if let Some(set) = on_create {
2283 self.execute_set_items_locked(
2284 &set.items,
2285 &mut row,
2286 writer,
2287 prop_manager,
2288 params,
2289 ctx,
2290 tx_l0_override,
2291 prefetched.as_deref().unwrap_or(&empty_prefetch),
2292 )
2293 .await?;
2294 }
2295 Self::bind_path_variables(path_pattern, &mut row, temp_vars);
2296 out.push(row);
2297 } else {
2298 // Apply ON MATCH SET to every matched node (multi-match semantics),
2299 // binding the node variable as a Map with _vid/_labels/props so
2300 // RETURN and downstream operators resolve it as they would for the
2301 // general MATCH and CREATE paths.
2302 for vid in matches {
2303 let mut m = row.clone();
2304 if let Some(var) = &node.variable {
2305 // Minimal binding so ON MATCH SET resolves the node by _vid.
2306 m.insert(
2307 var.clone(),
2308 Self::build_node_map(vid, label, HashMap::new()),
2309 );
2310 }
2311 if let Some(set) = on_match {
2312 self.execute_set_items_locked(
2313 &set.items,
2314 &mut m,
2315 writer,
2316 prop_manager,
2317 params,
2318 ctx,
2319 tx_l0_override,
2320 prefetched.as_deref().unwrap_or(&empty_prefetch),
2321 )
2322 .await?;
2323 }
2324 if let Some(var) = &node.variable {
2325 // Rebind with full, post-SET properties for RETURN
2326 // fidelity. The SET above flushed the full row to L0, so a
2327 // prefetch hit (base + L0 layering) reproduces exactly
2328 // what a fresh storage read would return.
2329 let props = read_vertex_props_with_prefetch(
2330 vid,
2331 prefetched.as_deref().unwrap_or(&empty_prefetch),
2332 prop_manager,
2333 ctx,
2334 )
2335 .await?;
2336 m.insert(var.clone(), Self::build_node_map(vid, label, props));
2337 }
2338 Self::bind_path_variables(path_pattern, &mut m, temp_vars);
2339 out.push(m);
2340 }
2341 }
2342 Ok(out)
2343 }
2344
2345 #[expect(clippy::too_many_arguments)]
2346 pub(crate) async fn execute_merge(
2347 &self,
2348 rows: Vec<HashMap<String, Value>>,
2349 pattern: &Pattern,
2350 on_match: Option<&SetClause>,
2351 on_create: Option<&SetClause>,
2352 prop_manager: &PropertyManager,
2353 params: &HashMap<String, Value>,
2354 ctx: Option<&QueryContext>,
2355 tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2356 ) -> Result<Vec<HashMap<String, Value>>> {
2357 let writer_lock = self
2358 .writer
2359 .as_ref()
2360 .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;
2361
2362 // Prepare pattern for path variable binding: assign temp edge variable
2363 // names to unnamed relationships in paths that have path variables.
2364 let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);
2365
2366 // Issue #69: a single-node, single-label MERGE takes the fast path,
2367 // skipping the per-row query planning that made batched MERGE no faster
2368 // than a per-entity loop. Indexed keys get an index point-lookup;
2369 // un-indexed keys still skip planning (the lookup is a filtered scan).
2370 // The shape is the same for every row, so it is detected once.
2371 let fastpath = self.merge_single_node_fastpath(pattern);
2372
2373 // Build the per-batch L0 snapshot once (issue #69 Phase C): the per-row
2374 // fast path then resolves L0/intra-batch matches with an O(1) lookup
2375 // instead of re-walking L0 for every row. `key_names` is the sorted
2376 // static key set, matching `merge_key_tuple`.
2377 let mut fast_existing: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2378 // Per-row pre-evaluated fast-path keys (None = that row falls back to
2379 // the general path), and the per-statement persisted prefetch over the
2380 // deduped key tuples — ONE chunked scan instead of one scan per row.
2381 // Key expressions only see the row's own bindings + params, so
2382 // evaluating them ahead of any creates cannot observe earlier rows.
2383 let mut row_fast: Vec<Option<(HashMap<String, Value>, MergeKey)>> = Vec::new();
2384 let mut fast_persisted: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
2385 // Statement-level property prefetch for the fast path (review perf
2386 // residual): every persisted match's full row is batch-read ONCE, so
2387 // the per-row ON MATCH SET read and the post-SET rebind resolve as
2388 // prefetch-base + L0 layering instead of one storage scan each.
2389 // `None` disables it for CRDT-bearing labels (the prefetch-hit read
2390 // skips CRDT normalization).
2391 let mut merge_prefetch: Option<Prefetch> = None;
2392 if let Some((node, label)) = &fastpath {
2393 let mut key_names: Vec<String> = match &node.properties {
2394 Some(Expr::Map(entries)) => entries.iter().map(|(k, _)| k.clone()).collect(),
2395 _ => Vec::new(),
2396 };
2397 key_names.sort();
2398 fast_existing = self.merge_l0_existing(label, &key_names, ctx);
2399
2400 row_fast.reserve(rows.len());
2401 for row in &rows {
2402 let mut key_props: HashMap<String, Value> = HashMap::new();
2403 if let Some(props_expr) = &node.properties
2404 && let Value::Map(map) = self
2405 .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2406 .await?
2407 {
2408 key_props = map;
2409 }
2410 // Only rows whose every key value is a scalar the persisted
2411 // scan can express take the fast path (same gate as before,
2412 // via the filter builder).
2413 if Self::merge_key_filter(&key_props).is_some() {
2414 let tuple = Self::merge_key_tuple(&key_props);
2415 row_fast.push(Some((key_props, tuple)));
2416 } else {
2417 row_fast.push(None);
2418 }
2419 }
2420 let unique_keys: HashSet<MergeKey> = row_fast
2421 .iter()
2422 .flatten()
2423 .map(|(_, tuple)| tuple.clone())
2424 .collect();
2425 let (persisted, schemaless_props) = self
2426 .merge_lookup_persisted_batch(label, &key_names, &unique_keys)
2427 .await?;
2428 fast_persisted = persisted;
2429 if self.merge_label_prefetch_safe(label) {
2430 let mut pf = Prefetch::default();
2431 if !schemaless_props.is_empty() {
2432 // The schemaless lookup already decoded each matched vid's
2433 // full property map — zero extra scans.
2434 pf.vertex.extend(schemaless_props);
2435 } else {
2436 let vids: Vec<Vid> = fast_persisted
2437 .values()
2438 .flatten()
2439 .copied()
2440 .collect::<HashSet<Vid>>()
2441 .into_iter()
2442 .collect();
2443 if !vids.is_empty()
2444 && let Ok(batch_props) = prop_manager
2445 .get_batch_vertex_props_for_label(&vids, label, ctx)
2446 .await
2447 {
2448 // One `_vid IN (…)` scan for every matched row's base.
2449 // On Err the map stays empty — every read falls back to
2450 // the per-row path (fail-open, same posture as
2451 // prefetch_set_targets).
2452 pf.vertex.extend(batch_props);
2453 }
2454 }
2455 merge_prefetch = Some(pf);
2456 }
2457 }
2458
2459 // RC3: relationship-MERGE existence fast-path. The single-node fast path
2460 // above does not cover `(a)-[:R]->(b)`; the general path rebuilds and runs
2461 // a per-row traversal `LogicalPlan` just to check whether the edge exists
2462 // (~19x the bulk CREATE of the same edges). For the bound-endpoints,
2463 // anonymous-edge shape (and no ON MATCH SET, whose match-row semantics the
2464 // general path materialises) we resolve existence with one MVCC-correct
2465 // adjacency probe — `GraphExecutionContext::get_neighbors` merges CSR + all
2466 // L0 buffers including the transaction's own writes, so intra-batch edges
2467 // are seen — and reuse the general create / ON CREATE handling unchanged.
2468 // An ON MATCH SET with actual items needs the general path's materialised
2469 // match rows; a plain MERGE carries an *empty* on_match, which the fast
2470 // path can serve (it emits the row directly, applying nothing on match).
2471 let on_match_empty = on_match.is_none_or(|s| s.items.is_empty());
2472 let rel_fast = if fastpath.is_none() && on_match_empty {
2473 self.merge_relationship_fastpath_shape(pattern)
2474 } else {
2475 None
2476 };
2477 let rel_graph_ctx = rel_fast.as_ref().map(|_| {
2478 let l0_context = match ctx {
2479 Some(c) => crate::query::df_graph::L0Context::from_query_context(c),
2480 None => crate::query::df_graph::L0Context::empty(),
2481 };
2482 let pm_arc = self.prop_manager_arc.clone().unwrap_or_else(|| {
2483 Arc::new(PropertyManager::new(
2484 self.storage.clone(),
2485 self.storage.schema_manager_arc(),
2486 prop_manager.cache_size(),
2487 ))
2488 });
2489 crate::query::df_graph::GraphExecutionContext::with_l0_context(
2490 self.effective_storage(),
2491 l0_context,
2492 pm_arc,
2493 )
2494 });
2495
2496 let mut results = Vec::new();
2497 for (idx, mut row) in rows.into_iter().enumerate() {
2498 // Rows with a pre-evaluated scalar key take the fast path; rows
2499 // with a non-scalar key fall through to the general path below.
2500 if let Some((node, label)) = &fastpath
2501 && let Some((key_props, key_tuple)) = row_fast.get(idx).and_then(|rf| rf.as_ref())
2502 {
2503 let writer: &uni_store::Writer = writer_lock.as_ref();
2504 let row_out = self
2505 .execute_merge_row_indexed(
2506 label,
2507 node,
2508 &path_pattern,
2509 &temp_vars,
2510 row,
2511 key_props,
2512 &fast_persisted,
2513 key_tuple,
2514 &mut fast_existing,
2515 on_match,
2516 on_create,
2517 prop_manager,
2518 params,
2519 ctx,
2520 tx_l0_override,
2521 writer,
2522 merge_prefetch.as_mut(),
2523 )
2524 .await?;
2525 results.extend(row_out);
2526 continue;
2527 }
2528
2529 // RC3 relationship fast path: bound endpoints → resolve edge
2530 // existence with one adjacency probe and reuse the general
2531 // create / ON CREATE handling, skipping the per-row traversal plan.
2532 if let (Some((src_var, dst_var, type_id, dir)), Some(graph_ctx)) =
2533 (rel_fast.as_ref(), rel_graph_ctx.as_ref())
2534 {
2535 let src_vid = row.get(src_var).and_then(|v| Self::vid_from_value(v).ok());
2536 let dst_vid = row.get(dst_var).and_then(|v| Self::vid_from_value(v).ok());
2537 if let (Some(src_vid), Some(dst_vid)) = (src_vid, dst_vid) {
2538 let exists = graph_ctx
2539 .get_neighbors(src_vid, *type_id, *dir)
2540 .into_iter()
2541 .any(|(n, _eid)| n == dst_vid);
2542 let writer: &uni_store::Writer = writer_lock.as_ref();
2543 if !exists {
2544 // Edge absent: create only the edge (endpoints are bound),
2545 // then apply ON CREATE SET — identical to the general
2546 // create branch below.
2547 let seed_props = self
2548 .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2549 .await?;
2550 self.execute_create_pattern(
2551 &path_pattern,
2552 &mut row,
2553 writer,
2554 prop_manager,
2555 params,
2556 ctx,
2557 tx_l0_override,
2558 Some(&seed_props),
2559 )
2560 .await?;
2561 if let Some(set) = on_create {
2562 self.execute_set_items_locked(
2563 &set.items,
2564 &mut row,
2565 writer,
2566 prop_manager,
2567 params,
2568 ctx,
2569 tx_l0_override,
2570 &Prefetch::default(),
2571 )
2572 .await?;
2573 }
2574 }
2575 // Whether matched or just created, the edge now exists; bind
2576 // path variables and emit the row (the edge is anonymous, so
2577 // there is no edge binding to reproduce, and ON MATCH SET is
2578 // excluded from this fast path).
2579 Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
2580 results.push(row);
2581 continue;
2582 }
2583 // Endpoints not bound to vids → fall through to the general path.
2584 }
2585
2586 // General execution: match-or-create per row. (The index fast path
2587 // above already handles single-node, single-label, scalar-indexed
2588 // MERGE — including unique-constrained labels, whose keys are
2589 // indexed — so there is no separate constraint-only fast path.)
2590 let matches = self
2591 .execute_merge_match(pattern, &row, prop_manager, params, ctx)
2592 .await?;
2593 let writer: &uni_store::Writer = writer_lock.as_ref();
2594
2595 let result: Result<Vec<HashMap<String, Value>>> = async {
2596 let mut batch = Vec::new();
2597 if !matches.is_empty() {
2598 for mut m in matches {
2599 if let Some(set) = on_match {
2600 self.execute_set_items_locked(
2601 &set.items,
2602 &mut m,
2603 writer,
2604 prop_manager,
2605 params,
2606 ctx,
2607 tx_l0_override,
2608 &Prefetch::default(),
2609 )
2610 .await?;
2611 }
2612 Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
2613 batch.push(m);
2614 }
2615 } else {
2616 // Fold ON CREATE SET into seed props so a NOT-NULL property
2617 // set only by ON CREATE SET passes create-time validation
2618 // (RC4); the post-create SET below settles the final values.
2619 let seed_props = self
2620 .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
2621 .await?;
2622 self.execute_create_pattern(
2623 &path_pattern,
2624 &mut row,
2625 writer,
2626 prop_manager,
2627 params,
2628 ctx,
2629 tx_l0_override,
2630 Some(&seed_props),
2631 )
2632 .await?;
2633 if let Some(set) = on_create {
2634 self.execute_set_items_locked(
2635 &set.items,
2636 &mut row,
2637 writer,
2638 prop_manager,
2639 params,
2640 ctx,
2641 tx_l0_override,
2642 &Prefetch::default(),
2643 )
2644 .await?;
2645 }
2646 Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
2647 batch.push(row);
2648 }
2649 Ok(batch)
2650 }
2651 .await;
2652
2653 results.extend(result?);
2654 }
2655 Ok(results)
2656 }
2657
2658 /// Pre-evaluate `ON CREATE SET` property assignments into per-variable seeds.
2659 ///
2660 /// Folds `SET <var>.<prop> = <expr>` items so a NOT-NULL property supplied
2661 /// only by `ON CREATE SET` is present when the MERGE node is created and
2662 /// passes constraint validation (RC4). The right-hand side is evaluated
2663 /// against the current `row`.
2664 ///
2665 /// Items whose right-hand side references the target variable (e.g.
2666 /// `ON CREATE SET n.c = coalesce(n.c, 0) + 1`) are NOT folded: seeding would
2667 /// let the post-create SET read the seeded value and apply the assignment
2668 /// twice. Such items run only post-create, exactly once (unchanged behavior).
2669 ///
2670 /// # Errors
2671 /// Returns an error if evaluating an assignment's right-hand side fails.
2672 pub(crate) async fn on_create_seed_props(
2673 &self,
2674 on_create: Option<&SetClause>,
2675 row: &HashMap<String, Value>,
2676 prop_manager: &PropertyManager,
2677 params: &HashMap<String, Value>,
2678 ctx: Option<&QueryContext>,
2679 ) -> Result<HashMap<String, HashMap<String, Value>>> {
2680 let mut seed: HashMap<String, HashMap<String, Value>> = HashMap::new();
2681 let Some(set) = on_create else {
2682 return Ok(seed);
2683 };
2684 for item in &set.items {
2685 if let SetItem::Property { expr, value } = item
2686 && let Expr::Property(var_expr, prop_name) = expr
2687 && let Expr::Variable(var_name) = &**var_expr
2688 // Skip self-referential RHS so the post-create SET (which also
2689 // runs) applies it exactly once rather than reading the seed.
2690 && !crate::query::df_graph::locy_ast_builder::expr_references_var(
2691 value, var_name,
2692 )
2693 {
2694 let val = self
2695 .evaluate_expr(value, row, prop_manager, params, ctx)
2696 .await?;
2697 seed.entry(var_name.clone())
2698 .or_default()
2699 .insert(prop_name.clone(), val);
2700 }
2701 }
2702 Ok(seed)
2703 }
2704
2705 /// Execute a CREATE pattern, inserting new vertices and edges into the graph.
2706 #[expect(clippy::too_many_arguments)]
2707 pub(crate) async fn execute_create_pattern(
2708 &self,
2709 pattern: &Pattern,
2710 row: &mut HashMap<String, Value>,
2711 writer: &Writer,
2712 prop_manager: &PropertyManager,
2713 params: &HashMap<String, Value>,
2714 ctx: Option<&QueryContext>,
2715 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2716 // Per-variable properties to gap-fill into newly-created nodes before
2717 // constraint validation. Used by MERGE to fold `ON CREATE SET` so a
2718 // NOT-NULL property supplied only by ON CREATE SET passes create-time
2719 // validation (RC4). `None` for plain CREATE.
2720 seed_props: Option<&HashMap<String, HashMap<String, Value>>>,
2721 ) -> Result<()> {
2722 for path in &pattern.paths {
2723 let mut prev_vid: Option<Vid> = None;
2724 // (rel_var, type_id, type_name, props_expr, direction)
2725 type PendingRel = (String, u32, String, Option<Expr>, Direction);
2726 let mut rel_pending: Option<PendingRel> = None;
2727
2728 for element in &path.elements {
2729 match element {
2730 PatternElement::Node(n) => {
2731 let mut vid = None;
2732
2733 // Check if node variable already bound in row
2734 if let Some(var) = &n.variable
2735 && let Some(val) = row.get(var)
2736 && let Ok(existing_vid) = Self::vid_from_value(val)
2737 {
2738 vid = Some(existing_vid);
2739 }
2740
2741 // If not bound, create it
2742 if vid.is_none() {
2743 let mut props = HashMap::new();
2744 if let Some(props_expr) = &n.properties {
2745 let props_val = self
2746 .evaluate_expr(props_expr, row, prop_manager, params, ctx)
2747 .await?;
2748 if let Value::Map(map) = props_val {
2749 for (k, v) in map {
2750 props.insert(k, v);
2751 }
2752 } else {
2753 return Err(anyhow!("Properties must evaluate to a map"));
2754 }
2755 }
2756
2757 // MERGE ON CREATE SET: gap-fill properties supplied
2758 // only by ON CREATE SET so a NOT-NULL property absent
2759 // from the merge key passes create-time validation
2760 // (RC4). `or_insert` keeps the merge-key/pattern props
2761 // authoritative; the post-create SET re-applies the
2762 // real values, so the final state is unchanged.
2763 if let Some(seed) = seed_props
2764 && let Some(var) = &n.variable
2765 && let Some(var_seed) = seed.get(var)
2766 {
2767 for (k, v) in var_seed {
2768 props.entry(k.clone()).or_insert_with(|| v.clone());
2769 }
2770 }
2771
2772 let schema = self.storage.schema_manager().schema();
2773
2774 // Strict schema: reject undeclared labels.
2775 if self.config.strict_schema {
2776 for label_name in &n.labels {
2777 if schema.get_label_case_insensitive(label_name).is_none() {
2778 return Err(anyhow!(
2779 "Label '{}' is not defined in the schema \
2780 (strict_schema is enabled). \
2781 Declare it with db.schema().label(...).apply() first.",
2782 label_name
2783 ));
2784 }
2785 }
2786 }
2787
2788 // VID generation is label-independent. Pull from the
2789 // per-tx reservoir if set (amortizes the global
2790 // IdAllocator mutex), else fall back to the direct
2791 // per-VID path.
2792 let new_vid = match &self.id_reservoir {
2793 Some(r) => r.next_vid().await?,
2794 None => writer.next_vid().await?,
2795 };
2796
2797 // Enrich with generated columns only for known labels
2798 for label_name in &n.labels {
2799 if schema.get_label_case_insensitive(label_name).is_some() {
2800 self.enrich_properties_with_generated_columns(
2801 label_name,
2802 &mut props,
2803 prop_manager,
2804 params,
2805 ctx,
2806 )
2807 .await?;
2808 }
2809 }
2810
2811 // Validate/coerce against declared types AFTER enrichment, so
2812 // a type mismatch is rejected here rather than silently nulled
2813 // (and the row dropped) at flush — issue #68.
2814 let props = Self::coerce_and_validate_props(props, &schema, &n.labels)?;
2815
2816 // Insert vertex and get back final properties (includes auto-generated embeddings)
2817 let final_props = writer
2818 .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
2819 .await?;
2820
2821 // Build node object with final properties (includes embeddings)
2822 if let Some(var) = &n.variable {
2823 let mut obj = HashMap::new();
2824 obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
2825 let labels_list: Vec<Value> =
2826 n.labels.iter().map(|l| Value::String(l.clone())).collect();
2827 obj.insert("_labels".to_string(), Value::List(labels_list));
2828 for (k, v) in &final_props {
2829 obj.insert(k.clone(), v.clone());
2830 }
2831 // Store node as a Map with _vid, matching MATCH behavior
2832 row.insert(var.clone(), Value::Map(obj));
2833 }
2834 vid = Some(new_vid);
2835 }
2836
2837 let current_vid = vid.unwrap();
2838
2839 if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
2840 rel_pending.take()
2841 && let Some(src) = prev_vid
2842 {
2843 let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
2844
2845 if !is_rel_bound {
2846 let mut rel_props = HashMap::new();
2847 if let Some(expr) = rel_props_expr {
2848 let val = self
2849 .evaluate_expr(&expr, row, prop_manager, params, ctx)
2850 .await?;
2851 if let Value::Map(map) = val {
2852 rel_props.extend(map);
2853 }
2854 }
2855 // Validate/coerce edge properties against the declared
2856 // edge-type schema before storing — issue #68.
2857 let edge_schema = self.storage.schema_manager().schema();
2858 let rel_props = Self::coerce_and_validate_props(
2859 rel_props,
2860 &edge_schema,
2861 std::slice::from_ref(&type_name),
2862 )?;
2863 let eid = match &self.id_reservoir {
2864 Some(r) => r.next_eid().await?,
2865 None => writer.next_eid(type_id).await?,
2866 };
2867
2868 // For incoming edges like (a)<-[:R]-(b), swap so the edge points b -> a
2869 let (edge_src, edge_dst) = match dir {
2870 Direction::Incoming => (current_vid, src),
2871 _ => (src, current_vid),
2872 };
2873
2874 let store_props = !rel_var.is_empty();
2875 let user_props = if store_props {
2876 rel_props.clone()
2877 } else {
2878 HashMap::new()
2879 };
2880
2881 writer
2882 .insert_edge(
2883 edge_src,
2884 edge_dst,
2885 type_id,
2886 eid,
2887 rel_props,
2888 Some(type_name.clone()),
2889 tx_l0,
2890 )
2891 .await?;
2892
2893 // Edge type name is now stored by insert_edge
2894
2895 if store_props {
2896 let mut edge_map = HashMap::new();
2897 edge_map.insert(
2898 "_eid".to_string(),
2899 Value::Int(eid.as_u64() as i64),
2900 );
2901 edge_map.insert(
2902 "_src".to_string(),
2903 Value::Int(edge_src.as_u64() as i64),
2904 );
2905 edge_map.insert(
2906 "_dst".to_string(),
2907 Value::Int(edge_dst.as_u64() as i64),
2908 );
2909 edge_map
2910 .insert("_type".to_string(), Value::Int(type_id as i64));
2911 // Include user properties so downstream RETURN sees them
2912 for (k, v) in user_props {
2913 edge_map.insert(k, v);
2914 }
2915 row.insert(rel_var, Value::Map(edge_map));
2916 }
2917 }
2918 }
2919 prev_vid = Some(current_vid);
2920 }
2921 PatternElement::Relationship(r) => {
2922 if r.types.len() != 1 {
2923 return Err(anyhow!(
2924 "CREATE relationship must specify exactly one type"
2925 ));
2926 }
2927 let type_name = &r.types[0];
2928 let type_id = if self.config.strict_schema {
2929 let schema = self.storage.schema_manager().schema();
2930 schema
2931 .edge_type_id_by_name_case_insensitive(type_name)
2932 .ok_or_else(|| {
2933 anyhow!(
2934 "Edge type '{}' is not defined in the schema \
2935 (strict_schema is enabled). \
2936 Declare it with db.schema().edge_type(...).apply() first.",
2937 type_name
2938 )
2939 })?
2940 } else {
2941 // Schemaless: get or assign edge type ID (bit 31 = 1 for dynamic).
2942 self.storage
2943 .schema_manager()
2944 .get_or_assign_edge_type_id(type_name)
2945 };
2946
2947 rel_pending = Some((
2948 r.variable.clone().unwrap_or_default(),
2949 type_id,
2950 type_name.clone(),
2951 r.properties.clone(),
2952 r.direction.clone(),
2953 ));
2954 }
2955 PatternElement::Parenthesized { .. } => {
2956 return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
2957 }
2958 }
2959 }
2960 }
2961 Ok(())
2962 }
2963
2964 /// Rejects structural values (maps, nodes, edges, paths, nested lists) in a property.
2965 ///
2966 /// These are never valid OpenCypher property values regardless of the declared column
2967 /// type. A `CypherValue` column is the sole exception and is handled by the caller
2968 /// before this is reached.
2969 ///
2970 /// # Errors
2971 /// Returns an error if `val` is a map/node/edge/path, or a list containing one.
2972 fn validate_structural_property_value(prop_name: &str, val: &Value) -> Result<()> {
2973 match val {
2974 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
2975 anyhow::bail!(
2976 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2977 prop_name
2978 );
2979 }
2980 Value::List(items) => {
2981 for item in items {
2982 if matches!(
2983 item,
2984 Value::Map(_)
2985 | Value::Node(_)
2986 | Value::Edge(_)
2987 | Value::Path(_)
2988 | Value::List(_)
2989 ) {
2990 anyhow::bail!(
2991 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2992 prop_name
2993 );
2994 }
2995 }
2996 }
2997 _ => {}
2998 }
2999 Ok(())
3000 }
3001
3002 /// Validates and coerces `val` against the declared schema type for `prop_name`.
3003 ///
3004 /// Returns the value to actually persist. Beyond the structural checks in
3005 /// [`Self::validate_structural_property_value`], this compares the value against the
3006 /// column's declared `DataType` and:
3007 ///
3008 /// - returns it unchanged when directly storable (including the intentional
3009 /// `Int`→`Float`/`Int32` and `Temporal`→`Timestamp` widenings);
3010 /// - coerces a `Value::String` written into a `Date`/`Time`/`DateTime`/`Duration`
3011 /// column into the proper `Temporal` value, using the same parser as the Cypher
3012 /// `date()`/`time()`/`datetime()`/`duration()` constructors;
3013 /// - otherwise returns an error, so a type mismatch is surfaced at the call site
3014 /// rather than silently nulled — and the row dropped at flush. See issue #68.
3015 ///
3016 /// Undeclared (schemaless) properties and `CypherValue` columns keep their permissive
3017 /// behavior.
3018 ///
3019 /// # Errors
3020 /// Returns an error if the value's type is incompatible with the declared column type,
3021 /// or if a string destined for a temporal column is not a valid temporal literal.
3022 fn coerce_and_validate_property_value(
3023 prop_name: &str,
3024 val: Value,
3025 schema: &uni_common::core::schema::Schema,
3026 labels: &[String],
3027 ) -> Result<Value> {
3028 use uni_common::core::schema::DataType;
3029
3030 // Resolve the declared type from the first label that declares this property.
3031 let declared = labels.iter().find_map(|label| {
3032 schema
3033 .properties
3034 .get(label)
3035 .and_then(|props| props.get(prop_name))
3036 .map(|meta| &meta.r#type)
3037 });
3038
3039 // CypherValue columns accept any value (including maps) — skip all checks.
3040 if matches!(declared, Some(DataType::CypherValue)) {
3041 return Ok(val);
3042 }
3043
3044 let Some(dt) = declared else {
3045 // Schemaless property: reject structural values (maps/nodes/edges/paths and
3046 // lists containing them), otherwise store as-is.
3047 Self::validate_structural_property_value(prop_name, &val)?;
3048 return Ok(val);
3049 };
3050
3051 // Sparse vectors carry invariants the type system cannot express (strictly
3052 // ascending unique term ids, finite weights, equal-length arrays, term ids
3053 // within the declared term space). Canonicalize and validate the native value
3054 // at ingest so a malformed sparse vector is a clean `TypeError` here, rather
3055 // than a panic deep in the durable WAL value codec — which reconstructs via
3056 // `SparseVector::new` and previously `.expect()`ed (issue #95). The degraded
3057 // `Value::Map` / `Null` forms fall through to `accepts` unchanged.
3058 if let (DataType::SparseVector { dimensions }, Value::SparseVector { .. }) = (dt, &val) {
3059 return Self::canonicalize_sparse_vector(prop_name, val, *dimensions);
3060 }
3061
3062 // Directly storable: scalars, the intentional `Int`→`Float`/`Int32` and
3063 // `Temporal`→`Timestamp` widenings, declared composite columns (`Map`/`List`/
3064 // `Vector`) receiving their matching value, and `Null` (always accepted).
3065 if dt.accepts(&val) {
3066 return Ok(val);
3067 }
3068
3069 // Known-safe coercion: a string into a temporal column is parsed as if it had
3070 // been wrapped in the matching Cypher temporal constructor.
3071 if matches!(val, Value::String(_)) {
3072 let ctor = match dt {
3073 DataType::DateTime => Some("DATETIME"),
3074 DataType::Date => Some("DATE"),
3075 DataType::Time => Some("TIME"),
3076 DataType::Duration => Some("DURATION"),
3077 _ => None,
3078 };
3079 if let Some(name) = ctor {
3080 return uni_query_functions::datetime::eval_datetime_function(
3081 name,
3082 std::slice::from_ref(&val),
3083 )
3084 .map_err(|e| {
3085 anyhow!(
3086 "TypeError: property '{}' is declared {:?} but the string value could \
3087 not be parsed as a {} literal: {}",
3088 prop_name,
3089 dt,
3090 name,
3091 e
3092 )
3093 });
3094 }
3095 }
3096
3097 // Not storable and not coercible. Prefer the structural message when the value
3098 // is itself structural (e.g. a map into a scalar column), preserving prior
3099 // behavior; otherwise report the scalar type mismatch.
3100 Self::validate_structural_property_value(prop_name, &val)?;
3101 anyhow::bail!(
3102 "TypeError: property '{}' is declared {:?} but got an incompatible value of type {}",
3103 prop_name,
3104 dt,
3105 value_type_name(&val)
3106 );
3107 }
3108
3109 /// Canonicalizes and validates a value destined for a `SparseVector` column.
3110 ///
3111 /// Sorts term ids ascending and sums the weights of duplicate term ids (via
3112 /// [`uni_sparse_vector::SparseVector::from_pairs`]), then rejects mismatched array
3113 /// lengths, non-finite weights, and term ids outside the declared `dimensions` term
3114 /// space. Running this at the write boundary keeps the durable WAL codec's
3115 /// `SparseVector::new(..)` reconstruction infallible and mirrors the auto-embed
3116 /// path, which canonicalizes through the same kernel constructor (issue #95).
3117 ///
3118 /// # Errors
3119 /// Returns a `TypeError` if the value violates a sparse-vector invariant or carries
3120 /// a term id at or beyond `dimensions`.
3121 fn canonicalize_sparse_vector(prop_name: &str, val: Value, dimensions: usize) -> Result<Value> {
3122 let Value::SparseVector { indices, values } = val else {
3123 // The caller only routes native `Value::SparseVector` values here.
3124 anyhow::bail!(
3125 "TypeError: property '{}' is declared SparseVector but got {}",
3126 prop_name,
3127 value_type_name(&val)
3128 );
3129 };
3130 if indices.len() != values.len() {
3131 anyhow::bail!(
3132 "TypeError: property '{}' sparse vector has {} term ids but {} weights",
3133 prop_name,
3134 indices.len(),
3135 values.len()
3136 );
3137 }
3138 let pairs: Vec<(u32, f32)> = indices.into_iter().zip(values).collect();
3139 let sv = uni_sparse_vector::SparseVector::from_pairs(pairs).map_err(|e| {
3140 anyhow!(
3141 "TypeError: property '{}' has an invalid sparse vector: {}",
3142 prop_name,
3143 e
3144 )
3145 })?;
3146 // `dimensions` is the term-space cardinality (max term id + 1); the largest
3147 // term id of a canonical (ascending) vector is its last index.
3148 if let Some(&max_term) = sv.indices().last()
3149 && max_term as usize >= dimensions
3150 {
3151 anyhow::bail!(
3152 "TypeError: property '{}' sparse vector term id {} is outside the declared \
3153 term space (dimensions = {})",
3154 prop_name,
3155 max_term,
3156 dimensions
3157 );
3158 }
3159 let (indices, values) = sv.into_parts();
3160 Ok(Value::SparseVector { indices, values })
3161 }
3162
3163 /// Coerces and validates every property in `props` against the declared types for `labels`.
3164 ///
3165 /// Applies [`Self::coerce_and_validate_property_value`] to each entry, returning the map
3166 /// with known-safe coercions applied. Use this at every user-facing CREATE/SET write site
3167 /// before handing properties to the writer, so a type mismatch is rejected up front rather
3168 /// than silently nulled — and the row dropped — at flush (issue #68).
3169 ///
3170 /// # Errors
3171 /// Returns an error on the first property whose value is incompatible with its declared type.
3172 fn coerce_and_validate_props(
3173 props: HashMap<String, Value>,
3174 schema: &uni_common::core::schema::Schema,
3175 labels: &[String],
3176 ) -> Result<HashMap<String, Value>> {
3177 let mut out = HashMap::with_capacity(props.len());
3178 for (k, v) in props {
3179 let cv = Self::coerce_and_validate_property_value(&k, v, schema, labels)?;
3180 out.insert(k, cv);
3181 }
3182 Ok(out)
3183 }
3184
3185 #[expect(clippy::too_many_arguments)]
3186 pub(crate) async fn execute_set_items_locked(
3187 &self,
3188 items: &[SetItem],
3189 row: &mut HashMap<String, Value>,
3190 writer: &Writer,
3191 prop_manager: &PropertyManager,
3192 params: &HashMap<String, Value>,
3193 ctx: Option<&QueryContext>,
3194 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3195 prefetched: &Prefetch,
3196 ) -> Result<()> {
3197 // Coalesce SetItem::Property items by target so we do ONE read + ONE
3198 // write per (variable, target) instead of one read-modify-write cycle
3199 // per item. For an UPDATE that sets N properties on the same vertex
3200 // (e.g. the ingest hotpath `SET n.frequency = ..., n.last_seen = ...,
3201 // n.confidence = ...`), this collapses N redundant
3202 // `get_all_vertex_props_with_ctx` + `insert_vertex_with_labels` cycles
3203 // into one. See profile_test.rs `diag_72_set_data_scale_with_hnsw` for
3204 // the measurement, and the plan in
3205 // /home/rohit/.claude/plans/plan-and-implement-a-valiant-flame.md
3206 // for the rationale.
3207 //
3208 // RHS evaluation order is preserved: we evaluate each RHS inline and
3209 // update the row binding immediately, so a later SetItem on the same
3210 // variable that reads `n.<earlier-prop>` sees the new value.
3211 //
3212 // Non-Property variants (Labels, Variable, VariablePlus) are less
3213 // common and have lower payoff; before processing one, we flush any
3214 // pending updates for the same variable so it sees the latest L0
3215 // state and ordering semantics are preserved.
3216 let mut pending_v: HashMap<String, PendingVertexSet> = HashMap::new();
3217 let mut pending_e: HashMap<String, PendingEdgeSet> = HashMap::new();
3218
3219 for item in items {
3220 match item {
3221 SetItem::Property { expr, value } => {
3222 if let Expr::Property(var_expr, prop_name) = expr
3223 && let Expr::Variable(var_name) = &**var_expr
3224 && let Some(node_val) = row.get(var_name)
3225 {
3226 if let Ok(vid) = Self::vid_from_value(node_val) {
3227 reject_if_ephemeral_vid(vid)?;
3228 let labels =
3229 Self::extract_labels_from_node(node_val).unwrap_or_default();
3230 let schema = self.storage.schema_manager().schema().clone();
3231
3232 // Lazy one-time read. Always read the full row
3233 // (preserves CRDT merge + constraint validation
3234 // + scan-side L0 visibility). The
3235 // partial-lance-writes optimization happens
3236 // PURELY AT FLUSH TIME via the per-VID
3237 // `vertex_partial_keys` set tracked in L0 — so
3238 // L0 holds the full row, scans see the full
3239 // row, and Lance only receives the touched
3240 // columns. Generated-column-bearing labels
3241 // ride the partial path too (Round 12 §C):
3242 // `enrich_properties_with_generated_columns`
3243 // runs at flush time over the merged-in-L0
3244 // full row, and the produced generator keys
3245 // are appended to `touched` so they land in
3246 // the MergeInsert source.
3247 if !pending_v.contains_key(var_name) {
3248 let storage_cfg = &self.storage.config;
3249 let partial = storage_cfg.partial_lance_writes;
3250 let read = read_vertex_props_with_prefetch(
3251 vid,
3252 prefetched,
3253 prop_manager,
3254 ctx,
3255 )
3256 .await?;
3257 pending_v.insert(
3258 var_name.clone(),
3259 PendingVertexSet {
3260 vid,
3261 labels: labels.clone(),
3262 props: read,
3263 partial,
3264 touched: HashSet::new(),
3265 },
3266 );
3267 }
3268
3269 let val = self
3270 .evaluate_expr(value, row, prop_manager, params, ctx)
3271 .await?;
3272 let val = Self::coerce_and_validate_property_value(
3273 prop_name, val, &schema, &labels,
3274 )?;
3275
3276 let pv = pending_v
3277 .get_mut(var_name)
3278 .expect("inserted above when absent");
3279 pv.props.insert(prop_name.clone(), val.clone());
3280 // Record every SET-assigned key. For the partial path this
3281 // drives the MergeInsert source; for the full path it is the
3282 // signal `refresh_embed_targets` uses to re-embed when a
3283 // source column changed (the full writer ignores it otherwise).
3284 pv.touched.insert(prop_name.clone());
3285
3286 // Update the row binding so subsequent RHS sees the new value.
3287 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3288 node_map.insert(prop_name.clone(), val);
3289 } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
3290 node.properties.insert(prop_name.clone(), val);
3291 }
3292 } else if let Value::Map(map) = node_val
3293 && map.get("_eid").is_some_and(|v| !v.is_null())
3294 && map.get("_src").is_some_and(|v| !v.is_null())
3295 && map.get("_dst").is_some_and(|v| !v.is_null())
3296 && (map.get("_type").is_some_and(|v| !v.is_null())
3297 || map.get("_type_name").is_some_and(|v| !v.is_null()))
3298 {
3299 let ei = self.extract_edge_identity(map)?;
3300 reject_if_ephemeral_eid(ei.eid)?;
3301 let schema = self.storage.schema_manager().schema().clone();
3302 // Handle _type as either String or Int (Int from CREATE, String
3303 // from queries). UNWIND on VLP edge lists emits `_type_name`
3304 // instead of `_type`; accept either.
3305 let type_val = map.get("_type").or_else(|| map.get("_type_name"));
3306 let edge_type_name = match type_val {
3307 Some(Value::String(s)) => s.clone(),
3308 Some(Value::Int(id)) => schema
3309 .edge_type_name_by_id_unified(*id as u32)
3310 .unwrap_or_else(|| format!("EdgeType{}", id)),
3311 _ => String::new(),
3312 };
3313
3314 if !pending_e.contains_key(var_name) {
3315 let initial = read_edge_props_with_prefetch(
3316 ei.eid,
3317 prefetched,
3318 prop_manager,
3319 ctx,
3320 )
3321 .await?;
3322 let partial = self.storage.config.partial_lance_writes;
3323 pending_e.insert(
3324 var_name.clone(),
3325 PendingEdgeSet {
3326 src: ei.src,
3327 dst: ei.dst,
3328 edge_type_id: ei.edge_type_id,
3329 eid: ei.eid,
3330 edge_type_name: edge_type_name.clone(),
3331 props: initial,
3332 partial,
3333 touched: HashSet::new(),
3334 },
3335 );
3336 }
3337
3338 let val = self
3339 .evaluate_expr(value, row, prop_manager, params, ctx)
3340 .await?;
3341 let val = Self::coerce_and_validate_property_value(
3342 prop_name,
3343 val,
3344 &schema,
3345 std::slice::from_ref(&edge_type_name),
3346 )?;
3347
3348 let pe = pending_e
3349 .get_mut(var_name)
3350 .expect("inserted above when absent");
3351 pe.props.insert(prop_name.clone(), val.clone());
3352 if pe.partial {
3353 pe.touched.insert(prop_name.clone());
3354 }
3355
3356 // Update the row object so subsequent RHS sees the new value.
3357 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3358 edge_map.insert(prop_name.clone(), val);
3359 } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3360 edge.properties.insert(prop_name.clone(), val);
3361 }
3362 } else if let Value::Edge(edge) = node_val {
3363 // Handle Value::Edge directly (when traverse returns Edge objects).
3364 reject_if_ephemeral_eid(edge.eid)?;
3365 let eid = edge.eid;
3366 let src = edge.src;
3367 let dst = edge.dst;
3368 let edge_type_name = edge.edge_type.clone();
3369 let etype =
3370 self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
3371 let schema = self.storage.schema_manager().schema().clone();
3372
3373 if !pending_e.contains_key(var_name) {
3374 let initial = read_edge_props_with_prefetch(
3375 eid,
3376 prefetched,
3377 prop_manager,
3378 ctx,
3379 )
3380 .await?;
3381 let partial = self.storage.config.partial_lance_writes;
3382 pending_e.insert(
3383 var_name.clone(),
3384 PendingEdgeSet {
3385 src,
3386 dst,
3387 edge_type_id: etype,
3388 eid,
3389 edge_type_name: edge_type_name.clone(),
3390 props: initial,
3391 partial,
3392 touched: HashSet::new(),
3393 },
3394 );
3395 }
3396
3397 let val = self
3398 .evaluate_expr(value, row, prop_manager, params, ctx)
3399 .await?;
3400 let val = Self::coerce_and_validate_property_value(
3401 prop_name,
3402 val,
3403 &schema,
3404 std::slice::from_ref(&edge_type_name),
3405 )?;
3406
3407 let pe = pending_e
3408 .get_mut(var_name)
3409 .expect("inserted above when absent");
3410 pe.props.insert(prop_name.clone(), val.clone());
3411 if pe.partial {
3412 pe.touched.insert(prop_name.clone());
3413 }
3414
3415 // Update the row object so subsequent RHS sees the new value.
3416 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3417 edge.properties.insert(prop_name.clone(), val);
3418 }
3419 }
3420 }
3421 }
3422 SetItem::Labels { variable, labels } => {
3423 // Flush any pending writes for this var so the Labels op
3424 // sees latest L0 state. Other variables' pending writes
3425 // can keep waiting (they're independent).
3426 self.flush_pending_var(
3427 variable,
3428 &mut pending_v,
3429 &mut pending_e,
3430 writer,
3431 prop_manager,
3432 params,
3433 ctx,
3434 tx_l0,
3435 prefetched,
3436 )
3437 .await?;
3438
3439 if let Some(node_val) = row.get(variable)
3440 && let Ok(vid) = Self::vid_from_value(node_val)
3441 {
3442 reject_if_ephemeral_vid(vid)?;
3443 let registry = self
3444 .procedure_registry
3445 .as_ref()
3446 .and_then(|pr| pr.plugin_registry());
3447 reject_virtual_label_write(registry.as_ref(), labels, "SET")?;
3448
3449 // Get current labels from node value
3450 let current_labels =
3451 Self::extract_labels_from_node(node_val).unwrap_or_default();
3452
3453 // Determine new labels to add (skip duplicates)
3454 let labels_to_add: Vec<_> = labels
3455 .iter()
3456 .filter(|l| !current_labels.contains(l))
3457 .cloned()
3458 .collect();
3459
3460 if !labels_to_add.is_empty() {
3461 // Resolve the FULL new label set and write it to the
3462 // TRANSACTION buffer (so the change is transactional
3463 // and OCC-conflictable), falling back to the context
3464 // (main) L0 for non-transactional callers. Replace
3465 // semantics via `set_vertex_labels`.
3466 let mut new_labels = current_labels;
3467 new_labels.extend(labels_to_add);
3468 if let Some(ctx) = ctx {
3469 let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3470 l0.write().set_vertex_labels(vid, &new_labels);
3471 }
3472
3473 // Update the node value in the row with the new labels.
3474 if let Some(Value::Map(obj)) = row.get_mut(variable) {
3475 let labels_list =
3476 new_labels.into_iter().map(Value::String).collect();
3477 obj.insert("_labels".to_string(), Value::List(labels_list));
3478 }
3479 }
3480 }
3481 }
3482 SetItem::Variable { variable, value }
3483 | SetItem::VariablePlus { variable, value } => {
3484 // Flush this var's pending writes first so the
3485 // replace/merge op sees them as latest L0 state.
3486 self.flush_pending_var(
3487 variable,
3488 &mut pending_v,
3489 &mut pending_e,
3490 writer,
3491 prop_manager,
3492 params,
3493 ctx,
3494 tx_l0,
3495 prefetched,
3496 )
3497 .await?;
3498
3499 let replace = matches!(item, SetItem::Variable { .. });
3500 let op_str = if replace { "=" } else { "+=" };
3501
3502 // SET n = expr / SET n += expr — null target from OPTIONAL MATCH is a silent no-op
3503 if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
3504 continue;
3505 }
3506 let rhs = self
3507 .evaluate_expr(value, row, prop_manager, params, ctx)
3508 .await?;
3509 let new_props =
3510 Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
3511 anyhow!(
3512 "SET {} {} expr: right-hand side must evaluate to a map, \
3513 node, or relationship",
3514 variable,
3515 op_str
3516 )
3517 })?;
3518 self.apply_properties_to_entity(
3519 variable,
3520 new_props,
3521 replace,
3522 row,
3523 writer,
3524 prop_manager,
3525 params,
3526 ctx,
3527 tx_l0,
3528 prefetched,
3529 )
3530 .await?;
3531 }
3532 }
3533 }
3534
3535 // Flush all remaining coalesced writes — one writer call per target.
3536 // Partial entries (no generated columns) call
3537 // `Writer::insert_vertex_partial_full` so L0 holds the FULL row
3538 // but the touched-keys hint drives a MergeInsert at flush. Full
3539 // entries continue through the legacy
3540 // `insert_vertex_with_labels` (Append) path with
3541 // generated-column enrichment.
3542 for (_var_name, mut pv) in pending_v {
3543 // A SET that touches an auto-embed source column must refresh the target
3544 // embedding; both the partial MergeInsert and the full Append paths
3545 // otherwise re-write the stale vector. Applied before the branch so both
3546 // writer entry points get the refreshed props + touched keys.
3547 writer.refresh_embed_targets(&mut pv.props, &mut pv.touched, &pv.labels);
3548 if pv.partial {
3549 // Round 12 §C: run the generator enrichment over the
3550 // merged-in-L0 full row, then add the produced generator
3551 // keys to `touched` so they ride the MergeInsert source.
3552 // Idempotent — generators always recompute against the
3553 // post-merge property map.
3554 let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3555 for label_name in &pv.labels {
3556 self.enrich_properties_with_generated_columns(
3557 label_name,
3558 &mut pv.props,
3559 prop_manager,
3560 params,
3561 ctx,
3562 )
3563 .await?;
3564 }
3565 for k in pv.props.keys() {
3566 if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3567 pv.touched.insert(k.clone());
3568 }
3569 }
3570 writer
3571 .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3572 .await?;
3573 } else {
3574 for label_name in &pv.labels {
3575 self.enrich_properties_with_generated_columns(
3576 label_name,
3577 &mut pv.props,
3578 prop_manager,
3579 params,
3580 ctx,
3581 )
3582 .await?;
3583 }
3584 let _ = writer
3585 .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3586 .await?;
3587 }
3588 }
3589 for (_var_name, pe) in pending_e {
3590 if pe.partial {
3591 writer
3592 .insert_edge_partial_full(
3593 pe.src,
3594 pe.dst,
3595 pe.edge_type_id,
3596 pe.eid,
3597 pe.props,
3598 Some(pe.edge_type_name),
3599 pe.touched,
3600 tx_l0,
3601 )
3602 .await?;
3603 } else {
3604 writer
3605 .insert_edge(
3606 pe.src,
3607 pe.dst,
3608 pe.edge_type_id,
3609 pe.eid,
3610 pe.props,
3611 Some(pe.edge_type_name),
3612 tx_l0,
3613 )
3614 .await?;
3615 }
3616 }
3617
3618 Ok(())
3619 }
3620
3621 /// Flush pending SET state for a single variable to the writer.
3622 ///
3623 /// Called from the SET loop when about to process a Labels /
3624 /// Variable / VariablePlus item on `var`, so the subsequent op
3625 /// sees latest L0 state and ordering is preserved.
3626 #[expect(clippy::too_many_arguments)]
3627 async fn flush_pending_var(
3628 &self,
3629 var: &str,
3630 pending_v: &mut HashMap<String, PendingVertexSet>,
3631 pending_e: &mut HashMap<String, PendingEdgeSet>,
3632 writer: &Writer,
3633 prop_manager: &PropertyManager,
3634 _params: &HashMap<String, Value>,
3635 ctx: Option<&QueryContext>,
3636 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3637 _prefetched: &Prefetch,
3638 ) -> Result<()> {
3639 if let Some(mut pv) = pending_v.remove(var) {
3640 if pv.partial {
3641 let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3642 for label_name in &pv.labels {
3643 self.enrich_properties_with_generated_columns(
3644 label_name,
3645 &mut pv.props,
3646 prop_manager,
3647 _params,
3648 ctx,
3649 )
3650 .await?;
3651 }
3652 for k in pv.props.keys() {
3653 if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3654 pv.touched.insert(k.clone());
3655 }
3656 }
3657 writer
3658 .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3659 .await?;
3660 } else {
3661 for label_name in &pv.labels {
3662 self.enrich_properties_with_generated_columns(
3663 label_name,
3664 &mut pv.props,
3665 prop_manager,
3666 _params,
3667 ctx,
3668 )
3669 .await?;
3670 }
3671 let _ = writer
3672 .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3673 .await?;
3674 }
3675 }
3676 if let Some(pe) = pending_e.remove(var) {
3677 if pe.partial {
3678 writer
3679 .insert_edge_partial_full(
3680 pe.src,
3681 pe.dst,
3682 pe.edge_type_id,
3683 pe.eid,
3684 pe.props,
3685 Some(pe.edge_type_name),
3686 pe.touched,
3687 tx_l0,
3688 )
3689 .await?;
3690 } else {
3691 writer
3692 .insert_edge(
3693 pe.src,
3694 pe.dst,
3695 pe.edge_type_id,
3696 pe.eid,
3697 pe.props,
3698 Some(pe.edge_type_name),
3699 tx_l0,
3700 )
3701 .await?;
3702 }
3703 }
3704 Ok(())
3705 }
3706
3707 /// Execute REMOVE clause items (property removal or label removal).
3708 ///
3709 /// Property removals are batched per variable to avoid stale reads: when
3710 /// multiple properties of the same entity are removed in one REMOVE clause,
3711 /// we read from storage once, null all specified properties, and write back
3712 /// once. This prevents the second removal from reading stale data that
3713 /// doesn't reflect the first removal's L0 write.
3714 #[expect(clippy::too_many_arguments)]
3715 pub(crate) async fn execute_remove_items_locked(
3716 &self,
3717 items: &[RemoveItem],
3718 row: &mut HashMap<String, Value>,
3719 writer: &Writer,
3720 prop_manager: &PropertyManager,
3721 ctx: Option<&QueryContext>,
3722 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3723 prefetched: &Prefetch,
3724 ) -> Result<()> {
3725 // Collect property names to remove, grouped by variable.
3726 // Use Vec<(String, Vec<String>)> to preserve insertion order.
3727 let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
3728
3729 for item in items {
3730 match item {
3731 RemoveItem::Property(expr) => {
3732 if let Expr::Property(var_expr, prop_name) = expr
3733 && let Expr::Variable(var_name) = &**var_expr
3734 {
3735 if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
3736 entry.1.push(prop_name.clone());
3737 } else {
3738 prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
3739 }
3740 }
3741 }
3742 RemoveItem::Labels { variable, labels } => {
3743 self.execute_remove_labels(variable, labels, row, ctx)?;
3744 }
3745 }
3746 }
3747
3748 // Execute batched property removals per variable.
3749 for (var_name, prop_names) in &prop_removals {
3750 let Some(node_val) = row.get(var_name) else {
3751 continue;
3752 };
3753
3754 if let Ok(vid) = Self::vid_from_value(node_val) {
3755 // Vertex property removal
3756 let mut props =
3757 read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
3758
3759 // Only write back if at least one property actually exists
3760 let removed_count = prop_names
3761 .iter()
3762 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3763 .count();
3764 let any_exist = removed_count > 0;
3765 if any_exist {
3766 writer.track_properties_removed(removed_count, tx_l0);
3767 for prop_name in prop_names {
3768 props.insert(prop_name.clone(), Value::Null);
3769 }
3770 }
3771 // Compute effective properties (post-removal) for _all_props
3772 let effective: HashMap<String, Value> = props
3773 .iter()
3774 .filter(|(_, v)| !v.is_null())
3775 .map(|(k, v)| (k.clone(), v.clone()))
3776 .collect();
3777 if any_exist {
3778 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3779 let _ = writer
3780 .insert_vertex_with_labels(vid, props, &labels, tx_l0)
3781 .await?;
3782 }
3783
3784 // Update the row map: set removed props to Null
3785 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3786 for prop_name in prop_names {
3787 node_map.insert(prop_name.clone(), Value::Null);
3788 }
3789 // Set _all_props to the complete effective property set
3790 node_map.insert("_all_props".to_string(), Value::Map(effective));
3791 }
3792 } else if let Value::Map(map) = node_val {
3793 // Edge property removal (map-encoded)
3794 // Check for non-null _eid to skip OPTIONAL MATCH null edges
3795 let mut edge_effective: Option<HashMap<String, Value>> = None;
3796 if map.get("_eid").is_some_and(|v| !v.is_null()) {
3797 let ei = self.extract_edge_identity(map)?;
3798 let mut props =
3799 read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx)
3800 .await?;
3801
3802 let removed_count = prop_names
3803 .iter()
3804 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3805 .count();
3806 let any_exist = removed_count > 0;
3807 if any_exist {
3808 writer.track_properties_removed(removed_count, tx_l0);
3809 for prop_name in prop_names {
3810 props.insert(prop_name.to_string(), Value::Null);
3811 }
3812 }
3813 // Compute effective properties (post-removal) for _all_props
3814 edge_effective = Some(
3815 props
3816 .iter()
3817 .filter(|(_, v)| !v.is_null())
3818 .map(|(k, v)| (k.clone(), v.clone()))
3819 .collect(),
3820 );
3821 if any_exist {
3822 let edge_type_name = map
3823 .get("_type")
3824 .and_then(|v| v.as_str())
3825 .map(|s| s.to_string())
3826 .or_else(|| {
3827 self.storage
3828 .schema_manager()
3829 .edge_type_name_by_id_unified(ei.edge_type_id)
3830 });
3831 writer
3832 .insert_edge(
3833 ei.src,
3834 ei.dst,
3835 ei.edge_type_id,
3836 ei.eid,
3837 props,
3838 edge_type_name,
3839 tx_l0,
3840 )
3841 .await?;
3842 }
3843 }
3844
3845 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3846 for prop_name in prop_names {
3847 edge_map.insert(prop_name.clone(), Value::Null);
3848 }
3849 if let Some(effective) = edge_effective {
3850 edge_map.insert("_all_props".to_string(), Value::Map(effective));
3851 }
3852 }
3853 } else if let Value::Edge(edge) = node_val {
3854 // Edge property removal (Value::Edge)
3855 let eid = edge.eid;
3856 let src = edge.src;
3857 let dst = edge.dst;
3858 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
3859
3860 let mut props =
3861 read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
3862
3863 let removed_count = prop_names
3864 .iter()
3865 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3866 .count();
3867 if removed_count > 0 {
3868 writer.track_properties_removed(removed_count, tx_l0);
3869 for prop_name in prop_names {
3870 props.insert(prop_name.to_string(), Value::Null);
3871 }
3872 writer
3873 .insert_edge(
3874 src,
3875 dst,
3876 etype,
3877 eid,
3878 props,
3879 Some(edge.edge_type.clone()),
3880 tx_l0,
3881 )
3882 .await?;
3883 }
3884
3885 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3886 for prop_name in prop_names {
3887 edge.properties.insert(prop_name.to_string(), Value::Null);
3888 }
3889 }
3890 }
3891 }
3892
3893 Ok(())
3894 }
3895
3896 /// Execute label removal.
3897 pub(crate) fn execute_remove_labels(
3898 &self,
3899 variable: &str,
3900 labels: &[String],
3901 row: &mut HashMap<String, Value>,
3902 ctx: Option<&QueryContext>,
3903 ) -> Result<()> {
3904 if let Some(node_val) = row.get(variable)
3905 && let Ok(vid) = Self::vid_from_value(node_val)
3906 {
3907 reject_if_ephemeral_vid(vid)?;
3908 let registry = self
3909 .procedure_registry
3910 .as_ref()
3911 .and_then(|pr| pr.plugin_registry());
3912 reject_virtual_label_write(registry.as_ref(), labels, "REMOVE")?;
3913
3914 // Get current labels from node value
3915 let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3916
3917 // Determine which labels to actually remove (only those currently present)
3918 let labels_to_remove: Vec<_> = labels
3919 .iter()
3920 .filter(|l| current_labels.contains(l))
3921 .collect();
3922
3923 if !labels_to_remove.is_empty() {
3924 // Resolve the FULL remaining label set and write it to the
3925 // TRANSACTION buffer (transactional + OCC-conflictable), falling
3926 // back to the context (main) L0 for non-transactional callers.
3927 let remaining_labels: Vec<String> = current_labels
3928 .iter()
3929 .filter(|l| !labels_to_remove.contains(l))
3930 .cloned()
3931 .collect();
3932 if let Some(ctx) = ctx {
3933 let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3934 l0.write().set_vertex_labels(vid, &remaining_labels);
3935 }
3936
3937 // Update the node value in the row with the remaining labels.
3938 if let Some(Value::Map(obj)) = row.get_mut(variable) {
3939 let labels_list = remaining_labels.into_iter().map(Value::String).collect();
3940 obj.insert("_labels".to_string(), Value::List(labels_list));
3941 }
3942 }
3943 }
3944 Ok(())
3945 }
3946
3947 /// Resolve edge type ID for a Value::Edge, handling empty edge_type strings
3948 /// by looking up the type from the L0 buffer's edge endpoints.
3949 fn resolve_edge_type_id_for_edge(
3950 &self,
3951 edge: &crate::types::Edge,
3952 writer: &Writer,
3953 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3954 ) -> Result<u32> {
3955 if !edge.edge_type.is_empty() {
3956 return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
3957 }
3958 // Edge type name is empty (e.g., from anonymous MATCH patterns).
3959 // Look up the edge type ID from the L0 buffer's edge endpoints.
3960 if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
3961 return Ok(etype);
3962 }
3963 Err(anyhow!(
3964 "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
3965 edge.eid
3966 ))
3967 }
3968
3969 /// Execute DELETE clause for a single item (vertex, edge, path, or null).
3970 pub(crate) async fn execute_delete_item_locked(
3971 &self,
3972 val: &Value,
3973 detach: bool,
3974 writer: &Writer,
3975 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3976 ) -> Result<()> {
3977 match val {
3978 Value::Null => {
3979 // DELETE null is a no-op per OpenCypher spec
3980 }
3981 Value::Path(path) => {
3982 // Delete path edges first, then nodes
3983 for edge in &path.edges {
3984 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3985 writer
3986 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3987 .await?;
3988 }
3989 for node in &path.nodes {
3990 self.execute_delete_vertex(
3991 node.vid,
3992 detach,
3993 Some(node.labels.clone()),
3994 writer,
3995 tx_l0,
3996 )
3997 .await?;
3998 }
3999 }
4000 _ => {
4001 // Try Path reconstruction from Map first (Arrow loses Path type)
4002 if let Ok(path) = Path::try_from(val) {
4003 for edge in &path.edges {
4004 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
4005 writer
4006 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
4007 .await?;
4008 }
4009 for node in &path.nodes {
4010 self.execute_delete_vertex(
4011 node.vid,
4012 detach,
4013 Some(node.labels.clone()),
4014 writer,
4015 tx_l0,
4016 )
4017 .await?;
4018 }
4019 } else if let Ok(vid) = Self::vid_from_value(val) {
4020 let labels = Self::extract_labels_from_node(val);
4021 self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
4022 .await?;
4023 } else if let Value::Map(map) = val {
4024 self.execute_delete_edge_from_map(map, writer, tx_l0)
4025 .await?;
4026 } else if let Value::Edge(edge) = val {
4027 reject_if_ephemeral_eid(edge.eid)?;
4028 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
4029 let registry = self
4030 .procedure_registry
4031 .as_ref()
4032 .and_then(|pr| pr.plugin_registry());
4033 reject_virtual_edge_type_write(registry.as_ref(), etype, "DELETE")?;
4034 writer
4035 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
4036 .await?;
4037 }
4038 }
4039 }
4040 Ok(())
4041 }
4042
4043 /// Execute vertex deletion with optional detach.
4044 pub(crate) async fn execute_delete_vertex(
4045 &self,
4046 vid: Vid,
4047 detach: bool,
4048 labels: Option<Vec<String>>,
4049 writer: &Writer,
4050 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
4051 ) -> Result<()> {
4052 reject_if_ephemeral_vid(vid)?;
4053 if let Some(ls) = labels.as_deref() {
4054 let registry = self
4055 .procedure_registry
4056 .as_ref()
4057 .and_then(|pr| pr.plugin_registry());
4058 reject_virtual_label_write(registry.as_ref(), ls, "DELETE")?;
4059 }
4060 if detach {
4061 self.detach_delete_vertex(vid, writer, tx_l0).await?;
4062 } else {
4063 self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
4064 }
4065 writer.delete_vertex(vid, labels, tx_l0).await?;
4066 Ok(())
4067 }
4068
4069 /// Check that a vertex has no edges (required for non-DETACH DELETE).
4070 ///
4071 /// Loads the subgraph from storage, then excludes edges that have been
4072 /// tombstoned in the writer's L0 or the transaction's L0. This ensures
4073 /// edges deleted earlier in the same DELETE clause are properly excluded.
4074 pub(crate) async fn check_vertex_has_no_edges(
4075 &self,
4076 vid: Vid,
4077 writer: &Writer,
4078 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
4079 ) -> Result<()> {
4080 let schema = self.storage.schema_manager().schema();
4081 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
4082
4083 // Collect tombstoned edge IDs from both the writer L0 and tx L0.
4084 let mut tombstoned_eids = std::collections::HashSet::new();
4085 {
4086 let writer_l0 = writer.l0_manager.get_current();
4087 let guard = writer_l0.read();
4088 for &eid in guard.tombstones.keys() {
4089 tombstoned_eids.insert(eid);
4090 }
4091 }
4092 if let Some(tx) = tx_l0 {
4093 let guard = tx.read();
4094 for &eid in guard.tombstones.keys() {
4095 tombstoned_eids.insert(eid);
4096 }
4097 }
4098
4099 let out_graph = self
4100 .storage
4101 .load_subgraph_cached(
4102 &[vid],
4103 &edge_type_ids,
4104 1,
4105 uni_store::runtime::Direction::Outgoing,
4106 Some(writer.l0_manager.get_current()),
4107 )
4108 .await?;
4109 let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
4110
4111 let in_graph = self
4112 .storage
4113 .load_subgraph_cached(
4114 &[vid],
4115 &edge_type_ids,
4116 1,
4117 uni_store::runtime::Direction::Incoming,
4118 Some(writer.l0_manager.get_current()),
4119 )
4120 .await?;
4121 let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
4122
4123 if has_out || has_in {
4124 return Err(anyhow!(
4125 "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
4126 vid
4127 ));
4128 }
4129 Ok(())
4130 }
4131
4132 /// Execute edge deletion from a map representation.
4133 pub(crate) async fn execute_delete_edge_from_map(
4134 &self,
4135 map: &HashMap<String, Value>,
4136 writer: &Writer,
4137 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
4138 ) -> Result<()> {
4139 // Check for non-null _eid to skip OPTIONAL MATCH null edges
4140 if map.get("_eid").is_some_and(|v| !v.is_null()) {
4141 let ei = self.extract_edge_identity(map)?;
4142 reject_if_ephemeral_eid(ei.eid)?;
4143 let registry = self
4144 .procedure_registry
4145 .as_ref()
4146 .and_then(|pr| pr.plugin_registry());
4147 reject_virtual_edge_type_write(registry.as_ref(), ei.edge_type_id, "DELETE")?;
4148 writer
4149 .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
4150 .await?;
4151 }
4152 Ok(())
4153 }
4154
4155 /// Build a scan plan node.
4156 ///
4157 /// - `label_id > 0`: schema label → `Scan` (fast, label-specific storage)
4158 /// - `label_id == 0` with labels: schemaless → `ScanMainByLabels` (main table + L0, filtered by label name)
4159 /// - `label_id == 0` without labels: unlabeled → `ScanAll`
4160 fn make_scan_plan(
4161 label_id: u16,
4162 labels: Vec<String>,
4163 variable: String,
4164 filter: Option<Expr>,
4165 ) -> LogicalPlan {
4166 if label_id > 0 {
4167 LogicalPlan::Scan {
4168 label_id,
4169 labels,
4170 variable,
4171 filter,
4172 optional: false,
4173 }
4174 } else if !labels.is_empty() {
4175 // Schemaless label: use ScanMainByLabels to filter by label name
4176 LogicalPlan::ScanMainByLabels {
4177 labels,
4178 variable,
4179 filter,
4180 optional: false,
4181 }
4182 } else {
4183 LogicalPlan::ScanAll {
4184 variable,
4185 filter,
4186 optional: false,
4187 }
4188 }
4189 }
4190
4191 /// Attach a new scan node to the running plan, using `CrossJoin` when the plan
4192 /// already contains prior operators.
4193 fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
4194 if matches!(plan, LogicalPlan::Empty) {
4195 scan
4196 } else {
4197 LogicalPlan::CrossJoin {
4198 left: Box::new(plan),
4199 right: Box::new(scan),
4200 }
4201 }
4202 }
4203
4204 /// Resolve MERGE property map expressions against the current row context.
4205 ///
4206 /// MERGE patterns like `MERGE (city:City {name: person.bornIn})` contain
4207 /// property expressions that reference bound variables. These need to be
4208 /// evaluated to concrete literal values before being converted to filter
4209 /// expressions by `properties_to_expr()`.
4210 async fn resolve_merge_properties(
4211 &self,
4212 properties: &Option<Expr>,
4213 row: &HashMap<String, Value>,
4214 prop_manager: &PropertyManager,
4215 params: &HashMap<String, Value>,
4216 ctx: Option<&QueryContext>,
4217 ) -> Result<Option<Expr>> {
4218 let entries = match properties {
4219 Some(Expr::Map(entries)) => entries,
4220 other => return Ok(other.clone()),
4221 };
4222 let mut resolved = Vec::new();
4223 for (key, val_expr) in entries {
4224 if matches!(val_expr, Expr::Literal(_)) {
4225 resolved.push((key.clone(), val_expr.clone()));
4226 } else {
4227 let value = self
4228 .evaluate_expr(val_expr, row, prop_manager, params, ctx)
4229 .await?;
4230 resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
4231 }
4232 }
4233 Ok(Some(Expr::Map(resolved)))
4234 }
4235
4236 /// Convert a runtime Value back to an AST literal expression.
4237 fn value_to_literal_expr(value: &Value) -> Expr {
4238 match value {
4239 Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
4240 Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
4241 Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
4242 Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
4243 Value::Null => Expr::Literal(CypherLiteral::Null),
4244 Value::List(items) => {
4245 Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
4246 }
4247 Value::Map(entries) => Expr::Map(
4248 entries
4249 .iter()
4250 .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
4251 .collect(),
4252 ),
4253 _ => Expr::Literal(CypherLiteral::Null),
4254 }
4255 }
4256
4257 pub(crate) async fn execute_merge_match(
4258 &self,
4259 pattern: &Pattern,
4260 row: &HashMap<String, Value>,
4261 prop_manager: &PropertyManager,
4262 params: &HashMap<String, Value>,
4263 ctx: Option<&QueryContext>,
4264 ) -> Result<Vec<HashMap<String, Value>>> {
4265 // Construct a LogicalPlan for the MATCH part of MERGE
4266 let planner =
4267 crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
4268
4269 // We need to construct a CypherQuery to use the planner's plan() method,
4270 // or we can manually construct the LogicalPlan.
4271 // Manual construction is safer as we don't have to round-trip through AST.
4272
4273 let mut plan = LogicalPlan::Empty;
4274 let mut vars_in_scope = Vec::new();
4275
4276 // Add existing bound variables from row to scope
4277 for key in row.keys() {
4278 vars_in_scope.push(key.clone());
4279 }
4280
4281 // Reconstruct Match logic from Planner (simplified for MERGE pattern)
4282 for path in &pattern.paths {
4283 let elements = &path.elements;
4284 let mut i = 0;
4285 while i < elements.len() {
4286 let part = &elements[i];
4287 match part {
4288 PatternElement::Node(n) => {
4289 let variable = n.variable.clone().unwrap_or_default();
4290
4291 // If variable is already bound in the input row, we filter
4292 let is_bound = !variable.is_empty() && row.contains_key(&variable);
4293
4294 if is_bound {
4295 // If bound, we must Scan this specific VID to start the chain
4296 // Extract VID from row
4297 let val = row.get(&variable).unwrap();
4298 let vid = Self::vid_from_value(val)?;
4299
4300 // In the new storage model, VIDs don't embed label info.
4301 // We get label from the node value if available, otherwise use 0 to scan all.
4302 let extracted_labels =
4303 Self::extract_labels_from_node(val).unwrap_or_default();
4304 let label_id = {
4305 let schema = self.storage.schema_manager().schema();
4306 extracted_labels
4307 .first()
4308 .and_then(|l| schema.label_id_by_name(l))
4309 .unwrap_or(0)
4310 };
4311
4312 let resolved_props = self
4313 .resolve_merge_properties(
4314 &n.properties,
4315 row,
4316 prop_manager,
4317 params,
4318 ctx,
4319 )
4320 .await?;
4321 let prop_filter =
4322 planner.properties_to_expr(&variable, &resolved_props);
4323
4324 // Create a filter expression for VID: variable._vid = vid
4325 // But our expression engine handles `Expr::Variable` as column.
4326 // We can inject a filter `id(variable) = vid` if we had `id()` function.
4327 // Or we use internal property `_vid`.
4328
4329 // Note: Scan supports `filter`.
4330 // We can manually construct an Expr::BinaryOp(Eq, Prop(var, _vid), Literal(vid))
4331
4332 let vid_filter = Expr::BinaryOp {
4333 left: Box::new(Expr::Property(
4334 Box::new(Expr::Variable(variable.clone())),
4335 "_vid".to_string(),
4336 )),
4337 op: BinaryOp::Eq,
4338 right: Box::new(Expr::Literal(CypherLiteral::Integer(
4339 vid.as_u64() as i64,
4340 ))),
4341 };
4342
4343 let combined_filter = if let Some(pf) = prop_filter {
4344 Some(Expr::BinaryOp {
4345 left: Box::new(vid_filter),
4346 op: BinaryOp::And,
4347 right: Box::new(pf),
4348 })
4349 } else {
4350 Some(vid_filter)
4351 };
4352
4353 let scan = Self::make_scan_plan(
4354 label_id,
4355 extracted_labels,
4356 variable.clone(),
4357 combined_filter,
4358 );
4359 plan = Self::attach_scan(plan, scan);
4360 } else {
4361 let label_id = if n.labels.is_empty() {
4362 // Unlabeled MERGE node: scan all nodes (label_id 0 → ScanAll)
4363 0
4364 } else {
4365 let label_name = &n.labels[0];
4366 let schema = self.storage.schema_manager().schema();
4367 if self.config.strict_schema {
4368 schema
4369 .get_label_case_insensitive(label_name)
4370 .map(|m| m.id)
4371 .ok_or_else(|| {
4372 anyhow!(
4373 "Label '{}' is not defined in the schema \
4374 (strict_schema is enabled). \
4375 Declare it with db.schema().label(...).apply() first.",
4376 label_name
4377 )
4378 })?
4379 } else {
4380 // Fall back to label_id 0 (any/schemaless) when not in schema.
4381 schema
4382 .get_label_case_insensitive(label_name)
4383 .map(|m| m.id)
4384 .unwrap_or(0)
4385 }
4386 };
4387
4388 let resolved_props = self
4389 .resolve_merge_properties(
4390 &n.properties,
4391 row,
4392 prop_manager,
4393 params,
4394 ctx,
4395 )
4396 .await?;
4397 let prop_filter =
4398 planner.properties_to_expr(&variable, &resolved_props);
4399 let scan = Self::make_scan_plan(
4400 label_id,
4401 n.labels.names().to_vec(),
4402 variable.clone(),
4403 prop_filter,
4404 );
4405 plan = Self::attach_scan(plan, scan);
4406
4407 // Add label filters when:
4408 // 1. Multiple labels with a known schema label: filter for
4409 // additional labels (Scan only scans by the first label).
4410 // 2. Schemaless labels (label_id = 0): ScanAll finds ALL
4411 // nodes, so we must filter to only those with the
4412 // specified label(s).
4413 if !n.labels.is_empty()
4414 && !variable.is_empty()
4415 && (label_id == 0 || n.labels.len() > 1)
4416 && let Some(label_filter) =
4417 planner.node_filter_expr(&variable, &n.labels, &None)
4418 {
4419 plan = LogicalPlan::Filter {
4420 input: Box::new(plan),
4421 predicate: label_filter,
4422 optional_variables: std::collections::HashSet::new(),
4423 };
4424 }
4425
4426 if !variable.is_empty() {
4427 vars_in_scope.push(variable.clone());
4428 }
4429 }
4430
4431 // Now look ahead for relationship
4432 i += 1;
4433 while i < elements.len() {
4434 if let PatternElement::Relationship(r) = &elements[i] {
4435 let target_node_part = &elements[i + 1];
4436 if let PatternElement::Node(n_target) = target_node_part {
4437 let schema = self.storage.schema_manager().schema();
4438 let mut edge_type_ids = Vec::new();
4439
4440 if r.types.is_empty() {
4441 return Err(anyhow!("MERGE edge must have a type"));
4442 } else if r.types.len() > 1 {
4443 return Err(anyhow!(
4444 "MERGE does not support multiple edge types"
4445 ));
4446 } else {
4447 let type_name = &r.types[0];
4448 let type_id = if self.config.strict_schema {
4449 let s = self.storage.schema_manager().schema();
4450 s.edge_type_id_by_name_case_insensitive(type_name)
4451 .ok_or_else(|| {
4452 anyhow!(
4453 "Edge type '{}' is not defined in the schema \
4454 (strict_schema is enabled).",
4455 type_name
4456 )
4457 })?
4458 } else {
4459 // Schemaless: assign new ID if not found.
4460 self.storage
4461 .schema_manager()
4462 .get_or_assign_edge_type_id(type_name)
4463 };
4464 edge_type_ids.push(type_id);
4465 }
4466
4467 // Resolve target label ID. For schemaless labels (not in the
4468 // schema), fall back to 0 which means "any label" in traversal.
4469 let target_label_id: u16 = if let Some(lbl) =
4470 n_target.labels.first()
4471 {
4472 schema
4473 .get_label_case_insensitive(lbl)
4474 .map(|m| m.id)
4475 .unwrap_or(0)
4476 } else if let Some(var) = &n_target.variable {
4477 if let Some(val) = row.get(var) {
4478 // In the new storage model, get labels from node value
4479 if let Some(labels) =
4480 Self::extract_labels_from_node(val)
4481 {
4482 if let Some(first_label) = labels.first() {
4483 schema
4484 .get_label_case_insensitive(first_label)
4485 .map(|m| m.id)
4486 .unwrap_or(0)
4487 } else {
4488 // Bound node with no labels — schemaless, any
4489 0
4490 }
4491 } else if Self::vid_from_value(val).is_ok() {
4492 // VID without label info — schemaless, any
4493 0
4494 } else {
4495 return Err(anyhow!(
4496 "Variable {} is not a node",
4497 var
4498 ));
4499 }
4500 } else {
4501 return Err(anyhow!(
4502 "MERGE pattern node must have a label or be a bound variable"
4503 ));
4504 }
4505 } else {
4506 return Err(anyhow!(
4507 "MERGE pattern node must have a label"
4508 ));
4509 };
4510
4511 let target_variable =
4512 n_target.variable.clone().unwrap_or_default();
4513 let source_variable = match &elements[i - 1] {
4514 PatternElement::Node(n) => {
4515 n.variable.clone().unwrap_or_default()
4516 }
4517 _ => String::new(),
4518 };
4519
4520 let is_variable_length = r.range.is_some();
4521 let type_name = &r.types[0];
4522
4523 // Use TraverseMainByType for schemaless edge types
4524 // (same as MATCH planner) so edge properties are loaded
4525 // correctly from storage + L0 via the adjacency map.
4526 // Regular Traverse only loads properties via
4527 // property_manager which doesn't handle schemaless types.
4528 let is_schemaless = edge_type_ids.iter().all(|id| {
4529 uni_common::core::edge_type::is_schemaless_edge_type(*id)
4530 });
4531
4532 if is_schemaless {
4533 plan = LogicalPlan::TraverseMainByType {
4534 type_names: vec![type_name.clone()],
4535 input: Box::new(plan),
4536 direction: r.direction.clone(),
4537 source_variable,
4538 target_variable: target_variable.clone(),
4539 step_variable: r.variable.clone(),
4540 min_hops: r
4541 .range
4542 .as_ref()
4543 .and_then(|r| r.min)
4544 .unwrap_or(1)
4545 as usize,
4546 max_hops: r
4547 .range
4548 .as_ref()
4549 .and_then(|r| r.max)
4550 .unwrap_or(1)
4551 as usize,
4552 optional: false,
4553 target_filter: None,
4554 path_variable: None,
4555 is_variable_length,
4556 optional_pattern_vars: std::collections::HashSet::new(),
4557 scope_match_variables: std::collections::HashSet::new(),
4558 edge_filter_expr: None,
4559 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4560 };
4561 } else {
4562 // Collect edge property names needed for MERGE filter
4563 let mut edge_props = std::collections::HashSet::new();
4564 if let Some(Expr::Map(entries)) = &r.properties {
4565 for (key, _) in entries {
4566 edge_props.insert(key.clone());
4567 }
4568 }
4569 plan = LogicalPlan::Traverse {
4570 input: Box::new(plan),
4571 edge_type_ids: edge_type_ids.clone(),
4572 direction: r.direction.clone(),
4573 source_variable,
4574 target_variable: target_variable.clone(),
4575 target_label_id,
4576 step_variable: r.variable.clone(),
4577 min_hops: r
4578 .range
4579 .as_ref()
4580 .and_then(|r| r.min)
4581 .unwrap_or(1)
4582 as usize,
4583 max_hops: r
4584 .range
4585 .as_ref()
4586 .and_then(|r| r.max)
4587 .unwrap_or(1)
4588 as usize,
4589 optional: false,
4590 target_filter: None,
4591 path_variable: None,
4592 edge_properties: edge_props,
4593 is_variable_length,
4594 optional_pattern_vars: std::collections::HashSet::new(),
4595 scope_match_variables: std::collections::HashSet::new(),
4596 edge_filter_expr: None,
4597 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4598 qpp_steps: None,
4599 };
4600 }
4601
4602 // Apply property filters for relationship
4603 if r.properties.is_some()
4604 && let Some(r_var) = &r.variable
4605 {
4606 let resolved_rel_props = self
4607 .resolve_merge_properties(
4608 &r.properties,
4609 row,
4610 prop_manager,
4611 params,
4612 ctx,
4613 )
4614 .await?;
4615 if let Some(prop_filter) =
4616 planner.properties_to_expr(r_var, &resolved_rel_props)
4617 {
4618 plan = LogicalPlan::Filter {
4619 input: Box::new(plan),
4620 predicate: prop_filter,
4621 optional_variables: std::collections::HashSet::new(
4622 ),
4623 };
4624 }
4625 }
4626
4627 // Apply property filters for target node if it was new
4628 if !target_variable.is_empty() {
4629 let resolved_target_props = self
4630 .resolve_merge_properties(
4631 &n_target.properties,
4632 row,
4633 prop_manager,
4634 params,
4635 ctx,
4636 )
4637 .await?;
4638 if let Some(prop_filter) = planner.properties_to_expr(
4639 &target_variable,
4640 &resolved_target_props,
4641 ) {
4642 plan = LogicalPlan::Filter {
4643 input: Box::new(plan),
4644 predicate: prop_filter,
4645 optional_variables: std::collections::HashSet::new(
4646 ),
4647 };
4648 }
4649 vars_in_scope.push(target_variable.clone());
4650 }
4651
4652 if let Some(sv) = &r.variable {
4653 vars_in_scope.push(sv.clone());
4654 }
4655 i += 2;
4656 } else {
4657 break;
4658 }
4659 } else {
4660 break;
4661 }
4662 }
4663 }
4664 _ => return Err(anyhow!("Pattern must start with a node")),
4665 }
4666 }
4667
4668 // Execute the plan to find all matches, then filter against bound variables in `row`.
4669 }
4670
4671 let db_matches = self
4672 .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
4673 .await?;
4674
4675 // Keep only DB results that are consistent with the input row bindings.
4676 // Skip internal keys (starting with "__") as they are implementation
4677 // artifacts (e.g. __used_edges) and not user-visible variable bindings.
4678 // Also skip the empty-string key (""), which is the placeholder variable
4679 // for unnamed MERGE nodes — it may carry over from a prior MERGE clause
4680 // and must not constrain the current pattern's match.
4681 let final_matches = db_matches
4682 .into_iter()
4683 .filter(|db_match| {
4684 row.iter().all(|(key, val)| {
4685 if key.is_empty() || key.starts_with("__") {
4686 return true;
4687 }
4688 let Some(db_val) = db_match.get(key) else {
4689 return true;
4690 };
4691 if db_val == val {
4692 return true;
4693 }
4694 // Values differ -- treat as consistent if they represent the same VID
4695 matches!(
4696 (Self::vid_from_value(val), Self::vid_from_value(db_val)),
4697 (Ok(v1), Ok(v2)) if v1 == v2
4698 )
4699 })
4700 })
4701 .map(|db_match| {
4702 let mut merged = row.clone();
4703 merged.extend(db_match);
4704 merged
4705 })
4706 .collect();
4707
4708 Ok(final_matches)
4709 }
4710
4711 /// Prepare a MERGE pattern for path variable binding.
4712 ///
4713 /// If any path in the pattern has a path variable (e.g., `MERGE p = (a)-[:R]->(b)`),
4714 /// unnamed relationships need internal variable names so that `execute_create_pattern`
4715 /// stores the edge data in the row for later path construction.
4716 ///
4717 /// Returns the (possibly modified) pattern and a list of temp variable names to clean up.
4718 fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
4719 let has_path_vars = pattern
4720 .paths
4721 .iter()
4722 .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
4723
4724 if !has_path_vars {
4725 return (pattern.clone(), Vec::new());
4726 }
4727
4728 let mut modified = pattern.clone();
4729 let mut temp_vars = Vec::new();
4730
4731 for path in &mut modified.paths {
4732 if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
4733 continue;
4734 }
4735 for (idx, element) in path.elements.iter_mut().enumerate() {
4736 if let PatternElement::Relationship(r) = element
4737 && r.variable.as_ref().is_none_or(String::is_empty)
4738 {
4739 let temp_var = format!("__path_r_{}", idx);
4740 r.variable = Some(temp_var.clone());
4741 temp_vars.push(temp_var);
4742 }
4743 }
4744 }
4745
4746 (modified, temp_vars)
4747 }
4748
4749 /// Bind path variables in the result row based on the MERGE pattern.
4750 ///
4751 /// Walks each path in the pattern, collects node/edge values from the row
4752 /// by variable name, and constructs a `Value::Path`.
4753 fn bind_path_variables(
4754 pattern: &Pattern,
4755 row: &mut HashMap<String, Value>,
4756 temp_vars: &[String],
4757 ) {
4758 for path in &pattern.paths {
4759 let Some(path_var) = path.variable.as_ref() else {
4760 continue;
4761 };
4762 if path_var.is_empty() {
4763 continue;
4764 }
4765
4766 let mut nodes = Vec::new();
4767 let mut edges = Vec::new();
4768
4769 for element in &path.elements {
4770 match element {
4771 PatternElement::Node(n) => {
4772 if let Some(var) = &n.variable
4773 && let Some(val) = row.get(var)
4774 && let Some(node) = Self::value_to_node_for_path(val)
4775 {
4776 nodes.push(node);
4777 }
4778 }
4779 PatternElement::Relationship(r) => {
4780 if let Some(var) = &r.variable
4781 && let Some(val) = row.get(var)
4782 && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
4783 {
4784 edges.push(edge);
4785 }
4786 }
4787 _ => {}
4788 }
4789 }
4790
4791 if !nodes.is_empty() {
4792 use uni_common::value::Path;
4793 row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
4794 }
4795 }
4796
4797 // Clean up internal temp variables
4798 for var in temp_vars {
4799 row.remove(var);
4800 }
4801 }
4802
4803 /// Convert a Value (Map or Node) to a Node for path construction.
4804 fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
4805 match val {
4806 Value::Node(n) => Some(n.clone()),
4807 Value::Map(map) => {
4808 let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
4809 let labels = if let Some(Value::List(l)) = map.get("_labels") {
4810 l.iter()
4811 .filter_map(|v| {
4812 if let Value::String(s) = v {
4813 Some(s.clone())
4814 } else {
4815 None
4816 }
4817 })
4818 .collect()
4819 } else {
4820 vec![]
4821 };
4822 let properties: HashMap<String, Value> = map
4823 .iter()
4824 .filter(|(k, _)| !k.starts_with('_'))
4825 .map(|(k, v)| (k.clone(), v.clone()))
4826 .collect();
4827 Some(uni_common::value::Node {
4828 vid,
4829 labels,
4830 properties,
4831 })
4832 }
4833 _ => None,
4834 }
4835 }
4836
4837 /// Convert a Value (Map or Edge) to an Edge for path construction.
4838 fn value_to_edge_for_path(
4839 val: &Value,
4840 type_names: &[String],
4841 ) -> Option<uni_common::value::Edge> {
4842 match val {
4843 Value::Edge(e) => Some(e.clone()),
4844 Value::Map(map) => {
4845 let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
4846 let edge_type = map
4847 .get("_type_name")
4848 .and_then(|v| {
4849 if let Value::String(s) = v {
4850 Some(s.clone())
4851 } else {
4852 None
4853 }
4854 })
4855 .or_else(|| type_names.first().cloned())
4856 .unwrap_or_default();
4857 let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
4858 let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
4859 let properties: HashMap<String, Value> = map
4860 .iter()
4861 .filter(|(k, _)| !k.starts_with('_'))
4862 .map(|(k, v)| (k.clone(), v.clone()))
4863 .collect();
4864 Some(uni_common::value::Edge {
4865 eid,
4866 edge_type,
4867 src,
4868 dst,
4869 properties,
4870 })
4871 }
4872 _ => None,
4873 }
4874 }
4875}
4876
4877/// Read a vertex's full property map, preferring `prefetched` over a fresh
4878/// per-row `Backend::scan`.
4879///
4880/// `prefetched` is built once at the top of `apply_mutations` via
4881/// `prefetch_set_targets` / `prefetch_remove_targets` (mutation_common.rs).
4882/// On a hit, we layer in L0 from `ctx` so writes from earlier rows of the
4883/// same `apply_mutations` invocation (counter increments, same-VID
4884/// duplicates from UNWIND) take precedence — the prefetch only snapshots
4885/// storage state at SET entry. On a miss, fall back to the existing
4886/// per-row path; this preserves correctness for newly created VIDs,
4887/// schemaless rows, multi-label corner cases, and non-Mutation callers
4888/// that pass `&Prefetch::default()`.
4889pub(crate) async fn read_vertex_props_with_prefetch(
4890 vid: Vid,
4891 prefetched: &Prefetch,
4892 prop_manager: &PropertyManager,
4893 ctx: Option<&QueryContext>,
4894) -> Result<uni_common::Properties> {
4895 match prefetched.vertex.get(&vid).cloned() {
4896 Some(mut base) => {
4897 if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_vertex_props(vid, ctx) {
4898 for (k, v) in l0 {
4899 base.insert(k, v);
4900 }
4901 }
4902 Ok(base)
4903 }
4904 None => Ok(prop_manager
4905 .get_all_vertex_props_with_ctx(vid, ctx)
4906 .await?
4907 .unwrap_or_default()),
4908 }
4909}
4910
4911/// Edge equivalent of [`read_vertex_props_with_prefetch`]. On a hit, layer
4912/// in L0 edge props so writes from earlier rows of the same
4913/// `apply_mutations` invocation take precedence. On a miss, fall back to
4914/// the per-EID storage path.
4915pub(crate) async fn read_edge_props_with_prefetch(
4916 eid: Eid,
4917 prefetched: &Prefetch,
4918 prop_manager: &PropertyManager,
4919 ctx: Option<&QueryContext>,
4920) -> Result<uni_common::Properties> {
4921 match prefetched.edge.get(&eid).cloned() {
4922 Some(mut base) => {
4923 if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_edge_props(eid, ctx) {
4924 for (k, v) in l0 {
4925 base.insert(k, v);
4926 }
4927 }
4928 Ok(base)
4929 }
4930 None => Ok(prop_manager
4931 .get_all_edge_props_with_ctx(eid, ctx)
4932 .await?
4933 .unwrap_or_default()),
4934 }
4935}
4936
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a string-valued `Value`.
    fn text(s: &str) -> Value {
        Value::String(s.into())
    }

    /// Build a property map from literal `(key, value)` pairs.
    fn props<const N: usize>(pairs: [(&str, Value); N]) -> HashMap<String, Value> {
        pairs
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect()
    }

    /// The `{name: "Alice", age: 30}` fixture shared by several merge tests.
    fn alice_with_age() -> HashMap<String, Value> {
        props([("name", text("Alice")), ("age", Value::Int(30))])
    }

    // ── merge_props ──────────────────────────────────────────────────

    #[test]
    fn test_merge_props_replace_tombstones_missing_keys() {
        let incoming = props([("name", text("Bob"))]);

        let result = Executor::merge_props(alice_with_age(), incoming, true);

        assert_eq!(result.get("name"), Some(&text("Bob")));
        assert_eq!(
            result.get("age"),
            Some(&Value::Null),
            "Missing keys should be tombstoned in replace mode"
        );
    }

    #[test]
    fn test_merge_props_merge_preserves_existing() {
        let incoming = props([("city", text("NYC"))]);

        let result = Executor::merge_props(alice_with_age(), incoming, false);

        assert_eq!(result.get("name"), Some(&text("Alice")));
        assert_eq!(result.get("age"), Some(&Value::Int(30)));
        assert_eq!(result.get("city"), Some(&text("NYC")));
    }

    #[test]
    fn test_merge_props_null_incoming_is_tombstone() {
        let current = props([("name", text("Alice"))]);
        let incoming = props([("name", Value::Null)]);

        // An explicit null overwrites the stored value in merge mode...
        let merged = Executor::merge_props(current.clone(), incoming.clone(), false);
        assert_eq!(merged.get("name"), Some(&Value::Null));

        // ...and is kept as a tombstone in replace mode.
        let replaced = Executor::merge_props(current, incoming, true);
        assert_eq!(replaced.get("name"), Some(&Value::Null));
    }

    #[test]
    fn test_merge_props_empty_current() {
        let incoming = props([("name", text("Alice"))]);

        let result = Executor::merge_props(HashMap::new(), incoming, false);

        assert_eq!(result.get("name"), Some(&text("Alice")));
        assert_eq!(result.len(), 1);
    }

    #[test]
    fn test_merge_props_empty_incoming_replace_tombstones_all() {
        let result = Executor::merge_props(alice_with_age(), HashMap::new(), true);

        assert_eq!(result.get("name"), Some(&Value::Null));
        assert_eq!(result.get("age"), Some(&Value::Null));
    }

    // ── extract_labels_from_node ─────────────────────────────────────

    #[test]
    fn test_extract_labels_from_map() {
        let encoded = Value::Map(props([
            ("_vid", Value::Int(1)),
            (
                "_labels",
                Value::List(vec![text("Person"), text("Employee")]),
            ),
        ]));

        let expected = vec!["Person".to_string(), "Employee".to_string()];
        assert_eq!(Executor::extract_labels_from_node(&encoded), Some(expected));
    }

    #[test]
    fn test_extract_labels_from_value_node() {
        let person = Value::Node(uni_common::Node {
            vid: uni_common::core::id::Vid::from(1u64),
            labels: vec!["Person".to_string()],
            properties: HashMap::new(),
        });

        assert_eq!(
            Executor::extract_labels_from_node(&person),
            Some(vec!["Person".to_string()])
        );
    }

    #[test]
    fn test_extract_labels_non_node_returns_none() {
        assert_eq!(Executor::extract_labels_from_node(&Value::Int(42)), None);
        assert_eq!(Executor::extract_labels_from_node(&text("hello")), None);
    }

    // ── extract_user_properties_from_value ───────────────────────────

    #[test]
    fn test_extract_user_props_strips_internal_keys() {
        let encoded = Value::Map(props([
            ("_vid", Value::Int(1)),
            ("_labels", Value::List(vec![text("Person")])),
            ("name", text("Alice")),
            ("age", Value::Int(30)),
        ]));

        let user_props = Executor::extract_user_properties_from_value(&encoded).unwrap();

        assert_eq!(user_props.get("name"), Some(&text("Alice")));
        assert_eq!(user_props.get("age"), Some(&Value::Int(30)));
        assert!(!user_props.contains_key("_vid"));
        assert!(!user_props.contains_key("_labels"));
    }

    #[test]
    fn test_extract_user_props_plain_map_returns_as_is() {
        let plain = props([("key", text("value"))]);

        let user_props =
            Executor::extract_user_properties_from_value(&Value::Map(plain.clone())).unwrap();

        assert_eq!(user_props, plain);
    }

    #[test]
    fn test_extract_user_props_from_value_node() {
        let person = Value::Node(uni_common::Node {
            vid: uni_common::core::id::Vid::from(1u64),
            labels: vec!["Person".to_string()],
            properties: props([("name", text("Alice"))]),
        });

        let user_props = Executor::extract_user_properties_from_value(&person).unwrap();

        assert_eq!(user_props.get("name"), Some(&text("Alice")));
    }
}