1use super::core::*;
5use crate::query::planner::LogicalPlan;
6use anyhow::{Result, anyhow};
7use lancedb::query::{ExecutableQuery, QueryBase};
8use std::collections::HashMap;
9use std::sync::Arc;
10use uni_common::DataType;
11use uni_common::core::id::{Eid, Vid};
12use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
13use uni_common::{Path, Value};
14use uni_cypher::ast::{
15 AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
16 CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
17 DropEdgeType, DropLabel, Expr, Pattern, PatternElement, RemoveItem, SetClause, SetItem,
18};
19use uni_store::QueryContext;
20use uni_store::runtime::property_manager::PropertyManager;
21use uni_store::runtime::writer::Writer;
22
23struct EdgeIdentity {
25 eid: Eid,
26 src: Vid,
27 dst: Vid,
28 edge_type_id: u32,
29}
30
31impl Executor {
32 pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
39 match node_val {
40 Value::Map(map) => {
41 if let Some(Value::List(labels_arr)) = map.get("_labels") {
43 let labels: Vec<String> = labels_arr
44 .iter()
45 .filter_map(|v| v.as_str().map(|s| s.to_string()))
46 .collect();
47 if !labels.is_empty() {
48 return Some(labels);
49 }
50 }
51 None
52 }
53 Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
54 _ => None,
55 }
56 }
57
58 pub(crate) fn extract_user_properties_from_value(
66 val: &Value,
67 ) -> Option<HashMap<String, Value>> {
68 match val {
69 Value::Map(map) => {
70 let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
74 let is_edge_map = map.contains_key("_eid")
75 && map.contains_key("_src")
76 && map.contains_key("_dst");
77
78 if is_node_map || is_edge_map {
79 let user_props: HashMap<String, Value> = map
81 .iter()
82 .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
83 .map(|(k, v)| (k.clone(), v.clone()))
84 .collect();
85 if user_props.is_empty()
89 && let Some(Value::Map(all_props)) = map.get("_all_props")
90 {
91 return Some(all_props.clone());
92 }
93 Some(user_props)
94 } else {
95 Some(map.clone())
97 }
98 }
99 Value::Node(node) => Some(node.properties.clone()),
100 Value::Edge(edge) => Some(edge.properties.clone()),
101 _ => None,
102 }
103 }
104
105 #[expect(clippy::too_many_arguments)]
122 async fn apply_properties_to_entity(
123 &self,
124 variable: &str,
125 new_props: HashMap<String, Value>,
126 replace: bool,
127 row: &mut HashMap<String, Value>,
128 writer: &mut Writer,
129 prop_manager: &PropertyManager,
130 params: &HashMap<String, Value>,
131 ctx: Option<&QueryContext>,
132 ) -> Result<()> {
133 let target = row.get(variable).cloned();
135
136 match target {
137 Some(Value::Node(ref node)) => {
138 let vid = node.vid;
139 let labels = node.labels.clone();
140 let current = prop_manager
141 .get_all_vertex_props_with_ctx(vid, ctx)
142 .await?
143 .unwrap_or_default();
144 let write_props = Self::merge_props(current, new_props, replace);
145 let mut enriched = write_props.clone();
146 for label_name in &labels {
147 self.enrich_properties_with_generated_columns(
148 label_name,
149 &mut enriched,
150 prop_manager,
151 params,
152 ctx,
153 )
154 .await?;
155 }
156 let _ = writer
157 .insert_vertex_with_labels(vid, enriched.clone(), &labels)
158 .await?;
159 if let Some(Value::Node(n)) = row.get_mut(variable) {
161 n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
162 }
163 }
164 Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
165 let vid = Self::vid_from_value(node_val)?;
166 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
167 let current = prop_manager
168 .get_all_vertex_props_with_ctx(vid, ctx)
169 .await?
170 .unwrap_or_default();
171 let write_props = Self::merge_props(current, new_props, replace);
172 let mut enriched = write_props.clone();
173 for label_name in &labels {
174 self.enrich_properties_with_generated_columns(
175 label_name,
176 &mut enriched,
177 prop_manager,
178 params,
179 ctx,
180 )
181 .await?;
182 }
183 let _ = writer
184 .insert_vertex_with_labels(vid, enriched.clone(), &labels)
185 .await?;
186 if let Some(Value::Map(node_map)) = row.get_mut(variable) {
188 node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
190 let effective: HashMap<String, Value> =
192 enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
193 for (k, v) in &effective {
194 node_map.insert(k.clone(), v.clone());
195 }
196 node_map.insert("_all_props".to_string(), Value::Map(effective));
198 }
199 }
200 Some(Value::Edge(ref edge)) => {
201 let eid = edge.eid;
202 let src = edge.src;
203 let dst = edge.dst;
204 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
205 let current = prop_manager
206 .get_all_edge_props_with_ctx(eid, ctx)
207 .await?
208 .unwrap_or_default();
209 let write_props = Self::merge_props(current, new_props, replace);
210 writer
211 .insert_edge(
212 src,
213 dst,
214 etype,
215 eid,
216 write_props.clone(),
217 Some(edge.edge_type.clone()),
218 )
219 .await?;
220 if let Some(Value::Edge(e)) = row.get_mut(variable) {
222 e.properties = write_props
223 .into_iter()
224 .filter(|(_, v)| !v.is_null())
225 .collect();
226 }
227 }
228 Some(Value::Map(ref map))
229 if map.contains_key("_eid")
230 && map.contains_key("_src")
231 && map.contains_key("_dst") =>
232 {
233 let ei = self.extract_edge_identity(map)?;
234 let current = prop_manager
235 .get_all_edge_props_with_ctx(ei.eid, ctx)
236 .await?
237 .unwrap_or_default();
238 let write_props = Self::merge_props(current, new_props, replace);
239 let edge_type_name = map
240 .get("_type")
241 .and_then(|v| v.as_str())
242 .map(|s| s.to_string())
243 .or_else(|| {
244 self.storage
245 .schema_manager()
246 .edge_type_name_by_id_unified(ei.edge_type_id)
247 });
248 writer
249 .insert_edge(
250 ei.src,
251 ei.dst,
252 ei.edge_type_id,
253 ei.eid,
254 write_props.clone(),
255 edge_type_name,
256 )
257 .await?;
258 if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
260 edge_map.retain(|k, _| k.starts_with('_'));
261 let effective: HashMap<String, Value> = write_props
262 .into_iter()
263 .filter(|(_, v)| !v.is_null())
264 .collect();
265 for (k, v) in &effective {
266 edge_map.insert(k.clone(), v.clone());
267 }
268 edge_map.insert("_all_props".to_string(), Value::Map(effective));
270 }
271 }
272 _ => {
273 }
275 }
276 Ok(())
277 }
278
279 fn merge_props(
290 current: HashMap<String, Value>,
291 incoming: HashMap<String, Value>,
292 replace: bool,
293 ) -> HashMap<String, Value> {
294 if replace {
295 let mut result: HashMap<String, Value> = incoming
297 .iter()
298 .filter(|(_, v)| !v.is_null())
299 .map(|(k, v)| (k.clone(), v.clone()))
300 .collect();
301 for k in current.keys() {
304 if incoming.get(k).is_none_or(|v| v.is_null()) {
305 result.insert(k.clone(), Value::Null);
306 }
307 }
308 result
309 } else {
310 let mut result = current;
312 result.extend(incoming);
313 result
314 }
315 }
316
317 fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
319 let eid = Eid::from(
320 map.get("_eid")
321 .and_then(|v| v.as_u64())
322 .ok_or_else(|| anyhow!("Invalid _eid"))?,
323 );
324 let src = Vid::from(
325 map.get("_src")
326 .and_then(|v| v.as_u64())
327 .ok_or_else(|| anyhow!("Invalid _src"))?,
328 );
329 let dst = Vid::from(
330 map.get("_dst")
331 .and_then(|v| v.as_u64())
332 .ok_or_else(|| anyhow!("Invalid _dst"))?,
333 );
334 let edge_type_id = self.resolve_edge_type_id(
335 map.get("_type")
336 .ok_or_else(|| anyhow!("Missing _type on edge map"))?,
337 )?;
338 Ok(EdgeIdentity {
339 eid,
340 src,
341 dst,
342 edge_type_id,
343 })
344 }
345
346 fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
353 match type_val {
354 Value::Int(i) => Ok(*i as u32),
355 Value::String(name) => {
356 Ok(self
359 .storage
360 .schema_manager()
361 .get_or_assign_edge_type_id(name))
362 }
363 _ => Err(anyhow!(
364 "Invalid _type value: expected Int or String, got {:?}",
365 type_val
366 )),
367 }
368 }
369
370 pub(crate) async fn execute_vacuum(&self) -> Result<()> {
371 if let Some(writer_arc) = &self.writer {
372 {
374 let mut writer = writer_arc.write().await;
375 writer.flush_to_l1(None).await?;
376 } let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
380 let compaction_results = compactor.compact_all().await?;
381
382 let am = self.storage.adjacency_manager();
384 let schema = self.storage.schema_manager().schema();
385 for info in compaction_results {
386 let direction = match info.direction.as_str() {
388 "fwd" => uni_store::storage::direction::Direction::Outgoing,
389 "bwd" => uni_store::storage::direction::Direction::Incoming,
390 _ => continue,
391 };
392
393 if let Some(edge_type_id) =
395 schema.edge_type_id_unified_case_insensitive(&info.edge_type)
396 {
397 let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
399 }
400 }
401 }
402 Ok(())
403 }
404
405 pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
406 if let Some(writer_arc) = &self.writer {
407 let mut writer = writer_arc.write().await;
408 writer.flush_to_l1(Some("checkpoint".to_string())).await?;
409 }
410 Ok(())
411 }
412
413 pub(crate) async fn execute_copy_to(
414 &self,
415 identifier: &str,
416 path: &str,
417 format: &str,
418 options: &HashMap<String, Value>,
419 ) -> Result<usize> {
420 let schema = self.storage.schema_manager().schema();
422
423 if schema.get_edge_type_case_insensitive(identifier).is_some() {
425 return self
426 .export_edge_type_in_format(identifier, path, format)
427 .await;
428 }
429
430 if schema.get_label_case_insensitive(identifier).is_some() {
432 return self
433 .export_vertex_label_in_format(identifier, path, format, options)
434 .await;
435 }
436
437 Err(anyhow!("Unknown label or edge type: '{}'", identifier))
439 }
440
441 async fn export_vertex_label_in_format(
442 &self,
443 label: &str,
444 path: &str,
445 format: &str,
446 _options: &HashMap<String, Value>,
447 ) -> Result<usize> {
448 match format {
449 "parquet" => self.export_vertex_label(label, path).await,
450 "csv" => {
451 let dataset = self.storage.vertex_dataset(label)?;
453 let table = dataset.open_lancedb(self.storage.lancedb_store()).await?;
454
455 let mut stream = table.query().execute().await?;
457
458 let mut all_rows = Vec::new();
460 let mut column_names = Vec::new();
461
462 use futures::StreamExt;
464 while let Some(batch_result) = stream.next().await {
465 let batch = batch_result?;
466
467 if column_names.is_empty() {
469 column_names = batch
470 .schema()
471 .fields()
472 .iter()
473 .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
474 .map(|f| f.name().clone())
475 .collect();
476 }
477
478 for row_idx in 0..batch.num_rows() {
480 let mut row = Vec::new();
481 for field in batch.schema().fields() {
482 if field.name().starts_with('_') || field.name() == "ext_id" {
483 continue;
484 }
485
486 let col_idx = batch.schema().index_of(field.name())?;
487 let column = batch.column(col_idx);
488 let value = self.arrow_value_to_json(column, row_idx)?;
489
490 let csv_value = match value {
492 Value::Null => String::new(),
493 Value::Bool(b) => b.to_string(),
494 Value::Int(i) => i.to_string(),
495 Value::Float(f) => f.to_string(),
496 Value::String(s) => s,
497 _ => format!("{value}"),
498 };
499 row.push(csv_value);
500 }
501 all_rows.push(row);
502 }
503 }
504
505 let file = std::fs::File::create(path)?;
507 let mut wtr = csv::Writer::from_writer(file);
508
509 log::debug!("CSV export headers: {:?}", column_names);
511 wtr.write_record(&column_names)?;
512
513 for (i, row) in all_rows.iter().enumerate() {
515 log::debug!("CSV export row {}: {:?}", i, row);
516 wtr.write_record(row)?;
517 }
518
519 wtr.flush()?;
520 Ok(all_rows.len())
521 }
522 _ => Err(anyhow!(
523 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
524 format
525 )),
526 }
527 }
528
529 async fn export_edge_type_in_format(
530 &self,
531 edge_type: &str,
532 path: &str,
533 format: &str,
534 ) -> Result<usize> {
535 match format {
536 "parquet" => self.export_edge_type(edge_type, path).await,
537 "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
538 _ => Err(anyhow!(
539 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
540 format
541 )),
542 }
543 }
544
545 async fn write_batches_to_parquet(
548 mut stream: impl futures::Stream<Item = Result<arrow_array::RecordBatch, lancedb::Error>>
549 + Unpin,
550 path: &str,
551 entity_description: &str,
552 ) -> Result<usize> {
553 use futures::TryStreamExt;
554
555 let first_batch = match stream.try_next().await? {
557 Some(batch) => batch,
558 None => {
559 log::info!("No data to export from {}", entity_description);
560 return Ok(0);
561 }
562 };
563
564 let file = std::fs::File::create(path)?;
566 let arrow_schema = first_batch.schema();
567 let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
568
569 let mut count = first_batch.num_rows();
571 writer.write(&first_batch)?;
572
573 while let Some(batch) = stream.try_next().await? {
575 count += batch.num_rows();
576 writer.write(&batch)?;
577 }
578
579 writer.close()?;
580
581 log::info!(
582 "Exported {} rows from {} to '{}'",
583 count,
584 entity_description,
585 path
586 );
587 Ok(count)
588 }
589
590 async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
592 let dataset = self.storage.vertex_dataset(label)?;
593 let lancedb_store = self.storage.lancedb_store();
594 let table = dataset.open_lancedb(lancedb_store).await?;
595 let query = table.query();
596 let stream = query.execute().await?;
597
598 Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
599 }
600
601 async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
603 let schema = self.storage.schema_manager().schema();
604 if !schema.edge_types.contains_key(edge_type) {
605 return Err(anyhow!("Edge type '{}' not found", edge_type));
606 }
607
608 let lancedb_store = self.storage.lancedb_store();
609 let table = lancedb_store.open_main_edge_table().await?;
610 let query = table.query().only_if(format!("type = '{}'", edge_type));
611 let stream = query.execute().await?;
612
613 Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
614 }
615
616 pub(crate) async fn execute_copy_from(
617 &self,
618 label: &str,
619 path: &str,
620 format: &str,
621 options: &HashMap<String, Value>,
622 ) -> Result<usize> {
623 let batches = match format {
625 "parquet" => self.read_parquet_file(path)?,
626 "csv" => self.read_csv_file(path, label, options)?,
627 _ => {
628 return Err(anyhow!(
629 "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
630 format
631 ));
632 }
633 };
634
635 let writer_arc = self
637 .writer
638 .as_ref()
639 .ok_or_else(|| anyhow!("No writer available"))?;
640
641 let db_schema = self.storage.schema_manager().schema();
642
643 let is_edge = db_schema.edge_type_id_by_name(label).is_some();
645
646 if is_edge {
647 let edge_type_id = db_schema
649 .edge_type_id_by_name(label)
650 .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
651
652 let src_col = options
654 .get("src_col")
655 .and_then(|v| v.as_str())
656 .unwrap_or("src");
657 let dst_col = options
658 .get("dst_col")
659 .and_then(|v| v.as_str())
660 .unwrap_or("dst");
661
662 let mut total_rows = 0;
663 for batch in batches {
664 let num_rows = batch.num_rows();
665
666 for row_idx in 0..num_rows {
667 let mut properties = HashMap::new();
668 let mut src_vid: Option<Vid> = None;
669 let mut dst_vid: Option<Vid> = None;
670
671 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
673 let col_name = field.name();
674 let column = batch.column(col_idx);
675 let value = self.arrow_value_to_json(column, row_idx)?;
676
677 if col_name == src_col {
678 let raw = value.as_u64().unwrap_or_else(|| {
679 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
680 });
681 src_vid = Some(Vid::new(raw));
682 } else if col_name == dst_col {
683 let raw = value.as_u64().unwrap_or_else(|| {
684 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
685 });
686 dst_vid = Some(Vid::new(raw));
687 } else if !col_name.starts_with('_') && !value.is_null() {
688 properties.insert(col_name.clone(), value);
689 }
690 }
691
692 let src = src_vid
693 .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
694 let dst = dst_vid
695 .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
696
697 let mut writer = writer_arc.write().await;
699 let eid = writer.next_eid(edge_type_id).await?;
700 writer
701 .insert_edge(
702 src,
703 dst,
704 edge_type_id,
705 eid,
706 properties,
707 Some(label.to_string()),
708 )
709 .await?;
710
711 total_rows += 1;
712 }
713 }
714
715 log::info!(
716 "Imported {} edge rows from '{}' into edge type '{}'",
717 total_rows,
718 path,
719 label
720 );
721
722 if total_rows > 0 {
724 let mut writer = writer_arc.write().await;
725 writer.flush_to_l1(None).await?;
726 }
727
728 Ok(total_rows)
729 } else {
730 db_schema
733 .label_id_by_name_case_insensitive(label)
734 .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
735
736 let mut total_rows = 0;
737 for batch in batches {
738 let num_rows = batch.num_rows();
739
740 for row_idx in 0..num_rows {
742 let mut properties = HashMap::new();
743
744 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
746 let col_name = field.name();
747
748 if col_name.starts_with('_') {
750 continue;
751 }
752
753 let column = batch.column(col_idx);
754 let value = self.arrow_value_to_json(column, row_idx)?;
755
756 if !value.is_null() {
757 properties.insert(col_name.clone(), value);
758 }
759 }
760
761 let mut writer = writer_arc.write().await;
763 let vid = writer.next_vid().await?;
764 let _ = writer
765 .insert_vertex_with_labels(vid, properties, &[label.to_string()])
766 .await?;
767
768 total_rows += 1;
769 }
770 }
771
772 log::info!(
773 "Imported {} rows from '{}' into label '{}'",
774 total_rows,
775 path,
776 label
777 );
778
779 if total_rows > 0 {
781 let mut writer = writer_arc.write().await;
782 writer.flush_to_l1(None).await?;
783 }
784
785 Ok(total_rows)
786 }
787 }
788
789 fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
790 use arrow_array::Array;
791 use arrow_schema::DataType as ArrowDataType;
792
793 if column.is_null(row_idx) {
794 return Ok(Value::Null);
795 }
796
797 match column.data_type() {
798 ArrowDataType::Utf8 => {
799 let array = column
800 .as_any()
801 .downcast_ref::<arrow_array::StringArray>()
802 .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
803 Ok(Value::String(array.value(row_idx).to_string()))
804 }
805 ArrowDataType::Int32 => {
806 let array = column
807 .as_any()
808 .downcast_ref::<arrow_array::Int32Array>()
809 .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
810 Ok(Value::Int(array.value(row_idx) as i64))
811 }
812 ArrowDataType::Int64 => {
813 let array = column
814 .as_any()
815 .downcast_ref::<arrow_array::Int64Array>()
816 .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
817 Ok(Value::Int(array.value(row_idx)))
818 }
819 ArrowDataType::Float32 => {
820 let array = column
821 .as_any()
822 .downcast_ref::<arrow_array::Float32Array>()
823 .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
824 Ok(Value::Float(array.value(row_idx) as f64))
825 }
826 ArrowDataType::Float64 => {
827 let array = column
828 .as_any()
829 .downcast_ref::<arrow_array::Float64Array>()
830 .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
831 Ok(Value::Float(array.value(row_idx)))
832 }
833 ArrowDataType::Boolean => {
834 let array = column
835 .as_any()
836 .downcast_ref::<arrow_array::BooleanArray>()
837 .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
838 Ok(Value::Bool(array.value(row_idx)))
839 }
840 ArrowDataType::UInt64 => {
841 let array = column
842 .as_any()
843 .downcast_ref::<arrow_array::UInt64Array>()
844 .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
845 Ok(Value::Int(array.value(row_idx) as i64))
846 }
847 _ => {
848 let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
850 if let Some(arr) = array {
851 Ok(Value::String(arr.value(row_idx).to_string()))
852 } else {
853 Ok(Value::Null)
854 }
855 }
856 }
857 }
858
859 fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
860 let file = std::fs::File::open(path)?;
861 let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
862 .build()?;
863 reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
864 }
865
866 fn read_csv_file(
867 &self,
868 path: &str,
869 label: &str,
870 options: &HashMap<String, Value>,
871 ) -> Result<Vec<arrow_array::RecordBatch>> {
872 use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
873 use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
874 use std::sync::Arc;
875
876 let has_headers = options
878 .get("headers")
879 .and_then(|v| v.as_bool())
880 .unwrap_or(true);
881
882 let file = std::fs::File::open(path)?;
884 let mut rdr = csv::ReaderBuilder::new()
885 .has_headers(has_headers)
886 .from_reader(file);
887
888 let db_schema = self.storage.schema_manager().schema();
890 let properties = db_schema.properties.get(label);
891
892 let mut rows: Vec<Vec<String>> = Vec::new();
894 let headers: Vec<String> = if has_headers {
895 rdr.headers()?.iter().map(|s| s.to_string()).collect()
896 } else {
897 Vec::new()
898 };
899
900 for result in rdr.records() {
901 let record = result?;
902 rows.push(record.iter().map(|s| s.to_string()).collect());
903 }
904
905 if rows.is_empty() {
906 return Ok(Vec::new());
907 }
908
909 let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
911 let col_names: Vec<String> = if has_headers {
912 headers
913 } else {
914 (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
915 };
916
917 for name in &col_names {
918 let arrow_type = if let Some(props) = properties {
919 if let Some(prop_meta) = props.get(name) {
920 match prop_meta.r#type {
921 DataType::Int32 => ArrowDataType::Int32,
922 DataType::Int64 => ArrowDataType::Int64,
923 DataType::Float32 => ArrowDataType::Float32,
924 DataType::Float64 => ArrowDataType::Float64,
925 DataType::Bool => ArrowDataType::Boolean,
926 _ => ArrowDataType::Utf8,
927 }
928 } else {
929 ArrowDataType::Utf8
930 }
931 } else {
932 ArrowDataType::Utf8
933 };
934 arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
935 }
936
937 let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
938
939 let mut columns: Vec<ArrayRef> = Vec::new();
941 for (col_idx, field) in arrow_fields.iter().enumerate() {
942 match field.data_type() {
943 ArrowDataType::Int32 => {
944 let values: Vec<Option<i32>> = rows
945 .iter()
946 .map(|row| {
947 if col_idx < row.len() {
948 row[col_idx].parse().ok()
949 } else {
950 None
951 }
952 })
953 .collect();
954 columns.push(Arc::new(Int32Array::from(values)));
955 }
956 _ => {
957 let values: Vec<Option<String>> = rows
959 .iter()
960 .map(|row| {
961 if col_idx < row.len() {
962 Some(row[col_idx].clone())
963 } else {
964 None
965 }
966 })
967 .collect();
968 columns.push(Arc::new(StringArray::from(values)));
969 }
970 }
971 }
972
973 let batch = RecordBatch::try_new(arrow_schema, columns)?;
974 Ok(vec![batch])
975 }
976
977 fn parse_data_type(type_str: &str) -> Result<DataType> {
978 use uni_common::core::schema::{CrdtType, PointType};
979 let type_str = type_str.to_lowercase();
980 let type_str = type_str.trim();
981 match type_str {
982 "string" | "text" | "varchar" => Ok(DataType::String),
983 "int" | "integer" | "int32" => Ok(DataType::Int32),
984 "long" | "int64" | "bigint" => Ok(DataType::Int64),
985 "float" | "float32" | "real" => Ok(DataType::Float32),
986 "double" | "float64" => Ok(DataType::Float64),
987 "bool" | "boolean" => Ok(DataType::Bool),
988 "timestamp" => Ok(DataType::Timestamp),
989 "date" => Ok(DataType::Date),
990 "time" => Ok(DataType::Time),
991 "datetime" => Ok(DataType::DateTime),
992 "duration" => Ok(DataType::Duration),
993 "json" | "jsonb" => Ok(DataType::CypherValue),
994 "point" => Ok(DataType::Point(PointType::Cartesian2D)),
995 "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
996 "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
997 s if s.starts_with("vector(") && s.ends_with(')') => {
998 let dims_str = &s[7..s.len() - 1];
999 let dimensions = dims_str
1000 .parse::<usize>()
1001 .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1002 Ok(DataType::Vector { dimensions })
1003 }
1004 s if s.starts_with("list<") && s.ends_with('>') => {
1005 let inner_type_str = &s[5..s.len() - 1];
1006 let inner_type = Self::parse_data_type(inner_type_str)?;
1007 Ok(DataType::List(Box::new(inner_type)))
1008 }
1009 "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1010 "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1011 _ => Err(anyhow!("Unknown data type: {}", type_str)),
1012 }
1013 }
1014
1015 pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1016 let sm = self.storage.schema_manager_arc();
1017 if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1018 return Ok(());
1019 }
1020 sm.add_label(&clause.name)?;
1021 for prop in clause.properties {
1022 let dt = Self::parse_data_type(&prop.data_type)?;
1023 sm.add_property(&clause.name, &prop.name, dt, prop.nullable)?;
1024 if prop.unique {
1025 let constraint = Constraint {
1026 name: format!("{}_{}_unique", clause.name, prop.name),
1027 constraint_type: ConstraintType::Unique {
1028 properties: vec![prop.name],
1029 },
1030 target: ConstraintTarget::Label(clause.name.clone()),
1031 enabled: true,
1032 };
1033 sm.add_constraint(constraint)?;
1034 }
1035 }
1036 sm.save().await?;
1037 Ok(())
1038 }
1039
1040 pub(crate) async fn enrich_properties_with_generated_columns(
1041 &self,
1042 label_name: &str,
1043 properties: &mut HashMap<String, Value>,
1044 prop_manager: &PropertyManager,
1045 params: &HashMap<String, Value>,
1046 ctx: Option<&QueryContext>,
1047 ) -> Result<()> {
1048 let schema = self.storage.schema_manager().schema();
1049
1050 if let Some(props_meta) = schema.properties.get(label_name) {
1051 let mut generators = Vec::new();
1052 for (prop_name, meta) in props_meta {
1053 if let Some(expr_str) = &meta.generation_expression {
1054 generators.push((prop_name.clone(), expr_str.clone()));
1055 }
1056 }
1057
1058 for (prop_name, expr_str) in generators {
1059 let cache_key = (label_name.to_string(), prop_name.clone());
1060 let expr = {
1061 let cache = self.gen_expr_cache.read().await;
1062 cache.get(&cache_key).cloned()
1063 };
1064
1065 let expr = match expr {
1066 Some(e) => e,
1067 None => {
1068 let parsed = uni_cypher::parse_expression(&expr_str)
1069 .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1070 let mut cache = self.gen_expr_cache.write().await;
1071 cache.insert(cache_key, parsed.clone());
1072 parsed
1073 }
1074 };
1075
1076 let mut scope = HashMap::new();
1077
1078 if let Some(var) = expr.extract_variable() {
1080 scope.insert(var, Value::Map(properties.clone()));
1081 } else {
1082 for (k, v) in properties.iter() {
1085 scope.insert(k.clone(), v.clone());
1086 }
1087 }
1088
1089 let val = self
1090 .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1091 .await?;
1092 properties.insert(prop_name, val);
1093 }
1094 }
1095 Ok(())
1096 }
1097
1098 pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1099 let sm = self.storage.schema_manager_arc();
1100 if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1101 return Ok(());
1102 }
1103 sm.add_edge_type(&clause.name, clause.src_labels, clause.dst_labels)?;
1104 for prop in clause.properties {
1105 let dt = Self::parse_data_type(&prop.data_type)?;
1106 sm.add_property(&clause.name, &prop.name, dt, prop.nullable)?;
1107 }
1108 sm.save().await?;
1109 Ok(())
1110 }
1111
1112 pub(crate) async fn execute_alter_entity(
1117 sm: &Arc<SchemaManager>,
1118 entity_name: &str,
1119 action: AlterAction,
1120 ) -> Result<()> {
1121 match action {
1122 AlterAction::AddProperty(prop) => {
1123 let dt = Self::parse_data_type(&prop.data_type)?;
1124 sm.add_property(entity_name, &prop.name, dt, prop.nullable)?;
1125 }
1126 AlterAction::DropProperty(prop_name) => {
1127 sm.drop_property(entity_name, &prop_name)?;
1128 }
1129 AlterAction::RenameProperty { old_name, new_name } => {
1130 sm.rename_property(entity_name, &old_name, &new_name)?;
1131 }
1132 }
1133 sm.save().await?;
1134 Ok(())
1135 }
1136
1137 pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1138 Self::execute_alter_entity(
1139 &self.storage.schema_manager_arc(),
1140 &clause.name,
1141 clause.action,
1142 )
1143 .await
1144 }
1145
1146 pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1147 Self::execute_alter_entity(
1148 &self.storage.schema_manager_arc(),
1149 &clause.name,
1150 clause.action,
1151 )
1152 .await
1153 }
1154
1155 pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1156 let sm = self.storage.schema_manager_arc();
1157 sm.drop_label(&clause.name, clause.if_exists)?;
1158 sm.save().await?;
1159 Ok(())
1160 }
1161
1162 pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1163 let sm = self.storage.schema_manager_arc();
1164 sm.drop_edge_type(&clause.name, clause.if_exists)?;
1165 sm.save().await?;
1166 Ok(())
1167 }
1168
1169 pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1170 let sm = self.storage.schema_manager_arc();
1171 let target = ConstraintTarget::Label(clause.label);
1172 let c_type = match clause.constraint_type {
1173 AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1174 properties: clause.properties,
1175 },
1176 AstConstraintType::Exists => {
1177 let property = clause
1178 .properties
1179 .into_iter()
1180 .next()
1181 .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1182 ConstraintType::Exists { property }
1183 }
1184 AstConstraintType::Check => {
1185 let expression = clause
1186 .expression
1187 .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1188 ConstraintType::Check {
1189 expression: expression.to_string_repr(),
1190 }
1191 }
1192 };
1193
1194 let constraint = Constraint {
1195 name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1196 constraint_type: c_type,
1197 target,
1198 enabled: true,
1199 };
1200
1201 sm.add_constraint(constraint)?;
1202 sm.save().await?;
1203 Ok(())
1204 }
1205
1206 pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1207 let sm = self.storage.schema_manager_arc();
1208 sm.drop_constraint(&clause.name, false)?;
1209 sm.save().await?;
1210 Ok(())
1211 }
1212
1213 fn get_composite_constraint(&self, label: &str) -> Option<Constraint> {
1214 let schema = self.storage.schema_manager().schema();
1215 schema
1216 .constraints
1217 .iter()
1218 .find(|c| {
1219 if !c.enabled {
1220 return false;
1221 }
1222 match &c.target {
1223 ConstraintTarget::Label(l) if l == label => {
1224 matches!(c.constraint_type, ConstraintType::Unique { .. })
1225 }
1226 _ => false,
1227 }
1228 })
1229 .cloned()
1230 }
1231
1232 #[expect(clippy::too_many_arguments)]
1233 pub(crate) async fn execute_merge(
1234 &self,
1235 rows: Vec<HashMap<String, Value>>,
1236 pattern: &Pattern,
1237 on_match: Option<&SetClause>,
1238 on_create: Option<&SetClause>,
1239 prop_manager: &PropertyManager,
1240 params: &HashMap<String, Value>,
1241 ctx: Option<&QueryContext>,
1242 ) -> Result<Vec<HashMap<String, Value>>> {
1243 let writer_lock = self
1244 .writer
1245 .as_ref()
1246 .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;
1247
1248 let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);
1251
1252 let mut results = Vec::new();
1253 for mut row in rows {
1254 let mut optimized_vid = None;
1256 if pattern.paths.len() == 1 {
1257 let path = &pattern.paths[0];
1258 if path.elements.len() == 1
1259 && let PatternElement::Node(n) = &path.elements[0]
1260 && n.labels.len() == 1
1261 && let Some(constraint) = self.get_composite_constraint(&n.labels[0])
1262 && let ConstraintType::Unique { properties } = constraint.constraint_type
1263 {
1264 let label = &n.labels[0];
1265 let mut pattern_props = HashMap::new();
1267 if let Some(props_expr) = &n.properties {
1268 let val = self
1269 .evaluate_expr(props_expr, &row, prop_manager, params, ctx)
1270 .await?;
1271 if let Value::Map(map) = val {
1272 for (k, v) in map {
1273 pattern_props.insert(k, v);
1274 }
1275 }
1276 }
1277
1278 let has_all_keys = properties.iter().all(|p| pattern_props.contains_key(p));
1280 if has_all_keys {
1281 let key_props: HashMap<String, serde_json::Value> = properties
1283 .iter()
1284 .filter_map(|p| {
1285 pattern_props.get(p).map(|v| (p.clone(), v.clone().into()))
1286 })
1287 .collect();
1288
1289 if let Ok(Some(vid)) = self
1291 .storage
1292 .index_manager()
1293 .composite_lookup(label, &key_props)
1294 .await
1295 {
1296 optimized_vid = Some((vid, pattern_props));
1297 }
1298 }
1299 }
1300 }
1301
1302 if let Some((vid, _pattern_props)) = optimized_vid {
1303 let mut writer = writer_lock.write().await;
1305 let mut match_row = row.clone();
1306 if let PatternElement::Node(n) = &pattern.paths[0].elements[0]
1307 && let Some(var) = &n.variable
1308 {
1309 match_row.insert(var.clone(), Value::Int(vid.as_u64() as i64));
1310 }
1311
1312 if let Some(set) = on_match {
1313 self.execute_set_items_locked(
1314 &set.items,
1315 &mut match_row,
1316 &mut writer,
1317 prop_manager,
1318 params,
1319 ctx,
1320 )
1321 .await?;
1322 }
1323 Self::bind_path_variables(&path_pattern, &mut match_row, &temp_vars);
1324 results.push(match_row);
1325 } else {
1326 let matches = self
1328 .execute_merge_match(pattern, &row, prop_manager, params, ctx)
1329 .await?;
1330 let mut writer = writer_lock.write().await;
1331 if !matches.is_empty() {
1332 for mut m in matches {
1333 if let Some(set) = on_match {
1334 self.execute_set_items_locked(
1335 &set.items,
1336 &mut m,
1337 &mut writer,
1338 prop_manager,
1339 params,
1340 ctx,
1341 )
1342 .await?;
1343 }
1344 Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
1345 results.push(m);
1346 }
1347 } else {
1348 self.execute_create_pattern(
1349 &path_pattern,
1350 &mut row,
1351 &mut writer,
1352 prop_manager,
1353 params,
1354 ctx,
1355 )
1356 .await?;
1357 if let Some(set) = on_create {
1358 self.execute_set_items_locked(
1359 &set.items,
1360 &mut row,
1361 &mut writer,
1362 prop_manager,
1363 params,
1364 ctx,
1365 )
1366 .await?;
1367 }
1368 Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
1369 results.push(row);
1370 }
1371 }
1372 }
1373 Ok(results)
1374 }
1375
1376 pub(crate) async fn execute_create_pattern(
1378 &self,
1379 pattern: &Pattern,
1380 row: &mut HashMap<String, Value>,
1381 writer: &mut Writer,
1382 prop_manager: &PropertyManager,
1383 params: &HashMap<String, Value>,
1384 ctx: Option<&QueryContext>,
1385 ) -> Result<()> {
1386 for path in &pattern.paths {
1387 let mut prev_vid: Option<Vid> = None;
1388 type PendingRel = (String, u32, String, Option<Expr>, Direction);
1390 let mut rel_pending: Option<PendingRel> = None;
1391
1392 for element in &path.elements {
1393 match element {
1394 PatternElement::Node(n) => {
1395 let mut vid = None;
1396
1397 if let Some(var) = &n.variable
1399 && let Some(val) = row.get(var)
1400 && let Ok(existing_vid) = Self::vid_from_value(val)
1401 {
1402 vid = Some(existing_vid);
1403 }
1404
1405 if vid.is_none() {
1407 let mut props = HashMap::new();
1408 if let Some(props_expr) = &n.properties {
1409 let props_val = self
1410 .evaluate_expr(props_expr, row, prop_manager, params, ctx)
1411 .await?;
1412 if let Value::Map(map) = props_val {
1413 for (k, v) in map {
1414 props.insert(k, v);
1415 }
1416 } else {
1417 return Err(anyhow!("Properties must evaluate to a map"));
1418 }
1419 }
1420
1421 let schema = self.storage.schema_manager().schema();
1423
1424 let new_vid = writer.next_vid().await?;
1426
1427 for label_name in &n.labels {
1429 if schema.get_label_case_insensitive(label_name).is_some() {
1430 self.enrich_properties_with_generated_columns(
1431 label_name,
1432 &mut props,
1433 prop_manager,
1434 params,
1435 ctx,
1436 )
1437 .await?;
1438 }
1439 }
1440
1441 let final_props = writer
1443 .insert_vertex_with_labels(new_vid, props, &n.labels)
1444 .await?;
1445
1446 if let Some(var) = &n.variable {
1448 let mut obj = HashMap::new();
1449 obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
1450 let labels_list: Vec<Value> =
1451 n.labels.iter().map(|l| Value::String(l.clone())).collect();
1452 obj.insert("_labels".to_string(), Value::List(labels_list));
1453 for (k, v) in &final_props {
1454 obj.insert(k.clone(), v.clone());
1455 }
1456 row.insert(var.clone(), Value::Map(obj));
1458 }
1459 vid = Some(new_vid);
1460 }
1461
1462 let current_vid = vid.unwrap();
1463
1464 if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
1465 rel_pending.take()
1466 && let Some(src) = prev_vid
1467 {
1468 let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
1469
1470 if !is_rel_bound {
1471 let mut rel_props = HashMap::new();
1472 if let Some(expr) = rel_props_expr {
1473 let val = self
1474 .evaluate_expr(&expr, row, prop_manager, params, ctx)
1475 .await?;
1476 if let Value::Map(map) = val {
1477 rel_props.extend(map);
1478 }
1479 }
1480 let eid = writer.next_eid(type_id).await?;
1481
1482 let (edge_src, edge_dst) = match dir {
1484 Direction::Incoming => (current_vid, src),
1485 _ => (src, current_vid),
1486 };
1487
1488 let store_props = !rel_var.is_empty();
1489 let user_props = if store_props {
1490 rel_props.clone()
1491 } else {
1492 HashMap::new()
1493 };
1494
1495 writer
1496 .insert_edge(
1497 edge_src,
1498 edge_dst,
1499 type_id,
1500 eid,
1501 rel_props,
1502 Some(type_name.clone()),
1503 )
1504 .await?;
1505
1506 if store_props {
1509 let mut edge_map = HashMap::new();
1510 edge_map.insert(
1511 "_eid".to_string(),
1512 Value::Int(eid.as_u64() as i64),
1513 );
1514 edge_map.insert(
1515 "_src".to_string(),
1516 Value::Int(edge_src.as_u64() as i64),
1517 );
1518 edge_map.insert(
1519 "_dst".to_string(),
1520 Value::Int(edge_dst.as_u64() as i64),
1521 );
1522 edge_map
1523 .insert("_type".to_string(), Value::Int(type_id as i64));
1524 for (k, v) in user_props {
1526 edge_map.insert(k, v);
1527 }
1528 row.insert(rel_var, Value::Map(edge_map));
1529 }
1530 }
1531 }
1532 prev_vid = Some(current_vid);
1533 }
1534 PatternElement::Relationship(r) => {
1535 if r.types.len() != 1 {
1536 return Err(anyhow!(
1537 "CREATE relationship must specify exactly one type"
1538 ));
1539 }
1540 let type_name = &r.types[0];
1541 let type_id = self
1543 .storage
1544 .schema_manager()
1545 .get_or_assign_edge_type_id(type_name);
1546
1547 rel_pending = Some((
1548 r.variable.clone().unwrap_or_default(),
1549 type_id,
1550 type_name.clone(),
1551 r.properties.clone(),
1552 r.direction.clone(),
1553 ));
1554 }
1555 PatternElement::Parenthesized { .. } => {
1556 return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
1557 }
1558 }
1559 }
1560 }
1561 Ok(())
1562 }
1563
1564 fn validate_property_value(
1568 prop_name: &str,
1569 val: &Value,
1570 schema: &uni_common::core::schema::Schema,
1571 labels: &[String],
1572 ) -> Result<()> {
1573 for label in labels {
1575 if let Some(props) = schema.properties.get(label)
1576 && let Some(prop_meta) = props.get(prop_name)
1577 && prop_meta.r#type == uni_common::core::schema::DataType::CypherValue
1578 {
1579 return Ok(());
1580 }
1581 }
1582
1583 match val {
1584 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
1585 anyhow::bail!(
1586 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
1587 prop_name
1588 );
1589 }
1590 Value::List(items) => {
1591 for item in items {
1592 match item {
1593 Value::Map(_)
1594 | Value::Node(_)
1595 | Value::Edge(_)
1596 | Value::Path(_)
1597 | Value::List(_) => {
1598 anyhow::bail!(
1599 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
1600 prop_name
1601 );
1602 }
1603 _ => {}
1604 }
1605 }
1606 }
1607 _ => {}
1608 }
1609 Ok(())
1610 }
1611
1612 pub(crate) async fn execute_set_items_locked(
1613 &self,
1614 items: &[SetItem],
1615 row: &mut HashMap<String, Value>,
1616 writer: &mut Writer,
1617 prop_manager: &PropertyManager,
1618 params: &HashMap<String, Value>,
1619 ctx: Option<&QueryContext>,
1620 ) -> Result<()> {
1621 for item in items {
1622 match item {
1623 SetItem::Property { expr, value } => {
1624 if let Expr::Property(var_expr, prop_name) = expr
1625 && let Expr::Variable(var_name) = &**var_expr
1626 && let Some(node_val) = row.get(var_name)
1627 {
1628 if let Ok(vid) = Self::vid_from_value(node_val) {
1629 let labels =
1630 Self::extract_labels_from_node(node_val).unwrap_or_default();
1631 let schema = self.storage.schema_manager().schema().clone();
1632 let mut props = prop_manager
1633 .get_all_vertex_props_with_ctx(vid, ctx)
1634 .await?
1635 .unwrap_or_default();
1636 let val = self
1637 .evaluate_expr(value, row, prop_manager, params, ctx)
1638 .await?;
1639 Self::validate_property_value(prop_name, &val, &schema, &labels)?;
1640 props.insert(prop_name.clone(), val.clone());
1641
1642 for label_name in &labels {
1644 self.enrich_properties_with_generated_columns(
1645 label_name,
1646 &mut props,
1647 prop_manager,
1648 params,
1649 ctx,
1650 )
1651 .await?;
1652 }
1653
1654 let _ = writer
1655 .insert_vertex_with_labels(vid, props, &labels)
1656 .await?;
1657
1658 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
1660 node_map.insert(prop_name.clone(), val);
1661 } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
1662 node.properties.insert(prop_name.clone(), val);
1663 }
1664 } else if let Value::Map(map) = node_val
1665 && map.get("_eid").is_some_and(|v| !v.is_null())
1666 && map.get("_src").is_some_and(|v| !v.is_null())
1667 && map.get("_dst").is_some_and(|v| !v.is_null())
1668 && map.get("_type").is_some_and(|v| !v.is_null())
1669 {
1670 let ei = self.extract_edge_identity(map)?;
1671 let schema = self.storage.schema_manager().schema().clone();
1672 let edge_type_name = match map.get("_type") {
1674 Some(Value::String(s)) => s.clone(),
1675 Some(Value::Int(id)) => schema
1676 .edge_type_name_by_id_unified(*id as u32)
1677 .unwrap_or_else(|| format!("EdgeType{}", id)),
1678 _ => String::new(),
1679 };
1680
1681 let mut props = prop_manager
1682 .get_all_edge_props_with_ctx(ei.eid, ctx)
1683 .await?
1684 .unwrap_or_default();
1685 let val = self
1686 .evaluate_expr(value, row, prop_manager, params, ctx)
1687 .await?;
1688 Self::validate_property_value(
1689 prop_name,
1690 &val,
1691 &schema,
1692 std::slice::from_ref(&edge_type_name),
1693 )?;
1694 props.insert(prop_name.clone(), val.clone());
1695 writer
1696 .insert_edge(
1697 ei.src,
1698 ei.dst,
1699 ei.edge_type_id,
1700 ei.eid,
1701 props,
1702 Some(edge_type_name.clone()),
1703 )
1704 .await?;
1705
1706 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
1708 edge_map.insert(prop_name.clone(), val);
1709 } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1710 edge.properties.insert(prop_name.clone(), val);
1711 }
1712 } else if let Value::Edge(edge) = node_val {
1713 let eid = edge.eid;
1715 let src = edge.src;
1716 let dst = edge.dst;
1717 let edge_type_name = edge.edge_type.clone();
1718 let etype =
1719 self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
1720 let schema = self.storage.schema_manager().schema().clone();
1721
1722 let mut props = prop_manager
1723 .get_all_edge_props_with_ctx(eid, ctx)
1724 .await?
1725 .unwrap_or_default();
1726 let val = self
1727 .evaluate_expr(value, row, prop_manager, params, ctx)
1728 .await?;
1729 Self::validate_property_value(
1730 prop_name,
1731 &val,
1732 &schema,
1733 std::slice::from_ref(&edge_type_name),
1734 )?;
1735 props.insert(prop_name.clone(), val.clone());
1736 writer
1737 .insert_edge(
1738 src,
1739 dst,
1740 etype,
1741 eid,
1742 props,
1743 Some(edge_type_name.clone()),
1744 )
1745 .await?;
1746
1747 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1749 edge.properties.insert(prop_name.clone(), val);
1750 }
1751 }
1752 }
1753 }
1754 SetItem::Labels { variable, labels } => {
1755 if let Some(node_val) = row.get(variable)
1756 && let Ok(vid) = Self::vid_from_value(node_val)
1757 {
1758 let current_labels =
1760 Self::extract_labels_from_node(node_val).unwrap_or_default();
1761
1762 let labels_to_add: Vec<_> = labels
1764 .iter()
1765 .filter(|l| !current_labels.contains(l))
1766 .cloned()
1767 .collect();
1768
1769 if !labels_to_add.is_empty() {
1770 if let Some(ctx) = ctx {
1773 ctx.l0.write().add_vertex_labels(vid, &labels_to_add);
1774 }
1775
1776 if let Some(Value::Map(obj)) = row.get_mut(variable) {
1778 let mut updated_labels = current_labels;
1779 updated_labels.extend(labels_to_add);
1780 let labels_list =
1781 updated_labels.into_iter().map(Value::String).collect();
1782 obj.insert("_labels".to_string(), Value::List(labels_list));
1783 }
1784 }
1785 }
1786 }
1787 SetItem::Variable { variable, value }
1788 | SetItem::VariablePlus { variable, value } => {
1789 let replace = matches!(item, SetItem::Variable { .. });
1790 let op_str = if replace { "=" } else { "+=" };
1791
1792 if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
1794 continue;
1795 }
1796 let rhs = self
1797 .evaluate_expr(value, row, prop_manager, params, ctx)
1798 .await?;
1799 let new_props =
1800 Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
1801 anyhow!(
1802 "SET {} {} expr: right-hand side must evaluate to a map, \
1803 node, or relationship",
1804 variable,
1805 op_str
1806 )
1807 })?;
1808 self.apply_properties_to_entity(
1809 variable,
1810 new_props,
1811 replace,
1812 row,
1813 writer,
1814 prop_manager,
1815 params,
1816 ctx,
1817 )
1818 .await?;
1819 }
1820 }
1821 }
1822 Ok(())
1823 }
1824
1825 pub(crate) async fn execute_remove_items_locked(
1833 &self,
1834 items: &[RemoveItem],
1835 row: &mut HashMap<String, Value>,
1836 writer: &mut Writer,
1837 prop_manager: &PropertyManager,
1838 ctx: Option<&QueryContext>,
1839 ) -> Result<()> {
1840 let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
1843
1844 for item in items {
1845 match item {
1846 RemoveItem::Property(expr) => {
1847 if let Expr::Property(var_expr, prop_name) = expr
1848 && let Expr::Variable(var_name) = &**var_expr
1849 {
1850 if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
1851 entry.1.push(prop_name.clone());
1852 } else {
1853 prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
1854 }
1855 }
1856 }
1857 RemoveItem::Labels { variable, labels } => {
1858 self.execute_remove_labels(variable, labels, row, ctx)?;
1859 }
1860 }
1861 }
1862
1863 for (var_name, prop_names) in &prop_removals {
1865 let Some(node_val) = row.get(var_name) else {
1866 continue;
1867 };
1868
1869 if let Ok(vid) = Self::vid_from_value(node_val) {
1870 let mut props = prop_manager
1872 .get_all_vertex_props_with_ctx(vid, ctx)
1873 .await?
1874 .unwrap_or_default();
1875
1876 let any_exist = prop_names
1878 .iter()
1879 .any(|p| props.get(p).is_some_and(|v| !v.is_null()));
1880 if any_exist {
1881 for prop_name in prop_names {
1882 props.insert(prop_name.clone(), Value::Null);
1883 }
1884 }
1885 let effective: HashMap<String, Value> = props
1887 .iter()
1888 .filter(|(_, v)| !v.is_null())
1889 .map(|(k, v)| (k.clone(), v.clone()))
1890 .collect();
1891 if any_exist {
1892 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
1893 let _ = writer
1894 .insert_vertex_with_labels(vid, props, &labels)
1895 .await?;
1896 }
1897
1898 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
1900 for prop_name in prop_names {
1901 node_map.insert(prop_name.clone(), Value::Null);
1902 }
1903 node_map.insert("_all_props".to_string(), Value::Map(effective));
1905 }
1906 } else if let Value::Map(map) = node_val {
1907 let mut edge_effective: Option<HashMap<String, Value>> = None;
1910 if map.get("_eid").is_some_and(|v| !v.is_null()) {
1911 let ei = self.extract_edge_identity(map)?;
1912 let mut props = prop_manager
1913 .get_all_edge_props_with_ctx(ei.eid, ctx)
1914 .await?
1915 .unwrap_or_default();
1916
1917 let any_exist = prop_names
1918 .iter()
1919 .any(|p| props.get(p).is_some_and(|v| !v.is_null()));
1920 if any_exist {
1921 for prop_name in prop_names {
1922 props.insert(prop_name.to_string(), Value::Null);
1923 }
1924 }
1925 edge_effective = Some(
1927 props
1928 .iter()
1929 .filter(|(_, v)| !v.is_null())
1930 .map(|(k, v)| (k.clone(), v.clone()))
1931 .collect(),
1932 );
1933 if any_exist {
1934 let edge_type_name = map
1935 .get("_type")
1936 .and_then(|v| v.as_str())
1937 .map(|s| s.to_string())
1938 .or_else(|| {
1939 self.storage
1940 .schema_manager()
1941 .edge_type_name_by_id_unified(ei.edge_type_id)
1942 });
1943 writer
1944 .insert_edge(
1945 ei.src,
1946 ei.dst,
1947 ei.edge_type_id,
1948 ei.eid,
1949 props,
1950 edge_type_name,
1951 )
1952 .await?;
1953 }
1954 }
1955
1956 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
1957 for prop_name in prop_names {
1958 edge_map.insert(prop_name.clone(), Value::Null);
1959 }
1960 if let Some(effective) = edge_effective {
1961 edge_map.insert("_all_props".to_string(), Value::Map(effective));
1962 }
1963 }
1964 } else if let Value::Edge(edge) = node_val {
1965 let eid = edge.eid;
1967 let src = edge.src;
1968 let dst = edge.dst;
1969 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
1970
1971 let mut props = prop_manager
1972 .get_all_edge_props_with_ctx(eid, ctx)
1973 .await?
1974 .unwrap_or_default();
1975
1976 let any_exist = prop_names
1977 .iter()
1978 .any(|p| props.get(p).is_some_and(|v| !v.is_null()));
1979 if any_exist {
1980 for prop_name in prop_names {
1981 props.insert(prop_name.to_string(), Value::Null);
1982 }
1983 writer
1984 .insert_edge(src, dst, etype, eid, props, Some(edge.edge_type.clone()))
1985 .await?;
1986 }
1987
1988 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1989 for prop_name in prop_names {
1990 edge.properties.insert(prop_name.to_string(), Value::Null);
1991 }
1992 }
1993 }
1994 }
1995
1996 Ok(())
1997 }
1998
1999 pub(crate) fn execute_remove_labels(
2001 &self,
2002 variable: &str,
2003 labels: &[String],
2004 row: &mut HashMap<String, Value>,
2005 ctx: Option<&QueryContext>,
2006 ) -> Result<()> {
2007 if let Some(node_val) = row.get(variable)
2008 && let Ok(vid) = Self::vid_from_value(node_val)
2009 {
2010 let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
2012
2013 let labels_to_remove: Vec<_> = labels
2015 .iter()
2016 .filter(|l| current_labels.contains(l))
2017 .collect();
2018
2019 if !labels_to_remove.is_empty() {
2020 if let Some(ctx) = ctx {
2022 let mut l0 = ctx.l0.write();
2023 for label in &labels_to_remove {
2024 l0.remove_vertex_label(vid, label);
2025 }
2026 }
2027
2028 if let Some(Value::Map(obj)) = row.get_mut(variable) {
2030 let remaining_labels: Vec<_> = current_labels
2031 .iter()
2032 .filter(|l| !labels_to_remove.contains(l))
2033 .cloned()
2034 .collect();
2035 let labels_list = remaining_labels.into_iter().map(Value::String).collect();
2036 obj.insert("_labels".to_string(), Value::List(labels_list));
2037 }
2038 }
2039 }
2040 Ok(())
2041 }
2042
2043 fn resolve_edge_type_id_for_edge(
2046 &self,
2047 edge: &crate::types::Edge,
2048 writer: &Writer,
2049 ) -> Result<u32> {
2050 if !edge.edge_type.is_empty() {
2051 return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
2052 }
2053 if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid) {
2056 return Ok(etype);
2057 }
2058 Err(anyhow!(
2059 "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
2060 edge.eid
2061 ))
2062 }
2063
2064 pub(crate) async fn execute_delete_item_locked(
2066 &self,
2067 val: &Value,
2068 detach: bool,
2069 writer: &mut Writer,
2070 ) -> Result<()> {
2071 match val {
2072 Value::Null => {
2073 }
2075 Value::Path(path) => {
2076 for edge in &path.edges {
2078 let etype = self.resolve_edge_type_id_for_edge(edge, writer)?;
2079 writer
2080 .delete_edge(edge.eid, edge.src, edge.dst, etype)
2081 .await?;
2082 }
2083 for node in &path.nodes {
2084 self.execute_delete_vertex(node.vid, detach, Some(node.labels.clone()), writer)
2085 .await?;
2086 }
2087 }
2088 _ => {
2089 if let Ok(path) = Path::try_from(val) {
2091 for edge in &path.edges {
2092 let etype = self.resolve_edge_type_id_for_edge(edge, writer)?;
2093 writer
2094 .delete_edge(edge.eid, edge.src, edge.dst, etype)
2095 .await?;
2096 }
2097 for node in &path.nodes {
2098 self.execute_delete_vertex(
2099 node.vid,
2100 detach,
2101 Some(node.labels.clone()),
2102 writer,
2103 )
2104 .await?;
2105 }
2106 } else if let Ok(vid) = Self::vid_from_value(val) {
2107 let labels = Self::extract_labels_from_node(val);
2108 self.execute_delete_vertex(vid, detach, labels, writer)
2109 .await?;
2110 } else if let Value::Map(map) = val {
2111 self.execute_delete_edge_from_map(map, writer).await?;
2112 } else if let Value::Edge(edge) = val {
2113 let etype = self.resolve_edge_type_id_for_edge(edge, writer)?;
2114 writer
2115 .delete_edge(edge.eid, edge.src, edge.dst, etype)
2116 .await?;
2117 }
2118 }
2119 }
2120 Ok(())
2121 }
2122
2123 pub(crate) async fn execute_delete_vertex(
2125 &self,
2126 vid: Vid,
2127 detach: bool,
2128 labels: Option<Vec<String>>,
2129 writer: &mut Writer,
2130 ) -> Result<()> {
2131 if detach {
2132 self.detach_delete_vertex(vid, writer).await?;
2133 } else {
2134 self.check_vertex_has_no_edges(vid, writer).await?;
2135 }
2136 writer.delete_vertex(vid, labels).await?;
2137 Ok(())
2138 }
2139
2140 pub(crate) async fn check_vertex_has_no_edges(&self, vid: Vid, writer: &Writer) -> Result<()> {
2142 let schema = self.storage.schema_manager().schema();
2143 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
2144
2145 let out_graph = self
2146 .storage
2147 .load_subgraph_cached(
2148 &[vid],
2149 &edge_type_ids,
2150 1,
2151 uni_store::runtime::Direction::Outgoing,
2152 Some(writer.l0_manager.get_current()),
2153 )
2154 .await?;
2155 let has_out = out_graph.edges().next().is_some();
2156
2157 let in_graph = self
2158 .storage
2159 .load_subgraph_cached(
2160 &[vid],
2161 &edge_type_ids,
2162 1,
2163 uni_store::runtime::Direction::Incoming,
2164 Some(writer.l0_manager.get_current()),
2165 )
2166 .await?;
2167 let has_in = in_graph.edges().next().is_some();
2168
2169 if has_out || has_in {
2170 return Err(anyhow!(
2171 "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
2172 vid
2173 ));
2174 }
2175 Ok(())
2176 }
2177
2178 pub(crate) async fn execute_delete_edge_from_map(
2180 &self,
2181 map: &HashMap<String, Value>,
2182 writer: &mut Writer,
2183 ) -> Result<()> {
2184 if map.get("_eid").is_some_and(|v| !v.is_null()) {
2186 let ei = self.extract_edge_identity(map)?;
2187 writer
2188 .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id)
2189 .await?;
2190 }
2191 Ok(())
2192 }
2193
2194 fn make_scan_plan(
2200 label_id: u16,
2201 labels: Vec<String>,
2202 variable: String,
2203 filter: Option<Expr>,
2204 ) -> LogicalPlan {
2205 if label_id > 0 {
2206 LogicalPlan::Scan {
2207 label_id,
2208 labels,
2209 variable,
2210 filter,
2211 optional: false,
2212 }
2213 } else if !labels.is_empty() {
2214 LogicalPlan::ScanMainByLabels {
2216 labels,
2217 variable,
2218 filter,
2219 optional: false,
2220 }
2221 } else {
2222 LogicalPlan::ScanAll {
2223 variable,
2224 filter,
2225 optional: false,
2226 }
2227 }
2228 }
2229
2230 fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
2233 if matches!(plan, LogicalPlan::Empty) {
2234 scan
2235 } else {
2236 LogicalPlan::CrossJoin {
2237 left: Box::new(plan),
2238 right: Box::new(scan),
2239 }
2240 }
2241 }
2242
2243 async fn resolve_merge_properties(
2250 &self,
2251 properties: &Option<Expr>,
2252 row: &HashMap<String, Value>,
2253 prop_manager: &PropertyManager,
2254 params: &HashMap<String, Value>,
2255 ctx: Option<&QueryContext>,
2256 ) -> Result<Option<Expr>> {
2257 let entries = match properties {
2258 Some(Expr::Map(entries)) => entries,
2259 other => return Ok(other.clone()),
2260 };
2261 let mut resolved = Vec::new();
2262 for (key, val_expr) in entries {
2263 if matches!(val_expr, Expr::Literal(_)) {
2264 resolved.push((key.clone(), val_expr.clone()));
2265 } else {
2266 let value = self
2267 .evaluate_expr(val_expr, row, prop_manager, params, ctx)
2268 .await?;
2269 resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
2270 }
2271 }
2272 Ok(Some(Expr::Map(resolved)))
2273 }
2274
2275 fn value_to_literal_expr(value: &Value) -> Expr {
2277 match value {
2278 Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
2279 Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
2280 Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
2281 Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
2282 Value::Null => Expr::Literal(CypherLiteral::Null),
2283 Value::List(items) => {
2284 Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
2285 }
2286 Value::Map(entries) => Expr::Map(
2287 entries
2288 .iter()
2289 .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
2290 .collect(),
2291 ),
2292 _ => Expr::Literal(CypherLiteral::Null),
2293 }
2294 }
2295
2296 pub(crate) async fn execute_merge_match(
2297 &self,
2298 pattern: &Pattern,
2299 row: &HashMap<String, Value>,
2300 prop_manager: &PropertyManager,
2301 params: &HashMap<String, Value>,
2302 ctx: Option<&QueryContext>,
2303 ) -> Result<Vec<HashMap<String, Value>>> {
2304 let planner =
2306 crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
2307
2308 let mut plan = LogicalPlan::Empty;
2313 let mut vars_in_scope = Vec::new();
2314
2315 for key in row.keys() {
2317 vars_in_scope.push(key.clone());
2318 }
2319
2320 for path in &pattern.paths {
2322 let elements = &path.elements;
2323 let mut i = 0;
2324 while i < elements.len() {
2325 let part = &elements[i];
2326 match part {
2327 PatternElement::Node(n) => {
2328 let variable = n.variable.clone().unwrap_or_default();
2329
2330 let is_bound = !variable.is_empty() && row.contains_key(&variable);
2332
2333 if is_bound {
2334 let val = row.get(&variable).unwrap();
2337 let vid = Self::vid_from_value(val)?;
2338
2339 let extracted_labels =
2342 Self::extract_labels_from_node(val).unwrap_or_default();
2343 let label_id = {
2344 let schema = self.storage.schema_manager().schema();
2345 extracted_labels
2346 .first()
2347 .and_then(|l| schema.label_id_by_name(l))
2348 .unwrap_or(0)
2349 };
2350
2351 let resolved_props = self
2352 .resolve_merge_properties(
2353 &n.properties,
2354 row,
2355 prop_manager,
2356 params,
2357 ctx,
2358 )
2359 .await?;
2360 let prop_filter =
2361 planner.properties_to_expr(&variable, &resolved_props);
2362
2363 let vid_filter = Expr::BinaryOp {
2372 left: Box::new(Expr::Property(
2373 Box::new(Expr::Variable(variable.clone())),
2374 "_vid".to_string(),
2375 )),
2376 op: BinaryOp::Eq,
2377 right: Box::new(Expr::Literal(CypherLiteral::Integer(
2378 vid.as_u64() as i64,
2379 ))),
2380 };
2381
2382 let combined_filter = if let Some(pf) = prop_filter {
2383 Some(Expr::BinaryOp {
2384 left: Box::new(vid_filter),
2385 op: BinaryOp::And,
2386 right: Box::new(pf),
2387 })
2388 } else {
2389 Some(vid_filter)
2390 };
2391
2392 let scan = Self::make_scan_plan(
2393 label_id,
2394 extracted_labels,
2395 variable.clone(),
2396 combined_filter,
2397 );
2398 plan = Self::attach_scan(plan, scan);
2399 } else {
2400 let label_id = if n.labels.is_empty() {
2401 0
2403 } else {
2404 let label_name = &n.labels[0];
2405 let schema = self.storage.schema_manager().schema();
2406 schema
2409 .get_label_case_insensitive(label_name)
2410 .map(|m| m.id)
2411 .unwrap_or(0)
2412 };
2413
2414 let resolved_props = self
2415 .resolve_merge_properties(
2416 &n.properties,
2417 row,
2418 prop_manager,
2419 params,
2420 ctx,
2421 )
2422 .await?;
2423 let prop_filter =
2424 planner.properties_to_expr(&variable, &resolved_props);
2425 let scan = Self::make_scan_plan(
2426 label_id,
2427 n.labels.clone(),
2428 variable.clone(),
2429 prop_filter,
2430 );
2431 plan = Self::attach_scan(plan, scan);
2432
2433 if !n.labels.is_empty()
2440 && !variable.is_empty()
2441 && (label_id == 0 || n.labels.len() > 1)
2442 && let Some(label_filter) =
2443 planner.node_filter_expr(&variable, &n.labels, &None)
2444 {
2445 plan = LogicalPlan::Filter {
2446 input: Box::new(plan),
2447 predicate: label_filter,
2448 optional_variables: std::collections::HashSet::new(),
2449 };
2450 }
2451
2452 if !variable.is_empty() {
2453 vars_in_scope.push(variable.clone());
2454 }
2455 }
2456
2457 i += 1;
2459 while i < elements.len() {
2460 if let PatternElement::Relationship(r) = &elements[i] {
2461 let target_node_part = &elements[i + 1];
2462 if let PatternElement::Node(n_target) = target_node_part {
2463 let schema = self.storage.schema_manager().schema();
2464 let mut edge_type_ids = Vec::new();
2465
2466 if r.types.is_empty() {
2467 return Err(anyhow!("MERGE edge must have a type"));
2468 } else if r.types.len() > 1 {
2469 return Err(anyhow!(
2470 "MERGE does not support multiple edge types"
2471 ));
2472 } else {
2473 let type_name = &r.types[0];
2474 let type_id = self
2477 .storage
2478 .schema_manager()
2479 .get_or_assign_edge_type_id(type_name);
2480 edge_type_ids.push(type_id);
2481 }
2482
2483 let target_label_id: u16 = if let Some(lbl) =
2486 n_target.labels.first()
2487 {
2488 schema
2489 .get_label_case_insensitive(lbl)
2490 .map(|m| m.id)
2491 .unwrap_or(0)
2492 } else if let Some(var) = &n_target.variable {
2493 if let Some(val) = row.get(var) {
2494 if let Some(labels) =
2496 Self::extract_labels_from_node(val)
2497 {
2498 if let Some(first_label) = labels.first() {
2499 schema
2500 .get_label_case_insensitive(first_label)
2501 .map(|m| m.id)
2502 .unwrap_or(0)
2503 } else {
2504 0
2506 }
2507 } else if Self::vid_from_value(val).is_ok() {
2508 0
2510 } else {
2511 return Err(anyhow!(
2512 "Variable {} is not a node",
2513 var
2514 ));
2515 }
2516 } else {
2517 return Err(anyhow!(
2518 "MERGE pattern node must have a label or be a bound variable"
2519 ));
2520 }
2521 } else {
2522 return Err(anyhow!(
2523 "MERGE pattern node must have a label"
2524 ));
2525 };
2526
2527 let target_variable =
2528 n_target.variable.clone().unwrap_or_default();
2529 let source_variable = match &elements[i - 1] {
2530 PatternElement::Node(n) => {
2531 n.variable.clone().unwrap_or_default()
2532 }
2533 _ => String::new(),
2534 };
2535
2536 let is_variable_length = r.range.is_some();
2537 let type_name = &r.types[0];
2538
2539 let is_schemaless = edge_type_ids.iter().all(|id| {
2545 uni_common::core::edge_type::is_schemaless_edge_type(*id)
2546 });
2547
2548 if is_schemaless {
2549 plan = LogicalPlan::TraverseMainByType {
2550 type_names: vec![type_name.clone()],
2551 input: Box::new(plan),
2552 direction: r.direction.clone(),
2553 source_variable,
2554 target_variable: target_variable.clone(),
2555 step_variable: r.variable.clone(),
2556 min_hops: r
2557 .range
2558 .as_ref()
2559 .and_then(|r| r.min)
2560 .unwrap_or(1)
2561 as usize,
2562 max_hops: r
2563 .range
2564 .as_ref()
2565 .and_then(|r| r.max)
2566 .unwrap_or(1)
2567 as usize,
2568 optional: false,
2569 target_filter: None,
2570 path_variable: None,
2571 is_variable_length,
2572 optional_pattern_vars: std::collections::HashSet::new(),
2573 scope_match_variables: std::collections::HashSet::new(),
2574 edge_filter_expr: None,
2575 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
2576 };
2577 } else {
2578 let mut edge_props = std::collections::HashSet::new();
2580 if let Some(Expr::Map(entries)) = &r.properties {
2581 for (key, _) in entries {
2582 edge_props.insert(key.clone());
2583 }
2584 }
2585 plan = LogicalPlan::Traverse {
2586 input: Box::new(plan),
2587 edge_type_ids: edge_type_ids.clone(),
2588 direction: r.direction.clone(),
2589 source_variable,
2590 target_variable: target_variable.clone(),
2591 target_label_id,
2592 step_variable: r.variable.clone(),
2593 min_hops: r
2594 .range
2595 .as_ref()
2596 .and_then(|r| r.min)
2597 .unwrap_or(1)
2598 as usize,
2599 max_hops: r
2600 .range
2601 .as_ref()
2602 .and_then(|r| r.max)
2603 .unwrap_or(1)
2604 as usize,
2605 optional: false,
2606 target_filter: None,
2607 path_variable: None,
2608 edge_properties: edge_props,
2609 is_variable_length,
2610 optional_pattern_vars: std::collections::HashSet::new(),
2611 scope_match_variables: std::collections::HashSet::new(),
2612 edge_filter_expr: None,
2613 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
2614 qpp_steps: None,
2615 };
2616 }
2617
2618 if r.properties.is_some()
2620 && let Some(r_var) = &r.variable
2621 {
2622 let resolved_rel_props = self
2623 .resolve_merge_properties(
2624 &r.properties,
2625 row,
2626 prop_manager,
2627 params,
2628 ctx,
2629 )
2630 .await?;
2631 if let Some(prop_filter) =
2632 planner.properties_to_expr(r_var, &resolved_rel_props)
2633 {
2634 plan = LogicalPlan::Filter {
2635 input: Box::new(plan),
2636 predicate: prop_filter,
2637 optional_variables: std::collections::HashSet::new(
2638 ),
2639 };
2640 }
2641 }
2642
2643 if !target_variable.is_empty() {
2645 let resolved_target_props = self
2646 .resolve_merge_properties(
2647 &n_target.properties,
2648 row,
2649 prop_manager,
2650 params,
2651 ctx,
2652 )
2653 .await?;
2654 if let Some(prop_filter) = planner.properties_to_expr(
2655 &target_variable,
2656 &resolved_target_props,
2657 ) {
2658 plan = LogicalPlan::Filter {
2659 input: Box::new(plan),
2660 predicate: prop_filter,
2661 optional_variables: std::collections::HashSet::new(
2662 ),
2663 };
2664 }
2665 vars_in_scope.push(target_variable.clone());
2666 }
2667
2668 if let Some(sv) = &r.variable {
2669 vars_in_scope.push(sv.clone());
2670 }
2671 i += 2;
2672 } else {
2673 break;
2674 }
2675 } else {
2676 break;
2677 }
2678 }
2679 }
2680 _ => return Err(anyhow!("Pattern must start with a node")),
2681 }
2682 }
2683
2684 }
2686
2687 let db_matches = self
2688 .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
2689 .await?;
2690
2691 let final_matches = db_matches
2698 .into_iter()
2699 .filter(|db_match| {
2700 row.iter().all(|(key, val)| {
2701 if key.is_empty() || key.starts_with("__") {
2702 return true;
2703 }
2704 let Some(db_val) = db_match.get(key) else {
2705 return true;
2706 };
2707 if db_val == val {
2708 return true;
2709 }
2710 matches!(
2712 (Self::vid_from_value(val), Self::vid_from_value(db_val)),
2713 (Ok(v1), Ok(v2)) if v1 == v2
2714 )
2715 })
2716 })
2717 .map(|db_match| {
2718 let mut merged = row.clone();
2719 merged.extend(db_match);
2720 merged
2721 })
2722 .collect();
2723
2724 Ok(final_matches)
2725 }
2726
2727 fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
2735 let has_path_vars = pattern
2736 .paths
2737 .iter()
2738 .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
2739
2740 if !has_path_vars {
2741 return (pattern.clone(), Vec::new());
2742 }
2743
2744 let mut modified = pattern.clone();
2745 let mut temp_vars = Vec::new();
2746
2747 for path in &mut modified.paths {
2748 if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
2749 continue;
2750 }
2751 for (idx, element) in path.elements.iter_mut().enumerate() {
2752 if let PatternElement::Relationship(r) = element
2753 && r.variable.as_ref().is_none_or(String::is_empty)
2754 {
2755 let temp_var = format!("__path_r_{}", idx);
2756 r.variable = Some(temp_var.clone());
2757 temp_vars.push(temp_var);
2758 }
2759 }
2760 }
2761
2762 (modified, temp_vars)
2763 }
2764
2765 fn bind_path_variables(
2770 pattern: &Pattern,
2771 row: &mut HashMap<String, Value>,
2772 temp_vars: &[String],
2773 ) {
2774 for path in &pattern.paths {
2775 let Some(path_var) = path.variable.as_ref() else {
2776 continue;
2777 };
2778 if path_var.is_empty() {
2779 continue;
2780 }
2781
2782 let mut nodes = Vec::new();
2783 let mut edges = Vec::new();
2784
2785 for element in &path.elements {
2786 match element {
2787 PatternElement::Node(n) => {
2788 if let Some(var) = &n.variable
2789 && let Some(val) = row.get(var)
2790 && let Some(node) = Self::value_to_node_for_path(val)
2791 {
2792 nodes.push(node);
2793 }
2794 }
2795 PatternElement::Relationship(r) => {
2796 if let Some(var) = &r.variable
2797 && let Some(val) = row.get(var)
2798 && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
2799 {
2800 edges.push(edge);
2801 }
2802 }
2803 _ => {}
2804 }
2805 }
2806
2807 if !nodes.is_empty() {
2808 use uni_common::value::Path;
2809 row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
2810 }
2811 }
2812
2813 for var in temp_vars {
2815 row.remove(var);
2816 }
2817 }
2818
2819 fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
2821 match val {
2822 Value::Node(n) => Some(n.clone()),
2823 Value::Map(map) => {
2824 let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
2825 let labels = if let Some(Value::List(l)) = map.get("_labels") {
2826 l.iter()
2827 .filter_map(|v| {
2828 if let Value::String(s) = v {
2829 Some(s.clone())
2830 } else {
2831 None
2832 }
2833 })
2834 .collect()
2835 } else {
2836 vec![]
2837 };
2838 let properties: HashMap<String, Value> = map
2839 .iter()
2840 .filter(|(k, _)| !k.starts_with('_'))
2841 .map(|(k, v)| (k.clone(), v.clone()))
2842 .collect();
2843 Some(uni_common::value::Node {
2844 vid,
2845 labels,
2846 properties,
2847 })
2848 }
2849 _ => None,
2850 }
2851 }
2852
2853 fn value_to_edge_for_path(
2855 val: &Value,
2856 type_names: &[String],
2857 ) -> Option<uni_common::value::Edge> {
2858 match val {
2859 Value::Edge(e) => Some(e.clone()),
2860 Value::Map(map) => {
2861 let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
2862 let edge_type = map
2863 .get("_type_name")
2864 .and_then(|v| {
2865 if let Value::String(s) = v {
2866 Some(s.clone())
2867 } else {
2868 None
2869 }
2870 })
2871 .or_else(|| type_names.first().cloned())
2872 .unwrap_or_default();
2873 let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
2874 let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
2875 let properties: HashMap<String, Value> = map
2876 .iter()
2877 .filter(|(k, _)| !k.starts_with('_'))
2878 .map(|(k, v)| (k.clone(), v.clone()))
2879 .collect();
2880 Some(uni_common::value::Edge {
2881 eid,
2882 edge_type,
2883 src,
2884 dst,
2885 properties,
2886 })
2887 }
2888 _ => None,
2889 }
2890 }
2891}