1use super::core::*;
5use crate::query::planner::LogicalPlan;
6use anyhow::{Result, anyhow};
7use std::collections::HashMap;
8use std::sync::Arc;
9use uni_common::DataType;
10use uni_common::core::id::{Eid, Vid};
11use uni_common::core::schema::{Constraint, ConstraintTarget, ConstraintType, SchemaManager};
12use uni_common::{Path, Value};
13use uni_cypher::ast::{
14 AlterAction, AlterEdgeType, AlterLabel, BinaryOp, ConstraintType as AstConstraintType,
15 CreateConstraint, CreateEdgeType, CreateLabel, CypherLiteral, Direction, DropConstraint,
16 DropEdgeType, DropLabel, Expr, Pattern, PatternElement, RemoveItem, SetClause, SetItem,
17};
18use uni_store::QueryContext;
19use uni_store::runtime::property_manager::PropertyManager;
20use uni_store::runtime::writer::Writer;
21
/// Identifying fields of an edge, as decoded from a map-encoded edge value.
///
/// Built by `Executor::extract_edge_identity` from the `_eid`, `_src`, `_dst`
/// and `_type` entries of the map.
struct EdgeIdentity {
    /// Edge id taken from the `_eid` entry.
    eid: Eid,
    /// Source vertex id taken from the `_src` entry.
    src: Vid,
    /// Destination vertex id taken from the `_dst` entry.
    dst: Vid,
    /// Numeric edge type id resolved from the `_type` entry.
    edge_type_id: u32,
}
29
30impl Executor {
31 pub(crate) fn extract_labels_from_node(node_val: &Value) -> Option<Vec<String>> {
38 match node_val {
39 Value::Map(map) => {
40 if let Some(Value::List(labels_arr)) = map.get("_labels") {
42 let labels: Vec<String> = labels_arr
43 .iter()
44 .filter_map(|v| v.as_str().map(|s| s.to_string()))
45 .collect();
46 if !labels.is_empty() {
47 return Some(labels);
48 }
49 }
50 None
51 }
52 Value::Node(node) => (!node.labels.is_empty()).then(|| node.labels.clone()),
53 _ => None,
54 }
55 }
56
57 pub(crate) fn extract_user_properties_from_value(
65 val: &Value,
66 ) -> Option<HashMap<String, Value>> {
67 match val {
68 Value::Map(map) => {
69 let is_node_map = map.contains_key("_vid") && map.contains_key("_labels");
73 let is_edge_map = map.contains_key("_eid")
74 && map.contains_key("_src")
75 && map.contains_key("_dst");
76
77 if is_node_map || is_edge_map {
78 let user_props: HashMap<String, Value> = map
80 .iter()
81 .filter(|(k, _)| !k.starts_with('_') && k.as_str() != "ext_id")
82 .map(|(k, v)| (k.clone(), v.clone()))
83 .collect();
84 if user_props.is_empty()
88 && let Some(Value::Map(all_props)) = map.get("_all_props")
89 {
90 return Some(all_props.clone());
91 }
92 Some(user_props)
93 } else {
94 Some(map.clone())
96 }
97 }
98 Value::Node(node) => Some(node.properties.clone()),
99 Value::Edge(edge) => Some(edge.properties.clone()),
100 _ => None,
101 }
102 }
103
104 #[expect(clippy::too_many_arguments)]
121 async fn apply_properties_to_entity(
122 &self,
123 variable: &str,
124 new_props: HashMap<String, Value>,
125 replace: bool,
126 row: &mut HashMap<String, Value>,
127 writer: &mut Writer,
128 prop_manager: &PropertyManager,
129 params: &HashMap<String, Value>,
130 ctx: Option<&QueryContext>,
131 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
132 ) -> Result<()> {
133 let target = row.get(variable).cloned();
135
136 match target {
137 Some(Value::Node(ref node)) => {
138 let vid = node.vid;
139 let labels = node.labels.clone();
140 let current = prop_manager
141 .get_all_vertex_props_with_ctx(vid, ctx)
142 .await?
143 .unwrap_or_default();
144 let write_props = Self::merge_props(current, new_props, replace);
145 let mut enriched = write_props.clone();
146 for label_name in &labels {
147 self.enrich_properties_with_generated_columns(
148 label_name,
149 &mut enriched,
150 prop_manager,
151 params,
152 ctx,
153 )
154 .await?;
155 }
156 let _ = writer
157 .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
158 .await?;
159 if let Some(Value::Node(n)) = row.get_mut(variable) {
161 n.properties = enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
162 }
163 }
164 Some(ref node_val) if Self::vid_from_value(node_val).is_ok() => {
165 let vid = Self::vid_from_value(node_val)?;
166 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
167 let current = prop_manager
168 .get_all_vertex_props_with_ctx(vid, ctx)
169 .await?
170 .unwrap_or_default();
171 let write_props = Self::merge_props(current, new_props, replace);
172 let mut enriched = write_props.clone();
173 for label_name in &labels {
174 self.enrich_properties_with_generated_columns(
175 label_name,
176 &mut enriched,
177 prop_manager,
178 params,
179 ctx,
180 )
181 .await?;
182 }
183 let _ = writer
184 .insert_vertex_with_labels(vid, enriched.clone(), &labels, tx_l0)
185 .await?;
186 if let Some(Value::Map(node_map)) = row.get_mut(variable) {
188 node_map.retain(|k, _| k.starts_with('_') || k == "ext_id");
190 let effective: HashMap<String, Value> =
192 enriched.into_iter().filter(|(_, v)| !v.is_null()).collect();
193 for (k, v) in &effective {
194 node_map.insert(k.clone(), v.clone());
195 }
196 node_map.insert("_all_props".to_string(), Value::Map(effective));
198 }
199 }
200 Some(Value::Edge(ref edge)) => {
201 let eid = edge.eid;
202 let src = edge.src;
203 let dst = edge.dst;
204 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
205 let current = prop_manager
206 .get_all_edge_props_with_ctx(eid, ctx)
207 .await?
208 .unwrap_or_default();
209 let write_props = Self::merge_props(current, new_props, replace);
210 writer
211 .insert_edge(
212 src,
213 dst,
214 etype,
215 eid,
216 write_props.clone(),
217 Some(edge.edge_type.clone()),
218 tx_l0,
219 )
220 .await?;
221 if let Some(Value::Edge(e)) = row.get_mut(variable) {
223 e.properties = write_props
224 .into_iter()
225 .filter(|(_, v)| !v.is_null())
226 .collect();
227 }
228 }
229 Some(Value::Map(ref map))
230 if map.contains_key("_eid")
231 && map.contains_key("_src")
232 && map.contains_key("_dst") =>
233 {
234 let ei = self.extract_edge_identity(map)?;
235 let current = prop_manager
236 .get_all_edge_props_with_ctx(ei.eid, ctx)
237 .await?
238 .unwrap_or_default();
239 let write_props = Self::merge_props(current, new_props, replace);
240 let edge_type_name = map
241 .get("_type")
242 .and_then(|v| v.as_str())
243 .map(|s| s.to_string())
244 .or_else(|| {
245 self.storage
246 .schema_manager()
247 .edge_type_name_by_id_unified(ei.edge_type_id)
248 });
249 writer
250 .insert_edge(
251 ei.src,
252 ei.dst,
253 ei.edge_type_id,
254 ei.eid,
255 write_props.clone(),
256 edge_type_name,
257 tx_l0,
258 )
259 .await?;
260 if let Some(Value::Map(edge_map)) = row.get_mut(variable) {
262 edge_map.retain(|k, _| k.starts_with('_'));
263 let effective: HashMap<String, Value> = write_props
264 .into_iter()
265 .filter(|(_, v)| !v.is_null())
266 .collect();
267 for (k, v) in &effective {
268 edge_map.insert(k.clone(), v.clone());
269 }
270 edge_map.insert("_all_props".to_string(), Value::Map(effective));
272 }
273 }
274 _ => {
275 }
277 }
278 Ok(())
279 }
280
281 fn merge_props(
292 current: HashMap<String, Value>,
293 incoming: HashMap<String, Value>,
294 replace: bool,
295 ) -> HashMap<String, Value> {
296 if replace {
297 let mut result: HashMap<String, Value> = incoming
299 .iter()
300 .filter(|(_, v)| !v.is_null())
301 .map(|(k, v)| (k.clone(), v.clone()))
302 .collect();
303 for k in current.keys() {
306 if incoming.get(k).is_none_or(|v| v.is_null()) {
307 result.insert(k.clone(), Value::Null);
308 }
309 }
310 result
311 } else {
312 let mut result = current;
314 result.extend(incoming);
315 result
316 }
317 }
318
319 fn extract_edge_identity(&self, map: &HashMap<String, Value>) -> Result<EdgeIdentity> {
321 let eid = Eid::from(
322 map.get("_eid")
323 .and_then(|v| v.as_u64())
324 .ok_or_else(|| anyhow!("Invalid _eid"))?,
325 );
326 let src = Vid::from(
327 map.get("_src")
328 .and_then(|v| v.as_u64())
329 .ok_or_else(|| anyhow!("Invalid _src"))?,
330 );
331 let dst = Vid::from(
332 map.get("_dst")
333 .and_then(|v| v.as_u64())
334 .ok_or_else(|| anyhow!("Invalid _dst"))?,
335 );
336 let edge_type_id = self.resolve_edge_type_id(
337 map.get("_type")
338 .ok_or_else(|| anyhow!("Missing _type on edge map"))?,
339 )?;
340 Ok(EdgeIdentity {
341 eid,
342 src,
343 dst,
344 edge_type_id,
345 })
346 }
347
348 fn resolve_edge_type_id(&self, type_val: &Value) -> Result<u32> {
355 match type_val {
356 Value::Int(i) => Ok(*i as u32),
357 Value::String(name) => {
358 Ok(self
361 .storage
362 .schema_manager()
363 .get_or_assign_edge_type_id(name))
364 }
365 _ => Err(anyhow!(
366 "Invalid _type value: expected Int or String, got {:?}",
367 type_val
368 )),
369 }
370 }
371
372 pub(crate) async fn execute_vacuum(&self) -> Result<()> {
373 if let Some(writer_arc) = &self.writer {
374 {
376 let mut writer = writer_arc.write().await;
377 writer.flush_to_l1(None).await?;
378 } let compactor = uni_store::storage::compaction::Compactor::new(self.storage.clone());
382 let compaction_results = compactor.compact_all().await?;
383
384 let am = self.storage.adjacency_manager();
386 let schema = self.storage.schema_manager().schema();
387 for info in compaction_results {
388 let direction = match info.direction.as_str() {
390 "fwd" => uni_store::storage::direction::Direction::Outgoing,
391 "bwd" => uni_store::storage::direction::Direction::Incoming,
392 _ => continue,
393 };
394
395 if let Some(edge_type_id) =
397 schema.edge_type_id_unified_case_insensitive(&info.edge_type)
398 {
399 let _ = am.warm(&self.storage, edge_type_id, direction, None).await;
401 }
402 }
403 }
404 Ok(())
405 }
406
407 pub(crate) async fn execute_checkpoint(&self) -> Result<()> {
408 if let Some(writer_arc) = &self.writer {
409 let mut writer = writer_arc.write().await;
410 writer.flush_to_l1(Some("checkpoint".to_string())).await?;
411 }
412 Ok(())
413 }
414
415 pub(crate) async fn execute_copy_to(
416 &self,
417 identifier: &str,
418 path: &str,
419 format: &str,
420 options: &HashMap<String, Value>,
421 ) -> Result<usize> {
422 let schema = self.storage.schema_manager().schema();
424
425 if schema.get_edge_type_case_insensitive(identifier).is_some() {
427 return self
428 .export_edge_type_in_format(identifier, path, format)
429 .await;
430 }
431
432 if schema.get_label_case_insensitive(identifier).is_some() {
434 return self
435 .export_vertex_label_in_format(identifier, path, format, options)
436 .await;
437 }
438
439 Err(anyhow!("Unknown label or edge type: '{}'", identifier))
441 }
442
443 async fn export_vertex_label_in_format(
444 &self,
445 label: &str,
446 path: &str,
447 format: &str,
448 _options: &HashMap<String, Value>,
449 ) -> Result<usize> {
450 match format {
451 "parquet" => self.export_vertex_label(label, path).await,
452 "csv" => {
453 let mut stream = self
454 .storage
455 .scan_vertex_table_stream(label)
456 .await?
457 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
458
459 let mut all_rows = Vec::new();
461 let mut column_names = Vec::new();
462
463 use futures::StreamExt;
465 while let Some(batch_result) = stream.next().await {
466 let batch = batch_result?;
467
468 if column_names.is_empty() {
470 column_names = batch
471 .schema()
472 .fields()
473 .iter()
474 .filter(|f| !f.name().starts_with('_') && f.name() != "ext_id")
475 .map(|f| f.name().clone())
476 .collect();
477 }
478
479 for row_idx in 0..batch.num_rows() {
481 let mut row = Vec::new();
482 for field in batch.schema().fields() {
483 if field.name().starts_with('_') || field.name() == "ext_id" {
484 continue;
485 }
486
487 let col_idx = batch.schema().index_of(field.name())?;
488 let column = batch.column(col_idx);
489 let value = self.arrow_value_to_json(column, row_idx)?;
490
491 let csv_value = match value {
493 Value::Null => String::new(),
494 Value::Bool(b) => b.to_string(),
495 Value::Int(i) => i.to_string(),
496 Value::Float(f) => f.to_string(),
497 Value::String(s) => s,
498 _ => format!("{value}"),
499 };
500 row.push(csv_value);
501 }
502 all_rows.push(row);
503 }
504 }
505
506 let file = std::fs::File::create(path)?;
508 let mut wtr = csv::Writer::from_writer(file);
509
510 log::debug!("CSV export headers: {:?}", column_names);
512 wtr.write_record(&column_names)?;
513
514 for (i, row) in all_rows.iter().enumerate() {
516 log::debug!("CSV export row {}: {:?}", i, row);
517 wtr.write_record(row)?;
518 }
519
520 wtr.flush()?;
521 Ok(all_rows.len())
522 }
523 _ => Err(anyhow!(
524 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
525 format
526 )),
527 }
528 }
529
530 async fn export_edge_type_in_format(
531 &self,
532 edge_type: &str,
533 path: &str,
534 format: &str,
535 ) -> Result<usize> {
536 match format {
537 "parquet" => self.export_edge_type(edge_type, path).await,
538 "csv" => Err(anyhow!("CSV export not yet supported for edge types")),
539 _ => Err(anyhow!(
540 "COPY TO only supports 'parquet' and 'csv' formats, got '{}'",
541 format
542 )),
543 }
544 }
545
546 async fn write_batches_to_parquet(
549 mut stream: impl futures::Stream<Item = anyhow::Result<arrow_array::RecordBatch>> + Unpin,
550 path: &str,
551 entity_description: &str,
552 ) -> Result<usize> {
553 use futures::TryStreamExt;
554
555 let first_batch = match stream.try_next().await? {
557 Some(batch) => batch,
558 None => {
559 log::info!("No data to export from {}", entity_description);
560 return Ok(0);
561 }
562 };
563
564 let file = std::fs::File::create(path)?;
566 let arrow_schema = first_batch.schema();
567 let mut writer = parquet::arrow::ArrowWriter::try_new(file, arrow_schema, None)?;
568
569 let mut count = first_batch.num_rows();
571 writer.write(&first_batch)?;
572
573 while let Some(batch) = stream.try_next().await? {
575 count += batch.num_rows();
576 writer.write(&batch)?;
577 }
578
579 writer.close()?;
580
581 log::info!(
582 "Exported {} rows from {} to '{}'",
583 count,
584 entity_description,
585 path
586 );
587 Ok(count)
588 }
589
590 async fn export_vertex_label(&self, label: &str, path: &str) -> Result<usize> {
592 let stream = self
593 .storage
594 .scan_vertex_table_stream(label)
595 .await?
596 .ok_or_else(|| anyhow!("No data for label '{}'", label))?;
597
598 Self::write_batches_to_parquet(stream, path, &format!("label '{}'", label)).await
599 }
600
601 async fn export_edge_type(&self, edge_type: &str, path: &str) -> Result<usize> {
603 let schema = self.storage.schema_manager().schema();
604 if !schema.edge_types.contains_key(edge_type) {
605 return Err(anyhow!("Edge type '{}' not found", edge_type));
606 }
607
608 let filter = format!("type = '{}'", edge_type);
609 let stream = self
610 .storage
611 .scan_main_edge_table_stream(Some(&filter))
612 .await?
613 .ok_or_else(|| anyhow!("No edge data found"))?;
614
615 Self::write_batches_to_parquet(stream, path, &format!("edge type '{}'", edge_type)).await
616 }
617
618 pub(crate) async fn execute_copy_from(
619 &self,
620 label: &str,
621 path: &str,
622 format: &str,
623 options: &HashMap<String, Value>,
624 ) -> Result<usize> {
625 let batches = match format {
627 "parquet" => self.read_parquet_file(path)?,
628 "csv" => self.read_csv_file(path, label, options)?,
629 _ => {
630 return Err(anyhow!(
631 "COPY FROM only supports 'parquet' and 'csv' formats, got '{}'",
632 format
633 ));
634 }
635 };
636
637 let writer_arc = self
639 .writer
640 .as_ref()
641 .ok_or_else(|| anyhow!("No writer available"))?;
642
643 let db_schema = self.storage.schema_manager().schema();
644
645 let is_edge = db_schema.edge_type_id_by_name(label).is_some();
647
648 if is_edge {
649 let edge_type_id = db_schema
651 .edge_type_id_by_name(label)
652 .ok_or_else(|| anyhow!("Edge type '{}' not found in schema", label))?;
653
654 let src_col = options
656 .get("src_col")
657 .and_then(|v| v.as_str())
658 .unwrap_or("src");
659 let dst_col = options
660 .get("dst_col")
661 .and_then(|v| v.as_str())
662 .unwrap_or("dst");
663
664 let mut total_rows = 0;
665 for batch in batches {
666 let num_rows = batch.num_rows();
667
668 for row_idx in 0..num_rows {
669 let mut properties = HashMap::new();
670 let mut src_vid: Option<Vid> = None;
671 let mut dst_vid: Option<Vid> = None;
672
673 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
675 let col_name = field.name();
676 let column = batch.column(col_idx);
677 let value = self.arrow_value_to_json(column, row_idx)?;
678
679 if col_name == src_col {
680 let raw = value.as_u64().unwrap_or_else(|| {
681 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
682 });
683 src_vid = Some(Vid::new(raw));
684 } else if col_name == dst_col {
685 let raw = value.as_u64().unwrap_or_else(|| {
686 value.as_str().and_then(|s| s.parse().ok()).unwrap_or(0)
687 });
688 dst_vid = Some(Vid::new(raw));
689 } else if !col_name.starts_with('_') && !value.is_null() {
690 properties.insert(col_name.clone(), value);
691 }
692 }
693
694 let src = src_vid
695 .ok_or_else(|| anyhow!("Missing source VID column '{}'", src_col))?;
696 let dst = dst_vid
697 .ok_or_else(|| anyhow!("Missing destination VID column '{}'", dst_col))?;
698
699 let mut writer = writer_arc.write().await;
701 let eid = writer.next_eid(edge_type_id).await?;
702 writer
703 .insert_edge(
704 src,
705 dst,
706 edge_type_id,
707 eid,
708 properties,
709 Some(label.to_string()),
710 None,
711 )
712 .await?;
713
714 total_rows += 1;
715 }
716 }
717
718 log::info!(
719 "Imported {} edge rows from '{}' into edge type '{}'",
720 total_rows,
721 path,
722 label
723 );
724
725 if total_rows > 0 {
727 let mut writer = writer_arc.write().await;
728 writer.flush_to_l1(None).await?;
729 }
730
731 Ok(total_rows)
732 } else {
733 db_schema
736 .label_id_by_name_case_insensitive(label)
737 .ok_or_else(|| anyhow!("Label '{}' not found in schema", label))?;
738
739 let mut total_rows = 0;
740 for batch in batches {
741 let num_rows = batch.num_rows();
742
743 for row_idx in 0..num_rows {
745 let mut properties = HashMap::new();
746
747 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
749 let col_name = field.name();
750
751 if col_name.starts_with('_') {
753 continue;
754 }
755
756 let column = batch.column(col_idx);
757 let value = self.arrow_value_to_json(column, row_idx)?;
758
759 if !value.is_null() {
760 properties.insert(col_name.clone(), value);
761 }
762 }
763
764 let mut writer = writer_arc.write().await;
766 let vid = writer.next_vid().await?;
767 let _ = writer
768 .insert_vertex_with_labels(vid, properties, &[label.to_string()], None)
769 .await?;
770
771 total_rows += 1;
772 }
773 }
774
775 log::info!(
776 "Imported {} rows from '{}' into label '{}'",
777 total_rows,
778 path,
779 label
780 );
781
782 if total_rows > 0 {
784 let mut writer = writer_arc.write().await;
785 writer.flush_to_l1(None).await?;
786 }
787
788 Ok(total_rows)
789 }
790 }
791
792 fn arrow_value_to_json(&self, column: &arrow_array::ArrayRef, row_idx: usize) -> Result<Value> {
793 use arrow_array::Array;
794 use arrow_schema::DataType as ArrowDataType;
795
796 if column.is_null(row_idx) {
797 return Ok(Value::Null);
798 }
799
800 match column.data_type() {
801 ArrowDataType::Utf8 => {
802 let array = column
803 .as_any()
804 .downcast_ref::<arrow_array::StringArray>()
805 .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?;
806 Ok(Value::String(array.value(row_idx).to_string()))
807 }
808 ArrowDataType::Int32 => {
809 let array = column
810 .as_any()
811 .downcast_ref::<arrow_array::Int32Array>()
812 .ok_or_else(|| anyhow!("Failed to downcast to Int32Array"))?;
813 Ok(Value::Int(array.value(row_idx) as i64))
814 }
815 ArrowDataType::Int64 => {
816 let array = column
817 .as_any()
818 .downcast_ref::<arrow_array::Int64Array>()
819 .ok_or_else(|| anyhow!("Failed to downcast to Int64Array"))?;
820 Ok(Value::Int(array.value(row_idx)))
821 }
822 ArrowDataType::Float32 => {
823 let array = column
824 .as_any()
825 .downcast_ref::<arrow_array::Float32Array>()
826 .ok_or_else(|| anyhow!("Failed to downcast to Float32Array"))?;
827 Ok(Value::Float(array.value(row_idx) as f64))
828 }
829 ArrowDataType::Float64 => {
830 let array = column
831 .as_any()
832 .downcast_ref::<arrow_array::Float64Array>()
833 .ok_or_else(|| anyhow!("Failed to downcast to Float64Array"))?;
834 Ok(Value::Float(array.value(row_idx)))
835 }
836 ArrowDataType::Boolean => {
837 let array = column
838 .as_any()
839 .downcast_ref::<arrow_array::BooleanArray>()
840 .ok_or_else(|| anyhow!("Failed to downcast to BooleanArray"))?;
841 Ok(Value::Bool(array.value(row_idx)))
842 }
843 ArrowDataType::UInt64 => {
844 let array = column
845 .as_any()
846 .downcast_ref::<arrow_array::UInt64Array>()
847 .ok_or_else(|| anyhow!("Failed to downcast to UInt64Array"))?;
848 Ok(Value::Int(array.value(row_idx) as i64))
849 }
850 _ => {
851 let array = column.as_any().downcast_ref::<arrow_array::StringArray>();
853 if let Some(arr) = array {
854 Ok(Value::String(arr.value(row_idx).to_string()))
855 } else {
856 Ok(Value::Null)
857 }
858 }
859 }
860 }
861
862 fn read_parquet_file(&self, path: &str) -> Result<Vec<arrow_array::RecordBatch>> {
863 let file = std::fs::File::open(path)?;
864 let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?
865 .build()?;
866 reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
867 }
868
869 fn read_csv_file(
870 &self,
871 path: &str,
872 label: &str,
873 options: &HashMap<String, Value>,
874 ) -> Result<Vec<arrow_array::RecordBatch>> {
875 use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
876 use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
877 use std::sync::Arc;
878
879 let has_headers = options
881 .get("headers")
882 .and_then(|v| v.as_bool())
883 .unwrap_or(true);
884
885 let file = std::fs::File::open(path)?;
887 let mut rdr = csv::ReaderBuilder::new()
888 .has_headers(has_headers)
889 .from_reader(file);
890
891 let db_schema = self.storage.schema_manager().schema();
893 let properties = db_schema.properties.get(label);
894
895 let mut rows: Vec<Vec<String>> = Vec::new();
897 let headers: Vec<String> = if has_headers {
898 rdr.headers()?.iter().map(|s| s.to_string()).collect()
899 } else {
900 Vec::new()
901 };
902
903 for result in rdr.records() {
904 let record = result?;
905 rows.push(record.iter().map(|s| s.to_string()).collect());
906 }
907
908 if rows.is_empty() {
909 return Ok(Vec::new());
910 }
911
912 let mut arrow_fields: Vec<Arc<Field>> = Vec::new();
914 let col_names: Vec<String> = if has_headers {
915 headers
916 } else {
917 (0..rows[0].len()).map(|i| format!("col{}", i)).collect()
918 };
919
920 for name in &col_names {
921 let arrow_type = if let Some(props) = properties {
922 if let Some(prop_meta) = props.get(name) {
923 match prop_meta.r#type {
924 DataType::Int32 => ArrowDataType::Int32,
925 DataType::Int64 => ArrowDataType::Int64,
926 DataType::Float32 => ArrowDataType::Float32,
927 DataType::Float64 => ArrowDataType::Float64,
928 DataType::Bool => ArrowDataType::Boolean,
929 _ => ArrowDataType::Utf8,
930 }
931 } else {
932 ArrowDataType::Utf8
933 }
934 } else {
935 ArrowDataType::Utf8
936 };
937 arrow_fields.push(Arc::new(Field::new(name, arrow_type, true)));
938 }
939
940 let arrow_schema = Arc::new(ArrowSchema::new(arrow_fields.clone()));
941
942 let mut columns: Vec<ArrayRef> = Vec::new();
944 for (col_idx, field) in arrow_fields.iter().enumerate() {
945 match field.data_type() {
946 ArrowDataType::Int32 => {
947 let values: Vec<Option<i32>> = rows
948 .iter()
949 .map(|row| {
950 if col_idx < row.len() {
951 row[col_idx].parse().ok()
952 } else {
953 None
954 }
955 })
956 .collect();
957 columns.push(Arc::new(Int32Array::from(values)));
958 }
959 _ => {
960 let values: Vec<Option<String>> = rows
962 .iter()
963 .map(|row| {
964 if col_idx < row.len() {
965 Some(row[col_idx].clone())
966 } else {
967 None
968 }
969 })
970 .collect();
971 columns.push(Arc::new(StringArray::from(values)));
972 }
973 }
974 }
975
976 let batch = RecordBatch::try_new(arrow_schema, columns)?;
977 Ok(vec![batch])
978 }
979
980 fn parse_data_type(type_str: &str) -> Result<DataType> {
981 use uni_common::core::schema::{CrdtType, PointType};
982 let type_str = type_str.to_lowercase();
983 let type_str = type_str.trim();
984 match type_str {
985 "string" | "text" | "varchar" => Ok(DataType::String),
986 "int" | "integer" | "int32" => Ok(DataType::Int32),
987 "long" | "int64" | "bigint" => Ok(DataType::Int64),
988 "float" | "float32" | "real" => Ok(DataType::Float32),
989 "double" | "float64" => Ok(DataType::Float64),
990 "bool" | "boolean" => Ok(DataType::Bool),
991 "timestamp" => Ok(DataType::Timestamp),
992 "date" => Ok(DataType::Date),
993 "time" => Ok(DataType::Time),
994 "datetime" => Ok(DataType::DateTime),
995 "duration" => Ok(DataType::Duration),
996 "json" | "jsonb" => Ok(DataType::CypherValue),
997 "point" => Ok(DataType::Point(PointType::Cartesian2D)),
998 "point3d" => Ok(DataType::Point(PointType::Cartesian3D)),
999 "geopoint" | "geographic" => Ok(DataType::Point(PointType::Geographic)),
1000 s if s.starts_with("vector(") && s.ends_with(')') => {
1001 let dims_str = &s[7..s.len() - 1];
1002 let dimensions = dims_str
1003 .parse::<usize>()
1004 .map_err(|_| anyhow!("Invalid vector dimensions: {}", dims_str))?;
1005 Ok(DataType::Vector { dimensions })
1006 }
1007 s if s.starts_with("list<") && s.ends_with('>') => {
1008 let inner_type_str = &s[5..s.len() - 1];
1009 let inner_type = Self::parse_data_type(inner_type_str)?;
1010 Ok(DataType::List(Box::new(inner_type)))
1011 }
1012 "gcounter" => Ok(DataType::Crdt(CrdtType::GCounter)),
1013 "lwwregister" => Ok(DataType::Crdt(CrdtType::LWWRegister)),
1014 _ => Err(anyhow!("Unknown data type: {}", type_str)),
1015 }
1016 }
1017
1018 pub(crate) async fn execute_create_label(&self, clause: CreateLabel) -> Result<()> {
1019 let sm = self.storage.schema_manager_arc();
1020 if clause.if_not_exists && sm.schema().labels.contains_key(&clause.name) {
1021 return Ok(());
1022 }
1023 sm.add_label(&clause.name)?;
1024 for prop in clause.properties {
1025 let dt = Self::parse_data_type(&prop.data_type)?;
1026 sm.add_property(&clause.name, &prop.name, dt, prop.nullable)?;
1027 if prop.unique {
1028 let constraint = Constraint {
1029 name: format!("{}_{}_unique", clause.name, prop.name),
1030 constraint_type: ConstraintType::Unique {
1031 properties: vec![prop.name],
1032 },
1033 target: ConstraintTarget::Label(clause.name.clone()),
1034 enabled: true,
1035 };
1036 sm.add_constraint(constraint)?;
1037 }
1038 }
1039 sm.save().await?;
1040 Ok(())
1041 }
1042
1043 pub(crate) async fn enrich_properties_with_generated_columns(
1044 &self,
1045 label_name: &str,
1046 properties: &mut HashMap<String, Value>,
1047 prop_manager: &PropertyManager,
1048 params: &HashMap<String, Value>,
1049 ctx: Option<&QueryContext>,
1050 ) -> Result<()> {
1051 let schema = self.storage.schema_manager().schema();
1052
1053 if let Some(props_meta) = schema.properties.get(label_name) {
1054 let mut generators = Vec::new();
1055 for (prop_name, meta) in props_meta {
1056 if let Some(expr_str) = &meta.generation_expression {
1057 generators.push((prop_name.clone(), expr_str.clone()));
1058 }
1059 }
1060
1061 for (prop_name, expr_str) in generators {
1062 let cache_key = (label_name.to_string(), prop_name.clone());
1063 let expr = {
1064 let cache = self.gen_expr_cache.read().await;
1065 cache.get(&cache_key).cloned()
1066 };
1067
1068 let expr = match expr {
1069 Some(e) => e,
1070 None => {
1071 let parsed = uni_cypher::parse_expression(&expr_str)
1072 .map_err(|e| anyhow!("Failed to parse generation expression: {}", e))?;
1073 let mut cache = self.gen_expr_cache.write().await;
1074 cache.insert(cache_key, parsed.clone());
1075 parsed
1076 }
1077 };
1078
1079 let mut scope = HashMap::new();
1080
1081 if let Some(var) = expr.extract_variable() {
1083 scope.insert(var, Value::Map(properties.clone()));
1084 } else {
1085 for (k, v) in properties.iter() {
1088 scope.insert(k.clone(), v.clone());
1089 }
1090 }
1091
1092 let val = self
1093 .evaluate_expr(&expr, &scope, prop_manager, params, ctx)
1094 .await?;
1095 properties.insert(prop_name, val);
1096 }
1097 }
1098 Ok(())
1099 }
1100
1101 pub(crate) async fn execute_create_edge_type(&self, clause: CreateEdgeType) -> Result<()> {
1102 let sm = self.storage.schema_manager_arc();
1103 if clause.if_not_exists && sm.schema().edge_types.contains_key(&clause.name) {
1104 return Ok(());
1105 }
1106 sm.add_edge_type(&clause.name, clause.src_labels, clause.dst_labels)?;
1107 for prop in clause.properties {
1108 let dt = Self::parse_data_type(&prop.data_type)?;
1109 sm.add_property(&clause.name, &prop.name, dt, prop.nullable)?;
1110 }
1111 sm.save().await?;
1112 Ok(())
1113 }
1114
1115 pub(crate) async fn execute_alter_entity(
1120 sm: &Arc<SchemaManager>,
1121 entity_name: &str,
1122 action: AlterAction,
1123 ) -> Result<()> {
1124 match action {
1125 AlterAction::AddProperty(prop) => {
1126 let dt = Self::parse_data_type(&prop.data_type)?;
1127 sm.add_property(entity_name, &prop.name, dt, prop.nullable)?;
1128 }
1129 AlterAction::DropProperty(prop_name) => {
1130 sm.drop_property(entity_name, &prop_name)?;
1131 }
1132 AlterAction::RenameProperty { old_name, new_name } => {
1133 sm.rename_property(entity_name, &old_name, &new_name)?;
1134 }
1135 }
1136 sm.save().await?;
1137 Ok(())
1138 }
1139
1140 pub(crate) async fn execute_alter_label(&self, clause: AlterLabel) -> Result<()> {
1141 Self::execute_alter_entity(
1142 &self.storage.schema_manager_arc(),
1143 &clause.name,
1144 clause.action,
1145 )
1146 .await
1147 }
1148
1149 pub(crate) async fn execute_alter_edge_type(&self, clause: AlterEdgeType) -> Result<()> {
1150 Self::execute_alter_entity(
1151 &self.storage.schema_manager_arc(),
1152 &clause.name,
1153 clause.action,
1154 )
1155 .await
1156 }
1157
1158 pub(crate) async fn execute_drop_label(&self, clause: DropLabel) -> Result<()> {
1159 let sm = self.storage.schema_manager_arc();
1160 sm.drop_label(&clause.name, clause.if_exists)?;
1161 sm.save().await?;
1162 Ok(())
1163 }
1164
1165 pub(crate) async fn execute_drop_edge_type(&self, clause: DropEdgeType) -> Result<()> {
1166 let sm = self.storage.schema_manager_arc();
1167 sm.drop_edge_type(&clause.name, clause.if_exists)?;
1168 sm.save().await?;
1169 Ok(())
1170 }
1171
1172 pub(crate) async fn execute_create_constraint(&self, clause: CreateConstraint) -> Result<()> {
1173 let sm = self.storage.schema_manager_arc();
1174 let target = ConstraintTarget::Label(clause.label);
1175 let c_type = match clause.constraint_type {
1176 AstConstraintType::Unique | AstConstraintType::NodeKey => ConstraintType::Unique {
1177 properties: clause.properties,
1178 },
1179 AstConstraintType::Exists => {
1180 let property = clause
1181 .properties
1182 .into_iter()
1183 .next()
1184 .ok_or_else(|| anyhow!("EXISTS constraint requires a property"))?;
1185 ConstraintType::Exists { property }
1186 }
1187 AstConstraintType::Check => {
1188 let expression = clause
1189 .expression
1190 .ok_or_else(|| anyhow!("CHECK constraint requires an expression"))?;
1191 ConstraintType::Check {
1192 expression: expression.to_string_repr(),
1193 }
1194 }
1195 };
1196
1197 let constraint = Constraint {
1198 name: clause.name.unwrap_or_else(|| "auto_constraint".to_string()),
1199 constraint_type: c_type,
1200 target,
1201 enabled: true,
1202 };
1203
1204 sm.add_constraint(constraint)?;
1205 sm.save().await?;
1206 Ok(())
1207 }
1208
1209 pub(crate) async fn execute_drop_constraint(&self, clause: DropConstraint) -> Result<()> {
1210 let sm = self.storage.schema_manager_arc();
1211 sm.drop_constraint(&clause.name, false)?;
1212 sm.save().await?;
1213 Ok(())
1214 }
1215
1216 fn get_composite_constraint(&self, label: &str) -> Option<Constraint> {
1217 let schema = self.storage.schema_manager().schema();
1218 schema
1219 .constraints
1220 .iter()
1221 .find(|c| {
1222 if !c.enabled {
1223 return false;
1224 }
1225 match &c.target {
1226 ConstraintTarget::Label(l) if l == label => {
1227 matches!(c.constraint_type, ConstraintType::Unique { .. })
1228 }
1229 _ => false,
1230 }
1231 })
1232 .cloned()
1233 }
1234
1235 #[expect(clippy::too_many_arguments)]
1236 pub(crate) async fn execute_merge(
1237 &self,
1238 rows: Vec<HashMap<String, Value>>,
1239 pattern: &Pattern,
1240 on_match: Option<&SetClause>,
1241 on_create: Option<&SetClause>,
1242 prop_manager: &PropertyManager,
1243 params: &HashMap<String, Value>,
1244 ctx: Option<&QueryContext>,
1245 tx_l0_override: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1246 ) -> Result<Vec<HashMap<String, Value>>> {
1247 let writer_lock = self
1248 .writer
1249 .as_ref()
1250 .ok_or_else(|| anyhow!("Write operation requires a Writer"))?;
1251
1252 let (path_pattern, temp_vars) = Self::prepare_pattern_for_path_binding(pattern);
1255
1256 let mut results = Vec::new();
1257 for mut row in rows {
1258 let mut optimized_vid = None;
1260 if pattern.paths.len() == 1 {
1261 let path = &pattern.paths[0];
1262 if path.elements.len() == 1
1263 && let PatternElement::Node(n) = &path.elements[0]
1264 && n.labels.len() == 1
1265 && let Some(constraint) = self.get_composite_constraint(&n.labels[0])
1266 && let ConstraintType::Unique { properties } = constraint.constraint_type
1267 {
1268 let label = &n.labels[0];
1269 let mut pattern_props = HashMap::new();
1271 if let Some(props_expr) = &n.properties {
1272 let val = self
1273 .evaluate_expr(props_expr, &row, prop_manager, params, ctx)
1274 .await?;
1275 if let Value::Map(map) = val {
1276 for (k, v) in map {
1277 pattern_props.insert(k, v);
1278 }
1279 }
1280 }
1281
1282 let has_all_keys = properties.iter().all(|p| pattern_props.contains_key(p));
1284 if has_all_keys {
1285 let key_props: HashMap<String, serde_json::Value> = properties
1287 .iter()
1288 .filter_map(|p| {
1289 pattern_props.get(p).map(|v| (p.clone(), v.clone().into()))
1290 })
1291 .collect();
1292
1293 if let Ok(Some(vid)) = self
1295 .storage
1296 .index_manager()
1297 .composite_lookup(label, &key_props)
1298 .await
1299 {
1300 optimized_vid = Some((vid, pattern_props));
1301 }
1302 }
1303 }
1304 }
1305
1306 if let Some((vid, _pattern_props)) = optimized_vid {
1307 let mut writer = writer_lock.write().await;
1309
1310 let mut match_row = row.clone();
1311 if let PatternElement::Node(n) = &pattern.paths[0].elements[0]
1312 && let Some(var) = &n.variable
1313 {
1314 match_row.insert(var.clone(), Value::Int(vid.as_u64() as i64));
1315 }
1316
1317 let result = if let Some(set) = on_match {
1318 self.execute_set_items_locked(
1319 &set.items,
1320 &mut match_row,
1321 &mut writer,
1322 prop_manager,
1323 params,
1324 ctx,
1325 tx_l0_override,
1326 )
1327 .await
1328 } else {
1329 Ok(())
1330 };
1331
1332 drop(writer);
1333 result?;
1334
1335 Self::bind_path_variables(&path_pattern, &mut match_row, &temp_vars);
1336 results.push(match_row);
1337 } else {
1338 let matches = self
1340 .execute_merge_match(pattern, &row, prop_manager, params, ctx)
1341 .await?;
1342 let mut writer = writer_lock.write().await;
1343
1344 let result: Result<Vec<HashMap<String, Value>>> = async {
1345 let mut batch = Vec::new();
1346 if !matches.is_empty() {
1347 for mut m in matches {
1348 if let Some(set) = on_match {
1349 self.execute_set_items_locked(
1350 &set.items,
1351 &mut m,
1352 &mut writer,
1353 prop_manager,
1354 params,
1355 ctx,
1356 tx_l0_override,
1357 )
1358 .await?;
1359 }
1360 Self::bind_path_variables(&path_pattern, &mut m, &temp_vars);
1361 batch.push(m);
1362 }
1363 } else {
1364 self.execute_create_pattern(
1365 &path_pattern,
1366 &mut row,
1367 &mut writer,
1368 prop_manager,
1369 params,
1370 ctx,
1371 tx_l0_override,
1372 )
1373 .await?;
1374 if let Some(set) = on_create {
1375 self.execute_set_items_locked(
1376 &set.items,
1377 &mut row,
1378 &mut writer,
1379 prop_manager,
1380 params,
1381 ctx,
1382 tx_l0_override,
1383 )
1384 .await?;
1385 }
1386 Self::bind_path_variables(&path_pattern, &mut row, &temp_vars);
1387 batch.push(row);
1388 }
1389 Ok(batch)
1390 }
1391 .await;
1392
1393 drop(writer);
1394 results.extend(result?);
1395 }
1396 }
1397 Ok(results)
1398 }
1399
1400 #[expect(clippy::too_many_arguments)]
1402 pub(crate) async fn execute_create_pattern(
1403 &self,
1404 pattern: &Pattern,
1405 row: &mut HashMap<String, Value>,
1406 writer: &mut Writer,
1407 prop_manager: &PropertyManager,
1408 params: &HashMap<String, Value>,
1409 ctx: Option<&QueryContext>,
1410 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1411 ) -> Result<()> {
1412 for path in &pattern.paths {
1413 let mut prev_vid: Option<Vid> = None;
1414 type PendingRel = (String, u32, String, Option<Expr>, Direction);
1416 let mut rel_pending: Option<PendingRel> = None;
1417
1418 for element in &path.elements {
1419 match element {
1420 PatternElement::Node(n) => {
1421 let mut vid = None;
1422
1423 if let Some(var) = &n.variable
1425 && let Some(val) = row.get(var)
1426 && let Ok(existing_vid) = Self::vid_from_value(val)
1427 {
1428 vid = Some(existing_vid);
1429 }
1430
1431 if vid.is_none() {
1433 let mut props = HashMap::new();
1434 if let Some(props_expr) = &n.properties {
1435 let props_val = self
1436 .evaluate_expr(props_expr, row, prop_manager, params, ctx)
1437 .await?;
1438 if let Value::Map(map) = props_val {
1439 for (k, v) in map {
1440 props.insert(k, v);
1441 }
1442 } else {
1443 return Err(anyhow!("Properties must evaluate to a map"));
1444 }
1445 }
1446
1447 let schema = self.storage.schema_manager().schema();
1449
1450 let new_vid = writer.next_vid().await?;
1452
1453 for label_name in &n.labels {
1455 if schema.get_label_case_insensitive(label_name).is_some() {
1456 self.enrich_properties_with_generated_columns(
1457 label_name,
1458 &mut props,
1459 prop_manager,
1460 params,
1461 ctx,
1462 )
1463 .await?;
1464 }
1465 }
1466
1467 let final_props = writer
1469 .insert_vertex_with_labels(new_vid, props, &n.labels, tx_l0)
1470 .await?;
1471
1472 if let Some(var) = &n.variable {
1474 let mut obj = HashMap::new();
1475 obj.insert("_vid".to_string(), Value::Int(new_vid.as_u64() as i64));
1476 let labels_list: Vec<Value> =
1477 n.labels.iter().map(|l| Value::String(l.clone())).collect();
1478 obj.insert("_labels".to_string(), Value::List(labels_list));
1479 for (k, v) in &final_props {
1480 obj.insert(k.clone(), v.clone());
1481 }
1482 row.insert(var.clone(), Value::Map(obj));
1484 }
1485 vid = Some(new_vid);
1486 }
1487
1488 let current_vid = vid.unwrap();
1489
1490 if let Some((rel_var, type_id, type_name, rel_props_expr, dir)) =
1491 rel_pending.take()
1492 && let Some(src) = prev_vid
1493 {
1494 let is_rel_bound = !rel_var.is_empty() && row.contains_key(&rel_var);
1495
1496 if !is_rel_bound {
1497 let mut rel_props = HashMap::new();
1498 if let Some(expr) = rel_props_expr {
1499 let val = self
1500 .evaluate_expr(&expr, row, prop_manager, params, ctx)
1501 .await?;
1502 if let Value::Map(map) = val {
1503 rel_props.extend(map);
1504 }
1505 }
1506 let eid = writer.next_eid(type_id).await?;
1507
1508 let (edge_src, edge_dst) = match dir {
1510 Direction::Incoming => (current_vid, src),
1511 _ => (src, current_vid),
1512 };
1513
1514 let store_props = !rel_var.is_empty();
1515 let user_props = if store_props {
1516 rel_props.clone()
1517 } else {
1518 HashMap::new()
1519 };
1520
1521 writer
1522 .insert_edge(
1523 edge_src,
1524 edge_dst,
1525 type_id,
1526 eid,
1527 rel_props,
1528 Some(type_name.clone()),
1529 tx_l0,
1530 )
1531 .await?;
1532
1533 if store_props {
1536 let mut edge_map = HashMap::new();
1537 edge_map.insert(
1538 "_eid".to_string(),
1539 Value::Int(eid.as_u64() as i64),
1540 );
1541 edge_map.insert(
1542 "_src".to_string(),
1543 Value::Int(edge_src.as_u64() as i64),
1544 );
1545 edge_map.insert(
1546 "_dst".to_string(),
1547 Value::Int(edge_dst.as_u64() as i64),
1548 );
1549 edge_map
1550 .insert("_type".to_string(), Value::Int(type_id as i64));
1551 for (k, v) in user_props {
1553 edge_map.insert(k, v);
1554 }
1555 row.insert(rel_var, Value::Map(edge_map));
1556 }
1557 }
1558 }
1559 prev_vid = Some(current_vid);
1560 }
1561 PatternElement::Relationship(r) => {
1562 if r.types.len() != 1 {
1563 return Err(anyhow!(
1564 "CREATE relationship must specify exactly one type"
1565 ));
1566 }
1567 let type_name = &r.types[0];
1568 let type_id = self
1570 .storage
1571 .schema_manager()
1572 .get_or_assign_edge_type_id(type_name);
1573
1574 rel_pending = Some((
1575 r.variable.clone().unwrap_or_default(),
1576 type_id,
1577 type_name.clone(),
1578 r.properties.clone(),
1579 r.direction.clone(),
1580 ));
1581 }
1582 PatternElement::Parenthesized { .. } => {
1583 return Err(anyhow!("Parenthesized pattern not supported in CREATE"));
1584 }
1585 }
1586 }
1587 }
1588 Ok(())
1589 }
1590
1591 fn validate_property_value(
1595 prop_name: &str,
1596 val: &Value,
1597 schema: &uni_common::core::schema::Schema,
1598 labels: &[String],
1599 ) -> Result<()> {
1600 for label in labels {
1602 if let Some(props) = schema.properties.get(label)
1603 && let Some(prop_meta) = props.get(prop_name)
1604 && prop_meta.r#type == uni_common::core::schema::DataType::CypherValue
1605 {
1606 return Ok(());
1607 }
1608 }
1609
1610 match val {
1611 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path(_) => {
1612 anyhow::bail!(
1613 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
1614 prop_name
1615 );
1616 }
1617 Value::List(items) => {
1618 for item in items {
1619 match item {
1620 Value::Map(_)
1621 | Value::Node(_)
1622 | Value::Edge(_)
1623 | Value::Path(_)
1624 | Value::List(_) => {
1625 anyhow::bail!(
1626 "TypeError: InvalidPropertyType - Property '{}' has an invalid type",
1627 prop_name
1628 );
1629 }
1630 _ => {}
1631 }
1632 }
1633 }
1634 _ => {}
1635 }
1636 Ok(())
1637 }
1638
1639 #[expect(clippy::too_many_arguments)]
1640 pub(crate) async fn execute_set_items_locked(
1641 &self,
1642 items: &[SetItem],
1643 row: &mut HashMap<String, Value>,
1644 writer: &mut Writer,
1645 prop_manager: &PropertyManager,
1646 params: &HashMap<String, Value>,
1647 ctx: Option<&QueryContext>,
1648 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1649 ) -> Result<()> {
1650 for item in items {
1651 match item {
1652 SetItem::Property { expr, value } => {
1653 if let Expr::Property(var_expr, prop_name) = expr
1654 && let Expr::Variable(var_name) = &**var_expr
1655 && let Some(node_val) = row.get(var_name)
1656 {
1657 if let Ok(vid) = Self::vid_from_value(node_val) {
1658 let labels =
1659 Self::extract_labels_from_node(node_val).unwrap_or_default();
1660 let schema = self.storage.schema_manager().schema().clone();
1661 let mut props = prop_manager
1662 .get_all_vertex_props_with_ctx(vid, ctx)
1663 .await?
1664 .unwrap_or_default();
1665 let val = self
1666 .evaluate_expr(value, row, prop_manager, params, ctx)
1667 .await?;
1668 Self::validate_property_value(prop_name, &val, &schema, &labels)?;
1669 props.insert(prop_name.clone(), val.clone());
1670
1671 for label_name in &labels {
1673 self.enrich_properties_with_generated_columns(
1674 label_name,
1675 &mut props,
1676 prop_manager,
1677 params,
1678 ctx,
1679 )
1680 .await?;
1681 }
1682
1683 let _ = writer
1684 .insert_vertex_with_labels(vid, props, &labels, tx_l0)
1685 .await?;
1686
1687 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
1689 node_map.insert(prop_name.clone(), val);
1690 } else if let Some(Value::Node(node)) = row.get_mut(var_name) {
1691 node.properties.insert(prop_name.clone(), val);
1692 }
1693 } else if let Value::Map(map) = node_val
1694 && map.get("_eid").is_some_and(|v| !v.is_null())
1695 && map.get("_src").is_some_and(|v| !v.is_null())
1696 && map.get("_dst").is_some_and(|v| !v.is_null())
1697 && map.get("_type").is_some_and(|v| !v.is_null())
1698 {
1699 let ei = self.extract_edge_identity(map)?;
1700 let schema = self.storage.schema_manager().schema().clone();
1701 let edge_type_name = match map.get("_type") {
1703 Some(Value::String(s)) => s.clone(),
1704 Some(Value::Int(id)) => schema
1705 .edge_type_name_by_id_unified(*id as u32)
1706 .unwrap_or_else(|| format!("EdgeType{}", id)),
1707 _ => String::new(),
1708 };
1709
1710 let mut props = prop_manager
1711 .get_all_edge_props_with_ctx(ei.eid, ctx)
1712 .await?
1713 .unwrap_or_default();
1714 let val = self
1715 .evaluate_expr(value, row, prop_manager, params, ctx)
1716 .await?;
1717 Self::validate_property_value(
1718 prop_name,
1719 &val,
1720 &schema,
1721 std::slice::from_ref(&edge_type_name),
1722 )?;
1723 props.insert(prop_name.clone(), val.clone());
1724 writer
1725 .insert_edge(
1726 ei.src,
1727 ei.dst,
1728 ei.edge_type_id,
1729 ei.eid,
1730 props,
1731 Some(edge_type_name.clone()),
1732 tx_l0,
1733 )
1734 .await?;
1735
1736 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
1738 edge_map.insert(prop_name.clone(), val);
1739 } else if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1740 edge.properties.insert(prop_name.clone(), val);
1741 }
1742 } else if let Value::Edge(edge) = node_val {
1743 let eid = edge.eid;
1745 let src = edge.src;
1746 let dst = edge.dst;
1747 let edge_type_name = edge.edge_type.clone();
1748 let etype =
1749 self.resolve_edge_type_id(&Value::String(edge_type_name.clone()))?;
1750 let schema = self.storage.schema_manager().schema().clone();
1751
1752 let mut props = prop_manager
1753 .get_all_edge_props_with_ctx(eid, ctx)
1754 .await?
1755 .unwrap_or_default();
1756 let val = self
1757 .evaluate_expr(value, row, prop_manager, params, ctx)
1758 .await?;
1759 Self::validate_property_value(
1760 prop_name,
1761 &val,
1762 &schema,
1763 std::slice::from_ref(&edge_type_name),
1764 )?;
1765 props.insert(prop_name.clone(), val.clone());
1766 writer
1767 .insert_edge(
1768 src,
1769 dst,
1770 etype,
1771 eid,
1772 props,
1773 Some(edge_type_name.clone()),
1774 tx_l0,
1775 )
1776 .await?;
1777
1778 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
1780 edge.properties.insert(prop_name.clone(), val);
1781 }
1782 }
1783 }
1784 }
1785 SetItem::Labels { variable, labels } => {
1786 if let Some(node_val) = row.get(variable)
1787 && let Ok(vid) = Self::vid_from_value(node_val)
1788 {
1789 let current_labels =
1791 Self::extract_labels_from_node(node_val).unwrap_or_default();
1792
1793 let labels_to_add: Vec<_> = labels
1795 .iter()
1796 .filter(|l| !current_labels.contains(l))
1797 .cloned()
1798 .collect();
1799
1800 if !labels_to_add.is_empty() {
1801 if let Some(ctx) = ctx {
1804 ctx.l0.write().add_vertex_labels(vid, &labels_to_add);
1805 }
1806
1807 if let Some(Value::Map(obj)) = row.get_mut(variable) {
1809 let mut updated_labels = current_labels;
1810 updated_labels.extend(labels_to_add);
1811 let labels_list =
1812 updated_labels.into_iter().map(Value::String).collect();
1813 obj.insert("_labels".to_string(), Value::List(labels_list));
1814 }
1815 }
1816 }
1817 }
1818 SetItem::Variable { variable, value }
1819 | SetItem::VariablePlus { variable, value } => {
1820 let replace = matches!(item, SetItem::Variable { .. });
1821 let op_str = if replace { "=" } else { "+=" };
1822
1823 if matches!(row.get(variable.as_str()), None | Some(Value::Null)) {
1825 continue;
1826 }
1827 let rhs = self
1828 .evaluate_expr(value, row, prop_manager, params, ctx)
1829 .await?;
1830 let new_props =
1831 Self::extract_user_properties_from_value(&rhs).ok_or_else(|| {
1832 anyhow!(
1833 "SET {} {} expr: right-hand side must evaluate to a map, \
1834 node, or relationship",
1835 variable,
1836 op_str
1837 )
1838 })?;
1839 self.apply_properties_to_entity(
1840 variable,
1841 new_props,
1842 replace,
1843 row,
1844 writer,
1845 prop_manager,
1846 params,
1847 ctx,
1848 tx_l0,
1849 )
1850 .await?;
1851 }
1852 }
1853 }
1854 Ok(())
1855 }
1856
1857 pub(crate) async fn execute_remove_items_locked(
1865 &self,
1866 items: &[RemoveItem],
1867 row: &mut HashMap<String, Value>,
1868 writer: &mut Writer,
1869 prop_manager: &PropertyManager,
1870 ctx: Option<&QueryContext>,
1871 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
1872 ) -> Result<()> {
1873 let mut prop_removals: Vec<(String, Vec<String>)> = Vec::new();
1876
1877 for item in items {
1878 match item {
1879 RemoveItem::Property(expr) => {
1880 if let Expr::Property(var_expr, prop_name) = expr
1881 && let Expr::Variable(var_name) = &**var_expr
1882 {
1883 if let Some(entry) = prop_removals.iter_mut().find(|(v, _)| v == var_name) {
1884 entry.1.push(prop_name.clone());
1885 } else {
1886 prop_removals.push((var_name.clone(), vec![prop_name.clone()]));
1887 }
1888 }
1889 }
1890 RemoveItem::Labels { variable, labels } => {
1891 self.execute_remove_labels(variable, labels, row, ctx)?;
1892 }
1893 }
1894 }
1895
1896 for (var_name, prop_names) in &prop_removals {
1898 let Some(node_val) = row.get(var_name) else {
1899 continue;
1900 };
1901
1902 if let Ok(vid) = Self::vid_from_value(node_val) {
1903 let mut props = prop_manager
1905 .get_all_vertex_props_with_ctx(vid, ctx)
1906 .await?
1907 .unwrap_or_default();
1908
1909 let removed_count = prop_names
1911 .iter()
1912 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
1913 .count();
1914 let any_exist = removed_count > 0;
1915 if any_exist {
1916 writer.track_properties_removed(removed_count, tx_l0);
1917 for prop_name in prop_names {
1918 props.insert(prop_name.clone(), Value::Null);
1919 }
1920 }
1921 let effective: HashMap<String, Value> = props
1923 .iter()
1924 .filter(|(_, v)| !v.is_null())
1925 .map(|(k, v)| (k.clone(), v.clone()))
1926 .collect();
1927 if any_exist {
1928 let labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
1929 let _ = writer
1930 .insert_vertex_with_labels(vid, props, &labels, tx_l0)
1931 .await?;
1932 }
1933
1934 if let Some(Value::Map(node_map)) = row.get_mut(var_name) {
1936 for prop_name in prop_names {
1937 node_map.insert(prop_name.clone(), Value::Null);
1938 }
1939 node_map.insert("_all_props".to_string(), Value::Map(effective));
1941 }
1942 } else if let Value::Map(map) = node_val {
1943 let mut edge_effective: Option<HashMap<String, Value>> = None;
1946 if map.get("_eid").is_some_and(|v| !v.is_null()) {
1947 let ei = self.extract_edge_identity(map)?;
1948 let mut props = prop_manager
1949 .get_all_edge_props_with_ctx(ei.eid, ctx)
1950 .await?
1951 .unwrap_or_default();
1952
1953 let removed_count = prop_names
1954 .iter()
1955 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
1956 .count();
1957 let any_exist = removed_count > 0;
1958 if any_exist {
1959 writer.track_properties_removed(removed_count, tx_l0);
1960 for prop_name in prop_names {
1961 props.insert(prop_name.to_string(), Value::Null);
1962 }
1963 }
1964 edge_effective = Some(
1966 props
1967 .iter()
1968 .filter(|(_, v)| !v.is_null())
1969 .map(|(k, v)| (k.clone(), v.clone()))
1970 .collect(),
1971 );
1972 if any_exist {
1973 let edge_type_name = map
1974 .get("_type")
1975 .and_then(|v| v.as_str())
1976 .map(|s| s.to_string())
1977 .or_else(|| {
1978 self.storage
1979 .schema_manager()
1980 .edge_type_name_by_id_unified(ei.edge_type_id)
1981 });
1982 writer
1983 .insert_edge(
1984 ei.src,
1985 ei.dst,
1986 ei.edge_type_id,
1987 ei.eid,
1988 props,
1989 edge_type_name,
1990 tx_l0,
1991 )
1992 .await?;
1993 }
1994 }
1995
1996 if let Some(Value::Map(edge_map)) = row.get_mut(var_name) {
1997 for prop_name in prop_names {
1998 edge_map.insert(prop_name.clone(), Value::Null);
1999 }
2000 if let Some(effective) = edge_effective {
2001 edge_map.insert("_all_props".to_string(), Value::Map(effective));
2002 }
2003 }
2004 } else if let Value::Edge(edge) = node_val {
2005 let eid = edge.eid;
2007 let src = edge.src;
2008 let dst = edge.dst;
2009 let etype = self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()))?;
2010
2011 let mut props = prop_manager
2012 .get_all_edge_props_with_ctx(eid, ctx)
2013 .await?
2014 .unwrap_or_default();
2015
2016 let removed_count = prop_names
2017 .iter()
2018 .filter(|p| props.get(*p).is_some_and(|v| !v.is_null()))
2019 .count();
2020 if removed_count > 0 {
2021 writer.track_properties_removed(removed_count, tx_l0);
2022 for prop_name in prop_names {
2023 props.insert(prop_name.to_string(), Value::Null);
2024 }
2025 writer
2026 .insert_edge(
2027 src,
2028 dst,
2029 etype,
2030 eid,
2031 props,
2032 Some(edge.edge_type.clone()),
2033 tx_l0,
2034 )
2035 .await?;
2036 }
2037
2038 if let Some(Value::Edge(edge)) = row.get_mut(var_name) {
2039 for prop_name in prop_names {
2040 edge.properties.insert(prop_name.to_string(), Value::Null);
2041 }
2042 }
2043 }
2044 }
2045
2046 Ok(())
2047 }
2048
2049 pub(crate) fn execute_remove_labels(
2051 &self,
2052 variable: &str,
2053 labels: &[String],
2054 row: &mut HashMap<String, Value>,
2055 ctx: Option<&QueryContext>,
2056 ) -> Result<()> {
2057 if let Some(node_val) = row.get(variable)
2058 && let Ok(vid) = Self::vid_from_value(node_val)
2059 {
2060 let current_labels = Self::extract_labels_from_node(node_val).unwrap_or_default();
2062
2063 let labels_to_remove: Vec<_> = labels
2065 .iter()
2066 .filter(|l| current_labels.contains(l))
2067 .collect();
2068
2069 if !labels_to_remove.is_empty() {
2070 if let Some(ctx) = ctx {
2072 let mut l0 = ctx.l0.write();
2073 for label in &labels_to_remove {
2074 l0.remove_vertex_label(vid, label);
2075 }
2076 }
2077
2078 if let Some(Value::Map(obj)) = row.get_mut(variable) {
2080 let remaining_labels: Vec<_> = current_labels
2081 .iter()
2082 .filter(|l| !labels_to_remove.contains(l))
2083 .cloned()
2084 .collect();
2085 let labels_list = remaining_labels.into_iter().map(Value::String).collect();
2086 obj.insert("_labels".to_string(), Value::List(labels_list));
2087 }
2088 }
2089 }
2090 Ok(())
2091 }
2092
2093 fn resolve_edge_type_id_for_edge(
2096 &self,
2097 edge: &crate::types::Edge,
2098 writer: &Writer,
2099 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2100 ) -> Result<u32> {
2101 if !edge.edge_type.is_empty() {
2102 return self.resolve_edge_type_id(&Value::String(edge.edge_type.clone()));
2103 }
2104 if let Some(etype) = writer.get_edge_type_id_from_l0(edge.eid, tx_l0) {
2107 return Ok(etype);
2108 }
2109 Err(anyhow!(
2110 "Cannot determine edge type for edge {:?} — edge type name is empty and not found in L0",
2111 edge.eid
2112 ))
2113 }
2114
2115 pub(crate) async fn execute_delete_item_locked(
2117 &self,
2118 val: &Value,
2119 detach: bool,
2120 writer: &mut Writer,
2121 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2122 ) -> Result<()> {
2123 match val {
2124 Value::Null => {
2125 }
2127 Value::Path(path) => {
2128 for edge in &path.edges {
2130 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
2131 writer
2132 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
2133 .await?;
2134 }
2135 for node in &path.nodes {
2136 self.execute_delete_vertex(
2137 node.vid,
2138 detach,
2139 Some(node.labels.clone()),
2140 writer,
2141 tx_l0,
2142 )
2143 .await?;
2144 }
2145 }
2146 _ => {
2147 if let Ok(path) = Path::try_from(val) {
2149 for edge in &path.edges {
2150 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
2151 writer
2152 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
2153 .await?;
2154 }
2155 for node in &path.nodes {
2156 self.execute_delete_vertex(
2157 node.vid,
2158 detach,
2159 Some(node.labels.clone()),
2160 writer,
2161 tx_l0,
2162 )
2163 .await?;
2164 }
2165 } else if let Ok(vid) = Self::vid_from_value(val) {
2166 let labels = Self::extract_labels_from_node(val);
2167 self.execute_delete_vertex(vid, detach, labels, writer, tx_l0)
2168 .await?;
2169 } else if let Value::Map(map) = val {
2170 self.execute_delete_edge_from_map(map, writer, tx_l0)
2171 .await?;
2172 } else if let Value::Edge(edge) = val {
2173 let etype = self.resolve_edge_type_id_for_edge(edge, writer, tx_l0)?;
2174 writer
2175 .delete_edge(edge.eid, edge.src, edge.dst, etype, tx_l0)
2176 .await?;
2177 }
2178 }
2179 }
2180 Ok(())
2181 }
2182
2183 pub(crate) async fn execute_delete_vertex(
2185 &self,
2186 vid: Vid,
2187 detach: bool,
2188 labels: Option<Vec<String>>,
2189 writer: &mut Writer,
2190 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2191 ) -> Result<()> {
2192 if detach {
2193 self.detach_delete_vertex(vid, writer, tx_l0).await?;
2194 } else {
2195 self.check_vertex_has_no_edges(vid, writer, tx_l0).await?;
2196 }
2197 writer.delete_vertex(vid, labels, tx_l0).await?;
2198 Ok(())
2199 }
2200
2201 pub(crate) async fn check_vertex_has_no_edges(
2207 &self,
2208 vid: Vid,
2209 writer: &Writer,
2210 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2211 ) -> Result<()> {
2212 let schema = self.storage.schema_manager().schema();
2213 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
2214
2215 let mut tombstoned_eids = std::collections::HashSet::new();
2217 {
2218 let writer_l0 = writer.l0_manager.get_current();
2219 let guard = writer_l0.read();
2220 for &eid in guard.tombstones.keys() {
2221 tombstoned_eids.insert(eid);
2222 }
2223 }
2224 if let Some(tx) = tx_l0 {
2225 let guard = tx.read();
2226 for &eid in guard.tombstones.keys() {
2227 tombstoned_eids.insert(eid);
2228 }
2229 }
2230
2231 let out_graph = self
2232 .storage
2233 .load_subgraph_cached(
2234 &[vid],
2235 &edge_type_ids,
2236 1,
2237 uni_store::runtime::Direction::Outgoing,
2238 Some(writer.l0_manager.get_current()),
2239 )
2240 .await?;
2241 let has_out = out_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
2242
2243 let in_graph = self
2244 .storage
2245 .load_subgraph_cached(
2246 &[vid],
2247 &edge_type_ids,
2248 1,
2249 uni_store::runtime::Direction::Incoming,
2250 Some(writer.l0_manager.get_current()),
2251 )
2252 .await?;
2253 let has_in = in_graph.edges().any(|e| !tombstoned_eids.contains(&e.eid));
2254
2255 if has_out || has_in {
2256 return Err(anyhow!(
2257 "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
2258 vid
2259 ));
2260 }
2261 Ok(())
2262 }
2263
2264 pub(crate) async fn execute_delete_edge_from_map(
2266 &self,
2267 map: &HashMap<String, Value>,
2268 writer: &mut Writer,
2269 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
2270 ) -> Result<()> {
2271 if map.get("_eid").is_some_and(|v| !v.is_null()) {
2273 let ei = self.extract_edge_identity(map)?;
2274 writer
2275 .delete_edge(ei.eid, ei.src, ei.dst, ei.edge_type_id, tx_l0)
2276 .await?;
2277 }
2278 Ok(())
2279 }
2280
2281 fn make_scan_plan(
2287 label_id: u16,
2288 labels: Vec<String>,
2289 variable: String,
2290 filter: Option<Expr>,
2291 ) -> LogicalPlan {
2292 if label_id > 0 {
2293 LogicalPlan::Scan {
2294 label_id,
2295 labels,
2296 variable,
2297 filter,
2298 optional: false,
2299 }
2300 } else if !labels.is_empty() {
2301 LogicalPlan::ScanMainByLabels {
2303 labels,
2304 variable,
2305 filter,
2306 optional: false,
2307 }
2308 } else {
2309 LogicalPlan::ScanAll {
2310 variable,
2311 filter,
2312 optional: false,
2313 }
2314 }
2315 }
2316
2317 fn attach_scan(plan: LogicalPlan, scan: LogicalPlan) -> LogicalPlan {
2320 if matches!(plan, LogicalPlan::Empty) {
2321 scan
2322 } else {
2323 LogicalPlan::CrossJoin {
2324 left: Box::new(plan),
2325 right: Box::new(scan),
2326 }
2327 }
2328 }
2329
2330 async fn resolve_merge_properties(
2337 &self,
2338 properties: &Option<Expr>,
2339 row: &HashMap<String, Value>,
2340 prop_manager: &PropertyManager,
2341 params: &HashMap<String, Value>,
2342 ctx: Option<&QueryContext>,
2343 ) -> Result<Option<Expr>> {
2344 let entries = match properties {
2345 Some(Expr::Map(entries)) => entries,
2346 other => return Ok(other.clone()),
2347 };
2348 let mut resolved = Vec::new();
2349 for (key, val_expr) in entries {
2350 if matches!(val_expr, Expr::Literal(_)) {
2351 resolved.push((key.clone(), val_expr.clone()));
2352 } else {
2353 let value = self
2354 .evaluate_expr(val_expr, row, prop_manager, params, ctx)
2355 .await?;
2356 resolved.push((key.clone(), Self::value_to_literal_expr(&value)));
2357 }
2358 }
2359 Ok(Some(Expr::Map(resolved)))
2360 }
2361
2362 fn value_to_literal_expr(value: &Value) -> Expr {
2364 match value {
2365 Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
2366 Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
2367 Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
2368 Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
2369 Value::Null => Expr::Literal(CypherLiteral::Null),
2370 Value::List(items) => {
2371 Expr::List(items.iter().map(Self::value_to_literal_expr).collect())
2372 }
2373 Value::Map(entries) => Expr::Map(
2374 entries
2375 .iter()
2376 .map(|(k, v)| (k.clone(), Self::value_to_literal_expr(v)))
2377 .collect(),
2378 ),
2379 _ => Expr::Literal(CypherLiteral::Null),
2380 }
2381 }
2382
2383 pub(crate) async fn execute_merge_match(
2384 &self,
2385 pattern: &Pattern,
2386 row: &HashMap<String, Value>,
2387 prop_manager: &PropertyManager,
2388 params: &HashMap<String, Value>,
2389 ctx: Option<&QueryContext>,
2390 ) -> Result<Vec<HashMap<String, Value>>> {
2391 let planner =
2393 crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
2394
2395 let mut plan = LogicalPlan::Empty;
2400 let mut vars_in_scope = Vec::new();
2401
2402 for key in row.keys() {
2404 vars_in_scope.push(key.clone());
2405 }
2406
2407 for path in &pattern.paths {
2409 let elements = &path.elements;
2410 let mut i = 0;
2411 while i < elements.len() {
2412 let part = &elements[i];
2413 match part {
2414 PatternElement::Node(n) => {
2415 let variable = n.variable.clone().unwrap_or_default();
2416
2417 let is_bound = !variable.is_empty() && row.contains_key(&variable);
2419
2420 if is_bound {
2421 let val = row.get(&variable).unwrap();
2424 let vid = Self::vid_from_value(val)?;
2425
2426 let extracted_labels =
2429 Self::extract_labels_from_node(val).unwrap_or_default();
2430 let label_id = {
2431 let schema = self.storage.schema_manager().schema();
2432 extracted_labels
2433 .first()
2434 .and_then(|l| schema.label_id_by_name(l))
2435 .unwrap_or(0)
2436 };
2437
2438 let resolved_props = self
2439 .resolve_merge_properties(
2440 &n.properties,
2441 row,
2442 prop_manager,
2443 params,
2444 ctx,
2445 )
2446 .await?;
2447 let prop_filter =
2448 planner.properties_to_expr(&variable, &resolved_props);
2449
2450 let vid_filter = Expr::BinaryOp {
2459 left: Box::new(Expr::Property(
2460 Box::new(Expr::Variable(variable.clone())),
2461 "_vid".to_string(),
2462 )),
2463 op: BinaryOp::Eq,
2464 right: Box::new(Expr::Literal(CypherLiteral::Integer(
2465 vid.as_u64() as i64,
2466 ))),
2467 };
2468
2469 let combined_filter = if let Some(pf) = prop_filter {
2470 Some(Expr::BinaryOp {
2471 left: Box::new(vid_filter),
2472 op: BinaryOp::And,
2473 right: Box::new(pf),
2474 })
2475 } else {
2476 Some(vid_filter)
2477 };
2478
2479 let scan = Self::make_scan_plan(
2480 label_id,
2481 extracted_labels,
2482 variable.clone(),
2483 combined_filter,
2484 );
2485 plan = Self::attach_scan(plan, scan);
2486 } else {
2487 let label_id = if n.labels.is_empty() {
2488 0
2490 } else {
2491 let label_name = &n.labels[0];
2492 let schema = self.storage.schema_manager().schema();
2493 schema
2496 .get_label_case_insensitive(label_name)
2497 .map(|m| m.id)
2498 .unwrap_or(0)
2499 };
2500
2501 let resolved_props = self
2502 .resolve_merge_properties(
2503 &n.properties,
2504 row,
2505 prop_manager,
2506 params,
2507 ctx,
2508 )
2509 .await?;
2510 let prop_filter =
2511 planner.properties_to_expr(&variable, &resolved_props);
2512 let scan = Self::make_scan_plan(
2513 label_id,
2514 n.labels.clone(),
2515 variable.clone(),
2516 prop_filter,
2517 );
2518 plan = Self::attach_scan(plan, scan);
2519
2520 if !n.labels.is_empty()
2527 && !variable.is_empty()
2528 && (label_id == 0 || n.labels.len() > 1)
2529 && let Some(label_filter) =
2530 planner.node_filter_expr(&variable, &n.labels, &None)
2531 {
2532 plan = LogicalPlan::Filter {
2533 input: Box::new(plan),
2534 predicate: label_filter,
2535 optional_variables: std::collections::HashSet::new(),
2536 };
2537 }
2538
2539 if !variable.is_empty() {
2540 vars_in_scope.push(variable.clone());
2541 }
2542 }
2543
2544 i += 1;
2546 while i < elements.len() {
2547 if let PatternElement::Relationship(r) = &elements[i] {
2548 let target_node_part = &elements[i + 1];
2549 if let PatternElement::Node(n_target) = target_node_part {
2550 let schema = self.storage.schema_manager().schema();
2551 let mut edge_type_ids = Vec::new();
2552
2553 if r.types.is_empty() {
2554 return Err(anyhow!("MERGE edge must have a type"));
2555 } else if r.types.len() > 1 {
2556 return Err(anyhow!(
2557 "MERGE does not support multiple edge types"
2558 ));
2559 } else {
2560 let type_name = &r.types[0];
2561 let type_id = self
2564 .storage
2565 .schema_manager()
2566 .get_or_assign_edge_type_id(type_name);
2567 edge_type_ids.push(type_id);
2568 }
2569
2570 let target_label_id: u16 = if let Some(lbl) =
2573 n_target.labels.first()
2574 {
2575 schema
2576 .get_label_case_insensitive(lbl)
2577 .map(|m| m.id)
2578 .unwrap_or(0)
2579 } else if let Some(var) = &n_target.variable {
2580 if let Some(val) = row.get(var) {
2581 if let Some(labels) =
2583 Self::extract_labels_from_node(val)
2584 {
2585 if let Some(first_label) = labels.first() {
2586 schema
2587 .get_label_case_insensitive(first_label)
2588 .map(|m| m.id)
2589 .unwrap_or(0)
2590 } else {
2591 0
2593 }
2594 } else if Self::vid_from_value(val).is_ok() {
2595 0
2597 } else {
2598 return Err(anyhow!(
2599 "Variable {} is not a node",
2600 var
2601 ));
2602 }
2603 } else {
2604 return Err(anyhow!(
2605 "MERGE pattern node must have a label or be a bound variable"
2606 ));
2607 }
2608 } else {
2609 return Err(anyhow!(
2610 "MERGE pattern node must have a label"
2611 ));
2612 };
2613
2614 let target_variable =
2615 n_target.variable.clone().unwrap_or_default();
2616 let source_variable = match &elements[i - 1] {
2617 PatternElement::Node(n) => {
2618 n.variable.clone().unwrap_or_default()
2619 }
2620 _ => String::new(),
2621 };
2622
2623 let is_variable_length = r.range.is_some();
2624 let type_name = &r.types[0];
2625
2626 let is_schemaless = edge_type_ids.iter().all(|id| {
2632 uni_common::core::edge_type::is_schemaless_edge_type(*id)
2633 });
2634
2635 if is_schemaless {
2636 plan = LogicalPlan::TraverseMainByType {
2637 type_names: vec![type_name.clone()],
2638 input: Box::new(plan),
2639 direction: r.direction.clone(),
2640 source_variable,
2641 target_variable: target_variable.clone(),
2642 step_variable: r.variable.clone(),
2643 min_hops: r
2644 .range
2645 .as_ref()
2646 .and_then(|r| r.min)
2647 .unwrap_or(1)
2648 as usize,
2649 max_hops: r
2650 .range
2651 .as_ref()
2652 .and_then(|r| r.max)
2653 .unwrap_or(1)
2654 as usize,
2655 optional: false,
2656 target_filter: None,
2657 path_variable: None,
2658 is_variable_length,
2659 optional_pattern_vars: std::collections::HashSet::new(),
2660 scope_match_variables: std::collections::HashSet::new(),
2661 edge_filter_expr: None,
2662 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
2663 };
2664 } else {
2665 let mut edge_props = std::collections::HashSet::new();
2667 if let Some(Expr::Map(entries)) = &r.properties {
2668 for (key, _) in entries {
2669 edge_props.insert(key.clone());
2670 }
2671 }
2672 plan = LogicalPlan::Traverse {
2673 input: Box::new(plan),
2674 edge_type_ids: edge_type_ids.clone(),
2675 direction: r.direction.clone(),
2676 source_variable,
2677 target_variable: target_variable.clone(),
2678 target_label_id,
2679 step_variable: r.variable.clone(),
2680 min_hops: r
2681 .range
2682 .as_ref()
2683 .and_then(|r| r.min)
2684 .unwrap_or(1)
2685 as usize,
2686 max_hops: r
2687 .range
2688 .as_ref()
2689 .and_then(|r| r.max)
2690 .unwrap_or(1)
2691 as usize,
2692 optional: false,
2693 target_filter: None,
2694 path_variable: None,
2695 edge_properties: edge_props,
2696 is_variable_length,
2697 optional_pattern_vars: std::collections::HashSet::new(),
2698 scope_match_variables: std::collections::HashSet::new(),
2699 edge_filter_expr: None,
2700 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
2701 qpp_steps: None,
2702 };
2703 }
2704
2705 if r.properties.is_some()
2707 && let Some(r_var) = &r.variable
2708 {
2709 let resolved_rel_props = self
2710 .resolve_merge_properties(
2711 &r.properties,
2712 row,
2713 prop_manager,
2714 params,
2715 ctx,
2716 )
2717 .await?;
2718 if let Some(prop_filter) =
2719 planner.properties_to_expr(r_var, &resolved_rel_props)
2720 {
2721 plan = LogicalPlan::Filter {
2722 input: Box::new(plan),
2723 predicate: prop_filter,
2724 optional_variables: std::collections::HashSet::new(
2725 ),
2726 };
2727 }
2728 }
2729
2730 if !target_variable.is_empty() {
2732 let resolved_target_props = self
2733 .resolve_merge_properties(
2734 &n_target.properties,
2735 row,
2736 prop_manager,
2737 params,
2738 ctx,
2739 )
2740 .await?;
2741 if let Some(prop_filter) = planner.properties_to_expr(
2742 &target_variable,
2743 &resolved_target_props,
2744 ) {
2745 plan = LogicalPlan::Filter {
2746 input: Box::new(plan),
2747 predicate: prop_filter,
2748 optional_variables: std::collections::HashSet::new(
2749 ),
2750 };
2751 }
2752 vars_in_scope.push(target_variable.clone());
2753 }
2754
2755 if let Some(sv) = &r.variable {
2756 vars_in_scope.push(sv.clone());
2757 }
2758 i += 2;
2759 } else {
2760 break;
2761 }
2762 } else {
2763 break;
2764 }
2765 }
2766 }
2767 _ => return Err(anyhow!("Pattern must start with a node")),
2768 }
2769 }
2770
2771 }
2773
2774 let db_matches = self
2775 .execute_merge_read_plan(plan, prop_manager, params, vars_in_scope.clone())
2776 .await?;
2777
2778 let final_matches = db_matches
2785 .into_iter()
2786 .filter(|db_match| {
2787 row.iter().all(|(key, val)| {
2788 if key.is_empty() || key.starts_with("__") {
2789 return true;
2790 }
2791 let Some(db_val) = db_match.get(key) else {
2792 return true;
2793 };
2794 if db_val == val {
2795 return true;
2796 }
2797 matches!(
2799 (Self::vid_from_value(val), Self::vid_from_value(db_val)),
2800 (Ok(v1), Ok(v2)) if v1 == v2
2801 )
2802 })
2803 })
2804 .map(|db_match| {
2805 let mut merged = row.clone();
2806 merged.extend(db_match);
2807 merged
2808 })
2809 .collect();
2810
2811 Ok(final_matches)
2812 }
2813
2814 fn prepare_pattern_for_path_binding(pattern: &Pattern) -> (Pattern, Vec<String>) {
2822 let has_path_vars = pattern
2823 .paths
2824 .iter()
2825 .any(|p| p.variable.as_ref().is_some_and(|v| !v.is_empty()));
2826
2827 if !has_path_vars {
2828 return (pattern.clone(), Vec::new());
2829 }
2830
2831 let mut modified = pattern.clone();
2832 let mut temp_vars = Vec::new();
2833
2834 for path in &mut modified.paths {
2835 if path.variable.as_ref().is_none_or(|v| v.is_empty()) {
2836 continue;
2837 }
2838 for (idx, element) in path.elements.iter_mut().enumerate() {
2839 if let PatternElement::Relationship(r) = element
2840 && r.variable.as_ref().is_none_or(String::is_empty)
2841 {
2842 let temp_var = format!("__path_r_{}", idx);
2843 r.variable = Some(temp_var.clone());
2844 temp_vars.push(temp_var);
2845 }
2846 }
2847 }
2848
2849 (modified, temp_vars)
2850 }
2851
2852 fn bind_path_variables(
2857 pattern: &Pattern,
2858 row: &mut HashMap<String, Value>,
2859 temp_vars: &[String],
2860 ) {
2861 for path in &pattern.paths {
2862 let Some(path_var) = path.variable.as_ref() else {
2863 continue;
2864 };
2865 if path_var.is_empty() {
2866 continue;
2867 }
2868
2869 let mut nodes = Vec::new();
2870 let mut edges = Vec::new();
2871
2872 for element in &path.elements {
2873 match element {
2874 PatternElement::Node(n) => {
2875 if let Some(var) = &n.variable
2876 && let Some(val) = row.get(var)
2877 && let Some(node) = Self::value_to_node_for_path(val)
2878 {
2879 nodes.push(node);
2880 }
2881 }
2882 PatternElement::Relationship(r) => {
2883 if let Some(var) = &r.variable
2884 && let Some(val) = row.get(var)
2885 && let Some(edge) = Self::value_to_edge_for_path(val, &r.types)
2886 {
2887 edges.push(edge);
2888 }
2889 }
2890 _ => {}
2891 }
2892 }
2893
2894 if !nodes.is_empty() {
2895 use uni_common::value::Path;
2896 row.insert(path_var.clone(), Value::Path(Path { nodes, edges }));
2897 }
2898 }
2899
2900 for var in temp_vars {
2902 row.remove(var);
2903 }
2904 }
2905
2906 fn value_to_node_for_path(val: &Value) -> Option<uni_common::value::Node> {
2908 match val {
2909 Value::Node(n) => Some(n.clone()),
2910 Value::Map(map) => {
2911 let vid = map.get("_vid").and_then(|v| v.as_u64()).map(Vid::new)?;
2912 let labels = if let Some(Value::List(l)) = map.get("_labels") {
2913 l.iter()
2914 .filter_map(|v| {
2915 if let Value::String(s) = v {
2916 Some(s.clone())
2917 } else {
2918 None
2919 }
2920 })
2921 .collect()
2922 } else {
2923 vec![]
2924 };
2925 let properties: HashMap<String, Value> = map
2926 .iter()
2927 .filter(|(k, _)| !k.starts_with('_'))
2928 .map(|(k, v)| (k.clone(), v.clone()))
2929 .collect();
2930 Some(uni_common::value::Node {
2931 vid,
2932 labels,
2933 properties,
2934 })
2935 }
2936 _ => None,
2937 }
2938 }
2939
2940 fn value_to_edge_for_path(
2942 val: &Value,
2943 type_names: &[String],
2944 ) -> Option<uni_common::value::Edge> {
2945 match val {
2946 Value::Edge(e) => Some(e.clone()),
2947 Value::Map(map) => {
2948 let eid = map.get("_eid").and_then(|v| v.as_u64()).map(Eid::new)?;
2949 let edge_type = map
2950 .get("_type_name")
2951 .and_then(|v| {
2952 if let Value::String(s) = v {
2953 Some(s.clone())
2954 } else {
2955 None
2956 }
2957 })
2958 .or_else(|| type_names.first().cloned())
2959 .unwrap_or_default();
2960 let src = map.get("_src").and_then(|v| v.as_u64()).map(Vid::new)?;
2961 let dst = map.get("_dst").and_then(|v| v.as_u64()).map(Vid::new)?;
2962 let properties: HashMap<String, Value> = map
2963 .iter()
2964 .filter(|(k, _)| !k.starts_with('_'))
2965 .map(|(k, v)| (k.clone(), v.clone()))
2966 .collect();
2967 Some(uni_common::value::Edge {
2968 eid,
2969 edge_type,
2970 src,
2971 dst,
2972 properties,
2973 })
2974 }
2975 _ => None,
2976 }
2977 }
2978}