1use super::core::*;
5use crate::query::df_graph::mutation_common::Prefetch;
6use crate::query::planner::LogicalPlan;
7use anyhow::{Result, anyhow};
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use uni_common::DataType;
11use uni_common::core::id::{Eid, Vid};
12use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
13use uni_common::{Path, Value};
14use uni_cypher::ast::{
15 AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
16 CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
17 DropEdgeType, DropLabel, Expr, NodePattern, Pattern, PatternElement, RemoveItem, SetClause,
18 SetItem,
19};
20use uni_store::QueryContext;
21use uni_store::runtime::l0_visibility;
22use uni_store::runtime::property_manager::PropertyManager;
23use uni_store::runtime::writer::Writer;
24
/// A list of `(property name, value)` pairs.
///
/// NOTE(review): presumably the identifying key of a MERGE pattern — no use of
/// this alias is visible in this part of the file; confirm against its callers.
type MergeKey = Vec<(String, Value)>;
29
30struct EdgeIdentity {
32 eid: Eid,
33 src: Vid,
34 dst: Vid,
35 edge_type_id: u32,
36}
37
38struct PendingVertexSet {
44 vid: Vid,
45 labels: Vec<String>,
46 props: HashMap<String, Value>,
51 partial: bool,
56 touched: HashSet<String>,
60}
61
62struct PendingEdgeSet {
64 src: Vid,
65 dst: Vid,
66 edge_type_id: u32,
67 eid: Eid,
68 edge_type_name: String,
69 partial: bool,
73 touched: HashSet<String>,
77 props: HashMap<String, Value>,
78}
79
80fn reject_if_ephemeral_vid(vid: Vid) -> Result<()> {
84 if vid.is_ephemeral() {
85 return Err(anyhow::Error::from(
86 uni_common::UniError::EphemeralWriteAttempt {
87 kind: "node",
88 id: vid.transient_id().unwrap_or(vid.as_u64()),
89 },
90 ));
91 }
92 Ok(())
93}
94
95fn value_type_name(val: &Value) -> &'static str {
97 match val {
98 Value::Null => "Null",
99 Value::Bool(_) => "Bool",
100 Value::Int(_) => "Int",
101 Value::Float(_) => "Float",
102 Value::String(_) => "String",
103 Value::Bytes(_) => "Bytes",
104 Value::List(_) => "List",
105 Value::Map(_) => "Map",
106 Value::Node(_) => "Node",
107 Value::Edge(_) => "Edge",
108 Value::Path(_) => "Path",
109 Value::Vector(_) => "Vector",
110 Value::Temporal(_) => "Temporal",
111 _ => "value",
112 }
113}
114
115fn reject_if_ephemeral_eid(eid: Eid) -> Result<()> {
117 if eid.is_ephemeral() {
118 return Err(anyhow::Error::from(
119 uni_common::UniError::EphemeralWriteAttempt {
120 kind: "edge",
121 id: eid.transient_id().unwrap_or(eid.as_u64()),
122 },
123 ));
124 }
125 Ok(())
126}
127
128fn reject_virtual_label_write(
149 registry: Option<&Arc<uni_plugin::PluginRegistry>>,
150 labels: &[String],
151 op: &str,
152) -> Result<()> {
153 let Some(registry) = registry else {
154 return Ok(());
155 };
156 for label in labels {
157 if registry.virtual_label_by_name(label).is_some() {
158 return Err(anyhow!(
159 "Cannot {op} on virtual (catalog-resolved) label `{label}` — virtual \
160 labels are read-only; write back via the originating catalog instead"
161 ));
162 }
163 }
164 Ok(())
165}
166
167fn reject_virtual_edge_type_write(
177 registry: Option<&Arc<uni_plugin::PluginRegistry>>,
178 edge_type_id: u32,
179 op: &str,
180) -> Result<()> {
181 let Some(registry) = registry else {
182 return Ok(());
183 };
184 if let Some(entry) = registry.virtual_edge_type_by_id(edge_type_id) {
185 return Err(anyhow!(
186 "Cannot {op} on virtual (catalog-resolved) edge type `{}` — virtual edge \
187 types are read-only; write back via the originating catalog instead",
188 entry.name
189 ));
190 }
191 Ok(())
192}
193
194impl Executor {
195 pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
202 match node_val {
203 Value::Map(map) => {
204 if let Some(Value::List(labels_arr)) = map.get("_labels") {
206 let labels: Vec<String> = labels_arr
207 .iter()
208 .filter_map(|v| v.as_str().map(|s| s.to_string()))
209 .collect();
210 if !labels.is_empty() {
211 return Some(labels);
212 }
213 }
214 None
215 }
216 Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
217 _ => None,
218 }
219 }
220
221 pub(crate) fn extract_user_properties_from_value(
229 val: &Value,
230 ) -> Option<HashMap<String, Value>> {
231 match val {
232 Value::Map(map) => {
233 let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
237 let is_edge_map = map.contains_key("_eid")
238 && map.contains_key("_src")
239 && map.contains_key("_dst");
240
241 if is_node_map || is_edge_map {
242 let user_props: HashMap<String, Value> = map
244 .iter()
245 .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
246 .map(|(k, v)| (k.clone(), v.clone()))
247 .collect();
248 if user_props.is_empty()
252 && let Some(Value::Map(all_props)) = map.get("_all_props")
253 {
254 return Some(all_props.clone());
255 }
256 Some(user_props)
257 } else {
258 Some(map.clone())
260 }
261 }
262 Value::Node(node) => Some(node.properties.clone()),
263 Value::Edge(edge) => Some(edge.properties.clone()),
264 _ => None,
265 }
266 }
267
268 #[expect(clippy::too_many_arguments)]
285 async fn apply_properties_to_entity(
286 &self,
287 variable: &str,
288 new_props: HashMap<String, Value>,
289 replace: bool,
290 row: &mut HashMap<String, Value>,
291 writer: &Writer,
292 prop_manager: &PropertyManager,
293 params: &HashMap<String, Value>,
294 ctx: Option<&QueryContext>,
295 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
296 prefetched: &Prefetch,
297 ) -> Result<()> {
298 let target = row.get(variable).cloned();
300
301 let schema = self.storage.schema_manager().schema();
304
305 match target {
306 Some(Value::Node(ref node)) => {
307 let vid = node.vid;
308 let labels = node.labels.clone();
309 let current =
310 read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
311 let write_props = Self::merge_props(current, new_props, replace);
312 let mut enriched = write_props.clone();
313 for label_name in &labels {
314 self.enrich_properties_with_generated_columns(
315 label_name,
316 &mut enriched,
317 prop_manager,
318 params,
319 ctx,
320 )
321 .await?;
322 }
323 let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
324 let _ = writer
325 .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
326 .await?;
327 if let Some(Value::Node(n)) = row.get_mut(variable) {
329 n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
330 }
331 }
332 Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
333 let vid = Self::vid_from_value(node_val)?;
334 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
335 let current =
336 read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
337 let write_props = Self::merge_props(current, new_props, replace);
338 let mut enriched = write_props.clone();
339 for label_name in &labels {
340 self.enrich_properties_with_generated_columns(
341 label_name,
342 &mut enriched,
343 prop_manager,
344 params,
345 ctx,
346 )
347 .await?;
348 }
349 let enriched = Self::coerce_and_validate_props(enriched, &schema, &labels)?;
350 let _ = writer
351 .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
352 .await?;
353 if let Some(Value::Map(node_map)) = row.get_mut(variable) {
355 node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
357 let effective: HashMap<String, Value> =
359 enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
360 for (k, v) in &effective {
361 node_map.insert(k.clone(), v.clone());
362 }
363 node_map.insert("_all_props".to_string(), Value::Map(effective));
365 }
366 }
367 Some(Value::Edge(ref edge)) => {
368 let eid = edge.eid;
369 let src = edge.src;
370 let dst = edge.dst;
371 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
372 let current =
373 read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
374 let write_props = Self::merge_props(current, new_props, replace);
375 let write_props = Self::coerce_and_validate_props(
376 write_props,
377 &schema,
378 std::slice::from_ref(&edge.edge_type),
379 )?;
380 writer
381 .insert_edge(
382 src,
383 dst,
384 etype,
385 eid,
386 write_props.clone(),
387 Some(edge.edge_type.clone()),
388 tx_l0,
389 )
390 .await?;
391 if let Some(Value::Edge(e)) = row.get_mut(variable) {
393 e.properties = write_props
394 .into_iter()
395 .filter(|(_, v)| !v.is_null())
396 .collect();
397 }
398 }
399 Some(Value::Map(ref map))
400 if map.contains_key("_eid")
401 && map.contains_key("_src")
402 && map.contains_key("_dst") =>
403 {
404 let ei = self.extract_edge_identity(map)?;
405 let current =
406 read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx).await?;
407 let write_props = Self::merge_props(current, new_props, replace);
408 let edge_type_name = map
409 .get("_type")
410 .and_then(|v| v.as_str())
411 .map(|s| s.to_string())
412 .or_else(|| {
413 self.storage
414 .schema_manager()
415 .edge_type_name_by_id_unified(ei.edge_type_id)
416 });
417 let write_props = match &edge_type_name {
418 Some(name) => Self::coerce_and_validate_props(
419 write_props,
420 &schema,
421 std::slice::from_ref(name),
422 )?,
423 None => write_props,
424 };
425 writer
426 .insert_edge(
427 ei.src,
428 ei.dst,
429 ei.edge_type_id,
430 ei.eid,
431 write_props.clone(),
432 edge_type_name,
433 tx_l0,
434 )
435 .await?;
436 if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
438 edge_map.retain(|k, _| k.starts_with('_'));
439 let effective: HashMap<String, Value> = write_props
440 .into_iter()
441 .filter(|(_, v)| !v.is_null())
442 .collect();
443 for (k, v) in &effective {
444 edge_map.insert(k.clone(), v.clone());
445 }
446 edge_map.insert("_all_props".to_string(), Value::Map(effective));
448 }
449 }
450 _ => {
451 }
453 }
454 Ok(())
455 }
456
457 fn merge_props(
468 current: HashMap<String, Value>,
469 incoming: HashMap<String, Value>,
470 replace: bool,
471 ) -> HashMap<String, Value> {
472 if replace {
473 let mut result: HashMap<String, Value> = incoming
475 .iter()
476 .filter(|(_, v)| !v.is_null())
477 .map(|(k, v)| (k.clone(), v.clone()))
478 .collect();
479 for k in current.keys() {
482 if incoming.get(k).is_none_or(|v| v.is_null()) {
483 result.insert(k.clone(), Value::Null);
484 }
485 }
486 result
487 } else {
488 let mut result = current;
490 result.extend(incoming);
491 result
492 }
493 }
494
495 fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
497 let eid = Eid::from(
498 map.get("_eid")
499 .and_then(|v| v.as_u64())
500 .ok_or_else(|| anyhow!("Invalid _eid"))?,
501 );
502 let src = Vid::from(
503 map.get("_src")
504 .and_then(|v| v.as_u64())
505 .ok_or_else(|| anyhow!("Invalid _src"))?,
506 );
507 let dst = Vid::from(
508 map.get("_dst")
509 .and_then(|v| v.as_u64())
510 .ok_or_else(|| anyhow!("Invalid _dst"))?,
511 );
512 let edge_type_id = self.resolve_edge_type_id(
513 map.get("_type")
514 .or_else(|| map.get("_type_name"))
515 .ok_or_else(|| anyhow!("Missing _type/_type_name on edge map"))?,
516 )?;
517 Ok(EdgeIdentity {
518 eid,
519 src,
520 dst,
521 edge_type_id,
522 })
523 }
524
525 fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
532 match type_val {
533 Value::Int(i) => Ok(*i as u32),
534 Value::String(name) => {
535 if self.config.strict_schema {
536 let schema = self.storage.schema_manager().schema();
537 schema
538 .edge_type_id_by_name_case_insensitive(name)
539 .ok_or_else(|| {
540 anyhow!(
541 "Edge type '{}' is not defined in the schema \
542 (strict_schema is enabled). \
543 Declare it with db.schema().edge_type(...).apply() first.",
544 name
545 )
546 })
547 } else {
548 Ok(self
550 .storage
551 .schema_manager()
552 .get_or_assign_edge_type_id(name))
553 }
554 }
555 _ => Err(anyhow!(
556 "Invalid _type value: expected Int or String, got {:?}",
557 type_val
558 )),
559 }
560 }
561
562 pub(crate) async fn execute_vacuum(&self) -> Result<()> {
563 if let Some(writer_arc) = &self.writer {
564 {
566 let writer: &uni_store::Writer = writer_arc.as_ref();
567 writer.flush_to_l1(None).await?;
568 } let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
572 let compaction_results = compactor.compact_all().await?;
573
574 let am = self.storage.adjacency_manager();
576 let schema = self.storage.schema_manager().schema();
577 for info in compaction_results {
578 let direction = match info.direction.as_str() {
580 "fwd" => uni_store::storage::direction::Direction::Outgoing,
581 "bwd" => uni_store::storage::direction::Direction::Incoming,
582 _ => continue,
583 };
584
585 if let Some(edge_type_id) =
587 schema.edge_type_id_unified_case_insensitive(&info.edge_type)
588 {
589 let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
591 }
592 }
593 }
594 Ok(())
595 }
596
597 pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
598 if let Some(writer_arc) = &self.writer {
599 let writer: &uni_store::Writer = writer_arc.as_ref();
600 writer.flush_to_l1(Some("checkpoint".to_string())).await?;
601 }
602 Ok(())
603 }
604
605 pub(crate) async fn execute_copy_to(
606 &self,
607 identifier: &str,
608 path: &str,
609 format: &str,
610 options: &HashMap<String, Value>,
611 ) -> Result<usize> {
612 let schema = self.storage.schema_manager().schema();
614
615 if schema.get_edge_type_case_insensitive(identifier).is_some() {
617 return self
618 .export_edge_type_in_format(identifier, path, format)
619 .await;
620 }
621
622 if schema.get_label_case_insensitive(identifier).is_some() {
624 return self
625 .export_vertex_label_in_format(identifier, path, format, options)
626 .await;
627 }
628
629 Err(anyhow!("Unknown label or edge type: '{}'", identifier))
631 }
632
633 async fn export_vertex_label_in_format(
634 &self,
635 label: &str,
636 path: &str,
637 format: &str,
638 _options: &HashMap<String, Value>,
639 ) -> Result<usize> {
640 match format {
641 "parquet" => self.export_vertex_label(label, path).await,
642 "csv" => {
643 let mut stream = self
644 .storage
645 .scan_vertex_table_stream(label)
646 .await?
647 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
648
649 let mut all_rows = Vec::new();
651 let mut column_names = Vec::new();
652
653 use futures::StreamExt;
655 while let Some(batch_result) = stream.next().await {
656 let batch = batch_result?;
657
658 if column_names.is_empty() {
660 column_names = batch
661 .schema()
662 .fields()
663 .iter()
664 .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
665 .map(|f| f.name().clone())
666 .collect();
667 }
668
669 for row_idx in 0..batch.num_rows() {
671 let mut row = Vec::new();
672 for field in batch.schema().fields() {
673 if field.name().starts_with('_') || field.name() == "ext_id" {
674 continue;
675 }
676
677 let col_idx = batch.schema().index_of(field.name())?;
678 let column = batch.column(col_idx);
679 let value = self.arrow_value_to_json(column, row_idx)?;
680
681 let csv_value = match value {
683 Value::Null => String::new(),
684 Value::Bool(b) => b.to_string(),
685 Value::Int(i) => i.to_string(),
686 Value::Float(f) => f.to_string(),
687 Value::String(s) => s,
688 _ => format!("{value}"),
689 };
690 row.push(csv_value);
691 }
692 all_rows.push(row);
693 }
694 }
695
696 let file = std::fs::File::create(path)?;
698 let mut wtr = csv::Writer::from_writer(file);
699
700 log::debug!("CSV export headers: {:?}", column_names);
702 wtr.write_record(&column_names)?;
703
704 for (i, row) in all_rows.iter().enumerate() {
706 log::debug!("CSV export row {}: {:?}", i, row);
707 wtr.write_record(row)?;
708 }
709
710 wtr.flush()?;
711 Ok(all_rows.len())
712 }
713 _ => Err(anyhow!(
714 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
715 format
716 )),
717 }
718 }
719
720 async fn export_edge_type_in_format(
721 &self,
722 edge_type: &str,
723 path: &str,
724 format: &str,
725 ) -> Result<usize> {
726 match format {
727 "parquet" => self.export_edge_type(edge_type, path).await,
728 "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
729 _ => Err(anyhow!(
730 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
731 format
732 )),
733 }
734 }
735
736 async fn write_batches_to_parquet(
739 mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
740 path: &str,
741 entity_description: &str,
742 ) -> Result<usize> {
743 use futures::TryStreamExt;
744
745 let first_batch = match stream.try_next().await? {
747 Some(batch) => batch,
748 None => {
749 log::info!("No data to export from {}", entity_description);
750 return Ok(0);
751 }
752 };
753
754 let file = std::fs::File::create(path)?;
756 let arrow_schema = first_batch.schema();
757 let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
758
759 let mut count = first_batch.num_rows();
761 writer.write(&first_batch)?;
762
763 while let Some(batch) = stream.try_next().await? {
765 count += batch.num_rows();
766 writer.write(&batch)?;
767 }
768
769 writer.close()?;
770
771 log::info!(
772 "Exported {} rows from {} to '{}'",
773 count,
774 entity_description,
775 path
776 );
777 Ok(count)
778 }
779
780 async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
782 let stream = self
783 .storage
784 .scan_vertex_table_stream(label)
785 .await?
786 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
787
788 Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
789 }
790
791 async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
793 let schema = self.storage.schema_manager().schema();
794 if !schema.edge_types.contains_key(edge_type) {
795 return Err(anyhow!("Edge type '{}' not found", edge_type));
796 }
797
798 let filter = format!("type = '{}'", edge_type);
799 let stream = self
800 .storage
801 .scan_main_edge_table_stream(Some(&filter))
802 .await?
803 .ok_or_else(|| anyhow!("No edge data found"))?;
804
805 Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
806 }
807
808 pub(crate) async fn execute_copy_from(
809 &self,
810 label: &str,
811 path: &str,
812 format: &str,
813 options: &HashMap<String, Value>,
814 ) -> Result<usize> {
815 let batches = match format {
817 "parquet" => self.read_parquet_file(path)?,
818 "csv" => self.read_csv_file(path, label, options)?,
819 _ => {
820 return Err(anyhow!(
821 "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
822 format
823 ));
824 }
825 };
826
827 let writer_arc = self
829 .writer
830 .as_ref()
831 .ok_or_else(|| anyhow!("No writer available"))?;
832
833 let db_schema = self.storage.schema_manager().schema();
834
835 let is_edge = db_schema.edge_type_id_by_name(label).is_some();
837
838 if is_edge {
839 let edge_type_id = db_schema
841 .edge_type_id_by_name(label)
842 .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
843
844 let src_col = options
846 .get("src_col")
847 .and_then(|v| v.as_str())
848 .unwrap_or("src");
849 let dst_col = options
850 .get("dst_col")
851 .and_then(|v| v.as_str())
852 .unwrap_or("dst");
853
854 let writer: &uni_store::Writer = writer_arc.as_ref();
857 let mut total_rows = 0;
858 for batch in batches {
859 let num_rows = batch.num_rows();
860 let eids = writer.allocate_eids(num_rows).await?;
862
863 for (row_idx, &eid) in eids.iter().enumerate().take(num_rows) {
864 let mut properties = HashMap::new();
865 let mut src_vid: Option<Vid> = None;
866 let mut dst_vid: Option<Vid> = None;
867
868 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
870 let col_name = field.name();
871 let column = batch.column(col_idx);
872 let value = self.arrow_value_to_json(column, row_idx)?;
873
874 if col_name == src_col {
875 let raw = value.as_u64().unwrap_or_else(|| {
876 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
877 });
878 src_vid = Some(Vid::new(raw));
879 } else if col_name == dst_col {
880 let raw = value.as_u64().unwrap_or_else(|| {
881 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
882 });
883 dst_vid = Some(Vid::new(raw));
884 } else if !col_name.starts_with('_') && !value.is_null() {
885 properties.insert(col_name.clone(), value);
886 }
887 }
888
889 let src = src_vid
890 .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
891 let dst = dst_vid
892 .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
893
894 writer
895 .insert_edge(
896 src,
897 dst,
898 edge_type_id,
899 eid,
900 properties,
901 Some(label.to_string()),
902 None,
903 )
904 .await?;
905
906 total_rows += 1;
907 }
908 }
909
910 log::info!(
911 "Imported {} edge rows from '{}' into edge type '{}'",
912 total_rows,
913 path,
914 label
915 );
916
917 if total_rows > 0 {
919 writer.flush_to_l1(None).await?;
920 }
921
922 Ok(total_rows)
923 } else {
924 db_schema
927 .label_id_by_name_case_insensitive(label)
928 .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
929
930 let writer: &uni_store::Writer = writer_arc.as_ref();
933 let mut total_rows = 0;
934 for batch in batches {
935 let num_rows = batch.num_rows();
936 let vids = writer.allocate_vids(num_rows).await?;
938
939 for (row_idx, &vid) in vids.iter().enumerate().take(num_rows) {
941 let mut properties = HashMap::new();
942
943 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
945 let col_name = field.name();
946
947 if col_name.starts_with('_') {
949 continue;
950 }
951
952 let column = batch.column(col_idx);
953 let value = self.arrow_value_to_json(column, row_idx)?;
954
955 if !value.is_null() {
956 properties.insert(col_name.clone(), value);
957 }
958 }
959
960 let _ = writer
961 .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
962 .await?;
963
964 total_rows += 1;
965 }
966 }
967
968 log::info!(
969 "Imported {} rows from '{}' into label '{}'",
970 total_rows,
971 path,
972 label
973 );
974
975 if total_rows > 0 {
977 writer.flush_to_l1(None).await?;
978 }
979
980 Ok(total_rows)
981 }
982 }
983
984 fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
985 use arrow_array::Array;
986 use arrow_schema::DataType as ArrowDataType;
987
988 if column.is_null(row_idx) {
989 return Ok(Value::Null);
990 }
991
992 match column.data_type() {
993 ArrowDataType::Utf8 => {
994 let array = column
995 .as_any()
996 .downcast_ref::<arrow_array::StringArray>()
997 .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
998 Ok(Value::String(array.value(row_idx).to_string()))
999 }
1000 ArrowDataType::Int32 => {
1001 let array = column
1002 .as_any()
1003 .downcast_ref::<arrow_array::Int32Array>()
1004 .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
1005 Ok(Value::Int(array.value(row_idx) as i64))
1006 }
1007 ArrowDataType::Int64 => {
1008 let array = column
1009 .as_any()
1010 .downcast_ref::<arrow_array::Int64Array>()
1011 .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
1012 Ok(Value::Int(array.value(row_idx)))
1013 }
1014 ArrowDataType::Float32 => {
1015 let array = column
1016 .as_any()
1017 .downcast_ref::<arrow_array::Float32Array>()
1018 .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
1019 Ok(Value::Float(array.value(row_idx) as f64))
1020 }
1021 ArrowDataType::Float64 => {
1022 let array = column
1023 .as_any()
1024 .downcast_ref::<arrow_array::Float64Array>()
1025 .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
1026 Ok(Value::Float(array.value(row_idx)))
1027 }
1028 ArrowDataType::Boolean => {
1029 let array = column
1030 .as_any()
1031 .downcast_ref::<arrow_array::BooleanArray>()
1032 .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
1033 Ok(Value::Bool(array.value(row_idx)))
1034 }
1035 ArrowDataType::UInt64 => {
1036 let array = column
1037 .as_any()
1038 .downcast_ref::<arrow_array::UInt64Array>()
1039 .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
1040 Ok(Value::Int(array.value(row_idx) as i64))
1041 }
1042 _ => {
1043 let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
1045 if let Some(arr) = array {
1046 Ok(Value::String(arr.value(row_idx).to_string()))
1047 } else {
1048 Ok(Value::Null)
1049 }
1050 }
1051 }
1052 }
1053
1054 fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
1055 let file = std::fs::File::open(path)?;
1056 let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
1057 .build()?;
1058 reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1059 }
1060
1061 fn read_csv_file(
1062 &self,
1063 path: &str,
1064 label: &str,
1065 options: &HashMap<String, Value>,
1066 ) -> Result<Vec<arrow_array::RecordBatch>> {
1067 use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1068 use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1069 use std::sync::Arc;
1070
1071 let has_headers = options
1073 .get("headers")
1074 .and_then(|v| v.as_bool())
1075 .unwrap_or(true);
1076
1077 let file = std::fs::File::open(path)?;
1079 let mut rdr = csv::ReaderBuilder::new()
1080 .has_headers(has_headers)
1081 .from_reader(file);
1082
1083 let db_schema = self.storage.schema_manager().schema();
1085 let properties = db_schema.properties.get(label);
1086
1087 let mut rows: Vec<Vec<String>> = Vec::new();
1089 let headers: Vec<String> = if has_headers {
1090 rdr.headers()?.iter().map(|s| s.to_string()).collect()
1091 } else {
1092 Vec::new()
1093 };
1094
1095 for result in rdr.records() {
1096 let record = result?;
1097 rows.push(record.iter().map(|s| s.to_string()).collect());
1098 }
1099
1100 if rows.is_empty() {
1101 return Ok(Vec::new());
1102 }
1103
1104 let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
1106 let col_names: Vec<String> = if has_headers {
1107 headers
1108 } else {
1109 (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
1110 };
1111
1112 for name in &col_names {
1113 let arrow_type = if let Some(props) = properties {
1114 if let Some(prop_meta) = props.get(name) {
1115 match prop_meta.r#type {
1116 DataType::Int32 => ArrowDataType::Int32,
1117 DataType::Int64 => ArrowDataType::Int64,
1118 DataType::Float32 => ArrowDataType::Float32,
1119 DataType::Float64 => ArrowDataType::Float64,
1120 DataType::Bool => ArrowDataType::Boolean,
1121 _ => ArrowDataType::Utf8,
1122 }
1123 } else {
1124 ArrowDataType::Utf8
1125 }
1126 } else {
1127 ArrowDataType::Utf8
1128 };
1129 arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
1130 }
1131
1132 let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
1133
1134 let mut columns: Vec<ArrayRef> = Vec::new();
1136 for (col_idx, field) in arrow_fields.iter().enumerate() {
1137 match field.data_type() {
1138 ArrowDataType::Int32 => {
1139 let values: Vec<Option<i32>> = rows
1140 .iter()
1141 .map(|row| {
1142 if col_idx < row.len() {
1143 row[col_idx].parse().ok()
1144 } else {
1145 None
1146 }
1147 })
1148 .collect();
1149 columns.push(Arc::new(Int32Array::from(values)));
1150 }
1151 _ => {
1152 let values: Vec<Option<String>> = rows
1154 .iter()
1155 .map(|row| {
1156 if col_idx < row.len() {
1157 Some(row[col_idx].clone())
1158 } else {
1159 None
1160 }
1161 })
1162 .collect();
1163 columns.push(Arc::new(StringArray::from(values)));
1164 }
1165 }
1166 }
1167
1168 let batch = RecordBatch::try_new(arrow_schema, columns)?;
1169 Ok(vec![batch])
1170 }
1171
1172 fn parse_data_type(type_str: &str) -> Result<DataType> {
1173 use uni_common::core::schema::{CrdtType, PointType};
1174 let type_str = type_str.to_lowercase();
1175 let type_str = type_str.trim();
1176 match type_str {
1177 "string" | "text" | "varchar" => Ok(DataType::String),
1178 "int" | "integer" | "int32" => Ok(DataType::Int32),
1179 "long" | "int64" | "bigint" => Ok(DataType::Int64),
1180 "float" | "float32" | "real" => Ok(DataType::Float32),
1181 "double" | "float64" => Ok(DataType::Float64),
1182 "bool" | "boolean" => Ok(DataType::Bool),
1183 "timestamp" => Ok(DataType::Timestamp),
1184 "date" => Ok(DataType::Date),
1185 "time" => Ok(DataType::Time),
1186 "datetime" => Ok(DataType::DateTime),
1187 "duration" => Ok(DataType::Duration),
1188 "btic" => Ok(DataType::Btic),
1189 "json" | "jsonb" => Ok(DataType::CypherValue),
1190 "bytes" | "blob" | "binary" => Ok(DataType::Bytes),
1191 "point" => Ok(DataType::Point(PointType::Cartesian2D)),
1192 "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
1193 "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1194 s if s.starts_with("vector(") && s.ends_with(')') => {
1195 let dims_str = &s[7..s.len() - 1];
1196 let dimensions = dims_str
1197 .parse::<usize>()
1198 .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1199 Ok(DataType::Vector { dimensions })
1200 }
1201 s if s.starts_with("list<") && s.ends_with('>') => {
1202 let inner_type_str = &s[5..s.len() - 1];
1203 let inner_type = Self::parse_data_type(inner_type_str)?;
1204 Ok(DataType::List(Box::new(inner_type)))
1205 }
1206 "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1207 "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1208 _ => Err(anyhow!("Unknown data type: {}", type_str)),
1209 }
1210 }
1211
1212 pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1213 let sm = self.storage.schema_manager_arc();
1214 if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1215 return Ok(());
1216 }
1217 sm.add_label_with_desc(&clause.name, clause.description)?;
1218 for prop in clause.properties {
1219 let dt = Self::parse_data_type(&prop.data_type)?;
1220 sm.add_property_with_desc(
1221 &clause.name,
1222 &prop.name,
1223 dt,
1224 prop.nullable,
1225 prop.description,
1226 )?;
1227 if prop.unique {
1228 let constraint = Constraint {
1229 name: format!("{}_{}_unique", clause.name, prop.name),
1230 constraint_type: ConstraintType::Unique {
1231 properties: vec![prop.name],
1232 },
1233 target: ConstraintTarget::Label(clause.name.clone()),
1234 enabled: true,
1235 };
1236 sm.add_constraint(constraint)?;
1237 }
1238 }
1239 sm.save().await?;
1240 Ok(())
1241 }
1242
1243 fn is_generated_key(&self, labels: &[String], key: &str) -> bool {
1248 let schema = self.storage.schema_manager().schema();
1249 for label in labels {
1250 if let Some(props_meta) = schema.properties.get(label)
1251 && let Some(meta) = props_meta.get(key)
1252 && meta.generation_expression.is_some()
1253 {
1254 return true;
1255 }
1256 }
1257 false
1258 }
1259
1260 pub(crate) async fn enrich_properties_with_generated_columns(
1261 &self,
1262 label_name: &str,
1263 properties: &mut HashMap<String, Value>,
1264 prop_manager: &PropertyManager,
1265 params: &HashMap<String, Value>,
1266 ctx: Option<&QueryContext>,
1267 ) -> Result<()> {
1268 let schema = self.storage.schema_manager().schema();
1269
1270 if let Some(props_meta) = schema.properties.get(label_name) {
1271 let mut generators = Vec::new();
1272 for (prop_name, meta) in props_meta {
1273 if let Some(expr_str) = &meta.generation_expression {
1274 generators.push((prop_name.clone(), expr_str.clone()));
1275 }
1276 }
1277
1278 for (prop_name, expr_str) in generators {
1279 let cache_key = (label_name.to_string(), prop_name.clone());
1280 let expr = {
1281 let cache = self.gen_expr_cache.read().await;
1282 cache.get(&cache_key).cloned()
1283 };
1284
1285 let expr = match expr {
1286 Some(e) => e,
1287 None => {
1288 let parsed = uni_cypher::parse_expression(&expr_str)
1289 .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1290 let mut cache = self.gen_expr_cache.write().await;
1291 cache.insert(cache_key, parsed.clone());
1292 parsed
1293 }
1294 };
1295
1296 let mut scope = HashMap::new();
1297
1298 if let Some(var) = expr.extract_variable() {
1300 scope.insert(var, Value::Map(properties.clone()));
1301 } else {
1302 for (k, v) in properties.iter() {
1305 scope.insert(k.clone(), v.clone());
1306 }
1307 }
1308
1309 let val = self
1310 .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1311 .await?;
1312 properties.insert(prop_name, val);
1313 }
1314 }
1315 Ok(())
1316 }
1317
1318 pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1319 let sm = self.storage.schema_manager_arc();
1320 if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1321 return Ok(());
1322 }
1323 sm.add_edge_type_with_desc(
1324 &clause.name,
1325 clause.src_labels,
1326 clause.dst_labels,
1327 clause.description,
1328 )?;
1329 for prop in clause.properties {
1330 let dt = Self::parse_data_type(&prop.data_type)?;
1331 sm.add_property_with_desc(
1332 &clause.name,
1333 &prop.name,
1334 dt,
1335 prop.nullable,
1336 prop.description,
1337 )?;
1338 }
1339 sm.save().await?;
1340 Ok(())
1341 }
1342
1343 pub(crate) async fn execute_alter_entity(
1348 sm: &Arc<SchemaManager>,
1349 entity_name: &str,
1350 action: AlterAction,
1351 ) -> Result<()> {
1352 match action {
1353 AlterAction::AddProperty(prop) => {
1354 let dt = Self::parse_data_type(&prop.data_type)?;
1355 sm.add_property_with_desc(
1356 entity_name,
1357 &prop.name,
1358 dt,
1359 prop.nullable,
1360 prop.description,
1361 )?;
1362 }
1363 AlterAction::DropProperty(prop_name) => {
1364 sm.drop_property(entity_name, &prop_name)?;
1365 }
1366 AlterAction::RenameProperty { old_name, new_name } => {
1367 sm.rename_property(entity_name, &old_name, &new_name)?;
1368 }
1369 AlterAction::SetDescription(desc) => {
1370 if sm.schema().labels.contains_key(entity_name) {
1371 sm.set_label_description(entity_name, desc)?;
1372 } else {
1373 sm.set_edge_type_description(entity_name, desc)?;
1374 }
1375 }
1376 AlterAction::SetPropertyDescription {
1377 property,
1378 description,
1379 } => {
1380 sm.set_property_description(entity_name, &property, description)?;
1381 }
1382 }
1383 sm.save().await?;
1384 Ok(())
1385 }
1386
1387 pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1388 Self::execute_alter_entity(
1389 &self.storage.schema_manager_arc(),
1390 &clause.name,
1391 clause.action,
1392 )
1393 .await
1394 }
1395
1396 pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1397 Self::execute_alter_entity(
1398 &self.storage.schema_manager_arc(),
1399 &clause.name,
1400 clause.action,
1401 )
1402 .await
1403 }
1404
1405 pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1406 let sm = self.storage.schema_manager_arc();
1407 sm.drop_label(&clause.name, clause.if_exists)?;
1408 sm.save().await?;
1409 Ok(())
1410 }
1411
1412 pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1413 let sm = self.storage.schema_manager_arc();
1414 sm.drop_edge_type(&clause.name, clause.if_exists)?;
1415 sm.save().await?;
1416 Ok(())
1417 }
1418
1419 pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1420 let sm = self.storage.schema_manager_arc();
1421 let target = ConstraintTarget::Label(clause.label);
1422 let c_type = match clause.constraint_type {
1423 AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1424 properties: clause.properties,
1425 },
1426 AstConstraintType::Exists => {
1427 let property = clause
1428 .properties
1429 .into_iter()
1430 .next()
1431 .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1432 ConstraintType::Exists { property }
1433 }
1434 AstConstraintType::Check => {
1435 let expression = clause
1436 .expression
1437 .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1438 ConstraintType::Check {
1439 expression: expression.to_string_repr(),
1440 }
1441 }
1442 };
1443
1444 let constraint = Constraint {
1445 name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1446 constraint_type: c_type,
1447 target,
1448 enabled: true,
1449 };
1450
1451 sm.add_constraint(constraint)?;
1452 sm.save().await?;
1453 Ok(())
1454 }
1455
1456 pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1457 let sm = self.storage.schema_manager_arc();
1458 sm.drop_constraint(&clause.name, false)?;
1459 sm.save().await?;
1460 Ok(())
1461 }
1462
1463 fn merge_single_node_fastpath<'p>(
1474 &self,
1475 pattern: &'p Pattern,
1476 ) -> Option<(&'p NodePattern, String)> {
1477 if pattern.paths.len() != 1 {
1478 return None;
1479 }
1480 let path = &pattern.paths[0];
1481 if path.elements.len() != 1 {
1482 return None;
1483 }
1484 let PatternElement::Node(n) = &path.elements[0] else {
1485 return None;
1486 };
1487 let labels = n.labels.names();
1488 if labels.len() != 1 {
1489 return None;
1490 }
1491 let Some(Expr::Map(entries)) = n.properties.as_ref() else {
1493 return None;
1494 };
1495 if entries.is_empty() {
1496 return None;
1497 }
1498 Some((n, labels[0].clone()))
1499 }
1500
1501 fn merge_key_filter(key_props: &HashMap<String, Value>) -> Option<String> {
1510 if key_props.is_empty() {
1511 return None;
1512 }
1513 let mut parts = Vec::with_capacity(key_props.len() + 1);
1514 for (k, v) in key_props {
1515 if k.is_empty() || !k.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
1517 return None;
1518 }
1519 let lit = match v {
1520 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1521 Value::Int(i) => i.to_string(),
1522 Value::Float(f) => f.to_string(),
1523 Value::Bool(b) => b.to_string(),
1524 _ => return None,
1525 };
1526 parts.push(format!("{k} = {lit}"));
1531 }
1532 parts.push("_deleted = false".to_string());
1533 Some(parts.join(" AND "))
1534 }
1535
1536 fn merge_key_tuple(key_props: &HashMap<String, Value>) -> MergeKey {
1538 let mut tuple: MergeKey = key_props
1539 .iter()
1540 .map(|(k, v)| (k.clone(), v.clone()))
1541 .collect();
1542 tuple.sort_by(|a, b| a.0.cmp(&b.0));
1543 tuple
1544 }
1545
1546 fn merge_l0_existing(
1556 &self,
1557 label: &str,
1558 key_names: &[String],
1559 ctx: Option<&QueryContext>,
1560 ) -> HashMap<MergeKey, Vec<Vid>> {
1561 let mut candidates: Vec<Vid> = Vec::new();
1562 l0_visibility::visit_l0_buffers(ctx, |l0| {
1563 if let Some(vids) = l0.label_to_vids.get(label) {
1564 candidates.extend(vids.iter().copied());
1565 }
1566 false
1567 });
1568
1569 let mut map: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
1570 let mut seen: HashSet<Vid> = HashSet::new();
1571 for vid in candidates {
1572 if !seen.insert(vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
1573 continue;
1574 }
1575 let tuple: MergeKey = key_names
1577 .iter()
1578 .map(|k| {
1579 let v = l0_visibility::lookup_vertex_prop(vid, k, ctx).unwrap_or(Value::Null);
1580 (k.clone(), v)
1581 })
1582 .collect();
1583 map.entry(tuple).or_default().push(vid);
1584 }
1585 map
1586 }
1587
1588 async fn merge_lookup_persisted(
1599 &self,
1600 label: &str,
1601 key_props: &HashMap<String, Value>,
1602 filter: &str,
1603 ctx: Option<&QueryContext>,
1604 ) -> Result<Vec<Vid>> {
1605 let mut matches: Vec<Vid> = Vec::new();
1606 let scanned = self
1607 .storage
1608 .scan_vertex_table(label, &["_vid"], Some(filter))
1609 .await?;
1610 if let Some(batch) = scanned
1611 && let Some(col) = batch
1612 .column_by_name("_vid")
1613 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
1614 {
1615 for i in 0..col.len() {
1616 let vid = Vid::from(col.value(i));
1617 if l0_visibility::is_vertex_deleted(vid, ctx) {
1618 continue;
1619 }
1620 if Self::vid_overrides_break_key(vid, key_props, ctx) {
1621 continue;
1622 }
1623 matches.push(vid);
1624 }
1625 }
1626 Ok(matches)
1627 }
1628
1629 fn vid_overrides_break_key(
1632 vid: Vid,
1633 key_props: &HashMap<String, Value>,
1634 ctx: Option<&QueryContext>,
1635 ) -> bool {
1636 key_props.iter().any(|(k, want)| {
1637 matches!(l0_visibility::lookup_vertex_prop(vid, k, ctx), Some(got) if &got != want)
1638 })
1639 }
1640
1641 fn build_node_map(vid: Vid, label: &str, props: uni_common::Properties) -> Value {
1649 let mut obj = HashMap::new();
1650 obj.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
1651 obj.insert(
1652 "_labels".to_string(),
1653 Value::List(vec![Value::String(label.to_string())]),
1654 );
1655 for (k, v) in props {
1656 obj.insert(k, v);
1657 }
1658 Value::Map(obj)
1659 }
1660
1661 fn l0_vid_matches_key(
1664 vid: Vid,
1665 key_props: &HashMap<String, Value>,
1666 ctx: Option<&QueryContext>,
1667 ) -> bool {
1668 key_props.iter().all(
1669 |(k, want)| match l0_visibility::lookup_vertex_prop(vid, k, ctx) {
1670 Some(got) => &got == want,
1671 None => *want == Value::Null,
1672 },
1673 )
1674 }
1675
    /// Runs the single-node MERGE fast path for one input row.
    ///
    /// Candidate matches come from two sources: the vertex-table scan done by
    /// `merge_lookup_persisted` (using the pre-rendered `filter`), and the
    /// `existing` index looked up by `key_tuple`. If there is no match, the
    /// pattern is created and `on_create` is applied; otherwise `on_match` is
    /// applied once per matching vertex.
    ///
    /// Returns one output row for a created node, or one output row per
    /// matched vertex.
    ///
    /// # Errors
    /// Propagates any error from the scan, from pattern creation, from
    /// applying the SET items, or from re-reading vertex properties.
    #[expect(
        clippy::too_many_arguments,
        reason = "mirrors execute_merge's threaded execution state"
    )]
    async fn execute_merge_row_indexed(
        &self,
        label: &str,
        node: &NodePattern,
        path_pattern: &Pattern,
        temp_vars: &[String],
        mut row: HashMap<String, Value>,
        key_props: &HashMap<String, Value>,
        filter: &str,
        key_tuple: &MergeKey,
        existing: &mut HashMap<MergeKey, Vec<Vid>>,
        on_match: Option<&SetClause>,
        on_create: Option<&SetClause>,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
        writer: &Writer,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // `seen` de-duplicates vids across the two candidate sources while
        // `matches` keeps them in discovery order.
        let mut seen: HashSet<Vid> = HashSet::new();
        let mut matches: Vec<Vid> = Vec::new();
        // Source 1: vids returned by the filtered vertex-table scan.
        for vid in self
            .merge_lookup_persisted(label, key_props, filter, ctx)
            .await?
        {
            if seen.insert(vid) {
                matches.push(vid);
            }
        }
        // Source 2: vids recorded in `existing` under this key tuple. Each is
        // re-checked (not deleted, key still matches) before being accepted.
        if let Some(vids) = existing.get(key_tuple) {
            for &vid in vids {
                if seen.contains(&vid) || l0_visibility::is_vertex_deleted(vid, ctx) {
                    continue;
                }
                if Self::l0_vid_matches_key(vid, key_props, ctx) && seen.insert(vid) {
                    matches.push(vid);
                }
            }
        }

        let mut out = Vec::new();
        if matches.is_empty() {
            // No match: create the pattern, then apply ON CREATE if present.
            self.execute_create_pattern(
                path_pattern,
                &mut row,
                writer,
                prop_manager,
                params,
                ctx,
                tx_l0_override,
            )
            .await?;
            if let Some(set) = on_create {
                self.execute_set_items_locked(
                    &set.items,
                    &mut row,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0_override,
                    &Prefetch::default(),
                )
                .await?;
            }
            // Record the vid now bound to the node variable under this key so
            // later rows with the same key find it through `existing`.
            // Nothing is recorded when the node has no variable.
            if let Some(var) = &node.variable
                && let Some(val) = row.get(var)
                && let Ok(vid) = Self::vid_from_value(val)
            {
                existing.entry(key_tuple.clone()).or_default().push(vid);
            }
            Self::bind_path_variables(path_pattern, &mut row, temp_vars);
            out.push(row);
        } else {
            // One output row per matched vertex, each starting from a copy of
            // the input row.
            for vid in matches {
                let mut m = row.clone();
                if let Some(var) = &node.variable {
                    // Bind a node map with no user properties so the SET
                    // items below can resolve the variable to this vertex.
                    m.insert(
                        var.clone(),
                        Self::build_node_map(vid, label, HashMap::new()),
                    );
                }
                if let Some(set) = on_match {
                    self.execute_set_items_locked(
                        &set.items,
                        &mut m,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0_override,
                        &Prefetch::default(),
                    )
                    .await?;
                }
                if let Some(var) = &node.variable {
                    // Re-read the vertex properties and rebind the variable
                    // with the full node map.
                    let props = read_vertex_props_with_prefetch(
                        vid,
                        &Prefetch::default(),
                        prop_manager,
                        ctx,
                    )
                    .await?;
                    m.insert(var.clone(), Self::build_node_map(vid, label, props));
                }
                Self::bind_path_variables(path_pattern, &mut m, temp_vars);
                out.push(m);
            }
        }
        Ok(out)
    }
1815
    /// Executes a `MERGE` clause over `rows`.
    ///
    /// Each input row is handled independently. If the pattern qualifies for
    /// the single-node fast path and the row's key can be rendered as a
    /// filter, the row goes through `execute_merge_row_indexed`. Otherwise the
    /// general path is used: match the pattern, then either apply `on_match`
    /// to every match or create the pattern and apply `on_create`.
    ///
    /// Returns the output rows of all input rows, in input order.
    ///
    /// # Errors
    /// Fails when no writer is configured, and propagates any error from
    /// expression evaluation, matching, creation, or SET processing.
    #[expect(clippy::too_many_arguments)]
    pub(crate) async fn execute_merge(
        &self,
        rows: Vec<HashMap<String, Value>>,
        pattern: &Pattern,
        on_match: Option<&SetClause>,
        on_create: Option<&SetClause>,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;

        // `path_pattern` is the pattern used for creation and path binding;
        // `temp_vars` is handed back to `bind_path_variables` for each row.
        let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);

        // `Some` only for the single-node shape the fast path supports.
        let fastpath = self.merge_single_node_fastpath(pattern);

        // Built once, before any row is processed: an index of L0 vertices
        // with the fast-path label, keyed by the sorted key property names.
        let mut fast_existing: HashMap<MergeKey, Vec<Vid>> = HashMap::new();
        if let Some((node, label)) = &fastpath {
            let mut key_names: Vec<String> = match &node.properties {
                Some(Expr::Map(entries)) => entries.iter().map(|(k, _)| k.clone()).collect(),
                _ => Vec::new(),
            };
            key_names.sort();
            fast_existing = self.merge_l0_existing(label, &key_names, ctx);
        }

        let mut results = Vec::new();
        for mut row in rows {
            if let Some((node, label)) = &fastpath {
                // Evaluate the node's property map against this row to get
                // the concrete key values.
                let mut key_props: HashMap<String, Value> = HashMap::new();
                if let Some(props_expr) = &node.properties
                    && let Value::Map(map) = self
                        .evaluate_expr(props_expr, &row, prop_manager, params, ctx)
                        .await?
                {
                    key_props = map;
                }
                // If the key cannot be rendered as a filter, fall through to
                // the general path below for this row.
                if let Some(filter) = Self::merge_key_filter(&key_props) {
                    let key_tuple = Self::merge_key_tuple(&key_props);
                    let writer: &uni_store::Writer = writer_lock.as_ref();
                    let row_out = self
                        .execute_merge_row_indexed(
                            label,
                            node,
                            &path_pattern,
                            &temp_vars,
                            row,
                            &key_props,
                            &filter,
                            &key_tuple,
                            &mut fast_existing,
                            on_match,
                            on_create,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0_override,
                            writer,
                        )
                        .await?;
                    results.extend(row_out);
                    continue;
                }
            }

            // General path: match against the original pattern.
            let matches = self
                .execute_merge_match(pattern, &row, prop_manager, params, ctx)
                .await?;
            let writer: &uni_store::Writer = writer_lock.as_ref();

            let result: Result<Vec<HashMap<String, Value>>> = async {
                let mut batch = Vec::new();
                if !matches.is_empty() {
                    // Matched: apply ON MATCH to every matching row.
                    for mut m in matches {
                        if let Some(set) = on_match {
                            self.execute_set_items_locked(
                                &set.items,
                                &mut m,
                                writer,
                                prop_manager,
                                params,
                                ctx,
                                tx_l0_override,
                                &Prefetch::default(),
                            )
                            .await?;
                        }
                        Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
                        batch.push(m);
                    }
                } else {
                    // Not matched: create the pattern, then apply ON CREATE.
                    self.execute_create_pattern(
                        &path_pattern,
                        &mut row,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0_override,
                    )
                    .await?;
                    if let Some(set) = on_create {
                        self.execute_set_items_locked(
                            &set.items,
                            &mut row,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0_override,
                            &Prefetch::default(),
                        )
                        .await?;
                    }
                    Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
                    batch.push(row);
                }
                Ok(batch)
            }
            .await;

            results.extend(result?);
        }
        Ok(results)
    }
1965
    /// Creates the vertices and edges described by `pattern`, binding newly
    /// created entities into `row`.
    ///
    /// Each path is walked left to right. A node whose variable already holds
    /// a value with a vertex id in `row` is reused; any other node is
    /// inserted. A relationship element is remembered and turned into an edge
    /// when the node after it is reached.
    ///
    /// # Errors
    /// Fails when node properties do not evaluate to a map, when
    /// `strict_schema` is enabled and a label or edge type is not declared,
    /// when a relationship does not name exactly one type, when the pattern
    /// contains a parenthesized element, or when property
    /// coercion/validation, id allocation, or the writer fails.
    #[expect(clippy::too_many_arguments)]
    pub(crate) async fn execute_create_pattern(
        &self,
        pattern: &Pattern,
        row: &mut HashMap<String, Value>,
        writer: &Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    ) -> Result<()> {
        for path in &pattern.paths {
            // Vid of the most recent node on this path (the pending edge's
            // other endpoint).
            let mut prev_vid: Option<Vid> = None;
            // (variable name or "", edge type id, edge type name,
            //  properties expression, direction)
            type PendingRel = (String, u32, String, Option<Expr>, Direction);
            // Relationship seen since the last node, waiting for its far node.
            let mut rel_pending: Option<PendingRel> = None;

            for element in &path.elements {
                match element {
                    PatternElement::Node(n) => {
                        let mut vid = None;

                        // Reuse the vertex if this variable is already bound
                        // to a value that carries a vertex id.
                        if let Some(var) = &n.variable
                            && let Some(val) = row.get(var)
                            && let Ok(existing_vid) = Self::vid_from_value(val)
                        {
                            vid = Some(existing_vid);
                        }

                        if vid.is_none() {
                            // Evaluate the property map for the new vertex.
                            let mut props = HashMap::new();
                            if let Some(props_expr) = &n.properties {
                                let props_val = self
                                    .evaluate_expr(props_expr, row, prop_manager, params, ctx)
                                    .await?;
                                if let Value::Map(map) = props_val {
                                    for (k, v) in map {
                                        props.insert(k, v);
                                    }
                                } else {
                                    return Err(anyhow!("Properties must evaluate to a map"));
                                }
                            }

                            let schema = self.storage.schema_manager().schema();

                            if self.config.strict_schema {
                                // Under strict_schema every label must
                                // already be declared.
                                for label_name in &n.labels {
                                    if schema.get_label_case_insensitive(label_name).is_none() {
                                        return Err(anyhow!(
                                            "Label '{}' is not defined in the schema \
                                             (strict_schema is enabled). \
                                             Declare it with db.schema().label(...).apply() first.",
                                            label_name
                                        ));
                                    }
                                }
                            }

                            // Take the id from the reservoir when one is
                            // configured, otherwise from the writer.
                            let new_vid = match &self.id_reservoir {
                                Some(r) => r.next_vid().await?,
                                None => writer.next_vid().await?,
                            };

                            // Fill in generated columns for every label that
                            // the schema knows.
                            for label_name in &n.labels {
                                if schema.get_label_case_insensitive(label_name).is_some() {
                                    self.enrich_properties_with_generated_columns(
                                        label_name,
                                        &mut props,
                                        prop_manager,
                                        params,
                                        ctx,
                                    )
                                    .await?;
                                }
                            }

                            let props = Self::coerce_and_validate_props(props, &schema, &n.labels)?;

                            let final_props = writer
                                .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
                                .await?;

                            if let Some(var) = &n.variable {
                                // Bind the variable to a node map built from
                                // the properties the writer returned.
                                let mut obj = HashMap::new();
                                obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
                                let labels_list: Vec<Value> =
                                    n.labels.iter().map(|l| Value::String(l.clone())).collect();
                                obj.insert("_labels".to_string(), Value::List(labels_list));
                                for (k, v) in &final_props {
                                    obj.insert(k.clone(), v.clone());
                                }
                                row.insert(var.clone(), Value::Map(obj));
                            }
                            vid = Some(new_vid);
                        }

                        // `vid` is always `Some` here: it was either reused or
                        // assigned in the creation branch above.
                        let current_vid = vid.unwrap();

                        // If a relationship was waiting for this node, create
                        // the edge now.
                        if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
                            rel_pending.take()
                            && let Some(src) = prev_vid
                        {
                            // A relationship variable already present in the
                            // row is treated as bound; no edge is created.
                            let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);

                            if !is_rel_bound {
                                let mut rel_props = HashMap::new();
                                if let Some(expr) = rel_props_expr {
                                    let val = self
                                        .evaluate_expr(&expr, row, prop_manager, params, ctx)
                                        .await?;
                                    // A non-map result is ignored here,
                                    // unlike node properties.
                                    if let Value::Map(map) = val {
                                        rel_props.extend(map);
                                    }
                                }
                                let edge_schema = self.storage.schema_manager().schema();
                                let rel_props = Self::coerce_and_validate_props(
                                    rel_props,
                                    &edge_schema,
                                    std::slice::from_ref(&type_name),
                                )?;
                                let eid = match &self.id_reservoir {
                                    Some(r) => r.next_eid().await?,
                                    None => writer.next_eid(type_id).await?,
                                };

                                // An incoming relationship points from the
                                // current node back to the previous one.
                                let (edge_src, edge_dst) = match dir {
                                    Direction::Incoming => (current_vid, src),
                                    _ => (src, current_vid),
                                };

                                // Keep a copy of the properties only when the
                                // relationship is named and will be bound.
                                let store_props = !rel_var.is_empty();
                                let user_props = if store_props {
                                    rel_props.clone()
                                } else {
                                    HashMap::new()
                                };

                                writer
                                    .insert_edge(
                                        edge_src,
                                        edge_dst,
                                        type_id,
                                        eid,
                                        rel_props,
                                        Some(type_name.clone()),
                                        tx_l0,
                                    )
                                    .await?;

                                if store_props {
                                    // Bind the relationship variable to an
                                    // edge map.
                                    let mut edge_map = HashMap::new();
                                    edge_map.insert(
                                        "_eid".to_string(),
                                        Value::Int(eid.as_u64() as i64),
                                    );
                                    edge_map.insert(
                                        "_src".to_string(),
                                        Value::Int(edge_src.as_u64() as i64),
                                    );
                                    edge_map.insert(
                                        "_dst".to_string(),
                                        Value::Int(edge_dst.as_u64() as i64),
                                    );
                                    edge_map
                                        .insert("_type".to_string(), Value::Int(type_id as i64));
                                    for (k, v) in user_props {
                                        edge_map.insert(k, v);
                                    }
                                    row.insert(rel_var, Value::Map(edge_map));
                                }
                            }
                        }
                        prev_vid = Some(current_vid);
                    }
                    PatternElement::Relationship(r) => {
                        if r.types.len() != 1 {
                            return Err(anyhow!(
                                "CREATE relationship must specify exactly one type"
                            ));
                        }
                        let type_name = &r.types[0];
                        // Under strict_schema the type must already exist;
                        // otherwise an id is looked up or assigned.
                        let type_id = if self.config.strict_schema {
                            let schema = self.storage.schema_manager().schema();
                            schema
                                .edge_type_id_by_name_case_insensitive(type_name)
                                .ok_or_else(|| {
                                    anyhow!(
                                        "Edge type '{}' is not defined in the schema \
                                         (strict_schema is enabled). \
                                         Declare it with db.schema().edge_type(...).apply() first.",
                                        type_name
                                    )
                                })?
                        } else {
                            self.storage
                                .schema_manager()
                                .get_or_assign_edge_type_id(type_name)
                        };

                        // Defer edge creation until the next node is known.
                        // An unnamed relationship is stored with an empty
                        // variable name.
                        rel_pending = Some((
                            r.variable.clone().unwrap_or_default(),
                            type_id,
                            type_name.clone(),
                            r.properties.clone(),
                            r.direction.clone(),
                        ));
                    }
                    PatternElement::Parenthesized { .. } => {
                        return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
                    }
                }
            }
        }
        Ok(())
    }
2204
2205 fn validate_structural_property_value(prop_name: &str, val: &Value) -> Result<()> {
2214 match val {
2215 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
2216 anyhow::bail!(
2217 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2218 prop_name
2219 );
2220 }
2221 Value::List(items) => {
2222 for item in items {
2223 if matches!(
2224 item,
2225 Value::Map(_)
2226 | Value::Node(_)
2227 | Value::Edge(_)
2228 | Value::Path(_)
2229 | Value::List(_)
2230 ) {
2231 anyhow::bail!(
2232 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
2233 prop_name
2234 );
2235 }
2236 }
2237 }
2238 _ => {}
2239 }
2240 Ok(())
2241 }
2242
2243 fn coerce_and_validate_property_value(
2264 prop_name: &str,
2265 val: Value,
2266 schema: &uni_common::core::schema::Schema,
2267 labels: &[String],
2268 ) -> Result<Value> {
2269 use uni_common::core::schema::DataType;
2270
2271 let declared = labels.iter().find_map(|label| {
2273 schema
2274 .properties
2275 .get(label)
2276 .and_then(|props| props.get(prop_name))
2277 .map(|meta| &meta.r#type)
2278 });
2279
2280 if matches!(declared, Some(DataType::CypherValue)) {
2282 return Ok(val);
2283 }
2284
2285 let Some(dt) = declared else {
2286 Self::validate_structural_property_value(prop_name, &val)?;
2289 return Ok(val);
2290 };
2291
2292 if dt.accepts(&val) {
2296 return Ok(val);
2297 }
2298
2299 if matches!(val, Value::String(_)) {
2302 let ctor = match dt {
2303 DataType::DateTime => Some("DATETIME"),
2304 DataType::Date => Some("DATE"),
2305 DataType::Time => Some("TIME"),
2306 DataType::Duration => Some("DURATION"),
2307 _ => None,
2308 };
2309 if let Some(name) = ctor {
2310 return uni_query_functions::datetime::eval_datetime_function(
2311 name,
2312 std::slice::from_ref(&val),
2313 )
2314 .map_err(|e| {
2315 anyhow!(
2316 "TypeError: property '{}' is declared {:?} but the string value could \
2317 not be parsed as a {} literal: {}",
2318 prop_name,
2319 dt,
2320 name,
2321 e
2322 )
2323 });
2324 }
2325 }
2326
2327 Self::validate_structural_property_value(prop_name, &val)?;
2331 anyhow::bail!(
2332 "TypeError: property '{}' is declared {:?} but got an incompatible value of type {}",
2333 prop_name,
2334 dt,
2335 value_type_name(&val)
2336 );
2337 }
2338
2339 fn coerce_and_validate_props(
2349 props: HashMap<String, Value>,
2350 schema: &uni_common::core::schema::Schema,
2351 labels: &[String],
2352 ) -> Result<HashMap<String, Value>> {
2353 let mut out = HashMap::with_capacity(props.len());
2354 for (k, v) in props {
2355 let cv = Self::coerce_and_validate_property_value(&k, v, schema, labels)?;
2356 out.insert(k, cv);
2357 }
2358 Ok(out)
2359 }
2360
    /// Applies a list of `SET` items to the entities bound in `row`.
    ///
    /// Property assignments (`SET x.p = expr`) are accumulated per variable in
    /// `pending_v` / `pending_e` and written once, after all items have been
    /// processed. Label changes and whole-entity assignments (`SET x = ...`,
    /// `SET x += ...`) first flush whatever is pending for that variable via
    /// `flush_pending_var`, then do their own work. `row` is updated in place
    /// as items are applied.
    ///
    /// # Errors
    /// Fails on ephemeral vertices/edges, on virtual-label writes, on values
    /// that do not pass property validation, when the right-hand side of a
    /// whole-entity assignment has no extractable properties, and on any
    /// evaluation, read, or writer error.
    #[expect(clippy::too_many_arguments)]
    pub(crate) async fn execute_set_items_locked(
        &self,
        items: &[SetItem],
        row: &mut HashMap<String, Value>,
        writer: &Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
        prefetched: &Prefetch,
    ) -> Result<()> {
        // Accumulated property writes, keyed by variable name.
        let mut pending_v: HashMap<String, PendingVertexSet> = HashMap::new();
        let mut pending_e: HashMap<String, PendingEdgeSet> = HashMap::new();

        for item in items {
            match item {
                SetItem::Property { expr, value } => {
                    // Only `variable.property` targets whose variable is
                    // bound in the row are handled; anything else is skipped.
                    if let Expr::Property(var_expr, prop_name) = expr
                        && let Expr::Variable(var_name) = &**var_expr
                        && let Some(node_val) = row.get(var_name)
                    {
                        if let Ok(vid) = Self::vid_from_value(node_val) {
                            // --- Target is a vertex ---
                            reject_if_ephemeral_vid(vid)?;
                            let labels =
                                Self::extract_labels_from_node(node_val).unwrap_or_default();
                            let schema = self.storage.schema_manager().schema().clone();

                            if !pending_v.contains_key(var_name) {
                                // First write to this variable: seed the
                                // pending set with the current properties.
                                let storage_cfg = &self.storage.config;
                                let partial = storage_cfg.partial_lance_writes;
                                let read = read_vertex_props_with_prefetch(
                                    vid,
                                    prefetched,
                                    prop_manager,
                                    ctx,
                                )
                                .await?;
                                pending_v.insert(
                                    var_name.clone(),
                                    PendingVertexSet {
                                        vid,
                                        labels: labels.clone(),
                                        props: read,
                                        partial,
                                        touched: HashSet::new(),
                                    },
                                );
                            }

                            let val = self
                                .evaluate_expr(value, row, prop_manager, params, ctx)
                                .await?;
                            let val = Self::coerce_and_validate_property_value(
                                prop_name, val, &schema, &labels,
                            )?;

                            let pv = pending_v
                                .get_mut(var_name)
                                .expect("inserted above when absent");
                            pv.props.insert(prop_name.clone(), val.clone());
                            // In partial mode, remember which properties
                            // were assigned.
                            if pv.partial {
                                pv.touched.insert(prop_name.clone());
                            }

                            // Mirror the new value into the row so later
                            // items see it.
                            if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
                                node_map.insert(prop_name.clone(), val);
                            } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
                                node.properties.insert(prop_name.clone(), val);
                            }
                        } else if let Value::Map(map) = node_val
                            && map.get("_eid").is_some_and(|v| !v.is_null())
                            && map.get("_src").is_some_and(|v| !v.is_null())
                            && map.get("_dst").is_some_and(|v| !v.is_null())
                            && (map.get("_type").is_some_and(|v| !v.is_null())
                                || map.get("_type_name").is_some_and(|v| !v.is_null()))
                        {
                            // --- Target is an edge represented as a map ---
                            let ei = self.extract_edge_identity(map)?;
                            reject_if_ephemeral_eid(ei.eid)?;
                            let schema = self.storage.schema_manager().schema().clone();
                            // The type may be a name (string) or a numeric id
                            // that is resolved through the schema.
                            let type_val = map.get("_type").or_else(|| map.get("_type_name"));
                            let edge_type_name = match type_val {
                                Some(Value::String(s)) => s.clone(),
                                Some(Value::Int(id)) => schema
                                    .edge_type_name_by_id_unified(*id as u32)
                                    .unwrap_or_else(|| format!("EdgeType{}", id)),
                                _ => String::new(),
                            };

                            if !pending_e.contains_key(var_name) {
                                // First write to this variable: seed with the
                                // edge's current properties.
                                let initial = read_edge_props_with_prefetch(
                                    ei.eid,
                                    prefetched,
                                    prop_manager,
                                    ctx,
                                )
                                .await?;
                                let partial = self.storage.config.partial_lance_writes;
                                pending_e.insert(
                                    var_name.clone(),
                                    PendingEdgeSet {
                                        src: ei.src,
                                        dst: ei.dst,
                                        edge_type_id: ei.edge_type_id,
                                        eid: ei.eid,
                                        edge_type_name: edge_type_name.clone(),
                                        props: initial,
                                        partial,
                                        touched: HashSet::new(),
                                    },
                                );
                            }

                            let val = self
                                .evaluate_expr(value, row, prop_manager, params, ctx)
                                .await?;
                            let val = Self::coerce_and_validate_property_value(
                                prop_name,
                                val,
                                &schema,
                                std::slice::from_ref(&edge_type_name),
                            )?;

                            let pe = pending_e
                                .get_mut(var_name)
                                .expect("inserted above when absent");
                            pe.props.insert(prop_name.clone(), val.clone());
                            if pe.partial {
                                pe.touched.insert(prop_name.clone());
                            }

                            // Mirror the new value into the row.
                            if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
                                edge_map.insert(prop_name.clone(), val);
                            } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
                                edge.properties.insert(prop_name.clone(), val);
                            }
                        } else if let Value::Edge(edge) = node_val {
                            // --- Target is a typed `Value::Edge` ---
                            reject_if_ephemeral_eid(edge.eid)?;
                            let eid = edge.eid;
                            let src = edge.src;
                            let dst = edge.dst;
                            let edge_type_name = edge.edge_type.clone();
                            let etype =
                                self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
                            let schema = self.storage.schema_manager().schema().clone();

                            if !pending_e.contains_key(var_name) {
                                let initial = read_edge_props_with_prefetch(
                                    eid,
                                    prefetched,
                                    prop_manager,
                                    ctx,
                                )
                                .await?;
                                let partial = self.storage.config.partial_lance_writes;
                                pending_e.insert(
                                    var_name.clone(),
                                    PendingEdgeSet {
                                        src,
                                        dst,
                                        edge_type_id: etype,
                                        eid,
                                        edge_type_name: edge_type_name.clone(),
                                        props: initial,
                                        partial,
                                        touched: HashSet::new(),
                                    },
                                );
                            }

                            let val = self
                                .evaluate_expr(value, row, prop_manager, params, ctx)
                                .await?;
                            let val = Self::coerce_and_validate_property_value(
                                prop_name,
                                val,
                                &schema,
                                std::slice::from_ref(&edge_type_name),
                            )?;

                            let pe = pending_e
                                .get_mut(var_name)
                                .expect("inserted above when absent");
                            pe.props.insert(prop_name.clone(), val.clone());
                            if pe.partial {
                                pe.touched.insert(prop_name.clone());
                            }

                            // Mirror the new value into the row.
                            if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
                                edge.properties.insert(prop_name.clone(), val);
                            }
                        }
                    }
                }
                SetItem::Labels { variable, labels } => {
                    // Write out any property changes pending for this
                    // variable before changing its labels.
                    self.flush_pending_var(
                        variable,
                        &mut pending_v,
                        &mut pending_e,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                        prefetched,
                    )
                    .await?;

                    if let Some(node_val) = row.get(variable)
                        && let Ok(vid) = Self::vid_from_value(node_val)
                    {
                        reject_if_ephemeral_vid(vid)?;
                        let registry = self
                            .procedure_registry
                            .as_ref()
                            .and_then(|pr| pr.plugin_registry());
                        reject_virtual_label_write(registry.as_ref(), labels, "SET")?;

                        let current_labels =
                            Self::extract_labels_from_node(node_val).unwrap_or_default();

                        // Only labels the node does not already carry.
                        let labels_to_add: Vec<_> = labels
                            .iter()
                            .filter(|l| !current_labels.contains(l))
                            .cloned()
                            .collect();

                        if !labels_to_add.is_empty() {
                            let mut new_labels = current_labels;
                            new_labels.extend(labels_to_add);
                            // The label set is written to the transaction L0
                            // when the context has one, else to the
                            // context's `l0`. Without a context only the row
                            // is updated.
                            if let Some(ctx) = ctx {
                                let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
                                l0.write().set_vertex_labels(vid, &new_labels);
                            }

                            // Keep the row's `_labels` entry in sync
                            // (map-shaped nodes only).
                            if let Some(Value::Map(obj)) = row.get_mut(variable) {
                                let labels_list =
                                    new_labels.into_iter().map(Value::String).collect();
                                obj.insert("_labels".to_string(), Value::List(labels_list));
                            }
                        }
                    }
                }
                SetItem::Variable { variable, value }
                | SetItem::VariablePlus { variable, value } => {
                    // Write out any property changes pending for this
                    // variable before the whole-entity assignment.
                    self.flush_pending_var(
                        variable,
                        &mut pending_v,
                        &mut pending_e,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                        prefetched,
                    )
                    .await?;

                    // `SET x = ...` replaces; `SET x += ...` does not.
                    let replace = matches!(item, SetItem::Variable { .. });
                    let op_str = if replace { "=" } else { "+=" };

                    // An unbound or null target makes the item a no-op.
                    if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
                        continue;
                    }
                    let rhs = self
                        .evaluate_expr(value, row, prop_manager, params, ctx)
                        .await?;
                    let new_props =
                        Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
                            anyhow!(
                                "SET {} {} expr: right-hand side must evaluate to a map, \
                                 node, or relationship",
                                variable,
                                op_str
                            )
                        })?;
                    self.apply_properties_to_entity(
                        variable,
                        new_props,
                        replace,
                        row,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                        prefetched,
                    )
                    .await?;
                }
            }
        }

        // Final flush: write every vertex that still has pending changes.
        for (_var_name, mut pv) in pending_v {
            if pv.partial {
                // Snapshot the keys so properties added by generated-column
                // enrichment can be detected afterwards.
                let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
                for label_name in &pv.labels {
                    self.enrich_properties_with_generated_columns(
                        label_name,
                        &mut pv.props,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
                // Keys that are new after enrichment, and every generated
                // key, are added to the touched set.
                for k in pv.props.keys() {
                    if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
                        pv.touched.insert(k.clone());
                    }
                }
                writer
                    .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
                    .await?;
            } else {
                for label_name in &pv.labels {
                    self.enrich_properties_with_generated_columns(
                        label_name,
                        &mut pv.props,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
                // The properties returned by the writer are not needed here.
                let _ = writer
                    .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
                    .await?;
            }
        }
        // Final flush: write every edge that still has pending changes.
        for (_var_name, pe) in pending_e {
            if pe.partial {
                writer
                    .insert_edge_partial_full(
                        pe.src,
                        pe.dst,
                        pe.edge_type_id,
                        pe.eid,
                        pe.props,
                        Some(pe.edge_type_name),
                        pe.touched,
                        tx_l0,
                    )
                    .await?;
            } else {
                writer
                    .insert_edge(
                        pe.src,
                        pe.dst,
                        pe.edge_type_id,
                        pe.eid,
                        pe.props,
                        Some(pe.edge_type_name),
                        tx_l0,
                    )
                    .await?;
            }
        }

        Ok(())
    }
2789
2790 #[expect(clippy::too_many_arguments)]
2796 async fn flush_pending_var(
2797 &self,
2798 var: &str,
2799 pending_v: &mut HashMap<String, PendingVertexSet>,
2800 pending_e: &mut HashMap<String, PendingEdgeSet>,
2801 writer: &Writer,
2802 prop_manager: &PropertyManager,
2803 _params: &HashMap<String, Value>,
2804 ctx: Option<&QueryContext>,
2805 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2806 _prefetched: &Prefetch,
2807 ) -> Result<()> {
2808 if let Some(mut pv) = pending_v.remove(var) {
2809 if pv.partial {
2810 let pre_keys: HashSet<String> = pv.props.keys().cloned().collect();
2811 for label_name in &pv.labels {
2812 self.enrich_properties_with_generated_columns(
2813 label_name,
2814 &mut pv.props,
2815 prop_manager,
2816 _params,
2817 ctx,
2818 )
2819 .await?;
2820 }
2821 for k in pv.props.keys() {
2822 if !pre_keys.contains(k) || self.is_generated_key(&pv.labels, k) {
2823 pv.touched.insert(k.clone());
2824 }
2825 }
2826 writer
2827 .insert_vertex_partial_full(pv.vid, pv.props, pv.touched, &pv.labels, tx_l0)
2828 .await?;
2829 } else {
2830 for label_name in &pv.labels {
2831 self.enrich_properties_with_generated_columns(
2832 label_name,
2833 &mut pv.props,
2834 prop_manager,
2835 _params,
2836 ctx,
2837 )
2838 .await?;
2839 }
2840 let _ = writer
2841 .insert_vertex_with_labels(pv.vid, pv.props, &pv.labels, tx_l0)
2842 .await?;
2843 }
2844 }
2845 if let Some(pe) = pending_e.remove(var) {
2846 if pe.partial {
2847 writer
2848 .insert_edge_partial_full(
2849 pe.src,
2850 pe.dst,
2851 pe.edge_type_id,
2852 pe.eid,
2853 pe.props,
2854 Some(pe.edge_type_name),
2855 pe.touched,
2856 tx_l0,
2857 )
2858 .await?;
2859 } else {
2860 writer
2861 .insert_edge(
2862 pe.src,
2863 pe.dst,
2864 pe.edge_type_id,
2865 pe.eid,
2866 pe.props,
2867 Some(pe.edge_type_name),
2868 tx_l0,
2869 )
2870 .await?;
2871 }
2872 }
2873 Ok(())
2874 }
2875
2876 #[expect(clippy::too_many_arguments)]
2884 pub(crate) async fn execute_remove_items_locked(
2885 &self,
2886 items: &[RemoveItem],
2887 row: &mut HashMap<String, Value>,
2888 writer: &Writer,
2889 prop_manager: &PropertyManager,
2890 ctx: Option<&QueryContext>,
2891 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2892 prefetched: &Prefetch,
2893 ) -> Result<()> {
2894 let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
2897
2898 for item in items {
2899 match item {
2900 RemoveItem::Property(expr) => {
2901 if let Expr::Property(var_expr, prop_name) = expr
2902 && let Expr::Variable(var_name) = &**var_expr
2903 {
2904 if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
2905 entry.1.push(prop_name.clone());
2906 } else {
2907 prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
2908 }
2909 }
2910 }
2911 RemoveItem::Labels { variable, labels } => {
2912 self.execute_remove_labels(variable, labels, row, ctx)?;
2913 }
2914 }
2915 }
2916
2917 for (var_name, prop_names) in &prop_removals {
2919 let Some(node_val) = row.get(var_name) else {
2920 continue;
2921 };
2922
2923 if let Ok(vid) = Self::vid_from_value(node_val) {
2924 let mut props =
2926 read_vertex_props_with_prefetch(vid, prefetched, prop_manager, ctx).await?;
2927
2928 let removed_count = prop_names
2930 .iter()
2931 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
2932 .count();
2933 let any_exist = removed_count > 0;
2934 if any_exist {
2935 writer.track_properties_removed(removed_count, tx_l0);
2936 for prop_name in prop_names {
2937 props.insert(prop_name.clone(), Value::Null);
2938 }
2939 }
2940 let effective: HashMap<String, Value> = props
2942 .iter()
2943 .filter(|(_, v)| !v.is_null())
2944 .map(|(k, v)| (k.clone(), v.clone()))
2945 .collect();
2946 if any_exist {
2947 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
2948 let _ = writer
2949 .insert_vertex_with_labels(vid, props, &labels, tx_l0)
2950 .await?;
2951 }
2952
2953 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
2955 for prop_name in prop_names {
2956 node_map.insert(prop_name.clone(), Value::Null);
2957 }
2958 node_map.insert("_all_props".to_string(), Value::Map(effective));
2960 }
2961 } else if let Value::Map(map) = node_val {
2962 let mut edge_effective: Option<HashMap<String, Value>> = None;
2965 if map.get("_eid").is_some_and(|v| !v.is_null()) {
2966 let ei = self.extract_edge_identity(map)?;
2967 let mut props =
2968 read_edge_props_with_prefetch(ei.eid, prefetched, prop_manager, ctx)
2969 .await?;
2970
2971 let removed_count = prop_names
2972 .iter()
2973 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
2974 .count();
2975 let any_exist = removed_count > 0;
2976 if any_exist {
2977 writer.track_properties_removed(removed_count, tx_l0);
2978 for prop_name in prop_names {
2979 props.insert(prop_name.to_string(), Value::Null);
2980 }
2981 }
2982 edge_effective = Some(
2984 props
2985 .iter()
2986 .filter(|(_, v)| !v.is_null())
2987 .map(|(k, v)| (k.clone(), v.clone()))
2988 .collect(),
2989 );
2990 if any_exist {
2991 let edge_type_name = map
2992 .get("_type")
2993 .and_then(|v| v.as_str())
2994 .map(|s| s.to_string())
2995 .or_else(|| {
2996 self.storage
2997 .schema_manager()
2998 .edge_type_name_by_id_unified(ei.edge_type_id)
2999 });
3000 writer
3001 .insert_edge(
3002 ei.src,
3003 ei.dst,
3004 ei.edge_type_id,
3005 ei.eid,
3006 props,
3007 edge_type_name,
3008 tx_l0,
3009 )
3010 .await?;
3011 }
3012 }
3013
3014 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
3015 for prop_name in prop_names {
3016 edge_map.insert(prop_name.clone(), Value::Null);
3017 }
3018 if let Some(effective) = edge_effective {
3019 edge_map.insert("_all_props".to_string(), Value::Map(effective));
3020 }
3021 }
3022 } else if let Value::Edge(edge) = node_val {
3023 let eid = edge.eid;
3025 let src = edge.src;
3026 let dst = edge.dst;
3027 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
3028
3029 let mut props =
3030 read_edge_props_with_prefetch(eid, prefetched, prop_manager, ctx).await?;
3031
3032 let removed_count = prop_names
3033 .iter()
3034 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
3035 .count();
3036 if removed_count > 0 {
3037 writer.track_properties_removed(removed_count, tx_l0);
3038 for prop_name in prop_names {
3039 props.insert(prop_name.to_string(), Value::Null);
3040 }
3041 writer
3042 .insert_edge(
3043 src,
3044 dst,
3045 etype,
3046 eid,
3047 props,
3048 Some(edge.edge_type.clone()),
3049 tx_l0,
3050 )
3051 .await?;
3052 }
3053
3054 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
3055 for prop_name in prop_names {
3056 edge.properties.insert(prop_name.to_string(), Value::Null);
3057 }
3058 }
3059 }
3060 }
3061
3062 Ok(())
3063 }
3064
3065 pub(crate) fn execute_remove_labels(
3067 &self,
3068 variable: &str,
3069 labels: &[String],
3070 row: &mut HashMap<String, Value>,
3071 ctx: Option<&QueryContext>,
3072 ) -> Result<()> {
3073 if let Some(node_val) = row.get(variable)
3074 && let Ok(vid) = Self::vid_from_value(node_val)
3075 {
3076 reject_if_ephemeral_vid(vid)?;
3077 let registry = self
3078 .procedure_registry
3079 .as_ref()
3080 .and_then(|pr| pr.plugin_registry());
3081 reject_virtual_label_write(registry.as_ref(), labels, "REMOVE")?;
3082
3083 let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
3085
3086 let labels_to_remove: Vec<_> = labels
3088 .iter()
3089 .filter(|l| current_labels.contains(l))
3090 .collect();
3091
3092 if !labels_to_remove.is_empty() {
3093 let remaining_labels: Vec<String> = current_labels
3097 .iter()
3098 .filter(|l| !labels_to_remove.contains(l))
3099 .cloned()
3100 .collect();
3101 if let Some(ctx) = ctx {
3102 let l0 = ctx.transaction_l0.as_ref().unwrap_or(&ctx.l0);
3103 l0.write().set_vertex_labels(vid, &remaining_labels);
3104 }
3105
3106 if let Some(Value::Map(obj)) = row.get_mut(variable) {
3108 let labels_list = remaining_labels.into_iter().map(Value::String).collect();
3109 obj.insert("_labels".to_string(), Value::List(labels_list));
3110 }
3111 }
3112 }
3113 Ok(())
3114 }
3115
3116 fn resolve_edge_type_id_for_edge(
3119 &self,
3120 edge: &crate::types::Edge,
3121 writer: &Writer,
3122 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3123 ) -> Result<u32> {
3124 if !edge.edge_type.is_empty() {
3125 return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
3126 }
3127 if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
3130 return Ok(etype);
3131 }
3132 Err(anyhow!(
3133 "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
3134 edge.eid
3135 ))
3136 }
3137
3138 pub(crate) async fn execute_delete_item_locked(
3140 &self,
3141 val: &Value,
3142 detach: bool,
3143 writer: &Writer,
3144 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3145 ) -> Result<()> {
3146 match val {
3147 Value::Null => {
3148 }
3150 Value::Path(path) => {
3151 for edge in &path.edges {
3153 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3154 writer
3155 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3156 .await?;
3157 }
3158 for node in &path.nodes {
3159 self.execute_delete_vertex(
3160 node.vid,
3161 detach,
3162 Some(node.labels.clone()),
3163 writer,
3164 tx_l0,
3165 )
3166 .await?;
3167 }
3168 }
3169 _ => {
3170 if let Ok(path) = Path::try_from(val) {
3172 for edge in &path.edges {
3173 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3174 writer
3175 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3176 .await?;
3177 }
3178 for node in &path.nodes {
3179 self.execute_delete_vertex(
3180 node.vid,
3181 detach,
3182 Some(node.labels.clone()),
3183 writer,
3184 tx_l0,
3185 )
3186 .await?;
3187 }
3188 } else if let Ok(vid) = Self::vid_from_value(val) {
3189 let labels = Self::extract_labels_from_node(val);
3190 self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
3191 .await?;
3192 } else if let Value::Map(map) = val {
3193 self.execute_delete_edge_from_map(map, writer, tx_l0)
3194 .await?;
3195 } else if let Value::Edge(edge) = val {
3196 reject_if_ephemeral_eid(edge.eid)?;
3197 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
3198 let registry = self
3199 .procedure_registry
3200 .as_ref()
3201 .and_then(|pr| pr.plugin_registry());
3202 reject_virtual_edge_type_write(registry.as_ref(), etype, "DELETE")?;
3203 writer
3204 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
3205 .await?;
3206 }
3207 }
3208 }
3209 Ok(())
3210 }
3211
3212 pub(crate) async fn execute_delete_vertex(
3214 &self,
3215 vid: Vid,
3216 detach: bool,
3217 labels: Option<Vec<String>>,
3218 writer: &Writer,
3219 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3220 ) -> Result<()> {
3221 reject_if_ephemeral_vid(vid)?;
3222 if let Some(ls) = labels.as_deref() {
3223 let registry = self
3224 .procedure_registry
3225 .as_ref()
3226 .and_then(|pr| pr.plugin_registry());
3227 reject_virtual_label_write(registry.as_ref(), ls, "DELETE")?;
3228 }
3229 if detach {
3230 self.detach_delete_vertex(vid, writer, tx_l0).await?;
3231 } else {
3232 self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
3233 }
3234 writer.delete_vertex(vid, labels, tx_l0).await?;
3235 Ok(())
3236 }
3237
3238 pub(crate) async fn check_vertex_has_no_edges(
3244 &self,
3245 vid: Vid,
3246 writer: &Writer,
3247 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3248 ) -> Result<()> {
3249 let schema = self.storage.schema_manager().schema();
3250 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
3251
3252 let mut tombstoned_eids = std::collections::HashSet::new();
3254 {
3255 let writer_l0 = writer.l0_manager.get_current();
3256 let guard = writer_l0.read();
3257 for &eid in guard.tombstones.keys() {
3258 tombstoned_eids.insert(eid);
3259 }
3260 }
3261 if let Some(tx) = tx_l0 {
3262 let guard = tx.read();
3263 for &eid in guard.tombstones.keys() {
3264 tombstoned_eids.insert(eid);
3265 }
3266 }
3267
3268 let out_graph = self
3269 .storage
3270 .load_subgraph_cached(
3271 &[vid],
3272 &edge_type_ids,
3273 1,
3274 uni_store::runtime::Direction::Outgoing,
3275 Some(writer.l0_manager.get_current()),
3276 )
3277 .await?;
3278 let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
3279
3280 let in_graph = self
3281 .storage
3282 .load_subgraph_cached(
3283 &[vid],
3284 &edge_type_ids,
3285 1,
3286 uni_store::runtime::Direction::Incoming,
3287 Some(writer.l0_manager.get_current()),
3288 )
3289 .await?;
3290 let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
3291
3292 if has_out || has_in {
3293 return Err(anyhow!(
3294 "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
3295 vid
3296 ));
3297 }
3298 Ok(())
3299 }
3300
3301 pub(crate) async fn execute_delete_edge_from_map(
3303 &self,
3304 map: &HashMap<String, Value>,
3305 writer: &Writer,
3306 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3307 ) -> Result<()> {
3308 if map.get("_eid").is_some_and(|v| !v.is_null()) {
3310 let ei = self.extract_edge_identity(map)?;
3311 reject_if_ephemeral_eid(ei.eid)?;
3312 let registry = self
3313 .procedure_registry
3314 .as_ref()
3315 .and_then(|pr| pr.plugin_registry());
3316 reject_virtual_edge_type_write(registry.as_ref(), ei.edge_type_id, "DELETE")?;
3317 writer
3318 .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
3319 .await?;
3320 }
3321 Ok(())
3322 }
3323
3324 fn make_scan_plan(
3330 label_id: u16,
3331 labels: Vec<String>,
3332 variable: String,
3333 filter: Option<Expr>,
3334 ) -> LogicalPlan {
3335 if label_id > 0 {
3336 LogicalPlan::Scan {
3337 label_id,
3338 labels,
3339 variable,
3340 filter,
3341 optional: false,
3342 }
3343 } else if !labels.is_empty() {
3344 LogicalPlan::ScanMainByLabels {
3346 labels,
3347 variable,
3348 filter,
3349 optional: false,
3350 }
3351 } else {
3352 LogicalPlan::ScanAll {
3353 variable,
3354 filter,
3355 optional: false,
3356 }
3357 }
3358 }
3359
3360 fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
3363 if matches!(plan, LogicalPlan::Empty) {
3364 scan
3365 } else {
3366 LogicalPlan::CrossJoin {
3367 left: Box::new(plan),
3368 right: Box::new(scan),
3369 }
3370 }
3371 }
3372
3373 async fn resolve_merge_properties(
3380 &self,
3381 properties: &Option<Expr>,
3382 row: &HashMap<String, Value>,
3383 prop_manager: &PropertyManager,
3384 params: &HashMap<String, Value>,
3385 ctx: Option<&QueryContext>,
3386 ) -> Result<Option<Expr>> {
3387 let entries = match properties {
3388 Some(Expr::Map(entries)) => entries,
3389 other => return Ok(other.clone()),
3390 };
3391 let mut resolved = Vec::new();
3392 for (key, val_expr) in entries {
3393 if matches!(val_expr, Expr::Literal(_)) {
3394 resolved.push((key.clone(), val_expr.clone()));
3395 } else {
3396 let value = self
3397 .evaluate_expr(val_expr, row, prop_manager, params, ctx)
3398 .await?;
3399 resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
3400 }
3401 }
3402 Ok(Some(Expr::Map(resolved)))
3403 }
3404
3405 fn value_to_literal_expr(value: &Value) -> Expr {
3407 match value {
3408 Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
3409 Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
3410 Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
3411 Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
3412 Value::Null => Expr::Literal(CypherLiteral::Null),
3413 Value::List(items) => {
3414 Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
3415 }
3416 Value::Map(entries) => Expr::Map(
3417 entries
3418 .iter()
3419 .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
3420 .collect(),
3421 ),
3422 _ => Expr::Literal(CypherLiteral::Null),
3423 }
3424 }
3425
3426 pub(crate) async fn execute_merge_match(
3427 &self,
3428 pattern: &Pattern,
3429 row: &HashMap<String, Value>,
3430 prop_manager: &PropertyManager,
3431 params: &HashMap<String, Value>,
3432 ctx: Option<&QueryContext>,
3433 ) -> Result<Vec<HashMap<String, Value>>> {
3434 let planner =
3436 crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
3437
3438 let mut plan = LogicalPlan::Empty;
3443 let mut vars_in_scope = Vec::new();
3444
3445 for key in row.keys() {
3447 vars_in_scope.push(key.clone());
3448 }
3449
3450 for path in &pattern.paths {
3452 let elements = &path.elements;
3453 let mut i = 0;
3454 while i < elements.len() {
3455 let part = &elements[i];
3456 match part {
3457 PatternElement::Node(n) => {
3458 let variable = n.variable.clone().unwrap_or_default();
3459
3460 let is_bound = !variable.is_empty() && row.contains_key(&variable);
3462
3463 if is_bound {
3464 let val = row.get(&variable).unwrap();
3467 let vid = Self::vid_from_value(val)?;
3468
3469 let extracted_labels =
3472 Self::extract_labels_from_node(val).unwrap_or_default();
3473 let label_id = {
3474 let schema = self.storage.schema_manager().schema();
3475 extracted_labels
3476 .first()
3477 .and_then(|l| schema.label_id_by_name(l))
3478 .unwrap_or(0)
3479 };
3480
3481 let resolved_props = self
3482 .resolve_merge_properties(
3483 &n.properties,
3484 row,
3485 prop_manager,
3486 params,
3487 ctx,
3488 )
3489 .await?;
3490 let prop_filter =
3491 planner.properties_to_expr(&variable, &resolved_props);
3492
3493 let vid_filter = Expr::BinaryOp {
3502 left: Box::new(Expr::Property(
3503 Box::new(Expr::Variable(variable.clone())),
3504 "_vid".to_string(),
3505 )),
3506 op: BinaryOp::Eq,
3507 right: Box::new(Expr::Literal(CypherLiteral::Integer(
3508 vid.as_u64() as i64,
3509 ))),
3510 };
3511
3512 let combined_filter = if let Some(pf) = prop_filter {
3513 Some(Expr::BinaryOp {
3514 left: Box::new(vid_filter),
3515 op: BinaryOp::And,
3516 right: Box::new(pf),
3517 })
3518 } else {
3519 Some(vid_filter)
3520 };
3521
3522 let scan = Self::make_scan_plan(
3523 label_id,
3524 extracted_labels,
3525 variable.clone(),
3526 combined_filter,
3527 );
3528 plan = Self::attach_scan(plan, scan);
3529 } else {
3530 let label_id = if n.labels.is_empty() {
3531 0
3533 } else {
3534 let label_name = &n.labels[0];
3535 let schema = self.storage.schema_manager().schema();
3536 if self.config.strict_schema {
3537 schema
3538 .get_label_case_insensitive(label_name)
3539 .map(|m| m.id)
3540 .ok_or_else(|| {
3541 anyhow!(
3542 "Label '{}' is not defined in the schema \
3543 (strict_schema is enabled). \
3544 Declare it with db.schema().label(...).apply() first.",
3545 label_name
3546 )
3547 })?
3548 } else {
3549 schema
3551 .get_label_case_insensitive(label_name)
3552 .map(|m| m.id)
3553 .unwrap_or(0)
3554 }
3555 };
3556
3557 let resolved_props = self
3558 .resolve_merge_properties(
3559 &n.properties,
3560 row,
3561 prop_manager,
3562 params,
3563 ctx,
3564 )
3565 .await?;
3566 let prop_filter =
3567 planner.properties_to_expr(&variable, &resolved_props);
3568 let scan = Self::make_scan_plan(
3569 label_id,
3570 n.labels.names().to_vec(),
3571 variable.clone(),
3572 prop_filter,
3573 );
3574 plan = Self::attach_scan(plan, scan);
3575
3576 if !n.labels.is_empty()
3583 && !variable.is_empty()
3584 && (label_id == 0 || n.labels.len() > 1)
3585 && let Some(label_filter) =
3586 planner.node_filter_expr(&variable, &n.labels, &None)
3587 {
3588 plan = LogicalPlan::Filter {
3589 input: Box::new(plan),
3590 predicate: label_filter,
3591 optional_variables: std::collections::HashSet::new(),
3592 };
3593 }
3594
3595 if !variable.is_empty() {
3596 vars_in_scope.push(variable.clone());
3597 }
3598 }
3599
3600 i += 1;
3602 while i < elements.len() {
3603 if let PatternElement::Relationship(r) = &elements[i] {
3604 let target_node_part = &elements[i + 1];
3605 if let PatternElement::Node(n_target) = target_node_part {
3606 let schema = self.storage.schema_manager().schema();
3607 let mut edge_type_ids = Vec::new();
3608
3609 if r.types.is_empty() {
3610 return Err(anyhow!("MERGE edge must have a type"));
3611 } else if r.types.len() > 1 {
3612 return Err(anyhow!(
3613 "MERGE does not support multiple edge types"
3614 ));
3615 } else {
3616 let type_name = &r.types[0];
3617 let type_id = if self.config.strict_schema {
3618 let s = self.storage.schema_manager().schema();
3619 s.edge_type_id_by_name_case_insensitive(type_name)
3620 .ok_or_else(|| {
3621 anyhow!(
3622 "Edge type '{}' is not defined in the schema \
3623 (strict_schema is enabled).",
3624 type_name
3625 )
3626 })?
3627 } else {
3628 self.storage
3630 .schema_manager()
3631 .get_or_assign_edge_type_id(type_name)
3632 };
3633 edge_type_ids.push(type_id);
3634 }
3635
3636 let target_label_id: u16 = if let Some(lbl) =
3639 n_target.labels.first()
3640 {
3641 schema
3642 .get_label_case_insensitive(lbl)
3643 .map(|m| m.id)
3644 .unwrap_or(0)
3645 } else if let Some(var) = &n_target.variable {
3646 if let Some(val) = row.get(var) {
3647 if let Some(labels) =
3649 Self::extract_labels_from_node(val)
3650 {
3651 if let Some(first_label) = labels.first() {
3652 schema
3653 .get_label_case_insensitive(first_label)
3654 .map(|m| m.id)
3655 .unwrap_or(0)
3656 } else {
3657 0
3659 }
3660 } else if Self::vid_from_value(val).is_ok() {
3661 0
3663 } else {
3664 return Err(anyhow!(
3665 "Variable {} is not a node",
3666 var
3667 ));
3668 }
3669 } else {
3670 return Err(anyhow!(
3671 "MERGE pattern node must have a label or be a bound variable"
3672 ));
3673 }
3674 } else {
3675 return Err(anyhow!(
3676 "MERGE pattern node must have a label"
3677 ));
3678 };
3679
3680 let target_variable =
3681 n_target.variable.clone().unwrap_or_default();
3682 let source_variable = match &elements[i - 1] {
3683 PatternElement::Node(n) => {
3684 n.variable.clone().unwrap_or_default()
3685 }
3686 _ => String::new(),
3687 };
3688
3689 let is_variable_length = r.range.is_some();
3690 let type_name = &r.types[0];
3691
3692 let is_schemaless = edge_type_ids.iter().all(|id| {
3698 uni_common::core::edge_type::is_schemaless_edge_type(*id)
3699 });
3700
3701 if is_schemaless {
3702 plan = LogicalPlan::TraverseMainByType {
3703 type_names: vec![type_name.clone()],
3704 input: Box::new(plan),
3705 direction: r.direction.clone(),
3706 source_variable,
3707 target_variable: target_variable.clone(),
3708 step_variable: r.variable.clone(),
3709 min_hops: r
3710 .range
3711 .as_ref()
3712 .and_then(|r| r.min)
3713 .unwrap_or(1)
3714 as usize,
3715 max_hops: r
3716 .range
3717 .as_ref()
3718 .and_then(|r| r.max)
3719 .unwrap_or(1)
3720 as usize,
3721 optional: false,
3722 target_filter: None,
3723 path_variable: None,
3724 is_variable_length,
3725 optional_pattern_vars: std::collections::HashSet::new(),
3726 scope_match_variables: std::collections::HashSet::new(),
3727 edge_filter_expr: None,
3728 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
3729 };
3730 } else {
3731 let mut edge_props = std::collections::HashSet::new();
3733 if let Some(Expr::Map(entries)) = &r.properties {
3734 for (key, _) in entries {
3735 edge_props.insert(key.clone());
3736 }
3737 }
3738 plan = LogicalPlan::Traverse {
3739 input: Box::new(plan),
3740 edge_type_ids: edge_type_ids.clone(),
3741 direction: r.direction.clone(),
3742 source_variable,
3743 target_variable: target_variable.clone(),
3744 target_label_id,
3745 step_variable: r.variable.clone(),
3746 min_hops: r
3747 .range
3748 .as_ref()
3749 .and_then(|r| r.min)
3750 .unwrap_or(1)
3751 as usize,
3752 max_hops: r
3753 .range
3754 .as_ref()
3755 .and_then(|r| r.max)
3756 .unwrap_or(1)
3757 as usize,
3758 optional: false,
3759 target_filter: None,
3760 path_variable: None,
3761 edge_properties: edge_props,
3762 is_variable_length,
3763 optional_pattern_vars: std::collections::HashSet::new(),
3764 scope_match_variables: std::collections::HashSet::new(),
3765 edge_filter_expr: None,
3766 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
3767 qpp_steps: None,
3768 };
3769 }
3770
3771 if r.properties.is_some()
3773 && let Some(r_var) = &r.variable
3774 {
3775 let resolved_rel_props = self
3776 .resolve_merge_properties(
3777 &r.properties,
3778 row,
3779 prop_manager,
3780 params,
3781 ctx,
3782 )
3783 .await?;
3784 if let Some(prop_filter) =
3785 planner.properties_to_expr(r_var, &resolved_rel_props)
3786 {
3787 plan = LogicalPlan::Filter {
3788 input: Box::new(plan),
3789 predicate: prop_filter,
3790 optional_variables: std::collections::HashSet::new(
3791 ),
3792 };
3793 }
3794 }
3795
3796 if !target_variable.is_empty() {
3798 let resolved_target_props = self
3799 .resolve_merge_properties(
3800 &n_target.properties,
3801 row,
3802 prop_manager,
3803 params,
3804 ctx,
3805 )
3806 .await?;
3807 if let Some(prop_filter) = planner.properties_to_expr(
3808 &target_variable,
3809 &resolved_target_props,
3810 ) {
3811 plan = LogicalPlan::Filter {
3812 input: Box::new(plan),
3813 predicate: prop_filter,
3814 optional_variables: std::collections::HashSet::new(
3815 ),
3816 };
3817 }
3818 vars_in_scope.push(target_variable.clone());
3819 }
3820
3821 if let Some(sv) = &r.variable {
3822 vars_in_scope.push(sv.clone());
3823 }
3824 i += 2;
3825 } else {
3826 break;
3827 }
3828 } else {
3829 break;
3830 }
3831 }
3832 }
3833 _ => return Err(anyhow!("Pattern must start with a node")),
3834 }
3835 }
3836
3837 }
3839
3840 let db_matches = self
3841 .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
3842 .await?;
3843
3844 let final_matches = db_matches
3851 .into_iter()
3852 .filter(|db_match| {
3853 row.iter().all(|(key, val)| {
3854 if key.is_empty() || key.starts_with("__") {
3855 return true;
3856 }
3857 let Some(db_val) = db_match.get(key) else {
3858 return true;
3859 };
3860 if db_val == val {
3861 return true;
3862 }
3863 matches!(
3865 (Self::vid_from_value(val), Self::vid_from_value(db_val)),
3866 (Ok(v1), Ok(v2)) if v1 == v2
3867 )
3868 })
3869 })
3870 .map(|db_match| {
3871 let mut merged = row.clone();
3872 merged.extend(db_match);
3873 merged
3874 })
3875 .collect();
3876
3877 Ok(final_matches)
3878 }
3879
3880 fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
3888 let has_path_vars = pattern
3889 .paths
3890 .iter()
3891 .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
3892
3893 if !has_path_vars {
3894 return (pattern.clone(), Vec::new());
3895 }
3896
3897 let mut modified = pattern.clone();
3898 let mut temp_vars = Vec::new();
3899
3900 for path in &mut modified.paths {
3901 if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
3902 continue;
3903 }
3904 for (idx, element) in path.elements.iter_mut().enumerate() {
3905 if let PatternElement::Relationship(r) = element
3906 && r.variable.as_ref().is_none_or(String::is_empty)
3907 {
3908 let temp_var = format!("__path_r_{}", idx);
3909 r.variable = Some(temp_var.clone());
3910 temp_vars.push(temp_var);
3911 }
3912 }
3913 }
3914
3915 (modified, temp_vars)
3916 }
3917
3918 fn bind_path_variables(
3923 pattern: &Pattern,
3924 row: &mut HashMap<String, Value>,
3925 temp_vars: &[String],
3926 ) {
3927 for path in &pattern.paths {
3928 let Some(path_var) = path.variable.as_ref() else {
3929 continue;
3930 };
3931 if path_var.is_empty() {
3932 continue;
3933 }
3934
3935 let mut nodes = Vec::new();
3936 let mut edges = Vec::new();
3937
3938 for element in &path.elements {
3939 match element {
3940 PatternElement::Node(n) => {
3941 if let Some(var) = &n.variable
3942 && let Some(val) = row.get(var)
3943 && let Some(node) = Self::value_to_node_for_path(val)
3944 {
3945 nodes.push(node);
3946 }
3947 }
3948 PatternElement::Relationship(r) => {
3949 if let Some(var) = &r.variable
3950 && let Some(val) = row.get(var)
3951 && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
3952 {
3953 edges.push(edge);
3954 }
3955 }
3956 _ => {}
3957 }
3958 }
3959
3960 if !nodes.is_empty() {
3961 use uni_common::value::Path;
3962 row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
3963 }
3964 }
3965
3966 for var in temp_vars {
3968 row.remove(var);
3969 }
3970 }
3971
3972 fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
3974 match val {
3975 Value::Node(n) => Some(n.clone()),
3976 Value::Map(map) => {
3977 let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
3978 let labels = if let Some(Value::List(l)) = map.get("_labels") {
3979 l.iter()
3980 .filter_map(|v| {
3981 if let Value::String(s) = v {
3982 Some(s.clone())
3983 } else {
3984 None
3985 }
3986 })
3987 .collect()
3988 } else {
3989 vec![]
3990 };
3991 let properties: HashMap<String, Value> = map
3992 .iter()
3993 .filter(|(k, _)| !k.starts_with('_'))
3994 .map(|(k, v)| (k.clone(), v.clone()))
3995 .collect();
3996 Some(uni_common::value::Node {
3997 vid,
3998 labels,
3999 properties,
4000 })
4001 }
4002 _ => None,
4003 }
4004 }
4005
4006 fn value_to_edge_for_path(
4008 val: &Value,
4009 type_names: &[String],
4010 ) -> Option<uni_common::value::Edge> {
4011 match val {
4012 Value::Edge(e) => Some(e.clone()),
4013 Value::Map(map) => {
4014 let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
4015 let edge_type = map
4016 .get("_type_name")
4017 .and_then(|v| {
4018 if let Value::String(s) = v {
4019 Some(s.clone())
4020 } else {
4021 None
4022 }
4023 })
4024 .or_else(|| type_names.first().cloned())
4025 .unwrap_or_default();
4026 let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
4027 let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
4028 let properties: HashMap<String, Value> = map
4029 .iter()
4030 .filter(|(k, _)| !k.starts_with('_'))
4031 .map(|(k, v)| (k.clone(), v.clone()))
4032 .collect();
4033 Some(uni_common::value::Edge {
4034 eid,
4035 edge_type,
4036 src,
4037 dst,
4038 properties,
4039 })
4040 }
4041 _ => None,
4042 }
4043 }
4044}
4045
4046pub(crate) async fn read_vertex_props_with_prefetch(
4059 vid: Vid,
4060 prefetched: &Prefetch,
4061 prop_manager: &PropertyManager,
4062 ctx: Option<&QueryContext>,
4063) -> Result<uni_common::Properties> {
4064 match prefetched.vertex.get(&vid).cloned() {
4065 Some(mut base) => {
4066 if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_vertex_props(vid, ctx) {
4067 for (k, v) in l0 {
4068 base.insert(k, v);
4069 }
4070 }
4071 Ok(base)
4072 }
4073 None => Ok(prop_manager
4074 .get_all_vertex_props_with_ctx(vid, ctx)
4075 .await?
4076 .unwrap_or_default()),
4077 }
4078}
4079
4080pub(crate) async fn read_edge_props_with_prefetch(
4085 eid: Eid,
4086 prefetched: &Prefetch,
4087 prop_manager: &PropertyManager,
4088 ctx: Option<&QueryContext>,
4089) -> Result<uni_common::Properties> {
4090 match prefetched.edge.get(&eid).cloned() {
4091 Some(mut base) => {
4092 if let Some(l0) = uni_store::runtime::l0_visibility::accumulate_edge_props(eid, ctx) {
4093 for (k, v) in l0 {
4094 base.insert(k, v);
4095 }
4096 }
4097 Ok(base)
4098 }
4099 None => Ok(prop_manager
4100 .get_all_edge_props_with_ctx(eid, ctx)
4101 .await?
4102 .unwrap_or_default()),
4103 }
4104}
4105
#[cfg(test)]
mod tests {
    use super::*;

    // ---- shared fixtures ---------------------------------------------------

    /// Shorthand for a string-valued [`Value`].
    fn text(s: &'static str) -> Value {
        Value::String(s.into())
    }

    /// The `{name: "Alice", age: 30}` property map used by several merge tests.
    fn alice_props() -> HashMap<String, Value> {
        HashMap::from([
            ("name".to_string(), text("Alice")),
            ("age".to_string(), Value::Int(30)),
        ])
    }

    /// A property map holding exactly one string-valued entry.
    fn single_prop(key: &str, value: &'static str) -> HashMap<String, Value> {
        HashMap::from([(key.to_string(), text(value))])
    }

    // ---- Executor::merge_props ---------------------------------------------

    /// Replace mode: keys absent from the incoming map come back as `Null`.
    #[test]
    fn test_merge_props_replace_tombstones_missing_keys() {
        let merged = Executor::merge_props(alice_props(), single_prop("name", "Bob"), true);

        assert_eq!(merged.get("name"), Some(&text("Bob")));
        assert_eq!(
            merged.get("age"),
            Some(&Value::Null),
            "Missing keys should be tombstoned in replace mode"
        );
    }

    /// Merge mode: existing keys survive and new keys are added.
    #[test]
    fn test_merge_props_merge_preserves_existing() {
        let merged = Executor::merge_props(alice_props(), single_prop("city", "NYC"), false);

        assert_eq!(merged.get("name"), Some(&text("Alice")));
        assert_eq!(merged.get("age"), Some(&Value::Int(30)));
        assert_eq!(merged.get("city"), Some(&text("NYC")));
    }

    /// An incoming `Null` is kept as `Null` in both merge and replace mode.
    #[test]
    fn test_merge_props_null_incoming_is_tombstone() {
        let existing = single_prop("name", "Alice");
        let update: HashMap<String, Value> = HashMap::from([("name".to_string(), Value::Null)]);

        // Merge mode first, then replace mode.
        for replace in [false, true] {
            let merged = Executor::merge_props(existing.clone(), update.clone(), replace);
            assert_eq!(merged.get("name"), Some(&Value::Null));
        }
    }

    /// Merging into an empty map yields exactly the incoming entries.
    #[test]
    fn test_merge_props_empty_current() {
        let merged = Executor::merge_props(HashMap::new(), single_prop("name", "Alice"), false);

        assert_eq!(merged.get("name"), Some(&text("Alice")));
        assert_eq!(merged.len(), 1);
    }

    /// Replace mode with an empty incoming map turns every current key into `Null`.
    #[test]
    fn test_merge_props_empty_incoming_replace_tombstones_all() {
        let merged = Executor::merge_props(alice_props(), HashMap::new(), true);

        for key in ["name", "age"] {
            assert_eq!(merged.get(key), Some(&Value::Null));
        }
    }

    // ---- Executor::extract_labels_from_node --------------------------------

    /// Labels are read from the `_labels` list of a map-encoded node.
    #[test]
    fn test_extract_labels_from_map() {
        let encoded_node = Value::Map(HashMap::from([
            ("_vid".to_string(), Value::Int(1)),
            (
                "_labels".to_string(),
                Value::List(vec![text("Person"), text("Employee")]),
            ),
        ]));

        let expected = vec!["Person".to_string(), "Employee".to_string()];
        assert_eq!(
            Executor::extract_labels_from_node(&encoded_node),
            Some(expected)
        );
    }

    /// Labels are read from the `labels` field of a `Value::Node`.
    #[test]
    fn test_extract_labels_from_value_node() {
        let person = Value::Node(uni_common::Node {
            vid: uni_common::core::id::Vid::from(1u64),
            labels: vec!["Person".to_string()],
            properties: HashMap::new(),
        });

        assert_eq!(
            Executor::extract_labels_from_node(&person),
            Some(vec!["Person".to_string()])
        );
    }

    /// Scalar values are not nodes, so no labels are returned.
    #[test]
    fn test_extract_labels_non_node_returns_none() {
        for scalar in [Value::Int(42), text("hello")] {
            assert_eq!(Executor::extract_labels_from_node(&scalar), None);
        }
    }

    // ---- Executor::extract_user_properties_from_value ----------------------

    /// `_vid` and `_labels` are dropped from a map-encoded node; user keys remain.
    #[test]
    fn test_extract_user_props_strips_internal_keys() {
        let encoded_node = Value::Map(HashMap::from([
            ("_vid".to_string(), Value::Int(1)),
            ("_labels".to_string(), Value::List(vec![text("Person")])),
            ("name".to_string(), text("Alice")),
            ("age".to_string(), Value::Int(30)),
        ]));

        let user_props = Executor::extract_user_properties_from_value(&encoded_node).unwrap();

        assert_eq!(user_props.get("name"), Some(&text("Alice")));
        assert_eq!(user_props.get("age"), Some(&Value::Int(30)));
        for internal in ["_vid", "_labels"] {
            assert!(!user_props.contains_key(internal));
        }
    }

    /// A map without internal keys is returned unchanged.
    #[test]
    fn test_extract_user_props_plain_map_returns_as_is() {
        let plain = single_prop("key", "value");

        let user_props =
            Executor::extract_user_properties_from_value(&Value::Map(plain.clone())).unwrap();

        assert_eq!(user_props, plain);
    }

    /// Properties of a `Value::Node` are returned from its `properties` field.
    #[test]
    fn test_extract_user_props_from_value_node() {
        let person = Value::Node(uni_common::Node {
            vid: uni_common::core::id::Vid::from(1u64),
            labels: vec!["Person".to_string()],
            properties: HashMap::from([("name".into(), text("Alice"))]),
        });

        let user_props = Executor::extract_user_properties_from_value(&person).unwrap();

        assert_eq!(user_props.get("name"), Some(&text("Alice")));
    }
}