1use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7 eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16 match val {
17 Value::Temporal(tv) => {
18 use uni_common::TemporalValue;
19 match tv {
20 TemporalValue::DateTime {
21 nanos_since_epoch, ..
22 }
23 | TemporalValue::LocalDateTime {
24 nanos_since_epoch, ..
25 } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26 TemporalValue::Date { days_since_epoch } => {
27 chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28 }
29 _ => None,
30 }
31 }
32 Value::String(s) => parse_datetime_utc(s).ok(),
33 _ => None,
34 }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46 BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47 ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54
55use crate::query::df_graph::L0Context;
57use crate::query::df_planner::HybridPhysicalPlanner;
58use datafusion::physical_plan::ExecutionPlanProperties;
59use datafusion::prelude::SessionContext;
60use parking_lot::RwLock as SyncRwLock;
61
62use arrow_array::{Array, RecordBatch};
63use csv;
64use parquet;
65
66use super::core::*;
67
/// Largest entry count at which an edge map is still treated as holding only
/// system (non-property) keys. `hydrate_entity_if_needed` fetches edge properties
/// from storage for maps no larger than this.
/// NOTE(review): the exact set of edge system keys is defined elsewhere — confirm it
/// still has this many members.
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;

/// Vertex counterpart of `EDGE_SYSTEM_FIELD_COUNT`: vertex maps with at most this
/// many entries are hydrated from storage by `hydrate_entity_if_needed`.
/// NOTE(review): confirm against the actual set of vertex system keys.
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
72
73fn collect_l0_vids(
78 ctx: Option<&QueryContext>,
79 extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
80) -> Vec<Vid> {
81 let mut vids = Vec::new();
82 if let Some(ctx) = ctx {
83 vids.extend(extractor(&ctx.l0.read()));
84 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
85 vids.extend(extractor(&tx_l0_arc.read()));
86 }
87 for pending_l0_arc in &ctx.pending_flush_l0s {
88 vids.extend(extractor(&pending_l0_arc.read()));
89 }
90 }
91 vids
92}
93
94async fn hydrate_entity_if_needed(
103 map: &mut HashMap<String, Value>,
104 prop_manager: &PropertyManager,
105 ctx: Option<&QueryContext>,
106) {
107 if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
109 if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
110 tracing::debug!(
111 "Pushdown fallback: hydrating edge {} at execution time",
112 eid_u64
113 );
114 if let Ok(Some(props)) = prop_manager
115 .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
116 .await
117 {
118 for (key, value) in props {
119 map.entry(key).or_insert(value);
120 }
121 }
122 } else {
123 tracing::trace!(
124 "Pushdown success: edge {} already has {} properties",
125 eid_u64,
126 map.len() - EDGE_SYSTEM_FIELD_COUNT
127 );
128 }
129 return;
130 }
131
132 if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
134 if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
135 tracing::debug!(
136 "Pushdown fallback: hydrating vertex {} at execution time",
137 vid_u64
138 );
139 if let Ok(Some(props)) = prop_manager
140 .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
141 .await
142 {
143 for (key, value) in props {
144 map.entry(key).or_insert(value);
145 }
146 }
147 } else {
148 tracing::trace!(
149 "Pushdown success: vertex {} already has {} properties",
150 vid_u64,
151 map.len() - VERTEX_SYSTEM_FIELD_COUNT
152 );
153 }
154 }
155}
156
157fn promote_vid_placeholder(row: &mut HashMap<String, Value>, var: &str) {
163 let prefix = format!("{}.", var);
164 let mut map = HashMap::new();
165
166 let dotted_keys: Vec<String> = row
167 .keys()
168 .filter(|k| k.starts_with(&prefix))
169 .cloned()
170 .collect();
171
172 for key in &dotted_keys {
173 let prop_name = &key[prefix.len()..];
174 if let Some(val) = row.remove(key) {
175 map.insert(prop_name.to_string(), val);
176 }
177 }
178
179 row.insert(var.to_string(), Value::Map(map));
181}
182
183fn merge_dotted_columns(row: &mut HashMap<String, Value>, var: &str) {
193 let vid_key = format!("{}._vid", var);
195 let labels_key = format!("{}._labels", var);
196
197 let vid_val = row.remove(&vid_key);
198 let labels_val = row.remove(&labels_key);
199
200 if let Some(Value::Map(map)) = row.get_mut(var) {
201 if let Some(v) = vid_val {
202 map.insert("_vid".to_string(), v);
203 }
204 if let Some(v) = labels_val {
205 map.insert("_labels".to_string(), v);
206 }
207 }
208
209 let eid_key = format!("{}._eid", var);
214 let type_key = format!("{}._type", var);
215
216 let eid_val = row.remove(&eid_key);
217 let type_val = row.remove(&type_key);
218
219 if (eid_val.is_some() || type_val.is_some())
220 && let Some(Value::Map(map)) = row.get_mut(var)
221 {
222 if let Some(v) = eid_val {
223 map.entry("_eid".to_string()).or_insert(v);
224 }
225 if let Some(v) = type_val {
226 map.entry("_type".to_string()).or_insert(v);
227 }
228 }
229
230 let prefix = format!("{}.", var);
232 let dotted_keys: Vec<String> = row
233 .keys()
234 .filter(|k| k.starts_with(&prefix))
235 .cloned()
236 .collect();
237 for key in dotted_keys {
238 let prop_name = key[prefix.len()..].to_string();
239 let val = row.remove(&key);
240 if prop_name.starts_with('_') || prop_name == "overflow_json" {
241 continue;
242 }
243 if let (Some(val), Some(Value::Map(map))) = (val, row.get_mut(var)) {
244 map.insert(prop_name, val);
245 }
246 }
247}
248
249impl Executor {
250 async fn verify_and_filter_candidates(
255 &self,
256 mut candidates: Vec<Vid>,
257 variable: &str,
258 filter: Option<&Expr>,
259 ctx: Option<&QueryContext>,
260 prop_manager: &PropertyManager,
261 params: &HashMap<String, Value>,
262 ) -> Result<Vec<Vid>> {
263 candidates.sort_unstable();
264 candidates.dedup();
265
266 let mut verified_vids = Vec::new();
267 for vid in candidates {
268 let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
269 continue; };
271
272 if let Some(expr) = filter {
273 let mut props_map: HashMap<String, Value> = props;
274 props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
275
276 let mut row = HashMap::new();
277 row.insert(variable.to_string(), Value::Map(props_map));
278
279 let res = self
280 .evaluate_expr(expr, &row, prop_manager, params, ctx)
281 .await?;
282 if res.as_bool().unwrap_or(false) {
283 verified_vids.push(vid);
284 }
285 } else {
286 verified_vids.push(vid);
287 }
288 }
289
290 Ok(verified_vids)
291 }
292
293 pub(crate) async fn scan_storage_candidates(
294 &self,
295 label_id: u16,
296 variable: &str,
297 filter: Option<&Expr>,
298 ) -> Result<Vec<Vid>> {
299 let schema = self.storage.schema_manager().schema();
300 let label_name = schema
301 .label_name_by_id(label_id)
302 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
303
304 let empty_props = std::collections::HashMap::new();
306 let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
307 let filter_sql = filter.and_then(|expr| {
308 LanceFilterGenerator::generate(std::slice::from_ref(expr), variable, Some(label_props))
309 });
310
311 self.storage
312 .scan_vertex_candidates(label_name, filter_sql.as_deref())
313 .await
314 }
315
316 pub(crate) async fn scan_label_with_filter(
317 &self,
318 label_id: u16,
319 variable: &str,
320 filter: Option<&Expr>,
321 ctx: Option<&QueryContext>,
322 prop_manager: &PropertyManager,
323 params: &HashMap<String, Value>,
324 ) -> Result<Vec<Vid>> {
325 let mut candidates = self
326 .scan_storage_candidates(label_id, variable, filter)
327 .await?;
328
329 let schema = self.storage.schema_manager().schema();
331 if let Some(label_name) = schema.label_name_by_id(label_id) {
332 candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
333 }
334
335 self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
336 .await
337 }
338
339 pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
340 if let Value::Node(node) = val {
342 return Ok(node.vid);
343 }
344 if let Value::Map(map) = val
346 && let Some(vid_val) = map.get("_vid")
347 && let Some(v) = vid_val.as_u64()
348 {
349 return Ok(Vid::from(v));
350 }
351 if let Some(s) = val.as_str()
353 && let Ok(id) = s.parse::<u64>()
354 {
355 return Ok(Vid::new(id));
356 }
357 if let Some(v) = val.as_u64() {
359 return Ok(Vid::from(v));
360 }
361 Err(anyhow!("Invalid Vid format: {:?}", val))
362 }
363
364 fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
370 for val in row.values() {
371 if let Ok(vid) = Self::vid_from_value(val)
372 && vid == target_vid
373 {
374 return val.clone();
375 }
376 }
377 Value::Map(HashMap::from([(
379 "_vid".to_string(),
380 Value::Int(target_vid.as_u64() as i64),
381 )]))
382 }
383
384 pub async fn create_datafusion_planner(
389 &self,
390 prop_manager: &PropertyManager,
391 params: &HashMap<String, Value>,
392 ) -> Result<(
393 Arc<SyncRwLock<SessionContext>>,
394 HybridPhysicalPlanner,
395 Arc<PropertyManager>,
396 )> {
397 let query_ctx = self.get_context().await;
398 let l0_context = match query_ctx {
399 Some(ref ctx) => L0Context::from_query_context(ctx),
400 None => L0Context::empty(),
401 };
402
403 let prop_manager_arc = if let Some(shared) = self.prop_manager_arc.as_ref() {
409 shared.clone()
410 } else {
411 Arc::new(PropertyManager::new(
412 self.storage.clone(),
413 self.storage.schema_manager_arc(),
414 prop_manager.cache_size(),
415 ))
416 };
417
418 let has_custom_udfs = self
437 .custom_function_registry
438 .as_ref()
439 .is_some_and(|r| !r.is_empty());
440 let host_plugin_registry = self
441 .procedure_registry
442 .as_ref()
443 .and_then(|pr| pr.plugin_registry());
444 let optimizer_providers = host_plugin_registry
448 .as_ref()
449 .map(|pr| pr.optimizer_rules())
450 .unwrap_or_default();
451 let session_local_registry =
452 crate::query::df_udfs_plugin::current_session_plugin_registry();
453 let needs_dynamic_registration = has_custom_udfs
454 || !optimizer_providers.is_empty()
455 || host_plugin_registry.is_some()
456 || session_local_registry.is_some();
457
458 let session = if let (Some(tmpl), false) = (
459 self.df_session_template.as_ref(),
460 needs_dynamic_registration,
461 ) {
462 (**tmpl).clone()
464 } else {
465 let session = if optimizer_providers.is_empty() {
473 SessionContext::new()
474 } else {
475 use datafusion::execution::session_state::SessionStateBuilder;
476 use uni_plugin::traits::operator::OptimizerPhase;
477 let mut builder = SessionStateBuilder::new().with_default_features();
478 for provider in optimizer_providers.iter() {
479 match provider.phase() {
480 OptimizerPhase::Logical => {
481 builder = builder.with_optimizer_rule(provider.rule());
482 }
483 OptimizerPhase::Physical => {
484 if let Some(rule) = provider.physical_rule() {
485 builder = builder.with_physical_optimizer_rule(rule);
486 } else {
487 tracing::debug!(
488 target: "uni.plugin.registry",
489 "physical-phase provider returned no physical_rule(); skipping"
490 );
491 }
492 }
493 OptimizerPhase::Both => {
494 builder = builder.with_optimizer_rule(provider.rule());
495 if let Some(rule) = provider.physical_rule() {
496 builder = builder.with_physical_optimizer_rule(rule);
497 }
498 }
499 _ => {
500 tracing::debug!(
501 target: "uni.plugin.registry",
502 "skipping optimizer rule for unknown phase"
503 );
504 }
505 }
506 }
507 let state = builder.build();
508 SessionContext::new_with_state(state)
509 };
510 crate::query::df_udfs::register_cypher_udfs(&session)?;
511 if let Some(ref registry) = self.custom_function_registry {
515 crate::query::df_udfs_plugin::register_custom_functions_as_plugin_scalars(
516 &session, registry,
517 )?;
518 }
519 if let Some(ref host_pr) = host_plugin_registry {
530 crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, host_pr)?;
531 }
532 if let Some(ref session_local) = session_local_registry {
538 crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, session_local)?;
539 }
540 session
541 };
542 let session_ctx = Arc::new(SyncRwLock::new(session));
543
544 let mut planner = HybridPhysicalPlanner::with_l0_context(
545 session_ctx.clone(),
546 self.storage.clone(),
547 l0_context,
548 prop_manager_arc.clone(),
549 self.storage.schema_manager().schema(),
550 params.clone(),
551 HashMap::new(),
552 );
553
554 planner = planner.with_algo_registry(self.algo_registry.clone());
555 if let Some(ref registry) = self.procedure_registry {
556 planner = planner.with_procedure_registry(registry.clone());
557 }
558 let host_plugin_registry = self
567 .procedure_registry
568 .as_ref()
569 .and_then(|r| r.plugin_registry());
570 if let Some(registry) = host_plugin_registry {
571 planner = planner.with_plugin_registry(registry);
572 }
573 if let Some(ref xervo_runtime) = self.xervo_runtime {
574 planner = planner.with_xervo_runtime(xervo_runtime.clone());
575 }
576 if let Some(writer) = &self.writer {
581 planner = planner.with_writer(Arc::clone(writer));
582 }
583
584 Ok((session_ctx, planner, prop_manager_arc))
585 }
586
587 pub fn collect_batches(
589 session_ctx: &Arc<SyncRwLock<SessionContext>>,
590 execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
591 ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
592 Box::pin(async move {
593 use futures::TryStreamExt;
594
595 let task_ctx = session_ctx.read().task_ctx();
596 let partition_count = execution_plan.output_partitioning().partition_count();
597 let mut all_batches = Vec::new();
598 for partition in 0..partition_count {
599 let stream = execution_plan.execute(partition, task_ctx.clone())?;
600 let batches: Vec<RecordBatch> = stream.try_collect().await?;
601 all_batches.extend(batches);
602 }
603 Ok(all_batches)
604 })
605 }
606
607 pub async fn execute_datafusion(
612 &self,
613 plan: LogicalPlan,
614 prop_manager: &PropertyManager,
615 params: &HashMap<String, Value>,
616 ) -> Result<Vec<RecordBatch>> {
617 let (batches, _plan) = self
618 .execute_datafusion_with_plan(plan, prop_manager, params)
619 .await?;
620 Ok(batches)
621 }
622
623 pub async fn execute_datafusion_with_plan(
630 &self,
631 plan: LogicalPlan,
632 prop_manager: &PropertyManager,
633 params: &HashMap<String, Value>,
634 ) -> Result<(
635 Vec<RecordBatch>,
636 Arc<dyn datafusion::physical_plan::ExecutionPlan>,
637 )> {
638 let (session_ctx, mut planner, prop_manager_arc) =
639 self.create_datafusion_planner(prop_manager, params).await?;
640
641 if Self::contains_write_operations(&plan) {
643 let writer = self
644 .writer
645 .as_ref()
646 .ok_or_else(|| anyhow!("Write operations require a Writer"))?
647 .clone();
648 let query_ctx = self.get_context().await;
649
650 debug_assert!(
651 query_ctx.is_some(),
652 "BUG: query_ctx is None for write operation"
653 );
654
655 let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
656 executor: self.clone(),
657 writer,
658 prop_manager: prop_manager_arc,
659 params: params.clone(),
660 query_ctx,
661 tx_l0_override: self.transaction_l0_override.clone(),
662 });
663 planner = planner.with_mutation_context(mutation_ctx);
664 tracing::debug!(
665 plan_type = Self::get_plan_type(&plan),
666 "Mutation routed to DataFusion engine"
667 );
668 }
669
670 let execution_plan = planner.plan(&plan)?;
671 let plan_clone = Arc::clone(&execution_plan);
672 let result = Self::collect_batches(&session_ctx, execution_plan).await;
673
674 let graph_warnings = planner.graph_ctx().take_warnings();
676 if !graph_warnings.is_empty()
677 && let Ok(mut w) = self.warnings.lock()
678 {
679 w.extend(graph_warnings);
680 }
681
682 result.map(|batches| (batches, plan_clone))
683 }
684
685 pub(crate) async fn execute_merge_read_plan(
691 &self,
692 plan: LogicalPlan,
693 prop_manager: &PropertyManager,
694 params: &HashMap<String, Value>,
695 merge_variables: Vec<String>,
696 ) -> Result<Vec<HashMap<String, Value>>> {
697 let (session_ctx, planner, _prop_manager_arc) =
698 self.create_datafusion_planner(prop_manager, params).await?;
699
700 let extra: HashMap<String, HashSet<String>> = merge_variables
703 .iter()
704 .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
705 .collect();
706 let execution_plan = planner.plan_with_properties(&plan, extra)?;
707 let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
708
709 let flat_rows = self.record_batches_to_rows(all_batches)?;
711
712 let rows = flat_rows
715 .into_iter()
716 .map(|mut row| {
717 for var in &merge_variables {
718 if row.contains_key(var) {
720 continue;
721 }
722 let prefix = format!("{}.", var);
723 let dotted_keys: Vec<String> = row
724 .keys()
725 .filter(|k| k.starts_with(&prefix))
726 .cloned()
727 .collect();
728 if !dotted_keys.is_empty() {
729 let mut map = HashMap::new();
730 for key in dotted_keys {
731 let prop_name = key[prefix.len()..].to_string();
732 if let Some(val) = row.remove(&key) {
733 map.insert(prop_name, val);
734 }
735 }
736 row.insert(var.clone(), Value::Map(map));
737 }
738 }
739 row
740 })
741 .collect();
742
743 Ok(rows)
744 }
745
746 pub(crate) fn record_batches_to_rows(
753 &self,
754 batches: Vec<RecordBatch>,
755 ) -> Result<Vec<HashMap<String, Value>>> {
756 let mut rows = Vec::new();
757
758 for batch in batches {
759 let num_rows = batch.num_rows();
760 let schema = batch.schema();
761
762 for row_idx in 0..num_rows {
763 let mut row = HashMap::new();
764
765 for (col_idx, field) in schema.fields().iter().enumerate() {
766 let column = batch.column(col_idx);
767 let data_type =
769 if uni_common::core::schema::is_datetime_struct(field.data_type()) {
770 Some(&uni_common::DataType::DateTime)
771 } else if uni_common::core::schema::is_time_struct(field.data_type()) {
772 Some(&uni_common::DataType::Time)
773 } else {
774 None
775 };
776 let mut value =
777 arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);
778
779 if field
782 .metadata()
783 .get("cv_encoded")
784 .is_some_and(|v| v == "true")
785 && let Value::String(s) = &value
786 && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
787 {
788 value = Value::from(parsed);
789 }
790
791 value = Self::normalize_path_if_needed(value);
793
794 row.insert(field.name().clone(), value);
795 }
796
797 let bare_vars: Vec<String> = row
806 .keys()
807 .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
808 .cloned()
809 .collect();
810
811 let vid_placeholder_vars: Vec<String> = row
815 .keys()
816 .filter(|k| {
817 !k.contains('.')
818 && matches!(row.get(*k), Some(Value::String(_)))
819 && row.contains_key(&format!("{}._vid", k))
820 })
821 .cloned()
822 .collect();
823
824 for var in &vid_placeholder_vars {
825 promote_vid_placeholder(&mut row, var);
826 }
827
828 for var in &bare_vars {
829 merge_dotted_columns(&mut row, var);
830 }
831
832 rows.push(row);
833 }
834 }
835
836 Ok(rows)
837 }
838
839 fn normalize_path_if_needed(value: Value) -> Value {
844 match value {
845 Value::Map(map)
846 if map.contains_key("nodes")
847 && (map.contains_key("relationships") || map.contains_key("edges")) =>
848 {
849 Self::normalize_path_map(map)
850 }
851 other => other,
852 }
853 }
854
855 fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
857 if let Some(Value::List(nodes)) = map.remove("nodes") {
859 let normalized_nodes: Vec<Value> = nodes
860 .into_iter()
861 .map(|n| {
862 if let Value::Map(node_map) = n {
863 Self::normalize_path_node_map(node_map)
864 } else {
865 n
866 }
867 })
868 .collect();
869 map.insert("nodes".to_string(), Value::List(normalized_nodes));
870 }
871
872 let rels_key = if map.contains_key("relationships") {
874 "relationships"
875 } else {
876 "edges"
877 };
878 if let Some(Value::List(rels)) = map.remove(rels_key) {
879 let normalized_rels: Vec<Value> = rels
880 .into_iter()
881 .map(|r| {
882 if let Value::Map(rel_map) = r {
883 Self::normalize_path_edge_map(rel_map)
884 } else {
885 r
886 }
887 })
888 .collect();
889 map.insert("relationships".to_string(), Value::List(normalized_rels));
890 }
891
892 Value::Map(map)
893 }
894
895 fn value_to_id_string(val: Value) -> String {
897 match val {
898 Value::Int(n) => n.to_string(),
899 Value::Float(n) => n.to_string(),
900 Value::String(s) => s,
901 other => other.to_string(),
902 }
903 }
904
905 fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
908 if let Some(val) = map.remove(src_key) {
909 map.insert(
910 dst_key.to_string(),
911 Value::String(Self::value_to_id_string(val)),
912 );
913 }
914 }
915
916 fn ensure_properties_map(map: &mut HashMap<String, Value>) {
918 match map.get("properties") {
919 Some(props) if !props.is_null() => {}
920 _ => {
921 map.insert("properties".to_string(), Value::Map(HashMap::new()));
922 }
923 }
924 }
925
926 fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
928 Self::stringify_map_field(&mut map, "_vid", "_id");
929 Self::ensure_properties_map(&mut map);
930 Value::Map(map)
931 }
932
933 fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
935 Self::stringify_map_field(&mut map, "_eid", "_id");
936 Self::stringify_map_field(&mut map, "_src", "_src");
937 Self::stringify_map_field(&mut map, "_dst", "_dst");
938
939 if let Some(type_name) = map.remove("_type_name") {
940 map.insert("_type".to_string(), type_name);
941 }
942
943 Self::ensure_properties_map(&mut map);
944 Value::Map(map)
945 }
946
947 #[instrument(
948 skip(self, prop_manager, params),
949 fields(rows_returned, duration_ms),
950 level = "info"
951 )]
952 pub fn execute<'a>(
953 &'a self,
954 plan: LogicalPlan,
955 prop_manager: &'a PropertyManager,
956 params: &'a HashMap<String, Value>,
957 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
958 Box::pin(async move {
959 let query_type = Self::get_plan_type(&plan);
960 let ctx = self.get_context().await;
961 let start = Instant::now();
962
963 let res = if Self::is_ddl_or_admin(&plan) {
966 self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
967 .await
968 } else {
969 let batches = self.execute_datafusion(plan, prop_manager, params).await?;
970 self.record_batches_to_rows(batches)
971 };
972
973 let duration = start.elapsed();
974 metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
975 .record(duration.as_secs_f64());
976
977 tracing::Span::current().record("duration_ms", duration.as_millis());
978 match &res {
979 Ok(rows) => {
980 tracing::Span::current().record("rows_returned", rows.len());
981 metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
982 .increment(rows.len() as u64);
983 }
984 Err(e) => {
985 let error_type = if e.to_string().contains("timed out") {
986 "timeout"
987 } else if e.to_string().contains("syntax") {
988 "syntax"
989 } else {
990 "execution"
991 };
992 metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
993 }
994 }
995
996 res
997 })
998 }
999
1000 fn get_plan_type(plan: &LogicalPlan) -> &'static str {
1001 match plan {
1002 LogicalPlan::Scan { .. } => "read_scan",
1003 LogicalPlan::FusedIndexScan { .. } => "read_fused_index_scan",
1004 LogicalPlan::FusedIndexScanWrapped { .. } => "read_fused_index_scan_wrapped",
1005 LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
1006 LogicalPlan::Traverse { .. } => "read_traverse",
1007 LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
1008 LogicalPlan::ScanAll { .. } => "read_scan_all",
1009 LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
1010 LogicalPlan::VectorKnn { .. } => "read_vector",
1011 LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
1012 LogicalPlan::Merge { .. } => "write_merge",
1013 LogicalPlan::Delete { .. } => "write_delete",
1014 LogicalPlan::Set { .. } => "write_set",
1015 LogicalPlan::Remove { .. } => "write_remove",
1016 LogicalPlan::ProcedureCall { .. } => "call",
1017 LogicalPlan::Copy { .. } => "copy",
1018 LogicalPlan::Backup { .. } => "backup",
1019 _ => "other",
1020 }
1021 }
1022
1023 fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
1033 match plan {
1034 LogicalPlan::Project { input, .. }
1036 | LogicalPlan::Sort { input, .. }
1037 | LogicalPlan::Limit { input, .. }
1038 | LogicalPlan::Distinct { input }
1039 | LogicalPlan::Aggregate { input, .. }
1040 | LogicalPlan::Window { input, .. }
1041 | LogicalPlan::Unwind { input, .. }
1042 | LogicalPlan::Filter { input, .. }
1043 | LogicalPlan::Create { input, .. }
1044 | LogicalPlan::CreateBatch { input, .. }
1045 | LogicalPlan::Set { input, .. }
1046 | LogicalPlan::Remove { input, .. }
1047 | LogicalPlan::Delete { input, .. }
1048 | LogicalPlan::Merge { input, .. }
1049 | LogicalPlan::Foreach { input, .. }
1050 | LogicalPlan::Traverse { input, .. }
1051 | LogicalPlan::TraverseMainByType { input, .. }
1052 | LogicalPlan::BindZeroLengthPath { input, .. }
1053 | LogicalPlan::BindPath { input, .. }
1054 | LogicalPlan::ShortestPath { input, .. }
1055 | LogicalPlan::AllShortestPaths { input, .. }
1056 | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],
1057
1058 LogicalPlan::Apply {
1060 input, subquery, ..
1061 }
1062 | LogicalPlan::SubqueryCall { input, subquery } => {
1063 vec![input.as_ref(), subquery.as_ref()]
1064 }
1065 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
1066 vec![left.as_ref(), right.as_ref()]
1067 }
1068 LogicalPlan::RecursiveCTE {
1069 initial, recursive, ..
1070 } => vec![initial.as_ref(), recursive.as_ref()],
1071 LogicalPlan::QuantifiedPattern {
1072 input,
1073 pattern_plan,
1074 ..
1075 } => vec![input.as_ref(), pattern_plan.as_ref()],
1076
1077 _ => vec![],
1079 }
1080 }
1081
1082 pub(crate) fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
1089 match plan {
1090 LogicalPlan::CreateLabel(_)
1092 | LogicalPlan::CreateEdgeType(_)
1093 | LogicalPlan::AlterLabel(_)
1094 | LogicalPlan::AlterEdgeType(_)
1095 | LogicalPlan::DropLabel(_)
1096 | LogicalPlan::DropEdgeType(_)
1097 | LogicalPlan::CreateConstraint(_)
1098 | LogicalPlan::DropConstraint(_)
1099 | LogicalPlan::ShowConstraints(_) => true,
1100
1101 LogicalPlan::CreateVectorIndex { .. }
1103 | LogicalPlan::CreateFullTextIndex { .. }
1104 | LogicalPlan::CreateScalarIndex { .. }
1105 | LogicalPlan::CreateJsonFtsIndex { .. }
1106 | LogicalPlan::DropIndex { .. }
1107 | LogicalPlan::ShowIndexes { .. } => true,
1108
1109 LogicalPlan::ShowDatabase
1111 | LogicalPlan::ShowConfig
1112 | LogicalPlan::ShowStatistics
1113 | LogicalPlan::Vacuum
1114 | LogicalPlan::Checkpoint
1115 | LogicalPlan::Copy { .. }
1116 | LogicalPlan::CopyTo { .. }
1117 | LogicalPlan::CopyFrom { .. }
1118 | LogicalPlan::Backup { .. }
1119 | LogicalPlan::Explain { .. } => true,
1120
1121 LogicalPlan::ProcedureCall { procedure_name, .. } => {
1124 !Self::is_df_eligible_procedure(procedure_name)
1125 }
1126
1127 _ => Self::plan_children(plan)
1129 .iter()
1130 .any(|child| Self::is_ddl_or_admin(child)),
1131 }
1132 }
1133
1134 fn is_df_eligible_procedure(name: &str) -> bool {
1140 matches!(
1141 name,
1142 "uni.schema.labels"
1143 | "uni.schema.edgeTypes"
1144 | "uni.schema.relationshipTypes"
1145 | "uni.schema.indexes"
1146 | "uni.schema.constraints"
1147 | "uni.schema.labelInfo"
1148 | "uni.vector.query"
1149 | "uni.fts.query"
1150 | "uni.search"
1151 | "uni.create.vNode"
1158 | "uni.create.vEdge"
1159 ) || name.starts_with("uni.algo.")
1160 }
1161
1162 fn contains_write_operations(plan: &LogicalPlan) -> bool {
1169 match plan {
1170 LogicalPlan::Create { .. }
1171 | LogicalPlan::CreateBatch { .. }
1172 | LogicalPlan::Merge { .. }
1173 | LogicalPlan::Delete { .. }
1174 | LogicalPlan::Set { .. }
1175 | LogicalPlan::Remove { .. }
1176 | LogicalPlan::Foreach { .. } => true,
1177 _ => Self::plan_children(plan)
1178 .iter()
1179 .any(|child| Self::contains_write_operations(child)),
1180 }
1181 }
1182
1183 pub fn execute_stream(
1188 self,
1189 plan: LogicalPlan,
1190 prop_manager: Arc<PropertyManager>,
1191 params: HashMap<String, Value>,
1192 ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1193 let this = self;
1194 let this_for_ctx = this.clone();
1195
1196 let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1197
1198 ctx_stream
1199 .flat_map(move |ctx| {
1200 let plan = plan.clone();
1201 let this = this.clone();
1202 let prop_manager = prop_manager.clone();
1203 let params = params.clone();
1204
1205 let fut = async move {
1206 if Self::is_ddl_or_admin(&plan) {
1207 this.execute_subplan(plan, &prop_manager, ¶ms, ctx.as_ref())
1208 .await
1209 } else {
1210 let batches = this
1211 .execute_datafusion(plan, &prop_manager, ¶ms)
1212 .await?;
1213 this.record_batches_to_rows(batches)
1214 }
1215 };
1216 stream::once(fut).boxed()
1217 })
1218 .boxed()
1219 }
1220
1221 pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
1224 arrow_convert::arrow_to_value(col, row, None)
1225 }
1226
1227 pub(crate) fn evaluate_expr<'a>(
1228 &'a self,
1229 expr: &'a Expr,
1230 row: &'a HashMap<String, Value>,
1231 prop_manager: &'a PropertyManager,
1232 params: &'a HashMap<String, Value>,
1233 ctx: Option<&'a QueryContext>,
1234 ) -> BoxFuture<'a, Result<Value>> {
1235 let this = self;
1236 Box::pin(async move {
1237 let repr = expr.to_string_repr();
1239 if let Some(val) = row.get(&repr) {
1240 return Ok(val.clone());
1241 }
1242
1243 match expr {
1244 Expr::PatternComprehension { .. } => {
1245 Err(anyhow::anyhow!(
1247 "Pattern comprehensions are handled by DataFusion executor"
1248 ))
1249 }
1250 Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1251 "COLLECT subqueries not yet supported in executor"
1252 )),
1253 Expr::Variable(name) => {
1254 if let Some(val) = row.get(name) {
1255 Ok(val.clone())
1256 } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1257 Ok(vid_val.clone())
1261 } else {
1262 Ok(params.get(name).cloned().unwrap_or(Value::Null))
1263 }
1264 }
1265 Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1266 Expr::Property(var_expr, prop_name) => {
1267 if let Expr::Variable(var_name) = var_expr.as_ref() {
1271 let flat_key = format!("{}.{}", var_name, prop_name);
1272 if let Some(val) = row.get(flat_key.as_str()) {
1273 return Ok(val.clone());
1274 }
1275 }
1276
1277 let base_val = this
1278 .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1279 .await?;
1280
1281 if (prop_name == "_vid" || prop_name == "_id")
1283 && let Ok(vid) = Self::vid_from_value(&base_val)
1284 {
1285 return Ok(Value::Int(vid.as_u64() as i64));
1286 }
1287
1288 if let Value::Node(node) = &base_val {
1290 if prop_name == "_vid" || prop_name == "_id" {
1292 return Ok(Value::Int(node.vid.as_u64() as i64));
1293 }
1294 if prop_name == "_labels" {
1295 return Ok(Value::List(
1296 node.labels
1297 .iter()
1298 .map(|l| Value::String(l.clone()))
1299 .collect(),
1300 ));
1301 }
1302 if let Some(val) = node.properties.get(prop_name.as_str()) {
1304 return Ok(val.clone());
1305 }
1306 if let Ok(val) = prop_manager
1308 .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1309 .await
1310 {
1311 return Ok(val);
1312 }
1313 return Ok(Value::Null);
1314 }
1315
1316 if let Value::Edge(edge) = &base_val {
1318 if prop_name == "_eid" || prop_name == "_id" {
1320 return Ok(Value::Int(edge.eid.as_u64() as i64));
1321 }
1322 if prop_name == "_type" {
1323 return Ok(Value::String(edge.edge_type.clone()));
1324 }
1325 if prop_name == "_src" {
1326 return Ok(Value::Int(edge.src.as_u64() as i64));
1327 }
1328 if prop_name == "_dst" {
1329 return Ok(Value::Int(edge.dst.as_u64() as i64));
1330 }
1331 if let Some(val) = edge.properties.get(prop_name.as_str()) {
1333 return Ok(val.clone());
1334 }
1335 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1337 {
1338 return Ok(val);
1339 }
1340 return Ok(Value::Null);
1341 }
1342
1343 if let Value::Map(map) = &base_val {
1346 if let Some(val) = map.get(prop_name.as_str()) {
1348 return Ok(val.clone());
1349 }
1350 if let Some(Value::Map(props)) = map.get("properties")
1352 && let Some(val) = props.get(prop_name.as_str())
1353 {
1354 return Ok(val.clone());
1355 }
1356 let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1358 map.get("_id")
1359 .and_then(|v| v.as_str())
1360 .and_then(|s| s.parse::<u64>().ok())
1361 });
1362 if let Some(id) = vid_opt {
1363 let vid = Vid::from(id);
1364 if let Ok(val) = prop_manager
1365 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1366 .await
1367 {
1368 return Ok(val);
1369 }
1370 } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1371 let eid = uni_common::core::id::Eid::from(id);
1372 if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1373 return Ok(val);
1374 }
1375 }
1376 return Ok(Value::Null);
1377 }
1378
1379 if let Ok(vid) = Self::vid_from_value(&base_val) {
1381 return prop_manager
1382 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1383 .await;
1384 }
1385
1386 if base_val.is_null() {
1387 return Ok(Value::Null);
1388 }
1389
1390 {
1392 use crate::query::datetime::{
1393 eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1394 is_duration_string, is_temporal_accessor, is_temporal_string,
1395 };
1396
1397 if let Value::Temporal(tv) = &base_val {
1399 if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1400 if is_duration_accessor(prop_name) {
1401 return eval_duration_accessor(
1403 &base_val.to_string(),
1404 prop_name,
1405 );
1406 }
1407 } else if is_temporal_accessor(prop_name) {
1408 return eval_temporal_accessor(&base_val.to_string(), prop_name);
1409 }
1410 }
1411
1412 if let Value::String(s) = &base_val {
1414 if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1415 return eval_temporal_accessor(s, prop_name);
1416 }
1417 if is_duration_string(s) && is_duration_accessor(prop_name) {
1418 return eval_duration_accessor(s, prop_name);
1419 }
1420 }
1421 }
1422
1423 Err(anyhow!(
1424 "Cannot access property '{}' on {:?}",
1425 prop_name,
1426 base_val
1427 ))
1428 }
1429 Expr::ArrayIndex {
1430 array: arr_expr,
1431 index: idx_expr,
1432 } => {
1433 let arr_val = this
1434 .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1435 .await?;
1436 let idx_val = this
1437 .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1438 .await?;
1439
1440 if let Value::List(arr) = &arr_val {
1441 if let Some(i) = idx_val.as_i64() {
1443 let idx = if i < 0 {
1444 let positive_idx = arr.len() as i64 + i;
1446 if positive_idx < 0 {
1447 return Ok(Value::Null); }
1449 positive_idx as usize
1450 } else {
1451 i as usize
1452 };
1453 if idx < arr.len() {
1454 return Ok(arr[idx].clone());
1455 }
1456 return Ok(Value::Null);
1457 } else if idx_val.is_null() {
1458 return Ok(Value::Null);
1459 } else {
1460 return Err(anyhow::anyhow!(
1461 "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1462 idx_val
1463 ));
1464 }
1465 }
1466 if let Value::Map(map) = &arr_val {
1467 if let Some(key) = idx_val.as_str() {
1468 return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1469 } else if !idx_val.is_null() {
1470 return Err(anyhow::anyhow!(
1471 "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1472 idx_val
1473 ));
1474 }
1475 }
1476 if let Value::Node(node) = &arr_val {
1478 if let Some(key) = idx_val.as_str() {
1479 if let Some(val) = node.properties.get(key) {
1481 return Ok(val.clone());
1482 }
1483 if let Ok(val) = prop_manager
1485 .get_vertex_prop_with_ctx(node.vid, key, ctx)
1486 .await
1487 {
1488 return Ok(val);
1489 }
1490 return Ok(Value::Null);
1491 } else if !idx_val.is_null() {
1492 return Err(anyhow::anyhow!(
1493 "TypeError: Node index must be a string, got: {:?}",
1494 idx_val
1495 ));
1496 }
1497 }
1498 if let Value::Edge(edge) = &arr_val {
1500 if let Some(key) = idx_val.as_str() {
1501 if let Some(val) = edge.properties.get(key) {
1503 return Ok(val.clone());
1504 }
1505 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1507 return Ok(val);
1508 }
1509 return Ok(Value::Null);
1510 } else if !idx_val.is_null() {
1511 return Err(anyhow::anyhow!(
1512 "TypeError: Edge index must be a string, got: {:?}",
1513 idx_val
1514 ));
1515 }
1516 }
1517 if let Ok(vid) = Self::vid_from_value(&arr_val)
1519 && let Some(key) = idx_val.as_str()
1520 {
1521 if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1522 {
1523 return Ok(val);
1524 }
1525 return Ok(Value::Null);
1526 }
1527 if arr_val.is_null() {
1528 return Ok(Value::Null);
1529 }
1530 Err(anyhow!(
1531 "TypeError: InvalidArgumentType - cannot index into {:?}",
1532 arr_val
1533 ))
1534 }
1535 Expr::ArraySlice { array, start, end } => {
1536 let arr_val = this
1537 .evaluate_expr(array, row, prop_manager, params, ctx)
1538 .await?;
1539
1540 if let Value::List(arr) = &arr_val {
1541 let len = arr.len();
1542
1543 let start_idx = if let Some(s) = start {
1545 let v = this
1546 .evaluate_expr(s, row, prop_manager, params, ctx)
1547 .await?;
1548 if v.is_null() {
1549 return Ok(Value::Null);
1550 }
1551 let raw = v.as_i64().unwrap_or(0);
1552 if raw < 0 {
1553 (len as i64 + raw).max(0) as usize
1554 } else {
1555 (raw as usize).min(len)
1556 }
1557 } else {
1558 0
1559 };
1560
1561 let end_idx = if let Some(e) = end {
1563 let v = this
1564 .evaluate_expr(e, row, prop_manager, params, ctx)
1565 .await?;
1566 if v.is_null() {
1567 return Ok(Value::Null);
1568 }
1569 let raw = v.as_i64().unwrap_or(len as i64);
1570 if raw < 0 {
1571 (len as i64 + raw).max(0) as usize
1572 } else {
1573 (raw as usize).min(len)
1574 }
1575 } else {
1576 len
1577 };
1578
1579 if start_idx >= end_idx {
1581 return Ok(Value::List(vec![]));
1582 }
1583 let end_idx = end_idx.min(len);
1584 return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1585 }
1586
1587 if arr_val.is_null() {
1588 return Ok(Value::Null);
1589 }
1590 Err(anyhow!("Cannot slice {:?}", arr_val))
1591 }
1592 Expr::Literal(lit) => Ok(lit.to_value()),
1593 Expr::List(items) => {
1594 let mut vals = Vec::new();
1595 for item in items {
1596 vals.push(
1597 this.evaluate_expr(item, row, prop_manager, params, ctx)
1598 .await?,
1599 );
1600 }
1601 Ok(Value::List(vals))
1602 }
1603 Expr::Map(items) => {
1604 let mut map = HashMap::new();
1605 for (key, value_expr) in items {
1606 let val = this
1607 .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1608 .await?;
1609 map.insert(key.clone(), val);
1610 }
1611 Ok(Value::Map(map))
1612 }
1613 Expr::Exists { query, .. } => {
1614 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1616 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1617
1618 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1619 Ok(plan) => {
1620 let mut sub_params = params.clone();
1621 sub_params.extend(row.clone());
1622
1623 match this.execute(plan, prop_manager, &sub_params).await {
1624 Ok(results) => Ok(Value::Bool(!results.is_empty())),
1625 Err(e) => {
1626 log::debug!("EXISTS subquery execution failed: {}", e);
1627 Ok(Value::Bool(false))
1628 }
1629 }
1630 }
1631 Err(e) => {
1632 log::debug!("EXISTS subquery planning failed: {}", e);
1633 Ok(Value::Bool(false))
1634 }
1635 }
1636 }
1637 Expr::CountSubquery(query) => {
1638 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1640
1641 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1642
1643 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1644 Ok(plan) => {
1645 let mut sub_params = params.clone();
1646 sub_params.extend(row.clone());
1647
1648 match this.execute(plan, prop_manager, &sub_params).await {
1649 Ok(results) => Ok(Value::from(results.len() as i64)),
1650 Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1651 }
1652 }
1653 Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1654 }
1655 }
1656 Expr::Quantifier {
1657 quantifier,
1658 variable,
1659 list,
1660 predicate,
1661 } => {
1662 let list_val = this
1676 .evaluate_expr(list, row, prop_manager, params, ctx)
1677 .await?;
1678
1679 if list_val.is_null() {
1681 return Ok(Value::Null);
1682 }
1683
1684 let items = match list_val {
1686 Value::List(arr) => arr,
1687 _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1688 };
1689
1690 let mut satisfied_count = 0;
1692 for item in &items {
1693 let mut item_row = row.clone();
1695 item_row.insert(variable.clone(), item.clone());
1696
1697 let pred_result = this
1699 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1700 .await?;
1701
1702 if let Value::Bool(true) = pred_result {
1704 satisfied_count += 1;
1705 }
1706 }
1707
1708 let result = match quantifier {
1710 Quantifier::All => satisfied_count == items.len(),
1711 Quantifier::Any => satisfied_count > 0,
1712 Quantifier::Single => satisfied_count == 1,
1713 Quantifier::None => satisfied_count == 0,
1714 };
1715
1716 Ok(Value::Bool(result))
1717 }
1718 Expr::ListComprehension {
1719 variable,
1720 list,
1721 where_clause,
1722 map_expr,
1723 } => {
1724 let list_val = this
1731 .evaluate_expr(list, row, prop_manager, params, ctx)
1732 .await?;
1733
1734 if list_val.is_null() {
1736 return Ok(Value::Null);
1737 }
1738
1739 let items = match list_val {
1741 Value::List(arr) => arr,
1742 _ => {
1743 return Err(anyhow!(
1744 "List comprehension expects a list, got: {:?}",
1745 list_val
1746 ));
1747 }
1748 };
1749
1750 let mut results = Vec::new();
1752 for item in &items {
1753 let mut item_row = row.clone();
1755 item_row.insert(variable.clone(), item.clone());
1756
1757 if let Some(predicate) = where_clause {
1759 let pred_result = this
1760 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1761 .await?;
1762
1763 if !matches!(pred_result, Value::Bool(true)) {
1765 continue;
1766 }
1767 }
1768
1769 let mapped_val = this
1771 .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1772 .await?;
1773 results.push(mapped_val);
1774 }
1775
1776 Ok(Value::List(results))
1777 }
1778 Expr::BinaryOp { left, op, right } => {
1779 match op {
1781 BinaryOp::And => {
1782 let l_val = this
1783 .evaluate_expr(left, row, prop_manager, params, ctx)
1784 .await?;
1785 if let Some(false) = l_val.as_bool() {
1787 return Ok(Value::Bool(false));
1788 }
1789 let r_val = this
1790 .evaluate_expr(right, row, prop_manager, params, ctx)
1791 .await?;
1792 eval_binary_op(&l_val, op, &r_val)
1793 }
1794 BinaryOp::Or => {
1795 let l_val = this
1796 .evaluate_expr(left, row, prop_manager, params, ctx)
1797 .await?;
1798 if let Some(true) = l_val.as_bool() {
1800 return Ok(Value::Bool(true));
1801 }
1802 let r_val = this
1803 .evaluate_expr(right, row, prop_manager, params, ctx)
1804 .await?;
1805 eval_binary_op(&l_val, op, &r_val)
1806 }
1807 _ => {
1808 let l_val = this
1810 .evaluate_expr(left, row, prop_manager, params, ctx)
1811 .await?;
1812 let r_val = this
1813 .evaluate_expr(right, row, prop_manager, params, ctx)
1814 .await?;
1815 eval_binary_op(&l_val, op, &r_val)
1816 }
1817 }
1818 }
1819 Expr::In { expr, list } => {
1820 let l_val = this
1821 .evaluate_expr(expr, row, prop_manager, params, ctx)
1822 .await?;
1823 let r_val = this
1824 .evaluate_expr(list, row, prop_manager, params, ctx)
1825 .await?;
1826 eval_in_op(&l_val, &r_val)
1827 }
1828 Expr::UnaryOp { op, expr } => {
1829 let val = this
1830 .evaluate_expr(expr, row, prop_manager, params, ctx)
1831 .await?;
1832 match op {
1833 UnaryOp::Not => {
1834 match val.as_bool() {
1836 Some(b) => Ok(Value::Bool(!b)),
1837 None if val.is_null() => Ok(Value::Null),
1838 None => Err(anyhow!(
1839 "InvalidArgumentType: NOT requires a boolean argument"
1840 )),
1841 }
1842 }
1843 UnaryOp::Neg => {
1844 if let Some(i) = val.as_i64() {
1845 Ok(Value::Int(-i))
1846 } else if let Some(f) = val.as_f64() {
1847 Ok(Value::Float(-f))
1848 } else {
1849 Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1850 }
1851 }
1852 }
1853 }
1854 Expr::IsNull(expr) => {
1855 let val = this
1856 .evaluate_expr(expr, row, prop_manager, params, ctx)
1857 .await?;
1858 Ok(Value::Bool(val.is_null()))
1859 }
1860 Expr::IsNotNull(expr) => {
1861 let val = this
1862 .evaluate_expr(expr, row, prop_manager, params, ctx)
1863 .await?;
1864 Ok(Value::Bool(!val.is_null()))
1865 }
1866 Expr::IsUnique(_) => {
1867 Err(anyhow!(
1869 "IS UNIQUE can only be used in constraint definitions"
1870 ))
1871 }
1872 Expr::Case {
1873 expr,
1874 when_then,
1875 else_expr,
1876 } => {
1877 if let Some(base_expr) = expr {
1878 let base_val = this
1879 .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1880 .await?;
1881 for (w, t) in when_then {
1882 let w_val = this
1883 .evaluate_expr(w, row, prop_manager, params, ctx)
1884 .await?;
1885 if base_val == w_val {
1886 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1887 }
1888 }
1889 } else {
1890 for (w, t) in when_then {
1891 let w_val = this
1892 .evaluate_expr(w, row, prop_manager, params, ctx)
1893 .await?;
1894 if w_val.as_bool() == Some(true) {
1895 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1896 }
1897 }
1898 }
1899 if let Some(e) = else_expr {
1900 return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1901 }
1902 Ok(Value::Null)
1903 }
1904 Expr::Wildcard => Ok(Value::Null),
1905 Expr::FunctionCall { name, args, .. } => {
1906 if name.eq_ignore_ascii_case("ID") {
1908 if args.len() != 1 {
1909 return Err(anyhow!("id() requires exactly 1 argument"));
1910 }
1911 let val = this
1912 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1913 .await?;
1914 if let Value::Map(map) = &val {
1915 if let Some(vid_val) = map.get("_vid") {
1917 return Ok(vid_val.clone());
1918 }
1919 if let Some(eid_val) = map.get("_eid") {
1921 return Ok(eid_val.clone());
1922 }
1923 if let Some(id_val) = map.get("_id") {
1925 return Ok(id_val.clone());
1926 }
1927 }
1928 return Ok(Value::Null);
1929 }
1930
1931 if name.eq_ignore_ascii_case("ELEMENTID") {
1933 if args.len() != 1 {
1934 return Err(anyhow!("elementId() requires exactly 1 argument"));
1935 }
1936 let val = this
1937 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1938 .await?;
1939 if let Value::Map(map) = &val {
1940 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1943 return Ok(Value::String(vid_val.to_string()));
1944 }
1945 if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1948 return Ok(Value::String(eid_val.to_string()));
1949 }
1950 }
1951 return Ok(Value::Null);
1952 }
1953
1954 if name.eq_ignore_ascii_case("TYPE") {
1956 if args.len() != 1 {
1957 return Err(anyhow!("type() requires exactly 1 argument"));
1958 }
1959 let val = this
1960 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1961 .await?;
1962 if let Value::Map(map) = &val
1963 && let Some(type_val) = map.get("_type")
1964 {
1965 if let Some(type_id) =
1967 type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1968 {
1969 if let Some(name) = this
1970 .storage
1971 .schema_manager()
1972 .edge_type_name_by_id_unified(type_id)
1973 {
1974 return Ok(Value::String(name));
1975 }
1976 } else if let Some(name) = type_val.as_str() {
1977 return Ok(Value::String(name.to_string()));
1978 }
1979 }
1980 return Ok(Value::Null);
1981 }
1982
1983 if name.eq_ignore_ascii_case("LABELS") {
1985 if args.len() != 1 {
1986 return Err(anyhow!("labels() requires exactly 1 argument"));
1987 }
1988 let val = this
1989 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1990 .await?;
1991 if let Value::Map(map) = &val
1992 && let Some(labels_val) = map.get("_labels")
1993 {
1994 return Ok(labels_val.clone());
1995 }
1996 return Ok(Value::Null);
1997 }
1998
1999 if name.eq_ignore_ascii_case("PROPERTIES") {
2001 if args.len() != 1 {
2002 return Err(anyhow!("properties() requires exactly 1 argument"));
2003 }
2004 let val = this
2005 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2006 .await?;
2007 if let Value::Map(map) = &val {
2008 let mut props = HashMap::new();
2010 for (k, v) in map.iter() {
2011 if !k.starts_with('_') {
2012 props.insert(k.clone(), v.clone());
2013 }
2014 }
2015 return Ok(Value::Map(props));
2016 }
2017 return Ok(Value::Null);
2018 }
2019
2020 if name.eq_ignore_ascii_case("STARTNODE") {
2022 if args.len() != 1 {
2023 return Err(anyhow!("startNode() requires exactly 1 argument"));
2024 }
2025 let val = this
2026 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2027 .await?;
2028 if let Value::Edge(edge) = &val {
2029 return Ok(Self::find_node_by_vid(row, edge.src));
2030 }
2031 if let Value::Map(map) = &val {
2032 if let Some(start_node) = map.get("_startNode") {
2033 return Ok(start_node.clone());
2034 }
2035 if let Some(src_vid) = map.get("_src_vid") {
2036 return Ok(Value::Map(HashMap::from([(
2037 "_vid".to_string(),
2038 src_vid.clone(),
2039 )])));
2040 }
2041 if let Some(src_id) = map.get("_src")
2043 && let Some(u) = src_id.as_u64()
2044 {
2045 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2046 }
2047 }
2048 return Ok(Value::Null);
2049 }
2050
2051 if name.eq_ignore_ascii_case("ENDNODE") {
2053 if args.len() != 1 {
2054 return Err(anyhow!("endNode() requires exactly 1 argument"));
2055 }
2056 let val = this
2057 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2058 .await?;
2059 if let Value::Edge(edge) = &val {
2060 return Ok(Self::find_node_by_vid(row, edge.dst));
2061 }
2062 if let Value::Map(map) = &val {
2063 if let Some(end_node) = map.get("_endNode") {
2064 return Ok(end_node.clone());
2065 }
2066 if let Some(dst_vid) = map.get("_dst_vid") {
2067 return Ok(Value::Map(HashMap::from([(
2068 "_vid".to_string(),
2069 dst_vid.clone(),
2070 )])));
2071 }
2072 if let Some(dst_id) = map.get("_dst")
2074 && let Some(u) = dst_id.as_u64()
2075 {
2076 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2077 }
2078 }
2079 return Ok(Value::Null);
2080 }
2081
2082 if name.eq_ignore_ascii_case("HASLABEL") {
2085 if args.len() != 2 {
2086 return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
2087 }
2088 let node_val = this
2089 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2090 .await?;
2091 let label_val = this
2092 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2093 .await?;
2094
2095 let label_to_check = label_val.as_str().ok_or_else(|| {
2096 anyhow!("Second argument to hasLabel must be a string")
2097 })?;
2098
2099 let has_label = match &node_val {
2100 Value::Map(map) if map.contains_key("_vid") => {
2102 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2103 labels_arr
2104 .iter()
2105 .any(|l| l.as_str() == Some(label_to_check))
2106 } else {
2107 false
2108 }
2109 }
2110 Value::Map(map) => {
2112 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2113 labels_arr
2114 .iter()
2115 .any(|l| l.as_str() == Some(label_to_check))
2116 } else {
2117 false
2118 }
2119 }
2120 _ => false,
2121 };
2122 return Ok(Value::Bool(has_label));
2123 }
2124
2125 if matches!(
2128 name.to_uppercase().as_str(),
2129 "ANY" | "ALL" | "NONE" | "SINGLE"
2130 ) {
2131 return Err(anyhow!(
2132 "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
2133 name.to_lowercase()
2134 ));
2135 }
2136
2137 if name.eq_ignore_ascii_case("COALESCE") {
2139 for arg in args {
2140 let val = this
2141 .evaluate_expr(arg, row, prop_manager, params, ctx)
2142 .await?;
2143 if !val.is_null() {
2144 return Ok(val);
2145 }
2146 }
2147 return Ok(Value::Null);
2148 }
2149
2150 if name.eq_ignore_ascii_case("vector_similarity") {
2152 if args.len() != 2 {
2153 return Err(anyhow!("vector_similarity takes 2 arguments"));
2154 }
2155 let v1 = this
2156 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2157 .await?;
2158 let v2 = this
2159 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2160 .await?;
2161 return eval_vector_similarity(&v1, &v2);
2162 }
2163
2164 if name.eq_ignore_ascii_case("uni.temporal.validAt")
2166 || name.eq_ignore_ascii_case("uni.validAt")
2167 || name.eq_ignore_ascii_case("validAt")
2168 {
2169 if args.len() != 4 {
2170 return Err(anyhow!("validAt requires 4 arguments"));
2171 }
2172 let node_val = this
2173 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2174 .await?;
2175 let start_prop = this
2176 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2177 .await?
2178 .as_str()
2179 .ok_or(anyhow!("start_prop must be string"))?
2180 .to_string();
2181 let end_prop = this
2182 .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2183 .await?
2184 .as_str()
2185 .ok_or(anyhow!("end_prop must be string"))?
2186 .to_string();
2187 let time_val = this
2188 .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2189 .await?;
2190
2191 let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2192 anyhow!("time argument must be a datetime value or string")
2193 })?;
2194
2195 let valid_from_val: Option<Value> = if let Ok(vid) =
2197 Self::vid_from_value(&node_val)
2198 {
2199 prop_manager
2201 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2202 .await
2203 .ok()
2204 } else if let Value::Map(map) = &node_val {
2205 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2207 let vid = Vid::from(vid_val);
2208 prop_manager
2209 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2210 .await
2211 .ok()
2212 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2213 let eid = uni_common::core::id::Eid::from(eid_val);
2215 prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2216 } else {
2217 map.get(&start_prop).cloned()
2219 }
2220 } else {
2221 return Ok(Value::Bool(false));
2222 };
2223
2224 let valid_from = match valid_from_val {
2225 Some(ref v) => match value_to_datetime_utc(v) {
2226 Some(dt) => dt,
2227 None if v.is_null() => return Ok(Value::Bool(false)),
2228 None => {
2229 return Err(anyhow!(
2230 "Property {} must be a datetime value or string",
2231 start_prop
2232 ));
2233 }
2234 },
2235 None => return Ok(Value::Bool(false)),
2236 };
2237
2238 let valid_to_val: Option<Value> = if let Ok(vid) =
2239 Self::vid_from_value(&node_val)
2240 {
2241 prop_manager
2243 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2244 .await
2245 .ok()
2246 } else if let Value::Map(map) = &node_val {
2247 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2249 let vid = Vid::from(vid_val);
2250 prop_manager
2251 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2252 .await
2253 .ok()
2254 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2255 let eid = uni_common::core::id::Eid::from(eid_val);
2257 prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2258 } else {
2259 map.get(&end_prop).cloned()
2261 }
2262 } else {
2263 return Ok(Value::Bool(false));
2264 };
2265
2266 let valid_to = match valid_to_val {
2267 Some(ref v) => match value_to_datetime_utc(v) {
2268 Some(dt) => Some(dt),
2269 None if v.is_null() => None,
2270 None => {
2271 return Err(anyhow!(
2272 "Property {} must be a datetime value or null",
2273 end_prop
2274 ));
2275 }
2276 },
2277 None => None,
2278 };
2279
2280 let is_valid = valid_from <= query_time
2281 && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2282 return Ok(Value::Bool(is_valid));
2283 }
2284
2285 let mut evaluated_args = Vec::with_capacity(args.len());
2287 for arg in args {
2288 let mut val = this
2289 .evaluate_expr(arg, row, prop_manager, params, ctx)
2290 .await?;
2291
2292 if let Value::Map(ref mut map) = val {
2295 hydrate_entity_if_needed(map, prop_manager, ctx).await;
2296 }
2297
2298 evaluated_args.push(val);
2299 }
2300 eval_scalar_function(
2301 name,
2302 &evaluated_args,
2303 self.custom_function_registry.as_deref(),
2304 )
2305 }
2306 Expr::Reduce {
2307 accumulator,
2308 init,
2309 variable,
2310 list,
2311 expr,
2312 } => {
2313 let mut acc = self
2314 .evaluate_expr(init, row, prop_manager, params, ctx)
2315 .await?;
2316 let list_val = self
2317 .evaluate_expr(list, row, prop_manager, params, ctx)
2318 .await?;
2319
2320 if let Value::List(items) = list_val {
2321 for item in items {
2322 let mut scope = row.clone();
2326 scope.insert(accumulator.clone(), acc.clone());
2327 scope.insert(variable.clone(), item);
2328
2329 acc = self
2330 .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2331 .await?;
2332 }
2333 } else {
2334 return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2335 }
2336 Ok(acc)
2337 }
2338 Expr::ValidAt { .. } => {
2339 Err(anyhow!(
2341 "VALID_AT expression should have been transformed to function call in planner"
2342 ))
2343 }
2344
2345 Expr::LabelCheck { expr, labels } => {
2346 let val = this
2347 .evaluate_expr(expr, row, prop_manager, params, ctx)
2348 .await?;
2349 match &val {
2350 Value::Null => Ok(Value::Null),
2351 Value::Map(map) => {
2352 let is_edge = map.contains_key("_eid")
2354 || map.contains_key("_type_name")
2355 || (map.contains_key("_type") && !map.contains_key("_vid"));
2356
2357 if is_edge {
2358 if labels.len() > 1 {
2360 return Ok(Value::Bool(false));
2361 }
2362 let label_to_check = &labels[0];
2363 let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2364 {
2365 t == label_to_check
2366 } else if let Some(Value::String(t)) = map.get("_type") {
2367 t == label_to_check
2368 } else {
2369 false
2370 };
2371 Ok(Value::Bool(has_type))
2372 } else {
2373 let has_all = labels.iter().all(|label_to_check| {
2375 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2376 labels_arr
2377 .iter()
2378 .any(|l| l.as_str() == Some(label_to_check.as_str()))
2379 } else {
2380 false
2381 }
2382 });
2383 Ok(Value::Bool(has_all))
2384 }
2385 }
2386 _ => Ok(Value::Bool(false)),
2387 }
2388 }
2389
2390 Expr::MapProjection { base, items } => {
2391 let base_value = this
2392 .evaluate_expr(base, row, prop_manager, params, ctx)
2393 .await?;
2394
2395 let properties = match &base_value {
2397 Value::Map(map) => map,
2398 _ => {
2399 return Err(anyhow!(
2400 "Map projection requires object, got {:?}",
2401 base_value
2402 ));
2403 }
2404 };
2405
2406 let mut result_map = HashMap::new();
2407
2408 for item in items {
2409 match item {
2410 MapProjectionItem::Property(prop) => {
2411 if let Some(value) = properties.get(prop.as_str()) {
2412 result_map.insert(prop.clone(), value.clone());
2413 }
2414 }
2415 MapProjectionItem::AllProperties => {
2416 for (key, value) in properties.iter() {
2418 if !key.starts_with('_') {
2419 result_map.insert(key.clone(), value.clone());
2420 }
2421 }
2422 }
2423 MapProjectionItem::LiteralEntry(key, expr) => {
2424 let value = this
2425 .evaluate_expr(expr, row, prop_manager, params, ctx)
2426 .await?;
2427 result_map.insert(key.clone(), value);
2428 }
2429 MapProjectionItem::Variable(var_name) => {
2430 if let Some(value) = row.get(var_name.as_str()) {
2433 result_map.insert(var_name.clone(), value.clone());
2434 }
2435 }
2436 }
2437 }
2438
2439 Ok(Value::Map(result_map))
2440 }
2441 }
2442 })
2443 }
2444
2445 pub(crate) fn execute_subplan<'a>(
2446 &'a self,
2447 plan: LogicalPlan,
2448 prop_manager: &'a PropertyManager,
2449 params: &'a HashMap<String, Value>,
2450 ctx: Option<&'a QueryContext>,
2451 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2452 Box::pin(async move {
2453 if let Some(ctx) = ctx {
2454 ctx.check_timeout()?;
2455 }
2456 match plan {
2457 LogicalPlan::Union { left, right, all } => {
2458 self.execute_union(left, right, all, prop_manager, params, ctx)
2459 .await
2460 }
2461 LogicalPlan::CreateVectorIndex {
2462 config,
2463 if_not_exists,
2464 } => {
2465 if if_not_exists && self.index_exists_by_name(&config.name) {
2466 return Ok(vec![]);
2467 }
2468 let idx_mgr = self.storage.index_manager();
2469 idx_mgr.create_vector_index(config).await?;
2470 Ok(vec![])
2471 }
2472 LogicalPlan::CreateFullTextIndex {
2473 config,
2474 if_not_exists,
2475 } => {
2476 if if_not_exists && self.index_exists_by_name(&config.name) {
2477 return Ok(vec![]);
2478 }
2479 let idx_mgr = self.storage.index_manager();
2480 idx_mgr.create_fts_index(config).await?;
2481 Ok(vec![])
2482 }
2483 LogicalPlan::CreateScalarIndex {
2484 mut config,
2485 if_not_exists,
2486 } => {
2487 if if_not_exists && self.index_exists_by_name(&config.name) {
2488 return Ok(vec![]);
2489 }
2490
2491 let mut modified_properties = Vec::new();
2493
2494 for prop in &config.properties {
2495 if prop.contains('(') && prop.contains(')') {
2497 let gen_col = SchemaManager::generated_column_name(prop);
2498
2499 let sm = self.storage.schema_manager_arc();
2501 if let Err(e) = sm.add_generated_property(
2502 &config.label,
2503 &gen_col,
2504 DataType::String, prop.clone(),
2506 ) {
2507 log::warn!("Failed to add generated property (might exist): {}", e);
2508 }
2509
2510 modified_properties.push(gen_col);
2511 } else {
2512 modified_properties.push(prop.clone());
2514 }
2515 }
2516
2517 config.properties = modified_properties;
2518
2519 let idx_mgr = self.storage.index_manager();
2520 idx_mgr.create_scalar_index(config).await?;
2521 Ok(vec![])
2522 }
2523 LogicalPlan::CreateJsonFtsIndex {
2524 config,
2525 if_not_exists,
2526 } => {
2527 if if_not_exists && self.index_exists_by_name(&config.name) {
2528 return Ok(vec![]);
2529 }
2530 let idx_mgr = self.storage.index_manager();
2531 idx_mgr.create_json_fts_index(config).await?;
2532 Ok(vec![])
2533 }
2534 LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2535 LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2536 LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2537 LogicalPlan::Vacuum => {
2538 self.execute_vacuum().await?;
2539 Ok(vec![])
2540 }
2541 LogicalPlan::Checkpoint => {
2542 self.execute_checkpoint().await?;
2543 Ok(vec![])
2544 }
2545 LogicalPlan::CopyTo {
2546 label,
2547 path,
2548 format,
2549 options,
2550 } => {
2551 let count = self
2552 .execute_copy_to(&label, &path, &format, &options)
2553 .await?;
2554 let mut result = HashMap::new();
2555 result.insert("count".to_string(), Value::Int(count as i64));
2556 Ok(vec![result])
2557 }
2558 LogicalPlan::CopyFrom {
2559 label,
2560 path,
2561 format,
2562 options,
2563 } => {
2564 let count = self
2565 .execute_copy_from(&label, &path, &format, &options)
2566 .await?;
2567 let mut result = HashMap::new();
2568 result.insert("count".to_string(), Value::Int(count as i64));
2569 Ok(vec![result])
2570 }
2571 LogicalPlan::CreateLabel(clause) => {
2572 self.execute_create_label(clause).await?;
2573 Ok(vec![])
2574 }
2575 LogicalPlan::CreateEdgeType(clause) => {
2576 self.execute_create_edge_type(clause).await?;
2577 Ok(vec![])
2578 }
2579 LogicalPlan::AlterLabel(clause) => {
2580 self.execute_alter_label(clause).await?;
2581 Ok(vec![])
2582 }
2583 LogicalPlan::AlterEdgeType(clause) => {
2584 self.execute_alter_edge_type(clause).await?;
2585 Ok(vec![])
2586 }
2587 LogicalPlan::DropLabel(clause) => {
2588 self.execute_drop_label(clause).await?;
2589 Ok(vec![])
2590 }
2591 LogicalPlan::DropEdgeType(clause) => {
2592 self.execute_drop_edge_type(clause).await?;
2593 Ok(vec![])
2594 }
2595 LogicalPlan::CreateConstraint(clause) => {
2596 self.execute_create_constraint(clause).await?;
2597 Ok(vec![])
2598 }
2599 LogicalPlan::DropConstraint(clause) => {
2600 self.execute_drop_constraint(clause).await?;
2601 Ok(vec![])
2602 }
2603 LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2604 LogicalPlan::DropIndex { name, if_exists } => {
2605 let idx_mgr = self.storage.index_manager();
2606 match idx_mgr.drop_index(&name).await {
2607 Ok(_) => Ok(vec![]),
2608 Err(e) => {
2609 if if_exists && e.to_string().contains("not found") {
2610 Ok(vec![])
2611 } else {
2612 Err(e)
2613 }
2614 }
2615 }
2616 }
2617 LogicalPlan::ShowIndexes { filter } => {
2618 Ok(self.execute_show_indexes(filter.as_deref()))
2619 }
2620 LogicalPlan::Scan { .. }
2623 | LogicalPlan::FusedIndexScan { .. }
2624 | LogicalPlan::FusedIndexScanWrapped { .. }
2625 | LogicalPlan::ExtIdLookup { .. }
2626 | LogicalPlan::ScanAll { .. }
2627 | LogicalPlan::ScanMainByLabels { .. }
2628 | LogicalPlan::Traverse { .. }
2629 | LogicalPlan::TraverseMainByType { .. } => {
2630 let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2631 self.record_batches_to_rows(batches)
2632 }
2633 LogicalPlan::Filter {
2634 input,
2635 predicate,
2636 optional_variables,
2637 } => {
2638 let input_matches = self
2639 .execute_subplan(*input, prop_manager, params, ctx)
2640 .await?;
2641
2642 tracing::debug!(
2643 "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2644 predicate,
2645 input_matches.len(),
2646 optional_variables
2647 );
2648
2649 if !optional_variables.is_empty() {
2653 let is_optional_key = |k: &str| -> bool {
2656 optional_variables.contains(k)
2657 || optional_variables
2658 .iter()
2659 .any(|var| k.starts_with(&format!("{}.", var)))
2660 };
2661
2662 let is_internal_key =
2664 |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2665
2666 let non_optional_vars: Vec<String> = input_matches
2668 .first()
2669 .map(|row| {
2670 row.keys()
2671 .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2672 .cloned()
2673 .collect()
2674 })
2675 .unwrap_or_default();
2676
2677 let mut groups: std::collections::HashMap<
2679 Vec<u8>,
2680 Vec<HashMap<String, Value>>,
2681 > = std::collections::HashMap::new();
2682
2683 for row in &input_matches {
2684 let key: Vec<u8> = non_optional_vars
2686 .iter()
2687 .map(|var| {
2688 row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
2689 })
2690 .collect::<Vec<_>>()
2691 .join("|")
2692 .into_bytes();
2693
2694 groups.entry(key).or_default().push(row.clone());
2695 }
2696
2697 let mut filtered = Vec::new();
2698 for (_key, group_rows) in groups {
2699 let mut group_passed = Vec::new();
2700
2701 for row in &group_rows {
2702 let has_null_optional = optional_variables.iter().any(|var| {
2704 let direct_null =
2706 matches!(row.get(var), Some(Value::Null) | None);
2707 let prefixed_null = row
2708 .keys()
2709 .filter(|k| k.starts_with(&format!("{}.", var)))
2710 .any(|k| matches!(row.get(k), Some(Value::Null)));
2711 direct_null || prefixed_null
2712 });
2713
2714 if has_null_optional {
2715 group_passed.push(row.clone());
2716 continue;
2717 }
2718
2719 let res = self
2720 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2721 .await?;
2722
2723 if res.as_bool().unwrap_or(false) {
2724 group_passed.push(row.clone());
2725 }
2726 }
2727
2728 if group_passed.is_empty() {
2729 if let Some(template) = group_rows.first() {
2732 let mut null_row = HashMap::new();
2733 for (k, v) in template {
2734 if is_optional_key(k) {
2735 null_row.insert(k.clone(), Value::Null);
2736 } else {
2737 null_row.insert(k.clone(), v.clone());
2738 }
2739 }
2740 filtered.push(null_row);
2741 }
2742 } else {
2743 filtered.extend(group_passed);
2744 }
2745 }
2746
2747 tracing::debug!(
2748 "Filter (OPTIONAL): {} input rows -> {} output rows",
2749 input_matches.len(),
2750 filtered.len()
2751 );
2752
2753 return Ok(filtered);
2754 }
2755
2756 let mut filtered = Vec::new();
2758 for row in input_matches.iter() {
2759 let res = self
2760 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2761 .await?;
2762
2763 let passes = res.as_bool().unwrap_or(false);
2764
2765 if passes {
2766 filtered.push(row.clone());
2767 }
2768 }
2769
2770 tracing::debug!(
2771 "Filter: {} input rows -> {} output rows",
2772 input_matches.len(),
2773 filtered.len()
2774 );
2775
2776 Ok(filtered)
2777 }
2778 LogicalPlan::ProcedureCall {
2779 procedure_name,
2780 arguments,
2781 yield_items,
2782 } => {
2783 let yield_names: Vec<String> =
2784 yield_items.iter().map(|(n, _)| n.clone()).collect();
2785 let results = self
2786 .execute_procedure(
2787 &procedure_name,
2788 &arguments,
2789 &yield_names,
2790 prop_manager,
2791 params,
2792 ctx,
2793 )
2794 .await?;
2795
2796 let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2802 if !has_aliases {
2803 Ok(results)
2806 } else {
2807 let mut aliased_results = Vec::with_capacity(results.len());
2808 for row in results {
2809 let mut new_row = HashMap::new();
2810 for (name, alias) in &yield_items {
2811 let col_name = alias.as_ref().unwrap_or(name);
2812 let val = row.get(name).cloned().unwrap_or(Value::Null);
2813 new_row.insert(col_name.clone(), val);
2814 }
2815 aliased_results.push(new_row);
2816 }
2817 Ok(aliased_results)
2818 }
2819 }
2820 LogicalPlan::VectorKnn { .. } => {
2821 unreachable!("VectorKnn is handled by DataFusion engine")
2822 }
2823 LogicalPlan::InvertedIndexLookup { .. } => {
2824 unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2825 }
2826 LogicalPlan::Sort { input, order_by } => {
2827 let rows = self
2828 .execute_subplan(*input, prop_manager, params, ctx)
2829 .await?;
2830 self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2831 .await
2832 }
2833 LogicalPlan::Limit { input, skip, fetch } => {
2834 let rows = self
2835 .execute_subplan(*input, prop_manager, params, ctx)
2836 .await?;
2837 let skip = skip.unwrap_or(0);
2838 let take = fetch.unwrap_or(usize::MAX);
2839 Ok(rows.into_iter().skip(skip).take(take).collect())
2840 }
2841 LogicalPlan::Aggregate {
2842 input,
2843 group_by,
2844 aggregates,
2845 } => {
2846 let rows = self
2847 .execute_subplan(*input, prop_manager, params, ctx)
2848 .await?;
2849 self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2850 .await
2851 }
2852 LogicalPlan::Window {
2853 input,
2854 window_exprs,
2855 } => {
2856 let rows = self
2857 .execute_subplan(*input, prop_manager, params, ctx)
2858 .await?;
2859 self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2860 .await
2861 }
2862 LogicalPlan::Project { input, projections } => {
2863 let matches = self
2864 .execute_subplan(*input, prop_manager, params, ctx)
2865 .await?;
2866 self.execute_project(matches, &projections, prop_manager, params, ctx)
2867 .await
2868 }
2869 LogicalPlan::Distinct { input } => {
2870 let rows = self
2871 .execute_subplan(*input, prop_manager, params, ctx)
2872 .await?;
2873 let mut seen = std::collections::HashSet::new();
2874 let mut result = Vec::new();
2875 for row in rows {
2876 let key = Self::canonical_row_key(&row);
2877 if seen.insert(key) {
2878 result.push(row);
2879 }
2880 }
2881 Ok(result)
2882 }
2883 LogicalPlan::Unwind {
2884 input,
2885 expr,
2886 variable,
2887 } => {
2888 let input_rows = self
2889 .execute_subplan(*input, prop_manager, params, ctx)
2890 .await?;
2891 self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2892 .await
2893 }
2894 LogicalPlan::Apply {
2895 input,
2896 subquery,
2897 input_filter,
2898 } => {
2899 let input_rows = self
2900 .execute_subplan(*input, prop_manager, params, ctx)
2901 .await?;
2902 self.execute_apply(
2903 input_rows,
2904 &subquery,
2905 input_filter.as_ref(),
2906 prop_manager,
2907 params,
2908 ctx,
2909 )
2910 .await
2911 }
2912 LogicalPlan::SubqueryCall { input, subquery } => {
2913 let input_rows = self
2914 .execute_subplan(*input, prop_manager, params, ctx)
2915 .await?;
2916 self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2919 .await
2920 }
2921 LogicalPlan::RecursiveCTE {
2922 cte_name,
2923 initial,
2924 recursive,
2925 } => {
2926 self.execute_recursive_cte(
2927 &cte_name,
2928 *initial,
2929 *recursive,
2930 prop_manager,
2931 params,
2932 ctx,
2933 )
2934 .await
2935 }
2936 LogicalPlan::CrossJoin { left, right } => {
2937 self.execute_cross_join(left, right, prop_manager, params, ctx)
2938 .await
2939 }
2940 LogicalPlan::Set { .. }
2941 | LogicalPlan::Remove { .. }
2942 | LogicalPlan::Merge { .. }
2943 | LogicalPlan::Create { .. }
2944 | LogicalPlan::CreateBatch { .. } => {
2945 unreachable!("mutations are handled by DataFusion engine")
2946 }
2947 LogicalPlan::Delete { .. } => {
2948 unreachable!("mutations are handled by DataFusion engine")
2949 }
2950 LogicalPlan::Copy {
2951 target,
2952 source,
2953 is_export,
2954 options,
2955 } => {
2956 if is_export {
2957 self.execute_export(&target, &source, &options, prop_manager, ctx)
2958 .await
2959 } else {
2960 self.execute_copy(&target, &source, &options, prop_manager)
2961 .await
2962 }
2963 }
2964 LogicalPlan::Backup {
2965 destination,
2966 options,
2967 } => self.execute_backup(&destination, &options).await,
2968 LogicalPlan::Explain { plan } => {
2969 let plan_str = format!("{:#?}", plan);
2970 let mut row = HashMap::new();
2971 row.insert("plan".to_string(), Value::String(plan_str));
2972 Ok(vec![row])
2973 }
2974 LogicalPlan::ShortestPath { .. } => {
2975 unreachable!("ShortestPath is handled by DataFusion engine")
2976 }
2977 LogicalPlan::AllShortestPaths { .. } => {
2978 unreachable!("AllShortestPaths is handled by DataFusion engine")
2979 }
2980 LogicalPlan::Foreach { .. } => {
2981 unreachable!("mutations are handled by DataFusion engine")
2982 }
2983 LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2984 LogicalPlan::BindZeroLengthPath { .. } => {
2985 unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2986 }
2987 LogicalPlan::BindPath { .. } => {
2988 unreachable!("BindPath is handled by DataFusion engine")
2989 }
2990 LogicalPlan::QuantifiedPattern { .. } => {
2991 unreachable!("QuantifiedPattern is handled by DataFusion engine")
2992 }
2993 LogicalPlan::LocyProgram { .. }
2994 | LogicalPlan::LocyFold { .. }
2995 | LogicalPlan::LocyBestBy { .. }
2996 | LogicalPlan::LocyPriority { .. }
2997 | LogicalPlan::LocyDerivedScan { .. }
2998 | LogicalPlan::LocyProject { .. }
2999 | LogicalPlan::LocyModelInvoke { .. } => {
3000 unreachable!("Locy operators are handled by DataFusion engine")
3001 }
3002 }
3003 })
3004 }
3005
    /// Executes one mutation plan taken from a `FOREACH` body against `scope`.
    ///
    /// Allowed plans are `SET`, `REMOVE`, `DELETE`, `CREATE`, batched `CREATE`,
    /// `MERGE`, and nested `FOREACH`; any other plan is rejected with an error.
    /// Mutations are issued through `writer`, and `tx_l0` is passed along to
    /// the underlying mutation helpers.
    ///
    /// # Errors
    /// Propagates expression-evaluation and write failures. Also fails when a
    /// nested `FOREACH` list evaluates to something other than a list or null,
    /// or when the plan kind is not permitted inside `FOREACH`.
    #[expect(clippy::too_many_arguments)]
    pub(crate) async fn execute_foreach_body_plan(
        &self,
        plan: LogicalPlan,
        scope: &mut HashMap<String, Value>,
        writer: &uni_store::runtime::writer::Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    ) -> Result<()> {
        match plan {
            LogicalPlan::Set { items, .. } => {
                // An empty default `Prefetch` is passed: nothing is prefetched
                // for FOREACH bodies.
                self.execute_set_items_locked(
                    &items,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                    &crate::query::df_graph::mutation_common::Prefetch::default(),
                )
                .await?;
            }
            LogicalPlan::Remove { items, .. } => {
                self.execute_remove_items_locked(
                    &items,
                    scope,
                    writer,
                    prop_manager,
                    ctx,
                    tx_l0,
                    &crate::query::df_graph::mutation_common::Prefetch::default(),
                )
                .await?;
            }
            LogicalPlan::Delete { items, detach, .. } => {
                // Each target expression is evaluated in the current scope and
                // deleted in order; the first failure aborts the remaining deletes.
                for expr in &items {
                    let val = self
                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
                        .await?;
                    self.execute_delete_item_locked(&val, detach, writer, tx_l0)
                        .await?;
                }
            }
            LogicalPlan::Create { pattern, .. } => {
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
            }
            LogicalPlan::CreateBatch { patterns, .. } => {
                // Patterns are created sequentially against the same scope.
                for pattern in &patterns {
                    self.execute_create_pattern(
                        pattern,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                    )
                    .await?;
                }
            }
            LogicalPlan::Merge {
                pattern,
                on_match: _,
                on_create,
                ..
            } => {
                // NOTE(review): inside FOREACH, MERGE unconditionally runs the
                // CREATE path and then applies ON CREATE SET. It never searches
                // for an existing match, and ON MATCH is discarded. Confirm this
                // is the intended semantics.
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
                if let Some(on_create_clause) = on_create {
                    self.execute_set_items_locked(
                        &on_create_clause.items,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                        &crate::query::df_graph::mutation_common::Prefetch::default(),
                    )
                    .await?;
                }
            }
            LogicalPlan::Foreach {
                variable,
                list,
                body,
                ..
            } => {
                let list_val = self
                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
                    .await?;
                // A null list makes the nested FOREACH a no-op.
                let items = match list_val {
                    Value::List(arr) => arr,
                    Value::Null => return Ok(()),
                    _ => return Err(anyhow!("FOREACH requires a list")),
                };
                for item in items {
                    // Each item runs against its own copy of the scope, so
                    // bindings made by the nested body do not leak into the
                    // outer scope or into later items.
                    let mut nested_scope = scope.clone();
                    nested_scope.insert(variable.clone(), item);
                    for nested_plan in &body {
                        // Boxed because this async fn recurses into itself.
                        Box::pin(self.execute_foreach_body_plan(
                            nested_plan.clone(),
                            &mut nested_scope,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0,
                        ))
                        .await?;
                    }
                }
            }
            _ => {
                return Err(anyhow!(
                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
                ));
            }
        }
        Ok(())
    }
3151
3152 fn canonical_row_key(row: &HashMap<String, Value>) -> String {
3153 let mut pairs: Vec<_> = row.iter().collect();
3154 pairs.sort_by_key(|(k, _)| *k);
3155
3156 pairs
3157 .into_iter()
3158 .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
3159 .collect::<Vec<_>>()
3160 .join("|")
3161 }
3162
3163 fn canonical_value_key(v: &Value) -> String {
3164 match v {
3165 Value::Null => "null".to_string(),
3166 Value::Bool(b) => format!("b:{b}"),
3167 Value::Int(i) => format!("n:{i}"),
3168 Value::Float(f) => {
3169 if f.is_nan() {
3170 "nan".to_string()
3171 } else if f.is_infinite() {
3172 if f.is_sign_positive() {
3173 "inf:+".to_string()
3174 } else {
3175 "inf:-".to_string()
3176 }
3177 } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
3178 format!("n:{}", *f as i64)
3179 } else {
3180 format!("f:{f}")
3181 }
3182 }
3183 Value::String(s) => {
3184 if let Some(k) = Self::temporal_string_key(s) {
3185 format!("temporal:{k}")
3186 } else {
3187 format!("s:{s}")
3188 }
3189 }
3190 Value::Bytes(b) => format!("bytes:{:?}", b),
3191 Value::List(items) => format!(
3192 "list:[{}]",
3193 items
3194 .iter()
3195 .map(Self::canonical_value_key)
3196 .collect::<Vec<_>>()
3197 .join(",")
3198 ),
3199 Value::Map(map) => {
3200 let mut pairs: Vec<_> = map.iter().collect();
3201 pairs.sort_by_key(|(k, _)| *k);
3202 format!(
3203 "map:{{{}}}",
3204 pairs
3205 .into_iter()
3206 .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
3207 .collect::<Vec<_>>()
3208 .join(",")
3209 )
3210 }
3211 Value::Node(n) => {
3212 let mut labels = n.labels.clone();
3213 labels.sort();
3214 format!(
3215 "node:{}:{}:{}",
3216 n.vid.as_u64(),
3217 labels.join(":"),
3218 Self::canonical_value_key(&Value::Map(n.properties.clone()))
3219 )
3220 }
3221 Value::Edge(e) => format!(
3222 "edge:{}:{}:{}:{}:{}",
3223 e.eid.as_u64(),
3224 e.edge_type,
3225 e.src.as_u64(),
3226 e.dst.as_u64(),
3227 Self::canonical_value_key(&Value::Map(e.properties.clone()))
3228 ),
3229 Value::Path(p) => format!(
3230 "path:nodes=[{}];edges=[{}]",
3231 p.nodes
3232 .iter()
3233 .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
3234 .collect::<Vec<_>>()
3235 .join(","),
3236 p.edges
3237 .iter()
3238 .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
3239 .collect::<Vec<_>>()
3240 .join(",")
3241 ),
3242 Value::Vector(vs) => format!("vec:{:?}", vs),
3243 Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
3244 _ => format!("{v:?}"),
3245 }
3246 }
3247
3248 fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3249 match t {
3250 uni_common::TemporalValue::Date { days_since_epoch } => {
3251 format!("date:{days_since_epoch}")
3252 }
3253 uni_common::TemporalValue::LocalTime {
3254 nanos_since_midnight,
3255 } => format!("localtime:{nanos_since_midnight}"),
3256 uni_common::TemporalValue::Time {
3257 nanos_since_midnight,
3258 offset_seconds,
3259 } => {
3260 let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3261 format!("time:{utc_nanos}")
3262 }
3263 uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3264 format!("localdatetime:{nanos_since_epoch}")
3265 }
3266 uni_common::TemporalValue::DateTime {
3267 nanos_since_epoch, ..
3268 } => format!("datetime:{nanos_since_epoch}"),
3269 uni_common::TemporalValue::Duration {
3270 months,
3271 days,
3272 nanos,
3273 } => format!("duration:{months}:{days}:{nanos}"),
3274 uni_common::TemporalValue::Btic { lo, hi, meta } => {
3275 format!("btic:{lo}:{hi}:{meta}")
3276 }
3277 }
3278 }
3279
3280 fn temporal_string_key(s: &str) -> Option<String> {
3281 let fn_name = match classify_temporal(s)? {
3282 uni_common::TemporalType::Date => "DATE",
3283 uni_common::TemporalType::LocalTime => "LOCALTIME",
3284 uni_common::TemporalType::Time => "TIME",
3285 uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3286 uni_common::TemporalType::DateTime => "DATETIME",
3287 uni_common::TemporalType::Duration => "DURATION",
3288 uni_common::TemporalType::Btic => return None, };
3290 match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3291 Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3292 _ => None,
3293 }
3294 }
3295
3296 pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3299
3300 pub(crate) async fn execute_aggregate(
3301 &self,
3302 rows: Vec<HashMap<String, Value>>,
3303 group_by: &[Expr],
3304 aggregates: &[Expr],
3305 prop_manager: &PropertyManager,
3306 params: &HashMap<String, Value>,
3307 ctx: Option<&QueryContext>,
3308 ) -> Result<Vec<HashMap<String, Value>>> {
3309 if let Some(ctx) = ctx {
3311 ctx.check_timeout()?;
3312 }
3313
3314 let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3315
3316 if rows.is_empty() {
3319 if group_by.is_empty() {
3320 let accs = Self::create_accumulators(aggregates);
3321 let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3322 return Ok(vec![row]);
3323 }
3324 return Ok(vec![]);
3325 }
3326
3327 for (idx, row) in rows.into_iter().enumerate() {
3328 if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3330 && let Some(ctx) = ctx
3331 {
3332 ctx.check_timeout()?;
3333 }
3334
3335 let key_vals = self
3336 .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3337 .await?;
3338 let key_str = format!(
3341 "[{}]",
3342 key_vals
3343 .iter()
3344 .map(Self::canonical_value_key)
3345 .collect::<Vec<_>>()
3346 .join(",")
3347 );
3348
3349 let entry = groups
3350 .entry(key_str)
3351 .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3352
3353 self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3354 .await?;
3355 }
3356
3357 let results = groups
3358 .values()
3359 .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3360 .collect();
3361
3362 Ok(results)
3363 }
3364
3365 pub(crate) async fn execute_window(
3366 &self,
3367 mut rows: Vec<HashMap<String, Value>>,
3368 window_exprs: &[Expr],
3369 _prop_manager: &PropertyManager,
3370 _params: &HashMap<String, Value>,
3371 ctx: Option<&QueryContext>,
3372 ) -> Result<Vec<HashMap<String, Value>>> {
3373 if let Some(ctx) = ctx {
3375 ctx.check_timeout()?;
3376 }
3377
3378 if rows.is_empty() || window_exprs.is_empty() {
3380 return Ok(rows);
3381 }
3382
3383 for window_expr in window_exprs {
3385 let Expr::FunctionCall {
3387 name,
3388 args,
3389 window_spec: Some(window_spec),
3390 ..
3391 } = window_expr
3392 else {
3393 return Err(anyhow!(
3394 "Window expression must be a FunctionCall with OVER clause: {:?}",
3395 window_expr
3396 ));
3397 };
3398
3399 let name_upper = name.to_uppercase();
3400
3401 if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3403 return Err(anyhow!(
3404 "Unsupported window function: {}. Supported functions: {}",
3405 name,
3406 WINDOW_FUNCTIONS.join(", ")
3407 ));
3408 }
3409
3410 let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3412
3413 for (row_idx, row) in rows.iter().enumerate() {
3414 let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3416 vec![]
3418 } else {
3419 window_spec
3420 .partition_by
3421 .iter()
3422 .map(|expr| self.evaluate_simple_expr(expr, row))
3423 .collect::<Result<Vec<_>>>()?
3424 };
3425
3426 partition_map
3427 .entry(partition_key)
3428 .or_default()
3429 .push(row_idx);
3430 }
3431
3432 for (_partition_key, row_indices) in partition_map.iter_mut() {
3434 if !window_spec.order_by.is_empty() {
3436 row_indices.sort_by(|&a, &b| {
3437 for sort_item in &window_spec.order_by {
3438 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3439 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3440
3441 if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3442 let cmp = Executor::compare_values(&va, &vb);
3443 let cmp = if sort_item.ascending {
3444 cmp
3445 } else {
3446 cmp.reverse()
3447 };
3448 if cmp != std::cmp::Ordering::Equal {
3449 return cmp;
3450 }
3451 }
3452 }
3453 std::cmp::Ordering::Equal
3454 });
3455 }
3456
3457 for (position, &row_idx) in row_indices.iter().enumerate() {
3459 let window_value = match name_upper.as_str() {
3460 "ROW_NUMBER" => Value::from((position + 1) as i64),
3461 "RANK" => {
3462 let rank = if position == 0 {
3464 1i64
3465 } else {
3466 let prev_row_idx = row_indices[position - 1];
3467 let same_as_prev = self.rows_have_same_sort_keys(
3468 &window_spec.order_by,
3469 &rows,
3470 row_idx,
3471 prev_row_idx,
3472 );
3473
3474 if same_as_prev {
3475 let mut group_start = position - 1;
3477 while group_start > 0 {
3478 let curr_idx = row_indices[group_start];
3479 let prev_idx = row_indices[group_start - 1];
3480 if !self.rows_have_same_sort_keys(
3481 &window_spec.order_by,
3482 &rows,
3483 curr_idx,
3484 prev_idx,
3485 ) {
3486 break;
3487 }
3488 group_start -= 1;
3489 }
3490 (group_start + 1) as i64
3491 } else {
3492 (position + 1) as i64
3493 }
3494 };
3495 Value::from(rank)
3496 }
3497 "DENSE_RANK" => {
3498 let mut dense_rank = 1i64;
3500 for i in 0..position {
3501 let curr_idx = row_indices[i + 1];
3502 let prev_idx = row_indices[i];
3503 if !self.rows_have_same_sort_keys(
3504 &window_spec.order_by,
3505 &rows,
3506 curr_idx,
3507 prev_idx,
3508 ) {
3509 dense_rank += 1;
3510 }
3511 }
3512 Value::from(dense_rank)
3513 }
3514 "LAG" => {
3515 let (value_expr, offset, default_value) =
3516 self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3517
3518 if position >= offset {
3519 let target_idx = row_indices[position - offset];
3520 self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3521 } else {
3522 default_value
3523 }
3524 }
3525 "LEAD" => {
3526 let (value_expr, offset, default_value) =
3527 self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3528
3529 if position + offset < row_indices.len() {
3530 let target_idx = row_indices[position + offset];
3531 self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3532 } else {
3533 default_value
3534 }
3535 }
3536 "NTILE" => {
3537 let num_buckets_expr = args.first().ok_or_else(|| {
3539 anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3540 })?;
3541 let num_buckets_val =
3542 self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3543 let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3544 anyhow!(
3545 "NTILE argument must be an integer, got: {:?}",
3546 num_buckets_val
3547 )
3548 })?;
3549
3550 if num_buckets <= 0 {
3551 return Err(anyhow!(
3552 "NTILE bucket count must be positive, got: {}",
3553 num_buckets
3554 ));
3555 }
3556
3557 let num_buckets = num_buckets as usize;
3558 let partition_size = row_indices.len();
3559
3560 let base_size = partition_size / num_buckets;
3565 let extra_rows = partition_size % num_buckets;
3566
3567 let bucket = if position < extra_rows * (base_size + 1) {
3569 position / (base_size + 1) + 1
3571 } else {
3572 let adjusted_position = position - extra_rows * (base_size + 1);
3574 extra_rows + (adjusted_position / base_size) + 1
3575 };
3576
3577 Value::from(bucket as i64)
3578 }
3579 "FIRST_VALUE" => {
3580 let value_expr = args.first().ok_or_else(|| {
3582 anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3583 })?;
3584
3585 if row_indices.is_empty() {
3587 Value::Null
3588 } else {
3589 let first_idx = row_indices[0];
3590 self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3591 }
3592 }
3593 "LAST_VALUE" => {
3594 let value_expr = args.first().ok_or_else(|| {
3596 anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3597 })?;
3598
3599 if row_indices.is_empty() {
3601 Value::Null
3602 } else {
3603 let last_idx = row_indices[row_indices.len() - 1];
3604 self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3605 }
3606 }
3607 "NTH_VALUE" => {
3608 if args.len() != 2 {
3610 return Err(anyhow!(
3611 "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3612 ));
3613 }
3614
3615 let value_expr = &args[0];
3616 let n_expr = &args[1];
3617
3618 let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3619 let n = n_val.as_i64().ok_or_else(|| {
3620 anyhow!(
3621 "NTH_VALUE second argument must be an integer, got: {:?}",
3622 n_val
3623 )
3624 })?;
3625
3626 if n <= 0 {
3627 return Err(anyhow!(
3628 "NTH_VALUE position must be positive, got: {}",
3629 n
3630 ));
3631 }
3632
3633 let nth_index = (n - 1) as usize; if nth_index < row_indices.len() {
3635 let nth_idx = row_indices[nth_index];
3636 self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3637 } else {
3638 Value::Null
3639 }
3640 }
3641 _ => unreachable!("Window function {} already validated", name),
3642 };
3643
3644 let col_name = window_expr.to_string_repr();
3647 rows[row_idx].insert(col_name, window_value);
3648 }
3649 }
3650 }
3651
3652 Ok(rows)
3653 }
3654
3655 fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3660 match expr {
3661 Expr::Variable(name) => row
3662 .get(name)
3663 .cloned()
3664 .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3665 Expr::Property(base, prop) => {
3666 let base_val = self.evaluate_simple_expr(base, row)?;
3667 if let Value::Map(map) = base_val {
3668 map.get(prop)
3669 .cloned()
3670 .ok_or_else(|| anyhow!("Property not found: {}", prop))
3671 } else {
3672 Err(anyhow!("Cannot access property on non-object"))
3673 }
3674 }
3675 Expr::Literal(lit) => Ok(lit.to_value()),
3676 _ => Err(anyhow!(
3677 "Unsupported expression in window function: {:?}",
3678 expr
3679 )),
3680 }
3681 }
3682
3683 fn rows_have_same_sort_keys(
3685 &self,
3686 order_by: &[uni_cypher::ast::SortItem],
3687 rows: &[HashMap<String, Value>],
3688 idx_a: usize,
3689 idx_b: usize,
3690 ) -> bool {
3691 order_by.iter().all(|sort_item| {
3692 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3693 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3694 matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3695 })
3696 }
3697
3698 fn extract_lag_lead_params<'a>(
3700 &self,
3701 func_name: &str,
3702 args: &'a [Expr],
3703 row: &HashMap<String, Value>,
3704 ) -> Result<(&'a Expr, usize, Value)> {
3705 let value_expr = args.first().ok_or_else(|| {
3706 anyhow!(
3707 "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3708 func_name,
3709 func_name
3710 )
3711 })?;
3712
3713 let offset = if let Some(offset_expr) = args.get(1) {
3714 let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3715 offset_val.as_i64().ok_or_else(|| {
3716 anyhow!(
3717 "{} offset must be an integer, got: {:?}",
3718 func_name,
3719 offset_val
3720 )
3721 })? as usize
3722 } else {
3723 1
3724 };
3725
3726 let default_value = if let Some(default_expr) = args.get(2) {
3727 self.evaluate_simple_expr(default_expr, row)?
3728 } else {
3729 Value::Null
3730 };
3731
3732 Ok((value_expr, offset, default_value))
3733 }
3734
3735 pub(crate) async fn evaluate_group_keys(
3737 &self,
3738 group_by: &[Expr],
3739 row: &HashMap<String, Value>,
3740 prop_manager: &PropertyManager,
3741 params: &HashMap<String, Value>,
3742 ctx: Option<&QueryContext>,
3743 ) -> Result<Vec<Value>> {
3744 let mut key_vals = Vec::new();
3745 for expr in group_by {
3746 key_vals.push(
3747 self.evaluate_expr(expr, row, prop_manager, params, ctx)
3748 .await?,
3749 );
3750 }
3751 Ok(key_vals)
3752 }
3753
3754 pub(crate) async fn update_accumulators(
3756 &self,
3757 accs: &mut [Accumulator],
3758 aggregates: &[Expr],
3759 row: &HashMap<String, Value>,
3760 prop_manager: &PropertyManager,
3761 params: &HashMap<String, Value>,
3762 ctx: Option<&QueryContext>,
3763 ) -> Result<()> {
3764 for (i, agg_expr) in aggregates.iter().enumerate() {
3765 if let Expr::FunctionCall { args, .. } = agg_expr {
3766 let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3767 let val = if is_wildcard {
3768 Value::Null
3769 } else {
3770 self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3771 .await?
3772 };
3773 accs[i].update(&val, is_wildcard);
3774 }
3775 }
3776 Ok(())
3777 }
3778
3779 pub(crate) async fn execute_recursive_cte(
3781 &self,
3782 cte_name: &str,
3783 initial: LogicalPlan,
3784 recursive: LogicalPlan,
3785 prop_manager: &PropertyManager,
3786 params: &HashMap<String, Value>,
3787 ctx: Option<&QueryContext>,
3788 ) -> Result<Vec<HashMap<String, Value>>> {
3789 use std::collections::HashSet;
3790
3791 pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3794 let mut pairs: Vec<_> = row.iter().collect();
3795 pairs.sort_by(|a, b| a.0.cmp(b.0));
3796 format!("{pairs:?}")
3797 }
3798
3799 let mut working_table = self
3801 .execute_subplan(initial, prop_manager, params, ctx)
3802 .await?;
3803 let mut result_table = working_table.clone();
3804
3805 let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3807
3808 let max_iterations = self.config.max_recursive_cte_iterations;
3811 for _iteration in 0..max_iterations {
3812 if let Some(ctx) = ctx {
3814 ctx.check_timeout()?;
3815 }
3816
3817 if working_table.is_empty() {
3818 break;
3819 }
3820
3821 let working_val = Value::List(
3823 working_table
3824 .iter()
3825 .map(|row| {
3826 if row.len() == 1 {
3827 row.values().next().unwrap().clone()
3828 } else {
3829 Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3830 }
3831 })
3832 .collect(),
3833 );
3834
3835 let mut next_params = params.clone();
3836 next_params.insert(cte_name.to_string(), working_val);
3837
3838 let next_result = self
3840 .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3841 .await?;
3842
3843 if next_result.is_empty() {
3844 break;
3845 }
3846
3847 let new_rows: Vec<_> = next_result
3849 .into_iter()
3850 .filter(|row| {
3851 let key = row_key(row);
3852 seen.insert(key) })
3854 .collect();
3855
3856 if new_rows.is_empty() {
3857 break;
3859 }
3860
3861 result_table.extend(new_rows.clone());
3862 working_table = new_rows;
3863 }
3864
3865 let final_list = Value::List(
3867 result_table
3868 .into_iter()
3869 .map(|row| {
3870 if row.len() == 1 {
3882 row.values().next().unwrap().clone()
3883 } else {
3884 Value::Map(row.into_iter().collect())
3885 }
3886 })
3887 .collect(),
3888 );
3889
3890 let mut final_row = HashMap::new();
3891 final_row.insert(cte_name.to_string(), final_list);
3892 Ok(vec![final_row])
3893 }
3894
3895 const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3897
3898 pub(crate) async fn execute_sort(
3899 &self,
3900 rows: Vec<HashMap<String, Value>>,
3901 order_by: &[uni_cypher::ast::SortItem],
3902 prop_manager: &PropertyManager,
3903 params: &HashMap<String, Value>,
3904 ctx: Option<&QueryContext>,
3905 ) -> Result<Vec<HashMap<String, Value>>> {
3906 if let Some(ctx) = ctx {
3908 ctx.check_timeout()?;
3909 }
3910
3911 let mut rows_with_keys = Vec::with_capacity(rows.len());
3912 for (idx, row) in rows.into_iter().enumerate() {
3913 if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3915 && let Some(ctx) = ctx
3916 {
3917 ctx.check_timeout()?;
3918 }
3919
3920 let mut keys = Vec::new();
3921 for item in order_by {
3922 let val = row
3923 .get(&item.expr.to_string_repr())
3924 .cloned()
3925 .unwrap_or(Value::Null);
3926 let val = if val.is_null() {
3927 self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3928 .await
3929 .unwrap_or(Value::Null)
3930 } else {
3931 val
3932 };
3933 keys.push(val);
3934 }
3935 rows_with_keys.push((row, keys));
3936 }
3937
3938 if let Some(ctx) = ctx {
3940 ctx.check_timeout()?;
3941 }
3942
3943 rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3944
3945 Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3946 }
3947
3948 pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3950 aggregates
3951 .iter()
3952 .map(|expr| {
3953 if let Expr::FunctionCall { name, distinct, .. } = expr {
3954 Accumulator::new(name, *distinct)
3955 } else {
3956 Accumulator::new("COUNT", false)
3957 }
3958 })
3959 .collect()
3960 }
3961
3962 pub(crate) fn build_aggregate_result(
3964 group_by: &[Expr],
3965 aggregates: &[Expr],
3966 key_vals: &[Value],
3967 accs: &[Accumulator],
3968 ) -> HashMap<String, Value> {
3969 let mut res_row = HashMap::new();
3970 for (i, expr) in group_by.iter().enumerate() {
3971 res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3972 }
3973 for (i, expr) in aggregates.iter().enumerate() {
3974 let col_name = crate::query::planner::aggregate_column_name(expr);
3976 res_row.insert(col_name, accs[i].finish());
3977 }
3978 res_row
3979 }
3980
3981 pub(crate) fn compare_sort_keys(
3983 a_keys: &[Value],
3984 b_keys: &[Value],
3985 order_by: &[uni_cypher::ast::SortItem],
3986 ) -> std::cmp::Ordering {
3987 for (i, item) in order_by.iter().enumerate() {
3988 let order = Self::compare_values(&a_keys[i], &b_keys[i]);
3989 if order != std::cmp::Ordering::Equal {
3990 return if item.ascending {
3991 order
3992 } else {
3993 order.reverse()
3994 };
3995 }
3996 }
3997 std::cmp::Ordering::Equal
3998 }
3999
4000 pub(crate) async fn execute_backup(
4004 &self,
4005 destination: &str,
4006 _options: &HashMap<String, Value>,
4007 ) -> Result<Vec<HashMap<String, Value>>> {
4008 if let Some(writer_arc) = &self.writer {
4010 let writer: &uni_store::Writer = writer_arc.as_ref();
4011 writer.flush_to_l1(None).await?;
4012 }
4013
4014 let snapshot_manager = self.storage.snapshot_manager();
4016 let snapshot = snapshot_manager
4017 .load_latest_snapshot()
4018 .await?
4019 .ok_or_else(|| anyhow!("No snapshot found"))?;
4020
4021 if is_cloud_url(destination) {
4023 self.backup_to_cloud(destination, &snapshot.snapshot_id)
4024 .await?;
4025 } else {
4026 let validated_dest = self.validate_path(destination)?;
4028 self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
4029 .await?;
4030 }
4031
4032 let mut res = HashMap::new();
4033 res.insert(
4034 "status".to_string(),
4035 Value::String("Backup completed".to_string()),
4036 );
4037 res.insert(
4038 "snapshot_id".to_string(),
4039 Value::String(snapshot.snapshot_id),
4040 );
4041 Ok(vec![res])
4042 }
4043
4044 async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
4046 let source_path = std::path::Path::new(self.storage.base_path());
4047
4048 if !dest_path.exists() {
4049 std::fs::create_dir_all(dest_path)?;
4050 }
4051
4052 if source_path.exists() {
4054 Self::copy_dir_all(source_path, dest_path)?;
4055 }
4056
4057 let schema_manager = self.storage.schema_manager();
4059 let dest_catalog = dest_path.join("catalog");
4060 if !dest_catalog.exists() {
4061 std::fs::create_dir_all(&dest_catalog)?;
4062 }
4063
4064 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4065 std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
4066
4067 Ok(())
4068 }
4069
4070 async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
4074 use object_store::ObjectStore;
4075 use object_store::ObjectStoreExt;
4076 use object_store::local::LocalFileSystem;
4077 use object_store::path::Path as ObjPath;
4078
4079 let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
4080 let source_path = std::path::Path::new(self.storage.base_path());
4081
4082 let src_store: Arc<dyn ObjectStore> =
4084 Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
4085
4086 let catalog_src = ObjPath::from("catalog");
4088 let catalog_dst = if dest_prefix.as_ref().is_empty() {
4089 ObjPath::from("catalog")
4090 } else {
4091 ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
4092 };
4093 copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
4094
4095 let storage_src = ObjPath::from("storage");
4097 let storage_dst = if dest_prefix.as_ref().is_empty() {
4098 ObjPath::from("storage")
4099 } else {
4100 ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
4101 };
4102 copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
4103
4104 let schema_manager = self.storage.schema_manager();
4106 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4107 let schema_path = if dest_prefix.as_ref().is_empty() {
4108 ObjPath::from("catalog/schema.json")
4109 } else {
4110 ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
4111 };
4112 dest_store
4113 .put(&schema_path, bytes::Bytes::from(schema_content).into())
4114 .await?;
4115
4116 Ok(())
4117 }
4118
4119 const MAX_BACKUP_DEPTH: usize = 100;
4124
4125 const MAX_BACKUP_FILES: usize = 100_000;
4130
4131 pub(crate) fn copy_dir_all(
4139 src: &std::path::Path,
4140 dst: &std::path::Path,
4141 ) -> std::io::Result<()> {
4142 let mut file_count = 0usize;
4143 Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
4144 }
4145
4146 pub(crate) fn copy_dir_all_impl(
4148 src: &std::path::Path,
4149 dst: &std::path::Path,
4150 depth: usize,
4151 file_count: &mut usize,
4152 ) -> std::io::Result<()> {
4153 if depth >= Self::MAX_BACKUP_DEPTH {
4154 return Err(std::io::Error::new(
4155 std::io::ErrorKind::InvalidInput,
4156 format!(
4157 "Maximum backup depth {} exceeded at {:?}",
4158 Self::MAX_BACKUP_DEPTH,
4159 src
4160 ),
4161 ));
4162 }
4163
4164 std::fs::create_dir_all(dst)?;
4165
4166 for entry in std::fs::read_dir(src)? {
4167 if *file_count >= Self::MAX_BACKUP_FILES {
4168 return Err(std::io::Error::new(
4169 std::io::ErrorKind::InvalidInput,
4170 format!(
4171 "Maximum backup file count {} exceeded",
4172 Self::MAX_BACKUP_FILES
4173 ),
4174 ));
4175 }
4176 *file_count += 1;
4177
4178 let entry = entry?;
4179 let metadata = entry.metadata()?;
4180
4181 if metadata.file_type().is_symlink() {
4183 continue;
4185 }
4186
4187 let dst_path = dst.join(entry.file_name());
4188 if metadata.is_dir() {
4189 Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4190 } else {
4191 std::fs::copy(entry.path(), dst_path)?;
4192 }
4193 }
4194 Ok(())
4195 }
4196
4197 pub(crate) async fn execute_copy(
4198 &self,
4199 target: &str,
4200 source: &str,
4201 options: &HashMap<String, Value>,
4202 prop_manager: &PropertyManager,
4203 ) -> Result<Vec<HashMap<String, Value>>> {
4204 let format = options
4205 .get("format")
4206 .and_then(|v| v.as_str())
4207 .unwrap_or_else(|| {
4208 if source.ends_with(".parquet") {
4209 "parquet"
4210 } else {
4211 "csv"
4212 }
4213 });
4214
4215 match format.to_lowercase().as_str() {
4216 "csv" => self.execute_csv_import(target, source, options).await,
4217 "parquet" => {
4218 self.execute_parquet_import(target, source, options, prop_manager)
4219 .await
4220 }
4221 _ => Err(anyhow!("Unsupported format: {}", format)),
4222 }
4223 }
4224
4225 pub(crate) async fn execute_csv_import(
4226 &self,
4227 target: &str,
4228 source: &str,
4229 options: &HashMap<String, Value>,
4230 ) -> Result<Vec<HashMap<String, Value>>> {
4231 let validated_source = self.validate_path(source)?;
4233
4234 let writer_lock = self
4235 .writer
4236 .as_ref()
4237 .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4238
4239 let schema = self.storage.schema_manager().schema();
4240
4241 let label_meta = schema.labels.get(target);
4243 let edge_meta = schema.edge_types.get(target);
4244
4245 if label_meta.is_none() && edge_meta.is_none() {
4246 return Err(anyhow!("Target '{}' not found in schema", target));
4247 }
4248
4249 let delimiter_str = options
4251 .get("delimiter")
4252 .and_then(|v| v.as_str())
4253 .unwrap_or(",");
4254 let delimiter = if delimiter_str.is_empty() {
4255 b','
4256 } else {
4257 delimiter_str.as_bytes()[0]
4258 };
4259 let has_header = options
4260 .get("header")
4261 .and_then(|v| v.as_bool())
4262 .unwrap_or(true);
4263
4264 let mut rdr = csv::ReaderBuilder::new()
4265 .delimiter(delimiter)
4266 .has_headers(has_header)
4267 .from_path(&validated_source)?;
4268
4269 let headers = rdr.headers()?.clone();
4270 let mut count = 0;
4271
4272 let writer: &uni_store::Writer = writer_lock.as_ref();
4273
4274 const CSV_ID_CHUNK: usize = 256;
4278
4279 if label_meta.is_some() {
4280 let target_props = schema
4281 .properties
4282 .get(target)
4283 .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4284
4285 let mut vid_chunk: std::collections::VecDeque<Vid> =
4286 std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4287 for result in rdr.records() {
4288 let record = result?;
4289 let mut props = HashMap::new();
4290
4291 for (i, header) in headers.iter().enumerate() {
4292 if let Some(val_str) = record.get(i)
4293 && let Some(prop_meta) = target_props.get(header)
4294 {
4295 let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4296 props.insert(header.to_string(), val);
4297 }
4298 }
4299
4300 if vid_chunk.is_empty() {
4301 vid_chunk.extend(writer.allocate_vids(CSV_ID_CHUNK).await?);
4302 }
4303 let vid = vid_chunk.pop_front().unwrap();
4304 writer
4305 .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4306 .await?;
4307 count += 1;
4308 }
4309 } else if let Some(meta) = edge_meta {
4310 let type_id = meta.id;
4311 let target_props = schema
4312 .properties
4313 .get(target)
4314 .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4315
4316 let src_col = options
4319 .get("src_col")
4320 .and_then(|v| v.as_str())
4321 .unwrap_or("_src");
4322 let dst_col = options
4323 .get("dst_col")
4324 .and_then(|v| v.as_str())
4325 .unwrap_or("_dst");
4326
4327 let mut eid_chunk: std::collections::VecDeque<Eid> =
4328 std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4329 for result in rdr.records() {
4330 let record = result?;
4331 let mut props = HashMap::new();
4332 let mut src_vid = None;
4333 let mut dst_vid = None;
4334
4335 for (i, header) in headers.iter().enumerate() {
4336 if let Some(val_str) = record.get(i) {
4337 if header == src_col {
4338 src_vid =
4339 Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4340 } else if header == dst_col {
4341 dst_vid =
4342 Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4343 } else if let Some(prop_meta) = target_props.get(header) {
4344 let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4345 props.insert(header.to_string(), val);
4346 }
4347 }
4348 }
4349
4350 let src =
4351 src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4352 let dst = dst_vid
4353 .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;
4354
4355 if eid_chunk.is_empty() {
4356 eid_chunk.extend(writer.allocate_eids(CSV_ID_CHUNK).await?);
4357 }
4358 let eid = eid_chunk.pop_front().unwrap();
4359 writer
4360 .insert_edge(
4361 src,
4362 dst,
4363 type_id,
4364 eid,
4365 props,
4366 Some(target.to_string()),
4367 None,
4368 )
4369 .await?;
4370 count += 1;
4371 }
4372 }
4373
4374 let mut res = HashMap::new();
4375 res.insert("count".to_string(), Value::Int(count as i64));
4376 Ok(vec![res])
4377 }
4378
4379 pub(crate) async fn execute_parquet_import(
4383 &self,
4384 target: &str,
4385 source: &str,
4386 options: &HashMap<String, Value>,
4387 _prop_manager: &PropertyManager,
4388 ) -> Result<Vec<HashMap<String, Value>>> {
4389 let writer_lock = self
4390 .writer
4391 .as_ref()
4392 .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4393
4394 let schema = self.storage.schema_manager().schema();
4395
4396 let label_meta = schema.labels.get(target);
4398 let edge_meta = schema.edge_types.get(target);
4399
4400 if label_meta.is_none() && edge_meta.is_none() {
4401 return Err(anyhow!("Target '{}' not found in schema", target));
4402 }
4403
4404 let reader = if is_cloud_url(source) {
4406 self.open_parquet_from_cloud(source).await?
4407 } else {
4408 let validated_source = self.validate_path(source)?;
4410 let file = std::fs::File::open(&validated_source)?;
4411 let builder =
4412 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
4413 builder.build()?
4414 };
4415 let mut reader = reader;
4416
4417 let mut count = 0;
4418 let writer: &uni_store::Writer = writer_lock.as_ref();
4419
4420 if label_meta.is_some() {
4421 let target_props = schema
4422 .properties
4423 .get(target)
4424 .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4425
4426 for batch in reader.by_ref() {
4427 let batch = batch?;
4428 let num_rows = batch.num_rows();
4429 let vids = writer.allocate_vids(num_rows).await?;
4431 for (row, &vid) in vids.iter().enumerate().take(num_rows) {
4432 let mut props = HashMap::new();
4433 for field in batch.schema().fields() {
4434 let name = field.name();
4435 if target_props.contains_key(name) {
4436 let col = batch.column_by_name(name).unwrap();
4437 if !col.is_null(row) {
4438 let data_type = target_props.get(name).map(|pm| &pm.r#type);
4440 let val =
4441 arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
4442 props.insert(name.clone(), val);
4443 }
4444 }
4445 }
4446 writer
4447 .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4448 .await?;
4449 count += 1;
4450 }
4451 }
4452 } else if let Some(meta) = edge_meta {
4453 let type_id = meta.id;
4454 let target_props = schema
4455 .properties
4456 .get(target)
4457 .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4458
4459 let src_col = options
4460 .get("src_col")
4461 .and_then(|v| v.as_str())
4462 .unwrap_or("_src");
4463 let dst_col = options
4464 .get("dst_col")
4465 .and_then(|v| v.as_str())
4466 .unwrap_or("_dst");
4467
4468 for batch in reader {
4469 let batch = batch?;
4470 let num_rows = batch.num_rows();
4471 let eids = writer.allocate_eids(num_rows).await?;
4473 for (row, &eid) in eids.iter().enumerate().take(num_rows) {
4474 let mut props = HashMap::new();
4475 let mut src_vid = None;
4476 let mut dst_vid = None;
4477
4478 for field in batch.schema().fields() {
4479 let name = field.name();
4480 let col = batch.column_by_name(name).unwrap();
4481 if col.is_null(row) {
4482 continue;
4483 }
4484
4485 if name == src_col {
4486 let val = Self::arrow_to_value(col.as_ref(), row);
4487 src_vid = Some(Self::vid_from_value(&val)?);
4488 } else if name == dst_col {
4489 let val = Self::arrow_to_value(col.as_ref(), row);
4490 dst_vid = Some(Self::vid_from_value(&val)?);
4491 } else if let Some(pm) = target_props.get(name) {
4492 let val =
4494 arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
4495 props.insert(name.clone(), val);
4496 }
4497 }
4498
4499 let src = src_vid
4500 .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4501 let dst = dst_vid.ok_or_else(|| {
4502 anyhow!("Missing destination VID in column '{}'", dst_col)
4503 })?;
4504
4505 writer
4506 .insert_edge(
4507 src,
4508 dst,
4509 type_id,
4510 eid,
4511 props,
4512 Some(target.to_string()),
4513 None,
4514 )
4515 .await?;
4516 count += 1;
4517 }
4518 }
4519 }
4520
4521 let mut res = HashMap::new();
4522 res.insert("count".to_string(), Value::Int(count as i64));
4523 Ok(vec![res])
4524 }
4525
4526 async fn open_parquet_from_cloud(
4530 &self,
4531 source_url: &str,
4532 ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4533 use object_store::ObjectStoreExt;
4534
4535 let (store, path) = build_store_from_url(source_url)?;
4536
4537 let bytes = store.get(&path).await?.bytes().await?;
4539
4540 let reader = bytes::Bytes::from(bytes.to_vec());
4542 let builder =
4543 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4544 Ok(builder.build()?)
4545 }
4546
4547 pub(crate) async fn scan_edge_type(
4548 &self,
4549 edge_type: &str,
4550 ctx: Option<&QueryContext>,
4551 ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4552 let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4553
4554 self.scan_edge_type_l2(edge_type, &mut edges).await?;
4556
4557 self.scan_edge_type_l1(edge_type, &mut edges).await?;
4559
4560 if let Some(ctx) = ctx {
4562 self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4563 self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4564 }
4565
4566 Ok(edges
4567 .into_iter()
4568 .map(|(eid, (src, dst))| (eid, src, dst))
4569 .collect())
4570 }
4571
4572 pub(crate) async fn scan_edge_type_l2(
4577 &self,
4578 _edge_type: &str,
4579 _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4580 ) -> Result<()> {
4581 Ok(())
4584 }
4585
4586 pub(crate) async fn scan_edge_type_l1(
4588 &self,
4589 edge_type: &str,
4590 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4591 ) -> Result<()> {
4592 if let Ok(Some(batch)) = self
4593 .storage
4594 .scan_delta_table(
4595 edge_type,
4596 "fwd",
4597 &["eid", "src_vid", "dst_vid", "op", "_version"],
4598 None,
4599 )
4600 .await
4601 {
4602 let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4604 HashMap::new();
4605
4606 self.process_delta_batch(&batch, &mut versioned_ops)?;
4607
4608 for (eid, (_, op, src, dst)) in versioned_ops {
4610 if op == 0 {
4611 edges.insert(eid, (src, dst));
4612 } else if op == 1 {
4613 edges.remove(&eid);
4614 }
4615 }
4616 }
4617 Ok(())
4618 }
4619
4620 pub(crate) fn process_delta_batch(
4622 &self,
4623 batch: &arrow_array::RecordBatch,
4624 versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4625 ) -> Result<()> {
4626 use arrow_array::UInt64Array;
4627 let eid_col = batch
4628 .column_by_name("eid")
4629 .ok_or(anyhow!("Missing eid"))?
4630 .as_any()
4631 .downcast_ref::<UInt64Array>()
4632 .ok_or(anyhow!("Invalid eid"))?;
4633 let src_col = batch
4634 .column_by_name("src_vid")
4635 .ok_or(anyhow!("Missing src_vid"))?
4636 .as_any()
4637 .downcast_ref::<UInt64Array>()
4638 .ok_or(anyhow!("Invalid src_vid"))?;
4639 let dst_col = batch
4640 .column_by_name("dst_vid")
4641 .ok_or(anyhow!("Missing dst_vid"))?
4642 .as_any()
4643 .downcast_ref::<UInt64Array>()
4644 .ok_or(anyhow!("Invalid dst_vid"))?;
4645 let op_col = batch
4646 .column_by_name("op")
4647 .ok_or(anyhow!("Missing op"))?
4648 .as_any()
4649 .downcast_ref::<arrow_array::UInt8Array>()
4650 .ok_or(anyhow!("Invalid op"))?;
4651 let version_col = batch
4652 .column_by_name("_version")
4653 .ok_or(anyhow!("Missing _version"))?
4654 .as_any()
4655 .downcast_ref::<UInt64Array>()
4656 .ok_or(anyhow!("Invalid _version"))?;
4657
4658 for i in 0..batch.num_rows() {
4659 let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4660 let version = version_col.value(i);
4661 let op = op_col.value(i);
4662 let src = Vid::from(src_col.value(i));
4663 let dst = Vid::from(dst_col.value(i));
4664
4665 match versioned_ops.entry(eid) {
4666 std::collections::hash_map::Entry::Vacant(e) => {
4667 e.insert((version, op, src, dst));
4668 }
4669 std::collections::hash_map::Entry::Occupied(mut e) => {
4670 if version > e.get().0 {
4671 e.insert((version, op, src, dst));
4672 }
4673 }
4674 }
4675 }
4676 Ok(())
4677 }
4678
4679 pub(crate) fn scan_edge_type_l0(
4681 &self,
4682 edge_type: &str,
4683 ctx: &QueryContext,
4684 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4685 ) {
4686 let schema = self.storage.schema_manager().schema();
4687 let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4688
4689 if let Some(type_id) = type_id {
4690 self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4692
4693 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4695 self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4696 }
4697
4698 for pending_l0_arc in &ctx.pending_flush_l0s {
4700 self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4701 }
4702 }
4703 }
4704
4705 pub(crate) fn scan_single_l0(
4707 &self,
4708 l0: &uni_store::runtime::L0Buffer,
4709 type_id: u32,
4710 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4711 ) {
4712 for edge_entry in l0.graph.edges() {
4713 if edge_entry.edge_type == type_id {
4714 edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4715 }
4716 }
4717 let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4719 for eid in eids_to_check {
4720 if l0.is_tombstoned(eid) {
4721 edges.remove(&eid);
4722 }
4723 }
4724 }
4725
4726 pub(crate) fn filter_tombstoned_vertex_edges(
4728 &self,
4729 ctx: &QueryContext,
4730 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4731 ) {
4732 let l0 = ctx.l0.read();
4733 let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4734
4735 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4737 let tx_l0 = tx_l0_arc.read();
4738 all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4739 }
4740
4741 for pending_l0_arc in &ctx.pending_flush_l0s {
4743 let pending_l0 = pending_l0_arc.read();
4744 all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4745 }
4746
4747 edges.retain(|_, (src, dst)| {
4748 !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4749 });
4750 }
4751
4752 pub(crate) async fn execute_project(
4754 &self,
4755 input_rows: Vec<HashMap<String, Value>>,
4756 projections: &[(Expr, Option<String>)],
4757 prop_manager: &PropertyManager,
4758 params: &HashMap<String, Value>,
4759 ctx: Option<&QueryContext>,
4760 ) -> Result<Vec<HashMap<String, Value>>> {
4761 let mut results = Vec::new();
4762 for m in input_rows {
4763 let mut row = HashMap::new();
4764 for (expr, alias) in projections {
4765 let val = self
4766 .evaluate_expr(expr, &m, prop_manager, params, ctx)
4767 .await?;
4768 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4769 row.insert(name, val);
4770 }
4771 results.push(row);
4772 }
4773 Ok(results)
4774 }
4775
4776 pub(crate) async fn execute_unwind(
4778 &self,
4779 input_rows: Vec<HashMap<String, Value>>,
4780 expr: &Expr,
4781 variable: &str,
4782 prop_manager: &PropertyManager,
4783 params: &HashMap<String, Value>,
4784 ctx: Option<&QueryContext>,
4785 ) -> Result<Vec<HashMap<String, Value>>> {
4786 let mut results = Vec::new();
4787 for row in input_rows {
4788 let val = self
4789 .evaluate_expr(expr, &row, prop_manager, params, ctx)
4790 .await?;
4791 if let Value::List(items) = val {
4792 for item in items {
4793 let mut new_row = row.clone();
4794 new_row.insert(variable.to_string(), item);
4795 results.push(new_row);
4796 }
4797 }
4798 }
4799 Ok(results)
4800 }
4801
4802 pub(crate) async fn execute_apply(
4804 &self,
4805 input_rows: Vec<HashMap<String, Value>>,
4806 subquery: &LogicalPlan,
4807 input_filter: Option<&Expr>,
4808 prop_manager: &PropertyManager,
4809 params: &HashMap<String, Value>,
4810 ctx: Option<&QueryContext>,
4811 ) -> Result<Vec<HashMap<String, Value>>> {
4812 let mut filtered_rows = input_rows;
4813
4814 if let Some(filter) = input_filter {
4815 let mut filtered = Vec::new();
4816 for row in filtered_rows {
4817 let res = self
4818 .evaluate_expr(filter, &row, prop_manager, params, ctx)
4819 .await?;
4820 if res.as_bool().unwrap_or(false) {
4821 filtered.push(row);
4822 }
4823 }
4824 filtered_rows = filtered;
4825 }
4826
4827 if filtered_rows.is_empty() {
4830 let sub_rows = self
4831 .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4832 .await?;
4833 return Ok(sub_rows);
4834 }
4835
4836 let mut results = Vec::new();
4837 for row in filtered_rows {
4838 let mut sub_params = params.clone();
4839 sub_params.extend(row.clone());
4840
4841 let sub_rows = self
4842 .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4843 .await?;
4844
4845 for sub_row in sub_rows {
4846 let mut new_row = row.clone();
4847 new_row.extend(sub_row);
4848 results.push(new_row);
4849 }
4850 }
4851 Ok(results)
4852 }
4853
4854 pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4856 let schema = self.storage.schema_manager().schema();
4857 let mut rows = Vec::new();
4858 for idx in &schema.indexes {
4859 let (name, type_str, details) = match idx {
4860 uni_common::core::schema::IndexDefinition::Vector(c) => (
4861 c.name.clone(),
4862 "VECTOR",
4863 format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4864 ),
4865 uni_common::core::schema::IndexDefinition::FullText(c) => (
4866 c.name.clone(),
4867 "FULLTEXT",
4868 format!("on {}:{:?}", c.label, c.properties),
4869 ),
4870 uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4871 cfg.name.clone(),
4872 "SCALAR",
4873 format!(":{}({:?})", cfg.label, cfg.properties),
4874 ),
4875 _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4876 };
4877
4878 if let Some(f) = filter
4879 && f != type_str
4880 {
4881 continue;
4882 }
4883
4884 let mut row = HashMap::new();
4885 row.insert("name".to_string(), Value::String(name));
4886 row.insert("type".to_string(), Value::String(type_str.to_string()));
4887 row.insert("details".to_string(), Value::String(details));
4888 rows.push(row);
4889 }
4890 rows
4891 }
4892
4893 pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4894 let mut row = HashMap::new();
4895 row.insert("name".to_string(), Value::String("uni".to_string()));
4896 vec![row]
4898 }
4899
4900 pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4901 vec![]
4903 }
4904
4905 pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4906 let snapshot = self
4907 .storage
4908 .snapshot_manager()
4909 .load_latest_snapshot()
4910 .await?;
4911 let mut results = Vec::new();
4912
4913 if let Some(snap) = snapshot {
4914 for (label, s) in &snap.vertices {
4915 let mut row = HashMap::new();
4916 row.insert("type".to_string(), Value::String("Label".to_string()));
4917 row.insert("name".to_string(), Value::String(label.clone()));
4918 row.insert("count".to_string(), Value::Int(s.count as i64));
4919 results.push(row);
4920 }
4921 for (edge, s) in &snap.edges {
4922 let mut row = HashMap::new();
4923 row.insert("type".to_string(), Value::String("Edge".to_string()));
4924 row.insert("name".to_string(), Value::String(edge.clone()));
4925 row.insert("count".to_string(), Value::Int(s.count as i64));
4926 results.push(row);
4927 }
4928 }
4929
4930 Ok(results)
4931 }
4932
4933 pub(crate) fn execute_show_constraints(
4934 &self,
4935 clause: ShowConstraints,
4936 ) -> Vec<HashMap<String, Value>> {
4937 let schema = self.storage.schema_manager().schema();
4938 let mut rows = Vec::new();
4939 for c in &schema.constraints {
4940 if let Some(target) = &clause.target {
4941 match (target, &c.target) {
4942 (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4943 (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4944 if e1 == e2 => {}
4945 _ => continue,
4946 }
4947 }
4948
4949 let mut row = HashMap::new();
4950 row.insert("name".to_string(), Value::String(c.name.clone()));
4951 let type_str = match c.constraint_type {
4952 ConstraintType::Unique { .. } => "UNIQUE",
4953 ConstraintType::Exists { .. } => "EXISTS",
4954 ConstraintType::Check { .. } => "CHECK",
4955 _ => "UNKNOWN",
4956 };
4957 row.insert("type".to_string(), Value::String(type_str.to_string()));
4958
4959 let target_str = match &c.target {
4960 ConstraintTarget::Label(l) => format!("(:{})", l),
4961 ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4962 _ => "UNKNOWN".to_string(),
4963 };
4964 row.insert("target".to_string(), Value::String(target_str));
4965
4966 rows.push(row);
4967 }
4968 rows
4969 }
4970
4971 pub(crate) async fn execute_cross_join(
4973 &self,
4974 left: Box<LogicalPlan>,
4975 right: Box<LogicalPlan>,
4976 prop_manager: &PropertyManager,
4977 params: &HashMap<String, Value>,
4978 ctx: Option<&QueryContext>,
4979 ) -> Result<Vec<HashMap<String, Value>>> {
4980 let left_rows = self
4981 .execute_subplan(*left, prop_manager, params, ctx)
4982 .await?;
4983 let right_rows = self
4984 .execute_subplan(*right, prop_manager, params, ctx)
4985 .await?;
4986
4987 let mut results = Vec::new();
4988 for l in &left_rows {
4989 for r in &right_rows {
4990 let mut combined = l.clone();
4991 combined.extend(r.clone());
4992 results.push(combined);
4993 }
4994 }
4995 Ok(results)
4996 }
4997
4998 pub(crate) async fn execute_union(
5000 &self,
5001 left: Box<LogicalPlan>,
5002 right: Box<LogicalPlan>,
5003 all: bool,
5004 prop_manager: &PropertyManager,
5005 params: &HashMap<String, Value>,
5006 ctx: Option<&QueryContext>,
5007 ) -> Result<Vec<HashMap<String, Value>>> {
5008 let mut left_rows = self
5009 .execute_subplan(*left, prop_manager, params, ctx)
5010 .await?;
5011 let mut right_rows = self
5012 .execute_subplan(*right, prop_manager, params, ctx)
5013 .await?;
5014
5015 left_rows.append(&mut right_rows);
5016
5017 if !all {
5018 let mut seen = HashSet::new();
5019 left_rows.retain(|row| {
5020 let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
5021 let key = format!("{sorted_row:?}");
5022 seen.insert(key)
5023 });
5024 }
5025 Ok(left_rows)
5026 }
5027
5028 pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
5030 let schema = self.storage.schema_manager().schema();
5031 schema.indexes.iter().any(|idx| match idx {
5032 uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
5033 uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
5034 uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
5035 _ => false,
5036 })
5037 }
5038
5039 pub(crate) async fn execute_export(
5040 &self,
5041 target: &str,
5042 source: &str,
5043 options: &HashMap<String, Value>,
5044 prop_manager: &PropertyManager,
5045 ctx: Option<&QueryContext>,
5046 ) -> Result<Vec<HashMap<String, Value>>> {
5047 let format = options
5048 .get("format")
5049 .and_then(|v| v.as_str())
5050 .unwrap_or("csv")
5051 .to_lowercase();
5052
5053 match format.as_str() {
5054 "csv" => {
5055 self.execute_csv_export(target, source, options, prop_manager, ctx)
5056 .await
5057 }
5058 "parquet" => {
5059 self.execute_parquet_export(target, source, options, prop_manager, ctx)
5060 .await
5061 }
5062 _ => Err(anyhow!("Unsupported export format: {}", format)),
5063 }
5064 }
5065
5066 pub(crate) async fn execute_csv_export(
5067 &self,
5068 target: &str,
5069 source: &str,
5070 options: &HashMap<String, Value>,
5071 prop_manager: &PropertyManager,
5072 ctx: Option<&QueryContext>,
5073 ) -> Result<Vec<HashMap<String, Value>>> {
5074 let validated_dest = self.validate_path(source)?;
5076
5077 let schema = self.storage.schema_manager().schema();
5078 let label_meta = schema.labels.get(target);
5079 let edge_meta = schema.edge_types.get(target);
5080
5081 if label_meta.is_none() && edge_meta.is_none() {
5082 return Err(anyhow!("Target '{}' not found in schema", target));
5083 }
5084
5085 let delimiter_str = options
5086 .get("delimiter")
5087 .and_then(|v| v.as_str())
5088 .unwrap_or(",");
5089 let delimiter = if delimiter_str.is_empty() {
5090 b','
5091 } else {
5092 delimiter_str.as_bytes()[0]
5093 };
5094 let has_header = options
5095 .get("header")
5096 .and_then(|v| v.as_bool())
5097 .unwrap_or(true);
5098
5099 let mut wtr = csv::WriterBuilder::new()
5100 .delimiter(delimiter)
5101 .from_path(&validated_dest)?;
5102
5103 let mut count = 0;
5104 let empty_props = HashMap::new();
5106
5107 if let Some(meta) = label_meta {
5108 let label_id = meta.id;
5109 let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5110 let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5111 prop_names.sort();
5112
5113 let mut headers = vec!["_vid".to_string()];
5114 headers.extend(prop_names.clone());
5115
5116 if has_header {
5117 wtr.write_record(&headers)?;
5118 }
5119
5120 let vids = self
5121 .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5122 .await?;
5123
5124 for vid in vids {
5125 let props = prop_manager
5126 .get_all_vertex_props_with_ctx(vid, ctx)
5127 .await?
5128 .unwrap_or_default();
5129
5130 let mut row = Vec::with_capacity(headers.len());
5131 row.push(vid.to_string());
5132 for p_name in &prop_names {
5133 let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5134 row.push(self.format_csv_value(val));
5135 }
5136 wtr.write_record(&row)?;
5137 count += 1;
5138 }
5139 } else if let Some(meta) = edge_meta {
5140 let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5141 let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5142 prop_names.sort();
5143
5144 let mut headers = vec![
5146 "_eid".to_string(),
5147 "_src".to_string(),
5148 "_dst".to_string(),
5149 "_type".to_string(),
5150 ];
5151 headers.extend(prop_names.clone());
5152
5153 if has_header {
5154 wtr.write_record(&headers)?;
5155 }
5156
5157 let edges = self.scan_edge_type(target, ctx).await?;
5158
5159 for (eid, src, dst) in edges {
5160 let props = prop_manager
5161 .get_all_edge_props_with_ctx(eid, ctx)
5162 .await?
5163 .unwrap_or_default();
5164
5165 let mut row = Vec::with_capacity(headers.len());
5166 row.push(eid.to_string());
5167 row.push(src.to_string());
5168 row.push(dst.to_string());
5169 row.push(meta.id.to_string());
5170
5171 for p_name in &prop_names {
5172 let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5173 row.push(self.format_csv_value(val));
5174 }
5175 wtr.write_record(&row)?;
5176 count += 1;
5177 }
5178 }
5179
5180 wtr.flush()?;
5181 let mut res = HashMap::new();
5182 res.insert("count".to_string(), Value::Int(count as i64));
5183 Ok(vec![res])
5184 }
5185
5186 pub(crate) async fn execute_parquet_export(
5190 &self,
5191 target: &str,
5192 destination: &str,
5193 _options: &HashMap<String, Value>,
5194 prop_manager: &PropertyManager,
5195 ctx: Option<&QueryContext>,
5196 ) -> Result<Vec<HashMap<String, Value>>> {
5197 let schema_manager = self.storage.schema_manager();
5198 let schema = schema_manager.schema();
5199 let label_meta = schema.labels.get(target);
5200 let edge_meta = schema.edge_types.get(target);
5201
5202 if label_meta.is_none() && edge_meta.is_none() {
5203 return Err(anyhow!("Target '{}' not found in schema", target));
5204 }
5205
5206 let arrow_schema = if label_meta.is_some() {
5207 let dataset = self.storage.vertex_dataset(target)?;
5208 dataset.get_arrow_schema(&schema)?
5209 } else {
5210 let dataset = self.storage.edge_dataset(target, "", "")?;
5212 dataset.get_arrow_schema(&schema)?
5213 };
5214
5215 let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();
5216
5217 if let Some(meta) = label_meta {
5218 let label_id = meta.id;
5219 let vids = self
5220 .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5221 .await?;
5222
5223 for vid in vids {
5224 let mut props = prop_manager
5225 .get_all_vertex_props_with_ctx(vid, ctx)
5226 .await?
5227 .unwrap_or_default();
5228
5229 props.insert(
5230 "_vid".to_string(),
5231 uni_common::Value::Int(vid.as_u64() as i64),
5232 );
5233 if !props.contains_key("_uid") {
5234 props.insert(
5235 "_uid".to_string(),
5236 uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
5237 );
5238 }
5239 props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5240 props.insert("_version".to_string(), uni_common::Value::Int(1));
5241 rows.push(props);
5242 }
5243 } else if edge_meta.is_some() {
5244 let edges = self.scan_edge_type(target, ctx).await?;
5245 for (eid, src, dst) in edges {
5246 let mut props = prop_manager
5247 .get_all_edge_props_with_ctx(eid, ctx)
5248 .await?
5249 .unwrap_or_default();
5250
5251 props.insert(
5252 "eid".to_string(),
5253 uni_common::Value::Int(eid.as_u64() as i64),
5254 );
5255 props.insert(
5256 "src_vid".to_string(),
5257 uni_common::Value::Int(src.as_u64() as i64),
5258 );
5259 props.insert(
5260 "dst_vid".to_string(),
5261 uni_common::Value::Int(dst.as_u64() as i64),
5262 );
5263 props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5264 props.insert("_version".to_string(), uni_common::Value::Int(1));
5265 rows.push(props);
5266 }
5267 }
5268
5269 if is_cloud_url(destination) {
5271 self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
5272 .await?;
5273 } else {
5274 let validated_dest = self.validate_path(destination)?;
5276 let file = std::fs::File::create(&validated_dest)?;
5277 let mut writer =
5278 parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;
5279
5280 if !rows.is_empty() {
5282 let batch = self.rows_to_batch(&rows, &arrow_schema)?;
5283 writer.write(&batch)?;
5284 }
5285
5286 writer.close()?;
5287 }
5288
5289 let mut res = HashMap::new();
5290 res.insert("count".to_string(), Value::Int(rows.len() as i64));
5291 Ok(vec![res])
5292 }
5293
5294 async fn write_parquet_to_cloud(
5296 &self,
5297 dest_url: &str,
5298 rows: &[HashMap<String, uni_common::Value>],
5299 arrow_schema: &arrow_schema::Schema,
5300 ) -> Result<()> {
5301 use object_store::ObjectStoreExt;
5302
5303 let (store, path) = build_store_from_url(dest_url)?;
5304
5305 let mut buffer = Vec::new();
5307 {
5308 let mut writer = parquet::arrow::ArrowWriter::try_new(
5309 &mut buffer,
5310 Arc::new(arrow_schema.clone()),
5311 None,
5312 )?;
5313
5314 if !rows.is_empty() {
5315 let batch = self.rows_to_batch(rows, arrow_schema)?;
5316 writer.write(&batch)?;
5317 }
5318
5319 writer.close()?;
5320 }
5321
5322 store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5324
5325 Ok(())
5326 }
5327
5328 pub(crate) fn rows_to_batch(
5329 &self,
5330 rows: &[HashMap<String, uni_common::Value>],
5331 schema: &arrow_schema::Schema,
5332 ) -> Result<RecordBatch> {
5333 let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5334
5335 for field in schema.fields() {
5336 let name = field.name();
5337 let dt = field.data_type();
5338
5339 let values: Vec<uni_common::Value> = rows
5340 .iter()
5341 .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5342 .collect();
5343 let array = self.values_to_array(&values, dt)?;
5344 columns.push(array);
5345 }
5346
5347 Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5348 }
5349
5350 pub(crate) fn values_to_array(
5353 &self,
5354 values: &[uni_common::Value],
5355 dt: &arrow_schema::DataType,
5356 ) -> Result<Arc<dyn Array>> {
5357 arrow_convert::values_to_array(values, dt)
5358 }
5359
5360 pub(crate) fn format_csv_value(&self, val: Value) -> String {
5361 match val {
5362 Value::Null => "".to_string(),
5363 Value::String(s) => s,
5364 Value::Int(i) => i.to_string(),
5365 Value::Float(f) => f.to_string(),
5366 Value::Bool(b) => b.to_string(),
5367 _ => format!("{val}"),
5368 }
5369 }
5370
5371 pub(crate) fn parse_csv_value(
5372 &self,
5373 s: &str,
5374 data_type: &uni_common::core::schema::DataType,
5375 prop_name: &str,
5376 ) -> Result<Value> {
5377 if s.is_empty() || s.to_lowercase() == "null" {
5378 return Ok(Value::Null);
5379 }
5380
5381 use uni_common::core::schema::DataType;
5382 match data_type {
5383 DataType::String => Ok(Value::String(s.to_string())),
5384 DataType::Int32 | DataType::Int64 => {
5385 let i = s.parse::<i64>().map_err(|_| {
5386 anyhow!(
5387 "Failed to parse integer for property '{}': {}",
5388 prop_name,
5389 s
5390 )
5391 })?;
5392 Ok(Value::Int(i))
5393 }
5394 DataType::Float32 | DataType::Float64 => {
5395 let f = s.parse::<f64>().map_err(|_| {
5396 anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5397 })?;
5398 Ok(Value::Float(f))
5399 }
5400 DataType::Bool => {
5401 let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5402 anyhow!(
5403 "Failed to parse boolean for property '{}': {}",
5404 prop_name,
5405 s
5406 )
5407 })?;
5408 Ok(Value::Bool(b))
5409 }
5410 DataType::CypherValue => {
5411 let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5412 anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5413 })?;
5414 Ok(Value::from(json_val))
5415 }
5416 DataType::Vector { .. } => {
5417 let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5418 anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5419 })?;
5420 Ok(Value::Vector(v))
5421 }
5422 _ => Ok(Value::String(s.to_string())),
5423 }
5424 }
5425
5426 pub(crate) async fn detach_delete_vertex(
5427 &self,
5428 vid: Vid,
5429 writer: &Writer,
5430 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5431 ) -> Result<()> {
5432 let schema = self.storage.schema_manager().schema();
5433 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5434
5435 let out_graph = self
5437 .storage
5438 .load_subgraph_cached(
5439 &[vid],
5440 &edge_type_ids,
5441 1,
5442 uni_store::runtime::Direction::Outgoing,
5443 Some(writer.l0_manager.get_current()),
5444 )
5445 .await?;
5446
5447 for edge in out_graph.edges() {
5448 writer
5449 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5450 .await?;
5451 }
5452
5453 let in_graph = self
5455 .storage
5456 .load_subgraph_cached(
5457 &[vid],
5458 &edge_type_ids,
5459 1,
5460 uni_store::runtime::Direction::Incoming,
5461 Some(writer.l0_manager.get_current()),
5462 )
5463 .await?;
5464
5465 for edge in in_graph.edges() {
5466 writer
5467 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5468 .await?;
5469 }
5470
5471 Ok(())
5472 }
5473
5474 async fn batch_load_incident_edges(
5485 &self,
5486 vids: &[Vid],
5487 writer: &Writer,
5488 ) -> Result<(
5489 uni_store::runtime::working_graph::WorkingGraph,
5490 uni_store::runtime::working_graph::WorkingGraph,
5491 )> {
5492 let schema = self.storage.schema_manager().schema();
5493 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5494 let l0 = Some(writer.l0_manager.get_current());
5495
5496 let out_graph = self
5497 .storage
5498 .load_subgraph_cached(
5499 vids,
5500 &edge_type_ids,
5501 1,
5502 uni_store::runtime::Direction::Outgoing,
5503 l0.clone(),
5504 )
5505 .await?;
5506 let in_graph = self
5507 .storage
5508 .load_subgraph_cached(
5509 vids,
5510 &edge_type_ids,
5511 1,
5512 uni_store::runtime::Direction::Incoming,
5513 l0,
5514 )
5515 .await?;
5516 Ok((out_graph, in_graph))
5517 }
5518
5519 pub(crate) async fn batch_detach_delete_vertices(
5521 &self,
5522 vids: &[Vid],
5523 labels_per_vid: Vec<Option<Vec<String>>>,
5524 writer: &Writer,
5525 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5526 ) -> Result<()> {
5527 let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5528
5529 for edge in out_graph.edges() {
5530 writer
5531 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5532 .await?;
5533 }
5534 for edge in in_graph.edges() {
5535 writer
5536 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5537 .await?;
5538 }
5539
5540 for (vid, labels) in vids.iter().zip(labels_per_vid) {
5541 writer.delete_vertex(*vid, labels, tx_l0).await?;
5542 }
5543
5544 Ok(())
5545 }
5546
5547 pub(crate) async fn batch_check_vertices_have_no_edges(
5565 &self,
5566 vids: &[Vid],
5567 writer: &Writer,
5568 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5569 ) -> Result<()> {
5570 if vids.is_empty() {
5571 return Ok(());
5572 }
5573
5574 let mut tombstoned_eids: std::collections::HashSet<uni_common::core::id::Eid> =
5575 std::collections::HashSet::new();
5576 {
5577 let writer_l0 = writer.l0_manager.get_current();
5578 let guard = writer_l0.read();
5579 for &eid in guard.tombstones.keys() {
5580 tombstoned_eids.insert(eid);
5581 }
5582 }
5583 if let Some(tx) = tx_l0 {
5584 let guard = tx.read();
5585 for &eid in guard.tombstones.keys() {
5586 tombstoned_eids.insert(eid);
5587 }
5588 }
5589
5590 let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5591
5592 for edge in out_graph.edges().chain(in_graph.edges()) {
5593 if !tombstoned_eids.contains(&edge.eid) {
5594 return Err(anyhow!(
5595 "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
5596 edge.src_vid
5597 ));
5598 }
5599 }
5600 Ok(())
5601 }
5602}