1use super::core::*;
5use crate::query::planner::LogicalPlan;
6use anyhow::{Result, anyhow};
7use std::collections::HashMap;
8use std::sync::Arc;
9use uni_common::DataType;
10use uni_common::core::id::{Eid, Vid};
11use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
12use uni_common::{Path, Value};
13use uni_cypher::ast::{
14 AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
15 CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
16 DropEdgeType, DropLabel, Expr, Pattern, PatternElement, RemoveItem, SetClause, SetItem,
17};
18use uni_store::QueryContext;
19use uni_store::runtime::property_manager::PropertyManager;
20use uni_store::runtime::writer::Writer;
21
/// Identifying fields of an edge, as pulled out of an edge-shaped `Value::Map`
/// by `Executor::extract_edge_identity`.
struct EdgeIdentity {
    /// Edge id, read from the map's `_eid` entry.
    eid: Eid,
    /// Source vertex id, read from the map's `_src` entry.
    src: Vid,
    /// Destination vertex id, read from the map's `_dst` entry.
    dst: Vid,
    /// Numeric edge type id resolved from the map's `_type` entry.
    edge_type_id: u32,
}
29
30impl Executor {
31 pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
38 match node_val {
39 Value::Map(map) => {
40 if let Some(Value::List(labels_arr)) = map.get("_labels") {
42 let labels: Vec<String> = labels_arr
43 .iter()
44 .filter_map(|v| v.as_str().map(|s| s.to_string()))
45 .collect();
46 if !labels.is_empty() {
47 return Some(labels);
48 }
49 }
50 None
51 }
52 Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
53 _ => None,
54 }
55 }
56
57 pub(crate) fn extract_user_properties_from_value(
65 val: &Value,
66 ) -> Option<HashMap<String, Value>> {
67 match val {
68 Value::Map(map) => {
69 let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
73 let is_edge_map = map.contains_key("_eid")
74 && map.contains_key("_src")
75 && map.contains_key("_dst");
76
77 if is_node_map || is_edge_map {
78 let user_props: HashMap<String, Value> = map
80 .iter()
81 .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
82 .map(|(k, v)| (k.clone(), v.clone()))
83 .collect();
84 if user_props.is_empty()
88 && let Some(Value::Map(all_props)) = map.get("_all_props")
89 {
90 return Some(all_props.clone());
91 }
92 Some(user_props)
93 } else {
94 Some(map.clone())
96 }
97 }
98 Value::Node(node) => Some(node.properties.clone()),
99 Value::Edge(edge) => Some(edge.properties.clone()),
100 _ => None,
101 }
102 }
103
/// Writes `new_props` onto the node or edge bound to `variable` in `row`, then
/// refreshes the row's copy of that entity.
///
/// The shape of the bound value selects the write path:
/// - `Value::Node`: vertex write using the node's own `vid` and `labels`.
/// - Any other value accepted by `Self::vid_from_value` (e.g. a node-shaped map):
///   vertex write using labels from `Self::extract_labels_from_node`.
/// - `Value::Edge`: edge write; the type id is resolved from the edge's type name.
/// - `Value::Map` containing `_eid`, `_src` and `_dst`: edge write using
///   `Self::extract_edge_identity`.
/// - Anything else, including an unbound variable: nothing is written and `Ok(())`
///   is returned.
///
/// In every path the currently stored properties are loaded through `prop_manager`
/// and combined with `new_props` by `Self::merge_props` (`replace` is forwarded).
/// Vertex paths additionally run `enrich_properties_with_generated_columns` once
/// per label before writing.
///
/// # Errors
/// Propagates failures from property loading, generated-column evaluation,
/// edge identity/type resolution and the writer.
#[expect(clippy::too_many_arguments)]
async fn apply_properties_to_entity(
    &self,
    variable: &str,
    new_props: HashMap<String, Value>,
    replace: bool,
    row: &mut HashMap<String, Value>,
    writer: &mut Writer,
    prop_manager: &PropertyManager,
    params: &HashMap<String, Value>,
    ctx: Option<&QueryContext>,
    tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
) -> Result<()> {
    // Work on a clone so `row` can be mutably re-borrowed after the write.
    let target = row.get(variable).cloned();

    match target {
        // Typed node value.
        Some(Value::Node(ref node)) => {
            let vid = node.vid;
            let labels = node.labels.clone();
            let current = prop_manager
                .get_all_vertex_props_with_ctx(vid, ctx)
                .await?
                .unwrap_or_default();
            let write_props = Self::merge_props(current, new_props, replace);
            let mut enriched = write_props.clone();
            for label_name in &labels {
                self.enrich_properties_with_generated_columns(
                    label_name,
                    &mut enriched,
                    prop_manager,
                    params,
                    ctx,
                )
                .await?;
            }
            let _ = writer
                .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
                .await?;
            if let Some(Value::Node(n)) = row.get_mut(variable) {
                // The writer receives null entries; the row copy drops them.
                n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
            }
        }
        // Any other value a vertex id can be extracted from (e.g. a node-shaped map).
        Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
            let vid = Self::vid_from_value(node_val)?;
            let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
            let current = prop_manager
                .get_all_vertex_props_with_ctx(vid, ctx)
                .await?
                .unwrap_or_default();
            let write_props = Self::merge_props(current, new_props, replace);
            let mut enriched = write_props.clone();
            for label_name in &labels {
                self.enrich_properties_with_generated_columns(
                    label_name,
                    &mut enriched,
                    prop_manager,
                    params,
                    ctx,
                )
                .await?;
            }
            let _ = writer
                .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
                .await?;
            // The row copy is only refreshed when the bound value is a map.
            if let Some(Value::Map(node_map)) = row.get_mut(variable) {
                // Keep internal (`_`-prefixed) keys and `ext_id`; drop stale user keys.
                node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
                let effective: HashMap<String, Value> =
                    enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
                for (k, v) in &effective {
                    node_map.insert(k.clone(), v.clone());
                }
                // Mirror the non-null set under `_all_props` as well.
                node_map.insert("_all_props".to_string(), Value::Map(effective));
            }
        }
        // Typed edge value.
        Some(Value::Edge(ref edge)) => {
            let eid = edge.eid;
            let src = edge.src;
            let dst = edge.dst;
            let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
            let current = prop_manager
                .get_all_edge_props_with_ctx(eid, ctx)
                .await?
                .unwrap_or_default();
            let write_props = Self::merge_props(current, new_props, replace);
            writer
                .insert_edge(
                    src,
                    dst,
                    etype,
                    eid,
                    write_props.clone(),
                    Some(edge.edge_type.clone()),
                    tx_l0,
                )
                .await?;
            if let Some(Value::Edge(e)) = row.get_mut(variable) {
                e.properties = write_props
                    .into_iter()
                    .filter(|(_, v)| !v.is_null())
                    .collect();
            }
        }
        // Edge-shaped map.
        Some(Value::Map(ref map))
            if map.contains_key("_eid")
                && map.contains_key("_src")
                && map.contains_key("_dst") =>
        {
            let ei = self.extract_edge_identity(map)?;
            let current = prop_manager
                .get_all_edge_props_with_ctx(ei.eid, ctx)
                .await?
                .unwrap_or_default();
            let write_props = Self::merge_props(current, new_props, replace);
            // Prefer a string `_type`; otherwise look the name up from the resolved id.
            let edge_type_name = map
                .get("_type")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string())
                .or_else(|| {
                    self.storage
                        .schema_manager()
                        .edge_type_name_by_id_unified(ei.edge_type_id)
                });
            writer
                .insert_edge(
                    ei.src,
                    ei.dst,
                    ei.edge_type_id,
                    ei.eid,
                    write_props.clone(),
                    edge_type_name,
                    tx_l0,
                )
                .await?;
            if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
                // Unlike the node-map path, `ext_id` is not retained here.
                edge_map.retain(|k, _| k.starts_with('_'));
                let effective: HashMap<String, Value> = write_props
                    .into_iter()
                    .filter(|(_, v)| !v.is_null())
                    .collect();
                for (k, v) in &effective {
                    edge_map.insert(k.clone(), v.clone());
                }
                edge_map.insert("_all_props".to_string(), Value::Map(effective));
            }
        }
        // Unbound variable or a value that is neither node nor edge: nothing to do.
        _ => {
        }
    }
    Ok(())
}
280
281 fn merge_props(
292 current: HashMap<String, Value>,
293 incoming: HashMap<String, Value>,
294 replace: bool,
295 ) -> HashMap<String, Value> {
296 if replace {
297 let mut result: HashMap<String, Value> = incoming
299 .iter()
300 .filter(|(_, v)| !v.is_null())
301 .map(|(k, v)| (k.clone(), v.clone()))
302 .collect();
303 for k in current.keys() {
306 if incoming.get(k).is_none_or(|v| v.is_null()) {
307 result.insert(k.clone(), Value::Null);
308 }
309 }
310 result
311 } else {
312 let mut result = current;
314 result.extend(incoming);
315 result
316 }
317 }
318
319 fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
321 let eid = Eid::from(
322 map.get("_eid")
323 .and_then(|v| v.as_u64())
324 .ok_or_else(|| anyhow!("Invalid _eid"))?,
325 );
326 let src = Vid::from(
327 map.get("_src")
328 .and_then(|v| v.as_u64())
329 .ok_or_else(|| anyhow!("Invalid _src"))?,
330 );
331 let dst = Vid::from(
332 map.get("_dst")
333 .and_then(|v| v.as_u64())
334 .ok_or_else(|| anyhow!("Invalid _dst"))?,
335 );
336 let edge_type_id = self.resolve_edge_type_id(
337 map.get("_type")
338 .ok_or_else(|| anyhow!("Missing _type on edge map"))?,
339 )?;
340 Ok(EdgeIdentity {
341 eid,
342 src,
343 dst,
344 edge_type_id,
345 })
346 }
347
348 fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
355 match type_val {
356 Value::Int(i) => Ok(*i as u32),
357 Value::String(name) => {
358 Ok(self
361 .storage
362 .schema_manager()
363 .get_or_assign_edge_type_id(name))
364 }
365 _ => Err(anyhow!(
366 "Invalid _type value: expected Int or String, got {:?}",
367 type_val
368 )),
369 }
370 }
371
372 pub(crate) async fn execute_vacuum(&self) -> Result<()> {
373 if let Some(writer_arc) = &self.writer {
374 {
376 let mut writer = writer_arc.write().await;
377 writer.flush_to_l1(None).await?;
378 } let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
382 let compaction_results = compactor.compact_all().await?;
383
384 let am = self.storage.adjacency_manager();
386 let schema = self.storage.schema_manager().schema();
387 for info in compaction_results {
388 let direction = match info.direction.as_str() {
390 "fwd" => uni_store::storage::direction::Direction::Outgoing,
391 "bwd" => uni_store::storage::direction::Direction::Incoming,
392 _ => continue,
393 };
394
395 if let Some(edge_type_id) =
397 schema.edge_type_id_unified_case_insensitive(&info.edge_type)
398 {
399 let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
401 }
402 }
403 }
404 Ok(())
405 }
406
407 pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
408 if let Some(writer_arc) = &self.writer {
409 let mut writer = writer_arc.write().await;
410 writer.flush_to_l1(Some("checkpoint".to_string())).await?;
411 }
412 Ok(())
413 }
414
415 pub(crate) async fn execute_copy_to(
416 &self,
417 identifier: &str,
418 path: &str,
419 format: &str,
420 options: &HashMap<String, Value>,
421 ) -> Result<usize> {
422 let schema = self.storage.schema_manager().schema();
424
425 if schema.get_edge_type_case_insensitive(identifier).is_some() {
427 return self
428 .export_edge_type_in_format(identifier, path, format)
429 .await;
430 }
431
432 if schema.get_label_case_insensitive(identifier).is_some() {
434 return self
435 .export_vertex_label_in_format(identifier, path, format, options)
436 .await;
437 }
438
439 Err(anyhow!("Unknown label or edge type: '{}'", identifier))
441 }
442
443 async fn export_vertex_label_in_format(
444 &self,
445 label: &str,
446 path: &str,
447 format: &str,
448 _options: &HashMap<String, Value>,
449 ) -> Result<usize> {
450 match format {
451 "parquet" => self.export_vertex_label(label, path).await,
452 "csv" => {
453 let mut stream = self
454 .storage
455 .scan_vertex_table_stream(label)
456 .await?
457 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
458
459 let mut all_rows = Vec::new();
461 let mut column_names = Vec::new();
462
463 use futures::StreamExt;
465 while let Some(batch_result) = stream.next().await {
466 let batch = batch_result?;
467
468 if column_names.is_empty() {
470 column_names = batch
471 .schema()
472 .fields()
473 .iter()
474 .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
475 .map(|f| f.name().clone())
476 .collect();
477 }
478
479 for row_idx in 0..batch.num_rows() {
481 let mut row = Vec::new();
482 for field in batch.schema().fields() {
483 if field.name().starts_with('_') || field.name() == "ext_id" {
484 continue;
485 }
486
487 let col_idx = batch.schema().index_of(field.name())?;
488 let column = batch.column(col_idx);
489 let value = self.arrow_value_to_json(column, row_idx)?;
490
491 let csv_value = match value {
493 Value::Null => String::new(),
494 Value::Bool(b) => b.to_string(),
495 Value::Int(i) => i.to_string(),
496 Value::Float(f) => f.to_string(),
497 Value::String(s) => s,
498 _ => format!("{value}"),
499 };
500 row.push(csv_value);
501 }
502 all_rows.push(row);
503 }
504 }
505
506 let file = std::fs::File::create(path)?;
508 let mut wtr = csv::Writer::from_writer(file);
509
510 log::debug!("CSV export headers: {:?}", column_names);
512 wtr.write_record(&column_names)?;
513
514 for (i, row) in all_rows.iter().enumerate() {
516 log::debug!("CSV export row {}: {:?}", i, row);
517 wtr.write_record(row)?;
518 }
519
520 wtr.flush()?;
521 Ok(all_rows.len())
522 }
523 _ => Err(anyhow!(
524 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
525 format
526 )),
527 }
528 }
529
530 async fn export_edge_type_in_format(
531 &self,
532 edge_type: &str,
533 path: &str,
534 format: &str,
535 ) -> Result<usize> {
536 match format {
537 "parquet" => self.export_edge_type(edge_type, path).await,
538 "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
539 _ => Err(anyhow!(
540 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
541 format
542 )),
543 }
544 }
545
546 async fn write_batches_to_parquet(
549 mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
550 path: &str,
551 entity_description: &str,
552 ) -> Result<usize> {
553 use futures::TryStreamExt;
554
555 let first_batch = match stream.try_next().await? {
557 Some(batch) => batch,
558 None => {
559 log::info!("No data to export from {}", entity_description);
560 return Ok(0);
561 }
562 };
563
564 let file = std::fs::File::create(path)?;
566 let arrow_schema = first_batch.schema();
567 let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
568
569 let mut count = first_batch.num_rows();
571 writer.write(&first_batch)?;
572
573 while let Some(batch) = stream.try_next().await? {
575 count += batch.num_rows();
576 writer.write(&batch)?;
577 }
578
579 writer.close()?;
580
581 log::info!(
582 "Exported {} rows from {} to '{}'",
583 count,
584 entity_description,
585 path
586 );
587 Ok(count)
588 }
589
590 async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
592 let stream = self
593 .storage
594 .scan_vertex_table_stream(label)
595 .await?
596 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
597
598 Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
599 }
600
601 async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
603 let schema = self.storage.schema_manager().schema();
604 if !schema.edge_types.contains_key(edge_type) {
605 return Err(anyhow!("Edge type '{}' not found", edge_type));
606 }
607
608 let filter = format!("type = '{}'", edge_type);
609 let stream = self
610 .storage
611 .scan_main_edge_table_stream(Some(&filter))
612 .await?
613 .ok_or_else(|| anyhow!("No edge data found"))?;
614
615 Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
616 }
617
618 pub(crate) async fn execute_copy_from(
619 &self,
620 label: &str,
621 path: &str,
622 format: &str,
623 options: &HashMap<String, Value>,
624 ) -> Result<usize> {
625 let batches = match format {
627 "parquet" => self.read_parquet_file(path)?,
628 "csv" => self.read_csv_file(path, label, options)?,
629 _ => {
630 return Err(anyhow!(
631 "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
632 format
633 ));
634 }
635 };
636
637 let writer_arc = self
639 .writer
640 .as_ref()
641 .ok_or_else(|| anyhow!("No writer available"))?;
642
643 let db_schema = self.storage.schema_manager().schema();
644
645 let is_edge = db_schema.edge_type_id_by_name(label).is_some();
647
648 if is_edge {
649 let edge_type_id = db_schema
651 .edge_type_id_by_name(label)
652 .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
653
654 let src_col = options
656 .get("src_col")
657 .and_then(|v| v.as_str())
658 .unwrap_or("src");
659 let dst_col = options
660 .get("dst_col")
661 .and_then(|v| v.as_str())
662 .unwrap_or("dst");
663
664 let mut total_rows = 0;
665 for batch in batches {
666 let num_rows = batch.num_rows();
667
668 for row_idx in 0..num_rows {
669 let mut properties = HashMap::new();
670 let mut src_vid: Option<Vid> = None;
671 let mut dst_vid: Option<Vid> = None;
672
673 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
675 let col_name = field.name();
676 let column = batch.column(col_idx);
677 let value = self.arrow_value_to_json(column, row_idx)?;
678
679 if col_name == src_col {
680 let raw = value.as_u64().unwrap_or_else(|| {
681 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
682 });
683 src_vid = Some(Vid::new(raw));
684 } else if col_name == dst_col {
685 let raw = value.as_u64().unwrap_or_else(|| {
686 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
687 });
688 dst_vid = Some(Vid::new(raw));
689 } else if !col_name.starts_with('_') && !value.is_null() {
690 properties.insert(col_name.clone(), value);
691 }
692 }
693
694 let src = src_vid
695 .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
696 let dst = dst_vid
697 .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
698
699 let mut writer = writer_arc.write().await;
701 let eid = writer.next_eid(edge_type_id).await?;
702 writer
703 .insert_edge(
704 src,
705 dst,
706 edge_type_id,
707 eid,
708 properties,
709 Some(label.to_string()),
710 None,
711 )
712 .await?;
713
714 total_rows += 1;
715 }
716 }
717
718 log::info!(
719 "Imported {} edge rows from '{}' into edge type '{}'",
720 total_rows,
721 path,
722 label
723 );
724
725 if total_rows > 0 {
727 let mut writer = writer_arc.write().await;
728 writer.flush_to_l1(None).await?;
729 }
730
731 Ok(total_rows)
732 } else {
733 db_schema
736 .label_id_by_name_case_insensitive(label)
737 .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
738
739 let mut total_rows = 0;
740 for batch in batches {
741 let num_rows = batch.num_rows();
742
743 for row_idx in 0..num_rows {
745 let mut properties = HashMap::new();
746
747 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
749 let col_name = field.name();
750
751 if col_name.starts_with('_') {
753 continue;
754 }
755
756 let column = batch.column(col_idx);
757 let value = self.arrow_value_to_json(column, row_idx)?;
758
759 if !value.is_null() {
760 properties.insert(col_name.clone(), value);
761 }
762 }
763
764 let mut writer = writer_arc.write().await;
766 let vid = writer.next_vid().await?;
767 let _ = writer
768 .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
769 .await?;
770
771 total_rows += 1;
772 }
773 }
774
775 log::info!(
776 "Imported {} rows from '{}' into label '{}'",
777 total_rows,
778 path,
779 label
780 );
781
782 if total_rows > 0 {
784 let mut writer = writer_arc.write().await;
785 writer.flush_to_l1(None).await?;
786 }
787
788 Ok(total_rows)
789 }
790 }
791
792 fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
793 use arrow_array::Array;
794 use arrow_schema::DataType as ArrowDataType;
795
796 if column.is_null(row_idx) {
797 return Ok(Value::Null);
798 }
799
800 match column.data_type() {
801 ArrowDataType::Utf8 => {
802 let array = column
803 .as_any()
804 .downcast_ref::<arrow_array::StringArray>()
805 .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
806 Ok(Value::String(array.value(row_idx).to_string()))
807 }
808 ArrowDataType::Int32 => {
809 let array = column
810 .as_any()
811 .downcast_ref::<arrow_array::Int32Array>()
812 .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
813 Ok(Value::Int(array.value(row_idx) as i64))
814 }
815 ArrowDataType::Int64 => {
816 let array = column
817 .as_any()
818 .downcast_ref::<arrow_array::Int64Array>()
819 .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
820 Ok(Value::Int(array.value(row_idx)))
821 }
822 ArrowDataType::Float32 => {
823 let array = column
824 .as_any()
825 .downcast_ref::<arrow_array::Float32Array>()
826 .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
827 Ok(Value::Float(array.value(row_idx) as f64))
828 }
829 ArrowDataType::Float64 => {
830 let array = column
831 .as_any()
832 .downcast_ref::<arrow_array::Float64Array>()
833 .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
834 Ok(Value::Float(array.value(row_idx)))
835 }
836 ArrowDataType::Boolean => {
837 let array = column
838 .as_any()
839 .downcast_ref::<arrow_array::BooleanArray>()
840 .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
841 Ok(Value::Bool(array.value(row_idx)))
842 }
843 ArrowDataType::UInt64 => {
844 let array = column
845 .as_any()
846 .downcast_ref::<arrow_array::UInt64Array>()
847 .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
848 Ok(Value::Int(array.value(row_idx) as i64))
849 }
850 _ => {
851 let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
853 if let Some(arr) = array {
854 Ok(Value::String(arr.value(row_idx).to_string()))
855 } else {
856 Ok(Value::Null)
857 }
858 }
859 }
860 }
861
862 fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
863 let file = std::fs::File::open(path)?;
864 let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
865 .build()?;
866 reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
867 }
868
869 fn read_csv_file(
870 &self,
871 path: &str,
872 label: &str,
873 options: &HashMap<String, Value>,
874 ) -> Result<Vec<arrow_array::RecordBatch>> {
875 use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
876 use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
877 use std::sync::Arc;
878
879 let has_headers = options
881 .get("headers")
882 .and_then(|v| v.as_bool())
883 .unwrap_or(true);
884
885 let file = std::fs::File::open(path)?;
887 let mut rdr = csv::ReaderBuilder::new()
888 .has_headers(has_headers)
889 .from_reader(file);
890
891 let db_schema = self.storage.schema_manager().schema();
893 let properties = db_schema.properties.get(label);
894
895 let mut rows: Vec<Vec<String>> = Vec::new();
897 let headers: Vec<String> = if has_headers {
898 rdr.headers()?.iter().map(|s| s.to_string()).collect()
899 } else {
900 Vec::new()
901 };
902
903 for result in rdr.records() {
904 let record = result?;
905 rows.push(record.iter().map(|s| s.to_string()).collect());
906 }
907
908 if rows.is_empty() {
909 return Ok(Vec::new());
910 }
911
912 let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
914 let col_names: Vec<String> = if has_headers {
915 headers
916 } else {
917 (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
918 };
919
920 for name in &col_names {
921 let arrow_type = if let Some(props) = properties {
922 if let Some(prop_meta) = props.get(name) {
923 match prop_meta.r#type {
924 DataType::Int32 => ArrowDataType::Int32,
925 DataType::Int64 => ArrowDataType::Int64,
926 DataType::Float32 => ArrowDataType::Float32,
927 DataType::Float64 => ArrowDataType::Float64,
928 DataType::Bool => ArrowDataType::Boolean,
929 _ => ArrowDataType::Utf8,
930 }
931 } else {
932 ArrowDataType::Utf8
933 }
934 } else {
935 ArrowDataType::Utf8
936 };
937 arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
938 }
939
940 let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
941
942 let mut columns: Vec<ArrayRef> = Vec::new();
944 for (col_idx, field) in arrow_fields.iter().enumerate() {
945 match field.data_type() {
946 ArrowDataType::Int32 => {
947 let values: Vec<Option<i32>> = rows
948 .iter()
949 .map(|row| {
950 if col_idx < row.len() {
951 row[col_idx].parse().ok()
952 } else {
953 None
954 }
955 })
956 .collect();
957 columns.push(Arc::new(Int32Array::from(values)));
958 }
959 _ => {
960 let values: Vec<Option<String>> = rows
962 .iter()
963 .map(|row| {
964 if col_idx < row.len() {
965 Some(row[col_idx].clone())
966 } else {
967 None
968 }
969 })
970 .collect();
971 columns.push(Arc::new(StringArray::from(values)));
972 }
973 }
974 }
975
976 let batch = RecordBatch::try_new(arrow_schema, columns)?;
977 Ok(vec![batch])
978 }
979
980 fn parse_data_type(type_str: &str) -> Result<DataType> {
981 use uni_common::core::schema::{CrdtType, PointType};
982 let type_str = type_str.to_lowercase();
983 let type_str = type_str.trim();
984 match type_str {
985 "string" | "text" | "varchar" => Ok(DataType::String),
986 "int" | "integer" | "int32" => Ok(DataType::Int32),
987 "long" | "int64" | "bigint" => Ok(DataType::Int64),
988 "float" | "float32" | "real" => Ok(DataType::Float32),
989 "double" | "float64" => Ok(DataType::Float64),
990 "bool" | "boolean" => Ok(DataType::Bool),
991 "timestamp" => Ok(DataType::Timestamp),
992 "date" => Ok(DataType::Date),
993 "time" => Ok(DataType::Time),
994 "datetime" => Ok(DataType::DateTime),
995 "duration" => Ok(DataType::Duration),
996 "btic" => Ok(DataType::Btic),
997 "json" | "jsonb" => Ok(DataType::CypherValue),
998 "point" => Ok(DataType::Point(PointType::Cartesian2D)),
999 "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
1000 "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1001 s if s.starts_with("vector(") && s.ends_with(')') => {
1002 let dims_str = &s[7..s.len() - 1];
1003 let dimensions = dims_str
1004 .parse::<usize>()
1005 .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1006 Ok(DataType::Vector { dimensions })
1007 }
1008 s if s.starts_with("list<") && s.ends_with('>') => {
1009 let inner_type_str = &s[5..s.len() - 1];
1010 let inner_type = Self::parse_data_type(inner_type_str)?;
1011 Ok(DataType::List(Box::new(inner_type)))
1012 }
1013 "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1014 "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1015 _ => Err(anyhow!("Unknown data type: {}", type_str)),
1016 }
1017 }
1018
1019 pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1020 let sm = self.storage.schema_manager_arc();
1021 if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1022 return Ok(());
1023 }
1024 sm.add_label(&clause.name)?;
1025 for prop in clause.properties {
1026 let dt = Self::parse_data_type(&prop.data_type)?;
1027 sm.add_property(&clause.name, &prop.name, dt, prop.nullable)?;
1028 if prop.unique {
1029 let constraint = Constraint {
1030 name: format!("{}_{}_unique", clause.name, prop.name),
1031 constraint_type: ConstraintType::Unique {
1032 properties: vec![prop.name],
1033 },
1034 target: ConstraintTarget::Label(clause.name.clone()),
1035 enabled: true,
1036 };
1037 sm.add_constraint(constraint)?;
1038 }
1039 }
1040 sm.save().await?;
1041 Ok(())
1042 }
1043
1044 pub(crate) async fn enrich_properties_with_generated_columns(
1045 &self,
1046 label_name: &str,
1047 properties: &mut HashMap<String, Value>,
1048 prop_manager: &PropertyManager,
1049 params: &HashMap<String, Value>,
1050 ctx: Option<&QueryContext>,
1051 ) -> Result<()> {
1052 let schema = self.storage.schema_manager().schema();
1053
1054 if let Some(props_meta) = schema.properties.get(label_name) {
1055 let mut generators = Vec::new();
1056 for (prop_name, meta) in props_meta {
1057 if let Some(expr_str) = &meta.generation_expression {
1058 generators.push((prop_name.clone(), expr_str.clone()));
1059 }
1060 }
1061
1062 for (prop_name, expr_str) in generators {
1063 let cache_key = (label_name.to_string(), prop_name.clone());
1064 let expr = {
1065 let cache = self.gen_expr_cache.read().await;
1066 cache.get(&cache_key).cloned()
1067 };
1068
1069 let expr = match expr {
1070 Some(e) => e,
1071 None => {
1072 let parsed = uni_cypher::parse_expression(&expr_str)
1073 .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1074 let mut cache = self.gen_expr_cache.write().await;
1075 cache.insert(cache_key, parsed.clone());
1076 parsed
1077 }
1078 };
1079
1080 let mut scope = HashMap::new();
1081
1082 if let Some(var) = expr.extract_variable() {
1084 scope.insert(var, Value::Map(properties.clone()));
1085 } else {
1086 for (k, v) in properties.iter() {
1089 scope.insert(k.clone(), v.clone());
1090 }
1091 }
1092
1093 let val = self
1094 .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1095 .await?;
1096 properties.insert(prop_name, val);
1097 }
1098 }
1099 Ok(())
1100 }
1101
1102 pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1103 let sm = self.storage.schema_manager_arc();
1104 if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1105 return Ok(());
1106 }
1107 sm.add_edge_type(&clause.name, clause.src_labels, clause.dst_labels)?;
1108 for prop in clause.properties {
1109 let dt = Self::parse_data_type(&prop.data_type)?;
1110 sm.add_property(&clause.name, &prop.name, dt, prop.nullable)?;
1111 }
1112 sm.save().await?;
1113 Ok(())
1114 }
1115
1116 pub(crate) async fn execute_alter_entity(
1121 sm: &Arc<SchemaManager>,
1122 entity_name: &str,
1123 action: AlterAction,
1124 ) -> Result<()> {
1125 match action {
1126 AlterAction::AddProperty(prop) => {
1127 let dt = Self::parse_data_type(&prop.data_type)?;
1128 sm.add_property(entity_name, &prop.name, dt, prop.nullable)?;
1129 }
1130 AlterAction::DropProperty(prop_name) => {
1131 sm.drop_property(entity_name, &prop_name)?;
1132 }
1133 AlterAction::RenameProperty { old_name, new_name } => {
1134 sm.rename_property(entity_name, &old_name, &new_name)?;
1135 }
1136 }
1137 sm.save().await?;
1138 Ok(())
1139 }
1140
1141 pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1142 Self::execute_alter_entity(
1143 &self.storage.schema_manager_arc(),
1144 &clause.name,
1145 clause.action,
1146 )
1147 .await
1148 }
1149
1150 pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1151 Self::execute_alter_entity(
1152 &self.storage.schema_manager_arc(),
1153 &clause.name,
1154 clause.action,
1155 )
1156 .await
1157 }
1158
1159 pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1160 let sm = self.storage.schema_manager_arc();
1161 sm.drop_label(&clause.name, clause.if_exists)?;
1162 sm.save().await?;
1163 Ok(())
1164 }
1165
1166 pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1167 let sm = self.storage.schema_manager_arc();
1168 sm.drop_edge_type(&clause.name, clause.if_exists)?;
1169 sm.save().await?;
1170 Ok(())
1171 }
1172
1173 pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1174 let sm = self.storage.schema_manager_arc();
1175 let target = ConstraintTarget::Label(clause.label);
1176 let c_type = match clause.constraint_type {
1177 AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1178 properties: clause.properties,
1179 },
1180 AstConstraintType::Exists => {
1181 let property = clause
1182 .properties
1183 .into_iter()
1184 .next()
1185 .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1186 ConstraintType::Exists { property }
1187 }
1188 AstConstraintType::Check => {
1189 let expression = clause
1190 .expression
1191 .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1192 ConstraintType::Check {
1193 expression: expression.to_string_repr(),
1194 }
1195 }
1196 };
1197
1198 let constraint = Constraint {
1199 name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1200 constraint_type: c_type,
1201 target,
1202 enabled: true,
1203 };
1204
1205 sm.add_constraint(constraint)?;
1206 sm.save().await?;
1207 Ok(())
1208 }
1209
1210 pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1211 let sm = self.storage.schema_manager_arc();
1212 sm.drop_constraint(&clause.name, false)?;
1213 sm.save().await?;
1214 Ok(())
1215 }
1216
1217 fn get_composite_constraint(&self, label: &str) -> Option<Constraint> {
1218 let schema = self.storage.schema_manager().schema();
1219 schema
1220 .constraints
1221 .iter()
1222 .find(|c| {
1223 if !c.enabled {
1224 return false;
1225 }
1226 match &c.target {
1227 ConstraintTarget::Label(l) if l == label => {
1228 matches!(c.constraint_type, ConstraintType::Unique { .. })
1229 }
1230 _ => false,
1231 }
1232 })
1233 .cloned()
1234 }
1235
1236 #[expect(clippy::too_many_arguments)]
1237 pub(crate) async fn execute_merge(
1238 &self,
1239 rows: Vec<HashMap<String, Value>>,
1240 pattern: &Pattern,
1241 on_match: Option<&SetClause>,
1242 on_create: Option<&SetClause>,
1243 prop_manager: &PropertyManager,
1244 params: &HashMap<String, Value>,
1245 ctx: Option<&QueryContext>,
1246 tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1247 ) -> Result<Vec<HashMap<String, Value>>> {
1248 let writer_lock = self
1249 .writer
1250 .as_ref()
1251 .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;
1252
1253 let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);
1256
1257 let mut results = Vec::new();
1258 for mut row in rows {
1259 let mut optimized_vid = None;
1261 if pattern.paths.len() == 1 {
1262 let path = &pattern.paths[0];
1263 if path.elements.len() == 1
1264 && let PatternElement::Node(n) = &path.elements[0]
1265 && n.labels.len() == 1
1266 && let Some(constraint) = self.get_composite_constraint(&n.labels[0])
1267 && let ConstraintType::Unique { properties } = constraint.constraint_type
1268 {
1269 let label = &n.labels[0];
1270 let mut pattern_props = HashMap::new();
1272 if let Some(props_expr) = &n.properties {
1273 let val = self
1274 .evaluate_expr(props_expr, &row, prop_manager, params, ctx)
1275 .await?;
1276 if let Value::Map(map) = val {
1277 for (k, v) in map {
1278 pattern_props.insert(k, v);
1279 }
1280 }
1281 }
1282
1283 let has_all_keys = properties.iter().all(|p| pattern_props.contains_key(p));
1285 if has_all_keys {
1286 let key_props: HashMap<String, serde_json::Value> = properties
1288 .iter()
1289 .filter_map(|p| {
1290 pattern_props.get(p).map(|v| (p.clone(), v.clone().into()))
1291 })
1292 .collect();
1293
1294 if let Ok(Some(vid)) = self
1296 .storage
1297 .index_manager()
1298 .composite_lookup(label, &key_props)
1299 .await
1300 {
1301 optimized_vid = Some((vid, pattern_props));
1302 }
1303 }
1304 }
1305 }
1306
1307 if let Some((vid, _pattern_props)) = optimized_vid {
1308 let mut writer = writer_lock.write().await;
1310
1311 let mut match_row = row.clone();
1312 if let PatternElement::Node(n) = &pattern.paths[0].elements[0]
1313 && let Some(var) = &n.variable
1314 {
1315 match_row.insert(var.clone(), Value::Int(vid.as_u64() as i64));
1316 }
1317
1318 let result = if let Some(set) = on_match {
1319 self.execute_set_items_locked(
1320 &set.items,
1321 &mut match_row,
1322 &mut writer,
1323 prop_manager,
1324 params,
1325 ctx,
1326 tx_l0_override,
1327 )
1328 .await
1329 } else {
1330 Ok(())
1331 };
1332
1333 drop(writer);
1334 result?;
1335
1336 Self::bind_path_variables(&path_pattern, &mut match_row, &temp_vars);
1337 results.push(match_row);
1338 } else {
1339 let matches = self
1341 .execute_merge_match(pattern, &row, prop_manager, params, ctx)
1342 .await?;
1343 let mut writer = writer_lock.write().await;
1344
1345 let result: Result<Vec<HashMap<String, Value>>> = async {
1346 let mut batch = Vec::new();
1347 if !matches.is_empty() {
1348 for mut m in matches {
1349 if let Some(set) = on_match {
1350 self.execute_set_items_locked(
1351 &set.items,
1352 &mut m,
1353 &mut writer,
1354 prop_manager,
1355 params,
1356 ctx,
1357 tx_l0_override,
1358 )
1359 .await?;
1360 }
1361 Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
1362 batch.push(m);
1363 }
1364 } else {
1365 self.execute_create_pattern(
1366 &path_pattern,
1367 &mut row,
1368 &mut writer,
1369 prop_manager,
1370 params,
1371 ctx,
1372 tx_l0_override,
1373 )
1374 .await?;
1375 if let Some(set) = on_create {
1376 self.execute_set_items_locked(
1377 &set.items,
1378 &mut row,
1379 &mut writer,
1380 prop_manager,
1381 params,
1382 ctx,
1383 tx_l0_override,
1384 )
1385 .await?;
1386 }
1387 Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
1388 batch.push(row);
1389 }
1390 Ok(batch)
1391 }
1392 .await;
1393
1394 drop(writer);
1395 results.extend(result?);
1396 }
1397 }
1398 Ok(results)
1399 }
1400
1401 #[expect(clippy::too_many_arguments)]
1403 pub(crate) async fn execute_create_pattern(
1404 &self,
1405 pattern: &Pattern,
1406 row: &mut HashMap<String, Value>,
1407 writer: &mut Writer,
1408 prop_manager: &PropertyManager,
1409 params: &HashMap<String, Value>,
1410 ctx: Option<&QueryContext>,
1411 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1412 ) -> Result<()> {
1413 for path in &pattern.paths {
1414 let mut prev_vid: Option<Vid> = None;
1415 type PendingRel = (String, u32, String, Option<Expr>, Direction);
1417 let mut rel_pending: Option<PendingRel> = None;
1418
1419 for element in &path.elements {
1420 match element {
1421 PatternElement::Node(n) => {
1422 let mut vid = None;
1423
1424 if let Some(var) = &n.variable
1426 && let Some(val) = row.get(var)
1427 && let Ok(existing_vid) = Self::vid_from_value(val)
1428 {
1429 vid = Some(existing_vid);
1430 }
1431
1432 if vid.is_none() {
1434 let mut props = HashMap::new();
1435 if let Some(props_expr) = &n.properties {
1436 let props_val = self
1437 .evaluate_expr(props_expr, row, prop_manager, params, ctx)
1438 .await?;
1439 if let Value::Map(map) = props_val {
1440 for (k, v) in map {
1441 props.insert(k, v);
1442 }
1443 } else {
1444 return Err(anyhow!("Properties must evaluate to a map"));
1445 }
1446 }
1447
1448 let schema = self.storage.schema_manager().schema();
1450
1451 let new_vid = writer.next_vid().await?;
1453
1454 for label_name in &n.labels {
1456 if schema.get_label_case_insensitive(label_name).is_some() {
1457 self.enrich_properties_with_generated_columns(
1458 label_name,
1459 &mut props,
1460 prop_manager,
1461 params,
1462 ctx,
1463 )
1464 .await?;
1465 }
1466 }
1467
1468 let final_props = writer
1470 .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
1471 .await?;
1472
1473 if let Some(var) = &n.variable {
1475 let mut obj = HashMap::new();
1476 obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
1477 let labels_list: Vec<Value> =
1478 n.labels.iter().map(|l| Value::String(l.clone())).collect();
1479 obj.insert("_labels".to_string(), Value::List(labels_list));
1480 for (k, v) in &final_props {
1481 obj.insert(k.clone(), v.clone());
1482 }
1483 row.insert(var.clone(), Value::Map(obj));
1485 }
1486 vid = Some(new_vid);
1487 }
1488
1489 let current_vid = vid.unwrap();
1490
1491 if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
1492 rel_pending.take()
1493 && let Some(src) = prev_vid
1494 {
1495 let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
1496
1497 if !is_rel_bound {
1498 let mut rel_props = HashMap::new();
1499 if let Some(expr) = rel_props_expr {
1500 let val = self
1501 .evaluate_expr(&expr, row, prop_manager, params, ctx)
1502 .await?;
1503 if let Value::Map(map) = val {
1504 rel_props.extend(map);
1505 }
1506 }
1507 let eid = writer.next_eid(type_id).await?;
1508
1509 let (edge_src, edge_dst) = match dir {
1511 Direction::Incoming => (current_vid, src),
1512 _ => (src, current_vid),
1513 };
1514
1515 let store_props = !rel_var.is_empty();
1516 let user_props = if store_props {
1517 rel_props.clone()
1518 } else {
1519 HashMap::new()
1520 };
1521
1522 writer
1523 .insert_edge(
1524 edge_src,
1525 edge_dst,
1526 type_id,
1527 eid,
1528 rel_props,
1529 Some(type_name.clone()),
1530 tx_l0,
1531 )
1532 .await?;
1533
1534 if store_props {
1537 let mut edge_map = HashMap::new();
1538 edge_map.insert(
1539 "_eid".to_string(),
1540 Value::Int(eid.as_u64() as i64),
1541 );
1542 edge_map.insert(
1543 "_src".to_string(),
1544 Value::Int(edge_src.as_u64() as i64),
1545 );
1546 edge_map.insert(
1547 "_dst".to_string(),
1548 Value::Int(edge_dst.as_u64() as i64),
1549 );
1550 edge_map
1551 .insert("_type".to_string(), Value::Int(type_id as i64));
1552 for (k, v) in user_props {
1554 edge_map.insert(k, v);
1555 }
1556 row.insert(rel_var, Value::Map(edge_map));
1557 }
1558 }
1559 }
1560 prev_vid = Some(current_vid);
1561 }
1562 PatternElement::Relationship(r) => {
1563 if r.types.len() != 1 {
1564 return Err(anyhow!(
1565 "CREATE relationship must specify exactly one type"
1566 ));
1567 }
1568 let type_name = &r.types[0];
1569 let type_id = self
1571 .storage
1572 .schema_manager()
1573 .get_or_assign_edge_type_id(type_name);
1574
1575 rel_pending = Some((
1576 r.variable.clone().unwrap_or_default(),
1577 type_id,
1578 type_name.clone(),
1579 r.properties.clone(),
1580 r.direction.clone(),
1581 ));
1582 }
1583 PatternElement::Parenthesized { .. } => {
1584 return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
1585 }
1586 }
1587 }
1588 }
1589 Ok(())
1590 }
1591
1592 fn validate_property_value(
1596 prop_name: &str,
1597 val: &Value,
1598 schema: &uni_common::core::schema::Schema,
1599 labels: &[String],
1600 ) -> Result<()> {
1601 for label in labels {
1603 if let Some(props) = schema.properties.get(label)
1604 && let Some(prop_meta) = props.get(prop_name)
1605 && prop_meta.r#type == uni_common::core::schema::DataType::CypherValue
1606 {
1607 return Ok(());
1608 }
1609 }
1610
1611 match val {
1612 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
1613 anyhow::bail!(
1614 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
1615 prop_name
1616 );
1617 }
1618 Value::List(items) => {
1619 for item in items {
1620 match item {
1621 Value::Map(_)
1622 | Value::Node(_)
1623 | Value::Edge(_)
1624 | Value::Path(_)
1625 | Value::List(_) => {
1626 anyhow::bail!(
1627 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
1628 prop_name
1629 );
1630 }
1631 _ => {}
1632 }
1633 }
1634 }
1635 _ => {}
1636 }
1637 Ok(())
1638 }
1639
1640 #[expect(clippy::too_many_arguments)]
1641 pub(crate) async fn execute_set_items_locked(
1642 &self,
1643 items: &[SetItem],
1644 row: &mut HashMap<String, Value>,
1645 writer: &mut Writer,
1646 prop_manager: &PropertyManager,
1647 params: &HashMap<String, Value>,
1648 ctx: Option<&QueryContext>,
1649 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1650 ) -> Result<()> {
1651 for item in items {
1652 match item {
1653 SetItem::Property { expr, value } => {
1654 if let Expr::Property(var_expr, prop_name) = expr
1655 && let Expr::Variable(var_name) = &**var_expr
1656 && let Some(node_val) = row.get(var_name)
1657 {
1658 if let Ok(vid) = Self::vid_from_value(node_val) {
1659 let labels =
1660 Self::extract_labels_from_node(node_val).unwrap_or_default();
1661 let schema = self.storage.schema_manager().schema().clone();
1662 let mut props = prop_manager
1663 .get_all_vertex_props_with_ctx(vid, ctx)
1664 .await?
1665 .unwrap_or_default();
1666 let val = self
1667 .evaluate_expr(value, row, prop_manager, params, ctx)
1668 .await?;
1669 Self::validate_property_value(prop_name, &val, &schema, &labels)?;
1670 props.insert(prop_name.clone(), val.clone());
1671
1672 for label_name in &labels {
1674 self.enrich_properties_with_generated_columns(
1675 label_name,
1676 &mut props,
1677 prop_manager,
1678 params,
1679 ctx,
1680 )
1681 .await?;
1682 }
1683
1684 let _ = writer
1685 .insert_vertex_with_labels(vid, props, &labels, tx_l0)
1686 .await?;
1687
1688 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
1690 node_map.insert(prop_name.clone(), val);
1691 } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
1692 node.properties.insert(prop_name.clone(), val);
1693 }
1694 } else if let Value::Map(map) = node_val
1695 && map.get("_eid").is_some_and(|v| !v.is_null())
1696 && map.get("_src").is_some_and(|v| !v.is_null())
1697 && map.get("_dst").is_some_and(|v| !v.is_null())
1698 && map.get("_type").is_some_and(|v| !v.is_null())
1699 {
1700 let ei = self.extract_edge_identity(map)?;
1701 let schema = self.storage.schema_manager().schema().clone();
1702 let edge_type_name = match map.get("_type") {
1704 Some(Value::String(s)) => s.clone(),
1705 Some(Value::Int(id)) => schema
1706 .edge_type_name_by_id_unified(*id as u32)
1707 .unwrap_or_else(|| format!("EdgeType{}", id)),
1708 _ => String::new(),
1709 };
1710
1711 let mut props = prop_manager
1712 .get_all_edge_props_with_ctx(ei.eid, ctx)
1713 .await?
1714 .unwrap_or_default();
1715 let val = self
1716 .evaluate_expr(value, row, prop_manager, params, ctx)
1717 .await?;
1718 Self::validate_property_value(
1719 prop_name,
1720 &val,
1721 &schema,
1722 std::slice::from_ref(&edge_type_name),
1723 )?;
1724 props.insert(prop_name.clone(), val.clone());
1725 writer
1726 .insert_edge(
1727 ei.src,
1728 ei.dst,
1729 ei.edge_type_id,
1730 ei.eid,
1731 props,
1732 Some(edge_type_name.clone()),
1733 tx_l0,
1734 )
1735 .await?;
1736
1737 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
1739 edge_map.insert(prop_name.clone(), val);
1740 } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1741 edge.properties.insert(prop_name.clone(), val);
1742 }
1743 } else if let Value::Edge(edge) = node_val {
1744 let eid = edge.eid;
1746 let src = edge.src;
1747 let dst = edge.dst;
1748 let edge_type_name = edge.edge_type.clone();
1749 let etype =
1750 self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
1751 let schema = self.storage.schema_manager().schema().clone();
1752
1753 let mut props = prop_manager
1754 .get_all_edge_props_with_ctx(eid, ctx)
1755 .await?
1756 .unwrap_or_default();
1757 let val = self
1758 .evaluate_expr(value, row, prop_manager, params, ctx)
1759 .await?;
1760 Self::validate_property_value(
1761 prop_name,
1762 &val,
1763 &schema,
1764 std::slice::from_ref(&edge_type_name),
1765 )?;
1766 props.insert(prop_name.clone(), val.clone());
1767 writer
1768 .insert_edge(
1769 src,
1770 dst,
1771 etype,
1772 eid,
1773 props,
1774 Some(edge_type_name.clone()),
1775 tx_l0,
1776 )
1777 .await?;
1778
1779 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1781 edge.properties.insert(prop_name.clone(), val);
1782 }
1783 }
1784 }
1785 }
1786 SetItem::Labels { variable, labels } => {
1787 if let Some(node_val) = row.get(variable)
1788 && let Ok(vid) = Self::vid_from_value(node_val)
1789 {
1790 let current_labels =
1792 Self::extract_labels_from_node(node_val).unwrap_or_default();
1793
1794 let labels_to_add: Vec<_> = labels
1796 .iter()
1797 .filter(|l| !current_labels.contains(l))
1798 .cloned()
1799 .collect();
1800
1801 if !labels_to_add.is_empty() {
1802 if let Some(ctx) = ctx {
1805 ctx.l0.write().add_vertex_labels(vid, &labels_to_add);
1806 }
1807
1808 if let Some(Value::Map(obj)) = row.get_mut(variable) {
1810 let mut updated_labels = current_labels;
1811 updated_labels.extend(labels_to_add);
1812 let labels_list =
1813 updated_labels.into_iter().map(Value::String).collect();
1814 obj.insert("_labels".to_string(), Value::List(labels_list));
1815 }
1816 }
1817 }
1818 }
1819 SetItem::Variable { variable, value }
1820 | SetItem::VariablePlus { variable, value } => {
1821 let replace = matches!(item, SetItem::Variable { .. });
1822 let op_str = if replace { "=" } else { "+=" };
1823
1824 if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
1826 continue;
1827 }
1828 let rhs = self
1829 .evaluate_expr(value, row, prop_manager, params, ctx)
1830 .await?;
1831 let new_props =
1832 Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
1833 anyhow!(
1834 "SET {} {} expr: right-hand side must evaluate to a map, \
1835 node, or relationship",
1836 variable,
1837 op_str
1838 )
1839 })?;
1840 self.apply_properties_to_entity(
1841 variable,
1842 new_props,
1843 replace,
1844 row,
1845 writer,
1846 prop_manager,
1847 params,
1848 ctx,
1849 tx_l0,
1850 )
1851 .await?;
1852 }
1853 }
1854 }
1855 Ok(())
1856 }
1857
1858 pub(crate) async fn execute_remove_items_locked(
1866 &self,
1867 items: &[RemoveItem],
1868 row: &mut HashMap<String, Value>,
1869 writer: &mut Writer,
1870 prop_manager: &PropertyManager,
1871 ctx: Option<&QueryContext>,
1872 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1873 ) -> Result<()> {
1874 let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
1877
1878 for item in items {
1879 match item {
1880 RemoveItem::Property(expr) => {
1881 if let Expr::Property(var_expr, prop_name) = expr
1882 && let Expr::Variable(var_name) = &**var_expr
1883 {
1884 if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
1885 entry.1.push(prop_name.clone());
1886 } else {
1887 prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
1888 }
1889 }
1890 }
1891 RemoveItem::Labels { variable, labels } => {
1892 self.execute_remove_labels(variable, labels, row, ctx)?;
1893 }
1894 }
1895 }
1896
1897 for (var_name, prop_names) in &prop_removals {
1899 let Some(node_val) = row.get(var_name) else {
1900 continue;
1901 };
1902
1903 if let Ok(vid) = Self::vid_from_value(node_val) {
1904 let mut props = prop_manager
1906 .get_all_vertex_props_with_ctx(vid, ctx)
1907 .await?
1908 .unwrap_or_default();
1909
1910 let removed_count = prop_names
1912 .iter()
1913 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
1914 .count();
1915 let any_exist = removed_count > 0;
1916 if any_exist {
1917 writer.track_properties_removed(removed_count, tx_l0);
1918 for prop_name in prop_names {
1919 props.insert(prop_name.clone(), Value::Null);
1920 }
1921 }
1922 let effective: HashMap<String, Value> = props
1924 .iter()
1925 .filter(|(_, v)| !v.is_null())
1926 .map(|(k, v)| (k.clone(), v.clone()))
1927 .collect();
1928 if any_exist {
1929 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
1930 let _ = writer
1931 .insert_vertex_with_labels(vid, props, &labels, tx_l0)
1932 .await?;
1933 }
1934
1935 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
1937 for prop_name in prop_names {
1938 node_map.insert(prop_name.clone(), Value::Null);
1939 }
1940 node_map.insert("_all_props".to_string(), Value::Map(effective));
1942 }
1943 } else if let Value::Map(map) = node_val {
1944 let mut edge_effective: Option<HashMap<String, Value>> = None;
1947 if map.get("_eid").is_some_and(|v| !v.is_null()) {
1948 let ei = self.extract_edge_identity(map)?;
1949 let mut props = prop_manager
1950 .get_all_edge_props_with_ctx(ei.eid, ctx)
1951 .await?
1952 .unwrap_or_default();
1953
1954 let removed_count = prop_names
1955 .iter()
1956 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
1957 .count();
1958 let any_exist = removed_count > 0;
1959 if any_exist {
1960 writer.track_properties_removed(removed_count, tx_l0);
1961 for prop_name in prop_names {
1962 props.insert(prop_name.to_string(), Value::Null);
1963 }
1964 }
1965 edge_effective = Some(
1967 props
1968 .iter()
1969 .filter(|(_, v)| !v.is_null())
1970 .map(|(k, v)| (k.clone(), v.clone()))
1971 .collect(),
1972 );
1973 if any_exist {
1974 let edge_type_name = map
1975 .get("_type")
1976 .and_then(|v| v.as_str())
1977 .map(|s| s.to_string())
1978 .or_else(|| {
1979 self.storage
1980 .schema_manager()
1981 .edge_type_name_by_id_unified(ei.edge_type_id)
1982 });
1983 writer
1984 .insert_edge(
1985 ei.src,
1986 ei.dst,
1987 ei.edge_type_id,
1988 ei.eid,
1989 props,
1990 edge_type_name,
1991 tx_l0,
1992 )
1993 .await?;
1994 }
1995 }
1996
1997 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
1998 for prop_name in prop_names {
1999 edge_map.insert(prop_name.clone(), Value::Null);
2000 }
2001 if let Some(effective) = edge_effective {
2002 edge_map.insert("_all_props".to_string(), Value::Map(effective));
2003 }
2004 }
2005 } else if let Value::Edge(edge) = node_val {
2006 let eid = edge.eid;
2008 let src = edge.src;
2009 let dst = edge.dst;
2010 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
2011
2012 let mut props = prop_manager
2013 .get_all_edge_props_with_ctx(eid, ctx)
2014 .await?
2015 .unwrap_or_default();
2016
2017 let removed_count = prop_names
2018 .iter()
2019 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
2020 .count();
2021 if removed_count > 0 {
2022 writer.track_properties_removed(removed_count, tx_l0);
2023 for prop_name in prop_names {
2024 props.insert(prop_name.to_string(), Value::Null);
2025 }
2026 writer
2027 .insert_edge(
2028 src,
2029 dst,
2030 etype,
2031 eid,
2032 props,
2033 Some(edge.edge_type.clone()),
2034 tx_l0,
2035 )
2036 .await?;
2037 }
2038
2039 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
2040 for prop_name in prop_names {
2041 edge.properties.insert(prop_name.to_string(), Value::Null);
2042 }
2043 }
2044 }
2045 }
2046
2047 Ok(())
2048 }
2049
2050 pub(crate) fn execute_remove_labels(
2052 &self,
2053 variable: &str,
2054 labels: &[String],
2055 row: &mut HashMap<String, Value>,
2056 ctx: Option<&QueryContext>,
2057 ) -> Result<()> {
2058 if let Some(node_val) = row.get(variable)
2059 && let Ok(vid) = Self::vid_from_value(node_val)
2060 {
2061 let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
2063
2064 let labels_to_remove: Vec<_> = labels
2066 .iter()
2067 .filter(|l| current_labels.contains(l))
2068 .collect();
2069
2070 if !labels_to_remove.is_empty() {
2071 if let Some(ctx) = ctx {
2073 let mut l0 = ctx.l0.write();
2074 for label in &labels_to_remove {
2075 l0.remove_vertex_label(vid, label);
2076 }
2077 }
2078
2079 if let Some(Value::Map(obj)) = row.get_mut(variable) {
2081 let remaining_labels: Vec<_> = current_labels
2082 .iter()
2083 .filter(|l| !labels_to_remove.contains(l))
2084 .cloned()
2085 .collect();
2086 let labels_list = remaining_labels.into_iter().map(Value::String).collect();
2087 obj.insert("_labels".to_string(), Value::List(labels_list));
2088 }
2089 }
2090 }
2091 Ok(())
2092 }
2093
2094 fn resolve_edge_type_id_for_edge(
2097 &self,
2098 edge: &crate::types::Edge,
2099 writer: &Writer,
2100 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2101 ) -> Result<u32> {
2102 if !edge.edge_type.is_empty() {
2103 return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
2104 }
2105 if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
2108 return Ok(etype);
2109 }
2110 Err(anyhow!(
2111 "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
2112 edge.eid
2113 ))
2114 }
2115
2116 pub(crate) async fn execute_delete_item_locked(
2118 &self,
2119 val: &Value,
2120 detach: bool,
2121 writer: &mut Writer,
2122 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2123 ) -> Result<()> {
2124 match val {
2125 Value::Null => {
2126 }
2128 Value::Path(path) => {
2129 for edge in &path.edges {
2131 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
2132 writer
2133 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
2134 .await?;
2135 }
2136 for node in &path.nodes {
2137 self.execute_delete_vertex(
2138 node.vid,
2139 detach,
2140 Some(node.labels.clone()),
2141 writer,
2142 tx_l0,
2143 )
2144 .await?;
2145 }
2146 }
2147 _ => {
2148 if let Ok(path) = Path::try_from(val) {
2150 for edge in &path.edges {
2151 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
2152 writer
2153 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
2154 .await?;
2155 }
2156 for node in &path.nodes {
2157 self.execute_delete_vertex(
2158 node.vid,
2159 detach,
2160 Some(node.labels.clone()),
2161 writer,
2162 tx_l0,
2163 )
2164 .await?;
2165 }
2166 } else if let Ok(vid) = Self::vid_from_value(val) {
2167 let labels = Self::extract_labels_from_node(val);
2168 self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
2169 .await?;
2170 } else if let Value::Map(map) = val {
2171 self.execute_delete_edge_from_map(map, writer, tx_l0)
2172 .await?;
2173 } else if let Value::Edge(edge) = val {
2174 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
2175 writer
2176 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
2177 .await?;
2178 }
2179 }
2180 }
2181 Ok(())
2182 }
2183
2184 pub(crate) async fn execute_delete_vertex(
2186 &self,
2187 vid: Vid,
2188 detach: bool,
2189 labels: Option<Vec<String>>,
2190 writer: &mut Writer,
2191 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2192 ) -> Result<()> {
2193 if detach {
2194 self.detach_delete_vertex(vid, writer, tx_l0).await?;
2195 } else {
2196 self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
2197 }
2198 writer.delete_vertex(vid, labels, tx_l0).await?;
2199 Ok(())
2200 }
2201
2202 pub(crate) async fn check_vertex_has_no_edges(
2208 &self,
2209 vid: Vid,
2210 writer: &Writer,
2211 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2212 ) -> Result<()> {
2213 let schema = self.storage.schema_manager().schema();
2214 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
2215
2216 let mut tombstoned_eids = std::collections::HashSet::new();
2218 {
2219 let writer_l0 = writer.l0_manager.get_current();
2220 let guard = writer_l0.read();
2221 for &eid in guard.tombstones.keys() {
2222 tombstoned_eids.insert(eid);
2223 }
2224 }
2225 if let Some(tx) = tx_l0 {
2226 let guard = tx.read();
2227 for &eid in guard.tombstones.keys() {
2228 tombstoned_eids.insert(eid);
2229 }
2230 }
2231
2232 let out_graph = self
2233 .storage
2234 .load_subgraph_cached(
2235 &[vid],
2236 &edge_type_ids,
2237 1,
2238 uni_store::runtime::Direction::Outgoing,
2239 Some(writer.l0_manager.get_current()),
2240 )
2241 .await?;
2242 let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
2243
2244 let in_graph = self
2245 .storage
2246 .load_subgraph_cached(
2247 &[vid],
2248 &edge_type_ids,
2249 1,
2250 uni_store::runtime::Direction::Incoming,
2251 Some(writer.l0_manager.get_current()),
2252 )
2253 .await?;
2254 let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
2255
2256 if has_out || has_in {
2257 return Err(anyhow!(
2258 "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
2259 vid
2260 ));
2261 }
2262 Ok(())
2263 }
2264
2265 pub(crate) async fn execute_delete_edge_from_map(
2267 &self,
2268 map: &HashMap<String, Value>,
2269 writer: &mut Writer,
2270 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2271 ) -> Result<()> {
2272 if map.get("_eid").is_some_and(|v| !v.is_null()) {
2274 let ei = self.extract_edge_identity(map)?;
2275 writer
2276 .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
2277 .await?;
2278 }
2279 Ok(())
2280 }
2281
2282 fn make_scan_plan(
2288 label_id: u16,
2289 labels: Vec<String>,
2290 variable: String,
2291 filter: Option<Expr>,
2292 ) -> LogicalPlan {
2293 if label_id > 0 {
2294 LogicalPlan::Scan {
2295 label_id,
2296 labels,
2297 variable,
2298 filter,
2299 optional: false,
2300 }
2301 } else if !labels.is_empty() {
2302 LogicalPlan::ScanMainByLabels {
2304 labels,
2305 variable,
2306 filter,
2307 optional: false,
2308 }
2309 } else {
2310 LogicalPlan::ScanAll {
2311 variable,
2312 filter,
2313 optional: false,
2314 }
2315 }
2316 }
2317
2318 fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
2321 if matches!(plan, LogicalPlan::Empty) {
2322 scan
2323 } else {
2324 LogicalPlan::CrossJoin {
2325 left: Box::new(plan),
2326 right: Box::new(scan),
2327 }
2328 }
2329 }
2330
2331 async fn resolve_merge_properties(
2338 &self,
2339 properties: &Option<Expr>,
2340 row: &HashMap<String, Value>,
2341 prop_manager: &PropertyManager,
2342 params: &HashMap<String, Value>,
2343 ctx: Option<&QueryContext>,
2344 ) -> Result<Option<Expr>> {
2345 let entries = match properties {
2346 Some(Expr::Map(entries)) => entries,
2347 other => return Ok(other.clone()),
2348 };
2349 let mut resolved = Vec::new();
2350 for (key, val_expr) in entries {
2351 if matches!(val_expr, Expr::Literal(_)) {
2352 resolved.push((key.clone(), val_expr.clone()));
2353 } else {
2354 let value = self
2355 .evaluate_expr(val_expr, row, prop_manager, params, ctx)
2356 .await?;
2357 resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
2358 }
2359 }
2360 Ok(Some(Expr::Map(resolved)))
2361 }
2362
2363 fn value_to_literal_expr(value: &Value) -> Expr {
2365 match value {
2366 Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
2367 Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
2368 Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
2369 Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
2370 Value::Null => Expr::Literal(CypherLiteral::Null),
2371 Value::List(items) => {
2372 Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
2373 }
2374 Value::Map(entries) => Expr::Map(
2375 entries
2376 .iter()
2377 .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
2378 .collect(),
2379 ),
2380 _ => Expr::Literal(CypherLiteral::Null),
2381 }
2382 }
2383
2384 pub(crate) async fn execute_merge_match(
2385 &self,
2386 pattern: &Pattern,
2387 row: &HashMap<String, Value>,
2388 prop_manager: &PropertyManager,
2389 params: &HashMap<String, Value>,
2390 ctx: Option<&QueryContext>,
2391 ) -> Result<Vec<HashMap<String, Value>>> {
2392 let planner =
2394 crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
2395
2396 let mut plan = LogicalPlan::Empty;
2401 let mut vars_in_scope = Vec::new();
2402
2403 for key in row.keys() {
2405 vars_in_scope.push(key.clone());
2406 }
2407
2408 for path in &pattern.paths {
2410 let elements = &path.elements;
2411 let mut i = 0;
2412 while i < elements.len() {
2413 let part = &elements[i];
2414 match part {
2415 PatternElement::Node(n) => {
2416 let variable = n.variable.clone().unwrap_or_default();
2417
2418 let is_bound = !variable.is_empty() && row.contains_key(&variable);
2420
2421 if is_bound {
2422 let val = row.get(&variable).unwrap();
2425 let vid = Self::vid_from_value(val)?;
2426
2427 let extracted_labels =
2430 Self::extract_labels_from_node(val).unwrap_or_default();
2431 let label_id = {
2432 let schema = self.storage.schema_manager().schema();
2433 extracted_labels
2434 .first()
2435 .and_then(|l| schema.label_id_by_name(l))
2436 .unwrap_or(0)
2437 };
2438
2439 let resolved_props = self
2440 .resolve_merge_properties(
2441 &n.properties,
2442 row,
2443 prop_manager,
2444 params,
2445 ctx,
2446 )
2447 .await?;
2448 let prop_filter =
2449 planner.properties_to_expr(&variable, &resolved_props);
2450
2451 let vid_filter = Expr::BinaryOp {
2460 left: Box::new(Expr::Property(
2461 Box::new(Expr::Variable(variable.clone())),
2462 "_vid".to_string(),
2463 )),
2464 op: BinaryOp::Eq,
2465 right: Box::new(Expr::Literal(CypherLiteral::Integer(
2466 vid.as_u64() as i64,
2467 ))),
2468 };
2469
2470 let combined_filter = if let Some(pf) = prop_filter {
2471 Some(Expr::BinaryOp {
2472 left: Box::new(vid_filter),
2473 op: BinaryOp::And,
2474 right: Box::new(pf),
2475 })
2476 } else {
2477 Some(vid_filter)
2478 };
2479
2480 let scan = Self::make_scan_plan(
2481 label_id,
2482 extracted_labels,
2483 variable.clone(),
2484 combined_filter,
2485 );
2486 plan = Self::attach_scan(plan, scan);
2487 } else {
2488 let label_id = if n.labels.is_empty() {
2489 0
2491 } else {
2492 let label_name = &n.labels[0];
2493 let schema = self.storage.schema_manager().schema();
2494 schema
2497 .get_label_case_insensitive(label_name)
2498 .map(|m| m.id)
2499 .unwrap_or(0)
2500 };
2501
2502 let resolved_props = self
2503 .resolve_merge_properties(
2504 &n.properties,
2505 row,
2506 prop_manager,
2507 params,
2508 ctx,
2509 )
2510 .await?;
2511 let prop_filter =
2512 planner.properties_to_expr(&variable, &resolved_props);
2513 let scan = Self::make_scan_plan(
2514 label_id,
2515 n.labels.clone(),
2516 variable.clone(),
2517 prop_filter,
2518 );
2519 plan = Self::attach_scan(plan, scan);
2520
2521 if !n.labels.is_empty()
2528 && !variable.is_empty()
2529 && (label_id == 0 || n.labels.len() > 1)
2530 && let Some(label_filter) =
2531 planner.node_filter_expr(&variable, &n.labels, &None)
2532 {
2533 plan = LogicalPlan::Filter {
2534 input: Box::new(plan),
2535 predicate: label_filter,
2536 optional_variables: std::collections::HashSet::new(),
2537 };
2538 }
2539
2540 if !variable.is_empty() {
2541 vars_in_scope.push(variable.clone());
2542 }
2543 }
2544
2545 i += 1;
2547 while i < elements.len() {
2548 if let PatternElement::Relationship(r) = &elements[i] {
2549 let target_node_part = &elements[i + 1];
2550 if let PatternElement::Node(n_target) = target_node_part {
2551 let schema = self.storage.schema_manager().schema();
2552 let mut edge_type_ids = Vec::new();
2553
2554 if r.types.is_empty() {
2555 return Err(anyhow!("MERGE edge must have a type"));
2556 } else if r.types.len() > 1 {
2557 return Err(anyhow!(
2558 "MERGE does not support multiple edge types"
2559 ));
2560 } else {
2561 let type_name = &r.types[0];
2562 let type_id = self
2565 .storage
2566 .schema_manager()
2567 .get_or_assign_edge_type_id(type_name);
2568 edge_type_ids.push(type_id);
2569 }
2570
2571 let target_label_id: u16 = if let Some(lbl) =
2574 n_target.labels.first()
2575 {
2576 schema
2577 .get_label_case_insensitive(lbl)
2578 .map(|m| m.id)
2579 .unwrap_or(0)
2580 } else if let Some(var) = &n_target.variable {
2581 if let Some(val) = row.get(var) {
2582 if let Some(labels) =
2584 Self::extract_labels_from_node(val)
2585 {
2586 if let Some(first_label) = labels.first() {
2587 schema
2588 .get_label_case_insensitive(first_label)
2589 .map(|m| m.id)
2590 .unwrap_or(0)
2591 } else {
2592 0
2594 }
2595 } else if Self::vid_from_value(val).is_ok() {
2596 0
2598 } else {
2599 return Err(anyhow!(
2600 "Variable {} is not a node",
2601 var
2602 ));
2603 }
2604 } else {
2605 return Err(anyhow!(
2606 "MERGE pattern node must have a label or be a bound variable"
2607 ));
2608 }
2609 } else {
2610 return Err(anyhow!(
2611 "MERGE pattern node must have a label"
2612 ));
2613 };
2614
2615 let target_variable =
2616 n_target.variable.clone().unwrap_or_default();
2617 let source_variable = match &elements[i - 1] {
2618 PatternElement::Node(n) => {
2619 n.variable.clone().unwrap_or_default()
2620 }
2621 _ => String::new(),
2622 };
2623
2624 let is_variable_length = r.range.is_some();
2625 let type_name = &r.types[0];
2626
2627 let is_schemaless = edge_type_ids.iter().all(|id| {
2633 uni_common::core::edge_type::is_schemaless_edge_type(*id)
2634 });
2635
2636 if is_schemaless {
2637 plan = LogicalPlan::TraverseMainByType {
2638 type_names: vec![type_name.clone()],
2639 input: Box::new(plan),
2640 direction: r.direction.clone(),
2641 source_variable,
2642 target_variable: target_variable.clone(),
2643 step_variable: r.variable.clone(),
2644 min_hops: r
2645 .range
2646 .as_ref()
2647 .and_then(|r| r.min)
2648 .unwrap_or(1)
2649 as usize,
2650 max_hops: r
2651 .range
2652 .as_ref()
2653 .and_then(|r| r.max)
2654 .unwrap_or(1)
2655 as usize,
2656 optional: false,
2657 target_filter: None,
2658 path_variable: None,
2659 is_variable_length,
2660 optional_pattern_vars: std::collections::HashSet::new(),
2661 scope_match_variables: std::collections::HashSet::new(),
2662 edge_filter_expr: None,
2663 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
2664 };
2665 } else {
2666 let mut edge_props = std::collections::HashSet::new();
2668 if let Some(Expr::Map(entries)) = &r.properties {
2669 for (key, _) in entries {
2670 edge_props.insert(key.clone());
2671 }
2672 }
2673 plan = LogicalPlan::Traverse {
2674 input: Box::new(plan),
2675 edge_type_ids: edge_type_ids.clone(),
2676 direction: r.direction.clone(),
2677 source_variable,
2678 target_variable: target_variable.clone(),
2679 target_label_id,
2680 step_variable: r.variable.clone(),
2681 min_hops: r
2682 .range
2683 .as_ref()
2684 .and_then(|r| r.min)
2685 .unwrap_or(1)
2686 as usize,
2687 max_hops: r
2688 .range
2689 .as_ref()
2690 .and_then(|r| r.max)
2691 .unwrap_or(1)
2692 as usize,
2693 optional: false,
2694 target_filter: None,
2695 path_variable: None,
2696 edge_properties: edge_props,
2697 is_variable_length,
2698 optional_pattern_vars: std::collections::HashSet::new(),
2699 scope_match_variables: std::collections::HashSet::new(),
2700 edge_filter_expr: None,
2701 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
2702 qpp_steps: None,
2703 };
2704 }
2705
2706 if r.properties.is_some()
2708 && let Some(r_var) = &r.variable
2709 {
2710 let resolved_rel_props = self
2711 .resolve_merge_properties(
2712 &r.properties,
2713 row,
2714 prop_manager,
2715 params,
2716 ctx,
2717 )
2718 .await?;
2719 if let Some(prop_filter) =
2720 planner.properties_to_expr(r_var, &resolved_rel_props)
2721 {
2722 plan = LogicalPlan::Filter {
2723 input: Box::new(plan),
2724 predicate: prop_filter,
2725 optional_variables: std::collections::HashSet::new(
2726 ),
2727 };
2728 }
2729 }
2730
2731 if !target_variable.is_empty() {
2733 let resolved_target_props = self
2734 .resolve_merge_properties(
2735 &n_target.properties,
2736 row,
2737 prop_manager,
2738 params,
2739 ctx,
2740 )
2741 .await?;
2742 if let Some(prop_filter) = planner.properties_to_expr(
2743 &target_variable,
2744 &resolved_target_props,
2745 ) {
2746 plan = LogicalPlan::Filter {
2747 input: Box::new(plan),
2748 predicate: prop_filter,
2749 optional_variables: std::collections::HashSet::new(
2750 ),
2751 };
2752 }
2753 vars_in_scope.push(target_variable.clone());
2754 }
2755
2756 if let Some(sv) = &r.variable {
2757 vars_in_scope.push(sv.clone());
2758 }
2759 i += 2;
2760 } else {
2761 break;
2762 }
2763 } else {
2764 break;
2765 }
2766 }
2767 }
2768 _ => return Err(anyhow!("Pattern must start with a node")),
2769 }
2770 }
2771
2772 }
2774
2775 let db_matches = self
2776 .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
2777 .await?;
2778
2779 let final_matches = db_matches
2786 .into_iter()
2787 .filter(|db_match| {
2788 row.iter().all(|(key, val)| {
2789 if key.is_empty() || key.starts_with("__") {
2790 return true;
2791 }
2792 let Some(db_val) = db_match.get(key) else {
2793 return true;
2794 };
2795 if db_val == val {
2796 return true;
2797 }
2798 matches!(
2800 (Self::vid_from_value(val), Self::vid_from_value(db_val)),
2801 (Ok(v1), Ok(v2)) if v1 == v2
2802 )
2803 })
2804 })
2805 .map(|db_match| {
2806 let mut merged = row.clone();
2807 merged.extend(db_match);
2808 merged
2809 })
2810 .collect();
2811
2812 Ok(final_matches)
2813 }
2814
2815 fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
2823 let has_path_vars = pattern
2824 .paths
2825 .iter()
2826 .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
2827
2828 if !has_path_vars {
2829 return (pattern.clone(), Vec::new());
2830 }
2831
2832 let mut modified = pattern.clone();
2833 let mut temp_vars = Vec::new();
2834
2835 for path in &mut modified.paths {
2836 if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
2837 continue;
2838 }
2839 for (idx, element) in path.elements.iter_mut().enumerate() {
2840 if let PatternElement::Relationship(r) = element
2841 && r.variable.as_ref().is_none_or(String::is_empty)
2842 {
2843 let temp_var = format!("__path_r_{}", idx);
2844 r.variable = Some(temp_var.clone());
2845 temp_vars.push(temp_var);
2846 }
2847 }
2848 }
2849
2850 (modified, temp_vars)
2851 }
2852
2853 fn bind_path_variables(
2858 pattern: &Pattern,
2859 row: &mut HashMap<String, Value>,
2860 temp_vars: &[String],
2861 ) {
2862 for path in &pattern.paths {
2863 let Some(path_var) = path.variable.as_ref() else {
2864 continue;
2865 };
2866 if path_var.is_empty() {
2867 continue;
2868 }
2869
2870 let mut nodes = Vec::new();
2871 let mut edges = Vec::new();
2872
2873 for element in &path.elements {
2874 match element {
2875 PatternElement::Node(n) => {
2876 if let Some(var) = &n.variable
2877 && let Some(val) = row.get(var)
2878 && let Some(node) = Self::value_to_node_for_path(val)
2879 {
2880 nodes.push(node);
2881 }
2882 }
2883 PatternElement::Relationship(r) => {
2884 if let Some(var) = &r.variable
2885 && let Some(val) = row.get(var)
2886 && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
2887 {
2888 edges.push(edge);
2889 }
2890 }
2891 _ => {}
2892 }
2893 }
2894
2895 if !nodes.is_empty() {
2896 use uni_common::value::Path;
2897 row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
2898 }
2899 }
2900
2901 for var in temp_vars {
2903 row.remove(var);
2904 }
2905 }
2906
2907 fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
2909 match val {
2910 Value::Node(n) => Some(n.clone()),
2911 Value::Map(map) => {
2912 let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
2913 let labels = if let Some(Value::List(l)) = map.get("_labels") {
2914 l.iter()
2915 .filter_map(|v| {
2916 if let Value::String(s) = v {
2917 Some(s.clone())
2918 } else {
2919 None
2920 }
2921 })
2922 .collect()
2923 } else {
2924 vec![]
2925 };
2926 let properties: HashMap<String, Value> = map
2927 .iter()
2928 .filter(|(k, _)| !k.starts_with('_'))
2929 .map(|(k, v)| (k.clone(), v.clone()))
2930 .collect();
2931 Some(uni_common::value::Node {
2932 vid,
2933 labels,
2934 properties,
2935 })
2936 }
2937 _ => None,
2938 }
2939 }
2940
2941 fn value_to_edge_for_path(
2943 val: &Value,
2944 type_names: &[String],
2945 ) -> Option<uni_common::value::Edge> {
2946 match val {
2947 Value::Edge(e) => Some(e.clone()),
2948 Value::Map(map) => {
2949 let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
2950 let edge_type = map
2951 .get("_type_name")
2952 .and_then(|v| {
2953 if let Value::String(s) = v {
2954 Some(s.clone())
2955 } else {
2956 None
2957 }
2958 })
2959 .or_else(|| type_names.first().cloned())
2960 .unwrap_or_default();
2961 let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
2962 let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
2963 let properties: HashMap<String, Value> = map
2964 .iter()
2965 .filter(|(k, _)| !k.starts_with('_'))
2966 .map(|(k, v)| (k.clone(), v.clone()))
2967 .collect();
2968 Some(uni_common::value::Edge {
2969 eid,
2970 edge_type,
2971 src,
2972 dst,
2973 properties,
2974 })
2975 }
2976 _ => None,
2977 }
2978 }
2979}