1use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7 eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16 match val {
17 Value::Temporal(tv) => {
18 use uni_common::TemporalValue;
19 match tv {
20 TemporalValue::DateTime {
21 nanos_since_epoch, ..
22 }
23 | TemporalValue::LocalDateTime {
24 nanos_since_epoch, ..
25 } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26 TemporalValue::Date { days_since_epoch } => {
27 chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28 }
29 _ => None,
30 }
31 }
32 Value::String(s) => parse_datetime_utc(s).ok(),
33 _ => None,
34 }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46 BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47 ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54
55use crate::query::df_graph::L0Context;
57use crate::query::df_planner::HybridPhysicalPlanner;
58use datafusion::physical_plan::ExecutionPlanProperties;
59use datafusion::prelude::SessionContext;
60use parking_lot::RwLock as SyncRwLock;
61
62use arrow_array::{Array, RecordBatch};
63use csv;
64use parquet;
65
66use super::core::*;
67
/// Entry-count threshold for edge maps: `hydrate_entity_if_needed` loads the edge's
/// properties when the map holds at most this many entries, and subtracts it from the
/// map size when logging how many properties were already present.
/// NOTE(review): presumably the number of system columns carried by an edge row —
/// confirm which fields those are.
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
/// Entry-count threshold for vertex maps, used by `hydrate_entity_if_needed` in the
/// same way as [`EDGE_SYSTEM_FIELD_COUNT`] is for edges.
/// NOTE(review): presumably the number of system columns carried by a vertex row —
/// confirm which fields those are.
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
72
73fn collect_l0_vids(
78 ctx: Option<&QueryContext>,
79 extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
80) -> Vec<Vid> {
81 let mut vids = Vec::new();
82 if let Some(ctx) = ctx {
83 vids.extend(extractor(&ctx.l0.read()));
84 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
85 vids.extend(extractor(&tx_l0_arc.read()));
86 }
87 for pending_l0_arc in &ctx.pending_flush_l0s {
88 vids.extend(extractor(&pending_l0_arc.read()));
89 }
90 }
91 vids
92}
93
94async fn hydrate_entity_if_needed(
103 map: &mut HashMap<String, Value>,
104 prop_manager: &PropertyManager,
105 ctx: Option<&QueryContext>,
106) {
107 if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
109 if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
110 tracing::debug!(
111 "Pushdown fallback: hydrating edge {} at execution time",
112 eid_u64
113 );
114 if let Ok(Some(props)) = prop_manager
115 .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
116 .await
117 {
118 for (key, value) in props {
119 map.entry(key).or_insert(value);
120 }
121 }
122 } else {
123 tracing::trace!(
124 "Pushdown success: edge {} already has {} properties",
125 eid_u64,
126 map.len() - EDGE_SYSTEM_FIELD_COUNT
127 );
128 }
129 return;
130 }
131
132 if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
134 if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
135 tracing::debug!(
136 "Pushdown fallback: hydrating vertex {} at execution time",
137 vid_u64
138 );
139 if let Ok(Some(props)) = prop_manager
140 .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
141 .await
142 {
143 for (key, value) in props {
144 map.entry(key).or_insert(value);
145 }
146 }
147 } else {
148 tracing::trace!(
149 "Pushdown success: vertex {} already has {} properties",
150 vid_u64,
151 map.len() - VERTEX_SYSTEM_FIELD_COUNT
152 );
153 }
154 }
155}
156
157fn promote_vid_placeholder(row: &mut HashMap<String, Value>, var: &str) {
163 let prefix = format!("{}.", var);
164 let mut map = HashMap::new();
165
166 let dotted_keys: Vec<String> = row
167 .keys()
168 .filter(|k| k.starts_with(&prefix))
169 .cloned()
170 .collect();
171
172 for key in &dotted_keys {
173 let prop_name = &key[prefix.len()..];
174 if let Some(val) = row.remove(key) {
175 map.insert(prop_name.to_string(), val);
176 }
177 }
178
179 row.insert(var.to_string(), Value::Map(map));
181}
182
183fn merge_dotted_columns(row: &mut HashMap<String, Value>, var: &str) {
193 let vid_key = format!("{}._vid", var);
195 let labels_key = format!("{}._labels", var);
196
197 let vid_val = row.remove(&vid_key);
198 let labels_val = row.remove(&labels_key);
199
200 if let Some(Value::Map(map)) = row.get_mut(var) {
201 if let Some(v) = vid_val {
202 map.insert("_vid".to_string(), v);
203 }
204 if let Some(v) = labels_val {
205 map.insert("_labels".to_string(), v);
206 }
207 }
208
209 let eid_key = format!("{}._eid", var);
214 let type_key = format!("{}._type", var);
215
216 let eid_val = row.remove(&eid_key);
217 let type_val = row.remove(&type_key);
218
219 if (eid_val.is_some() || type_val.is_some())
220 && let Some(Value::Map(map)) = row.get_mut(var)
221 {
222 if let Some(v) = eid_val {
223 map.entry("_eid".to_string()).or_insert(v);
224 }
225 if let Some(v) = type_val {
226 map.entry("_type".to_string()).or_insert(v);
227 }
228 }
229
230 let prefix = format!("{}.", var);
232 let dotted_keys: Vec<String> = row
233 .keys()
234 .filter(|k| k.starts_with(&prefix))
235 .cloned()
236 .collect();
237 for key in dotted_keys {
238 let prop_name = key[prefix.len()..].to_string();
239 let val = row.remove(&key);
240 if prop_name.starts_with('_') || prop_name == "overflow_json" {
241 continue;
242 }
243 if let (Some(val), Some(Value::Map(map))) = (val, row.get_mut(var)) {
244 map.insert(prop_name, val);
245 }
246 }
247}
248
249impl Executor {
250 async fn verify_and_filter_candidates(
255 &self,
256 mut candidates: Vec<Vid>,
257 variable: &str,
258 filter: Option<&Expr>,
259 ctx: Option<&QueryContext>,
260 prop_manager: &PropertyManager,
261 params: &HashMap<String, Value>,
262 ) -> Result<Vec<Vid>> {
263 candidates.sort_unstable();
264 candidates.dedup();
265
266 let mut verified_vids = Vec::new();
267 for vid in candidates {
268 let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
269 continue; };
271
272 if let Some(expr) = filter {
273 let mut props_map: HashMap<String, Value> = props;
274 props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
275
276 let mut row = HashMap::new();
277 row.insert(variable.to_string(), Value::Map(props_map));
278
279 let res = self
280 .evaluate_expr(expr, &row, prop_manager, params, ctx)
281 .await?;
282 if res.as_bool().unwrap_or(false) {
283 verified_vids.push(vid);
284 }
285 } else {
286 verified_vids.push(vid);
287 }
288 }
289
290 Ok(verified_vids)
291 }
292
293 pub(crate) async fn scan_storage_candidates(
294 &self,
295 label_id: u16,
296 variable: &str,
297 filter: Option<&Expr>,
298 ) -> Result<Vec<Vid>> {
299 let schema = self.storage.schema_manager().schema();
300 let label_name = schema
301 .label_name_by_id(label_id)
302 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
303
304 let empty_props = std::collections::HashMap::new();
306 let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
307 let filter_sql = filter.and_then(|expr| {
308 LanceFilterGenerator::generate(std::slice::from_ref(expr), variable, Some(label_props))
309 });
310
311 self.storage
312 .scan_vertex_candidates(label_name, filter_sql.as_deref())
313 .await
314 }
315
316 pub(crate) async fn scan_label_with_filter(
317 &self,
318 label_id: u16,
319 variable: &str,
320 filter: Option<&Expr>,
321 ctx: Option<&QueryContext>,
322 prop_manager: &PropertyManager,
323 params: &HashMap<String, Value>,
324 ) -> Result<Vec<Vid>> {
325 let mut candidates = self
326 .scan_storage_candidates(label_id, variable, filter)
327 .await?;
328
329 let schema = self.storage.schema_manager().schema();
331 if let Some(label_name) = schema.label_name_by_id(label_id) {
332 candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
333 }
334
335 self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
336 .await
337 }
338
339 pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
340 if let Value::Node(node) = val {
342 return Ok(node.vid);
343 }
344 if let Value::Map(map) = val
346 && let Some(vid_val) = map.get("_vid")
347 && let Some(v) = vid_val.as_u64()
348 {
349 return Ok(Vid::from(v));
350 }
351 if let Some(s) = val.as_str()
353 && let Ok(id) = s.parse::<u64>()
354 {
355 return Ok(Vid::new(id));
356 }
357 if let Some(v) = val.as_u64() {
359 return Ok(Vid::from(v));
360 }
361 Err(anyhow!("Invalid Vid format: {:?}", val))
362 }
363
364 fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
370 for val in row.values() {
371 if let Ok(vid) = Self::vid_from_value(val)
372 && vid == target_vid
373 {
374 return val.clone();
375 }
376 }
377 Value::Map(HashMap::from([(
379 "_vid".to_string(),
380 Value::Int(target_vid.as_u64() as i64),
381 )]))
382 }
383
384 pub async fn create_datafusion_planner(
389 &self,
390 prop_manager: &PropertyManager,
391 params: &HashMap<String, Value>,
392 ) -> Result<(
393 Arc<SyncRwLock<SessionContext>>,
394 HybridPhysicalPlanner,
395 Arc<PropertyManager>,
396 )> {
397 let query_ctx = self.get_context().await;
398 let l0_context = match query_ctx {
399 Some(ref ctx) => L0Context::from_query_context(ctx),
400 None => L0Context::empty(),
401 };
402
403 let effective_storage = self.effective_storage();
412
413 let prop_manager_arc = if let Some(shared) = self.prop_manager_arc.as_ref() {
419 shared.clone()
420 } else {
421 Arc::new(PropertyManager::new(
422 self.storage.clone(),
423 self.storage.schema_manager_arc(),
424 prop_manager.cache_size(),
425 ))
426 };
427
428 let has_custom_udfs = self
447 .custom_function_registry
448 .as_ref()
449 .is_some_and(|r| !r.is_empty());
450 let host_plugin_registry = self
451 .procedure_registry
452 .as_ref()
453 .and_then(|pr| pr.plugin_registry());
454 let optimizer_providers = host_plugin_registry
458 .as_ref()
459 .map(|pr| pr.optimizer_rules())
460 .unwrap_or_default();
461 let session_local_registry =
462 crate::query::df_udfs_plugin::current_session_plugin_registry();
463 let needs_dynamic_registration = has_custom_udfs
464 || !optimizer_providers.is_empty()
465 || host_plugin_registry.is_some()
466 || session_local_registry.is_some();
467
468 let session = if let (Some(tmpl), false) = (
469 self.df_session_template.as_ref(),
470 needs_dynamic_registration,
471 ) {
472 (**tmpl).clone()
474 } else {
475 let session = if optimizer_providers.is_empty() {
483 SessionContext::new()
484 } else {
485 use datafusion::execution::session_state::SessionStateBuilder;
486 use uni_plugin::traits::operator::OptimizerPhase;
487 let mut builder = SessionStateBuilder::new().with_default_features();
488 for provider in optimizer_providers.iter() {
489 match provider.phase() {
490 OptimizerPhase::Logical => {
491 builder = builder.with_optimizer_rule(provider.rule());
492 }
493 OptimizerPhase::Physical => {
494 if let Some(rule) = provider.physical_rule() {
495 builder = builder.with_physical_optimizer_rule(rule);
496 } else {
497 tracing::debug!(
498 target: "uni.plugin.registry",
499 "physical-phase provider returned no physical_rule(); skipping"
500 );
501 }
502 }
503 OptimizerPhase::Both => {
504 builder = builder.with_optimizer_rule(provider.rule());
505 if let Some(rule) = provider.physical_rule() {
506 builder = builder.with_physical_optimizer_rule(rule);
507 }
508 }
509 _ => {
510 tracing::debug!(
511 target: "uni.plugin.registry",
512 "skipping optimizer rule for unknown phase"
513 );
514 }
515 }
516 }
517 let state = builder.build();
518 SessionContext::new_with_state(state)
519 };
520 crate::query::df_udfs::register_cypher_udfs(&session)?;
521 if let Some(ref registry) = self.custom_function_registry {
525 crate::query::df_udfs_plugin::register_custom_functions_as_plugin_scalars(
526 &session, registry,
527 )?;
528 }
529 if let Some(ref host_pr) = host_plugin_registry {
540 crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, host_pr)?;
541 }
542 if let Some(ref session_local) = session_local_registry {
548 crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, session_local)?;
549 }
550 session
551 };
552 let session_ctx = Arc::new(SyncRwLock::new(session));
553
554 let mut planner = HybridPhysicalPlanner::with_l0_context(
555 session_ctx.clone(),
556 effective_storage.clone(),
557 l0_context,
558 prop_manager_arc.clone(),
559 effective_storage.schema_manager().schema(),
560 params.clone(),
561 HashMap::new(),
562 );
563
564 planner = planner.with_algo_registry(self.algo_registry.clone());
565 if let Some(ref registry) = self.procedure_registry {
566 planner = planner.with_procedure_registry(registry.clone());
567 }
568 let host_plugin_registry = self
577 .procedure_registry
578 .as_ref()
579 .and_then(|r| r.plugin_registry());
580 if let Some(registry) = host_plugin_registry {
581 planner = planner.with_plugin_registry(registry);
582 }
583 if let Some(ref xervo_runtime) = self.xervo_runtime {
584 planner = planner.with_xervo_runtime(xervo_runtime.clone());
585 }
586 if let Some(writer) = &self.writer {
591 planner = planner.with_writer(Arc::clone(writer));
592 }
593
594 Ok((session_ctx, planner, prop_manager_arc))
595 }
596
597 pub fn collect_batches(
599 session_ctx: &Arc<SyncRwLock<SessionContext>>,
600 execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
601 ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
602 Box::pin(async move {
603 use futures::TryStreamExt;
604
605 let task_ctx = session_ctx.read().task_ctx();
606 let partition_count = execution_plan.output_partitioning().partition_count();
607 let mut all_batches = Vec::new();
608 for partition in 0..partition_count {
609 let stream = execution_plan.execute(partition, task_ctx.clone())?;
610 let batches: Vec<RecordBatch> = stream.try_collect().await?;
611 all_batches.extend(batches);
612 }
613 Ok(all_batches)
614 })
615 }
616
617 pub async fn execute_datafusion(
622 &self,
623 plan: LogicalPlan,
624 prop_manager: &PropertyManager,
625 params: &HashMap<String, Value>,
626 ) -> Result<Vec<RecordBatch>> {
627 let (batches, _plan) = self
628 .execute_datafusion_with_plan(plan, prop_manager, params)
629 .await?;
630 Ok(batches)
631 }
632
633 pub async fn execute_datafusion_with_plan(
640 &self,
641 plan: LogicalPlan,
642 prop_manager: &PropertyManager,
643 params: &HashMap<String, Value>,
644 ) -> Result<(
645 Vec<RecordBatch>,
646 Arc<dyn datafusion::physical_plan::ExecutionPlan>,
647 )> {
648 let (session_ctx, mut planner, prop_manager_arc) =
649 self.create_datafusion_planner(prop_manager, params).await?;
650
651 if Self::contains_write_operations(&plan) {
653 let writer = self
654 .writer
655 .as_ref()
656 .ok_or_else(|| anyhow!("Write operations require a Writer"))?
657 .clone();
658 let query_ctx = self.get_context().await;
659
660 debug_assert!(
661 query_ctx.is_some(),
662 "BUG: query_ctx is None for write operation"
663 );
664
665 let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
666 executor: self.clone(),
667 writer,
668 prop_manager: prop_manager_arc,
669 params: params.clone(),
670 query_ctx,
671 tx_l0_override: self.transaction_l0_override.clone(),
672 });
673 planner = planner.with_mutation_context(mutation_ctx);
674 tracing::debug!(
675 plan_type = Self::get_plan_type(&plan),
676 "Mutation routed to DataFusion engine"
677 );
678 }
679
680 let execution_plan = planner.plan(&plan)?;
681 let plan_clone = Arc::clone(&execution_plan);
682 let result = Self::collect_batches(&session_ctx, execution_plan).await;
683
684 let graph_warnings = planner.graph_ctx().take_warnings();
686 if !graph_warnings.is_empty()
687 && let Ok(mut w) = self.warnings.lock()
688 {
689 w.extend(graph_warnings);
690 }
691
692 result.map(|batches| (batches, plan_clone))
693 }
694
695 pub(crate) async fn execute_merge_read_plan(
701 &self,
702 plan: LogicalPlan,
703 prop_manager: &PropertyManager,
704 params: &HashMap<String, Value>,
705 merge_variables: Vec<String>,
706 ) -> Result<Vec<HashMap<String, Value>>> {
707 let (session_ctx, planner, _prop_manager_arc) =
708 self.create_datafusion_planner(prop_manager, params).await?;
709
710 let extra: HashMap<String, HashSet<String>> = merge_variables
713 .iter()
714 .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
715 .collect();
716 let execution_plan = planner.plan_with_properties(&plan, extra)?;
717 let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
718
719 let flat_rows = self.record_batches_to_rows(all_batches)?;
721
722 let rows = flat_rows
725 .into_iter()
726 .map(|mut row| {
727 for var in &merge_variables {
728 if row.contains_key(var) {
730 continue;
731 }
732 let prefix = format!("{}.", var);
733 let dotted_keys: Vec<String> = row
734 .keys()
735 .filter(|k| k.starts_with(&prefix))
736 .cloned()
737 .collect();
738 if !dotted_keys.is_empty() {
739 let mut map = HashMap::new();
740 for key in dotted_keys {
741 let prop_name = key[prefix.len()..].to_string();
742 if let Some(val) = row.remove(&key) {
743 map.insert(prop_name, val);
744 }
745 }
746 row.insert(var.clone(), Value::Map(map));
747 }
748 }
749 row
750 })
751 .collect();
752
753 Ok(rows)
754 }
755
756 pub(crate) fn record_batches_to_rows(
763 &self,
764 batches: Vec<RecordBatch>,
765 ) -> Result<Vec<HashMap<String, Value>>> {
766 let mut rows = Vec::new();
767
768 for batch in batches {
769 let num_rows = batch.num_rows();
770 let schema = batch.schema();
771
772 for row_idx in 0..num_rows {
773 let mut row = HashMap::new();
774
775 for (col_idx, field) in schema.fields().iter().enumerate() {
776 let column = batch.column(col_idx);
777 let data_type =
779 if uni_common::core::schema::is_datetime_struct(field.data_type()) {
780 Some(&uni_common::DataType::DateTime)
781 } else if uni_common::core::schema::is_time_struct(field.data_type()) {
782 Some(&uni_common::DataType::Time)
783 } else {
784 None
785 };
786 let mut value =
787 arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);
788
789 if field
792 .metadata()
793 .get("cv_encoded")
794 .is_some_and(|v| v == "true")
795 && let Value::String(s) = &value
796 && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
797 {
798 value = Value::from(parsed);
799 }
800
801 value = Self::normalize_path_if_needed(value);
803
804 row.insert(field.name().clone(), value);
805 }
806
807 let bare_vars: Vec<String> = row
816 .keys()
817 .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
818 .cloned()
819 .collect();
820
821 let vid_placeholder_vars: Vec<String> = row
825 .keys()
826 .filter(|k| {
827 !k.contains('.')
828 && matches!(row.get(*k), Some(Value::String(_)))
829 && row.contains_key(&format!("{}._vid", k))
830 })
831 .cloned()
832 .collect();
833
834 for var in &vid_placeholder_vars {
835 promote_vid_placeholder(&mut row, var);
836 }
837
838 for var in &bare_vars {
839 merge_dotted_columns(&mut row, var);
840 }
841
842 rows.push(row);
843 }
844 }
845
846 Ok(rows)
847 }
848
849 fn normalize_path_if_needed(value: Value) -> Value {
854 match value {
855 Value::Map(map)
856 if map.contains_key("nodes")
857 && (map.contains_key("relationships") || map.contains_key("edges")) =>
858 {
859 Self::normalize_path_map(map)
860 }
861 other => other,
862 }
863 }
864
865 fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
867 if let Some(Value::List(nodes)) = map.remove("nodes") {
869 let normalized_nodes: Vec<Value> = nodes
870 .into_iter()
871 .map(|n| {
872 if let Value::Map(node_map) = n {
873 Self::normalize_path_node_map(node_map)
874 } else {
875 n
876 }
877 })
878 .collect();
879 map.insert("nodes".to_string(), Value::List(normalized_nodes));
880 }
881
882 let rels_key = if map.contains_key("relationships") {
884 "relationships"
885 } else {
886 "edges"
887 };
888 if let Some(Value::List(rels)) = map.remove(rels_key) {
889 let normalized_rels: Vec<Value> = rels
890 .into_iter()
891 .map(|r| {
892 if let Value::Map(rel_map) = r {
893 Self::normalize_path_edge_map(rel_map)
894 } else {
895 r
896 }
897 })
898 .collect();
899 map.insert("relationships".to_string(), Value::List(normalized_rels));
900 }
901
902 Value::Map(map)
903 }
904
905 fn value_to_id_string(val: Value) -> String {
907 match val {
908 Value::Int(n) => n.to_string(),
909 Value::Float(n) => n.to_string(),
910 Value::String(s) => s,
911 other => other.to_string(),
912 }
913 }
914
915 fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
918 if let Some(val) = map.remove(src_key) {
919 map.insert(
920 dst_key.to_string(),
921 Value::String(Self::value_to_id_string(val)),
922 );
923 }
924 }
925
926 fn ensure_properties_map(map: &mut HashMap<String, Value>) {
928 match map.get("properties") {
929 Some(props) if !props.is_null() => {}
930 _ => {
931 map.insert("properties".to_string(), Value::Map(HashMap::new()));
932 }
933 }
934 }
935
936 fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
938 Self::stringify_map_field(&mut map, "_vid", "_id");
939 Self::ensure_properties_map(&mut map);
940 Value::Map(map)
941 }
942
943 fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
945 Self::stringify_map_field(&mut map, "_eid", "_id");
946 Self::stringify_map_field(&mut map, "_src", "_src");
947 Self::stringify_map_field(&mut map, "_dst", "_dst");
948
949 if let Some(type_name) = map.remove("_type_name") {
950 map.insert("_type".to_string(), type_name);
951 }
952
953 Self::ensure_properties_map(&mut map);
954 Value::Map(map)
955 }
956
957 #[instrument(
958 skip(self, prop_manager, params),
959 fields(rows_returned, duration_ms),
960 level = "info"
961 )]
962 pub fn execute<'a>(
963 &'a self,
964 plan: LogicalPlan,
965 prop_manager: &'a PropertyManager,
966 params: &'a HashMap<String, Value>,
967 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
968 Box::pin(async move {
969 let query_type = Self::get_plan_type(&plan);
970 let ctx = self.get_context().await;
971 let start = Instant::now();
972
973 let res = if Self::is_ddl_or_admin(&plan) {
976 self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
977 .await
978 } else {
979 let batches = self.execute_datafusion(plan, prop_manager, params).await?;
980 self.record_batches_to_rows(batches)
981 };
982
983 let duration = start.elapsed();
984 metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
985 .record(duration.as_secs_f64());
986
987 tracing::Span::current().record("duration_ms", duration.as_millis());
988 match &res {
989 Ok(rows) => {
990 tracing::Span::current().record("rows_returned", rows.len());
991 metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
992 .increment(rows.len() as u64);
993 }
994 Err(e) => {
995 let error_type = if e.to_string().contains("timed out") {
996 "timeout"
997 } else if e.to_string().contains("syntax") {
998 "syntax"
999 } else {
1000 "execution"
1001 };
1002 metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
1003 }
1004 }
1005
1006 res
1007 })
1008 }
1009
1010 fn get_plan_type(plan: &LogicalPlan) -> &'static str {
1011 match plan {
1012 LogicalPlan::Scan { .. } => "read_scan",
1013 LogicalPlan::FusedIndexScan { .. } => "read_fused_index_scan",
1014 LogicalPlan::FusedIndexScanWrapped { .. } => "read_fused_index_scan_wrapped",
1015 LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
1016 LogicalPlan::Traverse { .. } => "read_traverse",
1017 LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
1018 LogicalPlan::ScanAll { .. } => "read_scan_all",
1019 LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
1020 LogicalPlan::VectorKnn { .. } => "read_vector",
1021 LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
1022 LogicalPlan::Merge { .. } => "write_merge",
1023 LogicalPlan::Delete { .. } => "write_delete",
1024 LogicalPlan::Set { .. } => "write_set",
1025 LogicalPlan::Remove { .. } => "write_remove",
1026 LogicalPlan::ProcedureCall { .. } => "call",
1027 LogicalPlan::Copy { .. } => "copy",
1028 LogicalPlan::Backup { .. } => "backup",
1029 _ => "other",
1030 }
1031 }
1032
1033 fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
1043 match plan {
1044 LogicalPlan::Project { input, .. }
1046 | LogicalPlan::Sort { input, .. }
1047 | LogicalPlan::Limit { input, .. }
1048 | LogicalPlan::Distinct { input }
1049 | LogicalPlan::Aggregate { input, .. }
1050 | LogicalPlan::Window { input, .. }
1051 | LogicalPlan::Unwind { input, .. }
1052 | LogicalPlan::Filter { input, .. }
1053 | LogicalPlan::Create { input, .. }
1054 | LogicalPlan::CreateBatch { input, .. }
1055 | LogicalPlan::Set { input, .. }
1056 | LogicalPlan::Remove { input, .. }
1057 | LogicalPlan::Delete { input, .. }
1058 | LogicalPlan::Merge { input, .. }
1059 | LogicalPlan::Foreach { input, .. }
1060 | LogicalPlan::Traverse { input, .. }
1061 | LogicalPlan::TraverseMainByType { input, .. }
1062 | LogicalPlan::BindZeroLengthPath { input, .. }
1063 | LogicalPlan::BindPath { input, .. }
1064 | LogicalPlan::ShortestPath { input, .. }
1065 | LogicalPlan::AllShortestPaths { input, .. }
1066 | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],
1067
1068 LogicalPlan::Apply {
1070 input, subquery, ..
1071 }
1072 | LogicalPlan::SubqueryCall { input, subquery } => {
1073 vec![input.as_ref(), subquery.as_ref()]
1074 }
1075 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
1076 vec![left.as_ref(), right.as_ref()]
1077 }
1078 LogicalPlan::RecursiveCTE {
1079 initial, recursive, ..
1080 } => vec![initial.as_ref(), recursive.as_ref()],
1081 LogicalPlan::QuantifiedPattern {
1082 input,
1083 pattern_plan,
1084 ..
1085 } => vec![input.as_ref(), pattern_plan.as_ref()],
1086
1087 _ => vec![],
1089 }
1090 }
1091
1092 pub(crate) fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
1099 match plan {
1100 LogicalPlan::CreateLabel(_)
1102 | LogicalPlan::CreateEdgeType(_)
1103 | LogicalPlan::AlterLabel(_)
1104 | LogicalPlan::AlterEdgeType(_)
1105 | LogicalPlan::DropLabel(_)
1106 | LogicalPlan::DropEdgeType(_)
1107 | LogicalPlan::CreateConstraint(_)
1108 | LogicalPlan::DropConstraint(_)
1109 | LogicalPlan::ShowConstraints(_) => true,
1110
1111 LogicalPlan::CreateVectorIndex { .. }
1113 | LogicalPlan::CreateFullTextIndex { .. }
1114 | LogicalPlan::CreateScalarIndex { .. }
1115 | LogicalPlan::CreateJsonFtsIndex { .. }
1116 | LogicalPlan::DropIndex { .. }
1117 | LogicalPlan::ShowIndexes { .. } => true,
1118
1119 LogicalPlan::ShowDatabase
1121 | LogicalPlan::ShowConfig
1122 | LogicalPlan::ShowStatistics
1123 | LogicalPlan::Vacuum
1124 | LogicalPlan::Checkpoint
1125 | LogicalPlan::Copy { .. }
1126 | LogicalPlan::CopyTo { .. }
1127 | LogicalPlan::CopyFrom { .. }
1128 | LogicalPlan::Backup { .. }
1129 | LogicalPlan::Explain { .. } => true,
1130
1131 LogicalPlan::ProcedureCall { procedure_name, .. } => {
1134 !Self::is_df_eligible_procedure(procedure_name)
1135 }
1136
1137 _ => Self::plan_children(plan)
1139 .iter()
1140 .any(|child| Self::is_ddl_or_admin(child)),
1141 }
1142 }
1143
1144 fn is_df_eligible_procedure(name: &str) -> bool {
1150 matches!(
1151 name,
1152 "uni.schema.labels"
1153 | "uni.schema.edgeTypes"
1154 | "uni.schema.relationshipTypes"
1155 | "uni.schema.indexes"
1156 | "uni.schema.constraints"
1157 | "uni.schema.labelInfo"
1158 | "uni.vector.query"
1159 | "uni.fts.query"
1160 | "uni.search"
1161 | "uni.create.vNode"
1168 | "uni.create.vEdge"
1169 ) || name.starts_with("uni.algo.")
1170 }
1171
1172 fn contains_write_operations(plan: &LogicalPlan) -> bool {
1179 match plan {
1180 LogicalPlan::Create { .. }
1181 | LogicalPlan::CreateBatch { .. }
1182 | LogicalPlan::Merge { .. }
1183 | LogicalPlan::Delete { .. }
1184 | LogicalPlan::Set { .. }
1185 | LogicalPlan::Remove { .. }
1186 | LogicalPlan::Foreach { .. } => true,
1187 _ => Self::plan_children(plan)
1188 .iter()
1189 .any(|child| Self::contains_write_operations(child)),
1190 }
1191 }
1192
1193 pub fn execute_stream(
1198 self,
1199 plan: LogicalPlan,
1200 prop_manager: Arc<PropertyManager>,
1201 params: HashMap<String, Value>,
1202 ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1203 let this = self;
1204 let this_for_ctx = this.clone();
1205
1206 let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1207
1208 ctx_stream
1209 .flat_map(move |ctx| {
1210 let plan = plan.clone();
1211 let this = this.clone();
1212 let prop_manager = prop_manager.clone();
1213 let params = params.clone();
1214
1215 let fut = async move {
1216 if Self::is_ddl_or_admin(&plan) {
1217 this.execute_subplan(plan, &prop_manager, ¶ms, ctx.as_ref())
1218 .await
1219 } else {
1220 let batches = this
1221 .execute_datafusion(plan, &prop_manager, ¶ms)
1222 .await?;
1223 this.record_batches_to_rows(batches)
1224 }
1225 };
1226 stream::once(fut).boxed()
1227 })
1228 .boxed()
1229 }
1230
1231 pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
1234 arrow_convert::arrow_to_value(col, row, None)
1235 }
1236
1237 pub(crate) fn evaluate_expr<'a>(
1238 &'a self,
1239 expr: &'a Expr,
1240 row: &'a HashMap<String, Value>,
1241 prop_manager: &'a PropertyManager,
1242 params: &'a HashMap<String, Value>,
1243 ctx: Option<&'a QueryContext>,
1244 ) -> BoxFuture<'a, Result<Value>> {
1245 let this = self;
1246 Box::pin(async move {
1247 let repr = expr.to_string_repr();
1249 if let Some(val) = row.get(&repr) {
1250 return Ok(val.clone());
1251 }
1252
1253 match expr {
1254 Expr::PatternComprehension { .. } => {
1255 Err(anyhow::anyhow!(
1257 "Pattern comprehensions are handled by DataFusion executor"
1258 ))
1259 }
1260 Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1261 "COLLECT subqueries not yet supported in executor"
1262 )),
1263 Expr::Variable(name) => {
1264 if let Some(val) = row.get(name) {
1265 Ok(val.clone())
1266 } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1267 Ok(vid_val.clone())
1271 } else {
1272 Ok(params.get(name).cloned().unwrap_or(Value::Null))
1273 }
1274 }
1275 Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1276 Expr::Property(var_expr, prop_name) => {
1277 if let Expr::Variable(var_name) = var_expr.as_ref() {
1281 let flat_key = format!("{}.{}", var_name, prop_name);
1282 if let Some(val) = row.get(flat_key.as_str()) {
1283 return Ok(val.clone());
1284 }
1285 }
1286
1287 let base_val = this
1288 .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1289 .await?;
1290
1291 if (prop_name == "_vid" || prop_name == "_id")
1293 && let Ok(vid) = Self::vid_from_value(&base_val)
1294 {
1295 return Ok(Value::Int(vid.as_u64() as i64));
1296 }
1297
1298 if let Value::Node(node) = &base_val {
1300 if prop_name == "_vid" || prop_name == "_id" {
1302 return Ok(Value::Int(node.vid.as_u64() as i64));
1303 }
1304 if prop_name == "_labels" {
1305 return Ok(Value::List(
1306 node.labels
1307 .iter()
1308 .map(|l| Value::String(l.clone()))
1309 .collect(),
1310 ));
1311 }
1312 if let Some(val) = node.properties.get(prop_name.as_str()) {
1314 return Ok(val.clone());
1315 }
1316 if let Ok(val) = prop_manager
1318 .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1319 .await
1320 {
1321 return Ok(val);
1322 }
1323 return Ok(Value::Null);
1324 }
1325
1326 if let Value::Edge(edge) = &base_val {
1328 if prop_name == "_eid" || prop_name == "_id" {
1330 return Ok(Value::Int(edge.eid.as_u64() as i64));
1331 }
1332 if prop_name == "_type" {
1333 return Ok(Value::String(edge.edge_type.clone()));
1334 }
1335 if prop_name == "_src" {
1336 return Ok(Value::Int(edge.src.as_u64() as i64));
1337 }
1338 if prop_name == "_dst" {
1339 return Ok(Value::Int(edge.dst.as_u64() as i64));
1340 }
1341 if let Some(val) = edge.properties.get(prop_name.as_str()) {
1343 return Ok(val.clone());
1344 }
1345 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1347 {
1348 return Ok(val);
1349 }
1350 return Ok(Value::Null);
1351 }
1352
1353 if let Value::Map(map) = &base_val {
1356 if let Some(val) = map.get(prop_name.as_str()) {
1358 return Ok(val.clone());
1359 }
1360 if let Some(Value::Map(props)) = map.get("properties")
1362 && let Some(val) = props.get(prop_name.as_str())
1363 {
1364 return Ok(val.clone());
1365 }
1366 let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1368 map.get("_id")
1369 .and_then(|v| v.as_str())
1370 .and_then(|s| s.parse::<u64>().ok())
1371 });
1372 if let Some(id) = vid_opt {
1373 let vid = Vid::from(id);
1374 if let Ok(val) = prop_manager
1375 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1376 .await
1377 {
1378 return Ok(val);
1379 }
1380 } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1381 let eid = uni_common::core::id::Eid::from(id);
1382 if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1383 return Ok(val);
1384 }
1385 }
1386 return Ok(Value::Null);
1387 }
1388
1389 if let Ok(vid) = Self::vid_from_value(&base_val) {
1391 return prop_manager
1392 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1393 .await;
1394 }
1395
1396 if base_val.is_null() {
1397 return Ok(Value::Null);
1398 }
1399
1400 {
1402 use crate::query::datetime::{
1403 eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1404 is_duration_string, is_temporal_accessor, is_temporal_string,
1405 };
1406
1407 if let Value::Temporal(tv) = &base_val {
1409 if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1410 if is_duration_accessor(prop_name) {
1411 return eval_duration_accessor(
1413 &base_val.to_string(),
1414 prop_name,
1415 );
1416 }
1417 } else if is_temporal_accessor(prop_name) {
1418 return eval_temporal_accessor(&base_val.to_string(), prop_name);
1419 }
1420 }
1421
1422 if let Value::String(s) = &base_val {
1424 if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1425 return eval_temporal_accessor(s, prop_name);
1426 }
1427 if is_duration_string(s) && is_duration_accessor(prop_name) {
1428 return eval_duration_accessor(s, prop_name);
1429 }
1430 }
1431 }
1432
1433 Err(anyhow!(
1434 "Cannot access property '{}' on {:?}",
1435 prop_name,
1436 base_val
1437 ))
1438 }
1439 Expr::ArrayIndex {
1440 array: arr_expr,
1441 index: idx_expr,
1442 } => {
1443 let arr_val = this
1444 .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1445 .await?;
1446 let idx_val = this
1447 .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1448 .await?;
1449
1450 if let Value::List(arr) = &arr_val {
1451 if let Some(i) = idx_val.as_i64() {
1453 let idx = if i < 0 {
1454 let positive_idx = arr.len() as i64 + i;
1456 if positive_idx < 0 {
1457 return Ok(Value::Null); }
1459 positive_idx as usize
1460 } else {
1461 i as usize
1462 };
1463 if idx < arr.len() {
1464 return Ok(arr[idx].clone());
1465 }
1466 return Ok(Value::Null);
1467 } else if idx_val.is_null() {
1468 return Ok(Value::Null);
1469 } else {
1470 return Err(anyhow::anyhow!(
1471 "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1472 idx_val
1473 ));
1474 }
1475 }
1476 if let Value::Map(map) = &arr_val {
1477 if let Some(key) = idx_val.as_str() {
1478 return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1479 } else if !idx_val.is_null() {
1480 return Err(anyhow::anyhow!(
1481 "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1482 idx_val
1483 ));
1484 }
1485 }
1486 if let Value::Node(node) = &arr_val {
1488 if let Some(key) = idx_val.as_str() {
1489 if let Some(val) = node.properties.get(key) {
1491 return Ok(val.clone());
1492 }
1493 if let Ok(val) = prop_manager
1495 .get_vertex_prop_with_ctx(node.vid, key, ctx)
1496 .await
1497 {
1498 return Ok(val);
1499 }
1500 return Ok(Value::Null);
1501 } else if !idx_val.is_null() {
1502 return Err(anyhow::anyhow!(
1503 "TypeError: Node index must be a string, got: {:?}",
1504 idx_val
1505 ));
1506 }
1507 }
1508 if let Value::Edge(edge) = &arr_val {
1510 if let Some(key) = idx_val.as_str() {
1511 if let Some(val) = edge.properties.get(key) {
1513 return Ok(val.clone());
1514 }
1515 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1517 return Ok(val);
1518 }
1519 return Ok(Value::Null);
1520 } else if !idx_val.is_null() {
1521 return Err(anyhow::anyhow!(
1522 "TypeError: Edge index must be a string, got: {:?}",
1523 idx_val
1524 ));
1525 }
1526 }
1527 if let Ok(vid) = Self::vid_from_value(&arr_val)
1529 && let Some(key) = idx_val.as_str()
1530 {
1531 if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1532 {
1533 return Ok(val);
1534 }
1535 return Ok(Value::Null);
1536 }
1537 if arr_val.is_null() {
1538 return Ok(Value::Null);
1539 }
1540 Err(anyhow!(
1541 "TypeError: InvalidArgumentType - cannot index into {:?}",
1542 arr_val
1543 ))
1544 }
1545 Expr::ArraySlice { array, start, end } => {
1546 let arr_val = this
1547 .evaluate_expr(array, row, prop_manager, params, ctx)
1548 .await?;
1549
1550 if let Value::List(arr) = &arr_val {
1551 let len = arr.len();
1552
1553 let start_idx = if let Some(s) = start {
1555 let v = this
1556 .evaluate_expr(s, row, prop_manager, params, ctx)
1557 .await?;
1558 if v.is_null() {
1559 return Ok(Value::Null);
1560 }
1561 let raw = v.as_i64().unwrap_or(0);
1562 if raw < 0 {
1563 (len as i64 + raw).max(0) as usize
1564 } else {
1565 (raw as usize).min(len)
1566 }
1567 } else {
1568 0
1569 };
1570
1571 let end_idx = if let Some(e) = end {
1573 let v = this
1574 .evaluate_expr(e, row, prop_manager, params, ctx)
1575 .await?;
1576 if v.is_null() {
1577 return Ok(Value::Null);
1578 }
1579 let raw = v.as_i64().unwrap_or(len as i64);
1580 if raw < 0 {
1581 (len as i64 + raw).max(0) as usize
1582 } else {
1583 (raw as usize).min(len)
1584 }
1585 } else {
1586 len
1587 };
1588
1589 if start_idx >= end_idx {
1591 return Ok(Value::List(vec![]));
1592 }
1593 let end_idx = end_idx.min(len);
1594 return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1595 }
1596
1597 if arr_val.is_null() {
1598 return Ok(Value::Null);
1599 }
1600 Err(anyhow!("Cannot slice {:?}", arr_val))
1601 }
1602 Expr::Literal(lit) => Ok(lit.to_value()),
1603 Expr::List(items) => {
1604 let mut vals = Vec::new();
1605 for item in items {
1606 vals.push(
1607 this.evaluate_expr(item, row, prop_manager, params, ctx)
1608 .await?,
1609 );
1610 }
1611 Ok(Value::List(vals))
1612 }
1613 Expr::Map(items) => {
1614 let mut map = HashMap::new();
1615 for (key, value_expr) in items {
1616 let val = this
1617 .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1618 .await?;
1619 map.insert(key.clone(), val);
1620 }
1621 Ok(Value::Map(map))
1622 }
1623 Expr::Exists { query, .. } => {
1624 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1626 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1627
1628 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1629 Ok(plan) => {
1630 let mut sub_params = params.clone();
1631 sub_params.extend(row.clone());
1632
1633 match this.execute(plan, prop_manager, &sub_params).await {
1634 Ok(results) => Ok(Value::Bool(!results.is_empty())),
1635 Err(e) => {
1636 log::debug!("EXISTS subquery execution failed: {}", e);
1637 Ok(Value::Bool(false))
1638 }
1639 }
1640 }
1641 Err(e) => {
1642 log::debug!("EXISTS subquery planning failed: {}", e);
1643 Ok(Value::Bool(false))
1644 }
1645 }
1646 }
1647 Expr::CountSubquery(query) => {
1648 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1650
1651 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1652
1653 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1654 Ok(plan) => {
1655 let mut sub_params = params.clone();
1656 sub_params.extend(row.clone());
1657
1658 match this.execute(plan, prop_manager, &sub_params).await {
1659 Ok(results) => Ok(Value::from(results.len() as i64)),
1660 Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1661 }
1662 }
1663 Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1664 }
1665 }
1666 Expr::Quantifier {
1667 quantifier,
1668 variable,
1669 list,
1670 predicate,
1671 } => {
1672 let list_val = this
1686 .evaluate_expr(list, row, prop_manager, params, ctx)
1687 .await?;
1688
1689 if list_val.is_null() {
1691 return Ok(Value::Null);
1692 }
1693
1694 let items = match list_val {
1696 Value::List(arr) => arr,
1697 _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1698 };
1699
1700 let mut satisfied_count = 0;
1702 for item in &items {
1703 let mut item_row = row.clone();
1705 item_row.insert(variable.clone(), item.clone());
1706
1707 let pred_result = this
1709 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1710 .await?;
1711
1712 if let Value::Bool(true) = pred_result {
1714 satisfied_count += 1;
1715 }
1716 }
1717
1718 let result = match quantifier {
1720 Quantifier::All => satisfied_count == items.len(),
1721 Quantifier::Any => satisfied_count > 0,
1722 Quantifier::Single => satisfied_count == 1,
1723 Quantifier::None => satisfied_count == 0,
1724 };
1725
1726 Ok(Value::Bool(result))
1727 }
1728 Expr::ListComprehension {
1729 variable,
1730 list,
1731 where_clause,
1732 map_expr,
1733 } => {
1734 let list_val = this
1741 .evaluate_expr(list, row, prop_manager, params, ctx)
1742 .await?;
1743
1744 if list_val.is_null() {
1746 return Ok(Value::Null);
1747 }
1748
1749 let items = match list_val {
1751 Value::List(arr) => arr,
1752 _ => {
1753 return Err(anyhow!(
1754 "List comprehension expects a list, got: {:?}",
1755 list_val
1756 ));
1757 }
1758 };
1759
1760 let mut results = Vec::new();
1762 for item in &items {
1763 let mut item_row = row.clone();
1765 item_row.insert(variable.clone(), item.clone());
1766
1767 if let Some(predicate) = where_clause {
1769 let pred_result = this
1770 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1771 .await?;
1772
1773 if !matches!(pred_result, Value::Bool(true)) {
1775 continue;
1776 }
1777 }
1778
1779 let mapped_val = this
1781 .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1782 .await?;
1783 results.push(mapped_val);
1784 }
1785
1786 Ok(Value::List(results))
1787 }
1788 Expr::BinaryOp { left, op, right } => {
1789 match op {
1791 BinaryOp::And => {
1792 let l_val = this
1793 .evaluate_expr(left, row, prop_manager, params, ctx)
1794 .await?;
1795 if let Some(false) = l_val.as_bool() {
1797 return Ok(Value::Bool(false));
1798 }
1799 let r_val = this
1800 .evaluate_expr(right, row, prop_manager, params, ctx)
1801 .await?;
1802 eval_binary_op(&l_val, op, &r_val)
1803 }
1804 BinaryOp::Or => {
1805 let l_val = this
1806 .evaluate_expr(left, row, prop_manager, params, ctx)
1807 .await?;
1808 if let Some(true) = l_val.as_bool() {
1810 return Ok(Value::Bool(true));
1811 }
1812 let r_val = this
1813 .evaluate_expr(right, row, prop_manager, params, ctx)
1814 .await?;
1815 eval_binary_op(&l_val, op, &r_val)
1816 }
1817 _ => {
1818 let l_val = this
1820 .evaluate_expr(left, row, prop_manager, params, ctx)
1821 .await?;
1822 let r_val = this
1823 .evaluate_expr(right, row, prop_manager, params, ctx)
1824 .await?;
1825 eval_binary_op(&l_val, op, &r_val)
1826 }
1827 }
1828 }
1829 Expr::In { expr, list } => {
1830 let l_val = this
1831 .evaluate_expr(expr, row, prop_manager, params, ctx)
1832 .await?;
1833 let r_val = this
1834 .evaluate_expr(list, row, prop_manager, params, ctx)
1835 .await?;
1836 eval_in_op(&l_val, &r_val)
1837 }
1838 Expr::UnaryOp { op, expr } => {
1839 let val = this
1840 .evaluate_expr(expr, row, prop_manager, params, ctx)
1841 .await?;
1842 match op {
1843 UnaryOp::Not => {
1844 match val.as_bool() {
1846 Some(b) => Ok(Value::Bool(!b)),
1847 None if val.is_null() => Ok(Value::Null),
1848 None => Err(anyhow!(
1849 "InvalidArgumentType: NOT requires a boolean argument"
1850 )),
1851 }
1852 }
1853 UnaryOp::Neg => {
1854 if let Some(i) = val.as_i64() {
1855 Ok(Value::Int(-i))
1856 } else if let Some(f) = val.as_f64() {
1857 Ok(Value::Float(-f))
1858 } else {
1859 Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1860 }
1861 }
1862 }
1863 }
1864 Expr::IsNull(expr) => {
1865 let val = this
1866 .evaluate_expr(expr, row, prop_manager, params, ctx)
1867 .await?;
1868 Ok(Value::Bool(val.is_null()))
1869 }
1870 Expr::IsNotNull(expr) => {
1871 let val = this
1872 .evaluate_expr(expr, row, prop_manager, params, ctx)
1873 .await?;
1874 Ok(Value::Bool(!val.is_null()))
1875 }
1876 Expr::IsUnique(_) => {
1877 Err(anyhow!(
1879 "IS UNIQUE can only be used in constraint definitions"
1880 ))
1881 }
1882 Expr::Case {
1883 expr,
1884 when_then,
1885 else_expr,
1886 } => {
1887 if let Some(base_expr) = expr {
1888 let base_val = this
1889 .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1890 .await?;
1891 for (w, t) in when_then {
1892 let w_val = this
1893 .evaluate_expr(w, row, prop_manager, params, ctx)
1894 .await?;
1895 if base_val == w_val {
1896 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1897 }
1898 }
1899 } else {
1900 for (w, t) in when_then {
1901 let w_val = this
1902 .evaluate_expr(w, row, prop_manager, params, ctx)
1903 .await?;
1904 if w_val.as_bool() == Some(true) {
1905 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1906 }
1907 }
1908 }
1909 if let Some(e) = else_expr {
1910 return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1911 }
1912 Ok(Value::Null)
1913 }
1914 Expr::Wildcard => Ok(Value::Null),
1915 Expr::FunctionCall { name, args, .. } => {
1916 if name.eq_ignore_ascii_case("ID") {
1918 if args.len() != 1 {
1919 return Err(anyhow!("id() requires exactly 1 argument"));
1920 }
1921 let val = this
1922 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1923 .await?;
1924 if let Value::Map(map) = &val {
1925 if let Some(vid_val) = map.get("_vid") {
1927 return Ok(vid_val.clone());
1928 }
1929 if let Some(eid_val) = map.get("_eid") {
1931 return Ok(eid_val.clone());
1932 }
1933 if let Some(id_val) = map.get("_id") {
1935 return Ok(id_val.clone());
1936 }
1937 }
1938 return Ok(Value::Null);
1939 }
1940
1941 if name.eq_ignore_ascii_case("ELEMENTID") {
1943 if args.len() != 1 {
1944 return Err(anyhow!("elementId() requires exactly 1 argument"));
1945 }
1946 let val = this
1947 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1948 .await?;
1949 if let Value::Map(map) = &val {
1950 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1953 return Ok(Value::String(vid_val.to_string()));
1954 }
1955 if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1958 return Ok(Value::String(eid_val.to_string()));
1959 }
1960 }
1961 return Ok(Value::Null);
1962 }
1963
1964 if name.eq_ignore_ascii_case("TYPE") {
1966 if args.len() != 1 {
1967 return Err(anyhow!("type() requires exactly 1 argument"));
1968 }
1969 let val = this
1970 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1971 .await?;
1972 if let Value::Map(map) = &val
1973 && let Some(type_val) = map.get("_type")
1974 {
1975 if let Some(type_id) =
1977 type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1978 {
1979 if let Some(name) = this
1980 .storage
1981 .schema_manager()
1982 .edge_type_name_by_id_unified(type_id)
1983 {
1984 return Ok(Value::String(name));
1985 }
1986 } else if let Some(name) = type_val.as_str() {
1987 return Ok(Value::String(name.to_string()));
1988 }
1989 }
1990 return Ok(Value::Null);
1991 }
1992
1993 if name.eq_ignore_ascii_case("LABELS") {
1995 if args.len() != 1 {
1996 return Err(anyhow!("labels() requires exactly 1 argument"));
1997 }
1998 let val = this
1999 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2000 .await?;
2001 if let Value::Map(map) = &val
2002 && let Some(labels_val) = map.get("_labels")
2003 {
2004 return Ok(labels_val.clone());
2005 }
2006 return Ok(Value::Null);
2007 }
2008
2009 if name.eq_ignore_ascii_case("PROPERTIES") {
2011 if args.len() != 1 {
2012 return Err(anyhow!("properties() requires exactly 1 argument"));
2013 }
2014 let val = this
2015 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2016 .await?;
2017 if let Value::Map(map) = &val {
2018 let mut props = HashMap::new();
2020 for (k, v) in map.iter() {
2021 if !k.starts_with('_') {
2022 props.insert(k.clone(), v.clone());
2023 }
2024 }
2025 return Ok(Value::Map(props));
2026 }
2027 return Ok(Value::Null);
2028 }
2029
2030 if name.eq_ignore_ascii_case("STARTNODE") {
2032 if args.len() != 1 {
2033 return Err(anyhow!("startNode() requires exactly 1 argument"));
2034 }
2035 let val = this
2036 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2037 .await?;
2038 if let Value::Edge(edge) = &val {
2039 return Ok(Self::find_node_by_vid(row, edge.src));
2040 }
2041 if let Value::Map(map) = &val {
2042 if let Some(start_node) = map.get("_startNode") {
2043 return Ok(start_node.clone());
2044 }
2045 if let Some(src_vid) = map.get("_src_vid") {
2046 return Ok(Value::Map(HashMap::from([(
2047 "_vid".to_string(),
2048 src_vid.clone(),
2049 )])));
2050 }
2051 if let Some(src_id) = map.get("_src")
2053 && let Some(u) = src_id.as_u64()
2054 {
2055 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2056 }
2057 }
2058 return Ok(Value::Null);
2059 }
2060
2061 if name.eq_ignore_ascii_case("ENDNODE") {
2063 if args.len() != 1 {
2064 return Err(anyhow!("endNode() requires exactly 1 argument"));
2065 }
2066 let val = this
2067 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2068 .await?;
2069 if let Value::Edge(edge) = &val {
2070 return Ok(Self::find_node_by_vid(row, edge.dst));
2071 }
2072 if let Value::Map(map) = &val {
2073 if let Some(end_node) = map.get("_endNode") {
2074 return Ok(end_node.clone());
2075 }
2076 if let Some(dst_vid) = map.get("_dst_vid") {
2077 return Ok(Value::Map(HashMap::from([(
2078 "_vid".to_string(),
2079 dst_vid.clone(),
2080 )])));
2081 }
2082 if let Some(dst_id) = map.get("_dst")
2084 && let Some(u) = dst_id.as_u64()
2085 {
2086 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2087 }
2088 }
2089 return Ok(Value::Null);
2090 }
2091
2092 if name.eq_ignore_ascii_case("HASLABEL") {
2095 if args.len() != 2 {
2096 return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
2097 }
2098 let node_val = this
2099 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2100 .await?;
2101 let label_val = this
2102 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2103 .await?;
2104
2105 let label_to_check = label_val.as_str().ok_or_else(|| {
2106 anyhow!("Second argument to hasLabel must be a string")
2107 })?;
2108
2109 let has_label = match &node_val {
2110 Value::Map(map) if map.contains_key("_vid") => {
2112 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2113 labels_arr
2114 .iter()
2115 .any(|l| l.as_str() == Some(label_to_check))
2116 } else {
2117 false
2118 }
2119 }
2120 Value::Map(map) => {
2122 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2123 labels_arr
2124 .iter()
2125 .any(|l| l.as_str() == Some(label_to_check))
2126 } else {
2127 false
2128 }
2129 }
2130 _ => false,
2131 };
2132 return Ok(Value::Bool(has_label));
2133 }
2134
2135 if matches!(
2138 name.to_uppercase().as_str(),
2139 "ANY" | "ALL" | "NONE" | "SINGLE"
2140 ) {
2141 return Err(anyhow!(
2142 "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
2143 name.to_lowercase()
2144 ));
2145 }
2146
2147 if name.eq_ignore_ascii_case("COALESCE") {
2149 for arg in args {
2150 let val = this
2151 .evaluate_expr(arg, row, prop_manager, params, ctx)
2152 .await?;
2153 if !val.is_null() {
2154 return Ok(val);
2155 }
2156 }
2157 return Ok(Value::Null);
2158 }
2159
2160 if name.eq_ignore_ascii_case("vector_similarity") {
2162 if args.len() != 2 {
2163 return Err(anyhow!("vector_similarity takes 2 arguments"));
2164 }
2165 let v1 = this
2166 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2167 .await?;
2168 let v2 = this
2169 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2170 .await?;
2171 return eval_vector_similarity(&v1, &v2);
2172 }
2173
2174 if name.eq_ignore_ascii_case("uni.temporal.validAt")
2176 || name.eq_ignore_ascii_case("uni.validAt")
2177 || name.eq_ignore_ascii_case("validAt")
2178 {
2179 if args.len() != 4 {
2180 return Err(anyhow!("validAt requires 4 arguments"));
2181 }
2182 let node_val = this
2183 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2184 .await?;
2185 let start_prop = this
2186 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2187 .await?
2188 .as_str()
2189 .ok_or(anyhow!("start_prop must be string"))?
2190 .to_string();
2191 let end_prop = this
2192 .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2193 .await?
2194 .as_str()
2195 .ok_or(anyhow!("end_prop must be string"))?
2196 .to_string();
2197 let time_val = this
2198 .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2199 .await?;
2200
2201 let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2202 anyhow!("time argument must be a datetime value or string")
2203 })?;
2204
2205 let valid_from_val: Option<Value> = if let Ok(vid) =
2207 Self::vid_from_value(&node_val)
2208 {
2209 prop_manager
2211 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2212 .await
2213 .ok()
2214 } else if let Value::Map(map) = &node_val {
2215 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2217 let vid = Vid::from(vid_val);
2218 prop_manager
2219 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2220 .await
2221 .ok()
2222 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2223 let eid = uni_common::core::id::Eid::from(eid_val);
2225 prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2226 } else {
2227 map.get(&start_prop).cloned()
2229 }
2230 } else {
2231 return Ok(Value::Bool(false));
2232 };
2233
2234 let valid_from = match valid_from_val {
2235 Some(ref v) => match value_to_datetime_utc(v) {
2236 Some(dt) => dt,
2237 None if v.is_null() => return Ok(Value::Bool(false)),
2238 None => {
2239 return Err(anyhow!(
2240 "Property {} must be a datetime value or string",
2241 start_prop
2242 ));
2243 }
2244 },
2245 None => return Ok(Value::Bool(false)),
2246 };
2247
2248 let valid_to_val: Option<Value> = if let Ok(vid) =
2249 Self::vid_from_value(&node_val)
2250 {
2251 prop_manager
2253 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2254 .await
2255 .ok()
2256 } else if let Value::Map(map) = &node_val {
2257 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2259 let vid = Vid::from(vid_val);
2260 prop_manager
2261 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2262 .await
2263 .ok()
2264 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2265 let eid = uni_common::core::id::Eid::from(eid_val);
2267 prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2268 } else {
2269 map.get(&end_prop).cloned()
2271 }
2272 } else {
2273 return Ok(Value::Bool(false));
2274 };
2275
2276 let valid_to = match valid_to_val {
2277 Some(ref v) => match value_to_datetime_utc(v) {
2278 Some(dt) => Some(dt),
2279 None if v.is_null() => None,
2280 None => {
2281 return Err(anyhow!(
2282 "Property {} must be a datetime value or null",
2283 end_prop
2284 ));
2285 }
2286 },
2287 None => None,
2288 };
2289
2290 let is_valid = valid_from <= query_time
2291 && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2292 return Ok(Value::Bool(is_valid));
2293 }
2294
2295 let mut evaluated_args = Vec::with_capacity(args.len());
2297 for arg in args {
2298 let mut val = this
2299 .evaluate_expr(arg, row, prop_manager, params, ctx)
2300 .await?;
2301
2302 if let Value::Map(ref mut map) = val {
2305 hydrate_entity_if_needed(map, prop_manager, ctx).await;
2306 }
2307
2308 evaluated_args.push(val);
2309 }
2310 eval_scalar_function(
2311 name,
2312 &evaluated_args,
2313 self.custom_function_registry.as_deref(),
2314 )
2315 }
2316 Expr::Reduce {
2317 accumulator,
2318 init,
2319 variable,
2320 list,
2321 expr,
2322 } => {
2323 let mut acc = self
2324 .evaluate_expr(init, row, prop_manager, params, ctx)
2325 .await?;
2326 let list_val = self
2327 .evaluate_expr(list, row, prop_manager, params, ctx)
2328 .await?;
2329
2330 if let Value::List(items) = list_val {
2331 for item in items {
2332 let mut scope = row.clone();
2336 scope.insert(accumulator.clone(), acc.clone());
2337 scope.insert(variable.clone(), item);
2338
2339 acc = self
2340 .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2341 .await?;
2342 }
2343 } else {
2344 return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2345 }
2346 Ok(acc)
2347 }
2348 Expr::ValidAt { .. } => {
2349 Err(anyhow!(
2351 "VALID_AT expression should have been transformed to function call in planner"
2352 ))
2353 }
2354
2355 Expr::LabelCheck { expr, labels } => {
2356 let val = this
2357 .evaluate_expr(expr, row, prop_manager, params, ctx)
2358 .await?;
2359 match &val {
2360 Value::Null => Ok(Value::Null),
2361 Value::Map(map) => {
2362 let is_edge = map.contains_key("_eid")
2364 || map.contains_key("_type_name")
2365 || (map.contains_key("_type") && !map.contains_key("_vid"));
2366
2367 if is_edge {
2368 if labels.len() > 1 {
2370 return Ok(Value::Bool(false));
2371 }
2372 let label_to_check = &labels[0];
2373 let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2374 {
2375 t == label_to_check
2376 } else if let Some(Value::String(t)) = map.get("_type") {
2377 t == label_to_check
2378 } else {
2379 false
2380 };
2381 Ok(Value::Bool(has_type))
2382 } else {
2383 let has_all = labels.iter().all(|label_to_check| {
2385 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2386 labels_arr
2387 .iter()
2388 .any(|l| l.as_str() == Some(label_to_check.as_str()))
2389 } else {
2390 false
2391 }
2392 });
2393 Ok(Value::Bool(has_all))
2394 }
2395 }
2396 _ => Ok(Value::Bool(false)),
2397 }
2398 }
2399
2400 Expr::MapProjection { base, items } => {
2401 let base_value = this
2402 .evaluate_expr(base, row, prop_manager, params, ctx)
2403 .await?;
2404
2405 let properties = match &base_value {
2407 Value::Map(map) => map,
2408 _ => {
2409 return Err(anyhow!(
2410 "Map projection requires object, got {:?}",
2411 base_value
2412 ));
2413 }
2414 };
2415
2416 let mut result_map = HashMap::new();
2417
2418 for item in items {
2419 match item {
2420 MapProjectionItem::Property(prop) => {
2421 if let Some(value) = properties.get(prop.as_str()) {
2422 result_map.insert(prop.clone(), value.clone());
2423 }
2424 }
2425 MapProjectionItem::AllProperties => {
2426 for (key, value) in properties.iter() {
2428 if !key.starts_with('_') {
2429 result_map.insert(key.clone(), value.clone());
2430 }
2431 }
2432 }
2433 MapProjectionItem::LiteralEntry(key, expr) => {
2434 let value = this
2435 .evaluate_expr(expr, row, prop_manager, params, ctx)
2436 .await?;
2437 result_map.insert(key.clone(), value);
2438 }
2439 MapProjectionItem::Variable(var_name) => {
2440 if let Some(value) = row.get(var_name.as_str()) {
2443 result_map.insert(var_name.clone(), value.clone());
2444 }
2445 }
2446 }
2447 }
2448
2449 Ok(Value::Map(result_map))
2450 }
2451 }
2452 })
2453 }
2454
2455 pub(crate) fn execute_subplan<'a>(
2456 &'a self,
2457 plan: LogicalPlan,
2458 prop_manager: &'a PropertyManager,
2459 params: &'a HashMap<String, Value>,
2460 ctx: Option<&'a QueryContext>,
2461 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2462 Box::pin(async move {
2463 if let Some(ctx) = ctx {
2464 ctx.check_timeout()?;
2465 }
2466 match plan {
2467 LogicalPlan::Union { left, right, all } => {
2468 self.execute_union(left, right, all, prop_manager, params, ctx)
2469 .await
2470 }
2471 LogicalPlan::CreateVectorIndex {
2472 config,
2473 if_not_exists,
2474 } => {
2475 if if_not_exists && self.index_exists_by_name(&config.name) {
2476 return Ok(vec![]);
2477 }
2478 let idx_mgr = self.storage.index_manager();
2479 idx_mgr.create_vector_index(config).await?;
2480 Ok(vec![])
2481 }
2482 LogicalPlan::CreateFullTextIndex {
2483 config,
2484 if_not_exists,
2485 } => {
2486 if if_not_exists && self.index_exists_by_name(&config.name) {
2487 return Ok(vec![]);
2488 }
2489 let idx_mgr = self.storage.index_manager();
2490 idx_mgr.create_fts_index(config).await?;
2491 Ok(vec![])
2492 }
2493 LogicalPlan::CreateScalarIndex {
2494 mut config,
2495 if_not_exists,
2496 } => {
2497 if if_not_exists && self.index_exists_by_name(&config.name) {
2498 return Ok(vec![]);
2499 }
2500
2501 let mut modified_properties = Vec::new();
2503
2504 for prop in &config.properties {
2505 if prop.contains('(') && prop.contains(')') {
2507 let gen_col = SchemaManager::generated_column_name(prop);
2508
2509 let sm = self.storage.schema_manager_arc();
2511 if let Err(e) = sm.add_generated_property(
2512 &config.label,
2513 &gen_col,
2514 DataType::String, prop.clone(),
2516 ) {
2517 log::warn!("Failed to add generated property (might exist): {}", e);
2518 }
2519
2520 modified_properties.push(gen_col);
2521 } else {
2522 modified_properties.push(prop.clone());
2524 }
2525 }
2526
2527 config.properties = modified_properties;
2528
2529 let idx_mgr = self.storage.index_manager();
2530 idx_mgr.create_scalar_index(config).await?;
2531 Ok(vec![])
2532 }
2533 LogicalPlan::CreateJsonFtsIndex {
2534 config,
2535 if_not_exists,
2536 } => {
2537 if if_not_exists && self.index_exists_by_name(&config.name) {
2538 return Ok(vec![]);
2539 }
2540 let idx_mgr = self.storage.index_manager();
2541 idx_mgr.create_json_fts_index(config).await?;
2542 Ok(vec![])
2543 }
2544 LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2545 LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2546 LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2547 LogicalPlan::Vacuum => {
2548 self.execute_vacuum().await?;
2549 Ok(vec![])
2550 }
2551 LogicalPlan::Checkpoint => {
2552 self.execute_checkpoint().await?;
2553 Ok(vec![])
2554 }
2555 LogicalPlan::CopyTo {
2556 label,
2557 path,
2558 format,
2559 options,
2560 } => {
2561 let count = self
2562 .execute_copy_to(&label, &path, &format, &options)
2563 .await?;
2564 let mut result = HashMap::new();
2565 result.insert("count".to_string(), Value::Int(count as i64));
2566 Ok(vec![result])
2567 }
2568 LogicalPlan::CopyFrom {
2569 label,
2570 path,
2571 format,
2572 options,
2573 } => {
2574 let count = self
2575 .execute_copy_from(&label, &path, &format, &options)
2576 .await?;
2577 let mut result = HashMap::new();
2578 result.insert("count".to_string(), Value::Int(count as i64));
2579 Ok(vec![result])
2580 }
2581 LogicalPlan::CreateLabel(clause) => {
2582 self.execute_create_label(clause).await?;
2583 Ok(vec![])
2584 }
2585 LogicalPlan::CreateEdgeType(clause) => {
2586 self.execute_create_edge_type(clause).await?;
2587 Ok(vec![])
2588 }
2589 LogicalPlan::AlterLabel(clause) => {
2590 self.execute_alter_label(clause).await?;
2591 Ok(vec![])
2592 }
2593 LogicalPlan::AlterEdgeType(clause) => {
2594 self.execute_alter_edge_type(clause).await?;
2595 Ok(vec![])
2596 }
2597 LogicalPlan::DropLabel(clause) => {
2598 self.execute_drop_label(clause).await?;
2599 Ok(vec![])
2600 }
2601 LogicalPlan::DropEdgeType(clause) => {
2602 self.execute_drop_edge_type(clause).await?;
2603 Ok(vec![])
2604 }
2605 LogicalPlan::CreateConstraint(clause) => {
2606 self.execute_create_constraint(clause).await?;
2607 Ok(vec![])
2608 }
2609 LogicalPlan::DropConstraint(clause) => {
2610 self.execute_drop_constraint(clause).await?;
2611 Ok(vec![])
2612 }
2613 LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2614 LogicalPlan::DropIndex { name, if_exists } => {
2615 let idx_mgr = self.storage.index_manager();
2616 match idx_mgr.drop_index(&name).await {
2617 Ok(_) => Ok(vec![]),
2618 Err(e) => {
2619 if if_exists && e.to_string().contains("not found") {
2620 Ok(vec![])
2621 } else {
2622 Err(e)
2623 }
2624 }
2625 }
2626 }
2627 LogicalPlan::ShowIndexes { filter } => {
2628 Ok(self.execute_show_indexes(filter.as_deref()))
2629 }
2630 LogicalPlan::Scan { .. }
2633 | LogicalPlan::FusedIndexScan { .. }
2634 | LogicalPlan::FusedIndexScanWrapped { .. }
2635 | LogicalPlan::ExtIdLookup { .. }
2636 | LogicalPlan::ScanAll { .. }
2637 | LogicalPlan::ScanMainByLabels { .. }
2638 | LogicalPlan::Traverse { .. }
2639 | LogicalPlan::TraverseMainByType { .. } => {
2640 let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2641 self.record_batches_to_rows(batches)
2642 }
2643 LogicalPlan::Filter {
2644 input,
2645 predicate,
2646 optional_variables,
2647 } => {
2648 let input_matches = self
2649 .execute_subplan(*input, prop_manager, params, ctx)
2650 .await?;
2651
2652 tracing::debug!(
2653 "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2654 predicate,
2655 input_matches.len(),
2656 optional_variables
2657 );
2658
2659 if !optional_variables.is_empty() {
2663 let is_optional_key = |k: &str| -> bool {
2666 optional_variables.contains(k)
2667 || optional_variables
2668 .iter()
2669 .any(|var| k.starts_with(&format!("{}.", var)))
2670 };
2671
2672 let is_internal_key =
2674 |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2675
2676 let non_optional_vars: Vec<String> = input_matches
2678 .first()
2679 .map(|row| {
2680 row.keys()
2681 .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2682 .cloned()
2683 .collect()
2684 })
2685 .unwrap_or_default();
2686
2687 let mut groups: std::collections::HashMap<
2689 Vec<u8>,
2690 Vec<HashMap<String, Value>>,
2691 > = std::collections::HashMap::new();
2692
2693 for row in &input_matches {
2694 let key: Vec<u8> = non_optional_vars
2696 .iter()
2697 .map(|var| {
2698 row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
2699 })
2700 .collect::<Vec<_>>()
2701 .join("|")
2702 .into_bytes();
2703
2704 groups.entry(key).or_default().push(row.clone());
2705 }
2706
2707 let mut filtered = Vec::new();
2708 for (_key, group_rows) in groups {
2709 let mut group_passed = Vec::new();
2710
2711 for row in &group_rows {
2712 let has_null_optional = optional_variables.iter().any(|var| {
2714 let direct_null =
2716 matches!(row.get(var), Some(Value::Null) | None);
2717 let prefixed_null = row
2718 .keys()
2719 .filter(|k| k.starts_with(&format!("{}.", var)))
2720 .any(|k| matches!(row.get(k), Some(Value::Null)));
2721 direct_null || prefixed_null
2722 });
2723
2724 if has_null_optional {
2725 group_passed.push(row.clone());
2726 continue;
2727 }
2728
2729 let res = self
2730 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2731 .await?;
2732
2733 if res.as_bool().unwrap_or(false) {
2734 group_passed.push(row.clone());
2735 }
2736 }
2737
2738 if group_passed.is_empty() {
2739 if let Some(template) = group_rows.first() {
2742 let mut null_row = HashMap::new();
2743 for (k, v) in template {
2744 if is_optional_key(k) {
2745 null_row.insert(k.clone(), Value::Null);
2746 } else {
2747 null_row.insert(k.clone(), v.clone());
2748 }
2749 }
2750 filtered.push(null_row);
2751 }
2752 } else {
2753 filtered.extend(group_passed);
2754 }
2755 }
2756
2757 tracing::debug!(
2758 "Filter (OPTIONAL): {} input rows -> {} output rows",
2759 input_matches.len(),
2760 filtered.len()
2761 );
2762
2763 return Ok(filtered);
2764 }
2765
2766 let mut filtered = Vec::new();
2768 for row in input_matches.iter() {
2769 let res = self
2770 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2771 .await?;
2772
2773 let passes = res.as_bool().unwrap_or(false);
2774
2775 if passes {
2776 filtered.push(row.clone());
2777 }
2778 }
2779
2780 tracing::debug!(
2781 "Filter: {} input rows -> {} output rows",
2782 input_matches.len(),
2783 filtered.len()
2784 );
2785
2786 Ok(filtered)
2787 }
2788 LogicalPlan::ProcedureCall {
2789 procedure_name,
2790 arguments,
2791 yield_items,
2792 } => {
2793 let yield_names: Vec<String> =
2794 yield_items.iter().map(|(n, _)| n.clone()).collect();
2795 let results = self
2796 .execute_procedure(
2797 &procedure_name,
2798 &arguments,
2799 &yield_names,
2800 prop_manager,
2801 params,
2802 ctx,
2803 )
2804 .await?;
2805
2806 let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2812 if !has_aliases {
2813 Ok(results)
2816 } else {
2817 let mut aliased_results = Vec::with_capacity(results.len());
2818 for row in results {
2819 let mut new_row = HashMap::new();
2820 for (name, alias) in &yield_items {
2821 let col_name = alias.as_ref().unwrap_or(name);
2822 let val = row.get(name).cloned().unwrap_or(Value::Null);
2823 new_row.insert(col_name.clone(), val);
2824 }
2825 aliased_results.push(new_row);
2826 }
2827 Ok(aliased_results)
2828 }
2829 }
2830 LogicalPlan::VectorKnn { .. } => {
2831 unreachable!("VectorKnn is handled by DataFusion engine")
2832 }
2833 LogicalPlan::InvertedIndexLookup { .. } => {
2834 unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2835 }
2836 LogicalPlan::Sort { input, order_by } => {
2837 let rows = self
2838 .execute_subplan(*input, prop_manager, params, ctx)
2839 .await?;
2840 self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2841 .await
2842 }
2843 LogicalPlan::Limit { input, skip, fetch } => {
2844 let rows = self
2845 .execute_subplan(*input, prop_manager, params, ctx)
2846 .await?;
2847 let skip = skip.unwrap_or(0);
2848 let take = fetch.unwrap_or(usize::MAX);
2849 Ok(rows.into_iter().skip(skip).take(take).collect())
2850 }
2851 LogicalPlan::Aggregate {
2852 input,
2853 group_by,
2854 aggregates,
2855 } => {
2856 let rows = self
2857 .execute_subplan(*input, prop_manager, params, ctx)
2858 .await?;
2859 self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2860 .await
2861 }
2862 LogicalPlan::Window {
2863 input,
2864 window_exprs,
2865 } => {
2866 let rows = self
2867 .execute_subplan(*input, prop_manager, params, ctx)
2868 .await?;
2869 self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2870 .await
2871 }
2872 LogicalPlan::Project { input, projections } => {
2873 let matches = self
2874 .execute_subplan(*input, prop_manager, params, ctx)
2875 .await?;
2876 self.execute_project(matches, &projections, prop_manager, params, ctx)
2877 .await
2878 }
2879 LogicalPlan::Distinct { input } => {
2880 let rows = self
2881 .execute_subplan(*input, prop_manager, params, ctx)
2882 .await?;
2883 let mut seen = std::collections::HashSet::new();
2884 let mut result = Vec::new();
2885 for row in rows {
2886 let key = Self::canonical_row_key(&row);
2887 if seen.insert(key) {
2888 result.push(row);
2889 }
2890 }
2891 Ok(result)
2892 }
2893 LogicalPlan::Unwind {
2894 input,
2895 expr,
2896 variable,
2897 } => {
2898 let input_rows = self
2899 .execute_subplan(*input, prop_manager, params, ctx)
2900 .await?;
2901 self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2902 .await
2903 }
2904 LogicalPlan::Apply {
2905 input,
2906 subquery,
2907 input_filter,
2908 } => {
2909 let input_rows = self
2910 .execute_subplan(*input, prop_manager, params, ctx)
2911 .await?;
2912 self.execute_apply(
2913 input_rows,
2914 &subquery,
2915 input_filter.as_ref(),
2916 prop_manager,
2917 params,
2918 ctx,
2919 )
2920 .await
2921 }
2922 LogicalPlan::SubqueryCall { input, subquery } => {
2923 let input_rows = self
2924 .execute_subplan(*input, prop_manager, params, ctx)
2925 .await?;
2926 self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2929 .await
2930 }
2931 LogicalPlan::RecursiveCTE {
2932 cte_name,
2933 initial,
2934 recursive,
2935 } => {
2936 self.execute_recursive_cte(
2937 &cte_name,
2938 *initial,
2939 *recursive,
2940 prop_manager,
2941 params,
2942 ctx,
2943 )
2944 .await
2945 }
2946 LogicalPlan::CrossJoin { left, right } => {
2947 self.execute_cross_join(left, right, prop_manager, params, ctx)
2948 .await
2949 }
2950 LogicalPlan::Set { .. }
2951 | LogicalPlan::Remove { .. }
2952 | LogicalPlan::Merge { .. }
2953 | LogicalPlan::Create { .. }
2954 | LogicalPlan::CreateBatch { .. } => {
2955 unreachable!("mutations are handled by DataFusion engine")
2956 }
2957 LogicalPlan::Delete { .. } => {
2958 unreachable!("mutations are handled by DataFusion engine")
2959 }
2960 LogicalPlan::Copy {
2961 target,
2962 source,
2963 is_export,
2964 options,
2965 } => {
2966 if is_export {
2967 self.execute_export(&target, &source, &options, prop_manager, ctx)
2968 .await
2969 } else {
2970 self.execute_copy(&target, &source, &options, prop_manager)
2971 .await
2972 }
2973 }
2974 LogicalPlan::Backup {
2975 destination,
2976 options,
2977 } => self.execute_backup(&destination, &options).await,
2978 LogicalPlan::Explain { plan } => {
2979 let plan_str = format!("{:#?}", plan);
2980 let mut row = HashMap::new();
2981 row.insert("plan".to_string(), Value::String(plan_str));
2982 Ok(vec![row])
2983 }
2984 LogicalPlan::ShortestPath { .. } => {
2985 unreachable!("ShortestPath is handled by DataFusion engine")
2986 }
2987 LogicalPlan::AllShortestPaths { .. } => {
2988 unreachable!("AllShortestPaths is handled by DataFusion engine")
2989 }
2990 LogicalPlan::Foreach { .. } => {
2991 unreachable!("mutations are handled by DataFusion engine")
2992 }
2993 LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2994 LogicalPlan::BindZeroLengthPath { .. } => {
2995 unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2996 }
2997 LogicalPlan::BindPath { .. } => {
2998 unreachable!("BindPath is handled by DataFusion engine")
2999 }
3000 LogicalPlan::QuantifiedPattern { .. } => {
3001 unreachable!("QuantifiedPattern is handled by DataFusion engine")
3002 }
3003 LogicalPlan::LocyProgram { .. }
3004 | LogicalPlan::LocyFold { .. }
3005 | LogicalPlan::LocyBestBy { .. }
3006 | LogicalPlan::LocyPriority { .. }
3007 | LogicalPlan::LocyDerivedScan { .. }
3008 | LogicalPlan::LocyProject { .. }
3009 | LogicalPlan::LocyModelInvoke { .. } => {
3010 unreachable!("Locy operators are handled by DataFusion engine")
3011 }
3012 }
3013 })
3014 }
3015
3016 #[expect(clippy::too_many_arguments)]
3021 pub(crate) async fn execute_foreach_body_plan(
3022 &self,
3023 plan: LogicalPlan,
3024 scope: &mut HashMap<String, Value>,
3025 writer: &uni_store::runtime::writer::Writer,
3026 prop_manager: &PropertyManager,
3027 params: &HashMap<String, Value>,
3028 ctx: Option<&QueryContext>,
3029 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3030 ) -> Result<()> {
3031 match plan {
3032 LogicalPlan::Set { items, .. } => {
3033 self.execute_set_items_locked(
3034 &items,
3035 scope,
3036 writer,
3037 prop_manager,
3038 params,
3039 ctx,
3040 tx_l0,
3041 &crate::query::df_graph::mutation_common::Prefetch::default(),
3042 )
3043 .await?;
3044 }
3045 LogicalPlan::Remove { items, .. } => {
3046 self.execute_remove_items_locked(
3047 &items,
3048 scope,
3049 writer,
3050 prop_manager,
3051 ctx,
3052 tx_l0,
3053 &crate::query::df_graph::mutation_common::Prefetch::default(),
3054 )
3055 .await?;
3056 }
3057 LogicalPlan::Delete { items, detach, .. } => {
3058 for expr in &items {
3059 let val = self
3060 .evaluate_expr(expr, scope, prop_manager, params, ctx)
3061 .await?;
3062 self.execute_delete_item_locked(&val, detach, writer, tx_l0)
3063 .await?;
3064 }
3065 }
3066 LogicalPlan::Create { pattern, .. } => {
3067 self.execute_create_pattern(
3068 &pattern,
3069 scope,
3070 writer,
3071 prop_manager,
3072 params,
3073 ctx,
3074 tx_l0,
3075 None,
3076 )
3077 .await?;
3078 }
3079 LogicalPlan::CreateBatch { patterns, .. } => {
3080 for pattern in &patterns {
3081 self.execute_create_pattern(
3082 pattern,
3083 scope,
3084 writer,
3085 prop_manager,
3086 params,
3087 ctx,
3088 tx_l0,
3089 None,
3090 )
3091 .await?;
3092 }
3093 }
3094 LogicalPlan::Merge {
3095 pattern,
3096 on_match: _,
3097 on_create,
3098 ..
3099 } => {
3100 let seed_props = self
3104 .on_create_seed_props(on_create.as_ref(), scope, prop_manager, params, ctx)
3105 .await?;
3106 self.execute_create_pattern(
3107 &pattern,
3108 scope,
3109 writer,
3110 prop_manager,
3111 params,
3112 ctx,
3113 tx_l0,
3114 Some(&seed_props),
3115 )
3116 .await?;
3117 if let Some(on_create_clause) = on_create {
3118 self.execute_set_items_locked(
3119 &on_create_clause.items,
3120 scope,
3121 writer,
3122 prop_manager,
3123 params,
3124 ctx,
3125 tx_l0,
3126 &crate::query::df_graph::mutation_common::Prefetch::default(),
3127 )
3128 .await?;
3129 }
3130 }
3131 LogicalPlan::Foreach {
3132 variable,
3133 list,
3134 body,
3135 ..
3136 } => {
3137 let list_val = self
3138 .evaluate_expr(&list, scope, prop_manager, params, ctx)
3139 .await?;
3140 let items = match list_val {
3141 Value::List(arr) => arr,
3142 Value::Null => return Ok(()),
3143 _ => return Err(anyhow!("FOREACH requires a list")),
3144 };
3145 for item in items {
3146 let mut nested_scope = scope.clone();
3147 nested_scope.insert(variable.clone(), item);
3148 for nested_plan in &body {
3149 Box::pin(self.execute_foreach_body_plan(
3150 nested_plan.clone(),
3151 &mut nested_scope,
3152 writer,
3153 prop_manager,
3154 params,
3155 ctx,
3156 tx_l0,
3157 ))
3158 .await?;
3159 }
3160 }
3161 }
3162 _ => {
3163 return Err(anyhow!(
3164 "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
3165 ));
3166 }
3167 }
3168 Ok(())
3169 }
3170
3171 fn canonical_row_key(row: &HashMap<String, Value>) -> String {
3172 let mut pairs: Vec<_> = row.iter().collect();
3173 pairs.sort_by_key(|(k, _)| *k);
3174
3175 pairs
3176 .into_iter()
3177 .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
3178 .collect::<Vec<_>>()
3179 .join("|")
3180 }
3181
3182 fn canonical_value_key(v: &Value) -> String {
3183 match v {
3184 Value::Null => "null".to_string(),
3185 Value::Bool(b) => format!("b:{b}"),
3186 Value::Int(i) => format!("n:{i}"),
3187 Value::Float(f) => {
3188 if f.is_nan() {
3189 "nan".to_string()
3190 } else if f.is_infinite() {
3191 if f.is_sign_positive() {
3192 "inf:+".to_string()
3193 } else {
3194 "inf:-".to_string()
3195 }
3196 } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
3197 format!("n:{}", *f as i64)
3198 } else {
3199 format!("f:{f}")
3200 }
3201 }
3202 Value::String(s) => {
3203 if let Some(k) = Self::temporal_string_key(s) {
3204 format!("temporal:{k}")
3205 } else {
3206 format!("s:{s}")
3207 }
3208 }
3209 Value::Bytes(b) => format!("bytes:{:?}", b),
3210 Value::List(items) => format!(
3211 "list:[{}]",
3212 items
3213 .iter()
3214 .map(Self::canonical_value_key)
3215 .collect::<Vec<_>>()
3216 .join(",")
3217 ),
3218 Value::Map(map) => {
3219 let mut pairs: Vec<_> = map.iter().collect();
3220 pairs.sort_by_key(|(k, _)| *k);
3221 format!(
3222 "map:{{{}}}",
3223 pairs
3224 .into_iter()
3225 .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
3226 .collect::<Vec<_>>()
3227 .join(",")
3228 )
3229 }
3230 Value::Node(n) => {
3231 let mut labels = n.labels.clone();
3232 labels.sort();
3233 format!(
3234 "node:{}:{}:{}",
3235 n.vid.as_u64(),
3236 labels.join(":"),
3237 Self::canonical_value_key(&Value::Map(n.properties.clone()))
3238 )
3239 }
3240 Value::Edge(e) => format!(
3241 "edge:{}:{}:{}:{}:{}",
3242 e.eid.as_u64(),
3243 e.edge_type,
3244 e.src.as_u64(),
3245 e.dst.as_u64(),
3246 Self::canonical_value_key(&Value::Map(e.properties.clone()))
3247 ),
3248 Value::Path(p) => format!(
3249 "path:nodes=[{}];edges=[{}]",
3250 p.nodes
3251 .iter()
3252 .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
3253 .collect::<Vec<_>>()
3254 .join(","),
3255 p.edges
3256 .iter()
3257 .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
3258 .collect::<Vec<_>>()
3259 .join(",")
3260 ),
3261 Value::Vector(vs) => format!("vec:{:?}", vs),
3262 Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
3263 _ => format!("{v:?}"),
3264 }
3265 }
3266
3267 fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3268 match t {
3269 uni_common::TemporalValue::Date { days_since_epoch } => {
3270 format!("date:{days_since_epoch}")
3271 }
3272 uni_common::TemporalValue::LocalTime {
3273 nanos_since_midnight,
3274 } => format!("localtime:{nanos_since_midnight}"),
3275 uni_common::TemporalValue::Time {
3276 nanos_since_midnight,
3277 offset_seconds,
3278 } => {
3279 let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3280 format!("time:{utc_nanos}")
3281 }
3282 uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3283 format!("localdatetime:{nanos_since_epoch}")
3284 }
3285 uni_common::TemporalValue::DateTime {
3286 nanos_since_epoch, ..
3287 } => format!("datetime:{nanos_since_epoch}"),
3288 uni_common::TemporalValue::Duration {
3289 months,
3290 days,
3291 nanos,
3292 } => format!("duration:{months}:{days}:{nanos}"),
3293 uni_common::TemporalValue::Btic { lo, hi, meta } => {
3294 format!("btic:{lo}:{hi}:{meta}")
3295 }
3296 }
3297 }
3298
3299 fn temporal_string_key(s: &str) -> Option<String> {
3300 let fn_name = match classify_temporal(s)? {
3301 uni_common::TemporalType::Date => "DATE",
3302 uni_common::TemporalType::LocalTime => "LOCALTIME",
3303 uni_common::TemporalType::Time => "TIME",
3304 uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3305 uni_common::TemporalType::DateTime => "DATETIME",
3306 uni_common::TemporalType::Duration => "DURATION",
3307 uni_common::TemporalType::Btic => return None, };
3309 match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3310 Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3311 _ => None,
3312 }
3313 }
3314
3315 pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3318
3319 pub(crate) async fn execute_aggregate(
3320 &self,
3321 rows: Vec<HashMap<String, Value>>,
3322 group_by: &[Expr],
3323 aggregates: &[Expr],
3324 prop_manager: &PropertyManager,
3325 params: &HashMap<String, Value>,
3326 ctx: Option<&QueryContext>,
3327 ) -> Result<Vec<HashMap<String, Value>>> {
3328 if let Some(ctx) = ctx {
3330 ctx.check_timeout()?;
3331 }
3332
3333 let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3334
3335 if rows.is_empty() {
3338 if group_by.is_empty() {
3339 let accs = Self::create_accumulators(aggregates);
3340 let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3341 return Ok(vec![row]);
3342 }
3343 return Ok(vec![]);
3344 }
3345
3346 for (idx, row) in rows.into_iter().enumerate() {
3347 if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3349 && let Some(ctx) = ctx
3350 {
3351 ctx.check_timeout()?;
3352 }
3353
3354 let key_vals = self
3355 .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3356 .await?;
3357 let key_str = format!(
3360 "[{}]",
3361 key_vals
3362 .iter()
3363 .map(Self::canonical_value_key)
3364 .collect::<Vec<_>>()
3365 .join(",")
3366 );
3367
3368 let entry = groups
3369 .entry(key_str)
3370 .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3371
3372 self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3373 .await?;
3374 }
3375
3376 let results = groups
3377 .values()
3378 .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3379 .collect();
3380
3381 Ok(results)
3382 }
3383
3384 pub(crate) async fn execute_window(
3385 &self,
3386 mut rows: Vec<HashMap<String, Value>>,
3387 window_exprs: &[Expr],
3388 _prop_manager: &PropertyManager,
3389 _params: &HashMap<String, Value>,
3390 ctx: Option<&QueryContext>,
3391 ) -> Result<Vec<HashMap<String, Value>>> {
3392 if let Some(ctx) = ctx {
3394 ctx.check_timeout()?;
3395 }
3396
3397 if rows.is_empty() || window_exprs.is_empty() {
3399 return Ok(rows);
3400 }
3401
3402 for window_expr in window_exprs {
3404 let Expr::FunctionCall {
3406 name,
3407 args,
3408 window_spec: Some(window_spec),
3409 ..
3410 } = window_expr
3411 else {
3412 return Err(anyhow!(
3413 "Window expression must be a FunctionCall with OVER clause: {:?}",
3414 window_expr
3415 ));
3416 };
3417
3418 let name_upper = name.to_uppercase();
3419
3420 if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3422 return Err(anyhow!(
3423 "Unsupported window function: {}. Supported functions: {}",
3424 name,
3425 WINDOW_FUNCTIONS.join(", ")
3426 ));
3427 }
3428
3429 let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3431
3432 for (row_idx, row) in rows.iter().enumerate() {
3433 let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3435 vec![]
3437 } else {
3438 window_spec
3439 .partition_by
3440 .iter()
3441 .map(|expr| self.evaluate_simple_expr(expr, row))
3442 .collect::<Result<Vec<_>>>()?
3443 };
3444
3445 partition_map
3446 .entry(partition_key)
3447 .or_default()
3448 .push(row_idx);
3449 }
3450
3451 for (_partition_key, row_indices) in partition_map.iter_mut() {
3453 if !window_spec.order_by.is_empty() {
3455 row_indices.sort_by(|&a, &b| {
3456 for sort_item in &window_spec.order_by {
3457 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3458 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3459
3460 if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3461 let cmp = Executor::compare_values(&va, &vb);
3462 let cmp = if sort_item.ascending {
3463 cmp
3464 } else {
3465 cmp.reverse()
3466 };
3467 if cmp != std::cmp::Ordering::Equal {
3468 return cmp;
3469 }
3470 }
3471 }
3472 std::cmp::Ordering::Equal
3473 });
3474 }
3475
3476 for (position, &row_idx) in row_indices.iter().enumerate() {
3478 let window_value = match name_upper.as_str() {
3479 "ROW_NUMBER" => Value::from((position + 1) as i64),
3480 "RANK" => {
3481 let rank = if position == 0 {
3483 1i64
3484 } else {
3485 let prev_row_idx = row_indices[position - 1];
3486 let same_as_prev = self.rows_have_same_sort_keys(
3487 &window_spec.order_by,
3488 &rows,
3489 row_idx,
3490 prev_row_idx,
3491 );
3492
3493 if same_as_prev {
3494 let mut group_start = position - 1;
3496 while group_start > 0 {
3497 let curr_idx = row_indices[group_start];
3498 let prev_idx = row_indices[group_start - 1];
3499 if !self.rows_have_same_sort_keys(
3500 &window_spec.order_by,
3501 &rows,
3502 curr_idx,
3503 prev_idx,
3504 ) {
3505 break;
3506 }
3507 group_start -= 1;
3508 }
3509 (group_start + 1) as i64
3510 } else {
3511 (position + 1) as i64
3512 }
3513 };
3514 Value::from(rank)
3515 }
3516 "DENSE_RANK" => {
3517 let mut dense_rank = 1i64;
3519 for i in 0..position {
3520 let curr_idx = row_indices[i + 1];
3521 let prev_idx = row_indices[i];
3522 if !self.rows_have_same_sort_keys(
3523 &window_spec.order_by,
3524 &rows,
3525 curr_idx,
3526 prev_idx,
3527 ) {
3528 dense_rank += 1;
3529 }
3530 }
3531 Value::from(dense_rank)
3532 }
3533 "LAG" => {
3534 let (value_expr, offset, default_value) =
3535 self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3536
3537 if position >= offset {
3538 let target_idx = row_indices[position - offset];
3539 self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3540 } else {
3541 default_value
3542 }
3543 }
3544 "LEAD" => {
3545 let (value_expr, offset, default_value) =
3546 self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3547
3548 if position + offset < row_indices.len() {
3549 let target_idx = row_indices[position + offset];
3550 self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3551 } else {
3552 default_value
3553 }
3554 }
3555 "NTILE" => {
3556 let num_buckets_expr = args.first().ok_or_else(|| {
3558 anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3559 })?;
3560 let num_buckets_val =
3561 self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3562 let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3563 anyhow!(
3564 "NTILE argument must be an integer, got: {:?}",
3565 num_buckets_val
3566 )
3567 })?;
3568
3569 if num_buckets <= 0 {
3570 return Err(anyhow!(
3571 "NTILE bucket count must be positive, got: {}",
3572 num_buckets
3573 ));
3574 }
3575
3576 let num_buckets = num_buckets as usize;
3577 let partition_size = row_indices.len();
3578
3579 let base_size = partition_size / num_buckets;
3584 let extra_rows = partition_size % num_buckets;
3585
3586 let bucket = if position < extra_rows * (base_size + 1) {
3588 position / (base_size + 1) + 1
3590 } else {
3591 let adjusted_position = position - extra_rows * (base_size + 1);
3593 extra_rows + (adjusted_position / base_size) + 1
3594 };
3595
3596 Value::from(bucket as i64)
3597 }
3598 "FIRST_VALUE" => {
3599 let value_expr = args.first().ok_or_else(|| {
3601 anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3602 })?;
3603
3604 if row_indices.is_empty() {
3606 Value::Null
3607 } else {
3608 let first_idx = row_indices[0];
3609 self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3610 }
3611 }
3612 "LAST_VALUE" => {
3613 let value_expr = args.first().ok_or_else(|| {
3615 anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3616 })?;
3617
3618 if row_indices.is_empty() {
3620 Value::Null
3621 } else {
3622 let last_idx = row_indices[row_indices.len() - 1];
3623 self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3624 }
3625 }
3626 "NTH_VALUE" => {
3627 if args.len() != 2 {
3629 return Err(anyhow!(
3630 "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3631 ));
3632 }
3633
3634 let value_expr = &args[0];
3635 let n_expr = &args[1];
3636
3637 let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3638 let n = n_val.as_i64().ok_or_else(|| {
3639 anyhow!(
3640 "NTH_VALUE second argument must be an integer, got: {:?}",
3641 n_val
3642 )
3643 })?;
3644
3645 if n <= 0 {
3646 return Err(anyhow!(
3647 "NTH_VALUE position must be positive, got: {}",
3648 n
3649 ));
3650 }
3651
3652 let nth_index = (n - 1) as usize; if nth_index < row_indices.len() {
3654 let nth_idx = row_indices[nth_index];
3655 self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3656 } else {
3657 Value::Null
3658 }
3659 }
3660 _ => unreachable!("Window function {} already validated", name),
3661 };
3662
3663 let col_name = window_expr.to_string_repr();
3666 rows[row_idx].insert(col_name, window_value);
3667 }
3668 }
3669 }
3670
3671 Ok(rows)
3672 }
3673
3674 fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3679 match expr {
3680 Expr::Variable(name) => row
3681 .get(name)
3682 .cloned()
3683 .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3684 Expr::Property(base, prop) => {
3685 let base_val = self.evaluate_simple_expr(base, row)?;
3686 if let Value::Map(map) = base_val {
3687 map.get(prop)
3688 .cloned()
3689 .ok_or_else(|| anyhow!("Property not found: {}", prop))
3690 } else {
3691 Err(anyhow!("Cannot access property on non-object"))
3692 }
3693 }
3694 Expr::Literal(lit) => Ok(lit.to_value()),
3695 _ => Err(anyhow!(
3696 "Unsupported expression in window function: {:?}",
3697 expr
3698 )),
3699 }
3700 }
3701
3702 fn rows_have_same_sort_keys(
3704 &self,
3705 order_by: &[uni_cypher::ast::SortItem],
3706 rows: &[HashMap<String, Value>],
3707 idx_a: usize,
3708 idx_b: usize,
3709 ) -> bool {
3710 order_by.iter().all(|sort_item| {
3711 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3712 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3713 matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3714 })
3715 }
3716
3717 fn extract_lag_lead_params<'a>(
3719 &self,
3720 func_name: &str,
3721 args: &'a [Expr],
3722 row: &HashMap<String, Value>,
3723 ) -> Result<(&'a Expr, usize, Value)> {
3724 let value_expr = args.first().ok_or_else(|| {
3725 anyhow!(
3726 "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3727 func_name,
3728 func_name
3729 )
3730 })?;
3731
3732 let offset = if let Some(offset_expr) = args.get(1) {
3733 let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3734 offset_val.as_i64().ok_or_else(|| {
3735 anyhow!(
3736 "{} offset must be an integer, got: {:?}",
3737 func_name,
3738 offset_val
3739 )
3740 })? as usize
3741 } else {
3742 1
3743 };
3744
3745 let default_value = if let Some(default_expr) = args.get(2) {
3746 self.evaluate_simple_expr(default_expr, row)?
3747 } else {
3748 Value::Null
3749 };
3750
3751 Ok((value_expr, offset, default_value))
3752 }
3753
3754 pub(crate) async fn evaluate_group_keys(
3756 &self,
3757 group_by: &[Expr],
3758 row: &HashMap<String, Value>,
3759 prop_manager: &PropertyManager,
3760 params: &HashMap<String, Value>,
3761 ctx: Option<&QueryContext>,
3762 ) -> Result<Vec<Value>> {
3763 let mut key_vals = Vec::new();
3764 for expr in group_by {
3765 key_vals.push(
3766 self.evaluate_expr(expr, row, prop_manager, params, ctx)
3767 .await?,
3768 );
3769 }
3770 Ok(key_vals)
3771 }
3772
3773 pub(crate) async fn update_accumulators(
3775 &self,
3776 accs: &mut [Accumulator],
3777 aggregates: &[Expr],
3778 row: &HashMap<String, Value>,
3779 prop_manager: &PropertyManager,
3780 params: &HashMap<String, Value>,
3781 ctx: Option<&QueryContext>,
3782 ) -> Result<()> {
3783 for (i, agg_expr) in aggregates.iter().enumerate() {
3784 if let Expr::FunctionCall { args, .. } = agg_expr {
3785 let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3786 let val = if is_wildcard {
3787 Value::Null
3788 } else {
3789 self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3790 .await?
3791 };
3792 accs[i].update(&val, is_wildcard);
3793 }
3794 }
3795 Ok(())
3796 }
3797
3798 pub(crate) async fn execute_recursive_cte(
3800 &self,
3801 cte_name: &str,
3802 initial: LogicalPlan,
3803 recursive: LogicalPlan,
3804 prop_manager: &PropertyManager,
3805 params: &HashMap<String, Value>,
3806 ctx: Option<&QueryContext>,
3807 ) -> Result<Vec<HashMap<String, Value>>> {
3808 use std::collections::HashSet;
3809
3810 pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3813 let mut pairs: Vec<_> = row.iter().collect();
3814 pairs.sort_by(|a, b| a.0.cmp(b.0));
3815 format!("{pairs:?}")
3816 }
3817
3818 let mut working_table = self
3820 .execute_subplan(initial, prop_manager, params, ctx)
3821 .await?;
3822 let mut result_table = working_table.clone();
3823
3824 let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3826
3827 let max_iterations = self.config.max_recursive_cte_iterations;
3830 for _iteration in 0..max_iterations {
3831 if let Some(ctx) = ctx {
3833 ctx.check_timeout()?;
3834 }
3835
3836 if working_table.is_empty() {
3837 break;
3838 }
3839
3840 let working_val = Value::List(
3842 working_table
3843 .iter()
3844 .map(|row| {
3845 if row.len() == 1 {
3846 row.values().next().unwrap().clone()
3847 } else {
3848 Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3849 }
3850 })
3851 .collect(),
3852 );
3853
3854 let mut next_params = params.clone();
3855 next_params.insert(cte_name.to_string(), working_val);
3856
3857 let next_result = self
3859 .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3860 .await?;
3861
3862 if next_result.is_empty() {
3863 break;
3864 }
3865
3866 let new_rows: Vec<_> = next_result
3868 .into_iter()
3869 .filter(|row| {
3870 let key = row_key(row);
3871 seen.insert(key) })
3873 .collect();
3874
3875 if new_rows.is_empty() {
3876 break;
3878 }
3879
3880 result_table.extend(new_rows.clone());
3881 working_table = new_rows;
3882 }
3883
3884 let final_list = Value::List(
3886 result_table
3887 .into_iter()
3888 .map(|row| {
3889 if row.len() == 1 {
3901 row.values().next().unwrap().clone()
3902 } else {
3903 Value::Map(row.into_iter().collect())
3904 }
3905 })
3906 .collect(),
3907 );
3908
3909 let mut final_row = HashMap::new();
3910 final_row.insert(cte_name.to_string(), final_list);
3911 Ok(vec![final_row])
3912 }
3913
3914 const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3916
3917 pub(crate) async fn execute_sort(
3918 &self,
3919 rows: Vec<HashMap<String, Value>>,
3920 order_by: &[uni_cypher::ast::SortItem],
3921 prop_manager: &PropertyManager,
3922 params: &HashMap<String, Value>,
3923 ctx: Option<&QueryContext>,
3924 ) -> Result<Vec<HashMap<String, Value>>> {
3925 if let Some(ctx) = ctx {
3927 ctx.check_timeout()?;
3928 }
3929
3930 let mut rows_with_keys = Vec::with_capacity(rows.len());
3931 for (idx, row) in rows.into_iter().enumerate() {
3932 if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3934 && let Some(ctx) = ctx
3935 {
3936 ctx.check_timeout()?;
3937 }
3938
3939 let mut keys = Vec::new();
3940 for item in order_by {
3941 let val = row
3942 .get(&item.expr.to_string_repr())
3943 .cloned()
3944 .unwrap_or(Value::Null);
3945 let val = if val.is_null() {
3946 self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3947 .await
3948 .unwrap_or(Value::Null)
3949 } else {
3950 val
3951 };
3952 keys.push(val);
3953 }
3954 rows_with_keys.push((row, keys));
3955 }
3956
3957 if let Some(ctx) = ctx {
3959 ctx.check_timeout()?;
3960 }
3961
3962 rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3963
3964 Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3965 }
3966
3967 pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3969 aggregates
3970 .iter()
3971 .map(|expr| {
3972 if let Expr::FunctionCall { name, distinct, .. } = expr {
3973 Accumulator::new(name, *distinct)
3974 } else {
3975 Accumulator::new("COUNT", false)
3976 }
3977 })
3978 .collect()
3979 }
3980
3981 pub(crate) fn build_aggregate_result(
3983 group_by: &[Expr],
3984 aggregates: &[Expr],
3985 key_vals: &[Value],
3986 accs: &[Accumulator],
3987 ) -> HashMap<String, Value> {
3988 let mut res_row = HashMap::new();
3989 for (i, expr) in group_by.iter().enumerate() {
3990 res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3991 }
3992 for (i, expr) in aggregates.iter().enumerate() {
3993 let col_name = crate::query::planner::aggregate_column_name(expr);
3995 res_row.insert(col_name, accs[i].finish());
3996 }
3997 res_row
3998 }
3999
4000 pub(crate) fn compare_sort_keys(
4002 a_keys: &[Value],
4003 b_keys: &[Value],
4004 order_by: &[uni_cypher::ast::SortItem],
4005 ) -> std::cmp::Ordering {
4006 for (i, item) in order_by.iter().enumerate() {
4007 let order = Self::compare_values(&a_keys[i], &b_keys[i]);
4008 if order != std::cmp::Ordering::Equal {
4009 return if item.ascending {
4010 order
4011 } else {
4012 order.reverse()
4013 };
4014 }
4015 }
4016 std::cmp::Ordering::Equal
4017 }
4018
4019 pub(crate) async fn execute_backup(
4023 &self,
4024 destination: &str,
4025 _options: &HashMap<String, Value>,
4026 ) -> Result<Vec<HashMap<String, Value>>> {
4027 if let Some(writer_arc) = &self.writer {
4029 let writer: &uni_store::Writer = writer_arc.as_ref();
4030 writer.flush_to_l1(None).await?;
4031 }
4032
4033 let snapshot_manager = self.storage.snapshot_manager();
4035 let snapshot = snapshot_manager
4036 .load_latest_snapshot()
4037 .await?
4038 .ok_or_else(|| anyhow!("No snapshot found"))?;
4039
4040 if is_cloud_url(destination) {
4042 self.backup_to_cloud(destination, &snapshot.snapshot_id)
4043 .await?;
4044 } else {
4045 let validated_dest = self.validate_path(destination)?;
4047 self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
4048 .await?;
4049 }
4050
4051 let mut res = HashMap::new();
4052 res.insert(
4053 "status".to_string(),
4054 Value::String("Backup completed".to_string()),
4055 );
4056 res.insert(
4057 "snapshot_id".to_string(),
4058 Value::String(snapshot.snapshot_id),
4059 );
4060 Ok(vec![res])
4061 }
4062
4063 async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
4065 let source_path = std::path::Path::new(self.storage.base_path());
4066
4067 if !dest_path.exists() {
4068 std::fs::create_dir_all(dest_path)?;
4069 }
4070
4071 if source_path.exists() {
4073 Self::copy_dir_all(source_path, dest_path)?;
4074 }
4075
4076 let schema_manager = self.storage.schema_manager();
4078 let dest_catalog = dest_path.join("catalog");
4079 if !dest_catalog.exists() {
4080 std::fs::create_dir_all(&dest_catalog)?;
4081 }
4082
4083 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4084 std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
4085
4086 Ok(())
4087 }
4088
4089 async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
4093 use object_store::ObjectStore;
4094 use object_store::ObjectStoreExt;
4095 use object_store::local::LocalFileSystem;
4096 use object_store::path::Path as ObjPath;
4097
4098 let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
4099 let source_path = std::path::Path::new(self.storage.base_path());
4100
4101 let src_store: Arc<dyn ObjectStore> =
4103 Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
4104
4105 let catalog_src = ObjPath::from("catalog");
4107 let catalog_dst = if dest_prefix.as_ref().is_empty() {
4108 ObjPath::from("catalog")
4109 } else {
4110 ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
4111 };
4112 copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
4113
4114 let storage_src = ObjPath::from("storage");
4116 let storage_dst = if dest_prefix.as_ref().is_empty() {
4117 ObjPath::from("storage")
4118 } else {
4119 ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
4120 };
4121 copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
4122
4123 let schema_manager = self.storage.schema_manager();
4125 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4126 let schema_path = if dest_prefix.as_ref().is_empty() {
4127 ObjPath::from("catalog/schema.json")
4128 } else {
4129 ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
4130 };
4131 dest_store
4132 .put(&schema_path, bytes::Bytes::from(schema_content).into())
4133 .await?;
4134
4135 Ok(())
4136 }
4137
4138 const MAX_BACKUP_DEPTH: usize = 100;
4143
4144 const MAX_BACKUP_FILES: usize = 100_000;
4149
4150 pub(crate) fn copy_dir_all(
4158 src: &std::path::Path,
4159 dst: &std::path::Path,
4160 ) -> std::io::Result<()> {
4161 let mut file_count = 0usize;
4162 Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
4163 }
4164
4165 pub(crate) fn copy_dir_all_impl(
4167 src: &std::path::Path,
4168 dst: &std::path::Path,
4169 depth: usize,
4170 file_count: &mut usize,
4171 ) -> std::io::Result<()> {
4172 if depth >= Self::MAX_BACKUP_DEPTH {
4173 return Err(std::io::Error::new(
4174 std::io::ErrorKind::InvalidInput,
4175 format!(
4176 "Maximum backup depth {} exceeded at {:?}",
4177 Self::MAX_BACKUP_DEPTH,
4178 src
4179 ),
4180 ));
4181 }
4182
4183 std::fs::create_dir_all(dst)?;
4184
4185 for entry in std::fs::read_dir(src)? {
4186 if *file_count >= Self::MAX_BACKUP_FILES {
4187 return Err(std::io::Error::new(
4188 std::io::ErrorKind::InvalidInput,
4189 format!(
4190 "Maximum backup file count {} exceeded",
4191 Self::MAX_BACKUP_FILES
4192 ),
4193 ));
4194 }
4195 *file_count += 1;
4196
4197 let entry = entry?;
4198 let metadata = entry.metadata()?;
4199
4200 if metadata.file_type().is_symlink() {
4202 continue;
4204 }
4205
4206 let dst_path = dst.join(entry.file_name());
4207 if metadata.is_dir() {
4208 Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4209 } else {
4210 std::fs::copy(entry.path(), dst_path)?;
4211 }
4212 }
4213 Ok(())
4214 }
4215
4216 pub(crate) async fn execute_copy(
4217 &self,
4218 target: &str,
4219 source: &str,
4220 options: &HashMap<String, Value>,
4221 prop_manager: &PropertyManager,
4222 ) -> Result<Vec<HashMap<String, Value>>> {
4223 let format = options
4224 .get("format")
4225 .and_then(|v| v.as_str())
4226 .unwrap_or_else(|| {
4227 if source.ends_with(".parquet") {
4228 "parquet"
4229 } else {
4230 "csv"
4231 }
4232 });
4233
4234 match format.to_lowercase().as_str() {
4235 "csv" => self.execute_csv_import(target, source, options).await,
4236 "parquet" => {
4237 self.execute_parquet_import(target, source, options, prop_manager)
4238 .await
4239 }
4240 _ => Err(anyhow!("Unsupported format: {}", format)),
4241 }
4242 }
4243
4244 pub(crate) async fn execute_csv_import(
4245 &self,
4246 target: &str,
4247 source: &str,
4248 options: &HashMap<String, Value>,
4249 ) -> Result<Vec<HashMap<String, Value>>> {
4250 let validated_source = self.validate_path(source)?;
4252
4253 let writer_lock = self
4254 .writer
4255 .as_ref()
4256 .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4257
4258 let schema = self.storage.schema_manager().schema();
4259
4260 let label_meta = schema.labels.get(target);
4262 let edge_meta = schema.edge_types.get(target);
4263
4264 if label_meta.is_none() && edge_meta.is_none() {
4265 return Err(anyhow!("Target '{}' not found in schema", target));
4266 }
4267
4268 let delimiter_str = options
4270 .get("delimiter")
4271 .and_then(|v| v.as_str())
4272 .unwrap_or(",");
4273 let delimiter = if delimiter_str.is_empty() {
4274 b','
4275 } else {
4276 delimiter_str.as_bytes()[0]
4277 };
4278 let has_header = options
4279 .get("header")
4280 .and_then(|v| v.as_bool())
4281 .unwrap_or(true);
4282
4283 let mut rdr = csv::ReaderBuilder::new()
4284 .delimiter(delimiter)
4285 .has_headers(has_header)
4286 .from_path(&validated_source)?;
4287
4288 let headers = rdr.headers()?.clone();
4289 let mut count = 0;
4290
4291 let writer: &uni_store::Writer = writer_lock.as_ref();
4292
4293 const CSV_ID_CHUNK: usize = 256;
4297
4298 if label_meta.is_some() {
4299 let target_props = schema
4300 .properties
4301 .get(target)
4302 .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4303
4304 let mut vid_chunk: std::collections::VecDeque<Vid> =
4305 std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4306 for result in rdr.records() {
4307 let record = result?;
4308 let mut props = HashMap::new();
4309
4310 for (i, header) in headers.iter().enumerate() {
4311 if let Some(val_str) = record.get(i)
4312 && let Some(prop_meta) = target_props.get(header)
4313 {
4314 let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4315 props.insert(header.to_string(), val);
4316 }
4317 }
4318
4319 if vid_chunk.is_empty() {
4320 vid_chunk.extend(writer.allocate_vids(CSV_ID_CHUNK).await?);
4321 }
4322 let vid = vid_chunk.pop_front().unwrap();
4323 writer
4324 .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4325 .await?;
4326 count += 1;
4327 }
4328 } else if let Some(meta) = edge_meta {
4329 let type_id = meta.id;
4330 let target_props = schema
4331 .properties
4332 .get(target)
4333 .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4334
4335 let src_col = options
4338 .get("src_col")
4339 .and_then(|v| v.as_str())
4340 .unwrap_or("_src");
4341 let dst_col = options
4342 .get("dst_col")
4343 .and_then(|v| v.as_str())
4344 .unwrap_or("_dst");
4345
4346 let mut eid_chunk: std::collections::VecDeque<Eid> =
4347 std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4348 for result in rdr.records() {
4349 let record = result?;
4350 let mut props = HashMap::new();
4351 let mut src_vid = None;
4352 let mut dst_vid = None;
4353
4354 for (i, header) in headers.iter().enumerate() {
4355 if let Some(val_str) = record.get(i) {
4356 if header == src_col {
4357 src_vid =
4358 Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4359 } else if header == dst_col {
4360 dst_vid =
4361 Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4362 } else if let Some(prop_meta) = target_props.get(header) {
4363 let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4364 props.insert(header.to_string(), val);
4365 }
4366 }
4367 }
4368
4369 let src =
4370 src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4371 let dst = dst_vid
4372 .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;
4373
4374 if eid_chunk.is_empty() {
4375 eid_chunk.extend(writer.allocate_eids(CSV_ID_CHUNK).await?);
4376 }
4377 let eid = eid_chunk.pop_front().unwrap();
4378 writer
4379 .insert_edge(
4380 src,
4381 dst,
4382 type_id,
4383 eid,
4384 props,
4385 Some(target.to_string()),
4386 None,
4387 )
4388 .await?;
4389 count += 1;
4390 }
4391 }
4392
4393 let mut res = HashMap::new();
4394 res.insert("count".to_string(), Value::Int(count as i64));
4395 Ok(vec![res])
4396 }
4397
4398 pub(crate) async fn execute_parquet_import(
4402 &self,
4403 target: &str,
4404 source: &str,
4405 options: &HashMap<String, Value>,
4406 _prop_manager: &PropertyManager,
4407 ) -> Result<Vec<HashMap<String, Value>>> {
4408 let writer_lock = self
4409 .writer
4410 .as_ref()
4411 .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4412
4413 let schema = self.storage.schema_manager().schema();
4414
4415 let label_meta = schema.labels.get(target);
4417 let edge_meta = schema.edge_types.get(target);
4418
4419 if label_meta.is_none() && edge_meta.is_none() {
4420 return Err(anyhow!("Target '{}' not found in schema", target));
4421 }
4422
4423 let reader = if is_cloud_url(source) {
4425 self.open_parquet_from_cloud(source).await?
4426 } else {
4427 let validated_source = self.validate_path(source)?;
4429 let file = std::fs::File::open(&validated_source)?;
4430 let builder =
4431 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
4432 builder.build()?
4433 };
4434 let mut reader = reader;
4435
4436 let mut count = 0;
4437 let writer: &uni_store::Writer = writer_lock.as_ref();
4438
4439 if label_meta.is_some() {
4440 let target_props = schema
4441 .properties
4442 .get(target)
4443 .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4444
4445 for batch in reader.by_ref() {
4446 let batch = batch?;
4447 let num_rows = batch.num_rows();
4448 let vids = writer.allocate_vids(num_rows).await?;
4450 for (row, &vid) in vids.iter().enumerate().take(num_rows) {
4451 let mut props = HashMap::new();
4452 for field in batch.schema().fields() {
4453 let name = field.name();
4454 if target_props.contains_key(name) {
4455 let col = batch.column_by_name(name).unwrap();
4456 if !col.is_null(row) {
4457 let data_type = target_props.get(name).map(|pm| &pm.r#type);
4459 let val =
4460 arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
4461 props.insert(name.clone(), val);
4462 }
4463 }
4464 }
4465 writer
4466 .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4467 .await?;
4468 count += 1;
4469 }
4470 }
4471 } else if let Some(meta) = edge_meta {
4472 let type_id = meta.id;
4473 let target_props = schema
4474 .properties
4475 .get(target)
4476 .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4477
4478 let src_col = options
4479 .get("src_col")
4480 .and_then(|v| v.as_str())
4481 .unwrap_or("_src");
4482 let dst_col = options
4483 .get("dst_col")
4484 .and_then(|v| v.as_str())
4485 .unwrap_or("_dst");
4486
4487 for batch in reader {
4488 let batch = batch?;
4489 let num_rows = batch.num_rows();
4490 let eids = writer.allocate_eids(num_rows).await?;
4492 for (row, &eid) in eids.iter().enumerate().take(num_rows) {
4493 let mut props = HashMap::new();
4494 let mut src_vid = None;
4495 let mut dst_vid = None;
4496
4497 for field in batch.schema().fields() {
4498 let name = field.name();
4499 let col = batch.column_by_name(name).unwrap();
4500 if col.is_null(row) {
4501 continue;
4502 }
4503
4504 if name == src_col {
4505 let val = Self::arrow_to_value(col.as_ref(), row);
4506 src_vid = Some(Self::vid_from_value(&val)?);
4507 } else if name == dst_col {
4508 let val = Self::arrow_to_value(col.as_ref(), row);
4509 dst_vid = Some(Self::vid_from_value(&val)?);
4510 } else if let Some(pm) = target_props.get(name) {
4511 let val =
4513 arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
4514 props.insert(name.clone(), val);
4515 }
4516 }
4517
4518 let src = src_vid
4519 .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4520 let dst = dst_vid.ok_or_else(|| {
4521 anyhow!("Missing destination VID in column '{}'", dst_col)
4522 })?;
4523
4524 writer
4525 .insert_edge(
4526 src,
4527 dst,
4528 type_id,
4529 eid,
4530 props,
4531 Some(target.to_string()),
4532 None,
4533 )
4534 .await?;
4535 count += 1;
4536 }
4537 }
4538 }
4539
4540 let mut res = HashMap::new();
4541 res.insert("count".to_string(), Value::Int(count as i64));
4542 Ok(vec![res])
4543 }
4544
4545 async fn open_parquet_from_cloud(
4549 &self,
4550 source_url: &str,
4551 ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4552 use object_store::ObjectStoreExt;
4553
4554 let (store, path) = build_store_from_url(source_url)?;
4555
4556 let bytes = store.get(&path).await?.bytes().await?;
4558
4559 let reader = bytes::Bytes::from(bytes.to_vec());
4561 let builder =
4562 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4563 Ok(builder.build()?)
4564 }
4565
4566 pub(crate) async fn scan_edge_type(
4567 &self,
4568 edge_type: &str,
4569 ctx: Option<&QueryContext>,
4570 ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4571 let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4572
4573 self.scan_edge_type_l2(edge_type, &mut edges).await?;
4575
4576 self.scan_edge_type_l1(edge_type, &mut edges).await?;
4578
4579 if let Some(ctx) = ctx {
4581 self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4582 self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4583 }
4584
4585 Ok(edges
4586 .into_iter()
4587 .map(|(eid, (src, dst))| (eid, src, dst))
4588 .collect())
4589 }
4590
4591 pub(crate) async fn scan_edge_type_l2(
4596 &self,
4597 _edge_type: &str,
4598 _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4599 ) -> Result<()> {
4600 Ok(())
4603 }
4604
4605 pub(crate) async fn scan_edge_type_l1(
4607 &self,
4608 edge_type: &str,
4609 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4610 ) -> Result<()> {
4611 if let Ok(Some(batch)) = self
4612 .storage
4613 .scan_delta_table(
4614 edge_type,
4615 "fwd",
4616 &["eid", "src_vid", "dst_vid", "op", "_version"],
4617 None,
4618 )
4619 .await
4620 {
4621 let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4623 HashMap::new();
4624
4625 self.process_delta_batch(&batch, &mut versioned_ops)?;
4626
4627 for (eid, (_, op, src, dst)) in versioned_ops {
4629 if op == 0 {
4630 edges.insert(eid, (src, dst));
4631 } else if op == 1 {
4632 edges.remove(&eid);
4633 }
4634 }
4635 }
4636 Ok(())
4637 }
4638
4639 pub(crate) fn process_delta_batch(
4641 &self,
4642 batch: &arrow_array::RecordBatch,
4643 versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4644 ) -> Result<()> {
4645 use arrow_array::UInt64Array;
4646 let eid_col = batch
4647 .column_by_name("eid")
4648 .ok_or(anyhow!("Missing eid"))?
4649 .as_any()
4650 .downcast_ref::<UInt64Array>()
4651 .ok_or(anyhow!("Invalid eid"))?;
4652 let src_col = batch
4653 .column_by_name("src_vid")
4654 .ok_or(anyhow!("Missing src_vid"))?
4655 .as_any()
4656 .downcast_ref::<UInt64Array>()
4657 .ok_or(anyhow!("Invalid src_vid"))?;
4658 let dst_col = batch
4659 .column_by_name("dst_vid")
4660 .ok_or(anyhow!("Missing dst_vid"))?
4661 .as_any()
4662 .downcast_ref::<UInt64Array>()
4663 .ok_or(anyhow!("Invalid dst_vid"))?;
4664 let op_col = batch
4665 .column_by_name("op")
4666 .ok_or(anyhow!("Missing op"))?
4667 .as_any()
4668 .downcast_ref::<arrow_array::UInt8Array>()
4669 .ok_or(anyhow!("Invalid op"))?;
4670 let version_col = batch
4671 .column_by_name("_version")
4672 .ok_or(anyhow!("Missing _version"))?
4673 .as_any()
4674 .downcast_ref::<UInt64Array>()
4675 .ok_or(anyhow!("Invalid _version"))?;
4676
4677 for i in 0..batch.num_rows() {
4678 let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4679 let version = version_col.value(i);
4680 let op = op_col.value(i);
4681 let src = Vid::from(src_col.value(i));
4682 let dst = Vid::from(dst_col.value(i));
4683
4684 match versioned_ops.entry(eid) {
4685 std::collections::hash_map::Entry::Vacant(e) => {
4686 e.insert((version, op, src, dst));
4687 }
4688 std::collections::hash_map::Entry::Occupied(mut e) => {
4689 if version > e.get().0 {
4690 e.insert((version, op, src, dst));
4691 }
4692 }
4693 }
4694 }
4695 Ok(())
4696 }
4697
4698 pub(crate) fn scan_edge_type_l0(
4700 &self,
4701 edge_type: &str,
4702 ctx: &QueryContext,
4703 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4704 ) {
4705 let schema = self.storage.schema_manager().schema();
4706 let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4707
4708 if let Some(type_id) = type_id {
4709 self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4711
4712 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4714 self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4715 }
4716
4717 for pending_l0_arc in &ctx.pending_flush_l0s {
4719 self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4720 }
4721 }
4722 }
4723
4724 pub(crate) fn scan_single_l0(
4726 &self,
4727 l0: &uni_store::runtime::L0Buffer,
4728 type_id: u32,
4729 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4730 ) {
4731 for edge_entry in l0.graph.edges() {
4732 if edge_entry.edge_type == type_id {
4733 edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4734 }
4735 }
4736 let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4738 for eid in eids_to_check {
4739 if l0.is_tombstoned(eid) {
4740 edges.remove(&eid);
4741 }
4742 }
4743 }
4744
4745 pub(crate) fn filter_tombstoned_vertex_edges(
4747 &self,
4748 ctx: &QueryContext,
4749 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4750 ) {
4751 let l0 = ctx.l0.read();
4752 let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4753
4754 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4756 let tx_l0 = tx_l0_arc.read();
4757 all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4758 }
4759
4760 for pending_l0_arc in &ctx.pending_flush_l0s {
4762 let pending_l0 = pending_l0_arc.read();
4763 all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4764 }
4765
4766 edges.retain(|_, (src, dst)| {
4767 !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4768 });
4769 }
4770
4771 pub(crate) async fn execute_project(
4773 &self,
4774 input_rows: Vec<HashMap<String, Value>>,
4775 projections: &[(Expr, Option<String>)],
4776 prop_manager: &PropertyManager,
4777 params: &HashMap<String, Value>,
4778 ctx: Option<&QueryContext>,
4779 ) -> Result<Vec<HashMap<String, Value>>> {
4780 let mut results = Vec::new();
4781 for m in input_rows {
4782 let mut row = HashMap::new();
4783 for (expr, alias) in projections {
4784 let val = self
4785 .evaluate_expr(expr, &m, prop_manager, params, ctx)
4786 .await?;
4787 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4788 row.insert(name, val);
4789 }
4790 results.push(row);
4791 }
4792 Ok(results)
4793 }
4794
4795 pub(crate) async fn execute_unwind(
4797 &self,
4798 input_rows: Vec<HashMap<String, Value>>,
4799 expr: &Expr,
4800 variable: &str,
4801 prop_manager: &PropertyManager,
4802 params: &HashMap<String, Value>,
4803 ctx: Option<&QueryContext>,
4804 ) -> Result<Vec<HashMap<String, Value>>> {
4805 let mut results = Vec::new();
4806 for row in input_rows {
4807 let val = self
4808 .evaluate_expr(expr, &row, prop_manager, params, ctx)
4809 .await?;
4810 if let Value::List(items) = val {
4811 for item in items {
4812 let mut new_row = row.clone();
4813 new_row.insert(variable.to_string(), item);
4814 results.push(new_row);
4815 }
4816 }
4817 }
4818 Ok(results)
4819 }
4820
4821 pub(crate) async fn execute_apply(
4823 &self,
4824 input_rows: Vec<HashMap<String, Value>>,
4825 subquery: &LogicalPlan,
4826 input_filter: Option<&Expr>,
4827 prop_manager: &PropertyManager,
4828 params: &HashMap<String, Value>,
4829 ctx: Option<&QueryContext>,
4830 ) -> Result<Vec<HashMap<String, Value>>> {
4831 let mut filtered_rows = input_rows;
4832
4833 if let Some(filter) = input_filter {
4834 let mut filtered = Vec::new();
4835 for row in filtered_rows {
4836 let res = self
4837 .evaluate_expr(filter, &row, prop_manager, params, ctx)
4838 .await?;
4839 if res.as_bool().unwrap_or(false) {
4840 filtered.push(row);
4841 }
4842 }
4843 filtered_rows = filtered;
4844 }
4845
4846 if filtered_rows.is_empty() {
4849 let sub_rows = self
4850 .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4851 .await?;
4852 return Ok(sub_rows);
4853 }
4854
4855 let mut results = Vec::new();
4856 for row in filtered_rows {
4857 let mut sub_params = params.clone();
4858 sub_params.extend(row.clone());
4859
4860 let sub_rows = self
4861 .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4862 .await?;
4863
4864 for sub_row in sub_rows {
4865 let mut new_row = row.clone();
4866 new_row.extend(sub_row);
4867 results.push(new_row);
4868 }
4869 }
4870 Ok(results)
4871 }
4872
4873 pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4875 let schema = self.storage.schema_manager().schema();
4876 let mut rows = Vec::new();
4877 for idx in &schema.indexes {
4878 let (name, type_str, details) = match idx {
4879 uni_common::core::schema::IndexDefinition::Vector(c) => (
4880 c.name.clone(),
4881 "VECTOR",
4882 format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4883 ),
4884 uni_common::core::schema::IndexDefinition::FullText(c) => (
4885 c.name.clone(),
4886 "FULLTEXT",
4887 format!("on {}:{:?}", c.label, c.properties),
4888 ),
4889 uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4890 cfg.name.clone(),
4891 "SCALAR",
4892 format!(":{}({:?})", cfg.label, cfg.properties),
4893 ),
4894 _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4895 };
4896
4897 if let Some(f) = filter
4898 && f != type_str
4899 {
4900 continue;
4901 }
4902
4903 let mut row = HashMap::new();
4904 row.insert("name".to_string(), Value::String(name));
4905 row.insert("type".to_string(), Value::String(type_str.to_string()));
4906 row.insert("details".to_string(), Value::String(details));
4907 rows.push(row);
4908 }
4909 rows
4910 }
4911
4912 pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4913 let mut row = HashMap::new();
4914 row.insert("name".to_string(), Value::String("uni".to_string()));
4915 vec![row]
4917 }
4918
4919 pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4920 vec![]
4922 }
4923
4924 pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4925 let snapshot = self
4926 .storage
4927 .snapshot_manager()
4928 .load_latest_snapshot()
4929 .await?;
4930 let mut results = Vec::new();
4931
4932 if let Some(snap) = snapshot {
4933 for (label, s) in &snap.vertices {
4934 let mut row = HashMap::new();
4935 row.insert("type".to_string(), Value::String("Label".to_string()));
4936 row.insert("name".to_string(), Value::String(label.clone()));
4937 row.insert("count".to_string(), Value::Int(s.count as i64));
4938 results.push(row);
4939 }
4940 for (edge, s) in &snap.edges {
4941 let mut row = HashMap::new();
4942 row.insert("type".to_string(), Value::String("Edge".to_string()));
4943 row.insert("name".to_string(), Value::String(edge.clone()));
4944 row.insert("count".to_string(), Value::Int(s.count as i64));
4945 results.push(row);
4946 }
4947 }
4948
4949 Ok(results)
4950 }
4951
4952 pub(crate) fn execute_show_constraints(
4953 &self,
4954 clause: ShowConstraints,
4955 ) -> Vec<HashMap<String, Value>> {
4956 let schema = self.storage.schema_manager().schema();
4957 let mut rows = Vec::new();
4958 for c in &schema.constraints {
4959 if let Some(target) = &clause.target {
4960 match (target, &c.target) {
4961 (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4962 (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4963 if e1 == e2 => {}
4964 _ => continue,
4965 }
4966 }
4967
4968 let mut row = HashMap::new();
4969 row.insert("name".to_string(), Value::String(c.name.clone()));
4970 let type_str = match c.constraint_type {
4971 ConstraintType::Unique { .. } => "UNIQUE",
4972 ConstraintType::Exists { .. } => "EXISTS",
4973 ConstraintType::Check { .. } => "CHECK",
4974 _ => "UNKNOWN",
4975 };
4976 row.insert("type".to_string(), Value::String(type_str.to_string()));
4977
4978 let target_str = match &c.target {
4979 ConstraintTarget::Label(l) => format!("(:{})", l),
4980 ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4981 _ => "UNKNOWN".to_string(),
4982 };
4983 row.insert("target".to_string(), Value::String(target_str));
4984
4985 rows.push(row);
4986 }
4987 rows
4988 }
4989
4990 pub(crate) async fn execute_cross_join(
4992 &self,
4993 left: Box<LogicalPlan>,
4994 right: Box<LogicalPlan>,
4995 prop_manager: &PropertyManager,
4996 params: &HashMap<String, Value>,
4997 ctx: Option<&QueryContext>,
4998 ) -> Result<Vec<HashMap<String, Value>>> {
4999 let left_rows = self
5000 .execute_subplan(*left, prop_manager, params, ctx)
5001 .await?;
5002 let right_rows = self
5003 .execute_subplan(*right, prop_manager, params, ctx)
5004 .await?;
5005
5006 let mut results = Vec::new();
5007 for l in &left_rows {
5008 for r in &right_rows {
5009 let mut combined = l.clone();
5010 combined.extend(r.clone());
5011 results.push(combined);
5012 }
5013 }
5014 Ok(results)
5015 }
5016
5017 pub(crate) async fn execute_union(
5019 &self,
5020 left: Box<LogicalPlan>,
5021 right: Box<LogicalPlan>,
5022 all: bool,
5023 prop_manager: &PropertyManager,
5024 params: &HashMap<String, Value>,
5025 ctx: Option<&QueryContext>,
5026 ) -> Result<Vec<HashMap<String, Value>>> {
5027 let mut left_rows = self
5028 .execute_subplan(*left, prop_manager, params, ctx)
5029 .await?;
5030 let mut right_rows = self
5031 .execute_subplan(*right, prop_manager, params, ctx)
5032 .await?;
5033
5034 left_rows.append(&mut right_rows);
5035
5036 if !all {
5037 let mut seen = HashSet::new();
5038 left_rows.retain(|row| {
5039 let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
5040 let key = format!("{sorted_row:?}");
5041 seen.insert(key)
5042 });
5043 }
5044 Ok(left_rows)
5045 }
5046
5047 pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
5049 let schema = self.storage.schema_manager().schema();
5050 schema.indexes.iter().any(|idx| match idx {
5051 uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
5052 uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
5053 uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
5054 _ => false,
5055 })
5056 }
5057
5058 pub(crate) async fn execute_export(
5059 &self,
5060 target: &str,
5061 source: &str,
5062 options: &HashMap<String, Value>,
5063 prop_manager: &PropertyManager,
5064 ctx: Option<&QueryContext>,
5065 ) -> Result<Vec<HashMap<String, Value>>> {
5066 let format = options
5067 .get("format")
5068 .and_then(|v| v.as_str())
5069 .unwrap_or("csv")
5070 .to_lowercase();
5071
5072 match format.as_str() {
5073 "csv" => {
5074 self.execute_csv_export(target, source, options, prop_manager, ctx)
5075 .await
5076 }
5077 "parquet" => {
5078 self.execute_parquet_export(target, source, options, prop_manager, ctx)
5079 .await
5080 }
5081 _ => Err(anyhow!("Unsupported export format: {}", format)),
5082 }
5083 }
5084
5085 pub(crate) async fn execute_csv_export(
5086 &self,
5087 target: &str,
5088 source: &str,
5089 options: &HashMap<String, Value>,
5090 prop_manager: &PropertyManager,
5091 ctx: Option<&QueryContext>,
5092 ) -> Result<Vec<HashMap<String, Value>>> {
5093 let validated_dest = self.validate_path(source)?;
5095
5096 let schema = self.storage.schema_manager().schema();
5097 let label_meta = schema.labels.get(target);
5098 let edge_meta = schema.edge_types.get(target);
5099
5100 if label_meta.is_none() && edge_meta.is_none() {
5101 return Err(anyhow!("Target '{}' not found in schema", target));
5102 }
5103
5104 let delimiter_str = options
5105 .get("delimiter")
5106 .and_then(|v| v.as_str())
5107 .unwrap_or(",");
5108 let delimiter = if delimiter_str.is_empty() {
5109 b','
5110 } else {
5111 delimiter_str.as_bytes()[0]
5112 };
5113 let has_header = options
5114 .get("header")
5115 .and_then(|v| v.as_bool())
5116 .unwrap_or(true);
5117
5118 let mut wtr = csv::WriterBuilder::new()
5119 .delimiter(delimiter)
5120 .from_path(&validated_dest)?;
5121
5122 let mut count = 0;
5123 let empty_props = HashMap::new();
5125
5126 if let Some(meta) = label_meta {
5127 let label_id = meta.id;
5128 let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5129 let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5130 prop_names.sort();
5131
5132 let mut headers = vec!["_vid".to_string()];
5133 headers.extend(prop_names.clone());
5134
5135 if has_header {
5136 wtr.write_record(&headers)?;
5137 }
5138
5139 let vids = self
5140 .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5141 .await?;
5142
5143 for vid in vids {
5144 let props = prop_manager
5145 .get_all_vertex_props_with_ctx(vid, ctx)
5146 .await?
5147 .unwrap_or_default();
5148
5149 let mut row = Vec::with_capacity(headers.len());
5150 row.push(vid.to_string());
5151 for p_name in &prop_names {
5152 let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5153 row.push(self.format_csv_value(val));
5154 }
5155 wtr.write_record(&row)?;
5156 count += 1;
5157 }
5158 } else if let Some(meta) = edge_meta {
5159 let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5160 let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5161 prop_names.sort();
5162
5163 let mut headers = vec![
5165 "_eid".to_string(),
5166 "_src".to_string(),
5167 "_dst".to_string(),
5168 "_type".to_string(),
5169 ];
5170 headers.extend(prop_names.clone());
5171
5172 if has_header {
5173 wtr.write_record(&headers)?;
5174 }
5175
5176 let edges = self.scan_edge_type(target, ctx).await?;
5177
5178 for (eid, src, dst) in edges {
5179 let props = prop_manager
5180 .get_all_edge_props_with_ctx(eid, ctx)
5181 .await?
5182 .unwrap_or_default();
5183
5184 let mut row = Vec::with_capacity(headers.len());
5185 row.push(eid.to_string());
5186 row.push(src.to_string());
5187 row.push(dst.to_string());
5188 row.push(meta.id.to_string());
5189
5190 for p_name in &prop_names {
5191 let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5192 row.push(self.format_csv_value(val));
5193 }
5194 wtr.write_record(&row)?;
5195 count += 1;
5196 }
5197 }
5198
5199 wtr.flush()?;
5200 let mut res = HashMap::new();
5201 res.insert("count".to_string(), Value::Int(count as i64));
5202 Ok(vec![res])
5203 }
5204
5205 pub(crate) async fn execute_parquet_export(
5209 &self,
5210 target: &str,
5211 destination: &str,
5212 _options: &HashMap<String, Value>,
5213 prop_manager: &PropertyManager,
5214 ctx: Option<&QueryContext>,
5215 ) -> Result<Vec<HashMap<String, Value>>> {
5216 let schema_manager = self.storage.schema_manager();
5217 let schema = schema_manager.schema();
5218 let label_meta = schema.labels.get(target);
5219 let edge_meta = schema.edge_types.get(target);
5220
5221 if label_meta.is_none() && edge_meta.is_none() {
5222 return Err(anyhow!("Target '{}' not found in schema", target));
5223 }
5224
5225 let arrow_schema = if label_meta.is_some() {
5226 let dataset = self.storage.vertex_dataset(target)?;
5227 dataset.get_arrow_schema(&schema)?
5228 } else {
5229 let dataset = self.storage.edge_dataset(target, "", "")?;
5231 dataset.get_arrow_schema(&schema)?
5232 };
5233
5234 let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();
5235
5236 if let Some(meta) = label_meta {
5237 let label_id = meta.id;
5238 let vids = self
5239 .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5240 .await?;
5241
5242 for vid in vids {
5243 let mut props = prop_manager
5244 .get_all_vertex_props_with_ctx(vid, ctx)
5245 .await?
5246 .unwrap_or_default();
5247
5248 props.insert(
5249 "_vid".to_string(),
5250 uni_common::Value::Int(vid.as_u64() as i64),
5251 );
5252 if !props.contains_key("_uid") {
5253 props.insert(
5254 "_uid".to_string(),
5255 uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
5256 );
5257 }
5258 props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5259 props.insert("_version".to_string(), uni_common::Value::Int(1));
5260 rows.push(props);
5261 }
5262 } else if edge_meta.is_some() {
5263 let edges = self.scan_edge_type(target, ctx).await?;
5264 for (eid, src, dst) in edges {
5265 let mut props = prop_manager
5266 .get_all_edge_props_with_ctx(eid, ctx)
5267 .await?
5268 .unwrap_or_default();
5269
5270 props.insert(
5271 "eid".to_string(),
5272 uni_common::Value::Int(eid.as_u64() as i64),
5273 );
5274 props.insert(
5275 "src_vid".to_string(),
5276 uni_common::Value::Int(src.as_u64() as i64),
5277 );
5278 props.insert(
5279 "dst_vid".to_string(),
5280 uni_common::Value::Int(dst.as_u64() as i64),
5281 );
5282 props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5283 props.insert("_version".to_string(), uni_common::Value::Int(1));
5284 rows.push(props);
5285 }
5286 }
5287
5288 if is_cloud_url(destination) {
5290 self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
5291 .await?;
5292 } else {
5293 let validated_dest = self.validate_path(destination)?;
5295 let file = std::fs::File::create(&validated_dest)?;
5296 let mut writer =
5297 parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;
5298
5299 if !rows.is_empty() {
5301 let batch = self.rows_to_batch(&rows, &arrow_schema)?;
5302 writer.write(&batch)?;
5303 }
5304
5305 writer.close()?;
5306 }
5307
5308 let mut res = HashMap::new();
5309 res.insert("count".to_string(), Value::Int(rows.len() as i64));
5310 Ok(vec![res])
5311 }
5312
5313 async fn write_parquet_to_cloud(
5315 &self,
5316 dest_url: &str,
5317 rows: &[HashMap<String, uni_common::Value>],
5318 arrow_schema: &arrow_schema::Schema,
5319 ) -> Result<()> {
5320 use object_store::ObjectStoreExt;
5321
5322 let (store, path) = build_store_from_url(dest_url)?;
5323
5324 let mut buffer = Vec::new();
5326 {
5327 let mut writer = parquet::arrow::ArrowWriter::try_new(
5328 &mut buffer,
5329 Arc::new(arrow_schema.clone()),
5330 None,
5331 )?;
5332
5333 if !rows.is_empty() {
5334 let batch = self.rows_to_batch(rows, arrow_schema)?;
5335 writer.write(&batch)?;
5336 }
5337
5338 writer.close()?;
5339 }
5340
5341 store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5343
5344 Ok(())
5345 }
5346
5347 pub(crate) fn rows_to_batch(
5348 &self,
5349 rows: &[HashMap<String, uni_common::Value>],
5350 schema: &arrow_schema::Schema,
5351 ) -> Result<RecordBatch> {
5352 let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5353
5354 for field in schema.fields() {
5355 let name = field.name();
5356 let dt = field.data_type();
5357
5358 let values: Vec<uni_common::Value> = rows
5359 .iter()
5360 .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5361 .collect();
5362 let array = self.values_to_array(&values, dt)?;
5363 columns.push(array);
5364 }
5365
5366 Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5367 }
5368
    /// Converts a slice of values into an Arrow array of data type `dt`.
    ///
    /// This is a thin wrapper that forwards to `arrow_convert::values_to_array`
    /// and returns its result unchanged.
    pub(crate) fn values_to_array(
        &self,
        values: &[uni_common::Value],
        dt: &arrow_schema::DataType,
    ) -> Result<Arc<dyn Array>> {
        arrow_convert::values_to_array(values, dt)
    }
5378
5379 pub(crate) fn format_csv_value(&self, val: Value) -> String {
5380 match val {
5381 Value::Null => "".to_string(),
5382 Value::String(s) => s,
5383 Value::Int(i) => i.to_string(),
5384 Value::Float(f) => f.to_string(),
5385 Value::Bool(b) => b.to_string(),
5386 _ => format!("{val}"),
5387 }
5388 }
5389
5390 pub(crate) fn parse_csv_value(
5391 &self,
5392 s: &str,
5393 data_type: &uni_common::core::schema::DataType,
5394 prop_name: &str,
5395 ) -> Result<Value> {
5396 if s.is_empty() || s.to_lowercase() == "null" {
5397 return Ok(Value::Null);
5398 }
5399
5400 use uni_common::core::schema::DataType;
5401 match data_type {
5402 DataType::String => Ok(Value::String(s.to_string())),
5403 DataType::Int32 | DataType::Int64 => {
5404 let i = s.parse::<i64>().map_err(|_| {
5405 anyhow!(
5406 "Failed to parse integer for property '{}': {}",
5407 prop_name,
5408 s
5409 )
5410 })?;
5411 Ok(Value::Int(i))
5412 }
5413 DataType::Float32 | DataType::Float64 => {
5414 let f = s.parse::<f64>().map_err(|_| {
5415 anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5416 })?;
5417 Ok(Value::Float(f))
5418 }
5419 DataType::Bool => {
5420 let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5421 anyhow!(
5422 "Failed to parse boolean for property '{}': {}",
5423 prop_name,
5424 s
5425 )
5426 })?;
5427 Ok(Value::Bool(b))
5428 }
5429 DataType::CypherValue => {
5430 let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5431 anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5432 })?;
5433 Ok(Value::from(json_val))
5434 }
5435 DataType::Vector { .. } => {
5436 let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5437 anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5438 })?;
5439 Ok(Value::Vector(v))
5440 }
5441 _ => Ok(Value::String(s.to_string())),
5442 }
5443 }
5444
5445 pub(crate) async fn detach_delete_vertex(
5446 &self,
5447 vid: Vid,
5448 writer: &Writer,
5449 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5450 ) -> Result<()> {
5451 let schema = self.storage.schema_manager().schema();
5452 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5453
5454 let out_graph = self
5456 .storage
5457 .load_subgraph_cached(
5458 &[vid],
5459 &edge_type_ids,
5460 1,
5461 uni_store::runtime::Direction::Outgoing,
5462 Some(writer.l0_manager.get_current()),
5463 )
5464 .await?;
5465
5466 for edge in out_graph.edges() {
5467 writer
5468 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5469 .await?;
5470 }
5471
5472 let in_graph = self
5474 .storage
5475 .load_subgraph_cached(
5476 &[vid],
5477 &edge_type_ids,
5478 1,
5479 uni_store::runtime::Direction::Incoming,
5480 Some(writer.l0_manager.get_current()),
5481 )
5482 .await?;
5483
5484 for edge in in_graph.edges() {
5485 writer
5486 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5487 .await?;
5488 }
5489
5490 Ok(())
5491 }
5492
5493 async fn batch_load_incident_edges(
5504 &self,
5505 vids: &[Vid],
5506 writer: &Writer,
5507 ) -> Result<(
5508 uni_store::runtime::working_graph::WorkingGraph,
5509 uni_store::runtime::working_graph::WorkingGraph,
5510 )> {
5511 let schema = self.storage.schema_manager().schema();
5512 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5513 let l0 = Some(writer.l0_manager.get_current());
5514
5515 let out_graph = self
5516 .storage
5517 .load_subgraph_cached(
5518 vids,
5519 &edge_type_ids,
5520 1,
5521 uni_store::runtime::Direction::Outgoing,
5522 l0.clone(),
5523 )
5524 .await?;
5525 let in_graph = self
5526 .storage
5527 .load_subgraph_cached(
5528 vids,
5529 &edge_type_ids,
5530 1,
5531 uni_store::runtime::Direction::Incoming,
5532 l0,
5533 )
5534 .await?;
5535 Ok((out_graph, in_graph))
5536 }
5537
5538 pub(crate) async fn batch_detach_delete_vertices(
5540 &self,
5541 vids: &[Vid],
5542 labels_per_vid: Vec<Option<Vec<String>>>,
5543 writer: &Writer,
5544 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5545 ) -> Result<()> {
5546 let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5547
5548 for edge in out_graph.edges() {
5549 writer
5550 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5551 .await?;
5552 }
5553 for edge in in_graph.edges() {
5554 writer
5555 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5556 .await?;
5557 }
5558
5559 for (vid, labels) in vids.iter().zip(labels_per_vid) {
5560 writer.delete_vertex(*vid, labels, tx_l0).await?;
5561 }
5562
5563 Ok(())
5564 }
5565
5566 pub(crate) async fn batch_check_vertices_have_no_edges(
5584 &self,
5585 vids: &[Vid],
5586 writer: &Writer,
5587 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5588 ) -> Result<()> {
5589 if vids.is_empty() {
5590 return Ok(());
5591 }
5592
5593 let mut tombstoned_eids: std::collections::HashSet<uni_common::core::id::Eid> =
5594 std::collections::HashSet::new();
5595 {
5596 let writer_l0 = writer.l0_manager.get_current();
5597 let guard = writer_l0.read();
5598 for &eid in guard.tombstones.keys() {
5599 tombstoned_eids.insert(eid);
5600 }
5601 }
5602 if let Some(tx) = tx_l0 {
5603 let guard = tx.read();
5604 for &eid in guard.tombstones.keys() {
5605 tombstoned_eids.insert(eid);
5606 }
5607 }
5608
5609 let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5610
5611 for edge in out_graph.edges().chain(in_graph.edges()) {
5612 if !tombstoned_eids.contains(&edge.eid) {
5613 return Err(anyhow!(
5614 "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
5615 edge.src_vid
5616 ));
5617 }
5618 }
5619 Ok(())
5620 }
5621}