uni_query/query/executor/write.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use crate::query::df_graph::mutation_common::Prefetch;
6use crate::query::planner::LogicalPlan;
7use anyhow::{Result, anyhow};
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use uni_common::DataType;
11use uni_common::core::id::{Eid, Vid};
12use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
13use uni_common::{Path, Value};
14use uni_cypher::ast::{
15 AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
16 CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
17 DropEdgeType, DropLabel, Expr, NodePattern, Pattern, PatternElement, RemoveItem, SetClause,
18 SetItem,
19};
20use uni_store::QueryContext;
21use uni_store::runtime::l0_visibility;
22use uni_store::runtime::property_manager::PropertyManager;
23use uni_store::runtime::writer::Writer;
24
/// Canonical, hashable key for a single-node MERGE: the key properties as a
/// `(name, value)` list sorted by name. Used to group existing vertices by key
/// for the index fast path (issue #69).
///
/// Because the pairs are kept in name order, two keys built from the same
/// properties compare equal regardless of the order in which the properties
/// were written. This assumes every producer sorts before use; the
/// construction site is not visible in this section.
type MergeKey = Vec<(String, Value)>;
29
/// Identity fields extracted from a map-encoded edge.
///
/// Produced by `Executor::extract_edge_identity` from the `_eid`, `_src`,
/// `_dst` and `_type`/`_type_name` entries of an edge map.
struct EdgeIdentity {
    /// Edge id, read from `_eid`.
    eid: Eid,
    /// Source vertex id, read from `_src`.
    src: Vid,
    /// Destination vertex id, read from `_dst`.
    dst: Vid,
    /// Numeric edge type id, resolved from `_type` (or `_type_name`).
    edge_type_id: u32,
}
37
/// Per-variable accumulator for SetItem::Property items targeting a vertex.
///
/// Built lazily on the first SetItem touching each variable, then mutated
/// in place across subsequent items. Flushed once at end of the SET
/// clause (or earlier if a non-Property SetItem on the same var lands).
struct PendingVertexSet {
    /// Id of the vertex being updated.
    vid: Vid,
    /// Labels of the vertex being updated.
    labels: Vec<String>,
    /// Full property map (storage union L0 from
    /// `get_all_vertex_props_with_ctx` plus the touched values applied
    /// in-order). Flushed to L0 whole; L0's `vertex_partial_keys` set
    /// tells the flush which columns to send to Lance via MergeInsert.
    props: HashMap<String, Value>,
    /// `true` when the SET should flush via the partial-column MergeInsert
    /// path: set when `UniConfig::partial_lance_writes` is on AND the
    /// label has no generated columns. Generated-column labels still need
    /// the full-row Append so the regenerated values land.
    partial: bool,
    /// Set of property keys touched by this statement. Threaded into L0
    /// so the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
}
61
/// Per-variable accumulator for SetItem::Property items targeting an edge.
///
/// Edge counterpart of `PendingVertexSet`: carries the full edge identity so
/// the accumulated properties can be flushed without re-resolving the edge.
struct PendingEdgeSet {
    /// Source vertex of the edge being updated.
    src: Vid,
    /// Destination vertex of the edge being updated.
    dst: Vid,
    /// Numeric id of the edge's type.
    edge_type_id: u32,
    /// Id of the edge being updated.
    eid: Eid,
    /// Name of the edge's type.
    edge_type_name: String,
    /// `true` when the SET should flush via the partial-column
    /// MergeInsert path on the per-edge-type delta tables (Round 12
    /// §A). Set when `UniConfig::partial_lance_writes` is on.
    partial: bool,
    /// Property keys touched by this statement. Threaded into L0 so
    /// the flush emits a `MergeInsertBuilder` source with exactly
    /// these columns. Empty when `partial == false`.
    touched: HashSet<String>,
    /// Property map accumulated for this edge. Presumably this mirrors
    /// `PendingVertexSet::props` (storage state plus touched values applied
    /// in order); confirm at the flush site.
    props: HashMap<String, Value>,
}
79
80/// Refuse to mutate an ephemeral node (M5g / proposal §4.13.1).
81/// Ephemeral entities are return-only — `Vid::EPHEMERAL_BIT` is set on
82/// any id minted by `host.allocate_transient_id()`.
83fn reject_if_ephemeral_vid(vid: Vid) -> Result<()> {
84 if vid.is_ephemeral() {
85 return Err(anyhow::Error::from(
86 uni_common::UniError::EphemeralWriteAttempt {
87 kind: "node",
88 id: vid.transient_id().unwrap_or(vid.as_u64()),
89 },
90 ));
91 }
92 Ok(())
93}
94
95/// Returns a short variant name for a `Value`, used in type-mismatch error messages.
96fn value_type_name(val: &Value) -> &'static str {
97 match val {
98 Value::Null => "Null",
99 Value::Bool(_) => "Bool",
100 Value::Int(_) => "Int",
101 Value::Float(_) => "Float",
102 Value::String(_) => "String",
103 Value::Bytes(_) => "Bytes",
104 Value::List(_) => "List",
105 Value::Map(_) => "Map",
106 Value::Node(_) => "Node",
107 Value::Edge(_) => "Edge",
108 Value::Path(_) => "Path",
109 Value::Vector(_) => "Vector",
110 Value::Temporal(_) => "Temporal",
111 _ => "value",
112 }
113}
114
115/// Refuse to mutate an ephemeral edge (M5g / proposal §4.13.1).
116fn reject_if_ephemeral_eid(eid: Eid) -> Result<()> {
117 if eid.is_ephemeral() {
118 return Err(anyhow::Error::from(
119 uni_common::UniError::EphemeralWriteAttempt {
120 kind: "edge",
121 id: eid.transient_id().unwrap_or(eid.as_u64()),
122 },
123 ));
124 }
125 Ok(())
126}
127
128/// Reject a write whose target label is currently allocated as a
129/// virtual (catalog-backed) label.
130///
131/// Catalog tables are read-only from the host's perspective — there is
132/// no write-back path through `CatalogTable::scan` to the originating
133/// provider, so silently allowing SET/DELETE would leave ghosted state
134/// on the host side that diverges from the external catalog. The
135/// planner already rejects CREATE/MERGE on virtual labels via
136/// `Planner::reject_virtual_label_writes`; this helper is the
137/// equivalent gate on the runtime write path for SET-label-add and
138/// DELETE.
139///
140/// `op` names the offending operation for the error message (e.g.
141/// `"SET"`, `"DELETE"`).
142///
143/// # Errors
144///
145/// Returns an error if `registry` is `Some` and any name in `labels`
146/// is currently registered as a virtual label. Returns `Ok(())` when
147/// no plugin registry is wired (low-level callers without plugins).
148fn reject_virtual_label_write(
149 registry: Option<&Arc<uni_plugin::PluginRegistry>>,
150 labels: &[String],
151 op: &str,
152) -> Result<()> {
153 let Some(registry) = registry else {
154 return Ok(());
155 };
156 for label in labels {
157 if registry.virtual_label_by_name(label).is_some() {
158 return Err(anyhow!(
159 "Cannot {op} on virtual (catalog-resolved) label `{label}` — virtual \
160 labels are read-only; write back via the originating catalog instead"
161 ));
162 }
163 }
164 Ok(())
165}
166
167/// Reject a write whose target edge-type ID is currently allocated as
168/// a virtual (catalog-backed) edge type. Runtime analog of
169/// [`reject_virtual_label_write`] for the edge path.
170///
171/// # Errors
172///
173/// Returns an error if `registry` is `Some` and `edge_type_id` resolves
174/// to a registered virtual edge type. Returns `Ok(())` when no plugin
175/// registry is wired.
176fn reject_virtual_edge_type_write(
177 registry: Option<&Arc<uni_plugin::PluginRegistry>>,
178 edge_type_id: u32,
179 op: &str,
180) -> Result<()> {
181 let Some(registry) = registry else {
182 return Ok(());
183 };
184 if let Some(entry) = registry.virtual_edge_type_by_id(edge_type_id) {
185 return Err(anyhow!(
186 "Cannot {op} on virtual (catalog-resolved) edge type `{}` — virtual edge \
187 types are read-only; write back via the originating catalog instead",
188 entry.name
189 ));
190 }
191 Ok(())
192}
193
194impl Executor {
195 /// Extracts labels from a node value.
196 ///
197 /// Handles both `Value::Map` (with a `_labels` list field) and
198 /// `Value::Node` (with a `labels` vec field).
199 ///
200 /// Returns `None` when the value is not a node or has no labels.
201 pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
202 match node_val {
203 Value::Map(map) => {
204 // Map-encoded node: look for _labels array
205 if let Some(Value::List(labels_arr)) = map.get("_labels") {
206 let labels: Vec<String> = labels_arr
207 .iter()
208 .filter_map(|v| v.as_str().map(|s| s.to_string()))
209 .collect();
210 if !labels.is_empty() {
211 return Some(labels);
212 }
213 }
214 None
215 }
216 Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
217 _ => None,
218 }
219 }
220
221 /// Extracts user-visible properties from a value that represents a node or edge.
222 ///
223 /// Strips internal bookkeeping keys (those prefixed with `_` or named
224 /// `ext_id`) from map-encoded entities and returns only the user-facing
225 /// property key-value pairs.
226 ///
227 /// Returns `None` when `val` is not a map, node, or edge.
228 pub(crate) fn extract_user_properties_from_value(
229 val: &Value,
230 ) -> Option<HashMap<String, Value>> {
231 match val {
232 Value::Map(map) => {
233 // Distinguish entity-encoded maps from plain map literals.
234 // A node map has both `_vid` and `_labels`.
235 // An edge map has `_eid`, `_src`, and `_dst`.
236 let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
237 let is_edge_map = map.contains_key("_eid")
238 && map.contains_key("_src")
239 && map.contains_key("_dst");
240
241 if is_node_map || is_edge_map {
242 // Filter out internal bookkeeping keys
243 let user_props: HashMap<String, Value> = map
244 .iter()
245 .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
246 .map(|(k, v)| (k.clone(), v.clone()))
247 .collect();
248 // When mutation output omits dotted property columns, user
249 // properties live inside `_all_props` rather than at the
250 // top level of the entity map.
251 if user_props.is_empty()
252 && let Some(Value::Map(all_props)) = map.get("_all_props")
253 {
254 return Some(all_props.clone());
255 }
256 Some(user_props)
257 } else {
258 // Plain map literal — return as-is
259 Some(map.clone())
260 }
261 }
262 Value::Node(node) => Some(node.properties.clone()),
263 Value::Edge(edge) => Some(edge.properties.clone()),
264 _ => None,
265 }
266 }
267
    /// Applies a property map to a vertex or edge entity bound to `variable` in `row`.
    ///
    /// When `replace` is `true` the entity's property set is replaced: keys absent
    /// from `new_props` are tombstoned (written as `Value::Null`) so the storage
    /// layer removes them. When `replace` is `false` the map is merged: keys in
    /// `new_props` are upserted, while keys absent from `new_props` are unchanged.
    /// A `Value::Null` entry in `new_props` acts as an explicit tombstone in both
    /// modes.
    ///
    /// Labels are never altered — the spec states that `SET n = map` replaces
    /// properties only.
    ///
    /// Four binding shapes are handled, in this order: `Value::Node`, any value
    /// accepted by `Self::vid_from_value` (map-encoded nodes), `Value::Edge`, and
    /// edge-encoded maps (`_eid` + `_src` + `_dst`). Any other binding is
    /// ignored. After writing, the binding in `row` is updated to the new,
    /// non-null property set.
    ///
    /// # Errors
    ///
    /// Returns an error if the entity cannot be found in the storage layer, or
    /// if the writer fails to persist the updated properties. Errors from
    /// generated-column enrichment, declared-type coercion/validation
    /// (`coerce_and_validate_props`) and edge-type resolution are propagated.
    #[expect(clippy::too_many_arguments)]
    async fn apply_properties_to_entity(
        &self,
        variable: &str,
        new_props: HashMap<String, Value>,
        replace: bool,
        row: &mut HashMap<String, Value>,
        writer: &Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
        prefetched: &Prefetch,
    ) -> Result<()> {
        // Clone the target so we can hold &row references elsewhere.
        let target = row.get(variable).cloned();

        // Declared-type guard for the whole-entity `SET n = map` / `SET n += map`
        // forms, mirroring the per-property SET path (issue #68).
        let schema = self.storage.schema_manager().schema();

        // Arm order matters: the `vid_from_value` guard arm is tried before both
        // edge arms. NOTE(review): any value `vid_from_value` accepts is written
        // as a vertex — confirm it rejects edge-encoded maps.
        match target {
            Some(Value::Node(ref node)) => {
                // Typed node: read current props (prefetch-aware), merge, add
                // generated columns per label, coerce/validate, then write.
                let vid = node.vid;
                let labels = node.labels.clone();
                let current =
                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
                let write_props = Self::merge_props(current, new_props, replace);
                let mut enriched = write_props.clone();
                for label_name in &labels {
                    self.enrich_properties_with_generated_columns(
                        label_name,
                        &mut enriched,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
                let _ = writer
                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
                    .await?;
                // Update the in-memory row binding
                // (tombstones are written to storage but hidden from the binding).
                if let Some(Value::Node(n)) = row.get_mut(variable) {
                    n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
                }
            }
            Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
                // Map-encoded node: same pipeline as above. Labels come from
                // `_labels`; when absent the list is empty, so no generated
                // columns are added (the effect of an empty label slice on
                // `insert_vertex_with_labels` is not visible here).
                let vid = Self::vid_from_value(node_val)?;
                let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
                let current =
                    read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
                let write_props = Self::merge_props(current, new_props, replace);
                let mut enriched = write_props.clone();
                for label_name in &labels {
                    self.enrich_properties_with_generated_columns(
                        label_name,
                        &mut enriched,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
                let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
                let _ = writer
                    .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
                    .await?;
                // Update the in-memory map-encoded node binding
                if let Some(Value::Map(node_map)) = row.get_mut(variable) {
                    // Remove old user property keys, keep internal fields
                    node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
                    // Build effective (non-null) properties
                    let effective: HashMap<String, Value> =
                        enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
                    for (k, v) in &effective {
                        node_map.insert(k.clone(), v.clone());
                    }
                    // Replace _all_props to reflect the complete property set
                    node_map.insert("_all_props".to_string(), Value::Map(effective));
                }
            }
            Some(Value::Edge(ref edge)) => {
                // Typed edge: no generated-column enrichment is done on this path.
                let eid = edge.eid;
                let src = edge.src;
                let dst = edge.dst;
                let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
                let current =
                    read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
                let write_props = Self::merge_props(current, new_props, replace);
                let write_props = Self::coerce_and_validate_props(
                    write_props,
                    &schema,
                    std::slice::from_ref(&edge.edge_type),
                )?;
                writer
                    .insert_edge(
                        src,
                        dst,
                        etype,
                        eid,
                        write_props.clone(),
                        Some(edge.edge_type.clone()),
                        tx_l0,
                    )
                    .await?;
                // Update the in-memory row binding
                if let Some(Value::Edge(e)) = row.get_mut(variable) {
                    e.properties = write_props
                        .into_iter()
                        .filter(|(_, v)| !v.is_null())
                        .collect();
                }
            }
            Some(Value::Map(ref map))
                if map.contains_key("_eid")
                    && map.contains_key("_src")
                    && map.contains_key("_dst") =>
            {
                // Edge-encoded map: identity comes from `_eid`/`_src`/`_dst`/`_type`.
                let ei = self.extract_edge_identity(map)?;
                let current =
                    read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx).await?;
                let write_props = Self::merge_props(current, new_props, replace);
                // Name comes from a string `_type`, otherwise from the schema by
                // id (`_type_name` is not consulted for the name here).
                let edge_type_name = map
                    .get("_type")
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string())
                    .or_else(|| {
                        self.storage
                            .schema_manager()
                            .edge_type_name_by_id_unified(ei.edge_type_id)
                    });
                // Without a resolvable name, declared-type validation is skipped.
                let write_props = match &edge_type_name {
                    Some(name) => Self::coerce_and_validate_props(
                        write_props,
                        &schema,
                        std::slice::from_ref(name),
                    )?,
                    None => write_props,
                };
                writer
                    .insert_edge(
                        ei.src,
                        ei.dst,
                        ei.edge_type_id,
                        ei.eid,
                        write_props.clone(),
                        edge_type_name,
                        tx_l0,
                    )
                    .await?;
                // Update the in-memory map-encoded edge binding
                if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
                    edge_map.retain(|k, _| k.starts_with('_'));
                    let effective: HashMap<String, Value> = write_props
                        .into_iter()
                        .filter(|(_, v)| !v.is_null())
                        .collect();
                    for (k, v) in &effective {
                        edge_map.insert(k.clone(), v.clone());
                    }
                    // Replace _all_props to reflect the complete property set
                    edge_map.insert("_all_props".to_string(), Value::Map(effective));
                }
            }
            _ => {
                // No matching entity — nothing to do (caller already guarded against Null)
            }
        }
        Ok(())
    }
456
457 /// Computes the property map to write given current storage state and the
458 /// incoming change map.
459 ///
460 /// When `replace` is `true`, keys present in `current` but absent from
461 /// `incoming` are tombstoned with `Value::Null`. Null values inside
462 /// `incoming` are always preserved as explicit tombstones.
463 ///
464 /// When `replace` is `false`, `current` is the base and `incoming` is
465 /// merged on top: each key in `incoming` overwrites or tombstones the
466 /// corresponding entry in `current`.
467 fn merge_props(
468 current: HashMap<String, Value>,
469 incoming: HashMap<String, Value>,
470 replace: bool,
471 ) -> HashMap<String, Value> {
472 if replace {
473 // Start from the non-null incoming entries only.
474 let mut result: HashMap<String, Value> = incoming
475 .iter()
476 .filter(|(_, v)| !v.is_null())
477 .map(|(k, v)| (k.clone(), v.clone()))
478 .collect();
479 // Tombstone every current key that is absent from incoming OR explicitly
480 // set to null in incoming (both mean "delete this property").
481 for k in current.keys() {
482 if incoming.get(k).is_none_or(|v| v.is_null()) {
483 result.insert(k.clone(), Value::Null);
484 }
485 }
486 result
487 } else {
488 // Merge: start from current and apply incoming on top
489 let mut result = current;
490 result.extend(incoming);
491 result
492 }
493 }
494
495 /// Extract edge identity fields (`_eid`, `_src`, `_dst`, `_type`) from a map.
496 fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
497 let eid = Eid::from(
498 map.get("_eid")
499 .and_then(|v| v.as_u64())
500 .ok_or_else(|| anyhow!("Invalid _eid"))?,
501 );
502 let src = Vid::from(
503 map.get("_src")
504 .and_then(|v| v.as_u64())
505 .ok_or_else(|| anyhow!("Invalid _src"))?,
506 );
507 let dst = Vid::from(
508 map.get("_dst")
509 .and_then(|v| v.as_u64())
510 .ok_or_else(|| anyhow!("Invalid _dst"))?,
511 );
512 let edge_type_id = self.resolve_edge_type_id(
513 map.get("_type")
514 .or_else(|| map.get("_type_name"))
515 .ok_or_else(|| anyhow!("Missing _type/_type_name on edge map"))?,
516 )?;
517 Ok(EdgeIdentity {
518 eid,
519 src,
520 dst,
521 edge_type_id,
522 })
523 }
524
525 /// Resolve edge type ID from a Value, supporting both Int and String representations.
526 /// DataFusion traverse stores _type as String("KNOWS"), while write operations need u32 ID.
527 ///
528 /// For String values, uses get_or_assign_edge_type_id to support schemaless edge types
529 /// (assigns new ID if not found). This is critical for MERGE ... ON CREATE SET scenarios
530 /// where the edge type was just created and may not be in the read-only lookup yet.
531 fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
532 match type_val {
533 Value::Int(i) => Ok(*i as u32),
534 Value::String(name) => {
535 if self.config.strict_schema {
536 let schema = self.storage.schema_manager().schema();
537 schema
538 .edge_type_id_by_name_case_insensitive(name)
539 .ok_or_else(|| {
540 anyhow!(
541 "Edge type '{}' is not defined in the schema \
542 (strict_schema is enabled). \
543 Declare it with db.schema().edge_type(...).apply() first.",
544 name
545 )
546 })
547 } else {
548 // Schemaless: assign new ID if not found in schema or registry.
549 Ok(self
550 .storage
551 .schema_manager()
552 .get_or_assign_edge_type_id(name))
553 }
554 }
555 _ => Err(anyhow!(
556 "Invalid _type value: expected Int or String, got {:?}",
557 type_val
558 )),
559 }
560 }
561
562 pub(crate) async fn execute_vacuum(&self) -> Result<()> {
563 if let Some(writer_arc) = &self.writer {
564 // Flush first while holding the lock
565 {
566 let writer: &uni_store::Writer = writer_arc.as_ref();
567 writer.flush_to_l1(None).await?;
568 } // Drop lock before compacting to avoid blocking reads/writes
569
570 // Compaction can run without holding the writer lock
571 let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
572 let compaction_results = compactor.compact_all().await?;
573
574 // Re-warm adjacency manager for compacted edge types to sync in-memory CSR with new L2 storage
575 let am = self.storage.adjacency_manager();
576 let schema = self.storage.schema_manager().schema();
577 for info in compaction_results {
578 // Convert string direction to Direction enum
579 let direction = match info.direction.as_str() {
580 "fwd" => uni_store::storage::direction::Direction::Outgoing,
581 "bwd" => uni_store::storage::direction::Direction::Incoming,
582 _ => continue,
583 };
584
585 // Get edge_type_id
586 if let Some(edge_type_id) =
587 schema.edge_type_id_unified_case_insensitive(&info.edge_type)
588 {
589 // Re-warm from storage (clears old CSR, loads new L2 + L1 delta)
590 let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
591 }
592 }
593 }
594 Ok(())
595 }
596
597 pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
598 if let Some(writer_arc) = &self.writer {
599 let writer: &uni_store::Writer = writer_arc.as_ref();
600 writer.flush_to_l1(Some("checkpoint".to_string())).await?;
601 }
602 Ok(())
603 }
604
605 pub(crate) async fn execute_copy_to(
606 &self,
607 identifier: &str,
608 path: &str,
609 format: &str,
610 options: &HashMap<String, Value>,
611 ) -> Result<usize> {
612 // Check schema to determine if identifier is an edge type or vertex label
613 let schema = self.storage.schema_manager().schema();
614
615 // Try as edge type first
616 if schema.get_edge_type_case_insensitive(identifier).is_some() {
617 return self
618 .export_edge_type_in_format(identifier, path, format)
619 .await;
620 }
621
622 // Try as vertex label
623 if schema.get_label_case_insensitive(identifier).is_some() {
624 return self
625 .export_vertex_label_in_format(identifier, path, format, options)
626 .await;
627 }
628
629 // Neither edge type nor vertex label found
630 Err(anyhow!("Unknown label or edge type: '{}'", identifier))
631 }
632
633 async fn export_vertex_label_in_format(
634 &self,
635 label: &str,
636 path: &str,
637 format: &str,
638 _options: &HashMap<String, Value>,
639 ) -> Result<usize> {
640 match format {
641 "parquet" => self.export_vertex_label(label, path).await,
642 "csv" => {
643 let mut stream = self
644 .storage
645 .scan_vertex_table_stream(label)
646 .await?
647 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
648
649 // Collect all batches
650 let mut all_rows = Vec::new();
651 let mut column_names = Vec::new();
652
653 // Iterate stream using StreamExt
654 use futures::StreamExt;
655 while let Some(batch_result) = stream.next().await {
656 let batch = batch_result?;
657
658 // Get column names from first batch
659 if column_names.is_empty() {
660 column_names = batch
661 .schema()
662 .fields()
663 .iter()
664 .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
665 .map(|f| f.name().clone())
666 .collect();
667 }
668
669 // Convert batch to rows
670 for row_idx in 0..batch.num_rows() {
671 let mut row = Vec::new();
672 for field in batch.schema().fields() {
673 if field.name().starts_with('_') || field.name() == "ext_id" {
674 continue;
675 }
676
677 let col_idx = batch.schema().index_of(field.name())?;
678 let column = batch.column(col_idx);
679 let value = self.arrow_value_to_json(column, row_idx)?;
680
681 // Convert value to CSV string
682 let csv_value = match value {
683 Value::Null => String::new(),
684 Value::Bool(b) => b.to_string(),
685 Value::Int(i) => i.to_string(),
686 Value::Float(f) => f.to_string(),
687 Value::String(s) => s,
688 _ => format!("{value}"),
689 };
690 row.push(csv_value);
691 }
692 all_rows.push(row);
693 }
694 }
695
696 // Write CSV
697 let file = std::fs::File::create(path)?;
698 let mut wtr = csv::Writer::from_writer(file);
699
700 // Write headers
701 log::debug!("CSV export headers: {:?}", column_names);
702 wtr.write_record(&column_names)?;
703
704 // Write rows
705 for (i, row) in all_rows.iter().enumerate() {
706 log::debug!("CSV export row {}: {:?}", i, row);
707 wtr.write_record(row)?;
708 }
709
710 wtr.flush()?;
711 Ok(all_rows.len())
712 }
713 _ => Err(anyhow!(
714 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
715 format
716 )),
717 }
718 }
719
720 async fn export_edge_type_in_format(
721 &self,
722 edge_type: &str,
723 path: &str,
724 format: &str,
725 ) -> Result<usize> {
726 match format {
727 "parquet" => self.export_edge_type(edge_type, path).await,
728 "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
729 _ => Err(anyhow!(
730 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
731 format
732 )),
733 }
734 }
735
736 /// Write a stream of record batches to a Parquet file.
737 /// Returns the total number of rows written, or 0 if the stream is empty.
738 async fn write_batches_to_parquet(
739 mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
740 path: &str,
741 entity_description: &str,
742 ) -> Result<usize> {
743 use futures::TryStreamExt;
744
745 // Get first batch to determine schema and create writer
746 let first_batch = match stream.try_next().await? {
747 Some(batch) => batch,
748 None => {
749 log::info!("No data to export from {}", entity_description);
750 return Ok(0);
751 }
752 };
753
754 // Create Parquet writer using schema from first batch
755 let file = std::fs::File::create(path)?;
756 let arrow_schema = first_batch.schema();
757 let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
758
759 // Write first batch
760 let mut count = first_batch.num_rows();
761 writer.write(&first_batch)?;
762
763 // Write remaining batches
764 while let Some(batch) = stream.try_next().await? {
765 count += batch.num_rows();
766 writer.write(&batch)?;
767 }
768
769 writer.close()?;
770
771 log::info!(
772 "Exported {} rows from {} to '{}'",
773 count,
774 entity_description,
775 path
776 );
777 Ok(count)
778 }
779
780 /// Export vertices of a specific label to Parquet
781 async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
782 let stream = self
783 .storage
784 .scan_vertex_table_stream(label)
785 .await?
786 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
787
788 Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
789 }
790
791 /// Export edges of a specific type to Parquet
792 async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
793 let schema = self.storage.schema_manager().schema();
794 if !schema.edge_types.contains_key(edge_type) {
795 return Err(anyhow!("Edge type '{}' not found", edge_type));
796 }
797
798 let filter = format!("type = '{}'", edge_type);
799 let stream = self
800 .storage
801 .scan_main_edge_table_stream(Some(&filter))
802 .await?
803 .ok_or_else(|| anyhow!("No edge data found"))?;
804
805 Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
806 }
807
808 pub(crate) async fn execute_copy_from(
809 &self,
810 label: &str,
811 path: &str,
812 format: &str,
813 options: &HashMap<String, Value>,
814 ) -> Result<usize> {
815 // Read data from file
816 let batches = match format {
817 "parquet" => self.read_parquet_file(path)?,
818 "csv" => self.read_csv_file(path, label, options)?,
819 _ => {
820 return Err(anyhow!(
821 "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
822 format
823 ));
824 }
825 };
826
827 // Get writer
828 let writer_arc = self
829 .writer
830 .as_ref()
831 .ok_or_else(|| anyhow!("No writer available"))?;
832
833 let db_schema = self.storage.schema_manager().schema();
834
835 // Check if this is a label (vertex) or edge type
836 let is_edge = db_schema.edge_type_id_by_name(label).is_some();
837
838 if is_edge {
839 // Import edges
840 let edge_type_id = db_schema
841 .edge_type_id_by_name(label)
842 .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
843
844 // Get src and dst column names from options
845 let src_col = options
846 .get("src_col")
847 .and_then(|v| v.as_str())
848 .unwrap_or("src");
849 let dst_col = options
850 .get("dst_col")
851 .and_then(|v| v.as_str())
852 .unwrap_or("dst");
853
854 // §5.7 of concurrent_writer.md: writer is hoisted above the row
855 // loop now that there is no per-row lock acquisition cost.
856 let writer: &uni_store::Writer = writer_arc.as_ref();
857 let mut total_rows = 0;
858 for batch in batches {
859 let num_rows = batch.num_rows();
860 // Pre-allocate one EID per row in one IdAllocator mutex acquisition.
861 let eids = writer.allocate_eids(num_rows).await?;
862
863 for (row_idx, &eid) in eids.iter().enumerate().take(num_rows) {
864 let mut properties = HashMap::new();
865 let mut src_vid: Option<Vid> = None;
866 let mut dst_vid: Option<Vid> = None;
867
868 // Extract properties and VIDs from each column
869 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
870 let col_name = field.name();
871 let column = batch.column(col_idx);
872 let value = self.arrow_value_to_json(column, row_idx)?;
873
874 if col_name == src_col {
875 let raw = value.as_u64().unwrap_or_else(|| {
876 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
877 });
878 src_vid = Some(Vid::new(raw));
879 } else if col_name == dst_col {
880 let raw = value.as_u64().unwrap_or_else(|| {
881 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
882 });
883 dst_vid = Some(Vid::new(raw));
884 } else if !col_name.starts_with('_') && !value.is_null() {
885 properties.insert(col_name.clone(), value);
886 }
887 }
888
889 let src = src_vid
890 .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
891 let dst = dst_vid
892 .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
893
894 writer
895 .insert_edge(
896 src,
897 dst,
898 edge_type_id,
899 eid,
900 properties,
901 Some(label.to_string()),
902 None,
903 )
904 .await?;
905
906 total_rows += 1;
907 }
908 }
909
910 log::info!(
911 "Imported {} edge rows from '{}' into edge type '{}'",
912 total_rows,
913 path,
914 label
915 );
916
917 // Flush to persist edges
918 if total_rows > 0 {
919 writer.flush_to_l1(None).await?;
920 }
921
922 Ok(total_rows)
923 } else {
924 // Import vertices
925 // Validate the label exists in schema
926 db_schema
927 .label_id_by_name_case_insensitive(label)
928 .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
929
930 // §5.7 of concurrent_writer.md: writer is hoisted above the row
931 // loop now that there is no per-row lock acquisition cost.
932 let writer: &uni_store::Writer = writer_arc.as_ref();
933 let mut total_rows = 0;
934 for batch in batches {
935 let num_rows = batch.num_rows();
936 // Pre-allocate one VID per row in one IdAllocator mutex acquisition.
937 let vids = writer.allocate_vids(num_rows).await?;
938
939 // Convert Arrow batch to rows
940 for (row_idx, &vid) in vids.iter().enumerate().take(num_rows) {
941 let mut properties = HashMap::new();
942
943 // Extract properties from each column
944 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
945 let col_name = field.name();
946
947 // Skip internal columns
948 if col_name.starts_with('_') {
949 continue;
950 }
951
952 let column = batch.column(col_idx);
953 let value = self.arrow_value_to_json(column, row_idx)?;
954
955 if !value.is_null() {
956 properties.insert(col_name.clone(), value);
957 }
958 }
959
960 let _ = writer
961 .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
962 .await?;
963
964 total_rows += 1;
965 }
966 }
967
968 log::info!(
969 "Imported {} rows from '{}' into label '{}'",
970 total_rows,
971 path,
972 label
973 );
974
975 // Flush to persist vertices
976 if total_rows > 0 {
977 writer.flush_to_l1(None).await?;
978 }
979
980 Ok(total_rows)
981 }
982 }
983
984 fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
985 use arrow_array::Array;
986 use arrow_schema::DataType as ArrowDataType;
987
988 if column.is_null(row_idx) {
989 return Ok(Value::Null);
990 }
991
992 match column.data_type() {
993 ArrowDataType::Utf8 => {
994 let array = column
995 .as_any()
996 .downcast_ref::<arrow_array::StringArray>()
997 .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
998 Ok(Value::String(array.value(row_idx).to_string()))
999 }
1000 ArrowDataType::Int32 => {
1001 let array = column
1002 .as_any()
1003 .downcast_ref::<arrow_array::Int32Array>()
1004 .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
1005 Ok(Value::Int(array.value(row_idx) as i64))
1006 }
1007 ArrowDataType::Int64 => {
1008 let array = column
1009 .as_any()
1010 .downcast_ref::<arrow_array::Int64Array>()
1011 .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
1012 Ok(Value::Int(array.value(row_idx)))
1013 }
1014 ArrowDataType::Float32 => {
1015 let array = column
1016 .as_any()
1017 .downcast_ref::<arrow_array::Float32Array>()
1018 .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
1019 Ok(Value::Float(array.value(row_idx) as f64))
1020 }
1021 ArrowDataType::Float64 => {
1022 let array = column
1023 .as_any()
1024 .downcast_ref::<arrow_array::Float64Array>()
1025 .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
1026 Ok(Value::Float(array.value(row_idx)))
1027 }
1028 ArrowDataType::Boolean => {
1029 let array = column
1030 .as_any()
1031 .downcast_ref::<arrow_array::BooleanArray>()
1032 .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
1033 Ok(Value::Bool(array.value(row_idx)))
1034 }
1035 ArrowDataType::UInt64 => {
1036 let array = column
1037 .as_any()
1038 .downcast_ref::<arrow_array::UInt64Array>()
1039 .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
1040 Ok(Value::Int(array.value(row_idx) as i64))
1041 }
1042 _ => {
1043 // For other types, try to convert to string
1044 let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
1045 if let Some(arr) = array {
1046 Ok(Value::String(arr.value(row_idx).to_string()))
1047 } else {
1048 Ok(Value::Null)
1049 }
1050 }
1051 }
1052 }
1053
1054 fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
1055 let file = std::fs::File::open(path)?;
1056 let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
1057 .build()?;
1058 reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1059 }
1060
1061 fn read_csv_file(
1062 &self,
1063 path: &str,
1064 label: &str,
1065 options: &HashMap<String, Value>,
1066 ) -> Result<Vec<arrow_array::RecordBatch>> {
1067 use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1068 use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1069 use std::sync::Arc;
1070
1071 // Parse CSV options
1072 let has_headers = options
1073 .get("headers")
1074 .and_then(|v| v.as_bool())
1075 .unwrap_or(true);
1076
1077 // Read CSV file
1078 let file = std::fs::File::open(path)?;
1079 let mut rdr = csv::ReaderBuilder::new()
1080 .has_headers(has_headers)
1081 .from_reader(file);
1082
1083 // Get schema for type conversion
1084 let db_schema = self.storage.schema_manager().schema();
1085 let properties = db_schema.properties.get(label);
1086
1087 // Collect all rows first to determine schema
1088 let mut rows: Vec<Vec<String>> = Vec::new();
1089 let headers: Vec<String> = if has_headers {
1090 rdr.headers()?.iter().map(|s| s.to_string()).collect()
1091 } else {
1092 Vec::new()
1093 };
1094
1095 for result in rdr.records() {
1096 let record = result?;
1097 rows.push(record.iter().map(|s| s.to_string()).collect());
1098 }
1099
1100 if rows.is_empty() {
1101 return Ok(Vec::new());
1102 }
1103
1104 // Build Arrow schema with proper types based on DB schema
1105 let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
1106 let col_names: Vec<String> = if has_headers {
1107 headers
1108 } else {
1109 (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
1110 };
1111
1112 for name in &col_names {
1113 let arrow_type = if let Some(props) = properties {
1114 if let Some(prop_meta) = props.get(name) {
1115 match prop_meta.r#type {
1116 DataType::Int32 => ArrowDataType::Int32,
1117 DataType::Int64 => ArrowDataType::Int64,
1118 DataType::Float32 => ArrowDataType::Float32,
1119 DataType::Float64 => ArrowDataType::Float64,
1120 DataType::Bool => ArrowDataType::Boolean,
1121 _ => ArrowDataType::Utf8,
1122 }
1123 } else {
1124 ArrowDataType::Utf8
1125 }
1126 } else {
1127 ArrowDataType::Utf8
1128 };
1129 arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
1130 }
1131
1132 let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
1133
1134 // Convert rows to Arrow arrays with proper types
1135 let mut columns: Vec<ArrayRef> = Vec::new();
1136 for (col_idx, field) in arrow_fields.iter().enumerate() {
1137 match field.data_type() {
1138 ArrowDataType::Int32 => {
1139 let values: Vec<Option<i32>> = rows
1140 .iter()
1141 .map(|row| {
1142 if col_idx < row.len() {
1143 row[col_idx].parse().ok()
1144 } else {
1145 None
1146 }
1147 })
1148 .collect();
1149 columns.push(Arc::new(Int32Array::from(values)));
1150 }
1151 _ => {
1152 // Default to string
1153 let values: Vec<Option<String>> = rows
1154 .iter()
1155 .map(|row| {
1156 if col_idx < row.len() {
1157 Some(row[col_idx].clone())
1158 } else {
1159 None
1160 }
1161 })
1162 .collect();
1163 columns.push(Arc::new(StringArray::from(values)));
1164 }
1165 }
1166 }
1167
1168 let batch = RecordBatch::try_new(arrow_schema, columns)?;
1169 Ok(vec![batch])
1170 }
1171
1172 fn parse_data_type(type_str: &str) -> Result<DataType> {
1173 use uni_common::core::schema::{CrdtType, PointType};
1174 let type_str = type_str.to_lowercase();
1175 let type_str = type_str.trim();
1176 match type_str {
1177 "string" | "text" | "varchar" => Ok(DataType::String),
1178 "int" | "integer" | "int32" => Ok(DataType::Int32),
1179 "long" | "int64" | "bigint" => Ok(DataType::Int64),
1180 "float" | "float32" | "real" => Ok(DataType::Float32),
1181 "double" | "float64" => Ok(DataType::Float64),
1182 "bool" | "boolean" => Ok(DataType::Bool),
1183 "timestamp" => Ok(DataType::Timestamp),
1184 "date" => Ok(DataType::Date),
1185 "time" => Ok(DataType::Time),
1186 "datetime" => Ok(DataType::DateTime),
1187 "duration" => Ok(DataType::Duration),
1188 "btic" => Ok(DataType::Btic),
1189 "json" | "jsonb" => Ok(DataType::CypherValue),
1190 "bytes" | "blob" | "binary" => Ok(DataType::Bytes),
1191 "point" => Ok(DataType::Point(PointType::Cartesian2D)),
1192 "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
1193 "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1194 s if s.starts_with("vector(") && s.ends_with(')') => {
1195 let dims_str = &s[7..s.len() - 1];
1196 let dimensions = dims_str
1197 .parse::<usize>()
1198 .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1199 Ok(DataType::Vector { dimensions })
1200 }
1201 s if s.starts_with("list<") && s.ends_with('>') => {
1202 let inner_type_str = &s[5..s.len() - 1];
1203 let inner_type = Self::parse_data_type(inner_type_str)?;
1204 Ok(DataType::List(Box::new(inner_type)))
1205 }
1206 "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1207 "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1208 _ => Err(anyhow!("Unknown data type: {}", type_str)),
1209 }
1210 }
1211
1212 pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1213 let sm = self.storage.schema_manager_arc();
1214 if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1215 return Ok(());
1216 }
1217 sm.add_label_with_desc(&clause.name, clause.description)?;
1218 for prop in clause.properties {
1219 let dt = Self::parse_data_type(&prop.data_type)?;
1220 sm.add_property_with_desc(
1221 &clause.name,
1222 &prop.name,
1223 dt,
1224 prop.nullable,
1225 prop.description,
1226 )?;
1227 if prop.unique {
1228 let constraint = Constraint {
1229 name: format!("{}_{}_unique", clause.name, prop.name),
1230 constraint_type: ConstraintType::Unique {
1231 properties: vec![prop.name],
1232 },
1233 target: ConstraintTarget::Label(clause.name.clone()),
1234 enabled: true,
1235 };
1236 sm.add_constraint(constraint)?;
1237 }
1238 }
1239 sm.save().await?;
1240 Ok(())
1241 }
1242
1243 /// True if `key` is a generated property on any of the given labels.
1244 /// Used by the partial-write flush path (Round 12 §C) to decide
1245 /// whether the property should be added to `touched_keys` so that
1246 /// Lance MergeInsert sends the recomputed value.
1247 fn is_generated_key(&self, labels: &[String], key: &str) -> bool {
1248 let schema = self.storage.schema_manager().schema();
1249 for label in labels {
1250 if let Some(props_meta) = schema.properties.get(label)
1251 && let Some(meta) = props_meta.get(key)
1252 && meta.generation_expression.is_some()
1253 {
1254 return true;
1255 }
1256 }
1257 false
1258 }
1259
1260 pub(crate) async fn enrich_properties_with_generated_columns(
1261 &self,
1262 label_name: &str,
1263 properties: &mut HashMap<String, Value>,
1264 prop_manager: &PropertyManager,
1265 params: &HashMap<String, Value>,
1266 ctx: Option<&QueryContext>,
1267 ) -> Result<()> {
1268 let schema = self.storage.schema_manager().schema();
1269
1270 if let Some(props_meta) = schema.properties.get(label_name) {
1271 let mut generators = Vec::new();
1272 for (prop_name, meta) in props_meta {
1273 if let Some(expr_str) = &meta.generation_expression {
1274 generators.push((prop_name.clone(), expr_str.clone()));
1275 }
1276 }
1277
1278 for (prop_name, expr_str) in generators {
1279 let cache_key = (label_name.to_string(), prop_name.clone());
1280 let expr = {
1281 let cache = self.gen_expr_cache.read().await;
1282 cache.get(&cache_key).cloned()
1283 };
1284
1285 let expr = match expr {
1286 Some(e) => e,
1287 None => {
1288 let parsed = uni_cypher::parse_expression(&expr_str)
1289 .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1290 let mut cache = self.gen_expr_cache.write().await;
1291 cache.insert(cache_key, parsed.clone());
1292 parsed
1293 }
1294 };
1295
1296 let mut scope = HashMap::new();
1297
1298 // If expression has an explicit variable, use it as an object
1299 if let Some(var) = expr.extract_variable() {
1300 scope.insert(var, Value::Map(properties.clone()));
1301 } else {
1302 // No explicit variable - add properties directly to scope for bare references
1303 // e.g., "lower(email)" can reference "email" directly
1304 for (k, v) in properties.iter() {
1305 scope.insert(k.clone(), v.clone());
1306 }
1307 }
1308
1309 let val = self
1310 .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1311 .await?;
1312 properties.insert(prop_name, val);
1313 }
1314 }
1315 Ok(())
1316 }
1317
1318 pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1319 let sm = self.storage.schema_manager_arc();
1320 if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1321 return Ok(());
1322 }
1323 sm.add_edge_type_with_desc(
1324 &clause.name,
1325 clause.src_labels,
1326 clause.dst_labels,
1327 clause.description,
1328 )?;
1329 for prop in clause.properties {
1330 let dt = Self::parse_data_type(&prop.data_type)?;
1331 sm.add_property_with_desc(
1332 &clause.name,
1333 &prop.name,
1334 dt,
1335 prop.nullable,
1336 prop.description,
1337 )?;
1338 }
1339 sm.save().await?;
1340 Ok(())
1341 }
1342
1343 /// Executes an ALTER action on a schema entity.
1344 ///
1345 /// This is a shared helper for both `execute_alter_label` and
1346 /// `execute_alter_edge_type` since they have identical logic.
1347 pub(crate) async fn execute_alter_entity(
1348 sm: &Arc<SchemaManager>,
1349 entity_name: &str,
1350 action: AlterAction,
1351 ) -> Result<()> {
1352 match action {
1353 AlterAction::AddProperty(prop) => {
1354 let dt = Self::parse_data_type(&prop.data_type)?;
1355 sm.add_property_with_desc(
1356 entity_name,
1357 &prop.name,
1358 dt,
1359 prop.nullable,
1360 prop.description,
1361 )?;
1362 }
1363 AlterAction::DropProperty(prop_name) => {
1364 sm.drop_property(entity_name, &prop_name)?;
1365 }
1366 AlterAction::RenameProperty { old_name, new_name } => {
1367 sm.rename_property(entity_name, &old_name, &new_name)?;
1368 }
1369 AlterAction::SetDescription(desc) => {
1370 if sm.schema().labels.contains_key(entity_name) {
1371 sm.set_label_description(entity_name, desc)?;
1372 } else {
1373 sm.set_edge_type_description(entity_name, desc)?;
1374 }
1375 }
1376 AlterAction::SetPropertyDescription {
1377 property,
1378 description,
1379 } => {
1380 sm.set_property_description(entity_name, &property, description)?;
1381 }
1382 }
1383 sm.save().await?;
1384 Ok(())
1385 }
1386
1387 pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1388 Self::execute_alter_entity(
1389 &self.storage.schema_manager_arc(),
1390 &clause.name,
1391 clause.action,
1392 )
1393 .await
1394 }
1395
1396 pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1397 Self::execute_alter_entity(
1398 &self.storage.schema_manager_arc(),
1399 &clause.name,
1400 clause.action,
1401 )
1402 .await
1403 }
1404
1405 pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1406 let sm = self.storage.schema_manager_arc();
1407 sm.drop_label(&clause.name, clause.if_exists)?;
1408 sm.save().await?;
1409 Ok(())
1410 }
1411
1412 pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1413 let sm = self.storage.schema_manager_arc();
1414 sm.drop_edge_type(&clause.name, clause.if_exists)?;
1415 sm.save().await?;
1416 Ok(())
1417 }
1418
1419 pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1420 let sm = self.storage.schema_manager_arc();
1421 let target = ConstraintTarget::Label(clause.label);
1422 let c_type = match clause.constraint_type {
1423 AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1424 properties: clause.properties,
1425 },
1426 AstConstraintType::Exists => {
1427 let property = clause
1428 .properties
1429 .into_iter()
1430 .next()
1431 .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1432 ConstraintType::Exists { property }
1433 }
1434 AstConstraintType::Check => {
1435 let expression = clause
1436 .expression
1437 .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1438 ConstraintType::Check {
1439 expression: expression.to_string_repr(),
1440 }
1441 }
1442 };
1443
1444 let constraint = Constraint {
1445 name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1446 constraint_type: c_type,
1447 target,
1448 enabled: true,
1449 };
1450
1451 sm.add_constraint(constraint)?;
1452 sm.save().await?;
1453 Ok(())
1454 }
1455
1456 pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1457 let sm = self.storage.schema_manager_arc();
1458 sm.drop_constraint(&clause.name, false)?;
1459 sm.save().await?;
1460 Ok(())
1461 }
1462
1463 /// Detects the single-node, single-label MERGE shape the fast path serves.
1464 ///
1465 /// Returns the node pattern and its label when `pattern` is one path with
1466 /// one node element, exactly one label, and a static map-literal property
1467 /// set — the shape [`Self::execute_merge_row_indexed`] can serve without
1468 /// per-row query planning. The keys do NOT need to be indexed: the persisted
1469 /// lookup degrades to a (single, filtered) label scan when no scalar index
1470 /// exists, which is still far cheaper than building a `LogicalPlan` per row.
1471 /// Any other shape (edges, multiple labels, non-literal properties) returns
1472 /// `None` so the caller uses the general per-row path.
1473 fn merge_single_node_fastpath<'p>(
1474 &self,
1475 pattern: &'p Pattern,
1476 ) -> Option<(&'p NodePattern, String)> {
1477 if pattern.paths.len() != 1 {
1478 return None;
1479 }
1480 let path = &pattern.paths[0];
1481 if path.elements.len() != 1 {
1482 return None;
1483 }
1484 let PatternElement::Node(n) = &path.elements[0] else {
1485 return None;
1486 };
1487 let labels = n.labels.names();
1488 if labels.len() != 1 {
1489 return None;
1490 }
1491 // The key must be a static map literal so the key names are known.
1492 let Some(Expr::Map(entries)) = n.properties.as_ref() else {
1493 return None;
1494 };
1495 if entries.is_empty() {
1496 return None;
1497 }
1498 // Resolve the label to its schema-canonical case so the fast path agrees
1499 // with the general MERGE path (which matches labels case-insensitively).
1500 // Without this, `MERGE (:person …)` after a `:Person` row was flushed
1501 // scans/keys a different label than the canonical one and creates a
1502 // duplicate (review #3a). Falls back to the as-written label when the
1503 // schema does not know it (schemaless).
1504 let canonical = self
1505 .storage
1506 .schema_manager()
1507 .schema()
1508 .canonical_label_name(&labels[0])
1509 .unwrap_or_else(|| labels[0].clone());
1510 Some((n, canonical))
1511 }
1512
1513 /// Build the persisted-scan filter for a MERGE key, or `None` if any value
1514 /// is not a scalar this fast path can represent.
1515 ///
1516 /// Returning `None` makes the caller fall back to the general per-row path,
1517 /// so unusual key value types (lists, maps, temporals, nulls) are never
1518 /// silently mis-matched. The `_deleted = false` clause mirrors the
1519 /// persisted-read predicate used elsewhere; the version high-water-mark
1520 /// clause is added by [`uni_store::StorageManager::scan_vertex_table`].
1521 fn merge_key_filter(key_props: &HashMap<String, Value>) -> Option<String> {
1522 if key_props.is_empty() {
1523 return None;
1524 }
1525 let mut parts = Vec::with_capacity(key_props.len() + 1);
1526 for (k, v) in key_props {
1527 if !Self::is_safe_key_ident(k) {
1528 return None;
1529 }
1530 let lit = Self::render_key_literal(v)?;
1531 // Unquoted identifier: the Lance filter parser does not resolve a
1532 // double-quoted column name against the table here, so `"k" = v`
1533 // silently matches nothing. Keys are validated above to be safe
1534 // bare identifiers.
1535 parts.push(format!("{k} = {lit}"));
1536 }
1537 parts.push("_deleted = false".to_string());
1538 Some(parts.join(" AND "))
1539 }
1540
1541 /// True when a MERGE key name is a safe bare identifier for a Lance
1542 /// filter (issue #8). Keys come from a static map literal, but validate
1543 /// anyway.
1544 fn is_safe_key_ident(k: &str) -> bool {
1545 !k.is_empty() && k.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
1546 }
1547
1548 /// Render a scalar MERGE-key value as a Lance filter literal, or `None`
1549 /// for value types this fast path cannot represent (lists, maps,
1550 /// temporals, nulls) — the caller then falls back to the general path.
1551 fn render_key_literal(v: &Value) -> Option<String> {
1552 Some(match v {
1553 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1554 Value::Int(i) => i.to_string(),
1555 Value::Float(f) => f.to_string(),
1556 Value::Bool(b) => b.to_string(),
1557 _ => return None,
1558 })
1559 }
1560
1561 /// Build ONE scan filter matching every key tuple in `keys` (all tuples
1562 /// sorted by `key_names` order, values canonicalized).
1563 ///
1564 /// Single-column keys render as type-grouped `k IN (…)` lists (a filter
1565 /// never compares mixed literal types against one column); composite keys
1566 /// render as an OR of per-tuple conjunctions. Both forms are wrapped with
1567 /// the same `_deleted = false` clause the per-row filter used.
1568 fn merge_batch_filter(key_names: &[String], keys: &[&MergeKey]) -> Option<String> {
1569 if keys.is_empty() || key_names.iter().any(|k| !Self::is_safe_key_ident(k)) {
1570 return None;
1571 }
1572 let disjunction = if let [key] = key_names {
1573 // Group literals by value variant so each IN list is homogeneous.
1574 let mut groups: HashMap<std::mem::Discriminant<Value>, Vec<String>> = HashMap::new();
1575 for tuple in keys {
1576 let (_, v) = tuple.first()?;
1577 groups
1578 .entry(std::mem::discriminant(v))
1579 .or_default()
1580 .push(Self::render_key_literal(v)?);
1581 }
1582 groups
1583 .into_values()
1584 .map(|lits| {
1585 if let [lit] = lits.as_slice() {
1586 format!("{key} = {lit}")
1587 } else {
1588 format!("{key} IN ({})", lits.join(", "))
1589 }
1590 })
1591 .collect::<Vec<_>>()
1592 .join(" OR ")
1593 } else {
1594 keys.iter()
1595 .map(|tuple| {
1596 let conj = tuple
1597 .iter()
1598 .map(|(k, v)| Some(format!("{k} = {}", Self::render_key_literal(v)?)))
1599 .collect::<Option<Vec<_>>>()?
1600 .join(" AND ");
1601 Some(format!("({conj})"))
1602 })
1603 .collect::<Option<Vec<_>>>()?
1604 .join(" OR ")
1605 };
1606 Some(format!("({disjunction}) AND _deleted = false"))
1607 }
1608
1609 /// Canonicalize a numeric MERGE-key value for *matching only*.
1610 ///
1611 /// A finite `Float` with an integral value (e.g. `1.0`) is mapped to the
1612 /// equivalent `Int`, so an `Int(1)` key matches a node stored with
1613 /// `Float(1.0)` and vice versa — the coercion the general (DataFusion) MERGE
1614 /// path already applies (review #3a). Non-numeric and non-integral values are
1615 /// returned unchanged. Used only to build match keys / comparisons, never the
1616 /// value written to a created node.
1617 fn canonical_key_value(v: &Value) -> Value {
1618 match v {
1619 Value::Float(f)
1620 if f.is_finite()
1621 && f.fract() == 0.0
1622 && *f >= i64::MIN as f64
1623 && *f <= i64::MAX as f64 =>
1624 {
1625 Value::Int(*f as i64)
1626 }
1627 other => other.clone(),
1628 }
1629 }
1630
1631 /// Canonical sorted `(name, value)` key tuple for a MERGE row's key map.
1632 ///
1633 /// Numeric values are canonicalized ([`Self::canonical_key_value`]) so the
1634 /// tuple compares equal regardless of `Int`/`Float` spelling. This tuple is
1635 /// used purely as a match key (intra-batch dedup, L0 overlay lookup); the
1636 /// created node's properties come from the original, un-canonicalized map.
1637 fn merge_key_tuple(key_props: &HashMap<String, Value>) -> MergeKey {
1638 let mut tuple: MergeKey = key_props
1639 .iter()
1640 .map(|(k, v)| (k.clone(), Self::canonical_key_value(v)))
1641 .collect();
1642 tuple.sort_by(|a, b| a.0.cmp(&b.0));
1643 tuple
1644 }
1645
1646 /// Snapshot all live L0 vertices of `label`, grouped by their MERGE key.
1647 ///
1648 /// Walked once per MERGE statement (issue #69): the per-row fast path then
1649 /// resolves L0/uncommitted matches with an O(1) map lookup instead of
1650 /// re-enumerating L0 for every row. Captures committed-not-yet-persisted
1651 /// rows and rows created earlier in the same transaction; rows created by
1652 /// later rows of this same statement are folded in incrementally by
1653 /// [`Self::execute_merge_row_indexed`]. `key_names` must be sorted to match
1654 /// [`Self::merge_key_tuple`].
1655 fn merge_l0_existing(
1656 &self,
1657 label: &str,
1658 key_names: &[String],
1659 ctx: Option<&QueryContext>,
1660 ) -> HashMap<MergeKey, Vec<Vid>> {
1661 let mut candidates: Vec<Vid> = Vec::new();
1662 l0_visibility::visit_l0_buffers(ctx, |l0| {
1663 if let Some(vids) = l0.label_to_vids.get(label) {
1664 candidates.extend(vids.iter().copied());
1665 }
1666 false
1667 });
1668
1669 let mut map: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1670 let mut seen: HashSet<Vid> = HashSet::new();
1671 for vid in candidates {
1672 if !seen.insert(vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
1673 continue;
1674 }
1675 // `lookup_vertex_prop` merges across L0 layers (newest wins).
1676 let tuple: MergeKey = key_names
1677 .iter()
1678 .map(|k| {
1679 let v = l0_visibility::lookup_vertex_prop(vid, k, ctx).unwrap_or(Value::Null);
1680 (k.clone(), Self::canonical_key_value(&v))
1681 })
1682 .collect();
1683 map.entry(tuple).or_default().push(vid);
1684 }
1685 map
1686 }
1687
1688 /// Maximum key tuples per batched MERGE scan — bounds the filter-string
1689 /// size and Lance/DataFusion parse cost; chunks run sequentially.
1690 const MERGE_SCAN_CHUNK: usize = 1000;
1691
1692 /// Persisted (flushed) vertices of `label` for EVERY key tuple in `keys`,
1693 /// resolved with one scan per [`Self::MERGE_SCAN_CHUNK`] tuples instead of
1694 /// one scan per input row (review perf #4: `UNWIND … MERGE` issued N
1695 /// independent Lance scans).
1696 ///
1697 /// Scans via [`uni_store::StorageManager::scan_vertex_table`] — the same
1698 /// read path `MATCH` uses, so it honors the version high-water-mark and
1699 /// sees flushed rows. On the declared-label branch the key-filtered scan
1700 /// only NOMINATES candidate vids; a second, unfiltered `_vid IN (…)` pass
1701 /// picks each candidate's max-`_version` row and requires it to be live
1702 /// and still keyed as requested (per-label tables are MVCC-append, so a
1703 /// superseded version's row would otherwise stale-match a rewritten key).
1704 /// Matched rows are grouped by their CANONICAL key tuple (stored values
1705 /// run through [`Self::canonical_key_value`], so a stored `Float(1.0)`
1706 /// lands under a requested `Int(1)` — the coercion Lance's numeric filter
1707 /// equality applies). Liveness against L0 overlays (deletes, key rewrites
1708 /// by earlier rows of the same statement) is NOT checked here — the
1709 /// per-row consumer re-checks at row time, exactly as the old per-row
1710 /// scan did.
1711 ///
1712 /// The second returned map carries the FULL property maps the schemaless
1713 /// branch already decoded for each matched vid (empty on the declared-label
1714 /// branch, which projects only key columns) — the caller seeds the
1715 /// statement-level [`Prefetch`] from it at zero extra scans.
1716 ///
1717 /// # Errors
1718 /// Propagates persisted-scan and filter-build failures — fail-closed: a
1719 /// MERGE must never treat a failed lookup as "no match" and create
1720 /// duplicates.
1721 async fn merge_lookup_persisted_batch(
1722 &self,
1723 label: &str,
1724 key_names: &[String],
1725 keys: &HashSet<MergeKey>,
1726 ) -> Result<(
1727 HashMap<MergeKey, Vec<Vid>>,
1728 HashMap<Vid, uni_common::Properties>,
1729 )> {
1730 let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1731 if keys.is_empty() {
1732 return Ok((out, HashMap::new()));
1733 }
1734 // An undeclared (schemaless) label has no per-label table — its flushed
1735 // rows live only in the unified main vertex table. Route to the
1736 // main-table lookup, mirroring the planner's scan routing (a schemaless
1737 // MATCH plans `ScanMainByLabels` on the same schema predicate).
1738 if self
1739 .storage
1740 .schema_manager()
1741 .schema()
1742 .get_label_case_insensitive(label)
1743 .is_none()
1744 {
1745 return self
1746 .merge_lookup_persisted_batch_schemaless(label, key_names, keys)
1747 .await;
1748 }
1749 // Declared label — the per-label table is MVCC-append (an update
1750 // flush adds a higher-`_version` row for the same vid) and the key
1751 // predicate is pushed into the Lance filter, so a SUPERSEDED version
1752 // whose row still carries a requested key is returned while the vid's
1753 // current row (key rewritten, fails the filter) is invisible to the
1754 // scan. Version dedup among the returned rows cannot detect that, so
1755 // the lookup runs in two passes: the key-filtered scan only nominates
1756 // candidate vids, and an unfiltered `_vid IN (…)` scan then requires
1757 // each candidate's max-`_version` row to be live and still keyed as
1758 // requested.
1759 let mut columns: Vec<&str> = vec!["_vid"];
1760 columns.extend(key_names.iter().map(String::as_str));
1761
1762 let key_list: Vec<&MergeKey> = keys.iter().collect();
1763 let mut candidates: Vec<Vid> = Vec::new();
1764 let mut seen: HashSet<Vid> = HashSet::new();
1765 for chunk in key_list.chunks(Self::MERGE_SCAN_CHUNK) {
1766 let filter = Self::merge_batch_filter(key_names, chunk)
1767 .ok_or_else(|| anyhow!("MERGE fast path could not build a batched key filter"))?;
1768 let scanned = self
1769 .storage
1770 .scan_vertex_table(label, &columns, Some(&filter))
1771 .await?;
1772 let Some(batch) = scanned else { continue };
1773 let Some(vid_col) = batch
1774 .column_by_name("_vid")
1775 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
1776 else {
1777 continue;
1778 };
1779 for i in 0..vid_col.len() {
1780 let vid = Vid::from(vid_col.value(i));
1781 if seen.insert(vid) {
1782 candidates.push(vid);
1783 }
1784 }
1785 }
1786
1787 // Verification pass — tombstones are NOT filtered Lance-side (the
1788 // max-version pick must see them so a deleted winner cannot let an
1789 // older live version resurrect the match), exactly like the
1790 // schemaless branch below.
1791 let mut verify_columns: Vec<&str> = vec!["_vid", "_deleted", "_version"];
1792 verify_columns.extend(key_names.iter().map(String::as_str));
1793 for chunk in candidates.chunks(Self::MERGE_SCAN_CHUNK) {
1794 let vid_list = chunk
1795 .iter()
1796 .map(|v| v.as_u64().to_string())
1797 .collect::<Vec<_>>()
1798 .join(", ");
1799 let filter = format!("_vid IN ({vid_list})");
1800 let scanned = self
1801 .storage
1802 .scan_vertex_table(label, &verify_columns, Some(&filter))
1803 .await?;
1804 let Some(batch) = scanned else { continue };
1805 let (Some(vid_col), Some(del_col), Some(ver_col)) = (
1806 batch
1807 .column_by_name("_vid")
1808 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1809 batch
1810 .column_by_name("_deleted")
1811 .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
1812 batch
1813 .column_by_name("_version")
1814 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1815 ) else {
1816 return Err(anyhow!(
1817 "MERGE batched lookup: verification scan missing a required column"
1818 ));
1819 };
1820 let key_cols: Vec<_> = key_names
1821 .iter()
1822 .map(|k| batch.column_by_name(k))
1823 .collect::<Option<Vec<_>>>()
1824 .ok_or_else(|| {
1825 anyhow!("MERGE batched lookup: projected key column missing from scan result")
1826 })?;
1827 // Per-vid MVCC dedup: keep the highest-version row for each vid.
1828 let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
1829 for i in 0..batch.num_rows() {
1830 let vid = Vid::from(vid_col.value(i));
1831 let ver = ver_col.value(i);
1832 let entry = winners.entry(vid).or_insert((ver, i));
1833 if ver > entry.0 {
1834 *entry = (ver, i);
1835 }
1836 }
1837 for (vid, (_ver, row)) in winners {
1838 if del_col.value(row) {
1839 continue;
1840 }
1841 let tuple: MergeKey = key_names
1842 .iter()
1843 .zip(&key_cols)
1844 .map(|(k, col)| {
1845 let v = uni_store::storage::arrow_convert::arrow_to_value(
1846 col.as_ref(),
1847 row,
1848 None,
1849 );
1850 (k.clone(), Self::canonical_key_value(&v))
1851 })
1852 .collect();
1853 if keys.contains(&tuple) {
1854 out.entry(tuple).or_default().push(vid);
1855 }
1856 }
1857 }
1858 Ok((out, HashMap::new()))
1859 }
1860
1861 /// Persisted-match lookup for an UNDECLARED (schemaless) label.
1862 ///
1863 /// Schemaless rows live only in the unified main vertex table (per-label
1864 /// tables exist only for declared labels), with all properties encoded in
1865 /// the `props_json` CypherValue blob — so key values cannot be pushed into
1866 /// the Lance filter; the key match happens in memory after decoding,
1867 /// exactly like the schemaless MATCH scan. One main-table scan regardless
1868 /// of key count.
1869 ///
1870 /// Mirrors `columnar_scan_schemaless_vertex_batch_static`: tombstones are
1871 /// NOT filtered Lance-side (MVCC dedup must see them to pick the winning
1872 /// version per vid); the per-vid max-`_version` dedup runs here, then
1873 /// deleted winners are dropped.
1874 ///
1875 /// Also returns the full decoded property map per matched vid — the blob
1876 /// is decoded here anyway, and the caller seeds the statement-level
1877 /// [`Prefetch`] from it instead of re-reading per row.
1878 ///
1879 /// # Errors
1880 /// Propagates scan and blob-decode failures — fail-closed: a MERGE must
1881 /// never treat a failed lookup as "no match" and create duplicates.
1882 async fn merge_lookup_persisted_batch_schemaless(
1883 &self,
1884 label: &str,
1885 key_names: &[String],
1886 keys: &HashSet<MergeKey>,
1887 ) -> Result<(
1888 HashMap<MergeKey, Vec<Vid>>,
1889 HashMap<Vid, uni_common::Properties>,
1890 )> {
1891 let mut out: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1892 let mut props_by_vid: HashMap<Vid, uni_common::Properties> = HashMap::new();
1893 let filter = format!("array_contains(labels, '{}')", label.replace('\'', "''"));
1894 let Some(batch) = self
1895 .storage
1896 .scan_main_vertex_table(
1897 &["_vid", "_deleted", "props_json", "_version"],
1898 Some(&filter),
1899 )
1900 .await?
1901 else {
1902 return Ok((out, props_by_vid));
1903 };
1904 let (Some(vid_col), Some(del_col), Some(ver_col)) = (
1905 batch
1906 .column_by_name("_vid")
1907 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1908 batch
1909 .column_by_name("_deleted")
1910 .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>()),
1911 batch
1912 .column_by_name("_version")
1913 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>()),
1914 ) else {
1915 return Err(anyhow!(
1916 "schemaless MERGE lookup: main vertex table scan missing a required column"
1917 ));
1918 };
1919 let props_col = batch
1920 .column_by_name("props_json")
1921 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1922
1923 // Per-vid MVCC dedup: keep the highest-version row for each vid.
1924 let mut winners: HashMap<Vid, (u64, usize)> = HashMap::new();
1925 for i in 0..batch.num_rows() {
1926 let vid = Vid::from(vid_col.value(i));
1927 let ver = ver_col.value(i);
1928 let entry = winners.entry(vid).or_insert((ver, i));
1929 if ver > entry.0 {
1930 *entry = (ver, i);
1931 }
1932 }
1933 for (vid, (_ver, row)) in winners {
1934 // Drop deletion tombstones AFTER picking the winner — a deleted
1935 // winner must not let an older live version resurrect the match.
1936 if del_col.value(row) {
1937 continue;
1938 }
1939 // A row without properties matches only an all-Null key tuple.
1940 let props = match props_col {
1941 Some(arr) if !arrow_array::Array::is_null(arr, row) => {
1942 match uni_common::cypher_value_codec::decode(arr.value(row))
1943 .map_err(|e| anyhow!("schemaless MERGE lookup: props decode: {e}"))?
1944 {
1945 Value::Map(m) => m,
1946 _ => HashMap::new(),
1947 }
1948 }
1949 _ => HashMap::new(),
1950 };
1951 let tuple: MergeKey = key_names
1952 .iter()
1953 .map(|k| {
1954 (
1955 k.clone(),
1956 Self::canonical_key_value(props.get(k).unwrap_or(&Value::Null)),
1957 )
1958 })
1959 .collect();
1960 if keys.contains(&tuple) {
1961 out.entry(tuple).or_default().push(vid);
1962 props_by_vid.insert(vid, props);
1963 }
1964 }
1965 Ok((out, props_by_vid))
1966 }
1967
1968 /// True if the statement-level MERGE property prefetch is safe for `label`.
1969 ///
1970 /// False when the label declares any CRDT-typed property: a prefetch HIT in
1971 /// [`read_vertex_props_with_prefetch`] skips the `normalize_crdt_properties`
1972 /// pass that `get_all_vertex_props_with_ctx` applies, so CRDT-bearing
1973 /// labels keep the per-row read path. Undeclared labels are trivially safe
1974 /// (normalization is a no-op without schema CRDT entries).
1975 fn merge_label_prefetch_safe(&self, label: &str) -> bool {
1976 let schema = self.storage.schema_manager().schema();
1977 schema.properties.get(label).is_none_or(|props| {
1978 !props
1979 .values()
1980 .any(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1981 })
1982 }
1983
1984 /// True if an L0 override rewrote any key column of a persisted match away
1985 /// from its requested value (so the persisted row no longer matches).
1986 fn vid_overrides_break_key(
1987 vid: Vid,
1988 key_props: &HashMap<String, Value>,
1989 ctx: Option<&QueryContext>,
1990 ) -> bool {
1991 key_props.iter().any(|(k, want)| {
1992 matches!(
1993 l0_visibility::lookup_vertex_prop(vid, k, ctx),
1994 Some(got) if Self::canonical_key_value(&got) != Self::canonical_key_value(want)
1995 )
1996 })
1997 }
1998
1999 /// Build a node Map value (`{_vid, _labels, ...props}`) for binding a MERGE
2000 /// node variable.
2001 ///
2002 /// Matches the binding shape produced by `execute_create_pattern` and the
2003 /// general MATCH path, so ON MATCH SET, RETURN, and downstream operators
2004 /// resolve the variable identically — a bare `Value::Int(vid)` is not a
2005 /// valid node binding for those consumers.
2006 fn build_node_map(vid: Vid, label: &str, props: uni_common::Properties) -> Value {
2007 let mut obj = HashMap::new();
2008 obj.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
2009 obj.insert(
2010 "_labels".to_string(),
2011 Value::List(vec![Value::String(label.to_string())]),
2012 );
2013 for (k, v) in props {
2014 obj.insert(k, v);
2015 }
2016 Value::Map(obj)
2017 }
2018
2019 /// True if an L0-only vertex has every key column set to the requested
2020 /// value. A missing column matches only a requested `Null`.
2021 fn l0_vid_matches_key(
2022 vid: Vid,
2023 key_props: &HashMap<String, Value>,
2024 ctx: Option<&QueryContext>,
2025 ) -> bool {
2026 key_props.iter().all(
2027 |(k, want)| match l0_visibility::lookup_vertex_prop(vid, k, ctx) {
2028 Some(got) => Self::canonical_key_value(&got) == Self::canonical_key_value(want),
2029 None => *want == Value::Null,
2030 },
2031 )
2032 }
2033
/// Index fast-path execution for one MERGE row of the shape detected by
/// [`Self::merge_single_node_fastpath`].
///
/// Resolves matches from the per-batch L0 snapshot `existing` (O(1) lookup,
/// no per-row L0 enumeration) plus the per-statement persisted prefetch
/// (`persisted`, built once by [`Self::merge_lookup_persisted_batch`]);
/// applies ON MATCH SET to every match, or creates the node and applies
/// ON CREATE SET when there is none. A newly created vertex is folded into
/// `existing` so a later row of the same batch with the same key matches it
/// (intra-batch dedup). Returns the RETURN rows for this input row (one per
/// match, or one for a create).
///
/// `prefetched` is the statement-level property prefetch (`None` when the
/// label is CRDT-bearing, see [`Self::merge_label_prefetch_safe`]): matched
/// vids carry their persisted base row, freshly created vids are seeded
/// with an empty base — per-row reads then resolve as base + L0 layering
/// (every SET flush writes the full row to L0 before the next read, so a
/// prefetch hit equals a fresh read) instead of one storage scan each.
///
/// # Arguments
/// * `label` - the single MERGE label; used as the sole `_labels` entry of the
///   node binding built for matched vertices.
/// * `node` - the MERGE node pattern; its `variable` (if any) is what gets
///   bound in the output rows.
/// * `path_pattern` / `temp_vars` - the pattern (with temp relationship
///   variables) used for CREATE and for path-variable binding.
/// * `row` - the input row; consumed and returned (cloned once per match).
/// * `key_props` / `key_tuple` - this row's evaluated key map and its
///   canonical tuple form.
/// * `persisted` - statement-level persisted matches, keyed by tuple.
/// * `existing` - per-batch L0 snapshot; mutated to record created vertices.
///
/// # Errors
/// Propagates evaluation, create, and SET failures.
#[expect(
    clippy::too_many_arguments,
    reason = "mirrors execute_merge's threaded execution state"
)]
async fn execute_merge_row_indexed(
    &self,
    label: &str,
    node: &NodePattern,
    path_pattern: &Pattern,
    temp_vars: &[String],
    mut row: HashMap<String, Value>,
    key_props: &HashMap<String, Value>,
    persisted: &HashMap<MergeKey, Vec<Vid>>,
    key_tuple: &MergeKey,
    existing: &mut HashMap<MergeKey, Vec<Vid>>,
    on_match: Option<&SetClause>,
    on_create: Option<&SetClause>,
    prop_manager: &PropertyManager,
    params: &HashMap<String, Value>,
    ctx: Option<&QueryContext>,
    tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    writer: &Writer,
    mut prefetched: Option<&mut Prefetch>,
) -> Result<Vec<HashMap<String, Value>>> {
    // Fallback prefetch when `prefetched` is None: an empty map, so every
    // read misses and takes the per-row path.
    let empty_prefetch = Prefetch::default();
    // `seen` dedups vids that appear in both `persisted` and `existing`.
    let mut seen: HashSet<Vid> = HashSet::new();
    let mut matches: Vec<Vid> = Vec::new();
    // Persisted (flushed) matches from the per-statement prefetch. The
    // prefetch is static for the statement, so re-verify liveness at row
    // time — an earlier row of this batch may have deleted the candidate
    // or rewritten its key (the old per-row scan saw those through its L0
    // overlay checks; these are the same checks, moved to row time).
    if let Some(vids) = persisted.get(key_tuple) {
        for &vid in vids {
            if l0_visibility::is_vertex_deleted(vid, ctx) {
                continue;
            }
            if Self::vid_overrides_break_key(vid, key_props, ctx) {
                continue;
            }
            if seen.insert(vid) {
                matches.push(vid);
            }
        }
    }
    // L0 / intra-batch matches from the per-batch snapshot, re-verified live
    // in case a prior row of this batch mutated or deleted the candidate.
    if let Some(vids) = existing.get(key_tuple) {
        for &vid in vids {
            if seen.contains(&vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
                continue;
            }
            if Self::l0_vid_matches_key(vid, key_props, ctx) && seen.insert(vid) {
                matches.push(vid);
            }
        }
    }

    let mut out = Vec::new();
    if matches.is_empty() {
        // No match: create the node, then apply ON CREATE SET. Fold the
        // ON CREATE SET property assignments into seed props first so a
        // NOT-NULL property supplied only by ON CREATE SET passes
        // create-time validation (RC4); the post-create SET below settles
        // the final values.
        let seed_props = self
            .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
            .await?;
        self.execute_create_pattern(
            path_pattern,
            &mut row,
            writer,
            prop_manager,
            params,
            ctx,
            tx_l0_override,
            Some(&seed_props),
        )
        .await?;
        // Fold the new vertex into the batch snapshot for intra-batch
        // dedup, and seed the statement prefetch with an empty base: a
        // fresh vid has nothing in storage, so ON CREATE SET's lazy read
        // resolves from the L0 row the create just wrote instead of
        // issuing a per-row storage scan that finds nothing.
        // NOTE(review): the new vid is recovered through the node variable
        // bound by `execute_create_pattern`. If `node.variable` is None, the
        // vertex is NOT folded into `existing`, so a later row of this batch
        // with the same key would presumably create a duplicate. Confirm
        // that `merge_single_node_fastpath` rejects anonymous nodes.
        if let Some(var) = &node.variable
            && let Some(val) = row.get(var)
            && let Ok(vid) = Self::vid_from_value(val)
        {
            existing.entry(key_tuple.clone()).or_default().push(vid);
            if let Some(p) = prefetched.as_deref_mut() {
                p.vertex.entry(vid).or_default();
            }
        }
        if let Some(set) = on_create {
            self.execute_set_items_locked(
                &set.items,
                &mut row,
                writer,
                prop_manager,
                params,
                ctx,
                tx_l0_override,
                prefetched.as_deref().unwrap_or(&empty_prefetch),
            )
            .await?;
        }
        Self::bind_path_variables(path_pattern, &mut row, temp_vars);
        out.push(row);
    } else {
        // Apply ON MATCH SET to every matched node (multi-match semantics),
        // binding the node variable as a Map with _vid/_labels/props so
        // RETURN and downstream operators resolve it as they would for the
        // general MATCH and CREATE paths. Each match gets its own copy of
        // the input row and its own output row.
        for vid in matches {
            let mut m = row.clone();
            if let Some(var) = &node.variable {
                // Minimal binding so ON MATCH SET resolves the node by _vid.
                m.insert(
                    var.clone(),
                    Self::build_node_map(vid, label, HashMap::new()),
                );
            }
            if let Some(set) = on_match {
                self.execute_set_items_locked(
                    &set.items,
                    &mut m,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0_override,
                    prefetched.as_deref().unwrap_or(&empty_prefetch),
                )
                .await?;
            }
            if let Some(var) = &node.variable {
                // Rebind with full, post-SET properties for RETURN
                // fidelity. The SET above flushed the full row to L0, so a
                // prefetch hit (base + L0 layering) reproduces exactly
                // what a fresh storage read would return.
                // NOTE(review): `_labels` is built from the single MERGE
                // `label` only; any other labels on the matched vertex are
                // not reflected in this binding. Confirm that is intended.
                let props = read_vertex_props_with_prefetch(
                    vid,
                    prefetched.as_deref().unwrap_or(&empty_prefetch),
                    prop_manager,
                    ctx,
                )
                .await?;
                m.insert(var.clone(), Self::build_node_map(vid, label, props));
            }
            Self::bind_path_variables(path_pattern, &mut m, temp_vars);
            out.push(m);
        }
    }
    Ok(out)
}
2210
/// Execute a MERGE clause over a batch of input rows (match-or-create per row).
///
/// Each row either matches existing data, in which case ON MATCH SET is
/// applied to every match and one output row is produced per match, or it
/// creates the pattern, applies ON CREATE SET and produces one output row.
/// Output rows are returned in input-row order.
///
/// Two execution strategies are used:
/// * **Fast path** (issue #69): when [`Self::merge_single_node_fastpath`]
///   accepts the pattern, key properties are pre-evaluated for every row.
///   Persisted matches are then looked up once for all distinct key tuples,
///   and each row is handled by [`Self::execute_merge_row_indexed`] against a
///   per-batch L0 snapshot.
/// * **General path**: rows whose key did not pass `merge_key_filter`, or all
///   rows when the fast path does not apply, go through
///   `execute_merge_match` per row.
///
/// # Errors
/// Errors if no writer is configured ("Write operation requires a Writer"),
/// and propagates evaluation, lookup, create, and SET failures. A failed
/// batch property prefetch is NOT an error; reads then fall back to the
/// per-row path.
#[expect(clippy::too_many_arguments)]
pub(crate) async fn execute_merge(
    &self,
    rows: Vec<HashMap<String, Value>>,
    pattern: &Pattern,
    on_match: Option<&SetClause>,
    on_create: Option<&SetClause>,
    prop_manager: &PropertyManager,
    params: &HashMap<String, Value>,
    ctx: Option<&QueryContext>,
    tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
) -> Result<Vec<HashMap<String, Value>>> {
    let writer_lock = self
        .writer
        .as_ref()
        .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;

    // Prepare pattern for path variable binding: assign temp edge variable
    // names to unnamed relationships in paths that have path variables.
    let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);

    // Issue #69: a single-node, single-label MERGE takes the fast path,
    // skipping the per-row query planning that made batched MERGE no faster
    // than a per-entity loop. Indexed keys get an index point-lookup;
    // un-indexed keys still skip planning (the lookup is a filtered scan).
    // The shape is the same for every row, so it is detected once.
    let fastpath = self.merge_single_node_fastpath(pattern);

    // Build the per-batch L0 snapshot once (issue #69 Phase C): the per-row
    // fast path then resolves L0/intra-batch matches with an O(1) lookup
    // instead of re-walking L0 for every row. `key_names` is the sorted
    // static key set, matching `merge_key_tuple`.
    let mut fast_existing: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
    // Per-row pre-evaluated fast-path keys (None = that row falls back to
    // the general path), and the per-statement persisted prefetch over the
    // deduped key tuples — ONE chunked scan instead of one scan per row.
    // Key expressions only see the row's own bindings + params, so
    // evaluating them ahead of any creates cannot observe earlier rows.
    let mut row_fast: Vec<Option<(HashMap<String, Value>, MergeKey)>> = Vec::new();
    let mut fast_persisted: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
    // Statement-level property prefetch for the fast path (review perf
    // residual): every persisted match's full row is batch-read ONCE, so
    // the per-row ON MATCH SET read and the post-SET rebind resolve as
    // prefetch-base + L0 layering instead of one storage scan each.
    // `None` disables it for CRDT-bearing labels (the prefetch-hit read
    // skips CRDT normalization).
    let mut merge_prefetch: Option<Prefetch> = None;
    if let Some((node, label)) = &fastpath {
        // NOTE(review): key names are taken only from a map-literal
        // (`Expr::Map`) property expression; any other shape yields an empty
        // `key_names`, while the per-row tuples below are built from the
        // evaluated map. Presumably `merge_single_node_fastpath` only admits
        // map literals — confirm.
        let mut key_names: Vec<String> = match &node.properties {
            Some(Expr::Map(entries)) => entries.iter().map(|(k, _)| k.clone()).collect(),
            _ => Vec::new(),
        };
        key_names.sort();
        fast_existing = self.merge_l0_existing(label, &key_names, ctx);

        row_fast.reserve(rows.len());
        for row in &rows {
            // A non-map evaluation leaves `key_props` empty; whether that
            // row then takes the fast path is decided by `merge_key_filter`.
            let mut key_props: HashMap<String, Value> = HashMap::new();
            if let Some(props_expr) = &node.properties
                && let Value::Map(map) = self
                    .evaluate_expr(props_expr, row, prop_manager, params, ctx)
                    .await?
            {
                key_props = map;
            }
            // Only rows whose every key value is a scalar the persisted
            // scan can express take the fast path (same gate as before,
            // via the filter builder).
            if Self::merge_key_filter(&key_props).is_some() {
                let tuple = Self::merge_key_tuple(&key_props);
                row_fast.push(Some((key_props, tuple)));
            } else {
                row_fast.push(None);
            }
        }
        let unique_keys: HashSet<MergeKey> = row_fast
            .iter()
            .flatten()
            .map(|(_, tuple)| tuple.clone())
            .collect();
        let (persisted, schemaless_props) = self
            .merge_lookup_persisted_batch(label, &key_names, &unique_keys)
            .await?;
        fast_persisted = persisted;
        if self.merge_label_prefetch_safe(label) {
            let mut pf = Prefetch::default();
            if !schemaless_props.is_empty() {
                // The schemaless lookup already decoded each matched vid's
                // full property map — zero extra scans.
                pf.vertex.extend(schemaless_props);
            } else {
                // Deduplicate vids across key tuples before the batch read.
                let vids: Vec<Vid> = fast_persisted
                    .values()
                    .flatten()
                    .copied()
                    .collect::<HashSet<Vid>>()
                    .into_iter()
                    .collect();
                if !vids.is_empty()
                    && let Ok(batch_props) = prop_manager
                        .get_batch_vertex_props_for_label(&vids, label, ctx)
                        .await
                {
                    // One `_vid IN (…)` scan for every matched row's base.
                    // On Err the map stays empty — every read falls back to
                    // the per-row path (fail-open, same posture as
                    // prefetch_set_targets).
                    pf.vertex.extend(batch_props);
                }
            }
            merge_prefetch = Some(pf);
        }
    }

    let mut results = Vec::new();
    for (idx, mut row) in rows.into_iter().enumerate() {
        // Rows with a pre-evaluated scalar key take the fast path; rows
        // with a non-scalar key fall through to the general path below.
        if let Some((node, label)) = &fastpath
            && let Some((key_props, key_tuple)) = row_fast.get(idx).and_then(|rf| rf.as_ref())
        {
            let writer: &uni_store::Writer = writer_lock.as_ref();
            let row_out = self
                .execute_merge_row_indexed(
                    label,
                    node,
                    &path_pattern,
                    &temp_vars,
                    row,
                    key_props,
                    &fast_persisted,
                    key_tuple,
                    &mut fast_existing,
                    on_match,
                    on_create,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0_override,
                    writer,
                    merge_prefetch.as_mut(),
                )
                .await?;
            results.extend(row_out);
            continue;
        }

        // General execution: match-or-create per row. (The index fast path
        // above already handles single-node, single-label, scalar-indexed
        // MERGE — including unique-constrained labels, whose keys are
        // indexed — so there is no separate constraint-only fast path.)
        // Matching uses the original `pattern`; creation and path binding
        // use `path_pattern`, which carries the temp relationship variables.
        let matches = self
            .execute_merge_match(pattern, &row, prop_manager, params, ctx)
            .await?;
        let writer: &uni_store::Writer = writer_lock.as_ref();

        let result: Result<Vec<HashMap<String, Value>>> = async {
            let mut batch = Vec::new();
            if !matches.is_empty() {
                for mut m in matches {
                    if let Some(set) = on_match {
                        self.execute_set_items_locked(
                            &set.items,
                            &mut m,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0_override,
                            &Prefetch::default(),
                        )
                        .await?;
                    }
                    Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
                    batch.push(m);
                }
            } else {
                // Fold ON CREATE SET into seed props so a NOT-NULL property
                // set only by ON CREATE SET passes create-time validation
                // (RC4); the post-create SET below settles the final values.
                let seed_props = self
                    .on_create_seed_props(on_create, &row, prop_manager, params, ctx)
                    .await?;
                self.execute_create_pattern(
                    &path_pattern,
                    &mut row,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0_override,
                    Some(&seed_props),
                )
                .await?;
                if let Some(set) = on_create {
                    self.execute_set_items_locked(
                        &set.items,
                        &mut row,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0_override,
                        &Prefetch::default(),
                    )
                    .await?;
                }
                Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
                batch.push(row);
            }
            Ok(batch)
        }
        .await;

        results.extend(result?);
    }
    Ok(results)
}
2429
2430 /// Pre-evaluate `ON CREATE SET` property assignments into per-variable seeds.
2431 ///
2432 /// Folds `SET <var>.<prop> = <expr>` items so a NOT-NULL property supplied
2433 /// only by `ON CREATE SET` is present when the MERGE node is created and
2434 /// passes constraint validation (RC4). The right-hand side is evaluated
2435 /// against the current `row`.
2436 ///
2437 /// Items whose right-hand side references the target variable (e.g.
2438 /// `ON CREATE SET n.c = coalesce(n.c, 0) + 1`) are NOT folded: seeding would
2439 /// let the post-create SET read the seeded value and apply the assignment
2440 /// twice. Such items run only post-create, exactly once (unchanged behavior).
2441 ///
2442 /// # Errors
2443 /// Returns an error if evaluating an assignment's right-hand side fails.
2444 pub(crate) async fn on_create_seed_props(
2445 &self,
2446 on_create: Option<&SetClause>,
2447 row: &HashMap<String, Value>,
2448 prop_manager: &PropertyManager,
2449 params: &HashMap<String, Value>,
2450 ctx: Option<&QueryContext>,
2451 ) -> Result<HashMap<String, HashMap<String, Value>>> {
2452 let mut seed: HashMap<String, HashMap<String, Value>> = HashMap::new();
2453 let Some(set) = on_create else {
2454 return Ok(seed);
2455 };
2456 for item in &set.items {
2457 if let SetItem::Property { expr, value } = item
2458 && let Expr::Property(var_expr, prop_name) = expr
2459 && let Expr::Variable(var_name) = &**var_expr
2460 // Skip self-referential RHS so the post-create SET (which also
2461 // runs) applies it exactly once rather than reading the seed.
2462 && !crate::query::df_graph::locy_ast_builder::expr_references_var(
2463 value, var_name,
2464 )
2465 {
2466 let val = self
2467 .evaluate_expr(value, row, prop_manager, params, ctx)
2468 .await?;
2469 seed.entry(var_name.clone())
2470 .or_default()
2471 .insert(prop_name.clone(), val);
2472 }
2473 }
2474 Ok(seed)
2475 }
2476
/// Execute a CREATE pattern, inserting new vertices and edges into the graph.
///
/// Each path is walked element by element:
/// * **Node**: if its variable is already bound in `row` to a value that
///   `vid_from_value` accepts, that vertex is reused. Otherwise a new vertex
///   is created and the variable, if any, is bound to a
///   `{_vid, _labels, ...props}` map.
/// * **Relationship**: only recorded as pending. The edge is created when
///   the following node has been resolved, between the previous node and
///   that node (`Direction::Incoming` swaps source and destination). A
///   relationship whose variable is already bound in `row` is not created
///   again.
///
/// # Arguments
/// * `row` - input bindings; new node/edge variables are inserted into it.
/// * `tx_l0` - passed through to the writer's vertex/edge inserts.
/// * `seed_props` - per-variable properties gap-filled into newly created
///   nodes (see the inline comment on the parameter).
///
/// # Errors
/// * a node property expression that does not evaluate to a map;
/// * with `strict_schema`, an undeclared label or edge type;
/// * a relationship without exactly one type;
/// * a parenthesized pattern element;
/// * property coercion/validation failures, id allocation failures, and
///   writer insert failures.
#[expect(clippy::too_many_arguments)]
pub(crate) async fn execute_create_pattern(
    &self,
    pattern: &Pattern,
    row: &mut HashMap<String, Value>,
    writer: &Writer,
    prop_manager: &PropertyManager,
    params: &HashMap<String, Value>,
    ctx: Option<&QueryContext>,
    tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    // Per-variable properties to gap-fill into newly-created nodes before
    // constraint validation. Used by MERGE to fold `ON CREATE SET` so a
    // NOT-NULL property supplied only by ON CREATE SET passes create-time
    // validation (RC4). `None` for plain CREATE.
    seed_props: Option<&HashMap<String, HashMap<String, Value>>>,
) -> Result<()> {
    for path in &pattern.paths {
        // Vid of the most recently resolved node in this path: the source
        // side of the next pending relationship.
        let mut prev_vid: Option<Vid> = None;
        // (rel_var, type_id, type_name, props_expr, direction)
        type PendingRel = (String, u32, String, Option<Expr>, Direction);
        let mut rel_pending: Option<PendingRel> = None;

        for element in &path.elements {
            match element {
                PatternElement::Node(n) => {
                    let mut vid = None;

                    // Check if node variable already bound in row
                    if let Some(var) = &n.variable
                        && let Some(val) = row.get(var)
                        && let Ok(existing_vid) = Self::vid_from_value(val)
                    {
                        vid = Some(existing_vid);
                    }

                    // If not bound, create it
                    if vid.is_none() {
                        let mut props = HashMap::new();
                        if let Some(props_expr) = &n.properties {
                            let props_val = self
                                .evaluate_expr(props_expr, row, prop_manager, params, ctx)
                                .await?;
                            if let Value::Map(map) = props_val {
                                for (k, v) in map {
                                    props.insert(k, v);
                                }
                            } else {
                                return Err(anyhow!("Properties must evaluate to a map"));
                            }
                        }

                        // MERGE ON CREATE SET: gap-fill properties supplied
                        // only by ON CREATE SET so a NOT-NULL property absent
                        // from the merge key passes create-time validation
                        // (RC4). `or_insert` keeps the merge-key/pattern props
                        // authoritative; the post-create SET re-applies the
                        // real values, so the final state is unchanged.
                        if let Some(seed) = seed_props
                            && let Some(var) = &n.variable
                            && let Some(var_seed) = seed.get(var)
                        {
                            for (k, v) in var_seed {
                                props.entry(k.clone()).or_insert_with(|| v.clone());
                            }
                        }

                        let schema = self.storage.schema_manager().schema();

                        // Strict schema: reject undeclared labels.
                        if self.config.strict_schema {
                            for label_name in &n.labels {
                                if schema.get_label_case_insensitive(label_name).is_none() {
                                    return Err(anyhow!(
                                        "Label '{}' is not defined in the schema \
                                         (strict_schema is enabled). \
                                         Declare it with db.schema().label(...).apply() first.",
                                        label_name
                                    ));
                                }
                            }
                        }

                        // VID generation is label-independent. Pull from the
                        // per-tx reservoir if set (amortizes the global
                        // IdAllocator mutex), else fall back to the direct
                        // per-VID path.
                        let new_vid = match &self.id_reservoir {
                            Some(r) => r.next_vid().await?,
                            None => writer.next_vid().await?,
                        };

                        // Enrich with generated columns only for known labels
                        for label_name in &n.labels {
                            if schema.get_label_case_insensitive(label_name).is_some() {
                                self.enrich_properties_with_generated_columns(
                                    label_name,
                                    &mut props,
                                    prop_manager,
                                    params,
                                    ctx,
                                )
                                .await?;
                            }
                        }

                        // Validate/coerce against declared types AFTER enrichment, so
                        // a type mismatch is rejected here rather than silently nulled
                        // (and the row dropped) at flush — issue #68.
                        let props = Self::coerce_and_validate_props(props, &schema, &n.labels)?;

                        // Insert vertex and get back final properties (includes auto-generated embeddings)
                        let final_props = writer
                            .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
                            .await?;

                        // Build node object with final properties (includes embeddings)
                        if let Some(var) = &n.variable {
                            let mut obj = HashMap::new();
                            obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
                            let labels_list: Vec<Value> =
                                n.labels.iter().map(|l| Value::String(l.clone())).collect();
                            obj.insert("_labels".to_string(), Value::List(labels_list));
                            for (k, v) in &final_props {
                                obj.insert(k.clone(), v.clone());
                            }
                            // Store node as a Map with _vid, matching MATCH behavior
                            row.insert(var.clone(), Value::Map(obj));
                        }
                        vid = Some(new_vid);
                    }

                    // Invariant: `vid` is Some here — either reused from the
                    // row or set by the create branch above.
                    let current_vid = vid.unwrap();

                    // Materialize the relationship recorded by the preceding
                    // Relationship element, now that both endpoints are known.
                    if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
                        rel_pending.take()
                        && let Some(src) = prev_vid
                    {
                        let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);

                        if !is_rel_bound {
                            let mut rel_props = HashMap::new();
                            if let Some(expr) = rel_props_expr {
                                let val = self
                                    .evaluate_expr(&expr, row, prop_manager, params, ctx)
                                    .await?;
                                // NOTE(review): unlike node properties (which
                                // error with "Properties must evaluate to a
                                // map"), a non-map value here is silently
                                // ignored. Confirm this asymmetry is intended.
                                if let Value::Map(map) = val {
                                    rel_props.extend(map);
                                }
                            }
                            // Validate/coerce edge properties against the declared
                            // edge-type schema before storing — issue #68.
                            let edge_schema = self.storage.schema_manager().schema();
                            let rel_props = Self::coerce_and_validate_props(
                                rel_props,
                                &edge_schema,
                                std::slice::from_ref(&type_name),
                            )?;
                            let eid = match &self.id_reservoir {
                                Some(r) => r.next_eid().await?,
                                None => writer.next_eid(type_id).await?,
                            };

                            // For incoming edges like (a)<-[:R]-(b), swap so the edge points b -> a
                            let (edge_src, edge_dst) = match dir {
                                Direction::Incoming => (current_vid, src),
                                _ => (src, current_vid),
                            };

                            // Despite its name, `store_props` only decides
                            // whether the edge is BOUND into `row` (it needs a
                            // variable); `rel_props` are always passed to
                            // `insert_edge` below.
                            let store_props = !rel_var.is_empty();
                            let user_props = if store_props {
                                rel_props.clone()
                            } else {
                                HashMap::new()
                            };

                            writer
                                .insert_edge(
                                    edge_src,
                                    edge_dst,
                                    type_id,
                                    eid,
                                    rel_props,
                                    Some(type_name.clone()),
                                    tx_l0,
                                )
                                .await?;

                            // Edge type name is now stored by insert_edge

                            if store_props {
                                let mut edge_map = HashMap::new();
                                edge_map.insert(
                                    "_eid".to_string(),
                                    Value::Int(eid.as_u64() as i64),
                                );
                                edge_map.insert(
                                    "_src".to_string(),
                                    Value::Int(edge_src.as_u64() as i64),
                                );
                                edge_map.insert(
                                    "_dst".to_string(),
                                    Value::Int(edge_dst.as_u64() as i64),
                                );
                                // `_type` carries the numeric edge type id.
                                edge_map
                                    .insert("_type".to_string(), Value::Int(type_id as i64));
                                // Include user properties so downstream RETURN sees them
                                for (k, v) in user_props {
                                    edge_map.insert(k, v);
                                }
                                row.insert(rel_var, Value::Map(edge_map));
                            }
                        }
                    }
                    prev_vid = Some(current_vid);
                }
                PatternElement::Relationship(r) => {
                    if r.types.len() != 1 {
                        return Err(anyhow!(
                            "CREATE relationship must specify exactly one type"
                        ));
                    }
                    let type_name = &r.types[0];
                    let type_id = if self.config.strict_schema {
                        let schema = self.storage.schema_manager().schema();
                        schema
                            .edge_type_id_by_name_case_insensitive(type_name)
                            .ok_or_else(|| {
                                anyhow!(
                                    "Edge type '{}' is not defined in the schema \
                                     (strict_schema is enabled). \
                                     Declare it with db.schema().edge_type(...).apply() first.",
                                    type_name
                                )
                            })?
                    } else {
                        // Schemaless: get or assign edge type ID (bit 31 = 1 for dynamic).
                        self.storage
                            .schema_manager()
                            .get_or_assign_edge_type_id(type_name)
                    };

                    // Defer creation until the next Node element supplies the
                    // other endpoint. An anonymous relationship is recorded
                    // with an empty variable name.
                    rel_pending = Some((
                        r.variable.clone().unwrap_or_default(),
                        type_id,
                        type_name.clone(),
                        r.properties.clone(),
                        r.direction.clone(),
                    ));
                }
                PatternElement::Parenthesized { .. } => {
                    return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
                }
            }
        }
    }
    Ok(())
}
2735
2736 /// Rejects structural values (maps, nodes, edges, paths, nested lists) in a property.
2737 ///
2738 /// These are never valid OpenCypher property values regardless of the declared column
2739 /// type. A `CypherValue` column is the sole exception and is handled by the caller
2740 /// before this is reached.
2741 ///
2742 /// # Errors
2743 /// Returns an error if `val` is a map/node/edge/path, or a list containing one.
2744 fn validate_structural_property_value(prop_name: &str, val: &Value) -> Result<()> {
2745 match val {
2746 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
2747 anyhow::bail!(
2748 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2749 prop_name
2750 );
2751 }
2752 Value::List(items) => {
2753 for item in items {
2754 if matches!(
2755 item,
2756 Value::Map(_)
2757 | Value::Node(_)
2758 | Value::Edge(_)
2759 | Value::Path(_)
2760 | Value::List(_)
2761 ) {
2762 anyhow::bail!(
2763 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2764 prop_name
2765 );
2766 }
2767 }
2768 }
2769 _ => {}
2770 }
2771 Ok(())
2772 }
2773
2774 /// Validates and coerces `val` against the declared schema type for `prop_name`.
2775 ///
2776 /// Returns the value to actually persist. Beyond the structural checks in
2777 /// [`Self::validate_structural_property_value`], this compares the value against the
2778 /// column's declared `DataType` and:
2779 ///
2780 /// - returns it unchanged when directly storable (including the intentional
2781 /// `Int`→`Float`/`Int32` and `Temporal`→`Timestamp` widenings);
2782 /// - coerces a `Value::String` written into a `Date`/`Time`/`DateTime`/`Duration`
2783 /// column into the proper `Temporal` value, using the same parser as the Cypher
2784 /// `date()`/`time()`/`datetime()`/`duration()` constructors;
2785 /// - otherwise returns an error, so a type mismatch is surfaced at the call site
2786 /// rather than silently nulled — and the row dropped at flush. See issue #68.
2787 ///
2788 /// Undeclared (schemaless) properties and `CypherValue` columns keep their permissive
2789 /// behavior.
2790 ///
2791 /// # Errors
2792 /// Returns an error if the value's type is incompatible with the declared column type,
2793 /// or if a string destined for a temporal column is not a valid temporal literal.
2794 fn coerce_and_validate_property_value(
2795 prop_name: &str,
2796 val: Value,
2797 schema: &uni_common::core::schema::Schema,
2798 labels: &[String],
2799 ) -> Result<Value> {
2800 use uni_common::core::schema::DataType;
2801
2802 // Resolve the declared type from the first label that declares this property.
2803 let declared = labels.iter().find_map(|label| {
2804 schema
2805 .properties
2806 .get(label)
2807 .and_then(|props| props.get(prop_name))
2808 .map(|meta| &meta.r#type)
2809 });
2810
2811 // CypherValue columns accept any value (including maps) — skip all checks.
2812 if matches!(declared, Some(DataType::CypherValue)) {
2813 return Ok(val);
2814 }
2815
2816 let Some(dt) = declared else {
2817 // Schemaless property: reject structural values (maps/nodes/edges/paths and
2818 // lists containing them), otherwise store as-is.
2819 Self::validate_structural_property_value(prop_name, &val)?;
2820 return Ok(val);
2821 };
2822
2823 // Directly storable: scalars, the intentional `Int`→`Float`/`Int32` and
2824 // `Temporal`→`Timestamp` widenings, declared composite columns (`Map`/`List`/
2825 // `Vector`) receiving their matching value, and `Null` (always accepted).
2826 if dt.accepts(&val) {
2827 return Ok(val);
2828 }
2829
2830 // Known-safe coercion: a string into a temporal column is parsed as if it had
2831 // been wrapped in the matching Cypher temporal constructor.
2832 if matches!(val, Value::String(_)) {
2833 let ctor = match dt {
2834 DataType::DateTime => Some("DATETIME"),
2835 DataType::Date => Some("DATE"),
2836 DataType::Time => Some("TIME"),
2837 DataType::Duration => Some("DURATION"),
2838 _ => None,
2839 };
2840 if let Some(name) = ctor {
2841 return uni_query_functions::datetime::eval_datetime_function(
2842 name,
2843 std::slice::from_ref(&val),
2844 )
2845 .map_err(|e| {
2846 anyhow!(
2847 "TypeError: property '{}' is declared {:?} but the string value could \
2848 not be parsed as a {} literal: {}",
2849 prop_name,
2850 dt,
2851 name,
2852 e
2853 )
2854 });
2855 }
2856 }
2857
2858 // Not storable and not coercible. Prefer the structural message when the value
2859 // is itself structural (e.g. a map into a scalar column), preserving prior
2860 // behavior; otherwise report the scalar type mismatch.
2861 Self::validate_structural_property_value(prop_name, &val)?;
2862 anyhow::bail!(
2863 "TypeError: property '{}' is declared {:?} but got an incompatible value of type {}",
2864 prop_name,
2865 dt,
2866 value_type_name(&val)
2867 );
2868 }
2869
2870 /// Coerces and validates every property in `props` against the declared types for `labels`.
2871 ///
2872 /// Applies [`Self::coerce_and_validate_property_value`] to each entry, returning the map
2873 /// with known-safe coercions applied. Use this at every user-facing CREATE/SET write site
2874 /// before handing properties to the writer, so a type mismatch is rejected up front rather
2875 /// than silently nulled — and the row dropped — at flush (issue #68).
2876 ///
2877 /// # Errors
2878 /// Returns an error on the first property whose value is incompatible with its declared type.
2879 fn coerce_and_validate_props(
2880 props: HashMap<String, Value>,
2881 schema: &uni_common::core::schema::Schema,
2882 labels: &[String],
2883 ) -> Result<HashMap<String, Value>> {
2884 let mut out = HashMap::with_capacity(props.len());
2885 for (k, v) in props {
2886 let cv = Self::coerce_and_validate_property_value(&k, v, schema, labels)?;
2887 out.insert(k, cv);
2888 }
2889 Ok(out)
2890 }
2891
2892 #[expect(clippy::too_many_arguments)]
2893 pub(crate) async fn execute_set_items_locked(
2894 &self,
2895 items: &[SetItem],
2896 row: &mut HashMap<String, Value>,
2897 writer: &Writer,
2898 prop_manager: &PropertyManager,
2899 params: &HashMap<String, Value>,
2900 ctx: Option<&QueryContext>,
2901 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2902 prefetched: &Prefetch,
2903 ) -> Result<()> {
2904 // Coalesce SetItem::Property items by target so we do ONE read + ONE
2905 // write per (variable, target) instead of one read-modify-write cycle
2906 // per item. For an UPDATE that sets N properties on the same vertex
2907 // (e.g. the ingest hotpath `SET n.frequency = ..., n.last_seen = ...,
2908 // n.confidence = ...`), this collapses N redundant
2909 // `get_all_vertex_props_with_ctx` + `insert_vertex_with_labels` cycles
2910 // into one. See profile_test.rs `diag_72_set_data_scale_with_hnsw` for
2911 // the measurement, and the plan in
2912 // /home/rohit/.claude/plans/plan-and-implement-a-valiant-flame.md
2913 // for the rationale.
2914 //
2915 // RHS evaluation order is preserved: we evaluate each RHS inline and
2916 // update the row binding immediately, so a later SetItem on the same
2917 // variable that reads `n.<earlier-prop>` sees the new value.
2918 //
2919 // Non-Property variants (Labels, Variable, VariablePlus) are less
2920 // common and have lower payoff; before processing one, we flush any
2921 // pending updates for the same variable so it sees the latest L0
2922 // state and ordering semantics are preserved.
2923 let mut pending_v: HashMap<String, PendingVertexSet> = HashMap::new();
2924 let mut pending_e: HashMap<String, PendingEdgeSet> = HashMap::new();
2925
2926 for item in items {
2927 match item {
2928 SetItem::Property { expr, value } => {
2929 if let Expr::Property(var_expr, prop_name) = expr
2930 && let Expr::Variable(var_name) = &**var_expr
2931 && let Some(node_val) = row.get(var_name)
2932 {
2933 if let Ok(vid) = Self::vid_from_value(node_val) {
2934 reject_if_ephemeral_vid(vid)?;
2935 let labels =
2936 Self::extract_labels_from_node(node_val).unwrap_or_default();
2937 let schema = self.storage.schema_manager().schema().clone();
2938
2939 // Lazy one-time read. Always read the full row
2940 // (preserves CRDT merge + constraint validation
2941 // + scan-side L0 visibility). The
2942 // partial-lance-writes optimization happens
2943 // PURELY AT FLUSH TIME via the per-VID
2944 // `vertex_partial_keys` set tracked in L0 — so
2945 // L0 holds the full row, scans see the full
2946 // row, and Lance only receives the touched
2947 // columns. Generated-column-bearing labels
2948 // ride the partial path too (Round 12 §C):
2949 // `enrich_properties_with_generated_columns`
2950 // runs at flush time over the merged-in-L0
2951 // full row, and the produced generator keys
2952 // are appended to `touched` so they land in
2953 // the MergeInsert source.
2954 if !pending_v.contains_key(var_name) {
2955 let storage_cfg = &self.storage.config;
2956 let partial = storage_cfg.partial_lance_writes;
2957 let read = read_vertex_props_with_prefetch(
2958 vid,
2959 prefetched,
2960 prop_manager,
2961 ctx,
2962 )
2963 .await?;
2964 pending_v.insert(
2965 var_name.clone(),
2966 PendingVertexSet {
2967 vid,
2968 labels: labels.clone(),
2969 props: read,
2970 partial,
2971 touched: HashSet::new(),
2972 },
2973 );
2974 }
2975
2976 let val = self
2977 .evaluate_expr(value, row, prop_manager, params, ctx)
2978 .await?;
2979 let val = Self::coerce_and_validate_property_value(
2980 prop_name, val, &schema, &labels,
2981 )?;
2982
2983 let pv = pending_v
2984 .get_mut(var_name)
2985 .expect("inserted above when absent");
2986 pv.props.insert(prop_name.clone(), val.clone());
2987 if pv.partial {
2988 pv.touched.insert(prop_name.clone());
2989 }
2990
2991 // Update the row binding so subsequent RHS sees the new value.
2992 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
2993 node_map.insert(prop_name.clone(), val);
2994 } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
2995 node.properties.insert(prop_name.clone(), val);
2996 }
2997 } else if let Value::Map(map) = node_val
2998 && map.get("_eid").is_some_and(|v| !v.is_null())
2999 && map.get("_src").is_some_and(|v| !v.is_null())
3000 && map.get("_dst").is_some_and(|v| !v.is_null())
3001 && (map.get("_type").is_some_and(|v| !v.is_null())
3002 || map.get("_type_name").is_some_and(|v| !v.is_null()))
3003 {
3004 let ei = self.extract_edge_identity(map)?;
3005 reject_if_ephemeral_eid(ei.eid)?;
3006 let schema = self.storage.schema_manager().schema().clone();
3007 // Handle _type as either String or Int (Int from CREATE, String
3008 // from queries). UNWIND on VLP edge lists emits `_type_name`
3009 // instead of `_type`; accept either.
3010 let type_val = map.get("_type").or_else(|| map.get("_type_name"));
3011 let edge_type_name = match type_val {
3012 Some(Value::String(s)) => s.clone(),
3013 Some(Value::Int(id)) => schema
3014 .edge_type_name_by_id_unified(*id as u32)
3015 .unwrap_or_else(|| format!("EdgeType{}", id)),
3016 _ => String::new(),
3017 };
3018
3019 if !pending_e.contains_key(var_name) {
3020 let initial = read_edge_props_with_prefetch(
3021 ei.eid,
3022 prefetched,
3023 prop_manager,
3024 ctx,
3025 )
3026 .await?;
3027 let partial = self.storage.config.partial_lance_writes;
3028 pending_e.insert(
3029 var_name.clone(),
3030 PendingEdgeSet {
3031 src: ei.src,
3032 dst: ei.dst,
3033 edge_type_id: ei.edge_type_id,
3034 eid: ei.eid,
3035 edge_type_name: edge_type_name.clone(),
3036 props: initial,
3037 partial,
3038 touched: HashSet::new(),
3039 },
3040 );
3041 }
3042
3043 let val = self
3044 .evaluate_expr(value, row, prop_manager, params, ctx)
3045 .await?;
3046 let val = Self::coerce_and_validate_property_value(
3047 prop_name,
3048 val,
3049 &schema,
3050 std::slice::from_ref(&edge_type_name),
3051 )?;
3052
3053 let pe = pending_e
3054 .get_mut(var_name)
3055 .expect("inserted above when absent");
3056 pe.props.insert(prop_name.clone(), val.clone());
3057 if pe.partial {
3058 pe.touched.insert(prop_name.clone());
3059 }
3060
3061 // Update the row object so subsequent RHS sees the new value.
3062 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3063 edge_map.insert(prop_name.clone(), val);
3064 } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3065 edge.properties.insert(prop_name.clone(), val);
3066 }
3067 } else if let Value::Edge(edge) = node_val {
3068 // Handle Value::Edge directly (when traverse returns Edge objects).
3069 reject_if_ephemeral_eid(edge.eid)?;
3070 let eid = edge.eid;
3071 let src = edge.src;
3072 let dst = edge.dst;
3073 let edge_type_name = edge.edge_type.clone();
3074 let etype =
3075 self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
3076 let schema = self.storage.schema_manager().schema().clone();
3077
3078 if !pending_e.contains_key(var_name) {
3079 let initial = read_edge_props_with_prefetch(
3080 eid,
3081 prefetched,
3082 prop_manager,
3083 ctx,
3084 )
3085 .await?;
3086 let partial = self.storage.config.partial_lance_writes;
3087 pending_e.insert(
3088 var_name.clone(),
3089 PendingEdgeSet {
3090 src,
3091 dst,
3092 edge_type_id: etype,
3093 eid,
3094 edge_type_name: edge_type_name.clone(),
3095 props: initial,
3096 partial,
3097 touched: HashSet::new(),
3098 },
3099 );
3100 }
3101
3102 let val = self
3103 .evaluate_expr(value, row, prop_manager, params, ctx)
3104 .await?;
3105 let val = Self::coerce_and_validate_property_value(
3106 prop_name,
3107 val,
3108 &schema,
3109 std::slice::from_ref(&edge_type_name),
3110 )?;
3111
3112 let pe = pending_e
3113 .get_mut(var_name)
3114 .expect("inserted above when absent");
3115 pe.props.insert(prop_name.clone(), val.clone());
3116 if pe.partial {
3117 pe.touched.insert(prop_name.clone());
3118 }
3119
3120 // Update the row object so subsequent RHS sees the new value.
3121 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3122 edge.properties.insert(prop_name.clone(), val);
3123 }
3124 }
3125 }
3126 }
3127 SetItem::Labels { variable, labels } => {
3128 // Flush any pending writes for this var so the Labels op
3129 // sees latest L0 state. Other variables' pending writes
3130 // can keep waiting (they're independent).
3131 self.flush_pending_var(
3132 variable,
3133 &mut pending_v,
3134 &mut pending_e,
3135 writer,
3136 prop_manager,
3137 params,
3138 ctx,
3139 tx_l0,
3140 prefetched,
3141 )
3142 .await?;
3143
3144 if let Some(node_val) = row.get(variable)
3145 && let Ok(vid) = Self::vid_from_value(node_val)
3146 {
3147 reject_if_ephemeral_vid(vid)?;
3148 let registry = self
3149 .procedure_registry
3150 .as_ref()
3151 .and_then(|pr| pr.plugin_registry());
3152 reject_virtual_label_write(registry.as_ref(), labels, "SET")?;
3153
3154 // Get current labels from node value
3155 let current_labels =
3156 Self::extract_labels_from_node(node_val).unwrap_or_default();
3157
3158 // Determine new labels to add (skip duplicates)
3159 let labels_to_add: Vec<_> = labels
3160 .iter()
3161 .filter(|l| !current_labels.contains(l))
3162 .cloned()
3163 .collect();
3164
3165 if !labels_to_add.is_empty() {
3166 // Resolve the FULL new label set and write it to the
3167 // TRANSACTION buffer (so the change is transactional
3168 // and OCC-conflictable), falling back to the context
3169 // (main) L0 for non-transactional callers. Replace
3170 // semantics via `set_vertex_labels`.
3171 let mut new_labels = current_labels;
3172 new_labels.extend(labels_to_add);
3173 if let Some(ctx) = ctx {
3174 let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3175 l0.write().set_vertex_labels(vid, &new_labels);
3176 }
3177
3178 // Update the node value in the row with the new labels.
3179 if let Some(Value::Map(obj)) = row.get_mut(variable) {
3180 let labels_list =
3181 new_labels.into_iter().map(Value::String).collect();
3182 obj.insert("_labels".to_string(), Value::List(labels_list));
3183 }
3184 }
3185 }
3186 }
3187 SetItem::Variable { variable, value }
3188 | SetItem::VariablePlus { variable, value } => {
3189 // Flush this var's pending writes first so the
3190 // replace/merge op sees them as latest L0 state.
3191 self.flush_pending_var(
3192 variable,
3193 &mut pending_v,
3194 &mut pending_e,
3195 writer,
3196 prop_manager,
3197 params,
3198 ctx,
3199 tx_l0,
3200 prefetched,
3201 )
3202 .await?;
3203
3204 let replace = matches!(item, SetItem::Variable { .. });
3205 let op_str = if replace { "=" } else { "+=" };
3206
3207 // SET n = expr / SET n += expr — null target from OPTIONAL MATCH is a silent no-op
3208 if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
3209 continue;
3210 }
3211 let rhs = self
3212 .evaluate_expr(value, row, prop_manager, params, ctx)
3213 .await?;
3214 let new_props =
3215 Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
3216 anyhow!(
3217 "SET {} {} expr: right-hand side must evaluate to a map, \
3218 node, or relationship",
3219 variable,
3220 op_str
3221 )
3222 })?;
3223 self.apply_properties_to_entity(
3224 variable,
3225 new_props,
3226 replace,
3227 row,
3228 writer,
3229 prop_manager,
3230 params,
3231 ctx,
3232 tx_l0,
3233 prefetched,
3234 )
3235 .await?;
3236 }
3237 }
3238 }
3239
3240 // Flush all remaining coalesced writes — one writer call per target.
3241 // Partial entries (no generated columns) call
3242 // `Writer::insert_vertex_partial_full` so L0 holds the FULL row
3243 // but the touched-keys hint drives a MergeInsert at flush. Full
3244 // entries continue through the legacy
3245 // `insert_vertex_with_labels` (Append) path with
3246 // generated-column enrichment.
3247 for (_var_name, mut pv) in pending_v {
3248 if pv.partial {
3249 // Round 12 §C: run the generator enrichment over the
3250 // merged-in-L0 full row, then add the produced generator
3251 // keys to `touched` so they ride the MergeInsert source.
3252 // Idempotent — generators always recompute against the
3253 // post-merge property map.
3254 let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3255 for label_name in &pv.labels {
3256 self.enrich_properties_with_generated_columns(
3257 label_name,
3258 &mut pv.props,
3259 prop_manager,
3260 params,
3261 ctx,
3262 )
3263 .await?;
3264 }
3265 for k in pv.props.keys() {
3266 if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3267 pv.touched.insert(k.clone());
3268 }
3269 }
3270 writer
3271 .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3272 .await?;
3273 } else {
3274 for label_name in &pv.labels {
3275 self.enrich_properties_with_generated_columns(
3276 label_name,
3277 &mut pv.props,
3278 prop_manager,
3279 params,
3280 ctx,
3281 )
3282 .await?;
3283 }
3284 let _ = writer
3285 .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3286 .await?;
3287 }
3288 }
3289 for (_var_name, pe) in pending_e {
3290 if pe.partial {
3291 writer
3292 .insert_edge_partial_full(
3293 pe.src,
3294 pe.dst,
3295 pe.edge_type_id,
3296 pe.eid,
3297 pe.props,
3298 Some(pe.edge_type_name),
3299 pe.touched,
3300 tx_l0,
3301 )
3302 .await?;
3303 } else {
3304 writer
3305 .insert_edge(
3306 pe.src,
3307 pe.dst,
3308 pe.edge_type_id,
3309 pe.eid,
3310 pe.props,
3311 Some(pe.edge_type_name),
3312 tx_l0,
3313 )
3314 .await?;
3315 }
3316 }
3317
3318 Ok(())
3319 }
3320
3321 /// Flush pending SET state for a single variable to the writer.
3322 ///
3323 /// Called from the SET loop when about to process a Labels /
3324 /// Variable / VariablePlus item on `var`, so the subsequent op
3325 /// sees latest L0 state and ordering is preserved.
3326 #[expect(clippy::too_many_arguments)]
3327 async fn flush_pending_var(
3328 &self,
3329 var: &str,
3330 pending_v: &mut HashMap<String, PendingVertexSet>,
3331 pending_e: &mut HashMap<String, PendingEdgeSet>,
3332 writer: &Writer,
3333 prop_manager: &PropertyManager,
3334 _params: &HashMap<String, Value>,
3335 ctx: Option<&QueryContext>,
3336 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3337 _prefetched: &Prefetch,
3338 ) -> Result<()> {
3339 if let Some(mut pv) = pending_v.remove(var) {
3340 if pv.partial {
3341 let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
3342 for label_name in &pv.labels {
3343 self.enrich_properties_with_generated_columns(
3344 label_name,
3345 &mut pv.props,
3346 prop_manager,
3347 _params,
3348 ctx,
3349 )
3350 .await?;
3351 }
3352 for k in pv.props.keys() {
3353 if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
3354 pv.touched.insert(k.clone());
3355 }
3356 }
3357 writer
3358 .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
3359 .await?;
3360 } else {
3361 for label_name in &pv.labels {
3362 self.enrich_properties_with_generated_columns(
3363 label_name,
3364 &mut pv.props,
3365 prop_manager,
3366 _params,
3367 ctx,
3368 )
3369 .await?;
3370 }
3371 let _ = writer
3372 .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
3373 .await?;
3374 }
3375 }
3376 if let Some(pe) = pending_e.remove(var) {
3377 if pe.partial {
3378 writer
3379 .insert_edge_partial_full(
3380 pe.src,
3381 pe.dst,
3382 pe.edge_type_id,
3383 pe.eid,
3384 pe.props,
3385 Some(pe.edge_type_name),
3386 pe.touched,
3387 tx_l0,
3388 )
3389 .await?;
3390 } else {
3391 writer
3392 .insert_edge(
3393 pe.src,
3394 pe.dst,
3395 pe.edge_type_id,
3396 pe.eid,
3397 pe.props,
3398 Some(pe.edge_type_name),
3399 tx_l0,
3400 )
3401 .await?;
3402 }
3403 }
3404 Ok(())
3405 }
3406
3407 /// Execute REMOVE clause items (property removal or label removal).
3408 ///
3409 /// Property removals are batched per variable to avoid stale reads: when
3410 /// multiple properties of the same entity are removed in one REMOVE clause,
3411 /// we read from storage once, null all specified properties, and write back
3412 /// once. This prevents the second removal from reading stale data that
3413 /// doesn't reflect the first removal's L0 write.
3414 #[expect(clippy::too_many_arguments)]
3415 pub(crate) async fn execute_remove_items_locked(
3416 &self,
3417 items: &[RemoveItem],
3418 row: &mut HashMap<String, Value>,
3419 writer: &Writer,
3420 prop_manager: &PropertyManager,
3421 ctx: Option<&QueryContext>,
3422 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3423 prefetched: &Prefetch,
3424 ) -> Result<()> {
3425 // Collect property names to remove, grouped by variable.
3426 // Use Vec<(String, Vec<String>)> to preserve insertion order.
3427 let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
3428
3429 for item in items {
3430 match item {
3431 RemoveItem::Property(expr) => {
3432 if let Expr::Property(var_expr, prop_name) = expr
3433 && let Expr::Variable(var_name) = &**var_expr
3434 {
3435 if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
3436 entry.1.push(prop_name.clone());
3437 } else {
3438 prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
3439 }
3440 }
3441 }
3442 RemoveItem::Labels { variable, labels } => {
3443 self.execute_remove_labels(variable, labels, row, ctx)?;
3444 }
3445 }
3446 }
3447
3448 // Execute batched property removals per variable.
3449 for (var_name, prop_names) in &prop_removals {
3450 let Some(node_val) = row.get(var_name) else {
3451 continue;
3452 };
3453
3454 if let Ok(vid) = Self::vid_from_value(node_val) {
3455 // Vertex property removal
3456 let mut props =
3457 read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
3458
3459 // Only write back if at least one property actually exists
3460 let removed_count = prop_names
3461 .iter()
3462 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3463 .count();
3464 let any_exist = removed_count > 0;
3465 if any_exist {
3466 writer.track_properties_removed(removed_count, tx_l0);
3467 for prop_name in prop_names {
3468 props.insert(prop_name.clone(), Value::Null);
3469 }
3470 }
3471 // Compute effective properties (post-removal) for _all_props
3472 let effective: HashMap<String, Value> = props
3473 .iter()
3474 .filter(|(_, v)| !v.is_null())
3475 .map(|(k, v)| (k.clone(), v.clone()))
3476 .collect();
3477 if any_exist {
3478 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3479 let _ = writer
3480 .insert_vertex_with_labels(vid, props, &labels, tx_l0)
3481 .await?;
3482 }
3483
3484 // Update the row map: set removed props to Null
3485 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
3486 for prop_name in prop_names {
3487 node_map.insert(prop_name.clone(), Value::Null);
3488 }
3489 // Set _all_props to the complete effective property set
3490 node_map.insert("_all_props".to_string(), Value::Map(effective));
3491 }
3492 } else if let Value::Map(map) = node_val {
3493 // Edge property removal (map-encoded)
3494 // Check for non-null _eid to skip OPTIONAL MATCH null edges
3495 let mut edge_effective: Option<HashMap<String, Value>> = None;
3496 if map.get("_eid").is_some_and(|v| !v.is_null()) {
3497 let ei = self.extract_edge_identity(map)?;
3498 let mut props =
3499 read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx)
3500 .await?;
3501
3502 let removed_count = prop_names
3503 .iter()
3504 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3505 .count();
3506 let any_exist = removed_count > 0;
3507 if any_exist {
3508 writer.track_properties_removed(removed_count, tx_l0);
3509 for prop_name in prop_names {
3510 props.insert(prop_name.to_string(), Value::Null);
3511 }
3512 }
3513 // Compute effective properties (post-removal) for _all_props
3514 edge_effective = Some(
3515 props
3516 .iter()
3517 .filter(|(_, v)| !v.is_null())
3518 .map(|(k, v)| (k.clone(), v.clone()))
3519 .collect(),
3520 );
3521 if any_exist {
3522 let edge_type_name = map
3523 .get("_type")
3524 .and_then(|v| v.as_str())
3525 .map(|s| s.to_string())
3526 .or_else(|| {
3527 self.storage
3528 .schema_manager()
3529 .edge_type_name_by_id_unified(ei.edge_type_id)
3530 });
3531 writer
3532 .insert_edge(
3533 ei.src,
3534 ei.dst,
3535 ei.edge_type_id,
3536 ei.eid,
3537 props,
3538 edge_type_name,
3539 tx_l0,
3540 )
3541 .await?;
3542 }
3543 }
3544
3545 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3546 for prop_name in prop_names {
3547 edge_map.insert(prop_name.clone(), Value::Null);
3548 }
3549 if let Some(effective) = edge_effective {
3550 edge_map.insert("_all_props".to_string(), Value::Map(effective));
3551 }
3552 }
3553 } else if let Value::Edge(edge) = node_val {
3554 // Edge property removal (Value::Edge)
3555 let eid = edge.eid;
3556 let src = edge.src;
3557 let dst = edge.dst;
3558 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
3559
3560 let mut props =
3561 read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
3562
3563 let removed_count = prop_names
3564 .iter()
3565 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3566 .count();
3567 if removed_count > 0 {
3568 writer.track_properties_removed(removed_count, tx_l0);
3569 for prop_name in prop_names {
3570 props.insert(prop_name.to_string(), Value::Null);
3571 }
3572 writer
3573 .insert_edge(
3574 src,
3575 dst,
3576 etype,
3577 eid,
3578 props,
3579 Some(edge.edge_type.clone()),
3580 tx_l0,
3581 )
3582 .await?;
3583 }
3584
3585 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3586 for prop_name in prop_names {
3587 edge.properties.insert(prop_name.to_string(), Value::Null);
3588 }
3589 }
3590 }
3591 }
3592
3593 Ok(())
3594 }
3595
3596 /// Execute label removal.
3597 pub(crate) fn execute_remove_labels(
3598 &self,
3599 variable: &str,
3600 labels: &[String],
3601 row: &mut HashMap<String, Value>,
3602 ctx: Option<&QueryContext>,
3603 ) -> Result<()> {
3604 if let Some(node_val) = row.get(variable)
3605 && let Ok(vid) = Self::vid_from_value(node_val)
3606 {
3607 reject_if_ephemeral_vid(vid)?;
3608 let registry = self
3609 .procedure_registry
3610 .as_ref()
3611 .and_then(|pr| pr.plugin_registry());
3612 reject_virtual_label_write(registry.as_ref(), labels, "REMOVE")?;
3613
3614 // Get current labels from node value
3615 let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3616
3617 // Determine which labels to actually remove (only those currently present)
3618 let labels_to_remove: Vec<_> = labels
3619 .iter()
3620 .filter(|l| current_labels.contains(l))
3621 .collect();
3622
3623 if !labels_to_remove.is_empty() {
3624 // Resolve the FULL remaining label set and write it to the
3625 // TRANSACTION buffer (transactional + OCC-conflictable), falling
3626 // back to the context (main) L0 for non-transactional callers.
3627 let remaining_labels: Vec<String> = current_labels
3628 .iter()
3629 .filter(|l| !labels_to_remove.contains(l))
3630 .cloned()
3631 .collect();
3632 if let Some(ctx) = ctx {
3633 let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3634 l0.write().set_vertex_labels(vid, &remaining_labels);
3635 }
3636
3637 // Update the node value in the row with the remaining labels.
3638 if let Some(Value::Map(obj)) = row.get_mut(variable) {
3639 let labels_list = remaining_labels.into_iter().map(Value::String).collect();
3640 obj.insert("_labels".to_string(), Value::List(labels_list));
3641 }
3642 }
3643 }
3644 Ok(())
3645 }
3646
3647 /// Resolve edge type ID for a Value::Edge, handling empty edge_type strings
3648 /// by looking up the type from the L0 buffer's edge endpoints.
3649 fn resolve_edge_type_id_for_edge(
3650 &self,
3651 edge: &crate::types::Edge,
3652 writer: &Writer,
3653 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3654 ) -> Result<u32> {
3655 if !edge.edge_type.is_empty() {
3656 return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
3657 }
3658 // Edge type name is empty (e.g., from anonymous MATCH patterns).
3659 // Look up the edge type ID from the L0 buffer's edge endpoints.
3660 if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
3661 return Ok(etype);
3662 }
3663 Err(anyhow!(
3664 "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
3665 edge.eid
3666 ))
3667 }
3668
3669 /// Execute DELETE clause for a single item (vertex, edge, path, or null).
3670 pub(crate) async fn execute_delete_item_locked(
3671 &self,
3672 val: &Value,
3673 detach: bool,
3674 writer: &Writer,
3675 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3676 ) -> Result<()> {
3677 match val {
3678 Value::Null => {
3679 // DELETE null is a no-op per OpenCypher spec
3680 }
3681 Value::Path(path) => {
3682 // Delete path edges first, then nodes
3683 for edge in &path.edges {
3684 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3685 writer
3686 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3687 .await?;
3688 }
3689 for node in &path.nodes {
3690 self.execute_delete_vertex(
3691 node.vid,
3692 detach,
3693 Some(node.labels.clone()),
3694 writer,
3695 tx_l0,
3696 )
3697 .await?;
3698 }
3699 }
3700 _ => {
3701 // Try Path reconstruction from Map first (Arrow loses Path type)
3702 if let Ok(path) = Path::try_from(val) {
3703 for edge in &path.edges {
3704 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3705 writer
3706 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3707 .await?;
3708 }
3709 for node in &path.nodes {
3710 self.execute_delete_vertex(
3711 node.vid,
3712 detach,
3713 Some(node.labels.clone()),
3714 writer,
3715 tx_l0,
3716 )
3717 .await?;
3718 }
3719 } else if let Ok(vid) = Self::vid_from_value(val) {
3720 let labels = Self::extract_labels_from_node(val);
3721 self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
3722 .await?;
3723 } else if let Value::Map(map) = val {
3724 self.execute_delete_edge_from_map(map, writer, tx_l0)
3725 .await?;
3726 } else if let Value::Edge(edge) = val {
3727 reject_if_ephemeral_eid(edge.eid)?;
3728 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3729 let registry = self
3730 .procedure_registry
3731 .as_ref()
3732 .and_then(|pr| pr.plugin_registry());
3733 reject_virtual_edge_type_write(registry.as_ref(), etype, "DELETE")?;
3734 writer
3735 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3736 .await?;
3737 }
3738 }
3739 }
3740 Ok(())
3741 }
3742
3743 /// Execute vertex deletion with optional detach.
3744 pub(crate) async fn execute_delete_vertex(
3745 &self,
3746 vid: Vid,
3747 detach: bool,
3748 labels: Option<Vec<String>>,
3749 writer: &Writer,
3750 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3751 ) -> Result<()> {
3752 reject_if_ephemeral_vid(vid)?;
3753 if let Some(ls) = labels.as_deref() {
3754 let registry = self
3755 .procedure_registry
3756 .as_ref()
3757 .and_then(|pr| pr.plugin_registry());
3758 reject_virtual_label_write(registry.as_ref(), ls, "DELETE")?;
3759 }
3760 if detach {
3761 self.detach_delete_vertex(vid, writer, tx_l0).await?;
3762 } else {
3763 self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
3764 }
3765 writer.delete_vertex(vid, labels, tx_l0).await?;
3766 Ok(())
3767 }
3768
3769 /// Check that a vertex has no edges (required for non-DETACH DELETE).
3770 ///
3771 /// Loads the subgraph from storage, then excludes edges that have been
3772 /// tombstoned in the writer's L0 or the transaction's L0. This ensures
3773 /// edges deleted earlier in the same DELETE clause are properly excluded.
3774 pub(crate) async fn check_vertex_has_no_edges(
3775 &self,
3776 vid: Vid,
3777 writer: &Writer,
3778 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3779 ) -> Result<()> {
3780 let schema = self.storage.schema_manager().schema();
3781 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
3782
3783 // Collect tombstoned edge IDs from both the writer L0 and tx L0.
3784 let mut tombstoned_eids = std::collections::HashSet::new();
3785 {
3786 let writer_l0 = writer.l0_manager.get_current();
3787 let guard = writer_l0.read();
3788 for &eid in guard.tombstones.keys() {
3789 tombstoned_eids.insert(eid);
3790 }
3791 }
3792 if let Some(tx) = tx_l0 {
3793 let guard = tx.read();
3794 for &eid in guard.tombstones.keys() {
3795 tombstoned_eids.insert(eid);
3796 }
3797 }
3798
3799 let out_graph = self
3800 .storage
3801 .load_subgraph_cached(
3802 &[vid],
3803 &edge_type_ids,
3804 1,
3805 uni_store::runtime::Direction::Outgoing,
3806 Some(writer.l0_manager.get_current()),
3807 )
3808 .await?;
3809 let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
3810
3811 let in_graph = self
3812 .storage
3813 .load_subgraph_cached(
3814 &[vid],
3815 &edge_type_ids,
3816 1,
3817 uni_store::runtime::Direction::Incoming,
3818 Some(writer.l0_manager.get_current()),
3819 )
3820 .await?;
3821 let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
3822
3823 if has_out || has_in {
3824 return Err(anyhow!(
3825 "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
3826 vid
3827 ));
3828 }
3829 Ok(())
3830 }
3831
3832 /// Execute edge deletion from a map representation.
3833 pub(crate) async fn execute_delete_edge_from_map(
3834 &self,
3835 map: &HashMap<String, Value>,
3836 writer: &Writer,
3837 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3838 ) -> Result<()> {
3839 // Check for non-null _eid to skip OPTIONAL MATCH null edges
3840 if map.get("_eid").is_some_and(|v| !v.is_null()) {
3841 let ei = self.extract_edge_identity(map)?;
3842 reject_if_ephemeral_eid(ei.eid)?;
3843 let registry = self
3844 .procedure_registry
3845 .as_ref()
3846 .and_then(|pr| pr.plugin_registry());
3847 reject_virtual_edge_type_write(registry.as_ref(), ei.edge_type_id, "DELETE")?;
3848 writer
3849 .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
3850 .await?;
3851 }
3852 Ok(())
3853 }
3854
3855 /// Build a scan plan node.
3856 ///
3857 /// - `label_id > 0`: schema label → `Scan` (fast, label-specific storage)
3858 /// - `label_id == 0` with labels: schemaless → `ScanMainByLabels` (main table + L0, filtered by label name)
3859 /// - `label_id == 0` without labels: unlabeled → `ScanAll`
3860 fn make_scan_plan(
3861 label_id: u16,
3862 labels: Vec<String>,
3863 variable: String,
3864 filter: Option<Expr>,
3865 ) -> LogicalPlan {
3866 if label_id > 0 {
3867 LogicalPlan::Scan {
3868 label_id,
3869 labels,
3870 variable,
3871 filter,
3872 optional: false,
3873 }
3874 } else if !labels.is_empty() {
3875 // Schemaless label: use ScanMainByLabels to filter by label name
3876 LogicalPlan::ScanMainByLabels {
3877 labels,
3878 variable,
3879 filter,
3880 optional: false,
3881 }
3882 } else {
3883 LogicalPlan::ScanAll {
3884 variable,
3885 filter,
3886 optional: false,
3887 }
3888 }
3889 }
3890
3891 /// Attach a new scan node to the running plan, using `CrossJoin` when the plan
3892 /// already contains prior operators.
3893 fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
3894 if matches!(plan, LogicalPlan::Empty) {
3895 scan
3896 } else {
3897 LogicalPlan::CrossJoin {
3898 left: Box::new(plan),
3899 right: Box::new(scan),
3900 }
3901 }
3902 }
3903
3904 /// Resolve MERGE property map expressions against the current row context.
3905 ///
3906 /// MERGE patterns like `MERGE (city:City {name: person.bornIn})` contain
3907 /// property expressions that reference bound variables. These need to be
3908 /// evaluated to concrete literal values before being converted to filter
3909 /// expressions by `properties_to_expr()`.
3910 async fn resolve_merge_properties(
3911 &self,
3912 properties: &Option<Expr>,
3913 row: &HashMap<String, Value>,
3914 prop_manager: &PropertyManager,
3915 params: &HashMap<String, Value>,
3916 ctx: Option<&QueryContext>,
3917 ) -> Result<Option<Expr>> {
3918 let entries = match properties {
3919 Some(Expr::Map(entries)) => entries,
3920 other => return Ok(other.clone()),
3921 };
3922 let mut resolved = Vec::new();
3923 for (key, val_expr) in entries {
3924 if matches!(val_expr, Expr::Literal(_)) {
3925 resolved.push((key.clone(), val_expr.clone()));
3926 } else {
3927 let value = self
3928 .evaluate_expr(val_expr, row, prop_manager, params, ctx)
3929 .await?;
3930 resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
3931 }
3932 }
3933 Ok(Some(Expr::Map(resolved)))
3934 }
3935
3936 /// Convert a runtime Value back to an AST literal expression.
3937 fn value_to_literal_expr(value: &Value) -> Expr {
3938 match value {
3939 Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
3940 Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
3941 Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
3942 Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
3943 Value::Null => Expr::Literal(CypherLiteral::Null),
3944 Value::List(items) => {
3945 Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
3946 }
3947 Value::Map(entries) => Expr::Map(
3948 entries
3949 .iter()
3950 .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
3951 .collect(),
3952 ),
3953 _ => Expr::Literal(CypherLiteral::Null),
3954 }
3955 }
3956
3957 pub(crate) async fn execute_merge_match(
3958 &self,
3959 pattern: &Pattern,
3960 row: &HashMap<String, Value>,
3961 prop_manager: &PropertyManager,
3962 params: &HashMap<String, Value>,
3963 ctx: Option<&QueryContext>,
3964 ) -> Result<Vec<HashMap<String, Value>>> {
3965 // Construct a LogicalPlan for the MATCH part of MERGE
3966 let planner =
3967 crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
3968
3969 // We need to construct a CypherQuery to use the planner's plan() method,
3970 // or we can manually construct the LogicalPlan.
3971 // Manual construction is safer as we don't have to round-trip through AST.
3972
3973 let mut plan = LogicalPlan::Empty;
3974 let mut vars_in_scope = Vec::new();
3975
3976 // Add existing bound variables from row to scope
3977 for key in row.keys() {
3978 vars_in_scope.push(key.clone());
3979 }
3980
3981 // Reconstruct Match logic from Planner (simplified for MERGE pattern)
3982 for path in &pattern.paths {
3983 let elements = &path.elements;
3984 let mut i = 0;
3985 while i < elements.len() {
3986 let part = &elements[i];
3987 match part {
3988 PatternElement::Node(n) => {
3989 let variable = n.variable.clone().unwrap_or_default();
3990
3991 // If variable is already bound in the input row, we filter
3992 let is_bound = !variable.is_empty() && row.contains_key(&variable);
3993
3994 if is_bound {
3995 // If bound, we must Scan this specific VID to start the chain
3996 // Extract VID from row
3997 let val = row.get(&variable).unwrap();
3998 let vid = Self::vid_from_value(val)?;
3999
4000 // In the new storage model, VIDs don't embed label info.
4001 // We get label from the node value if available, otherwise use 0 to scan all.
4002 let extracted_labels =
4003 Self::extract_labels_from_node(val).unwrap_or_default();
4004 let label_id = {
4005 let schema = self.storage.schema_manager().schema();
4006 extracted_labels
4007 .first()
4008 .and_then(|l| schema.label_id_by_name(l))
4009 .unwrap_or(0)
4010 };
4011
4012 let resolved_props = self
4013 .resolve_merge_properties(
4014 &n.properties,
4015 row,
4016 prop_manager,
4017 params,
4018 ctx,
4019 )
4020 .await?;
4021 let prop_filter =
4022 planner.properties_to_expr(&variable, &resolved_props);
4023
4024 // Create a filter expression for VID: variable._vid = vid
4025 // But our expression engine handles `Expr::Variable` as column.
4026 // We can inject a filter `id(variable) = vid` if we had `id()` function.
4027 // Or we use internal property `_vid`.
4028
4029 // Note: Scan supports `filter`.
4030 // We can manually construct an Expr::BinaryOp(Eq, Prop(var, _vid), Literal(vid))
4031
4032 let vid_filter = Expr::BinaryOp {
4033 left: Box::new(Expr::Property(
4034 Box::new(Expr::Variable(variable.clone())),
4035 "_vid".to_string(),
4036 )),
4037 op: BinaryOp::Eq,
4038 right: Box::new(Expr::Literal(CypherLiteral::Integer(
4039 vid.as_u64() as i64,
4040 ))),
4041 };
4042
4043 let combined_filter = if let Some(pf) = prop_filter {
4044 Some(Expr::BinaryOp {
4045 left: Box::new(vid_filter),
4046 op: BinaryOp::And,
4047 right: Box::new(pf),
4048 })
4049 } else {
4050 Some(vid_filter)
4051 };
4052
4053 let scan = Self::make_scan_plan(
4054 label_id,
4055 extracted_labels,
4056 variable.clone(),
4057 combined_filter,
4058 );
4059 plan = Self::attach_scan(plan, scan);
4060 } else {
4061 let label_id = if n.labels.is_empty() {
4062 // Unlabeled MERGE node: scan all nodes (label_id 0 → ScanAll)
4063 0
4064 } else {
4065 let label_name = &n.labels[0];
4066 let schema = self.storage.schema_manager().schema();
4067 if self.config.strict_schema {
4068 schema
4069 .get_label_case_insensitive(label_name)
4070 .map(|m| m.id)
4071 .ok_or_else(|| {
4072 anyhow!(
4073 "Label '{}' is not defined in the schema \
4074 (strict_schema is enabled). \
4075 Declare it with db.schema().label(...).apply() first.",
4076 label_name
4077 )
4078 })?
4079 } else {
4080 // Fall back to label_id 0 (any/schemaless) when not in schema.
4081 schema
4082 .get_label_case_insensitive(label_name)
4083 .map(|m| m.id)
4084 .unwrap_or(0)
4085 }
4086 };
4087
4088 let resolved_props = self
4089 .resolve_merge_properties(
4090 &n.properties,
4091 row,
4092 prop_manager,
4093 params,
4094 ctx,
4095 )
4096 .await?;
4097 let prop_filter =
4098 planner.properties_to_expr(&variable, &resolved_props);
4099 let scan = Self::make_scan_plan(
4100 label_id,
4101 n.labels.names().to_vec(),
4102 variable.clone(),
4103 prop_filter,
4104 );
4105 plan = Self::attach_scan(plan, scan);
4106
4107 // Add label filters when:
4108 // 1. Multiple labels with a known schema label: filter for
4109 // additional labels (Scan only scans by the first label).
4110 // 2. Schemaless labels (label_id = 0): ScanAll finds ALL
4111 // nodes, so we must filter to only those with the
4112 // specified label(s).
4113 if !n.labels.is_empty()
4114 && !variable.is_empty()
4115 && (label_id == 0 || n.labels.len() > 1)
4116 && let Some(label_filter) =
4117 planner.node_filter_expr(&variable, &n.labels, &None)
4118 {
4119 plan = LogicalPlan::Filter {
4120 input: Box::new(plan),
4121 predicate: label_filter,
4122 optional_variables: std::collections::HashSet::new(),
4123 };
4124 }
4125
4126 if !variable.is_empty() {
4127 vars_in_scope.push(variable.clone());
4128 }
4129 }
4130
4131 // Now look ahead for relationship
4132 i += 1;
4133 while i < elements.len() {
4134 if let PatternElement::Relationship(r) = &elements[i] {
4135 let target_node_part = &elements[i + 1];
4136 if let PatternElement::Node(n_target) = target_node_part {
4137 let schema = self.storage.schema_manager().schema();
4138 let mut edge_type_ids = Vec::new();
4139
4140 if r.types.is_empty() {
4141 return Err(anyhow!("MERGE edge must have a type"));
4142 } else if r.types.len() > 1 {
4143 return Err(anyhow!(
4144 "MERGE does not support multiple edge types"
4145 ));
4146 } else {
4147 let type_name = &r.types[0];
4148 let type_id = if self.config.strict_schema {
4149 let s = self.storage.schema_manager().schema();
4150 s.edge_type_id_by_name_case_insensitive(type_name)
4151 .ok_or_else(|| {
4152 anyhow!(
4153 "Edge type '{}' is not defined in the schema \
4154 (strict_schema is enabled).",
4155 type_name
4156 )
4157 })?
4158 } else {
4159 // Schemaless: assign new ID if not found.
4160 self.storage
4161 .schema_manager()
4162 .get_or_assign_edge_type_id(type_name)
4163 };
4164 edge_type_ids.push(type_id);
4165 }
4166
4167 // Resolve target label ID. For schemaless labels (not in the
4168 // schema), fall back to 0 which means "any label" in traversal.
4169 let target_label_id: u16 = if let Some(lbl) =
4170 n_target.labels.first()
4171 {
4172 schema
4173 .get_label_case_insensitive(lbl)
4174 .map(|m| m.id)
4175 .unwrap_or(0)
4176 } else if let Some(var) = &n_target.variable {
4177 if let Some(val) = row.get(var) {
4178 // In the new storage model, get labels from node value
4179 if let Some(labels) =
4180 Self::extract_labels_from_node(val)
4181 {
4182 if let Some(first_label) = labels.first() {
4183 schema
4184 .get_label_case_insensitive(first_label)
4185 .map(|m| m.id)
4186 .unwrap_or(0)
4187 } else {
4188 // Bound node with no labels — schemaless, any
4189 0
4190 }
4191 } else if Self::vid_from_value(val).is_ok() {
4192 // VID without label info — schemaless, any
4193 0
4194 } else {
4195 return Err(anyhow!(
4196 "Variable {} is not a node",
4197 var
4198 ));
4199 }
4200 } else {
4201 return Err(anyhow!(
4202 "MERGE pattern node must have a label or be a bound variable"
4203 ));
4204 }
4205 } else {
4206 return Err(anyhow!(
4207 "MERGE pattern node must have a label"
4208 ));
4209 };
4210
4211 let target_variable =
4212 n_target.variable.clone().unwrap_or_default();
4213 let source_variable = match &elements[i - 1] {
4214 PatternElement::Node(n) => {
4215 n.variable.clone().unwrap_or_default()
4216 }
4217 _ => String::new(),
4218 };
4219
4220 let is_variable_length = r.range.is_some();
4221 let type_name = &r.types[0];
4222
4223 // Use TraverseMainByType for schemaless edge types
4224 // (same as MATCH planner) so edge properties are loaded
4225 // correctly from storage + L0 via the adjacency map.
4226 // Regular Traverse only loads properties via
4227 // property_manager which doesn't handle schemaless types.
4228 let is_schemaless = edge_type_ids.iter().all(|id| {
4229 uni_common::core::edge_type::is_schemaless_edge_type(*id)
4230 });
4231
4232 if is_schemaless {
4233 plan = LogicalPlan::TraverseMainByType {
4234 type_names: vec![type_name.clone()],
4235 input: Box::new(plan),
4236 direction: r.direction.clone(),
4237 source_variable,
4238 target_variable: target_variable.clone(),
4239 step_variable: r.variable.clone(),
4240 min_hops: r
4241 .range
4242 .as_ref()
4243 .and_then(|r| r.min)
4244 .unwrap_or(1)
4245 as usize,
4246 max_hops: r
4247 .range
4248 .as_ref()
4249 .and_then(|r| r.max)
4250 .unwrap_or(1)
4251 as usize,
4252 optional: false,
4253 target_filter: None,
4254 path_variable: None,
4255 is_variable_length,
4256 optional_pattern_vars: std::collections::HashSet::new(),
4257 scope_match_variables: std::collections::HashSet::new(),
4258 edge_filter_expr: None,
4259 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4260 };
4261 } else {
4262 // Collect edge property names needed for MERGE filter
4263 let mut edge_props = std::collections::HashSet::new();
4264 if let Some(Expr::Map(entries)) = &r.properties {
4265 for (key, _) in entries {
4266 edge_props.insert(key.clone());
4267 }
4268 }
4269 plan = LogicalPlan::Traverse {
4270 input: Box::new(plan),
4271 edge_type_ids: edge_type_ids.clone(),
4272 direction: r.direction.clone(),
4273 source_variable,
4274 target_variable: target_variable.clone(),
4275 target_label_id,
4276 step_variable: r.variable.clone(),
4277 min_hops: r
4278 .range
4279 .as_ref()
4280 .and_then(|r| r.min)
4281 .unwrap_or(1)
4282 as usize,
4283 max_hops: r
4284 .range
4285 .as_ref()
4286 .and_then(|r| r.max)
4287 .unwrap_or(1)
4288 as usize,
4289 optional: false,
4290 target_filter: None,
4291 path_variable: None,
4292 edge_properties: edge_props,
4293 is_variable_length,
4294 optional_pattern_vars: std::collections::HashSet::new(),
4295 scope_match_variables: std::collections::HashSet::new(),
4296 edge_filter_expr: None,
4297 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4298 qpp_steps: None,
4299 };
4300 }
4301
4302 // Apply property filters for relationship
4303 if r.properties.is_some()
4304 && let Some(r_var) = &r.variable
4305 {
4306 let resolved_rel_props = self
4307 .resolve_merge_properties(
4308 &r.properties,
4309 row,
4310 prop_manager,
4311 params,
4312 ctx,
4313 )
4314 .await?;
4315 if let Some(prop_filter) =
4316 planner.properties_to_expr(r_var, &resolved_rel_props)
4317 {
4318 plan = LogicalPlan::Filter {
4319 input: Box::new(plan),
4320 predicate: prop_filter,
4321 optional_variables: std::collections::HashSet::new(
4322 ),
4323 };
4324 }
4325 }
4326
4327 // Apply property filters for target node if it was new
4328 if !target_variable.is_empty() {
4329 let resolved_target_props = self
4330 .resolve_merge_properties(
4331 &n_target.properties,
4332 row,
4333 prop_manager,
4334 params,
4335 ctx,
4336 )
4337 .await?;
4338 if let Some(prop_filter) = planner.properties_to_expr(
4339 &target_variable,
4340 &resolved_target_props,
4341 ) {
4342 plan = LogicalPlan::Filter {
4343 input: Box::new(plan),
4344 predicate: prop_filter,
4345 optional_variables: std::collections::HashSet::new(
4346 ),
4347 };
4348 }
4349 vars_in_scope.push(target_variable.clone());
4350 }
4351
4352 if let Some(sv) = &r.variable {
4353 vars_in_scope.push(sv.clone());
4354 }
4355 i += 2;
4356 } else {
4357 break;
4358 }
4359 } else {
4360 break;
4361 }
4362 }
4363 }
4364 _ => return Err(anyhow!("Pattern must start with a node")),
4365 }
4366 }
4367
4368 // Execute the plan to find all matches, then filter against bound variables in `row`.
4369 }
4370
4371 let db_matches = self
4372 .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
4373 .await?;
4374
4375 // Keep only DB results that are consistent with the input row bindings.
4376 // Skip internal keys (starting with "__") as they are implementation
4377 // artifacts (e.g. __used_edges) and not user-visible variable bindings.
4378 // Also skip the empty-string key (""), which is the placeholder variable
4379 // for unnamed MERGE nodes — it may carry over from a prior MERGE clause
4380 // and must not constrain the current pattern's match.
4381 let final_matches = db_matches
4382 .into_iter()
4383 .filter(|db_match| {
4384 row.iter().all(|(key, val)| {
4385 if key.is_empty() || key.starts_with("__") {
4386 return true;
4387 }
4388 let Some(db_val) = db_match.get(key) else {
4389 return true;
4390 };
4391 if db_val == val {
4392 return true;
4393 }
4394 // Values differ -- treat as consistent if they represent the same VID
4395 matches!(
4396 (Self::vid_from_value(val), Self::vid_from_value(db_val)),
4397 (Ok(v1), Ok(v2)) if v1 == v2
4398 )
4399 })
4400 })
4401 .map(|db_match| {
4402 let mut merged = row.clone();
4403 merged.extend(db_match);
4404 merged
4405 })
4406 .collect();
4407
4408 Ok(final_matches)
4409 }
4410
4411 /// Prepare a MERGE pattern for path variable binding.
4412 ///
4413 /// If any path in the pattern has a path variable (e.g., `MERGE p = (a)-[:R]->(b)`),
4414 /// unnamed relationships need internal variable names so that `execute_create_pattern`
4415 /// stores the edge data in the row for later path construction.
4416 ///
4417 /// Returns the (possibly modified) pattern and a list of temp variable names to clean up.
4418 fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
4419 let has_path_vars = pattern
4420 .paths
4421 .iter()
4422 .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
4423
4424 if !has_path_vars {
4425 return (pattern.clone(), Vec::new());
4426 }
4427
4428 let mut modified = pattern.clone();
4429 let mut temp_vars = Vec::new();
4430
4431 for path in &mut modified.paths {
4432 if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
4433 continue;
4434 }
4435 for (idx, element) in path.elements.iter_mut().enumerate() {
4436 if let PatternElement::Relationship(r) = element
4437 && r.variable.as_ref().is_none_or(String::is_empty)
4438 {
4439 let temp_var = format!("__path_r_{}", idx);
4440 r.variable = Some(temp_var.clone());
4441 temp_vars.push(temp_var);
4442 }
4443 }
4444 }
4445
4446 (modified, temp_vars)
4447 }
4448
4449 /// Bind path variables in the result row based on the MERGE pattern.
4450 ///
4451 /// Walks each path in the pattern, collects node/edge values from the row
4452 /// by variable name, and constructs a `Value::Path`.
4453 fn bind_path_variables(
4454 pattern: &Pattern,
4455 row: &mut HashMap<String, Value>,
4456 temp_vars: &[String],
4457 ) {
4458 for path in &pattern.paths {
4459 let Some(path_var) = path.variable.as_ref() else {
4460 continue;
4461 };
4462 if path_var.is_empty() {
4463 continue;
4464 }
4465
4466 let mut nodes = Vec::new();
4467 let mut edges = Vec::new();
4468
4469 for element in &path.elements {
4470 match element {
4471 PatternElement::Node(n) => {
4472 if let Some(var) = &n.variable
4473 && let Some(val) = row.get(var)
4474 && let Some(node) = Self::value_to_node_for_path(val)
4475 {
4476 nodes.push(node);
4477 }
4478 }
4479 PatternElement::Relationship(r) => {
4480 if let Some(var) = &r.variable
4481 && let Some(val) = row.get(var)
4482 && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
4483 {
4484 edges.push(edge);
4485 }
4486 }
4487 _ => {}
4488 }
4489 }
4490
4491 if !nodes.is_empty() {
4492 use uni_common::value::Path;
4493 row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
4494 }
4495 }
4496
4497 // Clean up internal temp variables
4498 for var in temp_vars {
4499 row.remove(var);
4500 }
4501 }
4502
4503 /// Convert a Value (Map or Node) to a Node for path construction.
4504 fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
4505 match val {
4506 Value::Node(n) => Some(n.clone()),
4507 Value::Map(map) => {
4508 let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
4509 let labels = if let Some(Value::List(l)) = map.get("_labels") {
4510 l.iter()
4511 .filter_map(|v| {
4512 if let Value::String(s) = v {
4513 Some(s.clone())
4514 } else {
4515 None
4516 }
4517 })
4518 .collect()
4519 } else {
4520 vec![]
4521 };
4522 let properties: HashMap<String, Value> = map
4523 .iter()
4524 .filter(|(k, _)| !k.starts_with('_'))
4525 .map(|(k, v)| (k.clone(), v.clone()))
4526 .collect();
4527 Some(uni_common::value::Node {
4528 vid,
4529 labels,
4530 properties,
4531 })
4532 }
4533 _ => None,
4534 }
4535 }
4536
4537 /// Convert a Value (Map or Edge) to an Edge for path construction.
4538 fn value_to_edge_for_path(
4539 val: &Value,
4540 type_names: &[String],
4541 ) -> Option<uni_common::value::Edge> {
4542 match val {
4543 Value::Edge(e) => Some(e.clone()),
4544 Value::Map(map) => {
4545 let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
4546 let edge_type = map
4547 .get("_type_name")
4548 .and_then(|v| {
4549 if let Value::String(s) = v {
4550 Some(s.clone())
4551 } else {
4552 None
4553 }
4554 })
4555 .or_else(|| type_names.first().cloned())
4556 .unwrap_or_default();
4557 let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
4558 let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
4559 let properties: HashMap<String, Value> = map
4560 .iter()
4561 .filter(|(k, _)| !k.starts_with('_'))
4562 .map(|(k, v)| (k.clone(), v.clone()))
4563 .collect();
4564 Some(uni_common::value::Edge {
4565 eid,
4566 edge_type,
4567 src,
4568 dst,
4569 properties,
4570 })
4571 }
4572 _ => None,
4573 }
4574 }
4575}
4576
4577/// Read a vertex's full property map, preferring `prefetched` over a fresh
4578/// per-row `Backend::scan`.
4579///
4580/// `prefetched` is built once at the top of `apply_mutations` via
4581/// `prefetch_set_targets` / `prefetch_remove_targets` (mutation_common.rs).
4582/// On a hit, we layer in L0 from `ctx` so writes from earlier rows of the
4583/// same `apply_mutations` invocation (counter increments, same-VID
4584/// duplicates from UNWIND) take precedence — the prefetch only snapshots
4585/// storage state at SET entry. On a miss, fall back to the existing
4586/// per-row path; this preserves correctness for newly created VIDs,
4587/// schemaless rows, multi-label corner cases, and non-Mutation callers
4588/// that pass `&Prefetch::default()`.
4589pub(crate) async fn read_vertex_props_with_prefetch(
4590 vid: Vid,
4591 prefetched: &Prefetch,
4592 prop_manager: &PropertyManager,
4593 ctx: Option<&QueryContext>,
4594) -> Result<uni_common::Properties> {
4595 match prefetched.vertex.get(&vid).cloned() {
4596 Some(mut base) => {
4597 if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_vertex_props(vid, ctx) {
4598 for (k, v) in l0 {
4599 base.insert(k, v);
4600 }
4601 }
4602 Ok(base)
4603 }
4604 None => Ok(prop_manager
4605 .get_all_vertex_props_with_ctx(vid, ctx)
4606 .await?
4607 .unwrap_or_default()),
4608 }
4609}
4610
4611/// Edge equivalent of [`read_vertex_props_with_prefetch`]. On a hit, layer
4612/// in L0 edge props so writes from earlier rows of the same
4613/// `apply_mutations` invocation take precedence. On a miss, fall back to
4614/// the per-EID storage path.
4615pub(crate) async fn read_edge_props_with_prefetch(
4616 eid: Eid,
4617 prefetched: &Prefetch,
4618 prop_manager: &PropertyManager,
4619 ctx: Option<&QueryContext>,
4620) -> Result<uni_common::Properties> {
4621 match prefetched.edge.get(&eid).cloned() {
4622 Some(mut base) => {
4623 if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_edge_props(eid, ctx) {
4624 for (k, v) in l0 {
4625 base.insert(k, v);
4626 }
4627 }
4628 Ok(base)
4629 }
4630 None => Ok(prop_manager
4631 .get_all_edge_props_with_ctx(eid, ctx)
4632 .await?
4633 .unwrap_or_default()),
4634 }
4635}
4636
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `String -> Value` map from `(&str, Value)` pairs.
    fn prop_map(entries: Vec<(&str, Value)>) -> HashMap<String, Value> {
        entries
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect()
    }

    /// Shorthand for a `Value::String`.
    fn text(s: &str) -> Value {
        Value::String(s.into())
    }

    // ── merge_props tests ────────────────────────────────────────────

    #[test]
    fn test_merge_props_replace_tombstones_missing_keys() {
        let current = prop_map(vec![("name", text("Alice")), ("age", Value::Int(30))]);
        let incoming = prop_map(vec![("name", text("Bob"))]);

        let merged = Executor::merge_props(current, incoming, true);
        assert_eq!(merged.get("name"), Some(&text("Bob")));
        assert_eq!(
            merged.get("age"),
            Some(&Value::Null),
            "Missing keys should be tombstoned in replace mode"
        );
    }

    #[test]
    fn test_merge_props_merge_preserves_existing() {
        let current = prop_map(vec![("name", text("Alice")), ("age", Value::Int(30))]);
        let incoming = prop_map(vec![("city", text("NYC"))]);

        let merged = Executor::merge_props(current, incoming, false);
        assert_eq!(merged.get("name"), Some(&text("Alice")));
        assert_eq!(merged.get("age"), Some(&Value::Int(30)));
        assert_eq!(merged.get("city"), Some(&text("NYC")));
    }

    #[test]
    fn test_merge_props_null_incoming_is_tombstone() {
        let current = prop_map(vec![("name", text("Alice"))]);
        let incoming = prop_map(vec![("name", Value::Null)]);

        // Merge mode: an incoming null overwrites the existing value.
        let merged = Executor::merge_props(current.clone(), incoming.clone(), false);
        assert_eq!(merged.get("name"), Some(&Value::Null));

        // Replace mode: an incoming null is a tombstone.
        let replaced = Executor::merge_props(current, incoming, true);
        assert_eq!(replaced.get("name"), Some(&Value::Null));
    }

    #[test]
    fn test_merge_props_empty_current() {
        let incoming = prop_map(vec![("name", text("Alice"))]);

        let merged = Executor::merge_props(HashMap::new(), incoming, false);
        assert_eq!(merged.get("name"), Some(&text("Alice")));
        assert_eq!(merged.len(), 1);
    }

    #[test]
    fn test_merge_props_empty_incoming_replace_tombstones_all() {
        let current = prop_map(vec![("name", text("Alice")), ("age", Value::Int(30))]);

        let merged = Executor::merge_props(current, HashMap::new(), true);
        assert_eq!(merged.get("name"), Some(&Value::Null));
        assert_eq!(merged.get("age"), Some(&Value::Null));
    }

    // ── extract_labels_from_node tests ───────────────────────────────

    #[test]
    fn test_extract_labels_from_map() {
        let node_map = prop_map(vec![
            ("_vid", Value::Int(1)),
            (
                "_labels",
                Value::List(vec![text("Person"), text("Employee")]),
            ),
        ]);

        assert_eq!(
            Executor::extract_labels_from_node(&Value::Map(node_map)),
            Some(vec!["Person".to_string(), "Employee".to_string()])
        );
    }

    #[test]
    fn test_extract_labels_from_value_node() {
        let node = uni_common::Node {
            vid: uni_common::core::id::Vid::from(1u64),
            labels: vec!["Person".to_string()],
            properties: HashMap::new(),
        };

        assert_eq!(
            Executor::extract_labels_from_node(&Value::Node(node)),
            Some(vec!["Person".to_string()])
        );
    }

    #[test]
    fn test_extract_labels_non_node_returns_none() {
        for not_a_node in [Value::Int(42), text("hello")] {
            assert_eq!(Executor::extract_labels_from_node(&not_a_node), None);
        }
    }

    // ── extract_user_properties_from_value tests ─────────────────────

    #[test]
    fn test_extract_user_props_strips_internal_keys() {
        let node_map = prop_map(vec![
            ("_vid", Value::Int(1)),
            ("_labels", Value::List(vec![text("Person")])),
            ("name", text("Alice")),
            ("age", Value::Int(30)),
        ]);

        let user_props =
            Executor::extract_user_properties_from_value(&Value::Map(node_map)).unwrap();
        assert_eq!(user_props.get("name"), Some(&text("Alice")));
        assert_eq!(user_props.get("age"), Some(&Value::Int(30)));
        assert!(!user_props.contains_key("_vid"));
        assert!(!user_props.contains_key("_labels"));
    }

    #[test]
    fn test_extract_user_props_plain_map_returns_as_is() {
        let plain = prop_map(vec![("key", text("value"))]);

        let user_props =
            Executor::extract_user_properties_from_value(&Value::Map(plain.clone())).unwrap();
        assert_eq!(user_props, plain);
    }

    #[test]
    fn test_extract_user_props_from_value_node() {
        let node = uni_common::Node {
            vid: uni_common::core::id::Vid::from(1u64),
            labels: vec!["Person".to_string()],
            properties: prop_map(vec![("name", text("Alice"))]),
        };

        let user_props = Executor::extract_user_properties_from_value(&Value::Node(node)).unwrap();
        assert_eq!(user_props.get("name"), Some(&text("Alice")));
    }
}
4801}