1use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7 eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16 match val {
17 Value::Temporal(tv) => {
18 use uni_common::TemporalValue;
19 match tv {
20 TemporalValue::DateTime {
21 nanos_since_epoch, ..
22 }
23 | TemporalValue::LocalDateTime {
24 nanos_since_epoch, ..
25 } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26 TemporalValue::Date { days_since_epoch } => {
27 chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28 }
29 _ => None,
30 }
31 }
32 Value::String(s) => parse_datetime_utc(s).ok(),
33 _ => None,
34 }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46 BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47 ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54
55use crate::query::df_graph::L0Context;
57use crate::query::df_planner::HybridPhysicalPlanner;
58use datafusion::physical_plan::ExecutionPlanProperties;
59use datafusion::prelude::SessionContext;
60use parking_lot::RwLock as SyncRwLock;
61
62use arrow_array::{Array, RecordBatch};
63use csv;
64use parquet;
65
66use super::core::*;
67
/// Entry count at or below which `hydrate_entity_if_needed` treats an edge map
/// (one carrying `_eid`) as not yet hydrated and fetches its properties.
/// Above this size, `map.len() - EDGE_SYSTEM_FIELD_COUNT` is logged as the
/// number of properties already present.
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
/// Entry count at or below which `hydrate_entity_if_needed` treats a vertex map
/// (one carrying `_vid`) as not yet hydrated and fetches its properties.
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
72
73fn collect_l0_vids(
78 ctx: Option<&QueryContext>,
79 extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
80) -> Vec<Vid> {
81 let mut vids = Vec::new();
82 if let Some(ctx) = ctx {
83 vids.extend(extractor(&ctx.l0.read()));
84 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
85 vids.extend(extractor(&tx_l0_arc.read()));
86 }
87 for pending_l0_arc in &ctx.pending_flush_l0s {
88 vids.extend(extractor(&pending_l0_arc.read()));
89 }
90 }
91 vids
92}
93
94async fn hydrate_entity_if_needed(
103 map: &mut HashMap<String, Value>,
104 prop_manager: &PropertyManager,
105 ctx: Option<&QueryContext>,
106) {
107 if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
109 if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
110 tracing::debug!(
111 "Pushdown fallback: hydrating edge {} at execution time",
112 eid_u64
113 );
114 if let Ok(Some(props)) = prop_manager
115 .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
116 .await
117 {
118 for (key, value) in props {
119 map.entry(key).or_insert(value);
120 }
121 }
122 } else {
123 tracing::trace!(
124 "Pushdown success: edge {} already has {} properties",
125 eid_u64,
126 map.len() - EDGE_SYSTEM_FIELD_COUNT
127 );
128 }
129 return;
130 }
131
132 if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
134 if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
135 tracing::debug!(
136 "Pushdown fallback: hydrating vertex {} at execution time",
137 vid_u64
138 );
139 if let Ok(Some(props)) = prop_manager
140 .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
141 .await
142 {
143 for (key, value) in props {
144 map.entry(key).or_insert(value);
145 }
146 }
147 } else {
148 tracing::trace!(
149 "Pushdown success: vertex {} already has {} properties",
150 vid_u64,
151 map.len() - VERTEX_SYSTEM_FIELD_COUNT
152 );
153 }
154 }
155}
156
157fn promote_vid_placeholder(row: &mut HashMap<String, Value>, var: &str) {
163 let prefix = format!("{}.", var);
164 let mut map = HashMap::new();
165
166 let dotted_keys: Vec<String> = row
167 .keys()
168 .filter(|k| k.starts_with(&prefix))
169 .cloned()
170 .collect();
171
172 for key in &dotted_keys {
173 let prop_name = &key[prefix.len()..];
174 if let Some(val) = row.remove(key) {
175 map.insert(prop_name.to_string(), val);
176 }
177 }
178
179 row.insert(var.to_string(), Value::Map(map));
181}
182
183fn merge_dotted_columns(row: &mut HashMap<String, Value>, var: &str) {
193 let vid_key = format!("{}._vid", var);
195 let labels_key = format!("{}._labels", var);
196
197 let vid_val = row.remove(&vid_key);
198 let labels_val = row.remove(&labels_key);
199
200 if let Some(Value::Map(map)) = row.get_mut(var) {
201 if let Some(v) = vid_val {
202 map.insert("_vid".to_string(), v);
203 }
204 if let Some(v) = labels_val {
205 map.insert("_labels".to_string(), v);
206 }
207 }
208
209 let eid_key = format!("{}._eid", var);
214 let type_key = format!("{}._type", var);
215
216 let eid_val = row.remove(&eid_key);
217 let type_val = row.remove(&type_key);
218
219 if (eid_val.is_some() || type_val.is_some())
220 && let Some(Value::Map(map)) = row.get_mut(var)
221 {
222 if let Some(v) = eid_val {
223 map.entry("_eid".to_string()).or_insert(v);
224 }
225 if let Some(v) = type_val {
226 map.entry("_type".to_string()).or_insert(v);
227 }
228 }
229
230 let prefix = format!("{}.", var);
232 let dotted_keys: Vec<String> = row
233 .keys()
234 .filter(|k| k.starts_with(&prefix))
235 .cloned()
236 .collect();
237 for key in dotted_keys {
238 let prop_name = key[prefix.len()..].to_string();
239 let val = row.remove(&key);
240 if prop_name.starts_with('_') || prop_name == "overflow_json" {
241 continue;
242 }
243 if let (Some(val), Some(Value::Map(map))) = (val, row.get_mut(var)) {
244 map.insert(prop_name, val);
245 }
246 }
247}
248
249impl Executor {
250 async fn verify_and_filter_candidates(
255 &self,
256 mut candidates: Vec<Vid>,
257 variable: &str,
258 filter: Option<&Expr>,
259 ctx: Option<&QueryContext>,
260 prop_manager: &PropertyManager,
261 params: &HashMap<String, Value>,
262 ) -> Result<Vec<Vid>> {
263 candidates.sort_unstable();
264 candidates.dedup();
265
266 let mut verified_vids = Vec::new();
267 for vid in candidates {
268 let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
269 continue; };
271
272 if let Some(expr) = filter {
273 let mut props_map: HashMap<String, Value> = props;
274 props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
275
276 let mut row = HashMap::new();
277 row.insert(variable.to_string(), Value::Map(props_map));
278
279 let res = self
280 .evaluate_expr(expr, &row, prop_manager, params, ctx)
281 .await?;
282 if res.as_bool().unwrap_or(false) {
283 verified_vids.push(vid);
284 }
285 } else {
286 verified_vids.push(vid);
287 }
288 }
289
290 Ok(verified_vids)
291 }
292
293 pub(crate) async fn scan_storage_candidates(
294 &self,
295 label_id: u16,
296 variable: &str,
297 filter: Option<&Expr>,
298 ) -> Result<Vec<Vid>> {
299 let schema = self.storage.schema_manager().schema();
300 let label_name = schema
301 .label_name_by_id(label_id)
302 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
303
304 let empty_props = std::collections::HashMap::new();
306 let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
307 let filter_sql = filter.and_then(|expr| {
308 LanceFilterGenerator::generate(std::slice::from_ref(expr), variable, Some(label_props))
309 });
310
311 self.storage
312 .scan_vertex_candidates(label_name, filter_sql.as_deref())
313 .await
314 }
315
316 pub(crate) async fn scan_label_with_filter(
317 &self,
318 label_id: u16,
319 variable: &str,
320 filter: Option<&Expr>,
321 ctx: Option<&QueryContext>,
322 prop_manager: &PropertyManager,
323 params: &HashMap<String, Value>,
324 ) -> Result<Vec<Vid>> {
325 let mut candidates = self
326 .scan_storage_candidates(label_id, variable, filter)
327 .await?;
328
329 let schema = self.storage.schema_manager().schema();
331 if let Some(label_name) = schema.label_name_by_id(label_id) {
332 candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
333 }
334
335 self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
336 .await
337 }
338
339 pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
340 if let Value::Node(node) = val {
342 return Ok(node.vid);
343 }
344 if let Value::Map(map) = val
346 && let Some(vid_val) = map.get("_vid")
347 && let Some(v) = vid_val.as_u64()
348 {
349 return Ok(Vid::from(v));
350 }
351 if let Some(s) = val.as_str()
353 && let Ok(id) = s.parse::<u64>()
354 {
355 return Ok(Vid::new(id));
356 }
357 if let Some(v) = val.as_u64() {
359 return Ok(Vid::from(v));
360 }
361 Err(anyhow!("Invalid Vid format: {:?}", val))
362 }
363
364 fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
370 for val in row.values() {
371 if let Ok(vid) = Self::vid_from_value(val)
372 && vid == target_vid
373 {
374 return val.clone();
375 }
376 }
377 Value::Map(HashMap::from([(
379 "_vid".to_string(),
380 Value::Int(target_vid.as_u64() as i64),
381 )]))
382 }
383
384 pub async fn create_datafusion_planner(
389 &self,
390 prop_manager: &PropertyManager,
391 params: &HashMap<String, Value>,
392 ) -> Result<(
393 Arc<SyncRwLock<SessionContext>>,
394 HybridPhysicalPlanner,
395 Arc<PropertyManager>,
396 )> {
397 let query_ctx = self.get_context().await;
398 let l0_context = match query_ctx {
399 Some(ref ctx) => L0Context::from_query_context(ctx),
400 None => L0Context::empty(),
401 };
402
403 let effective_storage = self.effective_storage();
412
413 let prop_manager_arc = if let Some(shared) = self.prop_manager_arc.as_ref() {
419 shared.clone()
420 } else {
421 Arc::new(PropertyManager::new(
422 self.storage.clone(),
423 self.storage.schema_manager_arc(),
424 prop_manager.cache_size(),
425 ))
426 };
427
428 let has_custom_udfs = self
447 .custom_function_registry
448 .as_ref()
449 .is_some_and(|r| !r.is_empty());
450 let host_plugin_registry = self
451 .procedure_registry
452 .as_ref()
453 .and_then(|pr| pr.plugin_registry());
454 let optimizer_providers = host_plugin_registry
458 .as_ref()
459 .map(|pr| pr.optimizer_rules())
460 .unwrap_or_default();
461 let session_local_registry =
462 crate::query::df_udfs_plugin::current_session_plugin_registry();
463 let needs_dynamic_registration = has_custom_udfs
464 || !optimizer_providers.is_empty()
465 || host_plugin_registry.is_some()
466 || session_local_registry.is_some();
467
468 let session = if let (Some(tmpl), false) = (
469 self.df_session_template.as_ref(),
470 needs_dynamic_registration,
471 ) {
472 (**tmpl).clone()
474 } else {
475 let session = if optimizer_providers.is_empty() {
483 SessionContext::new()
484 } else {
485 use datafusion::execution::session_state::SessionStateBuilder;
486 use uni_plugin::traits::operator::OptimizerPhase;
487 let mut builder = SessionStateBuilder::new().with_default_features();
488 for provider in optimizer_providers.iter() {
489 match provider.phase() {
490 OptimizerPhase::Logical => {
491 builder = builder.with_optimizer_rule(provider.rule());
492 }
493 OptimizerPhase::Physical => {
494 if let Some(rule) = provider.physical_rule() {
495 builder = builder.with_physical_optimizer_rule(rule);
496 } else {
497 tracing::debug!(
498 target: "uni.plugin.registry",
499 "physical-phase provider returned no physical_rule(); skipping"
500 );
501 }
502 }
503 OptimizerPhase::Both => {
504 builder = builder.with_optimizer_rule(provider.rule());
505 if let Some(rule) = provider.physical_rule() {
506 builder = builder.with_physical_optimizer_rule(rule);
507 }
508 }
509 _ => {
510 tracing::debug!(
511 target: "uni.plugin.registry",
512 "skipping optimizer rule for unknown phase"
513 );
514 }
515 }
516 }
517 let state = builder.build();
518 SessionContext::new_with_state(state)
519 };
520 crate::query::df_udfs::register_cypher_udfs(&session)?;
521 if let Some(ref registry) = self.custom_function_registry {
525 crate::query::df_udfs_plugin::register_custom_functions_as_plugin_scalars(
526 &session, registry,
527 )?;
528 }
529 if let Some(ref host_pr) = host_plugin_registry {
540 crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, host_pr)?;
541 }
542 if let Some(ref session_local) = session_local_registry {
548 crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, session_local)?;
549 }
550 session
551 };
552 let session_ctx = Arc::new(SyncRwLock::new(session));
553
554 let mut planner = HybridPhysicalPlanner::with_l0_context(
555 session_ctx.clone(),
556 effective_storage.clone(),
557 l0_context,
558 prop_manager_arc.clone(),
559 effective_storage.schema_manager().schema(),
560 params.clone(),
561 HashMap::new(),
562 );
563
564 planner = planner.with_algo_registry(self.algo_registry.clone());
565 if let Some(ref registry) = self.procedure_registry {
566 planner = planner.with_procedure_registry(registry.clone());
567 }
568 let host_plugin_registry = self
577 .procedure_registry
578 .as_ref()
579 .and_then(|r| r.plugin_registry());
580 if let Some(registry) = host_plugin_registry {
581 planner = planner.with_plugin_registry(registry);
582 }
583 if let Some(ref xervo_runtime) = self.xervo_runtime {
584 planner = planner.with_xervo_runtime(xervo_runtime.clone());
585 }
586 if let Some(writer) = &self.writer {
591 planner = planner.with_writer(Arc::clone(writer));
592 }
593
594 Ok((session_ctx, planner, prop_manager_arc))
595 }
596
597 pub fn collect_batches(
599 session_ctx: &Arc<SyncRwLock<SessionContext>>,
600 execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
601 ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
602 Box::pin(async move {
603 use futures::TryStreamExt;
604
605 let task_ctx = session_ctx.read().task_ctx();
606 let partition_count = execution_plan.output_partitioning().partition_count();
607 let mut all_batches = Vec::new();
608 for partition in 0..partition_count {
609 let stream = execution_plan.execute(partition, task_ctx.clone())?;
610 let batches: Vec<RecordBatch> = stream.try_collect().await?;
611 all_batches.extend(batches);
612 }
613 Ok(all_batches)
614 })
615 }
616
617 pub async fn execute_datafusion(
622 &self,
623 plan: LogicalPlan,
624 prop_manager: &PropertyManager,
625 params: &HashMap<String, Value>,
626 ) -> Result<Vec<RecordBatch>> {
627 let (batches, _plan) = self
628 .execute_datafusion_with_plan(plan, prop_manager, params)
629 .await?;
630 Ok(batches)
631 }
632
633 pub async fn execute_datafusion_with_plan(
640 &self,
641 plan: LogicalPlan,
642 prop_manager: &PropertyManager,
643 params: &HashMap<String, Value>,
644 ) -> Result<(
645 Vec<RecordBatch>,
646 Arc<dyn datafusion::physical_plan::ExecutionPlan>,
647 )> {
648 let (session_ctx, mut planner, prop_manager_arc) =
649 self.create_datafusion_planner(prop_manager, params).await?;
650
651 if Self::contains_write_operations(&plan) {
653 let writer = self
654 .writer
655 .as_ref()
656 .ok_or_else(|| anyhow!("Write operations require a Writer"))?
657 .clone();
658 let query_ctx = self.get_context().await;
659
660 debug_assert!(
661 query_ctx.is_some(),
662 "BUG: query_ctx is None for write operation"
663 );
664
665 let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
666 executor: self.clone(),
667 writer,
668 prop_manager: prop_manager_arc,
669 params: params.clone(),
670 query_ctx,
671 tx_l0_override: self.transaction_l0_override.clone(),
672 });
673 planner = planner.with_mutation_context(mutation_ctx);
674 tracing::debug!(
675 plan_type = Self::get_plan_type(&plan),
676 "Mutation routed to DataFusion engine"
677 );
678 }
679
680 let execution_plan = planner.plan(&plan)?;
681 let plan_clone = Arc::clone(&execution_plan);
682 let result = Self::collect_batches(&session_ctx, execution_plan).await;
683
684 let graph_warnings = planner.graph_ctx().take_warnings();
686 if !graph_warnings.is_empty()
687 && let Ok(mut w) = self.warnings.lock()
688 {
689 w.extend(graph_warnings);
690 }
691
692 result.map(|batches| (batches, plan_clone))
693 }
694
695 pub(crate) async fn execute_merge_read_plan(
701 &self,
702 plan: LogicalPlan,
703 prop_manager: &PropertyManager,
704 params: &HashMap<String, Value>,
705 merge_variables: Vec<String>,
706 ) -> Result<Vec<HashMap<String, Value>>> {
707 let (session_ctx, planner, _prop_manager_arc) =
708 self.create_datafusion_planner(prop_manager, params).await?;
709
710 let extra: HashMap<String, HashSet<String>> = merge_variables
713 .iter()
714 .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
715 .collect();
716 let execution_plan = planner.plan_with_properties(&plan, extra)?;
717 let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
718
719 let flat_rows = self.record_batches_to_rows(all_batches)?;
721
722 let rows = flat_rows
725 .into_iter()
726 .map(|mut row| {
727 for var in &merge_variables {
728 if row.contains_key(var) {
730 continue;
731 }
732 let prefix = format!("{}.", var);
733 let dotted_keys: Vec<String> = row
734 .keys()
735 .filter(|k| k.starts_with(&prefix))
736 .cloned()
737 .collect();
738 if !dotted_keys.is_empty() {
739 let mut map = HashMap::new();
740 for key in dotted_keys {
741 let prop_name = key[prefix.len()..].to_string();
742 if let Some(val) = row.remove(&key) {
743 map.insert(prop_name, val);
744 }
745 }
746 row.insert(var.clone(), Value::Map(map));
747 }
748 }
749 row
750 })
751 .collect();
752
753 Ok(rows)
754 }
755
    /// Converts Arrow record batches into one `HashMap` per row, keyed by column name.
    ///
    /// For each cell:
    /// * The value is decoded with `arrow_convert::arrow_to_value`. The type hint
    ///   is `Bytes` for columns whose metadata sets `uni_raw_bytes = "true"`,
    ///   `DateTime` or `Time` for the matching struct layouts, and none otherwise.
    /// * String cells in columns whose metadata sets `cv_encoded = "true"` are
    ///   parsed as JSON. A parse failure leaves the string unchanged.
    /// * Path-shaped maps are normalised via `normalize_path_if_needed`.
    ///
    /// Each row is then post-processed:
    /// * A bare (non-dotted) `String` column that has a companion `var._vid`
    ///   column is replaced by a map built from all `var.*` columns
    ///   (`promote_vid_placeholder`).
    /// * Bare columns that were already `Map`s absorb their `var.*` columns
    ///   (`merge_dotted_columns`).
    ///
    /// The visible body never returns `Err`.
    pub(crate) fn record_batches_to_rows(
        &self,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut rows = Vec::new();

        for batch in batches {
            let num_rows = batch.num_rows();
            let schema = batch.schema();

            for row_idx in 0..num_rows {
                let mut row = HashMap::new();

                for (col_idx, field) in schema.fields().iter().enumerate() {
                    let column = batch.column(col_idx);
                    // Choose a decoding hint from field metadata or struct layout.
                    let data_type = if field
                        .metadata()
                        .get("uni_raw_bytes")
                        .is_some_and(|v| v == "true")
                    {
                        Some(&uni_common::DataType::Bytes)
                    } else if uni_common::core::schema::is_datetime_struct(field.data_type()) {
                        Some(&uni_common::DataType::DateTime)
                    } else if uni_common::core::schema::is_time_struct(field.data_type()) {
                        Some(&uni_common::DataType::Time)
                    } else {
                        None
                    };
                    let mut value =
                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);

                    // `cv_encoded` columns carry JSON text; decode it when it parses.
                    if field
                        .metadata()
                        .get("cv_encoded")
                        .is_some_and(|v| v == "true")
                        && let Value::String(s) = &value
                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
                    {
                        value = Value::from(parsed);
                    }

                    value = Self::normalize_path_if_needed(value);

                    row.insert(field.name().clone(), value);
                }

                // Snapshot taken BEFORE placeholder promotion. Promoted variables
                // are therefore not merged again: promotion already consumed
                // their dotted columns.
                let bare_vars: Vec<String> = row
                    .keys()
                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
                    .cloned()
                    .collect();

                // A bare String column paired with `var._vid` is a placeholder to expand.
                let vid_placeholder_vars: Vec<String> = row
                    .keys()
                    .filter(|k| {
                        !k.contains('.')
                            && matches!(row.get(*k), Some(Value::String(_)))
                            && row.contains_key(&format!("{}._vid", k))
                    })
                    .cloned()
                    .collect();

                for var in &vid_placeholder_vars {
                    promote_vid_placeholder(&mut row, var);
                }

                for var in &bare_vars {
                    merge_dotted_columns(&mut row, var);
                }

                rows.push(row);
            }
        }

        Ok(rows)
    }
858
859 fn normalize_path_if_needed(value: Value) -> Value {
864 match value {
865 Value::Map(map)
866 if map.contains_key("nodes")
867 && (map.contains_key("relationships") || map.contains_key("edges")) =>
868 {
869 Self::normalize_path_map(map)
870 }
871 other => other,
872 }
873 }
874
875 fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
877 if let Some(Value::List(nodes)) = map.remove("nodes") {
879 let normalized_nodes: Vec<Value> = nodes
880 .into_iter()
881 .map(|n| {
882 if let Value::Map(node_map) = n {
883 Self::normalize_path_node_map(node_map)
884 } else {
885 n
886 }
887 })
888 .collect();
889 map.insert("nodes".to_string(), Value::List(normalized_nodes));
890 }
891
892 let rels_key = if map.contains_key("relationships") {
894 "relationships"
895 } else {
896 "edges"
897 };
898 if let Some(Value::List(rels)) = map.remove(rels_key) {
899 let normalized_rels: Vec<Value> = rels
900 .into_iter()
901 .map(|r| {
902 if let Value::Map(rel_map) = r {
903 Self::normalize_path_edge_map(rel_map)
904 } else {
905 r
906 }
907 })
908 .collect();
909 map.insert("relationships".to_string(), Value::List(normalized_rels));
910 }
911
912 Value::Map(map)
913 }
914
915 fn value_to_id_string(val: Value) -> String {
917 match val {
918 Value::Int(n) => n.to_string(),
919 Value::Float(n) => n.to_string(),
920 Value::String(s) => s,
921 other => other.to_string(),
922 }
923 }
924
925 fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
928 if let Some(val) = map.remove(src_key) {
929 map.insert(
930 dst_key.to_string(),
931 Value::String(Self::value_to_id_string(val)),
932 );
933 }
934 }
935
936 fn ensure_properties_map(map: &mut HashMap<String, Value>) {
938 match map.get("properties") {
939 Some(props) if !props.is_null() => {}
940 _ => {
941 map.insert("properties".to_string(), Value::Map(HashMap::new()));
942 }
943 }
944 }
945
946 fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
948 Self::stringify_map_field(&mut map, "_vid", "_id");
949 Self::ensure_properties_map(&mut map);
950 Value::Map(map)
951 }
952
953 fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
955 Self::stringify_map_field(&mut map, "_eid", "_id");
956 Self::stringify_map_field(&mut map, "_src", "_src");
957 Self::stringify_map_field(&mut map, "_dst", "_dst");
958
959 if let Some(type_name) = map.remove("_type_name") {
960 map.insert("_type".to_string(), type_name);
961 }
962
963 Self::ensure_properties_map(&mut map);
964 Value::Map(map)
965 }
966
967 #[instrument(
968 skip(self, prop_manager, params),
969 fields(rows_returned, duration_ms),
970 level = "info"
971 )]
972 pub fn execute<'a>(
973 &'a self,
974 plan: LogicalPlan,
975 prop_manager: &'a PropertyManager,
976 params: &'a HashMap<String, Value>,
977 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
978 Box::pin(async move {
979 let query_type = Self::get_plan_type(&plan);
980 let ctx = self.get_context().await;
981 let start = Instant::now();
982
983 let res = if Self::is_ddl_or_admin(&plan) {
986 self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
987 .await
988 } else {
989 let batches = self.execute_datafusion(plan, prop_manager, params).await?;
990 self.record_batches_to_rows(batches)
991 };
992
993 let duration = start.elapsed();
994 metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
995 .record(duration.as_secs_f64());
996
997 tracing::Span::current().record("duration_ms", duration.as_millis());
998 match &res {
999 Ok(rows) => {
1000 tracing::Span::current().record("rows_returned", rows.len());
1001 metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
1002 .increment(rows.len() as u64);
1003 }
1004 Err(e) => {
1005 let error_type = if e.to_string().contains("timed out") {
1006 "timeout"
1007 } else if e.to_string().contains("syntax") {
1008 "syntax"
1009 } else {
1010 "execution"
1011 };
1012 metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
1013 }
1014 }
1015
1016 res
1017 })
1018 }
1019
1020 fn get_plan_type(plan: &LogicalPlan) -> &'static str {
1021 match plan {
1022 LogicalPlan::Scan { .. } => "read_scan",
1023 LogicalPlan::FusedIndexScan { .. } => "read_fused_index_scan",
1024 LogicalPlan::FusedIndexScanWrapped { .. } => "read_fused_index_scan_wrapped",
1025 LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
1026 LogicalPlan::Traverse { .. } => "read_traverse",
1027 LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
1028 LogicalPlan::ScanAll { .. } => "read_scan_all",
1029 LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
1030 LogicalPlan::VectorKnn { .. } => "read_vector",
1031 LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
1032 LogicalPlan::Merge { .. } => "write_merge",
1033 LogicalPlan::Delete { .. } => "write_delete",
1034 LogicalPlan::Set { .. } => "write_set",
1035 LogicalPlan::Remove { .. } => "write_remove",
1036 LogicalPlan::ProcedureCall { .. } => "call",
1037 LogicalPlan::Copy { .. } => "copy",
1038 LogicalPlan::Backup { .. } => "backup",
1039 _ => "other",
1040 }
1041 }
1042
1043 fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
1053 match plan {
1054 LogicalPlan::Project { input, .. }
1056 | LogicalPlan::Sort { input, .. }
1057 | LogicalPlan::Limit { input, .. }
1058 | LogicalPlan::Distinct { input }
1059 | LogicalPlan::Aggregate { input, .. }
1060 | LogicalPlan::Window { input, .. }
1061 | LogicalPlan::Unwind { input, .. }
1062 | LogicalPlan::Filter { input, .. }
1063 | LogicalPlan::Create { input, .. }
1064 | LogicalPlan::CreateBatch { input, .. }
1065 | LogicalPlan::Set { input, .. }
1066 | LogicalPlan::Remove { input, .. }
1067 | LogicalPlan::Delete { input, .. }
1068 | LogicalPlan::Merge { input, .. }
1069 | LogicalPlan::Foreach { input, .. }
1070 | LogicalPlan::Traverse { input, .. }
1071 | LogicalPlan::TraverseMainByType { input, .. }
1072 | LogicalPlan::BindZeroLengthPath { input, .. }
1073 | LogicalPlan::BindPath { input, .. }
1074 | LogicalPlan::ShortestPath { input, .. }
1075 | LogicalPlan::AllShortestPaths { input, .. }
1076 | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],
1077
1078 LogicalPlan::Apply {
1080 input, subquery, ..
1081 }
1082 | LogicalPlan::SubqueryCall { input, subquery } => {
1083 vec![input.as_ref(), subquery.as_ref()]
1084 }
1085 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
1086 vec![left.as_ref(), right.as_ref()]
1087 }
1088 LogicalPlan::RecursiveCTE {
1089 initial, recursive, ..
1090 } => vec![initial.as_ref(), recursive.as_ref()],
1091 LogicalPlan::QuantifiedPattern {
1092 input,
1093 pattern_plan,
1094 ..
1095 } => vec![input.as_ref(), pattern_plan.as_ref()],
1096
1097 _ => vec![],
1099 }
1100 }
1101
1102 pub(crate) fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
1109 match plan {
1110 LogicalPlan::CreateLabel(_)
1112 | LogicalPlan::CreateEdgeType(_)
1113 | LogicalPlan::AlterLabel(_)
1114 | LogicalPlan::AlterEdgeType(_)
1115 | LogicalPlan::DropLabel(_)
1116 | LogicalPlan::DropEdgeType(_)
1117 | LogicalPlan::CreateConstraint(_)
1118 | LogicalPlan::DropConstraint(_)
1119 | LogicalPlan::ShowConstraints(_) => true,
1120
1121 LogicalPlan::CreateVectorIndex { .. }
1123 | LogicalPlan::CreateFullTextIndex { .. }
1124 | LogicalPlan::CreateScalarIndex { .. }
1125 | LogicalPlan::CreateJsonFtsIndex { .. }
1126 | LogicalPlan::DropIndex { .. }
1127 | LogicalPlan::ShowIndexes { .. } => true,
1128
1129 LogicalPlan::ShowDatabase
1131 | LogicalPlan::ShowConfig
1132 | LogicalPlan::ShowStatistics
1133 | LogicalPlan::Vacuum
1134 | LogicalPlan::Checkpoint
1135 | LogicalPlan::Copy { .. }
1136 | LogicalPlan::CopyTo { .. }
1137 | LogicalPlan::CopyFrom { .. }
1138 | LogicalPlan::Backup { .. }
1139 | LogicalPlan::Explain { .. } => true,
1140
1141 LogicalPlan::ProcedureCall { procedure_name, .. } => {
1144 !Self::is_df_eligible_procedure(procedure_name)
1145 }
1146
1147 _ => Self::plan_children(plan)
1149 .iter()
1150 .any(|child| Self::is_ddl_or_admin(child)),
1151 }
1152 }
1153
1154 fn is_df_eligible_procedure(name: &str) -> bool {
1160 matches!(
1161 name,
1162 "uni.schema.labels"
1163 | "uni.schema.edgeTypes"
1164 | "uni.schema.relationshipTypes"
1165 | "uni.schema.indexes"
1166 | "uni.schema.constraints"
1167 | "uni.schema.labelInfo"
1168 | "uni.vector.query"
1169 | "uni.fts.query"
1170 | "uni.search"
1171 | "uni.create.vNode"
1178 | "uni.create.vEdge"
1179 ) || name.starts_with("uni.algo.")
1180 }
1181
1182 fn contains_write_operations(plan: &LogicalPlan) -> bool {
1189 match plan {
1190 LogicalPlan::Create { .. }
1191 | LogicalPlan::CreateBatch { .. }
1192 | LogicalPlan::Merge { .. }
1193 | LogicalPlan::Delete { .. }
1194 | LogicalPlan::Set { .. }
1195 | LogicalPlan::Remove { .. }
1196 | LogicalPlan::Foreach { .. } => true,
1197 _ => Self::plan_children(plan)
1198 .iter()
1199 .any(|child| Self::contains_write_operations(child)),
1200 }
1201 }
1202
1203 pub fn execute_stream(
1208 self,
1209 plan: LogicalPlan,
1210 prop_manager: Arc<PropertyManager>,
1211 params: HashMap<String, Value>,
1212 ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1213 let this = self;
1214 let this_for_ctx = this.clone();
1215
1216 let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1217
1218 ctx_stream
1219 .flat_map(move |ctx| {
1220 let plan = plan.clone();
1221 let this = this.clone();
1222 let prop_manager = prop_manager.clone();
1223 let params = params.clone();
1224
1225 let fut = async move {
1226 if Self::is_ddl_or_admin(&plan) {
1227 this.execute_subplan(plan, &prop_manager, ¶ms, ctx.as_ref())
1228 .await
1229 } else {
1230 let batches = this
1231 .execute_datafusion(plan, &prop_manager, ¶ms)
1232 .await?;
1233 this.record_batches_to_rows(batches)
1234 }
1235 };
1236 stream::once(fut).boxed()
1237 })
1238 .boxed()
1239 }
1240
1241 pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
1244 arrow_convert::arrow_to_value(col, row, None)
1245 }
1246
1247 pub(crate) fn evaluate_expr<'a>(
1248 &'a self,
1249 expr: &'a Expr,
1250 row: &'a HashMap<String, Value>,
1251 prop_manager: &'a PropertyManager,
1252 params: &'a HashMap<String, Value>,
1253 ctx: Option<&'a QueryContext>,
1254 ) -> BoxFuture<'a, Result<Value>> {
1255 let this = self;
1256 Box::pin(async move {
1257 let repr = expr.to_string_repr();
1259 if let Some(val) = row.get(&repr) {
1260 return Ok(val.clone());
1261 }
1262
1263 match expr {
1264 Expr::PatternComprehension { .. } => {
1265 Err(anyhow::anyhow!(
1267 "Pattern comprehensions are handled by DataFusion executor"
1268 ))
1269 }
1270 Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1271 "COLLECT subqueries not yet supported in executor"
1272 )),
1273 Expr::Variable(name) => {
1274 if let Some(val) = row.get(name) {
1275 Ok(val.clone())
1276 } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1277 Ok(vid_val.clone())
1281 } else {
1282 Ok(params.get(name).cloned().unwrap_or(Value::Null))
1283 }
1284 }
1285 Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1286 Expr::Property(var_expr, prop_name) => {
1287 if let Expr::Variable(var_name) = var_expr.as_ref() {
1291 let flat_key = format!("{}.{}", var_name, prop_name);
1292 if let Some(val) = row.get(flat_key.as_str()) {
1293 return Ok(val.clone());
1294 }
1295 }
1296
1297 let base_val = this
1298 .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1299 .await?;
1300
1301 if (prop_name == "_vid" || prop_name == "_id")
1303 && let Ok(vid) = Self::vid_from_value(&base_val)
1304 {
1305 return Ok(Value::Int(vid.as_u64() as i64));
1306 }
1307
1308 if let Value::Node(node) = &base_val {
1310 if prop_name == "_vid" || prop_name == "_id" {
1312 return Ok(Value::Int(node.vid.as_u64() as i64));
1313 }
1314 if prop_name == "_labels" {
1315 return Ok(Value::List(
1316 node.labels
1317 .iter()
1318 .map(|l| Value::String(l.clone()))
1319 .collect(),
1320 ));
1321 }
1322 if let Some(val) = node.properties.get(prop_name.as_str()) {
1324 return Ok(val.clone());
1325 }
1326 if let Ok(val) = prop_manager
1328 .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1329 .await
1330 {
1331 return Ok(val);
1332 }
1333 return Ok(Value::Null);
1334 }
1335
1336 if let Value::Edge(edge) = &base_val {
1338 if prop_name == "_eid" || prop_name == "_id" {
1340 return Ok(Value::Int(edge.eid.as_u64() as i64));
1341 }
1342 if prop_name == "_type" {
1343 return Ok(Value::String(edge.edge_type.clone()));
1344 }
1345 if prop_name == "_src" {
1346 return Ok(Value::Int(edge.src.as_u64() as i64));
1347 }
1348 if prop_name == "_dst" {
1349 return Ok(Value::Int(edge.dst.as_u64() as i64));
1350 }
1351 if let Some(val) = edge.properties.get(prop_name.as_str()) {
1353 return Ok(val.clone());
1354 }
1355 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1357 {
1358 return Ok(val);
1359 }
1360 return Ok(Value::Null);
1361 }
1362
1363 if let Value::Map(map) = &base_val {
1366 if let Some(val) = map.get(prop_name.as_str()) {
1368 return Ok(val.clone());
1369 }
1370 if let Some(Value::Map(props)) = map.get("properties")
1372 && let Some(val) = props.get(prop_name.as_str())
1373 {
1374 return Ok(val.clone());
1375 }
1376 let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1378 map.get("_id")
1379 .and_then(|v| v.as_str())
1380 .and_then(|s| s.parse::<u64>().ok())
1381 });
1382 if let Some(id) = vid_opt {
1383 let vid = Vid::from(id);
1384 if let Ok(val) = prop_manager
1385 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1386 .await
1387 {
1388 return Ok(val);
1389 }
1390 } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1391 let eid = uni_common::core::id::Eid::from(id);
1392 if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1393 return Ok(val);
1394 }
1395 }
1396 return Ok(Value::Null);
1397 }
1398
1399 if let Ok(vid) = Self::vid_from_value(&base_val) {
1401 return prop_manager
1402 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1403 .await;
1404 }
1405
1406 if base_val.is_null() {
1407 return Ok(Value::Null);
1408 }
1409
1410 {
1412 use crate::query::datetime::{
1413 eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1414 is_duration_string, is_temporal_accessor, is_temporal_string,
1415 };
1416
1417 if let Value::Temporal(tv) = &base_val {
1419 if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1420 if is_duration_accessor(prop_name) {
1421 return eval_duration_accessor(
1423 &base_val.to_string(),
1424 prop_name,
1425 );
1426 }
1427 } else if is_temporal_accessor(prop_name) {
1428 return eval_temporal_accessor(&base_val.to_string(), prop_name);
1429 }
1430 }
1431
1432 if let Value::String(s) = &base_val {
1434 if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1435 return eval_temporal_accessor(s, prop_name);
1436 }
1437 if is_duration_string(s) && is_duration_accessor(prop_name) {
1438 return eval_duration_accessor(s, prop_name);
1439 }
1440 }
1441 }
1442
1443 Err(anyhow!(
1444 "Cannot access property '{}' on {:?}",
1445 prop_name,
1446 base_val
1447 ))
1448 }
1449 Expr::ArrayIndex {
1450 array: arr_expr,
1451 index: idx_expr,
1452 } => {
1453 let arr_val = this
1454 .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1455 .await?;
1456 let idx_val = this
1457 .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1458 .await?;
1459
1460 if let Value::List(arr) = &arr_val {
1461 if let Some(i) = idx_val.as_i64() {
1463 let idx = if i < 0 {
1464 let positive_idx = arr.len() as i64 + i;
1466 if positive_idx < 0 {
1467 return Ok(Value::Null); }
1469 positive_idx as usize
1470 } else {
1471 i as usize
1472 };
1473 if idx < arr.len() {
1474 return Ok(arr[idx].clone());
1475 }
1476 return Ok(Value::Null);
1477 } else if idx_val.is_null() {
1478 return Ok(Value::Null);
1479 } else {
1480 return Err(anyhow::anyhow!(
1481 "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1482 idx_val
1483 ));
1484 }
1485 }
1486 if let Value::Map(map) = &arr_val {
1487 if let Some(key) = idx_val.as_str() {
1488 return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1489 } else if !idx_val.is_null() {
1490 return Err(anyhow::anyhow!(
1491 "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1492 idx_val
1493 ));
1494 }
1495 }
1496 if let Value::Node(node) = &arr_val {
1498 if let Some(key) = idx_val.as_str() {
1499 if let Some(val) = node.properties.get(key) {
1501 return Ok(val.clone());
1502 }
1503 if let Ok(val) = prop_manager
1505 .get_vertex_prop_with_ctx(node.vid, key, ctx)
1506 .await
1507 {
1508 return Ok(val);
1509 }
1510 return Ok(Value::Null);
1511 } else if !idx_val.is_null() {
1512 return Err(anyhow::anyhow!(
1513 "TypeError: Node index must be a string, got: {:?}",
1514 idx_val
1515 ));
1516 }
1517 }
1518 if let Value::Edge(edge) = &arr_val {
1520 if let Some(key) = idx_val.as_str() {
1521 if let Some(val) = edge.properties.get(key) {
1523 return Ok(val.clone());
1524 }
1525 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1527 return Ok(val);
1528 }
1529 return Ok(Value::Null);
1530 } else if !idx_val.is_null() {
1531 return Err(anyhow::anyhow!(
1532 "TypeError: Edge index must be a string, got: {:?}",
1533 idx_val
1534 ));
1535 }
1536 }
1537 if let Ok(vid) = Self::vid_from_value(&arr_val)
1539 && let Some(key) = idx_val.as_str()
1540 {
1541 if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1542 {
1543 return Ok(val);
1544 }
1545 return Ok(Value::Null);
1546 }
1547 if arr_val.is_null() {
1548 return Ok(Value::Null);
1549 }
1550 Err(anyhow!(
1551 "TypeError: InvalidArgumentType - cannot index into {:?}",
1552 arr_val
1553 ))
1554 }
1555 Expr::ArraySlice { array, start, end } => {
1556 let arr_val = this
1557 .evaluate_expr(array, row, prop_manager, params, ctx)
1558 .await?;
1559
1560 if let Value::List(arr) = &arr_val {
1561 let len = arr.len();
1562
1563 let start_idx = if let Some(s) = start {
1565 let v = this
1566 .evaluate_expr(s, row, prop_manager, params, ctx)
1567 .await?;
1568 if v.is_null() {
1569 return Ok(Value::Null);
1570 }
1571 let raw = v.as_i64().unwrap_or(0);
1572 if raw < 0 {
1573 (len as i64 + raw).max(0) as usize
1574 } else {
1575 (raw as usize).min(len)
1576 }
1577 } else {
1578 0
1579 };
1580
1581 let end_idx = if let Some(e) = end {
1583 let v = this
1584 .evaluate_expr(e, row, prop_manager, params, ctx)
1585 .await?;
1586 if v.is_null() {
1587 return Ok(Value::Null);
1588 }
1589 let raw = v.as_i64().unwrap_or(len as i64);
1590 if raw < 0 {
1591 (len as i64 + raw).max(0) as usize
1592 } else {
1593 (raw as usize).min(len)
1594 }
1595 } else {
1596 len
1597 };
1598
1599 if start_idx >= end_idx {
1601 return Ok(Value::List(vec![]));
1602 }
1603 let end_idx = end_idx.min(len);
1604 return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1605 }
1606
1607 if arr_val.is_null() {
1608 return Ok(Value::Null);
1609 }
1610 Err(anyhow!("Cannot slice {:?}", arr_val))
1611 }
1612 Expr::Literal(lit) => Ok(lit.to_value()),
1613 Expr::List(items) => {
1614 let mut vals = Vec::new();
1615 for item in items {
1616 vals.push(
1617 this.evaluate_expr(item, row, prop_manager, params, ctx)
1618 .await?,
1619 );
1620 }
1621 Ok(Value::List(vals))
1622 }
1623 Expr::Map(items) => {
1624 let mut map = HashMap::new();
1625 for (key, value_expr) in items {
1626 let val = this
1627 .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1628 .await?;
1629 map.insert(key.clone(), val);
1630 }
1631 Ok(Value::Map(map))
1632 }
1633 Expr::Exists { query, .. } => {
1634 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1636 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1637
1638 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1639 Ok(plan) => {
1640 let mut sub_params = params.clone();
1641 sub_params.extend(row.clone());
1642
1643 match this.execute(plan, prop_manager, &sub_params).await {
1644 Ok(results) => Ok(Value::Bool(!results.is_empty())),
1645 Err(e) => {
1646 log::debug!("EXISTS subquery execution failed: {}", e);
1647 Ok(Value::Bool(false))
1648 }
1649 }
1650 }
1651 Err(e) => {
1652 log::debug!("EXISTS subquery planning failed: {}", e);
1653 Ok(Value::Bool(false))
1654 }
1655 }
1656 }
1657 Expr::CountSubquery(query) => {
1658 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1660
1661 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1662
1663 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1664 Ok(plan) => {
1665 let mut sub_params = params.clone();
1666 sub_params.extend(row.clone());
1667
1668 match this.execute(plan, prop_manager, &sub_params).await {
1669 Ok(results) => Ok(Value::from(results.len() as i64)),
1670 Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1671 }
1672 }
1673 Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1674 }
1675 }
1676 Expr::Quantifier {
1677 quantifier,
1678 variable,
1679 list,
1680 predicate,
1681 } => {
1682 let list_val = this
1696 .evaluate_expr(list, row, prop_manager, params, ctx)
1697 .await?;
1698
1699 if list_val.is_null() {
1701 return Ok(Value::Null);
1702 }
1703
1704 let items = match list_val {
1706 Value::List(arr) => arr,
1707 _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1708 };
1709
1710 let mut satisfied_count = 0;
1712 for item in &items {
1713 let mut item_row = row.clone();
1715 item_row.insert(variable.clone(), item.clone());
1716
1717 let pred_result = this
1719 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1720 .await?;
1721
1722 if let Value::Bool(true) = pred_result {
1724 satisfied_count += 1;
1725 }
1726 }
1727
1728 let result = match quantifier {
1730 Quantifier::All => satisfied_count == items.len(),
1731 Quantifier::Any => satisfied_count > 0,
1732 Quantifier::Single => satisfied_count == 1,
1733 Quantifier::None => satisfied_count == 0,
1734 };
1735
1736 Ok(Value::Bool(result))
1737 }
1738 Expr::ListComprehension {
1739 variable,
1740 list,
1741 where_clause,
1742 map_expr,
1743 } => {
1744 let list_val = this
1751 .evaluate_expr(list, row, prop_manager, params, ctx)
1752 .await?;
1753
1754 if list_val.is_null() {
1756 return Ok(Value::Null);
1757 }
1758
1759 let items = match list_val {
1761 Value::List(arr) => arr,
1762 _ => {
1763 return Err(anyhow!(
1764 "List comprehension expects a list, got: {:?}",
1765 list_val
1766 ));
1767 }
1768 };
1769
1770 let mut results = Vec::new();
1772 for item in &items {
1773 let mut item_row = row.clone();
1775 item_row.insert(variable.clone(), item.clone());
1776
1777 if let Some(predicate) = where_clause {
1779 let pred_result = this
1780 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1781 .await?;
1782
1783 if !matches!(pred_result, Value::Bool(true)) {
1785 continue;
1786 }
1787 }
1788
1789 let mapped_val = this
1791 .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1792 .await?;
1793 results.push(mapped_val);
1794 }
1795
1796 Ok(Value::List(results))
1797 }
1798 Expr::BinaryOp { left, op, right } => {
1799 match op {
1801 BinaryOp::And => {
1802 let l_val = this
1803 .evaluate_expr(left, row, prop_manager, params, ctx)
1804 .await?;
1805 if let Some(false) = l_val.as_bool() {
1807 return Ok(Value::Bool(false));
1808 }
1809 let r_val = this
1810 .evaluate_expr(right, row, prop_manager, params, ctx)
1811 .await?;
1812 eval_binary_op(&l_val, op, &r_val)
1813 }
1814 BinaryOp::Or => {
1815 let l_val = this
1816 .evaluate_expr(left, row, prop_manager, params, ctx)
1817 .await?;
1818 if let Some(true) = l_val.as_bool() {
1820 return Ok(Value::Bool(true));
1821 }
1822 let r_val = this
1823 .evaluate_expr(right, row, prop_manager, params, ctx)
1824 .await?;
1825 eval_binary_op(&l_val, op, &r_val)
1826 }
1827 _ => {
1828 let l_val = this
1830 .evaluate_expr(left, row, prop_manager, params, ctx)
1831 .await?;
1832 let r_val = this
1833 .evaluate_expr(right, row, prop_manager, params, ctx)
1834 .await?;
1835 eval_binary_op(&l_val, op, &r_val)
1836 }
1837 }
1838 }
1839 Expr::In { expr, list } => {
1840 let l_val = this
1841 .evaluate_expr(expr, row, prop_manager, params, ctx)
1842 .await?;
1843 let r_val = this
1844 .evaluate_expr(list, row, prop_manager, params, ctx)
1845 .await?;
1846 eval_in_op(&l_val, &r_val)
1847 }
1848 Expr::UnaryOp { op, expr } => {
1849 let val = this
1850 .evaluate_expr(expr, row, prop_manager, params, ctx)
1851 .await?;
1852 match op {
1853 UnaryOp::Not => {
1854 match val.as_bool() {
1856 Some(b) => Ok(Value::Bool(!b)),
1857 None if val.is_null() => Ok(Value::Null),
1858 None => Err(anyhow!(
1859 "InvalidArgumentType: NOT requires a boolean argument"
1860 )),
1861 }
1862 }
1863 UnaryOp::Neg => {
1864 if let Some(i) = val.as_i64() {
1865 Ok(Value::Int(-i))
1866 } else if let Some(f) = val.as_f64() {
1867 Ok(Value::Float(-f))
1868 } else {
1869 Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1870 }
1871 }
1872 }
1873 }
1874 Expr::IsNull(expr) => {
1875 let val = this
1876 .evaluate_expr(expr, row, prop_manager, params, ctx)
1877 .await?;
1878 Ok(Value::Bool(val.is_null()))
1879 }
1880 Expr::IsNotNull(expr) => {
1881 let val = this
1882 .evaluate_expr(expr, row, prop_manager, params, ctx)
1883 .await?;
1884 Ok(Value::Bool(!val.is_null()))
1885 }
1886 Expr::IsUnique(_) => {
1887 Err(anyhow!(
1889 "IS UNIQUE can only be used in constraint definitions"
1890 ))
1891 }
1892 Expr::Case {
1893 expr,
1894 when_then,
1895 else_expr,
1896 } => {
1897 if let Some(base_expr) = expr {
1898 let base_val = this
1899 .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1900 .await?;
1901 for (w, t) in when_then {
1902 let w_val = this
1903 .evaluate_expr(w, row, prop_manager, params, ctx)
1904 .await?;
1905 if base_val == w_val {
1906 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1907 }
1908 }
1909 } else {
1910 for (w, t) in when_then {
1911 let w_val = this
1912 .evaluate_expr(w, row, prop_manager, params, ctx)
1913 .await?;
1914 if w_val.as_bool() == Some(true) {
1915 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1916 }
1917 }
1918 }
1919 if let Some(e) = else_expr {
1920 return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1921 }
1922 Ok(Value::Null)
1923 }
1924 Expr::Wildcard => Ok(Value::Null),
1925 Expr::FunctionCall { name, args, .. } => {
1926 if name.eq_ignore_ascii_case("ID") {
1928 if args.len() != 1 {
1929 return Err(anyhow!("id() requires exactly 1 argument"));
1930 }
1931 let val = this
1932 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1933 .await?;
1934 if let Value::Map(map) = &val {
1935 if let Some(vid_val) = map.get("_vid") {
1937 return Ok(vid_val.clone());
1938 }
1939 if let Some(eid_val) = map.get("_eid") {
1941 return Ok(eid_val.clone());
1942 }
1943 if let Some(id_val) = map.get("_id") {
1945 return Ok(id_val.clone());
1946 }
1947 }
1948 return Ok(Value::Null);
1949 }
1950
1951 if name.eq_ignore_ascii_case("ELEMENTID") {
1953 if args.len() != 1 {
1954 return Err(anyhow!("elementId() requires exactly 1 argument"));
1955 }
1956 let val = this
1957 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1958 .await?;
1959 if let Value::Map(map) = &val {
1960 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1963 return Ok(Value::String(vid_val.to_string()));
1964 }
1965 if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1968 return Ok(Value::String(eid_val.to_string()));
1969 }
1970 }
1971 return Ok(Value::Null);
1972 }
1973
1974 if name.eq_ignore_ascii_case("TYPE") {
1976 if args.len() != 1 {
1977 return Err(anyhow!("type() requires exactly 1 argument"));
1978 }
1979 let val = this
1980 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1981 .await?;
1982 if let Value::Map(map) = &val
1983 && let Some(type_val) = map.get("_type")
1984 {
1985 if let Some(type_id) =
1987 type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1988 {
1989 if let Some(name) = this
1990 .storage
1991 .schema_manager()
1992 .edge_type_name_by_id_unified(type_id)
1993 {
1994 return Ok(Value::String(name));
1995 }
1996 } else if let Some(name) = type_val.as_str() {
1997 return Ok(Value::String(name.to_string()));
1998 }
1999 }
2000 return Ok(Value::Null);
2001 }
2002
2003 if name.eq_ignore_ascii_case("LABELS") {
2005 if args.len() != 1 {
2006 return Err(anyhow!("labels() requires exactly 1 argument"));
2007 }
2008 let val = this
2009 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2010 .await?;
2011 if let Value::Map(map) = &val
2012 && let Some(labels_val) = map.get("_labels")
2013 {
2014 return Ok(labels_val.clone());
2015 }
2016 return Ok(Value::Null);
2017 }
2018
2019 if name.eq_ignore_ascii_case("PROPERTIES") {
2021 if args.len() != 1 {
2022 return Err(anyhow!("properties() requires exactly 1 argument"));
2023 }
2024 let val = this
2025 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2026 .await?;
2027 if let Value::Map(map) = &val {
2028 let mut props = HashMap::new();
2030 for (k, v) in map.iter() {
2031 if !k.starts_with('_') {
2032 props.insert(k.clone(), v.clone());
2033 }
2034 }
2035 return Ok(Value::Map(props));
2036 }
2037 return Ok(Value::Null);
2038 }
2039
2040 if name.eq_ignore_ascii_case("STARTNODE") {
2042 if args.len() != 1 {
2043 return Err(anyhow!("startNode() requires exactly 1 argument"));
2044 }
2045 let val = this
2046 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2047 .await?;
2048 if let Value::Edge(edge) = &val {
2049 return Ok(Self::find_node_by_vid(row, edge.src));
2050 }
2051 if let Value::Map(map) = &val {
2052 if let Some(start_node) = map.get("_startNode") {
2053 return Ok(start_node.clone());
2054 }
2055 if let Some(src_vid) = map.get("_src_vid") {
2056 return Ok(Value::Map(HashMap::from([(
2057 "_vid".to_string(),
2058 src_vid.clone(),
2059 )])));
2060 }
2061 if let Some(src_id) = map.get("_src")
2063 && let Some(u) = src_id.as_u64()
2064 {
2065 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2066 }
2067 }
2068 return Ok(Value::Null);
2069 }
2070
2071 if name.eq_ignore_ascii_case("ENDNODE") {
2073 if args.len() != 1 {
2074 return Err(anyhow!("endNode() requires exactly 1 argument"));
2075 }
2076 let val = this
2077 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2078 .await?;
2079 if let Value::Edge(edge) = &val {
2080 return Ok(Self::find_node_by_vid(row, edge.dst));
2081 }
2082 if let Value::Map(map) = &val {
2083 if let Some(end_node) = map.get("_endNode") {
2084 return Ok(end_node.clone());
2085 }
2086 if let Some(dst_vid) = map.get("_dst_vid") {
2087 return Ok(Value::Map(HashMap::from([(
2088 "_vid".to_string(),
2089 dst_vid.clone(),
2090 )])));
2091 }
2092 if let Some(dst_id) = map.get("_dst")
2094 && let Some(u) = dst_id.as_u64()
2095 {
2096 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2097 }
2098 }
2099 return Ok(Value::Null);
2100 }
2101
2102 if name.eq_ignore_ascii_case("HASLABEL") {
2105 if args.len() != 2 {
2106 return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
2107 }
2108 let node_val = this
2109 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2110 .await?;
2111 let label_val = this
2112 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2113 .await?;
2114
2115 let label_to_check = label_val.as_str().ok_or_else(|| {
2116 anyhow!("Second argument to hasLabel must be a string")
2117 })?;
2118
2119 let has_label = match &node_val {
2120 Value::Map(map) if map.contains_key("_vid") => {
2122 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2123 labels_arr
2124 .iter()
2125 .any(|l| l.as_str() == Some(label_to_check))
2126 } else {
2127 false
2128 }
2129 }
2130 Value::Map(map) => {
2132 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2133 labels_arr
2134 .iter()
2135 .any(|l| l.as_str() == Some(label_to_check))
2136 } else {
2137 false
2138 }
2139 }
2140 _ => false,
2141 };
2142 return Ok(Value::Bool(has_label));
2143 }
2144
2145 if matches!(
2148 name.to_uppercase().as_str(),
2149 "ANY" | "ALL" | "NONE" | "SINGLE"
2150 ) {
2151 return Err(anyhow!(
2152 "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
2153 name.to_lowercase()
2154 ));
2155 }
2156
2157 if name.eq_ignore_ascii_case("COALESCE") {
2159 for arg in args {
2160 let val = this
2161 .evaluate_expr(arg, row, prop_manager, params, ctx)
2162 .await?;
2163 if !val.is_null() {
2164 return Ok(val);
2165 }
2166 }
2167 return Ok(Value::Null);
2168 }
2169
2170 if name.eq_ignore_ascii_case("vector_similarity") {
2172 if args.len() != 2 {
2173 return Err(anyhow!("vector_similarity takes 2 arguments"));
2174 }
2175 let v1 = this
2176 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2177 .await?;
2178 let v2 = this
2179 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2180 .await?;
2181 return eval_vector_similarity(&v1, &v2);
2182 }
2183
2184 if name.eq_ignore_ascii_case("uni.temporal.validAt")
2186 || name.eq_ignore_ascii_case("uni.validAt")
2187 || name.eq_ignore_ascii_case("validAt")
2188 {
2189 if args.len() != 4 {
2190 return Err(anyhow!("validAt requires 4 arguments"));
2191 }
2192 let node_val = this
2193 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2194 .await?;
2195 let start_prop = this
2196 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2197 .await?
2198 .as_str()
2199 .ok_or(anyhow!("start_prop must be string"))?
2200 .to_string();
2201 let end_prop = this
2202 .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2203 .await?
2204 .as_str()
2205 .ok_or(anyhow!("end_prop must be string"))?
2206 .to_string();
2207 let time_val = this
2208 .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2209 .await?;
2210
2211 let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2212 anyhow!("time argument must be a datetime value or string")
2213 })?;
2214
2215 let valid_from_val: Option<Value> = if let Ok(vid) =
2217 Self::vid_from_value(&node_val)
2218 {
2219 prop_manager
2221 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2222 .await
2223 .ok()
2224 } else if let Value::Map(map) = &node_val {
2225 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2227 let vid = Vid::from(vid_val);
2228 prop_manager
2229 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2230 .await
2231 .ok()
2232 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2233 let eid = uni_common::core::id::Eid::from(eid_val);
2235 prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2236 } else {
2237 map.get(&start_prop).cloned()
2239 }
2240 } else {
2241 return Ok(Value::Bool(false));
2242 };
2243
2244 let valid_from = match valid_from_val {
2245 Some(ref v) => match value_to_datetime_utc(v) {
2246 Some(dt) => dt,
2247 None if v.is_null() => return Ok(Value::Bool(false)),
2248 None => {
2249 return Err(anyhow!(
2250 "Property {} must be a datetime value or string",
2251 start_prop
2252 ));
2253 }
2254 },
2255 None => return Ok(Value::Bool(false)),
2256 };
2257
2258 let valid_to_val: Option<Value> = if let Ok(vid) =
2259 Self::vid_from_value(&node_val)
2260 {
2261 prop_manager
2263 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2264 .await
2265 .ok()
2266 } else if let Value::Map(map) = &node_val {
2267 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2269 let vid = Vid::from(vid_val);
2270 prop_manager
2271 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2272 .await
2273 .ok()
2274 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2275 let eid = uni_common::core::id::Eid::from(eid_val);
2277 prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2278 } else {
2279 map.get(&end_prop).cloned()
2281 }
2282 } else {
2283 return Ok(Value::Bool(false));
2284 };
2285
2286 let valid_to = match valid_to_val {
2287 Some(ref v) => match value_to_datetime_utc(v) {
2288 Some(dt) => Some(dt),
2289 None if v.is_null() => None,
2290 None => {
2291 return Err(anyhow!(
2292 "Property {} must be a datetime value or null",
2293 end_prop
2294 ));
2295 }
2296 },
2297 None => None,
2298 };
2299
2300 let is_valid = valid_from <= query_time
2301 && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2302 return Ok(Value::Bool(is_valid));
2303 }
2304
2305 let mut evaluated_args = Vec::with_capacity(args.len());
2307 for arg in args {
2308 let mut val = this
2309 .evaluate_expr(arg, row, prop_manager, params, ctx)
2310 .await?;
2311
2312 if let Value::Map(ref mut map) = val {
2315 hydrate_entity_if_needed(map, prop_manager, ctx).await;
2316 }
2317
2318 evaluated_args.push(val);
2319 }
2320 eval_scalar_function(
2321 name,
2322 &evaluated_args,
2323 self.custom_function_registry.as_deref(),
2324 )
2325 }
2326 Expr::Reduce {
2327 accumulator,
2328 init,
2329 variable,
2330 list,
2331 expr,
2332 } => {
2333 let mut acc = self
2334 .evaluate_expr(init, row, prop_manager, params, ctx)
2335 .await?;
2336 let list_val = self
2337 .evaluate_expr(list, row, prop_manager, params, ctx)
2338 .await?;
2339
2340 if let Value::List(items) = list_val {
2341 for item in items {
2342 let mut scope = row.clone();
2346 scope.insert(accumulator.clone(), acc.clone());
2347 scope.insert(variable.clone(), item);
2348
2349 acc = self
2350 .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2351 .await?;
2352 }
2353 } else {
2354 return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2355 }
2356 Ok(acc)
2357 }
2358 Expr::ValidAt { .. } => {
2359 Err(anyhow!(
2361 "VALID_AT expression should have been transformed to function call in planner"
2362 ))
2363 }
2364
2365 Expr::LabelCheck { expr, labels } => {
2366 let val = this
2367 .evaluate_expr(expr, row, prop_manager, params, ctx)
2368 .await?;
2369 match &val {
2370 Value::Null => Ok(Value::Null),
2371 Value::Map(map) => {
2372 let is_edge = map.contains_key("_eid")
2374 || map.contains_key("_type_name")
2375 || (map.contains_key("_type") && !map.contains_key("_vid"));
2376
2377 if is_edge {
2378 if labels.len() > 1 {
2380 return Ok(Value::Bool(false));
2381 }
2382 let label_to_check = &labels[0];
2383 let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2384 {
2385 t == label_to_check
2386 } else if let Some(Value::String(t)) = map.get("_type") {
2387 t == label_to_check
2388 } else {
2389 false
2390 };
2391 Ok(Value::Bool(has_type))
2392 } else {
2393 let has_all = labels.iter().all(|label_to_check| {
2395 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2396 labels_arr
2397 .iter()
2398 .any(|l| l.as_str() == Some(label_to_check.as_str()))
2399 } else {
2400 false
2401 }
2402 });
2403 Ok(Value::Bool(has_all))
2404 }
2405 }
2406 _ => Ok(Value::Bool(false)),
2407 }
2408 }
2409
2410 Expr::MapProjection { base, items } => {
2411 let base_value = this
2412 .evaluate_expr(base, row, prop_manager, params, ctx)
2413 .await?;
2414
2415 let properties = match &base_value {
2417 Value::Map(map) => map,
2418 _ => {
2419 return Err(anyhow!(
2420 "Map projection requires object, got {:?}",
2421 base_value
2422 ));
2423 }
2424 };
2425
2426 let mut result_map = HashMap::new();
2427
2428 for item in items {
2429 match item {
2430 MapProjectionItem::Property(prop) => {
2431 if let Some(value) = properties.get(prop.as_str()) {
2432 result_map.insert(prop.clone(), value.clone());
2433 }
2434 }
2435 MapProjectionItem::AllProperties => {
2436 for (key, value) in properties.iter() {
2438 if !key.starts_with('_') {
2439 result_map.insert(key.clone(), value.clone());
2440 }
2441 }
2442 }
2443 MapProjectionItem::LiteralEntry(key, expr) => {
2444 let value = this
2445 .evaluate_expr(expr, row, prop_manager, params, ctx)
2446 .await?;
2447 result_map.insert(key.clone(), value);
2448 }
2449 MapProjectionItem::Variable(var_name) => {
2450 if let Some(value) = row.get(var_name.as_str()) {
2453 result_map.insert(var_name.clone(), value.clone());
2454 }
2455 }
2456 }
2457 }
2458
2459 Ok(Value::Map(result_map))
2460 }
2461 }
2462 })
2463 }
2464
2465 pub(crate) fn execute_subplan<'a>(
2466 &'a self,
2467 plan: LogicalPlan,
2468 prop_manager: &'a PropertyManager,
2469 params: &'a HashMap<String, Value>,
2470 ctx: Option<&'a QueryContext>,
2471 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2472 Box::pin(async move {
2473 if let Some(ctx) = ctx {
2474 ctx.check_timeout()?;
2475 }
2476 match plan {
2477 LogicalPlan::Union { left, right, all } => {
2478 self.execute_union(left, right, all, prop_manager, params, ctx)
2479 .await
2480 }
2481 LogicalPlan::CreateVectorIndex {
2482 config,
2483 if_not_exists,
2484 } => {
2485 if if_not_exists && self.index_exists_by_name(&config.name) {
2486 return Ok(vec![]);
2487 }
2488 let idx_mgr = self.storage.index_manager();
2489 idx_mgr.create_vector_index(config).await?;
2490 Ok(vec![])
2491 }
2492 LogicalPlan::CreateFullTextIndex {
2493 config,
2494 if_not_exists,
2495 } => {
2496 if if_not_exists && self.index_exists_by_name(&config.name) {
2497 return Ok(vec![]);
2498 }
2499 let idx_mgr = self.storage.index_manager();
2500 idx_mgr.create_fts_index(config).await?;
2501 Ok(vec![])
2502 }
2503 LogicalPlan::CreateScalarIndex {
2504 mut config,
2505 if_not_exists,
2506 } => {
2507 if if_not_exists && self.index_exists_by_name(&config.name) {
2508 return Ok(vec![]);
2509 }
2510
2511 let mut modified_properties = Vec::new();
2513
2514 for prop in &config.properties {
2515 if prop.contains('(') && prop.contains(')') {
2517 let gen_col = SchemaManager::generated_column_name(prop);
2518
2519 let sm = self.storage.schema_manager_arc();
2521 if let Err(e) = sm.add_generated_property(
2522 &config.label,
2523 &gen_col,
2524 DataType::String, prop.clone(),
2526 ) {
2527 log::warn!("Failed to add generated property (might exist): {}", e);
2528 }
2529
2530 modified_properties.push(gen_col);
2531 } else {
2532 modified_properties.push(prop.clone());
2534 }
2535 }
2536
2537 config.properties = modified_properties;
2538
2539 let idx_mgr = self.storage.index_manager();
2540 idx_mgr.create_scalar_index(config).await?;
2541 Ok(vec![])
2542 }
2543 LogicalPlan::CreateJsonFtsIndex {
2544 config,
2545 if_not_exists,
2546 } => {
2547 if if_not_exists && self.index_exists_by_name(&config.name) {
2548 return Ok(vec![]);
2549 }
2550 let idx_mgr = self.storage.index_manager();
2551 idx_mgr.create_json_fts_index(config).await?;
2552 Ok(vec![])
2553 }
2554 LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2555 LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2556 LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2557 LogicalPlan::Vacuum => {
2558 self.execute_vacuum().await?;
2559 Ok(vec![])
2560 }
2561 LogicalPlan::Checkpoint => {
2562 self.execute_checkpoint().await?;
2563 Ok(vec![])
2564 }
2565 LogicalPlan::CopyTo {
2566 label,
2567 path,
2568 format,
2569 options,
2570 } => {
2571 let count = self
2572 .execute_copy_to(&label, &path, &format, &options)
2573 .await?;
2574 let mut result = HashMap::new();
2575 result.insert("count".to_string(), Value::Int(count as i64));
2576 Ok(vec![result])
2577 }
2578 LogicalPlan::CopyFrom {
2579 label,
2580 path,
2581 format,
2582 options,
2583 } => {
2584 let count = self
2585 .execute_copy_from(&label, &path, &format, &options)
2586 .await?;
2587 let mut result = HashMap::new();
2588 result.insert("count".to_string(), Value::Int(count as i64));
2589 Ok(vec![result])
2590 }
2591 LogicalPlan::CreateLabel(clause) => {
2592 self.execute_create_label(clause).await?;
2593 Ok(vec![])
2594 }
2595 LogicalPlan::CreateEdgeType(clause) => {
2596 self.execute_create_edge_type(clause).await?;
2597 Ok(vec![])
2598 }
2599 LogicalPlan::AlterLabel(clause) => {
2600 self.execute_alter_label(clause).await?;
2601 Ok(vec![])
2602 }
2603 LogicalPlan::AlterEdgeType(clause) => {
2604 self.execute_alter_edge_type(clause).await?;
2605 Ok(vec![])
2606 }
2607 LogicalPlan::DropLabel(clause) => {
2608 self.execute_drop_label(clause).await?;
2609 Ok(vec![])
2610 }
2611 LogicalPlan::DropEdgeType(clause) => {
2612 self.execute_drop_edge_type(clause).await?;
2613 Ok(vec![])
2614 }
2615 LogicalPlan::CreateConstraint(clause) => {
2616 self.execute_create_constraint(clause).await?;
2617 Ok(vec![])
2618 }
2619 LogicalPlan::DropConstraint(clause) => {
2620 self.execute_drop_constraint(clause).await?;
2621 Ok(vec![])
2622 }
2623 LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2624 LogicalPlan::DropIndex { name, if_exists } => {
2625 let idx_mgr = self.storage.index_manager();
2626 match idx_mgr.drop_index(&name).await {
2627 Ok(_) => Ok(vec![]),
2628 Err(e) => {
2629 if if_exists && e.to_string().contains("not found") {
2630 Ok(vec![])
2631 } else {
2632 Err(e)
2633 }
2634 }
2635 }
2636 }
2637 LogicalPlan::ShowIndexes { filter } => {
2638 Ok(self.execute_show_indexes(filter.as_deref()))
2639 }
2640 LogicalPlan::Scan { .. }
2643 | LogicalPlan::FusedIndexScan { .. }
2644 | LogicalPlan::FusedIndexScanWrapped { .. }
2645 | LogicalPlan::ExtIdLookup { .. }
2646 | LogicalPlan::ScanAll { .. }
2647 | LogicalPlan::ScanMainByLabels { .. }
2648 | LogicalPlan::Traverse { .. }
2649 | LogicalPlan::TraverseMainByType { .. } => {
2650 let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2651 self.record_batches_to_rows(batches)
2652 }
2653 LogicalPlan::Filter {
2654 input,
2655 predicate,
2656 optional_variables,
2657 } => {
2658 let input_matches = self
2659 .execute_subplan(*input, prop_manager, params, ctx)
2660 .await?;
2661
2662 tracing::debug!(
2663 "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2664 predicate,
2665 input_matches.len(),
2666 optional_variables
2667 );
2668
2669 if !optional_variables.is_empty() {
2673 let is_optional_key = |k: &str| -> bool {
2676 optional_variables.contains(k)
2677 || optional_variables
2678 .iter()
2679 .any(|var| k.starts_with(&format!("{}.", var)))
2680 };
2681
2682 let is_internal_key =
2684 |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2685
2686 let non_optional_vars: Vec<String> = input_matches
2688 .first()
2689 .map(|row| {
2690 row.keys()
2691 .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2692 .cloned()
2693 .collect()
2694 })
2695 .unwrap_or_default();
2696
2697 let mut groups: std::collections::HashMap<
2699 Vec<u8>,
2700 Vec<HashMap<String, Value>>,
2701 > = std::collections::HashMap::new();
2702
2703 for row in &input_matches {
2704 let key: Vec<u8> = non_optional_vars
2706 .iter()
2707 .map(|var| {
2708 row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
2709 })
2710 .collect::<Vec<_>>()
2711 .join("|")
2712 .into_bytes();
2713
2714 groups.entry(key).or_default().push(row.clone());
2715 }
2716
2717 let mut filtered = Vec::new();
2718 for (_key, group_rows) in groups {
2719 let mut group_passed = Vec::new();
2720
2721 for row in &group_rows {
2722 let has_null_optional = optional_variables.iter().any(|var| {
2724 let direct_null =
2726 matches!(row.get(var), Some(Value::Null) | None);
2727 let prefixed_null = row
2728 .keys()
2729 .filter(|k| k.starts_with(&format!("{}.", var)))
2730 .any(|k| matches!(row.get(k), Some(Value::Null)));
2731 direct_null || prefixed_null
2732 });
2733
2734 if has_null_optional {
2735 group_passed.push(row.clone());
2736 continue;
2737 }
2738
2739 let res = self
2740 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2741 .await?;
2742
2743 if res.as_bool().unwrap_or(false) {
2744 group_passed.push(row.clone());
2745 }
2746 }
2747
2748 if group_passed.is_empty() {
2749 if let Some(template) = group_rows.first() {
2752 let mut null_row = HashMap::new();
2753 for (k, v) in template {
2754 if is_optional_key(k) {
2755 null_row.insert(k.clone(), Value::Null);
2756 } else {
2757 null_row.insert(k.clone(), v.clone());
2758 }
2759 }
2760 filtered.push(null_row);
2761 }
2762 } else {
2763 filtered.extend(group_passed);
2764 }
2765 }
2766
2767 tracing::debug!(
2768 "Filter (OPTIONAL): {} input rows -> {} output rows",
2769 input_matches.len(),
2770 filtered.len()
2771 );
2772
2773 return Ok(filtered);
2774 }
2775
2776 let mut filtered = Vec::new();
2778 for row in input_matches.iter() {
2779 let res = self
2780 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2781 .await?;
2782
2783 let passes = res.as_bool().unwrap_or(false);
2784
2785 if passes {
2786 filtered.push(row.clone());
2787 }
2788 }
2789
2790 tracing::debug!(
2791 "Filter: {} input rows -> {} output rows",
2792 input_matches.len(),
2793 filtered.len()
2794 );
2795
2796 Ok(filtered)
2797 }
2798 LogicalPlan::ProcedureCall {
2799 procedure_name,
2800 arguments,
2801 yield_items,
2802 } => {
2803 let yield_names: Vec<String> =
2804 yield_items.iter().map(|(n, _)| n.clone()).collect();
2805 let results = self
2806 .execute_procedure(
2807 &procedure_name,
2808 &arguments,
2809 &yield_names,
2810 prop_manager,
2811 params,
2812 ctx,
2813 )
2814 .await?;
2815
2816 let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2822 if !has_aliases {
2823 Ok(results)
2826 } else {
2827 let mut aliased_results = Vec::with_capacity(results.len());
2828 for row in results {
2829 let mut new_row = HashMap::new();
2830 for (name, alias) in &yield_items {
2831 let col_name = alias.as_ref().unwrap_or(name);
2832 let val = row.get(name).cloned().unwrap_or(Value::Null);
2833 new_row.insert(col_name.clone(), val);
2834 }
2835 aliased_results.push(new_row);
2836 }
2837 Ok(aliased_results)
2838 }
2839 }
2840 LogicalPlan::VectorKnn { .. } => {
2841 unreachable!("VectorKnn is handled by DataFusion engine")
2842 }
2843 LogicalPlan::InvertedIndexLookup { .. } => {
2844 unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2845 }
2846 LogicalPlan::Sort { input, order_by } => {
2847 let rows = self
2848 .execute_subplan(*input, prop_manager, params, ctx)
2849 .await?;
2850 self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2851 .await
2852 }
2853 LogicalPlan::Limit { input, skip, fetch } => {
2854 let rows = self
2855 .execute_subplan(*input, prop_manager, params, ctx)
2856 .await?;
2857 let skip = skip.unwrap_or(0);
2858 let take = fetch.unwrap_or(usize::MAX);
2859 Ok(rows.into_iter().skip(skip).take(take).collect())
2860 }
2861 LogicalPlan::Aggregate {
2862 input,
2863 group_by,
2864 aggregates,
2865 } => {
2866 let rows = self
2867 .execute_subplan(*input, prop_manager, params, ctx)
2868 .await?;
2869 self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2870 .await
2871 }
2872 LogicalPlan::Window {
2873 input,
2874 window_exprs,
2875 } => {
2876 let rows = self
2877 .execute_subplan(*input, prop_manager, params, ctx)
2878 .await?;
2879 self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2880 .await
2881 }
2882 LogicalPlan::Project { input, projections } => {
2883 let matches = self
2884 .execute_subplan(*input, prop_manager, params, ctx)
2885 .await?;
2886 self.execute_project(matches, &projections, prop_manager, params, ctx)
2887 .await
2888 }
2889 LogicalPlan::Distinct { input } => {
2890 let rows = self
2891 .execute_subplan(*input, prop_manager, params, ctx)
2892 .await?;
2893 let mut seen = std::collections::HashSet::new();
2894 let mut result = Vec::new();
2895 for row in rows {
2896 let key = Self::canonical_row_key(&row);
2897 if seen.insert(key) {
2898 result.push(row);
2899 }
2900 }
2901 Ok(result)
2902 }
2903 LogicalPlan::Unwind {
2904 input,
2905 expr,
2906 variable,
2907 } => {
2908 let input_rows = self
2909 .execute_subplan(*input, prop_manager, params, ctx)
2910 .await?;
2911 self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2912 .await
2913 }
2914 LogicalPlan::Apply {
2915 input,
2916 subquery,
2917 input_filter,
2918 } => {
2919 let input_rows = self
2920 .execute_subplan(*input, prop_manager, params, ctx)
2921 .await?;
2922 self.execute_apply(
2923 input_rows,
2924 &subquery,
2925 input_filter.as_ref(),
2926 prop_manager,
2927 params,
2928 ctx,
2929 )
2930 .await
2931 }
2932 LogicalPlan::SubqueryCall { input, subquery } => {
2933 let input_rows = self
2934 .execute_subplan(*input, prop_manager, params, ctx)
2935 .await?;
2936 self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2939 .await
2940 }
2941 LogicalPlan::RecursiveCTE {
2942 cte_name,
2943 initial,
2944 recursive,
2945 } => {
2946 self.execute_recursive_cte(
2947 &cte_name,
2948 *initial,
2949 *recursive,
2950 prop_manager,
2951 params,
2952 ctx,
2953 )
2954 .await
2955 }
2956 LogicalPlan::CrossJoin { left, right } => {
2957 self.execute_cross_join(left, right, prop_manager, params, ctx)
2958 .await
2959 }
2960 LogicalPlan::Set { .. }
2961 | LogicalPlan::Remove { .. }
2962 | LogicalPlan::Merge { .. }
2963 | LogicalPlan::Create { .. }
2964 | LogicalPlan::CreateBatch { .. } => {
2965 unreachable!("mutations are handled by DataFusion engine")
2966 }
2967 LogicalPlan::Delete { .. } => {
2968 unreachable!("mutations are handled by DataFusion engine")
2969 }
2970 LogicalPlan::Copy {
2971 target,
2972 source,
2973 is_export,
2974 options,
2975 } => {
2976 if is_export {
2977 self.execute_export(&target, &source, &options, prop_manager, ctx)
2978 .await
2979 } else {
2980 self.execute_copy(&target, &source, &options, prop_manager)
2981 .await
2982 }
2983 }
2984 LogicalPlan::Backup {
2985 destination,
2986 options,
2987 } => self.execute_backup(&destination, &options).await,
2988 LogicalPlan::Explain { plan } => {
2989 let plan_str = format!("{:#?}", plan);
2990 let mut row = HashMap::new();
2991 row.insert("plan".to_string(), Value::String(plan_str));
2992 Ok(vec![row])
2993 }
2994 LogicalPlan::ShortestPath { .. } => {
2995 unreachable!("ShortestPath is handled by DataFusion engine")
2996 }
2997 LogicalPlan::AllShortestPaths { .. } => {
2998 unreachable!("AllShortestPaths is handled by DataFusion engine")
2999 }
3000 LogicalPlan::Foreach { .. } => {
3001 unreachable!("mutations are handled by DataFusion engine")
3002 }
3003 LogicalPlan::Empty => Ok(vec![HashMap::new()]),
3004 LogicalPlan::BindZeroLengthPath { .. } => {
3005 unreachable!("BindZeroLengthPath is handled by DataFusion engine")
3006 }
3007 LogicalPlan::BindPath { .. } => {
3008 unreachable!("BindPath is handled by DataFusion engine")
3009 }
3010 LogicalPlan::QuantifiedPattern { .. } => {
3011 unreachable!("QuantifiedPattern is handled by DataFusion engine")
3012 }
3013 LogicalPlan::LocyProgram { .. }
3014 | LogicalPlan::LocyFold { .. }
3015 | LogicalPlan::LocyBestBy { .. }
3016 | LogicalPlan::LocyPriority { .. }
3017 | LogicalPlan::LocyDerivedScan { .. }
3018 | LogicalPlan::LocyProject { .. }
3019 | LogicalPlan::LocyModelInvoke { .. } => {
3020 unreachable!("Locy operators are handled by DataFusion engine")
3021 }
3022 }
3023 })
3024 }
3025
3026 #[expect(clippy::too_many_arguments)]
3031 pub(crate) async fn execute_foreach_body_plan(
3032 &self,
3033 plan: LogicalPlan,
3034 scope: &mut HashMap<String, Value>,
3035 writer: &uni_store::runtime::writer::Writer,
3036 prop_manager: &PropertyManager,
3037 params: &HashMap<String, Value>,
3038 ctx: Option<&QueryContext>,
3039 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3040 ) -> Result<()> {
3041 match plan {
3042 LogicalPlan::Set { items, .. } => {
3043 self.execute_set_items_locked(
3044 &items,
3045 scope,
3046 writer,
3047 prop_manager,
3048 params,
3049 ctx,
3050 tx_l0,
3051 &crate::query::df_graph::mutation_common::Prefetch::default(),
3052 )
3053 .await?;
3054 }
3055 LogicalPlan::Remove { items, .. } => {
3056 self.execute_remove_items_locked(
3057 &items,
3058 scope,
3059 writer,
3060 prop_manager,
3061 ctx,
3062 tx_l0,
3063 &crate::query::df_graph::mutation_common::Prefetch::default(),
3064 )
3065 .await?;
3066 }
3067 LogicalPlan::Delete { items, detach, .. } => {
3068 for expr in &items {
3069 let val = self
3070 .evaluate_expr(expr, scope, prop_manager, params, ctx)
3071 .await?;
3072 self.execute_delete_item_locked(&val, detach, writer, tx_l0)
3073 .await?;
3074 }
3075 }
3076 LogicalPlan::Create { pattern, .. } => {
3077 self.execute_create_pattern(
3078 &pattern,
3079 scope,
3080 writer,
3081 prop_manager,
3082 params,
3083 ctx,
3084 tx_l0,
3085 None,
3086 )
3087 .await?;
3088 }
3089 LogicalPlan::CreateBatch { patterns, .. } => {
3090 for pattern in &patterns {
3091 self.execute_create_pattern(
3092 pattern,
3093 scope,
3094 writer,
3095 prop_manager,
3096 params,
3097 ctx,
3098 tx_l0,
3099 None,
3100 )
3101 .await?;
3102 }
3103 }
3104 LogicalPlan::Merge {
3105 pattern,
3106 on_match: _,
3107 on_create,
3108 ..
3109 } => {
3110 let seed_props = self
3114 .on_create_seed_props(on_create.as_ref(), scope, prop_manager, params, ctx)
3115 .await?;
3116 self.execute_create_pattern(
3117 &pattern,
3118 scope,
3119 writer,
3120 prop_manager,
3121 params,
3122 ctx,
3123 tx_l0,
3124 Some(&seed_props),
3125 )
3126 .await?;
3127 if let Some(on_create_clause) = on_create {
3128 self.execute_set_items_locked(
3129 &on_create_clause.items,
3130 scope,
3131 writer,
3132 prop_manager,
3133 params,
3134 ctx,
3135 tx_l0,
3136 &crate::query::df_graph::mutation_common::Prefetch::default(),
3137 )
3138 .await?;
3139 }
3140 }
3141 LogicalPlan::Foreach {
3142 variable,
3143 list,
3144 body,
3145 ..
3146 } => {
3147 let list_val = self
3148 .evaluate_expr(&list, scope, prop_manager, params, ctx)
3149 .await?;
3150 let items = match list_val {
3151 Value::List(arr) => arr,
3152 Value::Null => return Ok(()),
3153 _ => return Err(anyhow!("FOREACH requires a list")),
3154 };
3155 for item in items {
3156 let mut nested_scope = scope.clone();
3157 nested_scope.insert(variable.clone(), item);
3158 for nested_plan in &body {
3159 Box::pin(self.execute_foreach_body_plan(
3160 nested_plan.clone(),
3161 &mut nested_scope,
3162 writer,
3163 prop_manager,
3164 params,
3165 ctx,
3166 tx_l0,
3167 ))
3168 .await?;
3169 }
3170 }
3171 }
3172 _ => {
3173 return Err(anyhow!(
3174 "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
3175 ));
3176 }
3177 }
3178 Ok(())
3179 }
3180
3181 fn canonical_row_key(row: &HashMap<String, Value>) -> String {
3182 let mut pairs: Vec<_> = row.iter().collect();
3183 pairs.sort_by_key(|(k, _)| *k);
3184
3185 pairs
3186 .into_iter()
3187 .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
3188 .collect::<Vec<_>>()
3189 .join("|")
3190 }
3191
3192 fn canonical_value_key(v: &Value) -> String {
3193 match v {
3194 Value::Null => "null".to_string(),
3195 Value::Bool(b) => format!("b:{b}"),
3196 Value::Int(i) => format!("n:{i}"),
3197 Value::Float(f) => {
3198 if f.is_nan() {
3199 "nan".to_string()
3200 } else if f.is_infinite() {
3201 if f.is_sign_positive() {
3202 "inf:+".to_string()
3203 } else {
3204 "inf:-".to_string()
3205 }
3206 } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
3207 format!("n:{}", *f as i64)
3208 } else {
3209 format!("f:{f}")
3210 }
3211 }
3212 Value::String(s) => {
3213 if let Some(k) = Self::temporal_string_key(s) {
3214 format!("temporal:{k}")
3215 } else {
3216 format!("s:{s}")
3217 }
3218 }
3219 Value::Bytes(b) => format!("bytes:{:?}", b),
3220 Value::List(items) => format!(
3221 "list:[{}]",
3222 items
3223 .iter()
3224 .map(Self::canonical_value_key)
3225 .collect::<Vec<_>>()
3226 .join(",")
3227 ),
3228 Value::Map(map) => {
3229 let mut pairs: Vec<_> = map.iter().collect();
3230 pairs.sort_by_key(|(k, _)| *k);
3231 format!(
3232 "map:{{{}}}",
3233 pairs
3234 .into_iter()
3235 .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
3236 .collect::<Vec<_>>()
3237 .join(",")
3238 )
3239 }
3240 Value::Node(n) => {
3241 let mut labels = n.labels.clone();
3242 labels.sort();
3243 format!(
3244 "node:{}:{}:{}",
3245 n.vid.as_u64(),
3246 labels.join(":"),
3247 Self::canonical_value_key(&Value::Map(n.properties.clone()))
3248 )
3249 }
3250 Value::Edge(e) => format!(
3251 "edge:{}:{}:{}:{}:{}",
3252 e.eid.as_u64(),
3253 e.edge_type,
3254 e.src.as_u64(),
3255 e.dst.as_u64(),
3256 Self::canonical_value_key(&Value::Map(e.properties.clone()))
3257 ),
3258 Value::Path(p) => format!(
3259 "path:nodes=[{}];edges=[{}]",
3260 p.nodes
3261 .iter()
3262 .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
3263 .collect::<Vec<_>>()
3264 .join(","),
3265 p.edges
3266 .iter()
3267 .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
3268 .collect::<Vec<_>>()
3269 .join(",")
3270 ),
3271 Value::Vector(vs) => format!("vec:{:?}", vs),
3272 Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
3273 _ => format!("{v:?}"),
3274 }
3275 }
3276
3277 fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3278 match t {
3279 uni_common::TemporalValue::Date { days_since_epoch } => {
3280 format!("date:{days_since_epoch}")
3281 }
3282 uni_common::TemporalValue::LocalTime {
3283 nanos_since_midnight,
3284 } => format!("localtime:{nanos_since_midnight}"),
3285 uni_common::TemporalValue::Time {
3286 nanos_since_midnight,
3287 offset_seconds,
3288 } => {
3289 let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3290 format!("time:{utc_nanos}")
3291 }
3292 uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3293 format!("localdatetime:{nanos_since_epoch}")
3294 }
3295 uni_common::TemporalValue::DateTime {
3296 nanos_since_epoch, ..
3297 } => format!("datetime:{nanos_since_epoch}"),
3298 uni_common::TemporalValue::Duration {
3299 months,
3300 days,
3301 nanos,
3302 } => format!("duration:{months}:{days}:{nanos}"),
3303 uni_common::TemporalValue::Btic { lo, hi, meta } => {
3304 format!("btic:{lo}:{hi}:{meta}")
3305 }
3306 }
3307 }
3308
3309 fn temporal_string_key(s: &str) -> Option<String> {
3310 let fn_name = match classify_temporal(s)? {
3311 uni_common::TemporalType::Date => "DATE",
3312 uni_common::TemporalType::LocalTime => "LOCALTIME",
3313 uni_common::TemporalType::Time => "TIME",
3314 uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3315 uni_common::TemporalType::DateTime => "DATETIME",
3316 uni_common::TemporalType::Duration => "DURATION",
3317 uni_common::TemporalType::Btic => return None, };
3319 match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3320 Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3321 _ => None,
3322 }
3323 }
3324
3325 pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3328
3329 pub(crate) async fn execute_aggregate(
3330 &self,
3331 rows: Vec<HashMap<String, Value>>,
3332 group_by: &[Expr],
3333 aggregates: &[Expr],
3334 prop_manager: &PropertyManager,
3335 params: &HashMap<String, Value>,
3336 ctx: Option<&QueryContext>,
3337 ) -> Result<Vec<HashMap<String, Value>>> {
3338 if let Some(ctx) = ctx {
3340 ctx.check_timeout()?;
3341 }
3342
3343 let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3344
3345 if rows.is_empty() {
3348 if group_by.is_empty() {
3349 let accs = Self::create_accumulators(aggregates);
3350 let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3351 return Ok(vec![row]);
3352 }
3353 return Ok(vec![]);
3354 }
3355
3356 for (idx, row) in rows.into_iter().enumerate() {
3357 if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3359 && let Some(ctx) = ctx
3360 {
3361 ctx.check_timeout()?;
3362 }
3363
3364 let key_vals = self
3365 .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3366 .await?;
3367 let key_str = format!(
3370 "[{}]",
3371 key_vals
3372 .iter()
3373 .map(Self::canonical_value_key)
3374 .collect::<Vec<_>>()
3375 .join(",")
3376 );
3377
3378 let entry = groups
3379 .entry(key_str)
3380 .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3381
3382 self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3383 .await?;
3384 }
3385
3386 let results = groups
3387 .values()
3388 .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3389 .collect();
3390
3391 Ok(results)
3392 }
3393
3394 pub(crate) async fn execute_window(
3395 &self,
3396 mut rows: Vec<HashMap<String, Value>>,
3397 window_exprs: &[Expr],
3398 _prop_manager: &PropertyManager,
3399 _params: &HashMap<String, Value>,
3400 ctx: Option<&QueryContext>,
3401 ) -> Result<Vec<HashMap<String, Value>>> {
3402 if let Some(ctx) = ctx {
3404 ctx.check_timeout()?;
3405 }
3406
3407 if rows.is_empty() || window_exprs.is_empty() {
3409 return Ok(rows);
3410 }
3411
3412 for window_expr in window_exprs {
3414 let Expr::FunctionCall {
3416 name,
3417 args,
3418 window_spec: Some(window_spec),
3419 ..
3420 } = window_expr
3421 else {
3422 return Err(anyhow!(
3423 "Window expression must be a FunctionCall with OVER clause: {:?}",
3424 window_expr
3425 ));
3426 };
3427
3428 let name_upper = name.to_uppercase();
3429
3430 if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3432 return Err(anyhow!(
3433 "Unsupported window function: {}. Supported functions: {}",
3434 name,
3435 WINDOW_FUNCTIONS.join(", ")
3436 ));
3437 }
3438
3439 let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3441
3442 for (row_idx, row) in rows.iter().enumerate() {
3443 let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3445 vec![]
3447 } else {
3448 window_spec
3449 .partition_by
3450 .iter()
3451 .map(|expr| self.evaluate_simple_expr(expr, row))
3452 .collect::<Result<Vec<_>>>()?
3453 };
3454
3455 partition_map
3456 .entry(partition_key)
3457 .or_default()
3458 .push(row_idx);
3459 }
3460
3461 for (_partition_key, row_indices) in partition_map.iter_mut() {
3463 if !window_spec.order_by.is_empty() {
3465 row_indices.sort_by(|&a, &b| {
3466 for sort_item in &window_spec.order_by {
3467 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3468 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3469
3470 if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3471 let cmp = Executor::compare_values(&va, &vb);
3472 let cmp = if sort_item.ascending {
3473 cmp
3474 } else {
3475 cmp.reverse()
3476 };
3477 if cmp != std::cmp::Ordering::Equal {
3478 return cmp;
3479 }
3480 }
3481 }
3482 std::cmp::Ordering::Equal
3483 });
3484 }
3485
3486 for (position, &row_idx) in row_indices.iter().enumerate() {
3488 let window_value = match name_upper.as_str() {
3489 "ROW_NUMBER" => Value::from((position + 1) as i64),
3490 "RANK" => {
3491 let rank = if position == 0 {
3493 1i64
3494 } else {
3495 let prev_row_idx = row_indices[position - 1];
3496 let same_as_prev = self.rows_have_same_sort_keys(
3497 &window_spec.order_by,
3498 &rows,
3499 row_idx,
3500 prev_row_idx,
3501 );
3502
3503 if same_as_prev {
3504 let mut group_start = position - 1;
3506 while group_start > 0 {
3507 let curr_idx = row_indices[group_start];
3508 let prev_idx = row_indices[group_start - 1];
3509 if !self.rows_have_same_sort_keys(
3510 &window_spec.order_by,
3511 &rows,
3512 curr_idx,
3513 prev_idx,
3514 ) {
3515 break;
3516 }
3517 group_start -= 1;
3518 }
3519 (group_start + 1) as i64
3520 } else {
3521 (position + 1) as i64
3522 }
3523 };
3524 Value::from(rank)
3525 }
3526 "DENSE_RANK" => {
3527 let mut dense_rank = 1i64;
3529 for i in 0..position {
3530 let curr_idx = row_indices[i + 1];
3531 let prev_idx = row_indices[i];
3532 if !self.rows_have_same_sort_keys(
3533 &window_spec.order_by,
3534 &rows,
3535 curr_idx,
3536 prev_idx,
3537 ) {
3538 dense_rank += 1;
3539 }
3540 }
3541 Value::from(dense_rank)
3542 }
3543 "LAG" => {
3544 let (value_expr, offset, default_value) =
3545 self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3546
3547 if position >= offset {
3548 let target_idx = row_indices[position - offset];
3549 self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3550 } else {
3551 default_value
3552 }
3553 }
3554 "LEAD" => {
3555 let (value_expr, offset, default_value) =
3556 self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3557
3558 if position + offset < row_indices.len() {
3559 let target_idx = row_indices[position + offset];
3560 self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3561 } else {
3562 default_value
3563 }
3564 }
3565 "NTILE" => {
3566 let num_buckets_expr = args.first().ok_or_else(|| {
3568 anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3569 })?;
3570 let num_buckets_val =
3571 self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3572 let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3573 anyhow!(
3574 "NTILE argument must be an integer, got: {:?}",
3575 num_buckets_val
3576 )
3577 })?;
3578
3579 if num_buckets <= 0 {
3580 return Err(anyhow!(
3581 "NTILE bucket count must be positive, got: {}",
3582 num_buckets
3583 ));
3584 }
3585
3586 let num_buckets = num_buckets as usize;
3587 let partition_size = row_indices.len();
3588
3589 let base_size = partition_size / num_buckets;
3594 let extra_rows = partition_size % num_buckets;
3595
3596 let bucket = if position < extra_rows * (base_size + 1) {
3598 position / (base_size + 1) + 1
3600 } else {
3601 let adjusted_position = position - extra_rows * (base_size + 1);
3603 extra_rows + (adjusted_position / base_size) + 1
3604 };
3605
3606 Value::from(bucket as i64)
3607 }
3608 "FIRST_VALUE" => {
3609 let value_expr = args.first().ok_or_else(|| {
3611 anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3612 })?;
3613
3614 if row_indices.is_empty() {
3616 Value::Null
3617 } else {
3618 let first_idx = row_indices[0];
3619 self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3620 }
3621 }
3622 "LAST_VALUE" => {
3623 let value_expr = args.first().ok_or_else(|| {
3625 anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3626 })?;
3627
3628 if row_indices.is_empty() {
3630 Value::Null
3631 } else {
3632 let last_idx = row_indices[row_indices.len() - 1];
3633 self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3634 }
3635 }
3636 "NTH_VALUE" => {
3637 if args.len() != 2 {
3639 return Err(anyhow!(
3640 "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3641 ));
3642 }
3643
3644 let value_expr = &args[0];
3645 let n_expr = &args[1];
3646
3647 let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3648 let n = n_val.as_i64().ok_or_else(|| {
3649 anyhow!(
3650 "NTH_VALUE second argument must be an integer, got: {:?}",
3651 n_val
3652 )
3653 })?;
3654
3655 if n <= 0 {
3656 return Err(anyhow!(
3657 "NTH_VALUE position must be positive, got: {}",
3658 n
3659 ));
3660 }
3661
3662 let nth_index = (n - 1) as usize; if nth_index < row_indices.len() {
3664 let nth_idx = row_indices[nth_index];
3665 self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3666 } else {
3667 Value::Null
3668 }
3669 }
3670 _ => unreachable!("Window function {} already validated", name),
3671 };
3672
3673 let col_name = window_expr.to_string_repr();
3676 rows[row_idx].insert(col_name, window_value);
3677 }
3678 }
3679 }
3680
3681 Ok(rows)
3682 }
3683
3684 fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3689 match expr {
3690 Expr::Variable(name) => row
3691 .get(name)
3692 .cloned()
3693 .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3694 Expr::Property(base, prop) => {
3695 let base_val = self.evaluate_simple_expr(base, row)?;
3696 if let Value::Map(map) = base_val {
3697 map.get(prop)
3698 .cloned()
3699 .ok_or_else(|| anyhow!("Property not found: {}", prop))
3700 } else {
3701 Err(anyhow!("Cannot access property on non-object"))
3702 }
3703 }
3704 Expr::Literal(lit) => Ok(lit.to_value()),
3705 _ => Err(anyhow!(
3706 "Unsupported expression in window function: {:?}",
3707 expr
3708 )),
3709 }
3710 }
3711
3712 fn rows_have_same_sort_keys(
3714 &self,
3715 order_by: &[uni_cypher::ast::SortItem],
3716 rows: &[HashMap<String, Value>],
3717 idx_a: usize,
3718 idx_b: usize,
3719 ) -> bool {
3720 order_by.iter().all(|sort_item| {
3721 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3722 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3723 matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3724 })
3725 }
3726
3727 fn extract_lag_lead_params<'a>(
3729 &self,
3730 func_name: &str,
3731 args: &'a [Expr],
3732 row: &HashMap<String, Value>,
3733 ) -> Result<(&'a Expr, usize, Value)> {
3734 let value_expr = args.first().ok_or_else(|| {
3735 anyhow!(
3736 "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3737 func_name,
3738 func_name
3739 )
3740 })?;
3741
3742 let offset = if let Some(offset_expr) = args.get(1) {
3743 let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3744 offset_val.as_i64().ok_or_else(|| {
3745 anyhow!(
3746 "{} offset must be an integer, got: {:?}",
3747 func_name,
3748 offset_val
3749 )
3750 })? as usize
3751 } else {
3752 1
3753 };
3754
3755 let default_value = if let Some(default_expr) = args.get(2) {
3756 self.evaluate_simple_expr(default_expr, row)?
3757 } else {
3758 Value::Null
3759 };
3760
3761 Ok((value_expr, offset, default_value))
3762 }
3763
3764 pub(crate) async fn evaluate_group_keys(
3766 &self,
3767 group_by: &[Expr],
3768 row: &HashMap<String, Value>,
3769 prop_manager: &PropertyManager,
3770 params: &HashMap<String, Value>,
3771 ctx: Option<&QueryContext>,
3772 ) -> Result<Vec<Value>> {
3773 let mut key_vals = Vec::new();
3774 for expr in group_by {
3775 key_vals.push(
3776 self.evaluate_expr(expr, row, prop_manager, params, ctx)
3777 .await?,
3778 );
3779 }
3780 Ok(key_vals)
3781 }
3782
3783 pub(crate) async fn update_accumulators(
3785 &self,
3786 accs: &mut [Accumulator],
3787 aggregates: &[Expr],
3788 row: &HashMap<String, Value>,
3789 prop_manager: &PropertyManager,
3790 params: &HashMap<String, Value>,
3791 ctx: Option<&QueryContext>,
3792 ) -> Result<()> {
3793 for (i, agg_expr) in aggregates.iter().enumerate() {
3794 if let Expr::FunctionCall { args, .. } = agg_expr {
3795 let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3796 let val = if is_wildcard {
3797 Value::Null
3798 } else {
3799 self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3800 .await?
3801 };
3802 accs[i].update(&val, is_wildcard);
3803 }
3804 }
3805 Ok(())
3806 }
3807
3808 pub(crate) async fn execute_recursive_cte(
3810 &self,
3811 cte_name: &str,
3812 initial: LogicalPlan,
3813 recursive: LogicalPlan,
3814 prop_manager: &PropertyManager,
3815 params: &HashMap<String, Value>,
3816 ctx: Option<&QueryContext>,
3817 ) -> Result<Vec<HashMap<String, Value>>> {
3818 use std::collections::HashSet;
3819
3820 pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3823 let mut pairs: Vec<_> = row.iter().collect();
3824 pairs.sort_by(|a, b| a.0.cmp(b.0));
3825 format!("{pairs:?}")
3826 }
3827
3828 let mut working_table = self
3830 .execute_subplan(initial, prop_manager, params, ctx)
3831 .await?;
3832 let mut result_table = working_table.clone();
3833
3834 let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3836
3837 let max_iterations = self.config.max_recursive_cte_iterations;
3840 for _iteration in 0..max_iterations {
3841 if let Some(ctx) = ctx {
3843 ctx.check_timeout()?;
3844 }
3845
3846 if working_table.is_empty() {
3847 break;
3848 }
3849
3850 let working_val = Value::List(
3852 working_table
3853 .iter()
3854 .map(|row| {
3855 if row.len() == 1 {
3856 row.values().next().unwrap().clone()
3857 } else {
3858 Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3859 }
3860 })
3861 .collect(),
3862 );
3863
3864 let mut next_params = params.clone();
3865 next_params.insert(cte_name.to_string(), working_val);
3866
3867 let next_result = self
3869 .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3870 .await?;
3871
3872 if next_result.is_empty() {
3873 break;
3874 }
3875
3876 let new_rows: Vec<_> = next_result
3878 .into_iter()
3879 .filter(|row| {
3880 let key = row_key(row);
3881 seen.insert(key) })
3883 .collect();
3884
3885 if new_rows.is_empty() {
3886 break;
3888 }
3889
3890 result_table.extend(new_rows.clone());
3891 working_table = new_rows;
3892 }
3893
3894 let final_list = Value::List(
3896 result_table
3897 .into_iter()
3898 .map(|row| {
3899 if row.len() == 1 {
3911 row.values().next().unwrap().clone()
3912 } else {
3913 Value::Map(row.into_iter().collect())
3914 }
3915 })
3916 .collect(),
3917 );
3918
3919 let mut final_row = HashMap::new();
3920 final_row.insert(cte_name.to_string(), final_list);
3921 Ok(vec![final_row])
3922 }
3923
3924 const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3926
3927 pub(crate) async fn execute_sort(
3928 &self,
3929 rows: Vec<HashMap<String, Value>>,
3930 order_by: &[uni_cypher::ast::SortItem],
3931 prop_manager: &PropertyManager,
3932 params: &HashMap<String, Value>,
3933 ctx: Option<&QueryContext>,
3934 ) -> Result<Vec<HashMap<String, Value>>> {
3935 if let Some(ctx) = ctx {
3937 ctx.check_timeout()?;
3938 }
3939
3940 let mut rows_with_keys = Vec::with_capacity(rows.len());
3941 for (idx, row) in rows.into_iter().enumerate() {
3942 if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3944 && let Some(ctx) = ctx
3945 {
3946 ctx.check_timeout()?;
3947 }
3948
3949 let mut keys = Vec::new();
3950 for item in order_by {
3951 let val = row
3952 .get(&item.expr.to_string_repr())
3953 .cloned()
3954 .unwrap_or(Value::Null);
3955 let val = if val.is_null() {
3956 self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3957 .await
3958 .unwrap_or(Value::Null)
3959 } else {
3960 val
3961 };
3962 keys.push(val);
3963 }
3964 rows_with_keys.push((row, keys));
3965 }
3966
3967 if let Some(ctx) = ctx {
3969 ctx.check_timeout()?;
3970 }
3971
3972 rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3973
3974 Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3975 }
3976
3977 pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3979 aggregates
3980 .iter()
3981 .map(|expr| {
3982 if let Expr::FunctionCall { name, distinct, .. } = expr {
3983 Accumulator::new(name, *distinct)
3984 } else {
3985 Accumulator::new("COUNT", false)
3986 }
3987 })
3988 .collect()
3989 }
3990
3991 pub(crate) fn build_aggregate_result(
3993 group_by: &[Expr],
3994 aggregates: &[Expr],
3995 key_vals: &[Value],
3996 accs: &[Accumulator],
3997 ) -> HashMap<String, Value> {
3998 let mut res_row = HashMap::new();
3999 for (i, expr) in group_by.iter().enumerate() {
4000 res_row.insert(expr.to_string_repr(), key_vals[i].clone());
4001 }
4002 for (i, expr) in aggregates.iter().enumerate() {
4003 let col_name = crate::query::planner::aggregate_column_name(expr);
4005 res_row.insert(col_name, accs[i].finish());
4006 }
4007 res_row
4008 }
4009
4010 pub(crate) fn compare_sort_keys(
4012 a_keys: &[Value],
4013 b_keys: &[Value],
4014 order_by: &[uni_cypher::ast::SortItem],
4015 ) -> std::cmp::Ordering {
4016 for (i, item) in order_by.iter().enumerate() {
4017 let order = Self::compare_values(&a_keys[i], &b_keys[i]);
4018 if order != std::cmp::Ordering::Equal {
4019 return if item.ascending {
4020 order
4021 } else {
4022 order.reverse()
4023 };
4024 }
4025 }
4026 std::cmp::Ordering::Equal
4027 }
4028
4029 pub(crate) async fn execute_backup(
4033 &self,
4034 destination: &str,
4035 _options: &HashMap<String, Value>,
4036 ) -> Result<Vec<HashMap<String, Value>>> {
4037 if let Some(writer_arc) = &self.writer {
4039 let writer: &uni_store::Writer = writer_arc.as_ref();
4040 writer.flush_to_l1(None).await?;
4041 }
4042
4043 let snapshot_manager = self.storage.snapshot_manager();
4045 let snapshot = snapshot_manager
4046 .load_latest_snapshot()
4047 .await?
4048 .ok_or_else(|| anyhow!("No snapshot found"))?;
4049
4050 if is_cloud_url(destination) {
4052 self.backup_to_cloud(destination, &snapshot.snapshot_id)
4053 .await?;
4054 } else {
4055 let validated_dest = self.validate_path(destination)?;
4057 self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
4058 .await?;
4059 }
4060
4061 let mut res = HashMap::new();
4062 res.insert(
4063 "status".to_string(),
4064 Value::String("Backup completed".to_string()),
4065 );
4066 res.insert(
4067 "snapshot_id".to_string(),
4068 Value::String(snapshot.snapshot_id),
4069 );
4070 Ok(vec![res])
4071 }
4072
4073 async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
4075 let source_path = std::path::Path::new(self.storage.base_path());
4076
4077 if !dest_path.exists() {
4078 std::fs::create_dir_all(dest_path)?;
4079 }
4080
4081 if source_path.exists() {
4083 Self::copy_dir_all(source_path, dest_path)?;
4084 }
4085
4086 let schema_manager = self.storage.schema_manager();
4088 let dest_catalog = dest_path.join("catalog");
4089 if !dest_catalog.exists() {
4090 std::fs::create_dir_all(&dest_catalog)?;
4091 }
4092
4093 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4094 std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
4095
4096 Ok(())
4097 }
4098
4099 async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
4103 use object_store::ObjectStore;
4104 use object_store::ObjectStoreExt;
4105 use object_store::local::LocalFileSystem;
4106 use object_store::path::Path as ObjPath;
4107
4108 let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
4109 let source_path = std::path::Path::new(self.storage.base_path());
4110
4111 let src_store: Arc<dyn ObjectStore> =
4113 Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
4114
4115 let catalog_src = ObjPath::from("catalog");
4117 let catalog_dst = if dest_prefix.as_ref().is_empty() {
4118 ObjPath::from("catalog")
4119 } else {
4120 ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
4121 };
4122 copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
4123
4124 let storage_src = ObjPath::from("storage");
4126 let storage_dst = if dest_prefix.as_ref().is_empty() {
4127 ObjPath::from("storage")
4128 } else {
4129 ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
4130 };
4131 copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
4132
4133 let schema_manager = self.storage.schema_manager();
4135 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4136 let schema_path = if dest_prefix.as_ref().is_empty() {
4137 ObjPath::from("catalog/schema.json")
4138 } else {
4139 ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
4140 };
4141 dest_store
4142 .put(&schema_path, bytes::Bytes::from(schema_content).into())
4143 .await?;
4144
4145 Ok(())
4146 }
4147
4148 const MAX_BACKUP_DEPTH: usize = 100;
4153
4154 const MAX_BACKUP_FILES: usize = 100_000;
4159
4160 pub(crate) fn copy_dir_all(
4168 src: &std::path::Path,
4169 dst: &std::path::Path,
4170 ) -> std::io::Result<()> {
4171 let mut file_count = 0usize;
4172 Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
4173 }
4174
4175 pub(crate) fn copy_dir_all_impl(
4177 src: &std::path::Path,
4178 dst: &std::path::Path,
4179 depth: usize,
4180 file_count: &mut usize,
4181 ) -> std::io::Result<()> {
4182 if depth >= Self::MAX_BACKUP_DEPTH {
4183 return Err(std::io::Error::new(
4184 std::io::ErrorKind::InvalidInput,
4185 format!(
4186 "Maximum backup depth {} exceeded at {:?}",
4187 Self::MAX_BACKUP_DEPTH,
4188 src
4189 ),
4190 ));
4191 }
4192
4193 std::fs::create_dir_all(dst)?;
4194
4195 for entry in std::fs::read_dir(src)? {
4196 if *file_count >= Self::MAX_BACKUP_FILES {
4197 return Err(std::io::Error::new(
4198 std::io::ErrorKind::InvalidInput,
4199 format!(
4200 "Maximum backup file count {} exceeded",
4201 Self::MAX_BACKUP_FILES
4202 ),
4203 ));
4204 }
4205 *file_count += 1;
4206
4207 let entry = entry?;
4208 let metadata = entry.metadata()?;
4209
4210 if metadata.file_type().is_symlink() {
4212 continue;
4214 }
4215
4216 let dst_path = dst.join(entry.file_name());
4217 if metadata.is_dir() {
4218 Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4219 } else {
4220 std::fs::copy(entry.path(), dst_path)?;
4221 }
4222 }
4223 Ok(())
4224 }
4225
4226 pub(crate) async fn execute_copy(
4227 &self,
4228 target: &str,
4229 source: &str,
4230 options: &HashMap<String, Value>,
4231 prop_manager: &PropertyManager,
4232 ) -> Result<Vec<HashMap<String, Value>>> {
4233 let format = options
4234 .get("format")
4235 .and_then(|v| v.as_str())
4236 .unwrap_or_else(|| {
4237 if source.ends_with(".parquet") {
4238 "parquet"
4239 } else {
4240 "csv"
4241 }
4242 });
4243
4244 match format.to_lowercase().as_str() {
4245 "csv" => self.execute_csv_import(target, source, options).await,
4246 "parquet" => {
4247 self.execute_parquet_import(target, source, options, prop_manager)
4248 .await
4249 }
4250 _ => Err(anyhow!("Unsupported format: {}", format)),
4251 }
4252 }
4253
4254 pub(crate) async fn execute_csv_import(
4255 &self,
4256 target: &str,
4257 source: &str,
4258 options: &HashMap<String, Value>,
4259 ) -> Result<Vec<HashMap<String, Value>>> {
4260 let validated_source = self.validate_path(source)?;
4262
4263 let writer_lock = self
4264 .writer
4265 .as_ref()
4266 .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4267
4268 let schema = self.storage.schema_manager().schema();
4269
4270 let label_meta = schema.labels.get(target);
4272 let edge_meta = schema.edge_types.get(target);
4273
4274 if label_meta.is_none() && edge_meta.is_none() {
4275 return Err(anyhow!("Target '{}' not found in schema", target));
4276 }
4277
4278 let delimiter_str = options
4280 .get("delimiter")
4281 .and_then(|v| v.as_str())
4282 .unwrap_or(",");
4283 let delimiter = if delimiter_str.is_empty() {
4284 b','
4285 } else {
4286 delimiter_str.as_bytes()[0]
4287 };
4288 let has_header = options
4289 .get("header")
4290 .and_then(|v| v.as_bool())
4291 .unwrap_or(true);
4292
4293 let mut rdr = csv::ReaderBuilder::new()
4294 .delimiter(delimiter)
4295 .has_headers(has_header)
4296 .from_path(&validated_source)?;
4297
4298 let headers = rdr.headers()?.clone();
4299 let mut count = 0;
4300
4301 let writer: &uni_store::Writer = writer_lock.as_ref();
4302
4303 const CSV_ID_CHUNK: usize = 256;
4307
4308 if label_meta.is_some() {
4309 let target_props = schema
4310 .properties
4311 .get(target)
4312 .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4313
4314 let mut vid_chunk: std::collections::VecDeque<Vid> =
4315 std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4316 for result in rdr.records() {
4317 let record = result?;
4318 let mut props = HashMap::new();
4319
4320 for (i, header) in headers.iter().enumerate() {
4321 if let Some(val_str) = record.get(i)
4322 && let Some(prop_meta) = target_props.get(header)
4323 {
4324 let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4325 props.insert(header.to_string(), val);
4326 }
4327 }
4328
4329 if vid_chunk.is_empty() {
4330 vid_chunk.extend(writer.allocate_vids(CSV_ID_CHUNK).await?);
4331 }
4332 let vid = vid_chunk.pop_front().unwrap();
4333 writer
4334 .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4335 .await?;
4336 count += 1;
4337 }
4338 } else if let Some(meta) = edge_meta {
4339 let type_id = meta.id;
4340 let target_props = schema
4341 .properties
4342 .get(target)
4343 .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4344
4345 let src_col = options
4348 .get("src_col")
4349 .and_then(|v| v.as_str())
4350 .unwrap_or("_src");
4351 let dst_col = options
4352 .get("dst_col")
4353 .and_then(|v| v.as_str())
4354 .unwrap_or("_dst");
4355
4356 let mut eid_chunk: std::collections::VecDeque<Eid> =
4357 std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4358 for result in rdr.records() {
4359 let record = result?;
4360 let mut props = HashMap::new();
4361 let mut src_vid = None;
4362 let mut dst_vid = None;
4363
4364 for (i, header) in headers.iter().enumerate() {
4365 if let Some(val_str) = record.get(i) {
4366 if header == src_col {
4367 src_vid =
4368 Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4369 } else if header == dst_col {
4370 dst_vid =
4371 Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4372 } else if let Some(prop_meta) = target_props.get(header) {
4373 let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4374 props.insert(header.to_string(), val);
4375 }
4376 }
4377 }
4378
4379 let src =
4380 src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4381 let dst = dst_vid
4382 .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;
4383
4384 if eid_chunk.is_empty() {
4385 eid_chunk.extend(writer.allocate_eids(CSV_ID_CHUNK).await?);
4386 }
4387 let eid = eid_chunk.pop_front().unwrap();
4388 writer
4389 .insert_edge(
4390 src,
4391 dst,
4392 type_id,
4393 eid,
4394 props,
4395 Some(target.to_string()),
4396 None,
4397 )
4398 .await?;
4399 count += 1;
4400 }
4401 }
4402
4403 let mut res = HashMap::new();
4404 res.insert("count".to_string(), Value::Int(count as i64));
4405 Ok(vec![res])
4406 }
4407
4408 pub(crate) async fn execute_parquet_import(
4412 &self,
4413 target: &str,
4414 source: &str,
4415 options: &HashMap<String, Value>,
4416 _prop_manager: &PropertyManager,
4417 ) -> Result<Vec<HashMap<String, Value>>> {
4418 let writer_lock = self
4419 .writer
4420 .as_ref()
4421 .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4422
4423 let schema = self.storage.schema_manager().schema();
4424
4425 let label_meta = schema.labels.get(target);
4427 let edge_meta = schema.edge_types.get(target);
4428
4429 if label_meta.is_none() && edge_meta.is_none() {
4430 return Err(anyhow!("Target '{}' not found in schema", target));
4431 }
4432
4433 let reader = if is_cloud_url(source) {
4435 self.open_parquet_from_cloud(source).await?
4436 } else {
4437 let validated_source = self.validate_path(source)?;
4439 let file = std::fs::File::open(&validated_source)?;
4440 let builder =
4441 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
4442 builder.build()?
4443 };
4444 let mut reader = reader;
4445
4446 let mut count = 0;
4447 let writer: &uni_store::Writer = writer_lock.as_ref();
4448
4449 if label_meta.is_some() {
4450 let target_props = schema
4451 .properties
4452 .get(target)
4453 .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4454
4455 for batch in reader.by_ref() {
4456 let batch = batch?;
4457 let num_rows = batch.num_rows();
4458 let vids = writer.allocate_vids(num_rows).await?;
4460 for (row, &vid) in vids.iter().enumerate().take(num_rows) {
4461 let mut props = HashMap::new();
4462 for field in batch.schema().fields() {
4463 let name = field.name();
4464 if target_props.contains_key(name) {
4465 let col = batch.column_by_name(name).unwrap();
4466 if !col.is_null(row) {
4467 let data_type = target_props.get(name).map(|pm| &pm.r#type);
4469 let val =
4470 arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
4471 props.insert(name.clone(), val);
4472 }
4473 }
4474 }
4475 writer
4476 .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4477 .await?;
4478 count += 1;
4479 }
4480 }
4481 } else if let Some(meta) = edge_meta {
4482 let type_id = meta.id;
4483 let target_props = schema
4484 .properties
4485 .get(target)
4486 .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4487
4488 let src_col = options
4489 .get("src_col")
4490 .and_then(|v| v.as_str())
4491 .unwrap_or("_src");
4492 let dst_col = options
4493 .get("dst_col")
4494 .and_then(|v| v.as_str())
4495 .unwrap_or("_dst");
4496
4497 for batch in reader {
4498 let batch = batch?;
4499 let num_rows = batch.num_rows();
4500 let eids = writer.allocate_eids(num_rows).await?;
4502 for (row, &eid) in eids.iter().enumerate().take(num_rows) {
4503 let mut props = HashMap::new();
4504 let mut src_vid = None;
4505 let mut dst_vid = None;
4506
4507 for field in batch.schema().fields() {
4508 let name = field.name();
4509 let col = batch.column_by_name(name).unwrap();
4510 if col.is_null(row) {
4511 continue;
4512 }
4513
4514 if name == src_col {
4515 let val = Self::arrow_to_value(col.as_ref(), row);
4516 src_vid = Some(Self::vid_from_value(&val)?);
4517 } else if name == dst_col {
4518 let val = Self::arrow_to_value(col.as_ref(), row);
4519 dst_vid = Some(Self::vid_from_value(&val)?);
4520 } else if let Some(pm) = target_props.get(name) {
4521 let val =
4523 arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
4524 props.insert(name.clone(), val);
4525 }
4526 }
4527
4528 let src = src_vid
4529 .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4530 let dst = dst_vid.ok_or_else(|| {
4531 anyhow!("Missing destination VID in column '{}'", dst_col)
4532 })?;
4533
4534 writer
4535 .insert_edge(
4536 src,
4537 dst,
4538 type_id,
4539 eid,
4540 props,
4541 Some(target.to_string()),
4542 None,
4543 )
4544 .await?;
4545 count += 1;
4546 }
4547 }
4548 }
4549
4550 let mut res = HashMap::new();
4551 res.insert("count".to_string(), Value::Int(count as i64));
4552 Ok(vec![res])
4553 }
4554
4555 async fn open_parquet_from_cloud(
4559 &self,
4560 source_url: &str,
4561 ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4562 use object_store::ObjectStoreExt;
4563
4564 let (store, path) = build_store_from_url(source_url)?;
4565
4566 let bytes = store.get(&path).await?.bytes().await?;
4568
4569 let reader = bytes::Bytes::from(bytes.to_vec());
4571 let builder =
4572 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4573 Ok(builder.build()?)
4574 }
4575
4576 pub(crate) async fn scan_edge_type(
4577 &self,
4578 edge_type: &str,
4579 ctx: Option<&QueryContext>,
4580 ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4581 let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4582
4583 self.scan_edge_type_l2(edge_type, &mut edges).await?;
4585
4586 self.scan_edge_type_l1(edge_type, &mut edges).await?;
4588
4589 if let Some(ctx) = ctx {
4591 self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4592 self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4593 }
4594
4595 Ok(edges
4596 .into_iter()
4597 .map(|(eid, (src, dst))| (eid, src, dst))
4598 .collect())
4599 }
4600
4601 pub(crate) async fn scan_edge_type_l2(
4606 &self,
4607 _edge_type: &str,
4608 _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4609 ) -> Result<()> {
4610 Ok(())
4613 }
4614
4615 pub(crate) async fn scan_edge_type_l1(
4617 &self,
4618 edge_type: &str,
4619 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4620 ) -> Result<()> {
4621 if let Ok(Some(batch)) = self
4622 .storage
4623 .scan_delta_table(
4624 edge_type,
4625 "fwd",
4626 &["eid", "src_vid", "dst_vid", "op", "_version"],
4627 None,
4628 )
4629 .await
4630 {
4631 let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4633 HashMap::new();
4634
4635 self.process_delta_batch(&batch, &mut versioned_ops)?;
4636
4637 for (eid, (_, op, src, dst)) in versioned_ops {
4639 if op == 0 {
4640 edges.insert(eid, (src, dst));
4641 } else if op == 1 {
4642 edges.remove(&eid);
4643 }
4644 }
4645 }
4646 Ok(())
4647 }
4648
4649 pub(crate) fn process_delta_batch(
4651 &self,
4652 batch: &arrow_array::RecordBatch,
4653 versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4654 ) -> Result<()> {
4655 use arrow_array::UInt64Array;
4656 let eid_col = batch
4657 .column_by_name("eid")
4658 .ok_or(anyhow!("Missing eid"))?
4659 .as_any()
4660 .downcast_ref::<UInt64Array>()
4661 .ok_or(anyhow!("Invalid eid"))?;
4662 let src_col = batch
4663 .column_by_name("src_vid")
4664 .ok_or(anyhow!("Missing src_vid"))?
4665 .as_any()
4666 .downcast_ref::<UInt64Array>()
4667 .ok_or(anyhow!("Invalid src_vid"))?;
4668 let dst_col = batch
4669 .column_by_name("dst_vid")
4670 .ok_or(anyhow!("Missing dst_vid"))?
4671 .as_any()
4672 .downcast_ref::<UInt64Array>()
4673 .ok_or(anyhow!("Invalid dst_vid"))?;
4674 let op_col = batch
4675 .column_by_name("op")
4676 .ok_or(anyhow!("Missing op"))?
4677 .as_any()
4678 .downcast_ref::<arrow_array::UInt8Array>()
4679 .ok_or(anyhow!("Invalid op"))?;
4680 let version_col = batch
4681 .column_by_name("_version")
4682 .ok_or(anyhow!("Missing _version"))?
4683 .as_any()
4684 .downcast_ref::<UInt64Array>()
4685 .ok_or(anyhow!("Invalid _version"))?;
4686
4687 for i in 0..batch.num_rows() {
4688 let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4689 let version = version_col.value(i);
4690 let op = op_col.value(i);
4691 let src = Vid::from(src_col.value(i));
4692 let dst = Vid::from(dst_col.value(i));
4693
4694 match versioned_ops.entry(eid) {
4695 std::collections::hash_map::Entry::Vacant(e) => {
4696 e.insert((version, op, src, dst));
4697 }
4698 std::collections::hash_map::Entry::Occupied(mut e) => {
4699 if version > e.get().0 {
4700 e.insert((version, op, src, dst));
4701 }
4702 }
4703 }
4704 }
4705 Ok(())
4706 }
4707
4708 pub(crate) fn scan_edge_type_l0(
4710 &self,
4711 edge_type: &str,
4712 ctx: &QueryContext,
4713 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4714 ) {
4715 let schema = self.storage.schema_manager().schema();
4716 let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4717
4718 if let Some(type_id) = type_id {
4719 self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4721
4722 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4724 self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4725 }
4726
4727 for pending_l0_arc in &ctx.pending_flush_l0s {
4729 self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4730 }
4731 }
4732 }
4733
4734 pub(crate) fn scan_single_l0(
4736 &self,
4737 l0: &uni_store::runtime::L0Buffer,
4738 type_id: u32,
4739 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4740 ) {
4741 for edge_entry in l0.graph.edges() {
4742 if edge_entry.edge_type == type_id {
4743 edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4744 }
4745 }
4746 let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4748 for eid in eids_to_check {
4749 if l0.is_tombstoned(eid) {
4750 edges.remove(&eid);
4751 }
4752 }
4753 }
4754
4755 pub(crate) fn filter_tombstoned_vertex_edges(
4757 &self,
4758 ctx: &QueryContext,
4759 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4760 ) {
4761 let l0 = ctx.l0.read();
4762 let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4763
4764 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4766 let tx_l0 = tx_l0_arc.read();
4767 all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4768 }
4769
4770 for pending_l0_arc in &ctx.pending_flush_l0s {
4772 let pending_l0 = pending_l0_arc.read();
4773 all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4774 }
4775
4776 edges.retain(|_, (src, dst)| {
4777 !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4778 });
4779 }
4780
4781 pub(crate) async fn execute_project(
4783 &self,
4784 input_rows: Vec<HashMap<String, Value>>,
4785 projections: &[(Expr, Option<String>)],
4786 prop_manager: &PropertyManager,
4787 params: &HashMap<String, Value>,
4788 ctx: Option<&QueryContext>,
4789 ) -> Result<Vec<HashMap<String, Value>>> {
4790 let mut results = Vec::new();
4791 for m in input_rows {
4792 let mut row = HashMap::new();
4793 for (expr, alias) in projections {
4794 let val = self
4795 .evaluate_expr(expr, &m, prop_manager, params, ctx)
4796 .await?;
4797 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4798 row.insert(name, val);
4799 }
4800 results.push(row);
4801 }
4802 Ok(results)
4803 }
4804
4805 pub(crate) async fn execute_unwind(
4807 &self,
4808 input_rows: Vec<HashMap<String, Value>>,
4809 expr: &Expr,
4810 variable: &str,
4811 prop_manager: &PropertyManager,
4812 params: &HashMap<String, Value>,
4813 ctx: Option<&QueryContext>,
4814 ) -> Result<Vec<HashMap<String, Value>>> {
4815 let mut results = Vec::new();
4816 for row in input_rows {
4817 let val = self
4818 .evaluate_expr(expr, &row, prop_manager, params, ctx)
4819 .await?;
4820 if let Value::List(items) = val {
4821 for item in items {
4822 let mut new_row = row.clone();
4823 new_row.insert(variable.to_string(), item);
4824 results.push(new_row);
4825 }
4826 }
4827 }
4828 Ok(results)
4829 }
4830
4831 pub(crate) async fn execute_apply(
4833 &self,
4834 input_rows: Vec<HashMap<String, Value>>,
4835 subquery: &LogicalPlan,
4836 input_filter: Option<&Expr>,
4837 prop_manager: &PropertyManager,
4838 params: &HashMap<String, Value>,
4839 ctx: Option<&QueryContext>,
4840 ) -> Result<Vec<HashMap<String, Value>>> {
4841 let mut filtered_rows = input_rows;
4842
4843 if let Some(filter) = input_filter {
4844 let mut filtered = Vec::new();
4845 for row in filtered_rows {
4846 let res = self
4847 .evaluate_expr(filter, &row, prop_manager, params, ctx)
4848 .await?;
4849 if res.as_bool().unwrap_or(false) {
4850 filtered.push(row);
4851 }
4852 }
4853 filtered_rows = filtered;
4854 }
4855
4856 if filtered_rows.is_empty() {
4859 let sub_rows = self
4860 .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4861 .await?;
4862 return Ok(sub_rows);
4863 }
4864
4865 let mut results = Vec::new();
4866 for row in filtered_rows {
4867 let mut sub_params = params.clone();
4868 sub_params.extend(row.clone());
4869
4870 let sub_rows = self
4871 .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4872 .await?;
4873
4874 for sub_row in sub_rows {
4875 let mut new_row = row.clone();
4876 new_row.extend(sub_row);
4877 results.push(new_row);
4878 }
4879 }
4880 Ok(results)
4881 }
4882
4883 pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4885 let schema = self.storage.schema_manager().schema();
4886 let mut rows = Vec::new();
4887 for idx in &schema.indexes {
4888 let (name, type_str, details) = match idx {
4889 uni_common::core::schema::IndexDefinition::Vector(c) => (
4890 c.name.clone(),
4891 "VECTOR",
4892 format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4893 ),
4894 uni_common::core::schema::IndexDefinition::FullText(c) => (
4895 c.name.clone(),
4896 "FULLTEXT",
4897 format!("on {}:{:?}", c.label, c.properties),
4898 ),
4899 uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4900 cfg.name.clone(),
4901 "SCALAR",
4902 format!(":{}({:?})", cfg.label, cfg.properties),
4903 ),
4904 _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4905 };
4906
4907 if let Some(f) = filter
4908 && f != type_str
4909 {
4910 continue;
4911 }
4912
4913 let mut row = HashMap::new();
4914 row.insert("name".to_string(), Value::String(name));
4915 row.insert("type".to_string(), Value::String(type_str.to_string()));
4916 row.insert("details".to_string(), Value::String(details));
4917 rows.push(row);
4918 }
4919 rows
4920 }
4921
4922 pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4923 let mut row = HashMap::new();
4924 row.insert("name".to_string(), Value::String("uni".to_string()));
4925 vec![row]
4927 }
4928
4929 pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4930 vec![]
4932 }
4933
4934 pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4935 let snapshot = self
4936 .storage
4937 .snapshot_manager()
4938 .load_latest_snapshot()
4939 .await?;
4940 let mut results = Vec::new();
4941
4942 if let Some(snap) = snapshot {
4943 for (label, s) in &snap.vertices {
4944 let mut row = HashMap::new();
4945 row.insert("type".to_string(), Value::String("Label".to_string()));
4946 row.insert("name".to_string(), Value::String(label.clone()));
4947 row.insert("count".to_string(), Value::Int(s.count as i64));
4948 results.push(row);
4949 }
4950 for (edge, s) in &snap.edges {
4951 let mut row = HashMap::new();
4952 row.insert("type".to_string(), Value::String("Edge".to_string()));
4953 row.insert("name".to_string(), Value::String(edge.clone()));
4954 row.insert("count".to_string(), Value::Int(s.count as i64));
4955 results.push(row);
4956 }
4957 }
4958
4959 Ok(results)
4960 }
4961
4962 pub(crate) fn execute_show_constraints(
4963 &self,
4964 clause: ShowConstraints,
4965 ) -> Vec<HashMap<String, Value>> {
4966 let schema = self.storage.schema_manager().schema();
4967 let mut rows = Vec::new();
4968 for c in &schema.constraints {
4969 if let Some(target) = &clause.target {
4970 match (target, &c.target) {
4971 (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4972 (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4973 if e1 == e2 => {}
4974 _ => continue,
4975 }
4976 }
4977
4978 let mut row = HashMap::new();
4979 row.insert("name".to_string(), Value::String(c.name.clone()));
4980 let type_str = match c.constraint_type {
4981 ConstraintType::Unique { .. } => "UNIQUE",
4982 ConstraintType::Exists { .. } => "EXISTS",
4983 ConstraintType::Check { .. } => "CHECK",
4984 _ => "UNKNOWN",
4985 };
4986 row.insert("type".to_string(), Value::String(type_str.to_string()));
4987
4988 let target_str = match &c.target {
4989 ConstraintTarget::Label(l) => format!("(:{})", l),
4990 ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4991 _ => "UNKNOWN".to_string(),
4992 };
4993 row.insert("target".to_string(), Value::String(target_str));
4994
4995 rows.push(row);
4996 }
4997 rows
4998 }
4999
5000 pub(crate) async fn execute_cross_join(
5002 &self,
5003 left: Box<LogicalPlan>,
5004 right: Box<LogicalPlan>,
5005 prop_manager: &PropertyManager,
5006 params: &HashMap<String, Value>,
5007 ctx: Option<&QueryContext>,
5008 ) -> Result<Vec<HashMap<String, Value>>> {
5009 let left_rows = self
5010 .execute_subplan(*left, prop_manager, params, ctx)
5011 .await?;
5012 let right_rows = self
5013 .execute_subplan(*right, prop_manager, params, ctx)
5014 .await?;
5015
5016 let mut results = Vec::new();
5017 for l in &left_rows {
5018 for r in &right_rows {
5019 let mut combined = l.clone();
5020 combined.extend(r.clone());
5021 results.push(combined);
5022 }
5023 }
5024 Ok(results)
5025 }
5026
5027 pub(crate) async fn execute_union(
5029 &self,
5030 left: Box<LogicalPlan>,
5031 right: Box<LogicalPlan>,
5032 all: bool,
5033 prop_manager: &PropertyManager,
5034 params: &HashMap<String, Value>,
5035 ctx: Option<&QueryContext>,
5036 ) -> Result<Vec<HashMap<String, Value>>> {
5037 let mut left_rows = self
5038 .execute_subplan(*left, prop_manager, params, ctx)
5039 .await?;
5040 let mut right_rows = self
5041 .execute_subplan(*right, prop_manager, params, ctx)
5042 .await?;
5043
5044 left_rows.append(&mut right_rows);
5045
5046 if !all {
5047 let mut seen = HashSet::new();
5048 left_rows.retain(|row| {
5049 let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
5050 let key = format!("{sorted_row:?}");
5051 seen.insert(key)
5052 });
5053 }
5054 Ok(left_rows)
5055 }
5056
5057 pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
5059 let schema = self.storage.schema_manager().schema();
5060 schema.indexes.iter().any(|idx| match idx {
5061 uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
5062 uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
5063 uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
5064 _ => false,
5065 })
5066 }
5067
5068 pub(crate) async fn execute_export(
5069 &self,
5070 target: &str,
5071 source: &str,
5072 options: &HashMap<String, Value>,
5073 prop_manager: &PropertyManager,
5074 ctx: Option<&QueryContext>,
5075 ) -> Result<Vec<HashMap<String, Value>>> {
5076 let format = options
5077 .get("format")
5078 .and_then(|v| v.as_str())
5079 .unwrap_or("csv")
5080 .to_lowercase();
5081
5082 match format.as_str() {
5083 "csv" => {
5084 self.execute_csv_export(target, source, options, prop_manager, ctx)
5085 .await
5086 }
5087 "parquet" => {
5088 self.execute_parquet_export(target, source, options, prop_manager, ctx)
5089 .await
5090 }
5091 _ => Err(anyhow!("Unsupported export format: {}", format)),
5092 }
5093 }
5094
5095 pub(crate) async fn execute_csv_export(
5096 &self,
5097 target: &str,
5098 source: &str,
5099 options: &HashMap<String, Value>,
5100 prop_manager: &PropertyManager,
5101 ctx: Option<&QueryContext>,
5102 ) -> Result<Vec<HashMap<String, Value>>> {
5103 let validated_dest = self.validate_path(source)?;
5105
5106 let schema = self.storage.schema_manager().schema();
5107 let label_meta = schema.labels.get(target);
5108 let edge_meta = schema.edge_types.get(target);
5109
5110 if label_meta.is_none() && edge_meta.is_none() {
5111 return Err(anyhow!("Target '{}' not found in schema", target));
5112 }
5113
5114 let delimiter_str = options
5115 .get("delimiter")
5116 .and_then(|v| v.as_str())
5117 .unwrap_or(",");
5118 let delimiter = if delimiter_str.is_empty() {
5119 b','
5120 } else {
5121 delimiter_str.as_bytes()[0]
5122 };
5123 let has_header = options
5124 .get("header")
5125 .and_then(|v| v.as_bool())
5126 .unwrap_or(true);
5127
5128 let mut wtr = csv::WriterBuilder::new()
5129 .delimiter(delimiter)
5130 .from_path(&validated_dest)?;
5131
5132 let mut count = 0;
5133 let empty_props = HashMap::new();
5135
5136 if let Some(meta) = label_meta {
5137 let label_id = meta.id;
5138 let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5139 let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5140 prop_names.sort();
5141
5142 let mut headers = vec!["_vid".to_string()];
5143 headers.extend(prop_names.clone());
5144
5145 if has_header {
5146 wtr.write_record(&headers)?;
5147 }
5148
5149 let vids = self
5150 .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5151 .await?;
5152
5153 for vid in vids {
5154 let props = prop_manager
5155 .get_all_vertex_props_with_ctx(vid, ctx)
5156 .await?
5157 .unwrap_or_default();
5158
5159 let mut row = Vec::with_capacity(headers.len());
5160 row.push(vid.to_string());
5161 for p_name in &prop_names {
5162 let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5163 row.push(self.format_csv_value(val));
5164 }
5165 wtr.write_record(&row)?;
5166 count += 1;
5167 }
5168 } else if let Some(meta) = edge_meta {
5169 let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5170 let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5171 prop_names.sort();
5172
5173 let mut headers = vec![
5175 "_eid".to_string(),
5176 "_src".to_string(),
5177 "_dst".to_string(),
5178 "_type".to_string(),
5179 ];
5180 headers.extend(prop_names.clone());
5181
5182 if has_header {
5183 wtr.write_record(&headers)?;
5184 }
5185
5186 let edges = self.scan_edge_type(target, ctx).await?;
5187
5188 for (eid, src, dst) in edges {
5189 let props = prop_manager
5190 .get_all_edge_props_with_ctx(eid, ctx)
5191 .await?
5192 .unwrap_or_default();
5193
5194 let mut row = Vec::with_capacity(headers.len());
5195 row.push(eid.to_string());
5196 row.push(src.to_string());
5197 row.push(dst.to_string());
5198 row.push(meta.id.to_string());
5199
5200 for p_name in &prop_names {
5201 let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5202 row.push(self.format_csv_value(val));
5203 }
5204 wtr.write_record(&row)?;
5205 count += 1;
5206 }
5207 }
5208
5209 wtr.flush()?;
5210 let mut res = HashMap::new();
5211 res.insert("count".to_string(), Value::Int(count as i64));
5212 Ok(vec![res])
5213 }
5214
5215 pub(crate) async fn execute_parquet_export(
5219 &self,
5220 target: &str,
5221 destination: &str,
5222 _options: &HashMap<String, Value>,
5223 prop_manager: &PropertyManager,
5224 ctx: Option<&QueryContext>,
5225 ) -> Result<Vec<HashMap<String, Value>>> {
5226 let schema_manager = self.storage.schema_manager();
5227 let schema = schema_manager.schema();
5228 let label_meta = schema.labels.get(target);
5229 let edge_meta = schema.edge_types.get(target);
5230
5231 if label_meta.is_none() && edge_meta.is_none() {
5232 return Err(anyhow!("Target '{}' not found in schema", target));
5233 }
5234
5235 let arrow_schema = if label_meta.is_some() {
5236 let dataset = self.storage.vertex_dataset(target)?;
5237 dataset.get_arrow_schema(&schema)?
5238 } else {
5239 let dataset = self.storage.edge_dataset(target, "", "")?;
5241 dataset.get_arrow_schema(&schema)?
5242 };
5243
5244 let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();
5245
5246 if let Some(meta) = label_meta {
5247 let label_id = meta.id;
5248 let vids = self
5249 .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5250 .await?;
5251
5252 for vid in vids {
5253 let mut props = prop_manager
5254 .get_all_vertex_props_with_ctx(vid, ctx)
5255 .await?
5256 .unwrap_or_default();
5257
5258 props.insert(
5259 "_vid".to_string(),
5260 uni_common::Value::Int(vid.as_u64() as i64),
5261 );
5262 if !props.contains_key("_uid") {
5263 props.insert(
5264 "_uid".to_string(),
5265 uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
5266 );
5267 }
5268 props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5269 props.insert("_version".to_string(), uni_common::Value::Int(1));
5270 rows.push(props);
5271 }
5272 } else if edge_meta.is_some() {
5273 let edges = self.scan_edge_type(target, ctx).await?;
5274 for (eid, src, dst) in edges {
5275 let mut props = prop_manager
5276 .get_all_edge_props_with_ctx(eid, ctx)
5277 .await?
5278 .unwrap_or_default();
5279
5280 props.insert(
5281 "eid".to_string(),
5282 uni_common::Value::Int(eid.as_u64() as i64),
5283 );
5284 props.insert(
5285 "src_vid".to_string(),
5286 uni_common::Value::Int(src.as_u64() as i64),
5287 );
5288 props.insert(
5289 "dst_vid".to_string(),
5290 uni_common::Value::Int(dst.as_u64() as i64),
5291 );
5292 props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5293 props.insert("_version".to_string(), uni_common::Value::Int(1));
5294 rows.push(props);
5295 }
5296 }
5297
5298 if is_cloud_url(destination) {
5300 self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
5301 .await?;
5302 } else {
5303 let validated_dest = self.validate_path(destination)?;
5305 let file = std::fs::File::create(&validated_dest)?;
5306 let mut writer =
5307 parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;
5308
5309 if !rows.is_empty() {
5311 let batch = self.rows_to_batch(&rows, &arrow_schema)?;
5312 writer.write(&batch)?;
5313 }
5314
5315 writer.close()?;
5316 }
5317
5318 let mut res = HashMap::new();
5319 res.insert("count".to_string(), Value::Int(rows.len() as i64));
5320 Ok(vec![res])
5321 }
5322
5323 async fn write_parquet_to_cloud(
5325 &self,
5326 dest_url: &str,
5327 rows: &[HashMap<String, uni_common::Value>],
5328 arrow_schema: &arrow_schema::Schema,
5329 ) -> Result<()> {
5330 use object_store::ObjectStoreExt;
5331
5332 let (store, path) = build_store_from_url(dest_url)?;
5333
5334 let mut buffer = Vec::new();
5336 {
5337 let mut writer = parquet::arrow::ArrowWriter::try_new(
5338 &mut buffer,
5339 Arc::new(arrow_schema.clone()),
5340 None,
5341 )?;
5342
5343 if !rows.is_empty() {
5344 let batch = self.rows_to_batch(rows, arrow_schema)?;
5345 writer.write(&batch)?;
5346 }
5347
5348 writer.close()?;
5349 }
5350
5351 store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5353
5354 Ok(())
5355 }
5356
5357 pub(crate) fn rows_to_batch(
5358 &self,
5359 rows: &[HashMap<String, uni_common::Value>],
5360 schema: &arrow_schema::Schema,
5361 ) -> Result<RecordBatch> {
5362 let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5363
5364 for field in schema.fields() {
5365 let name = field.name();
5366 let dt = field.data_type();
5367
5368 let values: Vec<uni_common::Value> = rows
5369 .iter()
5370 .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5371 .collect();
5372 let array = self.values_to_array(&values, dt)?;
5373 columns.push(array);
5374 }
5375
5376 Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5377 }
5378
5379 pub(crate) fn values_to_array(
5382 &self,
5383 values: &[uni_common::Value],
5384 dt: &arrow_schema::DataType,
5385 ) -> Result<Arc<dyn Array>> {
5386 arrow_convert::values_to_array(values, dt)
5387 }
5388
5389 pub(crate) fn format_csv_value(&self, val: Value) -> String {
5390 match val {
5391 Value::Null => "".to_string(),
5392 Value::String(s) => s,
5393 Value::Int(i) => i.to_string(),
5394 Value::Float(f) => f.to_string(),
5395 Value::Bool(b) => b.to_string(),
5396 _ => format!("{val}"),
5397 }
5398 }
5399
5400 pub(crate) fn parse_csv_value(
5401 &self,
5402 s: &str,
5403 data_type: &uni_common::core::schema::DataType,
5404 prop_name: &str,
5405 ) -> Result<Value> {
5406 if s.is_empty() || s.to_lowercase() == "null" {
5407 return Ok(Value::Null);
5408 }
5409
5410 use uni_common::core::schema::DataType;
5411 match data_type {
5412 DataType::String => Ok(Value::String(s.to_string())),
5413 DataType::Int32 | DataType::Int64 => {
5414 let i = s.parse::<i64>().map_err(|_| {
5415 anyhow!(
5416 "Failed to parse integer for property '{}': {}",
5417 prop_name,
5418 s
5419 )
5420 })?;
5421 Ok(Value::Int(i))
5422 }
5423 DataType::Float32 | DataType::Float64 => {
5424 let f = s.parse::<f64>().map_err(|_| {
5425 anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5426 })?;
5427 Ok(Value::Float(f))
5428 }
5429 DataType::Bool => {
5430 let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5431 anyhow!(
5432 "Failed to parse boolean for property '{}': {}",
5433 prop_name,
5434 s
5435 )
5436 })?;
5437 Ok(Value::Bool(b))
5438 }
5439 DataType::CypherValue => {
5440 let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5441 anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5442 })?;
5443 Ok(Value::from(json_val))
5444 }
5445 DataType::Vector { .. } => {
5446 let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5447 anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5448 })?;
5449 Ok(Value::Vector(v))
5450 }
5451 _ => Ok(Value::String(s.to_string())),
5452 }
5453 }
5454
5455 pub(crate) async fn detach_delete_vertex(
5456 &self,
5457 vid: Vid,
5458 writer: &Writer,
5459 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5460 ) -> Result<()> {
5461 let schema = self.storage.schema_manager().schema();
5462 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5463
5464 let out_graph = self
5466 .storage
5467 .load_subgraph_cached(
5468 &[vid],
5469 &edge_type_ids,
5470 1,
5471 uni_store::runtime::Direction::Outgoing,
5472 Some(writer.l0_manager.get_current()),
5473 )
5474 .await?;
5475
5476 for edge in out_graph.edges() {
5477 writer
5478 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5479 .await?;
5480 }
5481
5482 let in_graph = self
5484 .storage
5485 .load_subgraph_cached(
5486 &[vid],
5487 &edge_type_ids,
5488 1,
5489 uni_store::runtime::Direction::Incoming,
5490 Some(writer.l0_manager.get_current()),
5491 )
5492 .await?;
5493
5494 for edge in in_graph.edges() {
5495 writer
5496 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5497 .await?;
5498 }
5499
5500 Ok(())
5501 }
5502
5503 async fn batch_load_incident_edges(
5514 &self,
5515 vids: &[Vid],
5516 writer: &Writer,
5517 ) -> Result<(
5518 uni_store::runtime::working_graph::WorkingGraph,
5519 uni_store::runtime::working_graph::WorkingGraph,
5520 )> {
5521 let schema = self.storage.schema_manager().schema();
5522 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5523 let l0 = Some(writer.l0_manager.get_current());
5524
5525 let out_graph = self
5526 .storage
5527 .load_subgraph_cached(
5528 vids,
5529 &edge_type_ids,
5530 1,
5531 uni_store::runtime::Direction::Outgoing,
5532 l0.clone(),
5533 )
5534 .await?;
5535 let in_graph = self
5536 .storage
5537 .load_subgraph_cached(
5538 vids,
5539 &edge_type_ids,
5540 1,
5541 uni_store::runtime::Direction::Incoming,
5542 l0,
5543 )
5544 .await?;
5545 Ok((out_graph, in_graph))
5546 }
5547
5548 pub(crate) async fn batch_detach_delete_vertices(
5550 &self,
5551 vids: &[Vid],
5552 labels_per_vid: Vec<Option<Vec<String>>>,
5553 writer: &Writer,
5554 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5555 ) -> Result<()> {
5556 let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5557
5558 for edge in out_graph.edges() {
5559 writer
5560 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5561 .await?;
5562 }
5563 for edge in in_graph.edges() {
5564 writer
5565 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5566 .await?;
5567 }
5568
5569 for (vid, labels) in vids.iter().zip(labels_per_vid) {
5570 writer.delete_vertex(*vid, labels, tx_l0).await?;
5571 }
5572
5573 Ok(())
5574 }
5575
5576 pub(crate) async fn batch_check_vertices_have_no_edges(
5594 &self,
5595 vids: &[Vid],
5596 writer: &Writer,
5597 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5598 ) -> Result<()> {
5599 if vids.is_empty() {
5600 return Ok(());
5601 }
5602
5603 let mut tombstoned_eids: std::collections::HashSet<uni_common::core::id::Eid> =
5604 std::collections::HashSet::new();
5605 {
5606 let writer_l0 = writer.l0_manager.get_current();
5607 let guard = writer_l0.read();
5608 for &eid in guard.tombstones.keys() {
5609 tombstoned_eids.insert(eid);
5610 }
5611 }
5612 if let Some(tx) = tx_l0 {
5613 let guard = tx.read();
5614 for &eid in guard.tombstones.keys() {
5615 tombstoned_eids.insert(eid);
5616 }
5617 }
5618
5619 let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5620
5621 for edge in out_graph.edges().chain(in_graph.edges()) {
5622 if !tombstoned_eids.contains(&edge.eid) {
5623 return Err(anyhow!(
5624 "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
5625 edge.src_vid
5626 ));
5627 }
5628 }
5629 Ok(())
5630 }
5631}