1use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7 eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16 match val {
17 Value::Temporal(tv) => {
18 use uni_common::TemporalValue;
19 match tv {
20 TemporalValue::DateTime {
21 nanos_since_epoch, ..
22 }
23 | TemporalValue::LocalDateTime {
24 nanos_since_epoch, ..
25 } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26 TemporalValue::Date { days_since_epoch } => {
27 chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28 }
29 _ => None,
30 }
31 }
32 Value::String(s) => parse_datetime_utc(s).ok(),
33 _ => None,
34 }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46 BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47 ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54
55use crate::query::df_graph::L0Context;
57use crate::query::df_planner::HybridPhysicalPlanner;
58use datafusion::physical_plan::ExecutionPlanProperties;
59use datafusion::prelude::SessionContext;
60use parking_lot::RwLock as SyncRwLock;
61
62use arrow_array::{Array, RecordBatch};
63use csv;
64use parquet;
65
66use super::core::*;
67
/// Entry-count threshold for edge maps in `hydrate_entity_if_needed`: a map
/// holding an `_eid` and at most this many entries is treated as carrying no
/// user properties and gets hydrated from the property manager.
// NOTE(review): presumably the number of system columns emitted for an edge;
// the exact column names are not visible here — confirm against the scan schema.
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
/// Entry-count threshold for vertex maps in `hydrate_entity_if_needed`: a map
/// holding a `_vid` and at most this many entries is treated as carrying no
/// user properties and gets hydrated from the property manager.
// NOTE(review): presumably the number of system columns emitted for a vertex —
// confirm against the scan schema.
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
72
73fn collect_l0_vids(
78 ctx: Option<&QueryContext>,
79 extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
80) -> Vec<Vid> {
81 let mut vids = Vec::new();
82 if let Some(ctx) = ctx {
83 vids.extend(extractor(&ctx.l0.read()));
84 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
85 vids.extend(extractor(&tx_l0_arc.read()));
86 }
87 for pending_l0_arc in &ctx.pending_flush_l0s {
88 vids.extend(extractor(&pending_l0_arc.read()));
89 }
90 }
91 vids
92}
93
94async fn hydrate_entity_if_needed(
103 map: &mut HashMap<String, Value>,
104 prop_manager: &PropertyManager,
105 ctx: Option<&QueryContext>,
106) {
107 if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
109 if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
110 tracing::debug!(
111 "Pushdown fallback: hydrating edge {} at execution time",
112 eid_u64
113 );
114 if let Ok(Some(props)) = prop_manager
115 .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
116 .await
117 {
118 for (key, value) in props {
119 map.entry(key).or_insert(value);
120 }
121 }
122 } else {
123 tracing::trace!(
124 "Pushdown success: edge {} already has {} properties",
125 eid_u64,
126 map.len() - EDGE_SYSTEM_FIELD_COUNT
127 );
128 }
129 return;
130 }
131
132 if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
134 if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
135 tracing::debug!(
136 "Pushdown fallback: hydrating vertex {} at execution time",
137 vid_u64
138 );
139 if let Ok(Some(props)) = prop_manager
140 .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
141 .await
142 {
143 for (key, value) in props {
144 map.entry(key).or_insert(value);
145 }
146 }
147 } else {
148 tracing::trace!(
149 "Pushdown success: vertex {} already has {} properties",
150 vid_u64,
151 map.len() - VERTEX_SYSTEM_FIELD_COUNT
152 );
153 }
154 }
155}
156
157fn promote_vid_placeholder(row: &mut HashMap<String, Value>, var: &str) {
163 let prefix = format!("{}.", var);
164 let mut map = HashMap::new();
165
166 let dotted_keys: Vec<String> = row
167 .keys()
168 .filter(|k| k.starts_with(&prefix))
169 .cloned()
170 .collect();
171
172 for key in &dotted_keys {
173 let prop_name = &key[prefix.len()..];
174 if let Some(val) = row.remove(key) {
175 map.insert(prop_name.to_string(), val);
176 }
177 }
178
179 row.insert(var.to_string(), Value::Map(map));
181}
182
183fn merge_dotted_columns(row: &mut HashMap<String, Value>, var: &str) {
193 let vid_key = format!("{}._vid", var);
195 let labels_key = format!("{}._labels", var);
196
197 let vid_val = row.remove(&vid_key);
198 let labels_val = row.remove(&labels_key);
199
200 if let Some(Value::Map(map)) = row.get_mut(var) {
201 if let Some(v) = vid_val {
202 map.insert("_vid".to_string(), v);
203 }
204 if let Some(v) = labels_val {
205 map.insert("_labels".to_string(), v);
206 }
207 }
208
209 let eid_key = format!("{}._eid", var);
214 let type_key = format!("{}._type", var);
215
216 let eid_val = row.remove(&eid_key);
217 let type_val = row.remove(&type_key);
218
219 if (eid_val.is_some() || type_val.is_some())
220 && let Some(Value::Map(map)) = row.get_mut(var)
221 {
222 if let Some(v) = eid_val {
223 map.entry("_eid".to_string()).or_insert(v);
224 }
225 if let Some(v) = type_val {
226 map.entry("_type".to_string()).or_insert(v);
227 }
228 }
229
230 let prefix = format!("{}.", var);
232 let dotted_keys: Vec<String> = row
233 .keys()
234 .filter(|k| k.starts_with(&prefix))
235 .cloned()
236 .collect();
237 for key in dotted_keys {
238 let prop_name = key[prefix.len()..].to_string();
239 let val = row.remove(&key);
240 if prop_name.starts_with('_') || prop_name == "overflow_json" {
241 continue;
242 }
243 if let (Some(val), Some(Value::Map(map))) = (val, row.get_mut(var)) {
244 map.insert(prop_name, val);
245 }
246 }
247}
248
249impl Executor {
250 async fn verify_and_filter_candidates(
255 &self,
256 mut candidates: Vec<Vid>,
257 variable: &str,
258 filter: Option<&Expr>,
259 ctx: Option<&QueryContext>,
260 prop_manager: &PropertyManager,
261 params: &HashMap<String, Value>,
262 ) -> Result<Vec<Vid>> {
263 candidates.sort_unstable();
264 candidates.dedup();
265
266 let mut verified_vids = Vec::new();
267 for vid in candidates {
268 let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
269 continue; };
271
272 if let Some(expr) = filter {
273 let mut props_map: HashMap<String, Value> = props;
274 props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
275
276 let mut row = HashMap::new();
277 row.insert(variable.to_string(), Value::Map(props_map));
278
279 let res = self
280 .evaluate_expr(expr, &row, prop_manager, params, ctx)
281 .await?;
282 if res.as_bool().unwrap_or(false) {
283 verified_vids.push(vid);
284 }
285 } else {
286 verified_vids.push(vid);
287 }
288 }
289
290 Ok(verified_vids)
291 }
292
293 pub(crate) async fn scan_storage_candidates(
294 &self,
295 label_id: u16,
296 variable: &str,
297 filter: Option<&Expr>,
298 ) -> Result<Vec<Vid>> {
299 let schema = self.storage.schema_manager().schema();
300 let label_name = schema
301 .label_name_by_id(label_id)
302 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
303
304 let empty_props = std::collections::HashMap::new();
306 let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
307 let filter_sql = filter.and_then(|expr| {
308 LanceFilterGenerator::generate(std::slice::from_ref(expr), variable, Some(label_props))
309 });
310
311 self.storage
312 .scan_vertex_candidates(label_name, filter_sql.as_deref())
313 .await
314 }
315
316 pub(crate) async fn scan_label_with_filter(
317 &self,
318 label_id: u16,
319 variable: &str,
320 filter: Option<&Expr>,
321 ctx: Option<&QueryContext>,
322 prop_manager: &PropertyManager,
323 params: &HashMap<String, Value>,
324 ) -> Result<Vec<Vid>> {
325 let mut candidates = self
326 .scan_storage_candidates(label_id, variable, filter)
327 .await?;
328
329 let schema = self.storage.schema_manager().schema();
331 if let Some(label_name) = schema.label_name_by_id(label_id) {
332 candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
333 }
334
335 self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
336 .await
337 }
338
339 pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
340 if let Value::Node(node) = val {
342 return Ok(node.vid);
343 }
344 if let Value::Map(map) = val
346 && let Some(vid_val) = map.get("_vid")
347 && let Some(v) = vid_val.as_u64()
348 {
349 return Ok(Vid::from(v));
350 }
351 if let Some(s) = val.as_str()
353 && let Ok(id) = s.parse::<u64>()
354 {
355 return Ok(Vid::new(id));
356 }
357 if let Some(v) = val.as_u64() {
359 return Ok(Vid::from(v));
360 }
361 Err(anyhow!("Invalid Vid format: {:?}", val))
362 }
363
364 fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
370 for val in row.values() {
371 if let Ok(vid) = Self::vid_from_value(val)
372 && vid == target_vid
373 {
374 return val.clone();
375 }
376 }
377 Value::Map(HashMap::from([(
379 "_vid".to_string(),
380 Value::Int(target_vid.as_u64() as i64),
381 )]))
382 }
383
    /// Builds everything needed to plan and run a query through DataFusion.
    ///
    /// Returns the session context (behind a lock), a `HybridPhysicalPlanner`
    /// wired with the L0 context, registries, runtime and writer available on
    /// this executor, and the `PropertyManager` handle the planner uses.
    ///
    /// # Errors
    /// Propagates failures from UDF / plugin-function registration.
    pub async fn create_datafusion_planner(
        &self,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<(
        Arc<SyncRwLock<SessionContext>>,
        HybridPhysicalPlanner,
        Arc<PropertyManager>,
    )> {
        // Snapshot the L0 buffers from the current query context, if there is one.
        let query_ctx = self.get_context().await;
        let l0_context = match query_ctx {
            Some(ref ctx) => L0Context::from_query_context(ctx),
            None => L0Context::empty(),
        };

        let effective_storage = self.effective_storage();

        // Reuse the executor's shared property manager when present; otherwise
        // build a new one over `self.storage` with the caller's cache size.
        let prop_manager_arc = if let Some(shared) = self.prop_manager_arc.as_ref() {
            shared.clone()
        } else {
            Arc::new(PropertyManager::new(
                self.storage.clone(),
                self.storage.schema_manager_arc(),
                prop_manager.cache_size(),
            ))
        };

        // Anything below that is present forces a freshly built session
        // instead of a clone of the cached template.
        let has_custom_udfs = self
            .custom_function_registry
            .as_ref()
            .is_some_and(|r| !r.is_empty());
        let host_plugin_registry = self
            .procedure_registry
            .as_ref()
            .and_then(|pr| pr.plugin_registry());
        let optimizer_providers = host_plugin_registry
            .as_ref()
            .map(|pr| pr.optimizer_rules())
            .unwrap_or_default();
        let session_local_registry =
            crate::query::df_udfs_plugin::current_session_plugin_registry();
        let needs_dynamic_registration = has_custom_udfs
            || !optimizer_providers.is_empty()
            || host_plugin_registry.is_some()
            || session_local_registry.is_some();

        let session = if let (Some(tmpl), false) = (
            self.df_session_template.as_ref(),
            needs_dynamic_registration,
        ) {
            // Fast path: clone the template session.
            (**tmpl).clone()
        } else {
            // Slow path: build a session, installing plugin optimizer rules
            // (when any) before registering functions.
            let session = if optimizer_providers.is_empty() {
                SessionContext::new()
            } else {
                use datafusion::execution::session_state::SessionStateBuilder;
                use uni_plugin::traits::operator::OptimizerPhase;
                let mut builder = SessionStateBuilder::new().with_default_features();
                for provider in optimizer_providers.iter() {
                    match provider.phase() {
                        OptimizerPhase::Logical => {
                            builder = builder.with_optimizer_rule(provider.rule());
                        }
                        OptimizerPhase::Physical => {
                            if let Some(rule) = provider.physical_rule() {
                                builder = builder.with_physical_optimizer_rule(rule);
                            } else {
                                tracing::debug!(
                                    target: "uni.plugin.registry",
                                    "physical-phase provider returned no physical_rule(); skipping"
                                );
                            }
                        }
                        OptimizerPhase::Both => {
                            builder = builder.with_optimizer_rule(provider.rule());
                            if let Some(rule) = provider.physical_rule() {
                                builder = builder.with_physical_optimizer_rule(rule);
                            }
                        }
                        // Phases other than the three above are skipped with a debug log.
                        _ => {
                            tracing::debug!(
                                target: "uni.plugin.registry",
                                "skipping optimizer rule for unknown phase"
                            );
                        }
                    }
                }
                let state = builder.build();
                SessionContext::new_with_state(state)
            };
            // Registration order: built-in Cypher UDFs, custom functions,
            // host plugin scalars, then session-local plugin scalars.
            // NOTE(review): whether later registrations override earlier ones
            // with the same name depends on the register_* helpers — confirm.
            crate::query::df_udfs::register_cypher_udfs(&session)?;
            if let Some(ref registry) = self.custom_function_registry {
                crate::query::df_udfs_plugin::register_custom_functions_as_plugin_scalars(
                    &session, registry,
                )?;
            }
            if let Some(ref host_pr) = host_plugin_registry {
                crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, host_pr)?;
            }
            if let Some(ref session_local) = session_local_registry {
                crate::query::df_udfs_plugin::register_plugin_scalar_udfs(&session, session_local)?;
            }
            session
        };
        let session_ctx = Arc::new(SyncRwLock::new(session));

        // The planner gets its own copy of `params` and starts with an empty
        // map as its last constructor argument.
        let mut planner = HybridPhysicalPlanner::with_l0_context(
            session_ctx.clone(),
            effective_storage.clone(),
            l0_context,
            prop_manager_arc.clone(),
            effective_storage.schema_manager().schema(),
            params.clone(),
            HashMap::new(),
        );

        planner = planner.with_algo_registry(self.algo_registry.clone());
        if let Some(ref registry) = self.procedure_registry {
            planner = planner.with_procedure_registry(registry.clone());
        }
        // Queries the procedure registry again; this shadows the earlier
        // `host_plugin_registry` binding and is moved into the planner.
        let host_plugin_registry = self
            .procedure_registry
            .as_ref()
            .and_then(|r| r.plugin_registry());
        if let Some(registry) = host_plugin_registry {
            planner = planner.with_plugin_registry(registry);
        }
        if let Some(ref xervo_runtime) = self.xervo_runtime {
            planner = planner.with_xervo_runtime(xervo_runtime.clone());
        }
        if let Some(writer) = &self.writer {
            planner = planner.with_writer(Arc::clone(writer));
        }

        Ok((session_ctx, planner, prop_manager_arc))
    }
596
597 pub fn collect_batches(
599 session_ctx: &Arc<SyncRwLock<SessionContext>>,
600 execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
601 ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
602 Box::pin(async move {
603 use futures::TryStreamExt;
604
605 let task_ctx = session_ctx.read().task_ctx();
606 let partition_count = execution_plan.output_partitioning().partition_count();
607 let mut all_batches = Vec::new();
608 for partition in 0..partition_count {
609 let stream = execution_plan.execute(partition, task_ctx.clone())?;
610 let batches: Vec<RecordBatch> = stream.try_collect().await?;
611 all_batches.extend(batches);
612 }
613 Ok(all_batches)
614 })
615 }
616
617 pub async fn execute_datafusion(
622 &self,
623 plan: LogicalPlan,
624 prop_manager: &PropertyManager,
625 params: &HashMap<String, Value>,
626 ) -> Result<Vec<RecordBatch>> {
627 let (batches, _plan) = self
628 .execute_datafusion_with_plan(plan, prop_manager, params)
629 .await?;
630 Ok(batches)
631 }
632
633 pub async fn execute_datafusion_with_plan(
640 &self,
641 plan: LogicalPlan,
642 prop_manager: &PropertyManager,
643 params: &HashMap<String, Value>,
644 ) -> Result<(
645 Vec<RecordBatch>,
646 Arc<dyn datafusion::physical_plan::ExecutionPlan>,
647 )> {
648 let (session_ctx, mut planner, prop_manager_arc) =
649 self.create_datafusion_planner(prop_manager, params).await?;
650
651 if Self::contains_write_operations(&plan) {
653 let writer = self
654 .writer
655 .as_ref()
656 .ok_or_else(|| anyhow!("Write operations require a Writer"))?
657 .clone();
658 let query_ctx = self.get_context().await;
659
660 debug_assert!(
661 query_ctx.is_some(),
662 "BUG: query_ctx is None for write operation"
663 );
664
665 let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
666 executor: self.clone(),
667 writer,
668 prop_manager: prop_manager_arc,
669 params: params.clone(),
670 query_ctx,
671 tx_l0_override: self.transaction_l0_override.clone(),
672 });
673 planner = planner.with_mutation_context(mutation_ctx);
674 tracing::debug!(
675 plan_type = Self::get_plan_type(&plan),
676 "Mutation routed to DataFusion engine"
677 );
678 }
679
680 let execution_plan = planner.plan(&plan)?;
681 let plan_clone = Arc::clone(&execution_plan);
682 let result = Self::collect_batches(&session_ctx, execution_plan).await;
683
684 let graph_warnings = planner.graph_ctx().take_warnings();
686 if !graph_warnings.is_empty()
687 && let Ok(mut w) = self.warnings.lock()
688 {
689 w.extend(graph_warnings);
690 }
691
692 result.map(|batches| (batches, plan_clone))
693 }
694
695 pub(crate) async fn execute_merge_read_plan(
701 &self,
702 plan: LogicalPlan,
703 prop_manager: &PropertyManager,
704 params: &HashMap<String, Value>,
705 merge_variables: Vec<String>,
706 ) -> Result<Vec<HashMap<String, Value>>> {
707 let (session_ctx, planner, _prop_manager_arc) =
708 self.create_datafusion_planner(prop_manager, params).await?;
709
710 let extra: HashMap<String, HashSet<String>> = merge_variables
713 .iter()
714 .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
715 .collect();
716 let execution_plan = planner.plan_with_properties(&plan, extra)?;
717 let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
718
719 let flat_rows = self.record_batches_to_rows(all_batches)?;
721
722 let rows = flat_rows
725 .into_iter()
726 .map(|mut row| {
727 for var in &merge_variables {
728 if row.contains_key(var) {
730 continue;
731 }
732 let prefix = format!("{}.", var);
733 let dotted_keys: Vec<String> = row
734 .keys()
735 .filter(|k| k.starts_with(&prefix))
736 .cloned()
737 .collect();
738 if !dotted_keys.is_empty() {
739 let mut map = HashMap::new();
740 for key in dotted_keys {
741 let prop_name = key[prefix.len()..].to_string();
742 if let Some(val) = row.remove(&key) {
743 map.insert(prop_name, val);
744 }
745 }
746 row.insert(var.clone(), Value::Map(map));
747 }
748 }
749 row
750 })
751 .collect();
752
753 Ok(rows)
754 }
755
756 pub(crate) fn record_batches_to_rows(
763 &self,
764 batches: Vec<RecordBatch>,
765 ) -> Result<Vec<HashMap<String, Value>>> {
766 let mut rows = Vec::new();
767
768 for batch in batches {
769 let num_rows = batch.num_rows();
770 let schema = batch.schema();
771
772 for row_idx in 0..num_rows {
773 let mut row = HashMap::new();
774
775 for (col_idx, field) in schema.fields().iter().enumerate() {
776 let column = batch.column(col_idx);
777 let data_type = if field
784 .metadata()
785 .get("uni_raw_bytes")
786 .is_some_and(|v| v == "true")
787 {
788 Some(&uni_common::DataType::Bytes)
789 } else if uni_common::core::schema::is_datetime_struct(field.data_type()) {
790 Some(&uni_common::DataType::DateTime)
791 } else if uni_common::core::schema::is_time_struct(field.data_type()) {
792 Some(&uni_common::DataType::Time)
793 } else {
794 None
795 };
796 let mut value =
797 arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);
798
799 if field
802 .metadata()
803 .get("cv_encoded")
804 .is_some_and(|v| v == "true")
805 && let Value::String(s) = &value
806 && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
807 {
808 value = Value::from(parsed);
809 }
810
811 value = Self::normalize_path_if_needed(value);
813
814 row.insert(field.name().clone(), value);
815 }
816
817 let bare_vars: Vec<String> = row
826 .keys()
827 .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
828 .cloned()
829 .collect();
830
831 let vid_placeholder_vars: Vec<String> = row
835 .keys()
836 .filter(|k| {
837 !k.contains('.')
838 && matches!(row.get(*k), Some(Value::String(_)))
839 && row.contains_key(&format!("{}._vid", k))
840 })
841 .cloned()
842 .collect();
843
844 for var in &vid_placeholder_vars {
845 promote_vid_placeholder(&mut row, var);
846 }
847
848 for var in &bare_vars {
849 merge_dotted_columns(&mut row, var);
850 }
851
852 rows.push(row);
853 }
854 }
855
856 Ok(rows)
857 }
858
859 fn normalize_path_if_needed(value: Value) -> Value {
864 match value {
865 Value::Map(map)
866 if map.contains_key("nodes")
867 && (map.contains_key("relationships") || map.contains_key("edges")) =>
868 {
869 Self::normalize_path_map(map)
870 }
871 other => other,
872 }
873 }
874
875 fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
877 if let Some(Value::List(nodes)) = map.remove("nodes") {
879 let normalized_nodes: Vec<Value> = nodes
880 .into_iter()
881 .map(|n| {
882 if let Value::Map(node_map) = n {
883 Self::normalize_path_node_map(node_map)
884 } else {
885 n
886 }
887 })
888 .collect();
889 map.insert("nodes".to_string(), Value::List(normalized_nodes));
890 }
891
892 let rels_key = if map.contains_key("relationships") {
894 "relationships"
895 } else {
896 "edges"
897 };
898 if let Some(Value::List(rels)) = map.remove(rels_key) {
899 let normalized_rels: Vec<Value> = rels
900 .into_iter()
901 .map(|r| {
902 if let Value::Map(rel_map) = r {
903 Self::normalize_path_edge_map(rel_map)
904 } else {
905 r
906 }
907 })
908 .collect();
909 map.insert("relationships".to_string(), Value::List(normalized_rels));
910 }
911
912 Value::Map(map)
913 }
914
915 fn value_to_id_string(val: Value) -> String {
917 match val {
918 Value::Int(n) => n.to_string(),
919 Value::Float(n) => n.to_string(),
920 Value::String(s) => s,
921 other => other.to_string(),
922 }
923 }
924
925 fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
928 if let Some(val) = map.remove(src_key) {
929 map.insert(
930 dst_key.to_string(),
931 Value::String(Self::value_to_id_string(val)),
932 );
933 }
934 }
935
936 fn ensure_properties_map(map: &mut HashMap<String, Value>) {
938 match map.get("properties") {
939 Some(props) if !props.is_null() => {}
940 _ => {
941 map.insert("properties".to_string(), Value::Map(HashMap::new()));
942 }
943 }
944 }
945
946 fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
948 Self::stringify_map_field(&mut map, "_vid", "_id");
949 Self::ensure_properties_map(&mut map);
950 Value::Map(map)
951 }
952
953 fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
955 Self::stringify_map_field(&mut map, "_eid", "_id");
956 Self::stringify_map_field(&mut map, "_src", "_src");
957 Self::stringify_map_field(&mut map, "_dst", "_dst");
958
959 if let Some(type_name) = map.remove("_type_name") {
960 map.insert("_type".to_string(), type_name);
961 }
962
963 Self::ensure_properties_map(&mut map);
964 Value::Map(map)
965 }
966
967 #[instrument(
968 skip(self, prop_manager, params),
969 fields(rows_returned, duration_ms),
970 level = "info"
971 )]
972 pub fn execute<'a>(
973 &'a self,
974 plan: LogicalPlan,
975 prop_manager: &'a PropertyManager,
976 params: &'a HashMap<String, Value>,
977 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
978 Box::pin(async move {
979 let query_type = Self::get_plan_type(&plan);
980 let ctx = self.get_context().await;
981 let start = Instant::now();
982
983 let res = if Self::is_ddl_or_admin(&plan) {
986 self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
987 .await
988 } else {
989 let batches = self.execute_datafusion(plan, prop_manager, params).await?;
990 self.record_batches_to_rows(batches)
991 };
992
993 let duration = start.elapsed();
994 metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
995 .record(duration.as_secs_f64());
996
997 tracing::Span::current().record("duration_ms", duration.as_millis());
998 match &res {
999 Ok(rows) => {
1000 tracing::Span::current().record("rows_returned", rows.len());
1001 metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
1002 .increment(rows.len() as u64);
1003 }
1004 Err(e) => {
1005 let error_type = if e.to_string().contains("timed out") {
1006 "timeout"
1007 } else if e.to_string().contains("syntax") {
1008 "syntax"
1009 } else {
1010 "execution"
1011 };
1012 metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
1013 }
1014 }
1015
1016 res
1017 })
1018 }
1019
1020 fn get_plan_type(plan: &LogicalPlan) -> &'static str {
1021 match plan {
1022 LogicalPlan::Scan { .. } => "read_scan",
1023 LogicalPlan::FusedIndexScan { .. } => "read_fused_index_scan",
1024 LogicalPlan::FusedIndexScanWrapped { .. } => "read_fused_index_scan_wrapped",
1025 LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
1026 LogicalPlan::Traverse { .. } => "read_traverse",
1027 LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
1028 LogicalPlan::ScanAll { .. } => "read_scan_all",
1029 LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
1030 LogicalPlan::VectorKnn { .. } => "read_vector",
1031 LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
1032 LogicalPlan::Merge { .. } => "write_merge",
1033 LogicalPlan::Delete { .. } => "write_delete",
1034 LogicalPlan::Set { .. } => "write_set",
1035 LogicalPlan::Remove { .. } => "write_remove",
1036 LogicalPlan::ProcedureCall { .. } => "call",
1037 LogicalPlan::Copy { .. } => "copy",
1038 LogicalPlan::Backup { .. } => "backup",
1039 _ => "other",
1040 }
1041 }
1042
1043 fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
1053 match plan {
1054 LogicalPlan::Project { input, .. }
1056 | LogicalPlan::Sort { input, .. }
1057 | LogicalPlan::Limit { input, .. }
1058 | LogicalPlan::Distinct { input }
1059 | LogicalPlan::Aggregate { input, .. }
1060 | LogicalPlan::Window { input, .. }
1061 | LogicalPlan::Unwind { input, .. }
1062 | LogicalPlan::Filter { input, .. }
1063 | LogicalPlan::Create { input, .. }
1064 | LogicalPlan::CreateBatch { input, .. }
1065 | LogicalPlan::Set { input, .. }
1066 | LogicalPlan::Remove { input, .. }
1067 | LogicalPlan::Delete { input, .. }
1068 | LogicalPlan::Merge { input, .. }
1069 | LogicalPlan::Foreach { input, .. }
1070 | LogicalPlan::Traverse { input, .. }
1071 | LogicalPlan::TraverseMainByType { input, .. }
1072 | LogicalPlan::BindZeroLengthPath { input, .. }
1073 | LogicalPlan::BindPath { input, .. }
1074 | LogicalPlan::ShortestPath { input, .. }
1075 | LogicalPlan::AllShortestPaths { input, .. }
1076 | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],
1077
1078 LogicalPlan::Apply {
1080 input, subquery, ..
1081 }
1082 | LogicalPlan::SubqueryCall { input, subquery } => {
1083 vec![input.as_ref(), subquery.as_ref()]
1084 }
1085 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
1086 vec![left.as_ref(), right.as_ref()]
1087 }
1088 LogicalPlan::RecursiveCTE {
1089 initial, recursive, ..
1090 } => vec![initial.as_ref(), recursive.as_ref()],
1091 LogicalPlan::QuantifiedPattern {
1092 input,
1093 pattern_plan,
1094 ..
1095 } => vec![input.as_ref(), pattern_plan.as_ref()],
1096
1097 _ => vec![],
1099 }
1100 }
1101
1102 pub(crate) fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
1109 match plan {
1110 LogicalPlan::CreateLabel(_)
1112 | LogicalPlan::CreateEdgeType(_)
1113 | LogicalPlan::AlterLabel(_)
1114 | LogicalPlan::AlterEdgeType(_)
1115 | LogicalPlan::DropLabel(_)
1116 | LogicalPlan::DropEdgeType(_)
1117 | LogicalPlan::CreateConstraint(_)
1118 | LogicalPlan::DropConstraint(_)
1119 | LogicalPlan::ShowConstraints(_) => true,
1120
1121 LogicalPlan::CreateVectorIndex { .. }
1123 | LogicalPlan::CreateSparseIndex { .. }
1124 | LogicalPlan::CreateFullTextIndex { .. }
1125 | LogicalPlan::CreateScalarIndex { .. }
1126 | LogicalPlan::CreateJsonFtsIndex { .. }
1127 | LogicalPlan::DropIndex { .. }
1128 | LogicalPlan::ShowIndexes { .. } => true,
1129
1130 LogicalPlan::ShowDatabase
1132 | LogicalPlan::ShowConfig
1133 | LogicalPlan::ShowStatistics
1134 | LogicalPlan::Vacuum
1135 | LogicalPlan::Checkpoint
1136 | LogicalPlan::Copy { .. }
1137 | LogicalPlan::CopyTo { .. }
1138 | LogicalPlan::CopyFrom { .. }
1139 | LogicalPlan::Backup { .. }
1140 | LogicalPlan::Explain { .. } => true,
1141
1142 LogicalPlan::ProcedureCall { procedure_name, .. } => {
1145 !Self::is_df_eligible_procedure(procedure_name)
1146 }
1147
1148 _ => Self::plan_children(plan)
1150 .iter()
1151 .any(|child| Self::is_ddl_or_admin(child)),
1152 }
1153 }
1154
1155 fn is_df_eligible_procedure(name: &str) -> bool {
1161 matches!(
1162 name,
1163 "uni.schema.labels"
1164 | "uni.schema.edgeTypes"
1165 | "uni.schema.relationshipTypes"
1166 | "uni.schema.indexes"
1167 | "uni.schema.constraints"
1168 | "uni.schema.labelInfo"
1169 | "uni.vector.query"
1170 | "uni.fts.query"
1171 | "uni.sparse.query"
1172 | "uni.search"
1173 | "uni.create.vNode"
1180 | "uni.create.vEdge"
1181 ) || name.starts_with("uni.algo.")
1182 }
1183
1184 fn contains_write_operations(plan: &LogicalPlan) -> bool {
1191 match plan {
1192 LogicalPlan::Create { .. }
1193 | LogicalPlan::CreateBatch { .. }
1194 | LogicalPlan::Merge { .. }
1195 | LogicalPlan::Delete { .. }
1196 | LogicalPlan::Set { .. }
1197 | LogicalPlan::Remove { .. }
1198 | LogicalPlan::Foreach { .. } => true,
1199 _ => Self::plan_children(plan)
1200 .iter()
1201 .any(|child| Self::contains_write_operations(child)),
1202 }
1203 }
1204
1205 pub fn execute_stream(
1210 self,
1211 plan: LogicalPlan,
1212 prop_manager: Arc<PropertyManager>,
1213 params: HashMap<String, Value>,
1214 ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
1215 let this = self;
1216 let this_for_ctx = this.clone();
1217
1218 let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });
1219
1220 ctx_stream
1221 .flat_map(move |ctx| {
1222 let plan = plan.clone();
1223 let this = this.clone();
1224 let prop_manager = prop_manager.clone();
1225 let params = params.clone();
1226
1227 let fut = async move {
1228 if Self::is_ddl_or_admin(&plan) {
1229 this.execute_subplan(plan, &prop_manager, ¶ms, ctx.as_ref())
1230 .await
1231 } else {
1232 let batches = this
1233 .execute_datafusion(plan, &prop_manager, ¶ms)
1234 .await?;
1235 this.record_batches_to_rows(batches)
1236 }
1237 };
1238 stream::once(fut).boxed()
1239 })
1240 .boxed()
1241 }
1242
1243 pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
1246 arrow_convert::arrow_to_value(col, row, None)
1247 }
1248
1249 pub(crate) fn evaluate_expr<'a>(
1250 &'a self,
1251 expr: &'a Expr,
1252 row: &'a HashMap<String, Value>,
1253 prop_manager: &'a PropertyManager,
1254 params: &'a HashMap<String, Value>,
1255 ctx: Option<&'a QueryContext>,
1256 ) -> BoxFuture<'a, Result<Value>> {
1257 let this = self;
1258 Box::pin(async move {
1259 let repr = expr.to_string_repr();
1261 if let Some(val) = row.get(&repr) {
1262 return Ok(val.clone());
1263 }
1264
1265 match expr {
1266 Expr::PatternComprehension { .. } => {
1267 Err(anyhow::anyhow!(
1269 "Pattern comprehensions are handled by DataFusion executor"
1270 ))
1271 }
1272 Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1273 "COLLECT subqueries not yet supported in executor"
1274 )),
1275 Expr::Variable(name) => {
1276 if let Some(val) = row.get(name) {
1277 Ok(val.clone())
1278 } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1279 Ok(vid_val.clone())
1283 } else {
1284 Ok(params.get(name).cloned().unwrap_or(Value::Null))
1285 }
1286 }
1287 Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1288 Expr::Property(var_expr, prop_name) => {
1289 if let Expr::Variable(var_name) = var_expr.as_ref() {
1293 let flat_key = format!("{}.{}", var_name, prop_name);
1294 if let Some(val) = row.get(flat_key.as_str()) {
1295 return Ok(val.clone());
1296 }
1297 }
1298
1299 let base_val = this
1300 .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1301 .await?;
1302
1303 if (prop_name == "_vid" || prop_name == "_id")
1305 && let Ok(vid) = Self::vid_from_value(&base_val)
1306 {
1307 return Ok(Value::Int(vid.as_u64() as i64));
1308 }
1309
1310 if let Value::Node(node) = &base_val {
1312 if prop_name == "_vid" || prop_name == "_id" {
1314 return Ok(Value::Int(node.vid.as_u64() as i64));
1315 }
1316 if prop_name == "_labels" {
1317 return Ok(Value::List(
1318 node.labels
1319 .iter()
1320 .map(|l| Value::String(l.clone()))
1321 .collect(),
1322 ));
1323 }
1324 if let Some(val) = node.properties.get(prop_name.as_str()) {
1326 return Ok(val.clone());
1327 }
1328 if let Ok(val) = prop_manager
1330 .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1331 .await
1332 {
1333 return Ok(val);
1334 }
1335 return Ok(Value::Null);
1336 }
1337
1338 if let Value::Edge(edge) = &base_val {
1340 if prop_name == "_eid" || prop_name == "_id" {
1342 return Ok(Value::Int(edge.eid.as_u64() as i64));
1343 }
1344 if prop_name == "_type" {
1345 return Ok(Value::String(edge.edge_type.clone()));
1346 }
1347 if prop_name == "_src" {
1348 return Ok(Value::Int(edge.src.as_u64() as i64));
1349 }
1350 if prop_name == "_dst" {
1351 return Ok(Value::Int(edge.dst.as_u64() as i64));
1352 }
1353 if let Some(val) = edge.properties.get(prop_name.as_str()) {
1355 return Ok(val.clone());
1356 }
1357 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1359 {
1360 return Ok(val);
1361 }
1362 return Ok(Value::Null);
1363 }
1364
1365 if let Value::Map(map) = &base_val {
1368 if let Some(val) = map.get(prop_name.as_str()) {
1370 return Ok(val.clone());
1371 }
1372 if let Some(Value::Map(props)) = map.get("properties")
1374 && let Some(val) = props.get(prop_name.as_str())
1375 {
1376 return Ok(val.clone());
1377 }
1378 let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1380 map.get("_id")
1381 .and_then(|v| v.as_str())
1382 .and_then(|s| s.parse::<u64>().ok())
1383 });
1384 if let Some(id) = vid_opt {
1385 let vid = Vid::from(id);
1386 if let Ok(val) = prop_manager
1387 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1388 .await
1389 {
1390 return Ok(val);
1391 }
1392 } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1393 let eid = uni_common::core::id::Eid::from(id);
1394 if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1395 return Ok(val);
1396 }
1397 }
1398 return Ok(Value::Null);
1399 }
1400
1401 if let Ok(vid) = Self::vid_from_value(&base_val) {
1403 return prop_manager
1404 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1405 .await;
1406 }
1407
1408 if base_val.is_null() {
1409 return Ok(Value::Null);
1410 }
1411
1412 {
1414 use crate::query::datetime::{
1415 eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1416 is_duration_string, is_temporal_accessor, is_temporal_string,
1417 };
1418
1419 if let Value::Temporal(tv) = &base_val {
1421 if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1422 if is_duration_accessor(prop_name) {
1423 return eval_duration_accessor(
1425 &base_val.to_string(),
1426 prop_name,
1427 );
1428 }
1429 } else if is_temporal_accessor(prop_name) {
1430 return eval_temporal_accessor(&base_val.to_string(), prop_name);
1431 }
1432 }
1433
1434 if let Value::String(s) = &base_val {
1436 if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1437 return eval_temporal_accessor(s, prop_name);
1438 }
1439 if is_duration_string(s) && is_duration_accessor(prop_name) {
1440 return eval_duration_accessor(s, prop_name);
1441 }
1442 }
1443 }
1444
1445 Err(anyhow!(
1446 "Cannot access property '{}' on {:?}",
1447 prop_name,
1448 base_val
1449 ))
1450 }
1451 Expr::ArrayIndex {
1452 array: arr_expr,
1453 index: idx_expr,
1454 } => {
1455 let arr_val = this
1456 .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1457 .await?;
1458 let idx_val = this
1459 .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1460 .await?;
1461
1462 if let Value::List(arr) = &arr_val {
1463 if let Some(i) = idx_val.as_i64() {
1465 let idx = if i < 0 {
1466 let positive_idx = arr.len() as i64 + i;
1468 if positive_idx < 0 {
1469 return Ok(Value::Null); }
1471 positive_idx as usize
1472 } else {
1473 i as usize
1474 };
1475 if idx < arr.len() {
1476 return Ok(arr[idx].clone());
1477 }
1478 return Ok(Value::Null);
1479 } else if idx_val.is_null() {
1480 return Ok(Value::Null);
1481 } else {
1482 return Err(anyhow::anyhow!(
1483 "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1484 idx_val
1485 ));
1486 }
1487 }
1488 if let Value::Map(map) = &arr_val {
1489 if let Some(key) = idx_val.as_str() {
1490 return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1491 } else if !idx_val.is_null() {
1492 return Err(anyhow::anyhow!(
1493 "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1494 idx_val
1495 ));
1496 }
1497 }
1498 if let Value::Node(node) = &arr_val {
1500 if let Some(key) = idx_val.as_str() {
1501 if let Some(val) = node.properties.get(key) {
1503 return Ok(val.clone());
1504 }
1505 if let Ok(val) = prop_manager
1507 .get_vertex_prop_with_ctx(node.vid, key, ctx)
1508 .await
1509 {
1510 return Ok(val);
1511 }
1512 return Ok(Value::Null);
1513 } else if !idx_val.is_null() {
1514 return Err(anyhow::anyhow!(
1515 "TypeError: Node index must be a string, got: {:?}",
1516 idx_val
1517 ));
1518 }
1519 }
1520 if let Value::Edge(edge) = &arr_val {
1522 if let Some(key) = idx_val.as_str() {
1523 if let Some(val) = edge.properties.get(key) {
1525 return Ok(val.clone());
1526 }
1527 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1529 return Ok(val);
1530 }
1531 return Ok(Value::Null);
1532 } else if !idx_val.is_null() {
1533 return Err(anyhow::anyhow!(
1534 "TypeError: Edge index must be a string, got: {:?}",
1535 idx_val
1536 ));
1537 }
1538 }
1539 if let Ok(vid) = Self::vid_from_value(&arr_val)
1541 && let Some(key) = idx_val.as_str()
1542 {
1543 if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1544 {
1545 return Ok(val);
1546 }
1547 return Ok(Value::Null);
1548 }
1549 if arr_val.is_null() {
1550 return Ok(Value::Null);
1551 }
1552 Err(anyhow!(
1553 "TypeError: InvalidArgumentType - cannot index into {:?}",
1554 arr_val
1555 ))
1556 }
1557 Expr::ArraySlice { array, start, end } => {
1558 let arr_val = this
1559 .evaluate_expr(array, row, prop_manager, params, ctx)
1560 .await?;
1561
1562 if let Value::List(arr) = &arr_val {
1563 let len = arr.len();
1564
1565 let start_idx = if let Some(s) = start {
1567 let v = this
1568 .evaluate_expr(s, row, prop_manager, params, ctx)
1569 .await?;
1570 if v.is_null() {
1571 return Ok(Value::Null);
1572 }
1573 let raw = v.as_i64().unwrap_or(0);
1574 if raw < 0 {
1575 (len as i64 + raw).max(0) as usize
1576 } else {
1577 (raw as usize).min(len)
1578 }
1579 } else {
1580 0
1581 };
1582
1583 let end_idx = if let Some(e) = end {
1585 let v = this
1586 .evaluate_expr(e, row, prop_manager, params, ctx)
1587 .await?;
1588 if v.is_null() {
1589 return Ok(Value::Null);
1590 }
1591 let raw = v.as_i64().unwrap_or(len as i64);
1592 if raw < 0 {
1593 (len as i64 + raw).max(0) as usize
1594 } else {
1595 (raw as usize).min(len)
1596 }
1597 } else {
1598 len
1599 };
1600
1601 if start_idx >= end_idx {
1603 return Ok(Value::List(vec![]));
1604 }
1605 let end_idx = end_idx.min(len);
1606 return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1607 }
1608
1609 if arr_val.is_null() {
1610 return Ok(Value::Null);
1611 }
1612 Err(anyhow!("Cannot slice {:?}", arr_val))
1613 }
1614 Expr::Literal(lit) => Ok(lit.to_value()),
1615 Expr::List(items) => {
1616 let mut vals = Vec::new();
1617 for item in items {
1618 vals.push(
1619 this.evaluate_expr(item, row, prop_manager, params, ctx)
1620 .await?,
1621 );
1622 }
1623 Ok(Value::List(vals))
1624 }
1625 Expr::Map(items) => {
1626 let mut map = HashMap::new();
1627 for (key, value_expr) in items {
1628 let val = this
1629 .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1630 .await?;
1631 map.insert(key.clone(), val);
1632 }
1633 Ok(Value::Map(map))
1634 }
1635 Expr::Exists { query, .. } => {
1636 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1638 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1639
1640 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1641 Ok(plan) => {
1642 let mut sub_params = params.clone();
1643 sub_params.extend(row.clone());
1644
1645 match this.execute(plan, prop_manager, &sub_params).await {
1646 Ok(results) => Ok(Value::Bool(!results.is_empty())),
1647 Err(e) => {
1648 log::debug!("EXISTS subquery execution failed: {}", e);
1649 Ok(Value::Bool(false))
1650 }
1651 }
1652 }
1653 Err(e) => {
1654 log::debug!("EXISTS subquery planning failed: {}", e);
1655 Ok(Value::Bool(false))
1656 }
1657 }
1658 }
1659 Expr::CountSubquery(query) => {
1660 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1662
1663 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1664
1665 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1666 Ok(plan) => {
1667 let mut sub_params = params.clone();
1668 sub_params.extend(row.clone());
1669
1670 match this.execute(plan, prop_manager, &sub_params).await {
1671 Ok(results) => Ok(Value::from(results.len() as i64)),
1672 Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1673 }
1674 }
1675 Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1676 }
1677 }
1678 Expr::Quantifier {
1679 quantifier,
1680 variable,
1681 list,
1682 predicate,
1683 } => {
1684 let list_val = this
1698 .evaluate_expr(list, row, prop_manager, params, ctx)
1699 .await?;
1700
1701 if list_val.is_null() {
1703 return Ok(Value::Null);
1704 }
1705
1706 let items = match list_val {
1708 Value::List(arr) => arr,
1709 _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1710 };
1711
1712 let mut satisfied_count = 0;
1714 for item in &items {
1715 let mut item_row = row.clone();
1717 item_row.insert(variable.clone(), item.clone());
1718
1719 let pred_result = this
1721 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1722 .await?;
1723
1724 if let Value::Bool(true) = pred_result {
1726 satisfied_count += 1;
1727 }
1728 }
1729
1730 let result = match quantifier {
1732 Quantifier::All => satisfied_count == items.len(),
1733 Quantifier::Any => satisfied_count > 0,
1734 Quantifier::Single => satisfied_count == 1,
1735 Quantifier::None => satisfied_count == 0,
1736 };
1737
1738 Ok(Value::Bool(result))
1739 }
1740 Expr::ListComprehension {
1741 variable,
1742 list,
1743 where_clause,
1744 map_expr,
1745 } => {
1746 let list_val = this
1753 .evaluate_expr(list, row, prop_manager, params, ctx)
1754 .await?;
1755
1756 if list_val.is_null() {
1758 return Ok(Value::Null);
1759 }
1760
1761 let items = match list_val {
1763 Value::List(arr) => arr,
1764 _ => {
1765 return Err(anyhow!(
1766 "List comprehension expects a list, got: {:?}",
1767 list_val
1768 ));
1769 }
1770 };
1771
1772 let mut results = Vec::new();
1774 for item in &items {
1775 let mut item_row = row.clone();
1777 item_row.insert(variable.clone(), item.clone());
1778
1779 if let Some(predicate) = where_clause {
1781 let pred_result = this
1782 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1783 .await?;
1784
1785 if !matches!(pred_result, Value::Bool(true)) {
1787 continue;
1788 }
1789 }
1790
1791 let mapped_val = this
1793 .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1794 .await?;
1795 results.push(mapped_val);
1796 }
1797
1798 Ok(Value::List(results))
1799 }
1800 Expr::BinaryOp { left, op, right } => {
1801 match op {
1803 BinaryOp::And => {
1804 let l_val = this
1805 .evaluate_expr(left, row, prop_manager, params, ctx)
1806 .await?;
1807 if let Some(false) = l_val.as_bool() {
1809 return Ok(Value::Bool(false));
1810 }
1811 let r_val = this
1812 .evaluate_expr(right, row, prop_manager, params, ctx)
1813 .await?;
1814 eval_binary_op(&l_val, op, &r_val)
1815 }
1816 BinaryOp::Or => {
1817 let l_val = this
1818 .evaluate_expr(left, row, prop_manager, params, ctx)
1819 .await?;
1820 if let Some(true) = l_val.as_bool() {
1822 return Ok(Value::Bool(true));
1823 }
1824 let r_val = this
1825 .evaluate_expr(right, row, prop_manager, params, ctx)
1826 .await?;
1827 eval_binary_op(&l_val, op, &r_val)
1828 }
1829 _ => {
1830 let l_val = this
1832 .evaluate_expr(left, row, prop_manager, params, ctx)
1833 .await?;
1834 let r_val = this
1835 .evaluate_expr(right, row, prop_manager, params, ctx)
1836 .await?;
1837 eval_binary_op(&l_val, op, &r_val)
1838 }
1839 }
1840 }
1841 Expr::In { expr, list } => {
1842 let l_val = this
1843 .evaluate_expr(expr, row, prop_manager, params, ctx)
1844 .await?;
1845 let r_val = this
1846 .evaluate_expr(list, row, prop_manager, params, ctx)
1847 .await?;
1848 eval_in_op(&l_val, &r_val)
1849 }
1850 Expr::UnaryOp { op, expr } => {
1851 let val = this
1852 .evaluate_expr(expr, row, prop_manager, params, ctx)
1853 .await?;
1854 match op {
1855 UnaryOp::Not => {
1856 match val.as_bool() {
1858 Some(b) => Ok(Value::Bool(!b)),
1859 None if val.is_null() => Ok(Value::Null),
1860 None => Err(anyhow!(
1861 "InvalidArgumentType: NOT requires a boolean argument"
1862 )),
1863 }
1864 }
1865 UnaryOp::Neg => {
1866 if let Some(i) = val.as_i64() {
1867 Ok(Value::Int(-i))
1868 } else if let Some(f) = val.as_f64() {
1869 Ok(Value::Float(-f))
1870 } else {
1871 Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1872 }
1873 }
1874 }
1875 }
1876 Expr::IsNull(expr) => {
1877 let val = this
1878 .evaluate_expr(expr, row, prop_manager, params, ctx)
1879 .await?;
1880 Ok(Value::Bool(val.is_null()))
1881 }
1882 Expr::IsNotNull(expr) => {
1883 let val = this
1884 .evaluate_expr(expr, row, prop_manager, params, ctx)
1885 .await?;
1886 Ok(Value::Bool(!val.is_null()))
1887 }
1888 Expr::IsUnique(_) => {
1889 Err(anyhow!(
1891 "IS UNIQUE can only be used in constraint definitions"
1892 ))
1893 }
1894 Expr::Case {
1895 expr,
1896 when_then,
1897 else_expr,
1898 } => {
1899 if let Some(base_expr) = expr {
1900 let base_val = this
1901 .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1902 .await?;
1903 for (w, t) in when_then {
1904 let w_val = this
1905 .evaluate_expr(w, row, prop_manager, params, ctx)
1906 .await?;
1907 if base_val == w_val {
1908 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1909 }
1910 }
1911 } else {
1912 for (w, t) in when_then {
1913 let w_val = this
1914 .evaluate_expr(w, row, prop_manager, params, ctx)
1915 .await?;
1916 if w_val.as_bool() == Some(true) {
1917 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1918 }
1919 }
1920 }
1921 if let Some(e) = else_expr {
1922 return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1923 }
1924 Ok(Value::Null)
1925 }
1926 Expr::Wildcard => Ok(Value::Null),
1927 Expr::FunctionCall { name, args, .. } => {
1928 if name.eq_ignore_ascii_case("ID") {
1930 if args.len() != 1 {
1931 return Err(anyhow!("id() requires exactly 1 argument"));
1932 }
1933 let val = this
1934 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1935 .await?;
1936 if let Value::Map(map) = &val {
1937 if let Some(vid_val) = map.get("_vid") {
1939 return Ok(vid_val.clone());
1940 }
1941 if let Some(eid_val) = map.get("_eid") {
1943 return Ok(eid_val.clone());
1944 }
1945 if let Some(id_val) = map.get("_id") {
1947 return Ok(id_val.clone());
1948 }
1949 }
1950 return Ok(Value::Null);
1951 }
1952
1953 if name.eq_ignore_ascii_case("ELEMENTID") {
1955 if args.len() != 1 {
1956 return Err(anyhow!("elementId() requires exactly 1 argument"));
1957 }
1958 let val = this
1959 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1960 .await?;
1961 if let Value::Map(map) = &val {
1962 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1965 return Ok(Value::String(vid_val.to_string()));
1966 }
1967 if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1970 return Ok(Value::String(eid_val.to_string()));
1971 }
1972 }
1973 return Ok(Value::Null);
1974 }
1975
1976 if name.eq_ignore_ascii_case("TYPE") {
1978 if args.len() != 1 {
1979 return Err(anyhow!("type() requires exactly 1 argument"));
1980 }
1981 let val = this
1982 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1983 .await?;
1984 if let Value::Map(map) = &val
1985 && let Some(type_val) = map.get("_type")
1986 {
1987 if let Some(type_id) =
1989 type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1990 {
1991 if let Some(name) = this
1992 .storage
1993 .schema_manager()
1994 .edge_type_name_by_id_unified(type_id)
1995 {
1996 return Ok(Value::String(name));
1997 }
1998 } else if let Some(name) = type_val.as_str() {
1999 return Ok(Value::String(name.to_string()));
2000 }
2001 }
2002 return Ok(Value::Null);
2003 }
2004
2005 if name.eq_ignore_ascii_case("LABELS") {
2007 if args.len() != 1 {
2008 return Err(anyhow!("labels() requires exactly 1 argument"));
2009 }
2010 let val = this
2011 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2012 .await?;
2013 if let Value::Map(map) = &val
2014 && let Some(labels_val) = map.get("_labels")
2015 {
2016 return Ok(labels_val.clone());
2017 }
2018 return Ok(Value::Null);
2019 }
2020
2021 if name.eq_ignore_ascii_case("PROPERTIES") {
2023 if args.len() != 1 {
2024 return Err(anyhow!("properties() requires exactly 1 argument"));
2025 }
2026 let val = this
2027 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2028 .await?;
2029 if let Value::Map(map) = &val {
2030 let mut props = HashMap::new();
2032 for (k, v) in map.iter() {
2033 if !k.starts_with('_') {
2034 props.insert(k.clone(), v.clone());
2035 }
2036 }
2037 return Ok(Value::Map(props));
2038 }
2039 return Ok(Value::Null);
2040 }
2041
2042 if name.eq_ignore_ascii_case("STARTNODE") {
2044 if args.len() != 1 {
2045 return Err(anyhow!("startNode() requires exactly 1 argument"));
2046 }
2047 let val = this
2048 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2049 .await?;
2050 if let Value::Edge(edge) = &val {
2051 return Ok(Self::find_node_by_vid(row, edge.src));
2052 }
2053 if let Value::Map(map) = &val {
2054 if let Some(start_node) = map.get("_startNode") {
2055 return Ok(start_node.clone());
2056 }
2057 if let Some(src_vid) = map.get("_src_vid") {
2058 return Ok(Value::Map(HashMap::from([(
2059 "_vid".to_string(),
2060 src_vid.clone(),
2061 )])));
2062 }
2063 if let Some(src_id) = map.get("_src")
2065 && let Some(u) = src_id.as_u64()
2066 {
2067 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2068 }
2069 }
2070 return Ok(Value::Null);
2071 }
2072
2073 if name.eq_ignore_ascii_case("ENDNODE") {
2075 if args.len() != 1 {
2076 return Err(anyhow!("endNode() requires exactly 1 argument"));
2077 }
2078 let val = this
2079 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2080 .await?;
2081 if let Value::Edge(edge) = &val {
2082 return Ok(Self::find_node_by_vid(row, edge.dst));
2083 }
2084 if let Value::Map(map) = &val {
2085 if let Some(end_node) = map.get("_endNode") {
2086 return Ok(end_node.clone());
2087 }
2088 if let Some(dst_vid) = map.get("_dst_vid") {
2089 return Ok(Value::Map(HashMap::from([(
2090 "_vid".to_string(),
2091 dst_vid.clone(),
2092 )])));
2093 }
2094 if let Some(dst_id) = map.get("_dst")
2096 && let Some(u) = dst_id.as_u64()
2097 {
2098 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
2099 }
2100 }
2101 return Ok(Value::Null);
2102 }
2103
2104 if name.eq_ignore_ascii_case("HASLABEL") {
2107 if args.len() != 2 {
2108 return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
2109 }
2110 let node_val = this
2111 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2112 .await?;
2113 let label_val = this
2114 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2115 .await?;
2116
2117 let label_to_check = label_val.as_str().ok_or_else(|| {
2118 anyhow!("Second argument to hasLabel must be a string")
2119 })?;
2120
2121 let has_label = match &node_val {
2122 Value::Map(map) if map.contains_key("_vid") => {
2124 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2125 labels_arr
2126 .iter()
2127 .any(|l| l.as_str() == Some(label_to_check))
2128 } else {
2129 false
2130 }
2131 }
2132 Value::Map(map) => {
2134 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2135 labels_arr
2136 .iter()
2137 .any(|l| l.as_str() == Some(label_to_check))
2138 } else {
2139 false
2140 }
2141 }
2142 _ => false,
2143 };
2144 return Ok(Value::Bool(has_label));
2145 }
2146
2147 if matches!(
2150 name.to_uppercase().as_str(),
2151 "ANY" | "ALL" | "NONE" | "SINGLE"
2152 ) {
2153 return Err(anyhow!(
2154 "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
2155 name.to_lowercase()
2156 ));
2157 }
2158
2159 if name.eq_ignore_ascii_case("COALESCE") {
2161 for arg in args {
2162 let val = this
2163 .evaluate_expr(arg, row, prop_manager, params, ctx)
2164 .await?;
2165 if !val.is_null() {
2166 return Ok(val);
2167 }
2168 }
2169 return Ok(Value::Null);
2170 }
2171
2172 if name.eq_ignore_ascii_case("vector_similarity") {
2174 if args.len() != 2 {
2175 return Err(anyhow!("vector_similarity takes 2 arguments"));
2176 }
2177 let v1 = this
2178 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2179 .await?;
2180 let v2 = this
2181 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2182 .await?;
2183 return eval_vector_similarity(&v1, &v2);
2184 }
2185
2186 if name.eq_ignore_ascii_case("uni.temporal.validAt")
2188 || name.eq_ignore_ascii_case("uni.validAt")
2189 || name.eq_ignore_ascii_case("validAt")
2190 {
2191 if args.len() != 4 {
2192 return Err(anyhow!("validAt requires 4 arguments"));
2193 }
2194 let node_val = this
2195 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2196 .await?;
2197 let start_prop = this
2198 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2199 .await?
2200 .as_str()
2201 .ok_or(anyhow!("start_prop must be string"))?
2202 .to_string();
2203 let end_prop = this
2204 .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2205 .await?
2206 .as_str()
2207 .ok_or(anyhow!("end_prop must be string"))?
2208 .to_string();
2209 let time_val = this
2210 .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2211 .await?;
2212
2213 let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2214 anyhow!("time argument must be a datetime value or string")
2215 })?;
2216
2217 let valid_from_val: Option<Value> = if let Ok(vid) =
2219 Self::vid_from_value(&node_val)
2220 {
2221 prop_manager
2223 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2224 .await
2225 .ok()
2226 } else if let Value::Map(map) = &node_val {
2227 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2229 let vid = Vid::from(vid_val);
2230 prop_manager
2231 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2232 .await
2233 .ok()
2234 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2235 let eid = uni_common::core::id::Eid::from(eid_val);
2237 prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2238 } else {
2239 map.get(&start_prop).cloned()
2241 }
2242 } else {
2243 return Ok(Value::Bool(false));
2244 };
2245
2246 let valid_from = match valid_from_val {
2247 Some(ref v) => match value_to_datetime_utc(v) {
2248 Some(dt) => dt,
2249 None if v.is_null() => return Ok(Value::Bool(false)),
2250 None => {
2251 return Err(anyhow!(
2252 "Property {} must be a datetime value or string",
2253 start_prop
2254 ));
2255 }
2256 },
2257 None => return Ok(Value::Bool(false)),
2258 };
2259
2260 let valid_to_val: Option<Value> = if let Ok(vid) =
2261 Self::vid_from_value(&node_val)
2262 {
2263 prop_manager
2265 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2266 .await
2267 .ok()
2268 } else if let Value::Map(map) = &node_val {
2269 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2271 let vid = Vid::from(vid_val);
2272 prop_manager
2273 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2274 .await
2275 .ok()
2276 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2277 let eid = uni_common::core::id::Eid::from(eid_val);
2279 prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2280 } else {
2281 map.get(&end_prop).cloned()
2283 }
2284 } else {
2285 return Ok(Value::Bool(false));
2286 };
2287
2288 let valid_to = match valid_to_val {
2289 Some(ref v) => match value_to_datetime_utc(v) {
2290 Some(dt) => Some(dt),
2291 None if v.is_null() => None,
2292 None => {
2293 return Err(anyhow!(
2294 "Property {} must be a datetime value or null",
2295 end_prop
2296 ));
2297 }
2298 },
2299 None => None,
2300 };
2301
2302 let is_valid = valid_from <= query_time
2303 && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2304 return Ok(Value::Bool(is_valid));
2305 }
2306
2307 let mut evaluated_args = Vec::with_capacity(args.len());
2309 for arg in args {
2310 let mut val = this
2311 .evaluate_expr(arg, row, prop_manager, params, ctx)
2312 .await?;
2313
2314 if let Value::Map(ref mut map) = val {
2317 hydrate_entity_if_needed(map, prop_manager, ctx).await;
2318 }
2319
2320 evaluated_args.push(val);
2321 }
2322 eval_scalar_function(
2323 name,
2324 &evaluated_args,
2325 self.custom_function_registry.as_deref(),
2326 )
2327 }
2328 Expr::Reduce {
2329 accumulator,
2330 init,
2331 variable,
2332 list,
2333 expr,
2334 } => {
2335 let mut acc = self
2336 .evaluate_expr(init, row, prop_manager, params, ctx)
2337 .await?;
2338 let list_val = self
2339 .evaluate_expr(list, row, prop_manager, params, ctx)
2340 .await?;
2341
2342 if let Value::List(items) = list_val {
2343 for item in items {
2344 let mut scope = row.clone();
2348 scope.insert(accumulator.clone(), acc.clone());
2349 scope.insert(variable.clone(), item);
2350
2351 acc = self
2352 .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2353 .await?;
2354 }
2355 } else {
2356 return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2357 }
2358 Ok(acc)
2359 }
2360 Expr::ValidAt { .. } => {
2361 Err(anyhow!(
2363 "VALID_AT expression should have been transformed to function call in planner"
2364 ))
2365 }
2366
2367 Expr::LabelCheck { expr, labels } => {
2368 let val = this
2369 .evaluate_expr(expr, row, prop_manager, params, ctx)
2370 .await?;
2371 match &val {
2372 Value::Null => Ok(Value::Null),
2373 Value::Map(map) => {
2374 let is_edge = map.contains_key("_eid")
2376 || map.contains_key("_type_name")
2377 || (map.contains_key("_type") && !map.contains_key("_vid"));
2378
2379 if is_edge {
2380 if labels.len() > 1 {
2382 return Ok(Value::Bool(false));
2383 }
2384 let label_to_check = &labels[0];
2385 let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2386 {
2387 t == label_to_check
2388 } else if let Some(Value::String(t)) = map.get("_type") {
2389 t == label_to_check
2390 } else {
2391 false
2392 };
2393 Ok(Value::Bool(has_type))
2394 } else {
2395 let has_all = labels.iter().all(|label_to_check| {
2397 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2398 labels_arr
2399 .iter()
2400 .any(|l| l.as_str() == Some(label_to_check.as_str()))
2401 } else {
2402 false
2403 }
2404 });
2405 Ok(Value::Bool(has_all))
2406 }
2407 }
2408 _ => Ok(Value::Bool(false)),
2409 }
2410 }
2411
2412 Expr::MapProjection { base, items } => {
2413 let base_value = this
2414 .evaluate_expr(base, row, prop_manager, params, ctx)
2415 .await?;
2416
2417 let properties = match &base_value {
2419 Value::Map(map) => map,
2420 _ => {
2421 return Err(anyhow!(
2422 "Map projection requires object, got {:?}",
2423 base_value
2424 ));
2425 }
2426 };
2427
2428 let mut result_map = HashMap::new();
2429
2430 for item in items {
2431 match item {
2432 MapProjectionItem::Property(prop) => {
2433 if let Some(value) = properties.get(prop.as_str()) {
2434 result_map.insert(prop.clone(), value.clone());
2435 }
2436 }
2437 MapProjectionItem::AllProperties => {
2438 for (key, value) in properties.iter() {
2440 if !key.starts_with('_') {
2441 result_map.insert(key.clone(), value.clone());
2442 }
2443 }
2444 }
2445 MapProjectionItem::LiteralEntry(key, expr) => {
2446 let value = this
2447 .evaluate_expr(expr, row, prop_manager, params, ctx)
2448 .await?;
2449 result_map.insert(key.clone(), value);
2450 }
2451 MapProjectionItem::Variable(var_name) => {
2452 if let Some(value) = row.get(var_name.as_str()) {
2455 result_map.insert(var_name.clone(), value.clone());
2456 }
2457 }
2458 }
2459 }
2460
2461 Ok(Value::Map(result_map))
2462 }
2463 }
2464 })
2465 }
2466
2467 pub(crate) fn execute_subplan<'a>(
2468 &'a self,
2469 plan: LogicalPlan,
2470 prop_manager: &'a PropertyManager,
2471 params: &'a HashMap<String, Value>,
2472 ctx: Option<&'a QueryContext>,
2473 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2474 Box::pin(async move {
2475 if let Some(ctx) = ctx {
2476 ctx.check_timeout()?;
2477 }
2478 match plan {
2479 LogicalPlan::Union { left, right, all } => {
2480 self.execute_union(left, right, all, prop_manager, params, ctx)
2481 .await
2482 }
2483 LogicalPlan::CreateVectorIndex {
2484 config,
2485 if_not_exists,
2486 } => {
2487 if if_not_exists && self.index_exists_by_name(&config.name) {
2488 return Ok(vec![]);
2489 }
2490 let idx_mgr = self.storage.index_manager();
2491 idx_mgr.create_vector_index(config).await?;
2492 Ok(vec![])
2493 }
2494 LogicalPlan::CreateSparseIndex {
2495 config,
2496 if_not_exists,
2497 } => {
2498 if if_not_exists && self.index_exists_by_name(&config.name) {
2499 return Ok(vec![]);
2500 }
2501 let idx_mgr = self.storage.index_manager();
2505 idx_mgr.create_sparse_vector_index(config).await?;
2506 Ok(vec![])
2507 }
2508 LogicalPlan::CreateFullTextIndex {
2509 config,
2510 if_not_exists,
2511 } => {
2512 if if_not_exists && self.index_exists_by_name(&config.name) {
2513 return Ok(vec![]);
2514 }
2515 let idx_mgr = self.storage.index_manager();
2516 idx_mgr.create_fts_index(config).await?;
2517 Ok(vec![])
2518 }
2519 LogicalPlan::CreateScalarIndex {
2520 mut config,
2521 if_not_exists,
2522 } => {
2523 if if_not_exists && self.index_exists_by_name(&config.name) {
2524 return Ok(vec![]);
2525 }
2526
2527 let mut modified_properties = Vec::new();
2529
2530 for prop in &config.properties {
2531 if prop.contains('(') && prop.contains(')') {
2533 let gen_col = SchemaManager::generated_column_name(prop);
2534
2535 let sm = self.storage.schema_manager_arc();
2537 if let Err(e) = sm.add_generated_property(
2538 &config.label,
2539 &gen_col,
2540 DataType::String, prop.clone(),
2542 ) {
2543 log::warn!("Failed to add generated property (might exist): {}", e);
2544 }
2545
2546 modified_properties.push(gen_col);
2547 } else {
2548 modified_properties.push(prop.clone());
2550 }
2551 }
2552
2553 config.properties = modified_properties;
2554
2555 let idx_mgr = self.storage.index_manager();
2556 idx_mgr.create_scalar_index(config).await?;
2557 Ok(vec![])
2558 }
2559 LogicalPlan::CreateJsonFtsIndex {
2560 config,
2561 if_not_exists,
2562 } => {
2563 if if_not_exists && self.index_exists_by_name(&config.name) {
2564 return Ok(vec![]);
2565 }
2566 let idx_mgr = self.storage.index_manager();
2567 idx_mgr.create_json_fts_index(config).await?;
2568 Ok(vec![])
2569 }
2570 LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2571 LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2572 LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2573 LogicalPlan::Vacuum => {
2574 self.execute_vacuum().await?;
2575 Ok(vec![])
2576 }
2577 LogicalPlan::Checkpoint => {
2578 self.execute_checkpoint().await?;
2579 Ok(vec![])
2580 }
2581 LogicalPlan::CopyTo {
2582 label,
2583 path,
2584 format,
2585 options,
2586 } => {
2587 let count = self
2588 .execute_copy_to(&label, &path, &format, &options)
2589 .await?;
2590 let mut result = HashMap::new();
2591 result.insert("count".to_string(), Value::Int(count as i64));
2592 Ok(vec![result])
2593 }
2594 LogicalPlan::CopyFrom {
2595 label,
2596 path,
2597 format,
2598 options,
2599 } => {
2600 let count = self
2601 .execute_copy_from(&label, &path, &format, &options)
2602 .await?;
2603 let mut result = HashMap::new();
2604 result.insert("count".to_string(), Value::Int(count as i64));
2605 Ok(vec![result])
2606 }
2607 LogicalPlan::CreateLabel(clause) => {
2608 self.execute_create_label(clause).await?;
2609 Ok(vec![])
2610 }
2611 LogicalPlan::CreateEdgeType(clause) => {
2612 self.execute_create_edge_type(clause).await?;
2613 Ok(vec![])
2614 }
2615 LogicalPlan::AlterLabel(clause) => {
2616 self.execute_alter_label(clause).await?;
2617 Ok(vec![])
2618 }
2619 LogicalPlan::AlterEdgeType(clause) => {
2620 self.execute_alter_edge_type(clause).await?;
2621 Ok(vec![])
2622 }
2623 LogicalPlan::DropLabel(clause) => {
2624 self.execute_drop_label(clause).await?;
2625 Ok(vec![])
2626 }
2627 LogicalPlan::DropEdgeType(clause) => {
2628 self.execute_drop_edge_type(clause).await?;
2629 Ok(vec![])
2630 }
2631 LogicalPlan::CreateConstraint(clause) => {
2632 self.execute_create_constraint(clause).await?;
2633 Ok(vec![])
2634 }
2635 LogicalPlan::DropConstraint(clause) => {
2636 self.execute_drop_constraint(clause).await?;
2637 Ok(vec![])
2638 }
2639 LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2640 LogicalPlan::DropIndex { name, if_exists } => {
2641 let idx_mgr = self.storage.index_manager();
2642 match idx_mgr.drop_index(&name).await {
2643 Ok(_) => Ok(vec![]),
2644 Err(e) => {
2645 if if_exists && e.to_string().contains("not found") {
2646 Ok(vec![])
2647 } else {
2648 Err(e)
2649 }
2650 }
2651 }
2652 }
2653 LogicalPlan::ShowIndexes { filter } => {
2654 Ok(self.execute_show_indexes(filter.as_deref()))
2655 }
2656 LogicalPlan::Scan { .. }
2659 | LogicalPlan::FusedIndexScan { .. }
2660 | LogicalPlan::FusedIndexScanWrapped { .. }
2661 | LogicalPlan::ExtIdLookup { .. }
2662 | LogicalPlan::ScanAll { .. }
2663 | LogicalPlan::ScanMainByLabels { .. }
2664 | LogicalPlan::Traverse { .. }
2665 | LogicalPlan::TraverseMainByType { .. } => {
2666 let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2667 self.record_batches_to_rows(batches)
2668 }
2669 LogicalPlan::Filter {
2670 input,
2671 predicate,
2672 optional_variables,
2673 } => {
2674 let input_matches = self
2675 .execute_subplan(*input, prop_manager, params, ctx)
2676 .await?;
2677
2678 tracing::debug!(
2679 "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2680 predicate,
2681 input_matches.len(),
2682 optional_variables
2683 );
2684
2685 if !optional_variables.is_empty() {
2689 let is_optional_key = |k: &str| -> bool {
2692 optional_variables.contains(k)
2693 || optional_variables
2694 .iter()
2695 .any(|var| k.starts_with(&format!("{}.", var)))
2696 };
2697
2698 let is_internal_key =
2700 |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2701
2702 let non_optional_vars: Vec<String> = input_matches
2704 .first()
2705 .map(|row| {
2706 row.keys()
2707 .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2708 .cloned()
2709 .collect()
2710 })
2711 .unwrap_or_default();
2712
2713 let mut groups: std::collections::HashMap<
2715 Vec<u8>,
2716 Vec<HashMap<String, Value>>,
2717 > = std::collections::HashMap::new();
2718
2719 for row in &input_matches {
2720 let key: Vec<u8> = non_optional_vars
2722 .iter()
2723 .map(|var| {
2724 row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
2725 })
2726 .collect::<Vec<_>>()
2727 .join("|")
2728 .into_bytes();
2729
2730 groups.entry(key).or_default().push(row.clone());
2731 }
2732
2733 let mut filtered = Vec::new();
2734 for (_key, group_rows) in groups {
2735 let mut group_passed = Vec::new();
2736
2737 for row in &group_rows {
2738 let has_null_optional = optional_variables.iter().any(|var| {
2740 let direct_null =
2742 matches!(row.get(var), Some(Value::Null) | None);
2743 let prefixed_null = row
2744 .keys()
2745 .filter(|k| k.starts_with(&format!("{}.", var)))
2746 .any(|k| matches!(row.get(k), Some(Value::Null)));
2747 direct_null || prefixed_null
2748 });
2749
2750 if has_null_optional {
2751 group_passed.push(row.clone());
2752 continue;
2753 }
2754
2755 let res = self
2756 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2757 .await?;
2758
2759 if res.as_bool().unwrap_or(false) {
2760 group_passed.push(row.clone());
2761 }
2762 }
2763
2764 if group_passed.is_empty() {
2765 if let Some(template) = group_rows.first() {
2768 let mut null_row = HashMap::new();
2769 for (k, v) in template {
2770 if is_optional_key(k) {
2771 null_row.insert(k.clone(), Value::Null);
2772 } else {
2773 null_row.insert(k.clone(), v.clone());
2774 }
2775 }
2776 filtered.push(null_row);
2777 }
2778 } else {
2779 filtered.extend(group_passed);
2780 }
2781 }
2782
2783 tracing::debug!(
2784 "Filter (OPTIONAL): {} input rows -> {} output rows",
2785 input_matches.len(),
2786 filtered.len()
2787 );
2788
2789 return Ok(filtered);
2790 }
2791
2792 let mut filtered = Vec::new();
2794 for row in input_matches.iter() {
2795 let res = self
2796 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2797 .await?;
2798
2799 let passes = res.as_bool().unwrap_or(false);
2800
2801 if passes {
2802 filtered.push(row.clone());
2803 }
2804 }
2805
2806 tracing::debug!(
2807 "Filter: {} input rows -> {} output rows",
2808 input_matches.len(),
2809 filtered.len()
2810 );
2811
2812 Ok(filtered)
2813 }
2814 LogicalPlan::ProcedureCall {
2815 procedure_name,
2816 arguments,
2817 yield_items,
2818 } => {
2819 let yield_names: Vec<String> =
2820 yield_items.iter().map(|(n, _)| n.clone()).collect();
2821 let results = self
2822 .execute_procedure(
2823 &procedure_name,
2824 &arguments,
2825 &yield_names,
2826 prop_manager,
2827 params,
2828 ctx,
2829 )
2830 .await?;
2831
2832 let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2838 if !has_aliases {
2839 Ok(results)
2842 } else {
2843 let mut aliased_results = Vec::with_capacity(results.len());
2844 for row in results {
2845 let mut new_row = HashMap::new();
2846 for (name, alias) in &yield_items {
2847 let col_name = alias.as_ref().unwrap_or(name);
2848 let val = row.get(name).cloned().unwrap_or(Value::Null);
2849 new_row.insert(col_name.clone(), val);
2850 }
2851 aliased_results.push(new_row);
2852 }
2853 Ok(aliased_results)
2854 }
2855 }
2856 LogicalPlan::VectorKnn { .. } => {
2857 unreachable!("VectorKnn is handled by DataFusion engine")
2858 }
2859 LogicalPlan::InvertedIndexLookup { .. } => {
2860 unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2861 }
2862 LogicalPlan::Sort { input, order_by } => {
2863 let rows = self
2864 .execute_subplan(*input, prop_manager, params, ctx)
2865 .await?;
2866 self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2867 .await
2868 }
2869 LogicalPlan::Limit { input, skip, fetch } => {
2870 let rows = self
2871 .execute_subplan(*input, prop_manager, params, ctx)
2872 .await?;
2873 let skip = skip.unwrap_or(0);
2874 let take = fetch.unwrap_or(usize::MAX);
2875 Ok(rows.into_iter().skip(skip).take(take).collect())
2876 }
2877 LogicalPlan::Aggregate {
2878 input,
2879 group_by,
2880 aggregates,
2881 } => {
2882 let rows = self
2883 .execute_subplan(*input, prop_manager, params, ctx)
2884 .await?;
2885 self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2886 .await
2887 }
2888 LogicalPlan::Window {
2889 input,
2890 window_exprs,
2891 } => {
2892 let rows = self
2893 .execute_subplan(*input, prop_manager, params, ctx)
2894 .await?;
2895 self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2896 .await
2897 }
2898 LogicalPlan::Project { input, projections } => {
2899 let matches = self
2900 .execute_subplan(*input, prop_manager, params, ctx)
2901 .await?;
2902 self.execute_project(matches, &projections, prop_manager, params, ctx)
2903 .await
2904 }
2905 LogicalPlan::Distinct { input } => {
2906 let rows = self
2907 .execute_subplan(*input, prop_manager, params, ctx)
2908 .await?;
2909 let mut seen = std::collections::HashSet::new();
2910 let mut result = Vec::new();
2911 for row in rows {
2912 let key = Self::canonical_row_key(&row);
2913 if seen.insert(key) {
2914 result.push(row);
2915 }
2916 }
2917 Ok(result)
2918 }
2919 LogicalPlan::Unwind {
2920 input,
2921 expr,
2922 variable,
2923 } => {
2924 let input_rows = self
2925 .execute_subplan(*input, prop_manager, params, ctx)
2926 .await?;
2927 self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2928 .await
2929 }
2930 LogicalPlan::Apply {
2931 input,
2932 subquery,
2933 input_filter,
2934 } => {
2935 let input_rows = self
2936 .execute_subplan(*input, prop_manager, params, ctx)
2937 .await?;
2938 self.execute_apply(
2939 input_rows,
2940 &subquery,
2941 input_filter.as_ref(),
2942 prop_manager,
2943 params,
2944 ctx,
2945 )
2946 .await
2947 }
2948 LogicalPlan::SubqueryCall { input, subquery } => {
2949 let input_rows = self
2950 .execute_subplan(*input, prop_manager, params, ctx)
2951 .await?;
2952 self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2955 .await
2956 }
2957 LogicalPlan::RecursiveCTE {
2958 cte_name,
2959 initial,
2960 recursive,
2961 } => {
2962 self.execute_recursive_cte(
2963 &cte_name,
2964 *initial,
2965 *recursive,
2966 prop_manager,
2967 params,
2968 ctx,
2969 )
2970 .await
2971 }
2972 LogicalPlan::CrossJoin { left, right } => {
2973 self.execute_cross_join(left, right, prop_manager, params, ctx)
2974 .await
2975 }
2976 LogicalPlan::Set { .. }
2977 | LogicalPlan::Remove { .. }
2978 | LogicalPlan::Merge { .. }
2979 | LogicalPlan::Create { .. }
2980 | LogicalPlan::CreateBatch { .. } => {
2981 unreachable!("mutations are handled by DataFusion engine")
2982 }
2983 LogicalPlan::Delete { .. } => {
2984 unreachable!("mutations are handled by DataFusion engine")
2985 }
2986 LogicalPlan::Copy {
2987 target,
2988 source,
2989 is_export,
2990 options,
2991 } => {
2992 if is_export {
2993 self.execute_export(&target, &source, &options, prop_manager, ctx)
2994 .await
2995 } else {
2996 self.execute_copy(&target, &source, &options, prop_manager)
2997 .await
2998 }
2999 }
3000 LogicalPlan::Backup {
3001 destination,
3002 options,
3003 } => self.execute_backup(&destination, &options).await,
3004 LogicalPlan::Explain { plan } => {
3005 let plan_str = format!("{:#?}", plan);
3006 let mut row = HashMap::new();
3007 row.insert("plan".to_string(), Value::String(plan_str));
3008 Ok(vec![row])
3009 }
3010 LogicalPlan::ShortestPath { .. } => {
3011 unreachable!("ShortestPath is handled by DataFusion engine")
3012 }
3013 LogicalPlan::AllShortestPaths { .. } => {
3014 unreachable!("AllShortestPaths is handled by DataFusion engine")
3015 }
3016 LogicalPlan::Foreach { .. } => {
3017 unreachable!("mutations are handled by DataFusion engine")
3018 }
3019 LogicalPlan::Empty => Ok(vec![HashMap::new()]),
3020 LogicalPlan::BindZeroLengthPath { .. } => {
3021 unreachable!("BindZeroLengthPath is handled by DataFusion engine")
3022 }
3023 LogicalPlan::BindPath { .. } => {
3024 unreachable!("BindPath is handled by DataFusion engine")
3025 }
3026 LogicalPlan::QuantifiedPattern { .. } => {
3027 unreachable!("QuantifiedPattern is handled by DataFusion engine")
3028 }
3029 LogicalPlan::LocyProgram { .. }
3030 | LogicalPlan::LocyFold { .. }
3031 | LogicalPlan::LocyBestBy { .. }
3032 | LogicalPlan::LocyPriority { .. }
3033 | LogicalPlan::LocyDerivedScan { .. }
3034 | LogicalPlan::LocyProject { .. }
3035 | LogicalPlan::LocyModelInvoke { .. } => {
3036 unreachable!("Locy operators are handled by DataFusion engine")
3037 }
3038 }
3039 })
3040 }
3041
3042 #[expect(clippy::too_many_arguments)]
3047 pub(crate) async fn execute_foreach_body_plan(
3048 &self,
3049 plan: LogicalPlan,
3050 scope: &mut HashMap<String, Value>,
3051 writer: &uni_store::runtime::writer::Writer,
3052 prop_manager: &PropertyManager,
3053 params: &HashMap<String, Value>,
3054 ctx: Option<&QueryContext>,
3055 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
3056 ) -> Result<()> {
3057 match plan {
3058 LogicalPlan::Set { items, .. } => {
3059 self.execute_set_items_locked(
3060 &items,
3061 scope,
3062 writer,
3063 prop_manager,
3064 params,
3065 ctx,
3066 tx_l0,
3067 &crate::query::df_graph::mutation_common::Prefetch::default(),
3068 )
3069 .await?;
3070 }
3071 LogicalPlan::Remove { items, .. } => {
3072 self.execute_remove_items_locked(
3073 &items,
3074 scope,
3075 writer,
3076 prop_manager,
3077 ctx,
3078 tx_l0,
3079 &crate::query::df_graph::mutation_common::Prefetch::default(),
3080 )
3081 .await?;
3082 }
3083 LogicalPlan::Delete { items, detach, .. } => {
3084 for expr in &items {
3085 let val = self
3086 .evaluate_expr(expr, scope, prop_manager, params, ctx)
3087 .await?;
3088 self.execute_delete_item_locked(&val, detach, writer, tx_l0)
3089 .await?;
3090 }
3091 }
3092 LogicalPlan::Create { pattern, .. } => {
3093 self.execute_create_pattern(
3094 &pattern,
3095 scope,
3096 writer,
3097 prop_manager,
3098 params,
3099 ctx,
3100 tx_l0,
3101 None,
3102 )
3103 .await?;
3104 }
3105 LogicalPlan::CreateBatch { patterns, .. } => {
3106 for pattern in &patterns {
3107 self.execute_create_pattern(
3108 pattern,
3109 scope,
3110 writer,
3111 prop_manager,
3112 params,
3113 ctx,
3114 tx_l0,
3115 None,
3116 )
3117 .await?;
3118 }
3119 }
3120 LogicalPlan::Merge {
3121 pattern,
3122 on_match: _,
3123 on_create,
3124 ..
3125 } => {
3126 let seed_props = self
3130 .on_create_seed_props(on_create.as_ref(), scope, prop_manager, params, ctx)
3131 .await?;
3132 self.execute_create_pattern(
3133 &pattern,
3134 scope,
3135 writer,
3136 prop_manager,
3137 params,
3138 ctx,
3139 tx_l0,
3140 Some(&seed_props),
3141 )
3142 .await?;
3143 if let Some(on_create_clause) = on_create {
3144 self.execute_set_items_locked(
3145 &on_create_clause.items,
3146 scope,
3147 writer,
3148 prop_manager,
3149 params,
3150 ctx,
3151 tx_l0,
3152 &crate::query::df_graph::mutation_common::Prefetch::default(),
3153 )
3154 .await?;
3155 }
3156 }
3157 LogicalPlan::Foreach {
3158 variable,
3159 list,
3160 body,
3161 ..
3162 } => {
3163 let list_val = self
3164 .evaluate_expr(&list, scope, prop_manager, params, ctx)
3165 .await?;
3166 let items = match list_val {
3167 Value::List(arr) => arr,
3168 Value::Null => return Ok(()),
3169 _ => return Err(anyhow!("FOREACH requires a list")),
3170 };
3171 for item in items {
3172 let mut nested_scope = scope.clone();
3173 nested_scope.insert(variable.clone(), item);
3174 for nested_plan in &body {
3175 Box::pin(self.execute_foreach_body_plan(
3176 nested_plan.clone(),
3177 &mut nested_scope,
3178 writer,
3179 prop_manager,
3180 params,
3181 ctx,
3182 tx_l0,
3183 ))
3184 .await?;
3185 }
3186 }
3187 }
3188 _ => {
3189 return Err(anyhow!(
3190 "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
3191 ));
3192 }
3193 }
3194 Ok(())
3195 }
3196
3197 fn canonical_row_key(row: &HashMap<String, Value>) -> String {
3198 let mut pairs: Vec<_> = row.iter().collect();
3199 pairs.sort_by_key(|(k, _)| *k);
3200
3201 pairs
3202 .into_iter()
3203 .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
3204 .collect::<Vec<_>>()
3205 .join("|")
3206 }
3207
3208 fn canonical_value_key(v: &Value) -> String {
3209 match v {
3210 Value::Null => "null".to_string(),
3211 Value::Bool(b) => format!("b:{b}"),
3212 Value::Int(i) => format!("n:{i}"),
3213 Value::Float(f) => {
3214 if f.is_nan() {
3215 "nan".to_string()
3216 } else if f.is_infinite() {
3217 if f.is_sign_positive() {
3218 "inf:+".to_string()
3219 } else {
3220 "inf:-".to_string()
3221 }
3222 } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
3223 format!("n:{}", *f as i64)
3224 } else {
3225 format!("f:{f}")
3226 }
3227 }
3228 Value::String(s) => {
3229 if let Some(k) = Self::temporal_string_key(s) {
3230 format!("temporal:{k}")
3231 } else {
3232 format!("s:{s}")
3233 }
3234 }
3235 Value::Bytes(b) => format!("bytes:{:?}", b),
3236 Value::List(items) => format!(
3237 "list:[{}]",
3238 items
3239 .iter()
3240 .map(Self::canonical_value_key)
3241 .collect::<Vec<_>>()
3242 .join(",")
3243 ),
3244 Value::Map(map) => {
3245 let mut pairs: Vec<_> = map.iter().collect();
3246 pairs.sort_by_key(|(k, _)| *k);
3247 format!(
3248 "map:{{{}}}",
3249 pairs
3250 .into_iter()
3251 .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
3252 .collect::<Vec<_>>()
3253 .join(",")
3254 )
3255 }
3256 Value::Node(n) => {
3257 let mut labels = n.labels.clone();
3258 labels.sort();
3259 format!(
3260 "node:{}:{}:{}",
3261 n.vid.as_u64(),
3262 labels.join(":"),
3263 Self::canonical_value_key(&Value::Map(n.properties.clone()))
3264 )
3265 }
3266 Value::Edge(e) => format!(
3267 "edge:{}:{}:{}:{}:{}",
3268 e.eid.as_u64(),
3269 e.edge_type,
3270 e.src.as_u64(),
3271 e.dst.as_u64(),
3272 Self::canonical_value_key(&Value::Map(e.properties.clone()))
3273 ),
3274 Value::Path(p) => format!(
3275 "path:nodes=[{}];edges=[{}]",
3276 p.nodes
3277 .iter()
3278 .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
3279 .collect::<Vec<_>>()
3280 .join(","),
3281 p.edges
3282 .iter()
3283 .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
3284 .collect::<Vec<_>>()
3285 .join(",")
3286 ),
3287 Value::Vector(vs) => format!("vec:{:?}", vs),
3288 Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
3289 _ => format!("{v:?}"),
3290 }
3291 }
3292
3293 fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3294 match t {
3295 uni_common::TemporalValue::Date { days_since_epoch } => {
3296 format!("date:{days_since_epoch}")
3297 }
3298 uni_common::TemporalValue::LocalTime {
3299 nanos_since_midnight,
3300 } => format!("localtime:{nanos_since_midnight}"),
3301 uni_common::TemporalValue::Time {
3302 nanos_since_midnight,
3303 offset_seconds,
3304 } => {
3305 let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3306 format!("time:{utc_nanos}")
3307 }
3308 uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3309 format!("localdatetime:{nanos_since_epoch}")
3310 }
3311 uni_common::TemporalValue::DateTime {
3312 nanos_since_epoch, ..
3313 } => format!("datetime:{nanos_since_epoch}"),
3314 uni_common::TemporalValue::Duration {
3315 months,
3316 days,
3317 nanos,
3318 } => format!("duration:{months}:{days}:{nanos}"),
3319 uni_common::TemporalValue::Btic { lo, hi, meta } => {
3320 format!("btic:{lo}:{hi}:{meta}")
3321 }
3322 }
3323 }
3324
3325 fn temporal_string_key(s: &str) -> Option<String> {
3326 let fn_name = match classify_temporal(s)? {
3327 uni_common::TemporalType::Date => "DATE",
3328 uni_common::TemporalType::LocalTime => "LOCALTIME",
3329 uni_common::TemporalType::Time => "TIME",
3330 uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3331 uni_common::TemporalType::DateTime => "DATETIME",
3332 uni_common::TemporalType::Duration => "DURATION",
3333 uni_common::TemporalType::Btic => return None, };
3335 match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3336 Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3337 _ => None,
3338 }
3339 }
3340
3341 pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3344
3345 pub(crate) async fn execute_aggregate(
3346 &self,
3347 rows: Vec<HashMap<String, Value>>,
3348 group_by: &[Expr],
3349 aggregates: &[Expr],
3350 prop_manager: &PropertyManager,
3351 params: &HashMap<String, Value>,
3352 ctx: Option<&QueryContext>,
3353 ) -> Result<Vec<HashMap<String, Value>>> {
3354 if let Some(ctx) = ctx {
3356 ctx.check_timeout()?;
3357 }
3358
3359 let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3360
3361 if rows.is_empty() {
3364 if group_by.is_empty() {
3365 let accs = Self::create_accumulators(aggregates);
3366 let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3367 return Ok(vec![row]);
3368 }
3369 return Ok(vec![]);
3370 }
3371
3372 for (idx, row) in rows.into_iter().enumerate() {
3373 if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3375 && let Some(ctx) = ctx
3376 {
3377 ctx.check_timeout()?;
3378 }
3379
3380 let key_vals = self
3381 .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3382 .await?;
3383 let key_str = format!(
3386 "[{}]",
3387 key_vals
3388 .iter()
3389 .map(Self::canonical_value_key)
3390 .collect::<Vec<_>>()
3391 .join(",")
3392 );
3393
3394 let entry = groups
3395 .entry(key_str)
3396 .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3397
3398 self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3399 .await?;
3400 }
3401
3402 let results = groups
3403 .values()
3404 .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3405 .collect();
3406
3407 Ok(results)
3408 }
3409
3410 pub(crate) async fn execute_window(
3411 &self,
3412 mut rows: Vec<HashMap<String, Value>>,
3413 window_exprs: &[Expr],
3414 _prop_manager: &PropertyManager,
3415 _params: &HashMap<String, Value>,
3416 ctx: Option<&QueryContext>,
3417 ) -> Result<Vec<HashMap<String, Value>>> {
3418 if let Some(ctx) = ctx {
3420 ctx.check_timeout()?;
3421 }
3422
3423 if rows.is_empty() || window_exprs.is_empty() {
3425 return Ok(rows);
3426 }
3427
3428 for window_expr in window_exprs {
3430 let Expr::FunctionCall {
3432 name,
3433 args,
3434 window_spec: Some(window_spec),
3435 ..
3436 } = window_expr
3437 else {
3438 return Err(anyhow!(
3439 "Window expression must be a FunctionCall with OVER clause: {:?}",
3440 window_expr
3441 ));
3442 };
3443
3444 let name_upper = name.to_uppercase();
3445
3446 if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
3448 return Err(anyhow!(
3449 "Unsupported window function: {}. Supported functions: {}",
3450 name,
3451 WINDOW_FUNCTIONS.join(", ")
3452 ));
3453 }
3454
3455 let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();
3457
3458 for (row_idx, row) in rows.iter().enumerate() {
3459 let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
3461 vec![]
3463 } else {
3464 window_spec
3465 .partition_by
3466 .iter()
3467 .map(|expr| self.evaluate_simple_expr(expr, row))
3468 .collect::<Result<Vec<_>>>()?
3469 };
3470
3471 partition_map
3472 .entry(partition_key)
3473 .or_default()
3474 .push(row_idx);
3475 }
3476
3477 for (_partition_key, row_indices) in partition_map.iter_mut() {
3479 if !window_spec.order_by.is_empty() {
3481 row_indices.sort_by(|&a, &b| {
3482 for sort_item in &window_spec.order_by {
3483 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
3484 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);
3485
3486 if let (Ok(va), Ok(vb)) = (val_a, val_b) {
3487 let cmp = Executor::compare_values(&va, &vb);
3488 let cmp = if sort_item.ascending {
3489 cmp
3490 } else {
3491 cmp.reverse()
3492 };
3493 if cmp != std::cmp::Ordering::Equal {
3494 return cmp;
3495 }
3496 }
3497 }
3498 std::cmp::Ordering::Equal
3499 });
3500 }
3501
3502 for (position, &row_idx) in row_indices.iter().enumerate() {
3504 let window_value = match name_upper.as_str() {
3505 "ROW_NUMBER" => Value::from((position + 1) as i64),
3506 "RANK" => {
3507 let rank = if position == 0 {
3509 1i64
3510 } else {
3511 let prev_row_idx = row_indices[position - 1];
3512 let same_as_prev = self.rows_have_same_sort_keys(
3513 &window_spec.order_by,
3514 &rows,
3515 row_idx,
3516 prev_row_idx,
3517 );
3518
3519 if same_as_prev {
3520 let mut group_start = position - 1;
3522 while group_start > 0 {
3523 let curr_idx = row_indices[group_start];
3524 let prev_idx = row_indices[group_start - 1];
3525 if !self.rows_have_same_sort_keys(
3526 &window_spec.order_by,
3527 &rows,
3528 curr_idx,
3529 prev_idx,
3530 ) {
3531 break;
3532 }
3533 group_start -= 1;
3534 }
3535 (group_start + 1) as i64
3536 } else {
3537 (position + 1) as i64
3538 }
3539 };
3540 Value::from(rank)
3541 }
3542 "DENSE_RANK" => {
3543 let mut dense_rank = 1i64;
3545 for i in 0..position {
3546 let curr_idx = row_indices[i + 1];
3547 let prev_idx = row_indices[i];
3548 if !self.rows_have_same_sort_keys(
3549 &window_spec.order_by,
3550 &rows,
3551 curr_idx,
3552 prev_idx,
3553 ) {
3554 dense_rank += 1;
3555 }
3556 }
3557 Value::from(dense_rank)
3558 }
3559 "LAG" => {
3560 let (value_expr, offset, default_value) =
3561 self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;
3562
3563 if position >= offset {
3564 let target_idx = row_indices[position - offset];
3565 self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3566 } else {
3567 default_value
3568 }
3569 }
3570 "LEAD" => {
3571 let (value_expr, offset, default_value) =
3572 self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;
3573
3574 if position + offset < row_indices.len() {
3575 let target_idx = row_indices[position + offset];
3576 self.evaluate_simple_expr(value_expr, &rows[target_idx])?
3577 } else {
3578 default_value
3579 }
3580 }
3581 "NTILE" => {
3582 let num_buckets_expr = args.first().ok_or_else(|| {
3584 anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
3585 })?;
3586 let num_buckets_val =
3587 self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
3588 let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
3589 anyhow!(
3590 "NTILE argument must be an integer, got: {:?}",
3591 num_buckets_val
3592 )
3593 })?;
3594
3595 if num_buckets <= 0 {
3596 return Err(anyhow!(
3597 "NTILE bucket count must be positive, got: {}",
3598 num_buckets
3599 ));
3600 }
3601
3602 let num_buckets = num_buckets as usize;
3603 let partition_size = row_indices.len();
3604
3605 let base_size = partition_size / num_buckets;
3610 let extra_rows = partition_size % num_buckets;
3611
3612 let bucket = if position < extra_rows * (base_size + 1) {
3614 position / (base_size + 1) + 1
3616 } else {
3617 let adjusted_position = position - extra_rows * (base_size + 1);
3619 extra_rows + (adjusted_position / base_size) + 1
3620 };
3621
3622 Value::from(bucket as i64)
3623 }
3624 "FIRST_VALUE" => {
3625 let value_expr = args.first().ok_or_else(|| {
3627 anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
3628 })?;
3629
3630 if row_indices.is_empty() {
3632 Value::Null
3633 } else {
3634 let first_idx = row_indices[0];
3635 self.evaluate_simple_expr(value_expr, &rows[first_idx])?
3636 }
3637 }
3638 "LAST_VALUE" => {
3639 let value_expr = args.first().ok_or_else(|| {
3641 anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
3642 })?;
3643
3644 if row_indices.is_empty() {
3646 Value::Null
3647 } else {
3648 let last_idx = row_indices[row_indices.len() - 1];
3649 self.evaluate_simple_expr(value_expr, &rows[last_idx])?
3650 }
3651 }
3652 "NTH_VALUE" => {
3653 if args.len() != 2 {
3655 return Err(anyhow!(
3656 "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
3657 ));
3658 }
3659
3660 let value_expr = &args[0];
3661 let n_expr = &args[1];
3662
3663 let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
3664 let n = n_val.as_i64().ok_or_else(|| {
3665 anyhow!(
3666 "NTH_VALUE second argument must be an integer, got: {:?}",
3667 n_val
3668 )
3669 })?;
3670
3671 if n <= 0 {
3672 return Err(anyhow!(
3673 "NTH_VALUE position must be positive, got: {}",
3674 n
3675 ));
3676 }
3677
3678 let nth_index = (n - 1) as usize; if nth_index < row_indices.len() {
3680 let nth_idx = row_indices[nth_index];
3681 self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
3682 } else {
3683 Value::Null
3684 }
3685 }
3686 _ => unreachable!("Window function {} already validated", name),
3687 };
3688
3689 let col_name = window_expr.to_string_repr();
3692 rows[row_idx].insert(col_name, window_value);
3693 }
3694 }
3695 }
3696
3697 Ok(rows)
3698 }
3699
3700 fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3705 match expr {
3706 Expr::Variable(name) => row
3707 .get(name)
3708 .cloned()
3709 .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3710 Expr::Property(base, prop) => {
3711 let base_val = self.evaluate_simple_expr(base, row)?;
3712 if let Value::Map(map) = base_val {
3713 map.get(prop)
3714 .cloned()
3715 .ok_or_else(|| anyhow!("Property not found: {}", prop))
3716 } else {
3717 Err(anyhow!("Cannot access property on non-object"))
3718 }
3719 }
3720 Expr::Literal(lit) => Ok(lit.to_value()),
3721 _ => Err(anyhow!(
3722 "Unsupported expression in window function: {:?}",
3723 expr
3724 )),
3725 }
3726 }
3727
3728 fn rows_have_same_sort_keys(
3730 &self,
3731 order_by: &[uni_cypher::ast::SortItem],
3732 rows: &[HashMap<String, Value>],
3733 idx_a: usize,
3734 idx_b: usize,
3735 ) -> bool {
3736 order_by.iter().all(|sort_item| {
3737 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3738 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3739 matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3740 })
3741 }
3742
3743 fn extract_lag_lead_params<'a>(
3745 &self,
3746 func_name: &str,
3747 args: &'a [Expr],
3748 row: &HashMap<String, Value>,
3749 ) -> Result<(&'a Expr, usize, Value)> {
3750 let value_expr = args.first().ok_or_else(|| {
3751 anyhow!(
3752 "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3753 func_name,
3754 func_name
3755 )
3756 })?;
3757
3758 let offset = if let Some(offset_expr) = args.get(1) {
3759 let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3760 offset_val.as_i64().ok_or_else(|| {
3761 anyhow!(
3762 "{} offset must be an integer, got: {:?}",
3763 func_name,
3764 offset_val
3765 )
3766 })? as usize
3767 } else {
3768 1
3769 };
3770
3771 let default_value = if let Some(default_expr) = args.get(2) {
3772 self.evaluate_simple_expr(default_expr, row)?
3773 } else {
3774 Value::Null
3775 };
3776
3777 Ok((value_expr, offset, default_value))
3778 }
3779
3780 pub(crate) async fn evaluate_group_keys(
3782 &self,
3783 group_by: &[Expr],
3784 row: &HashMap<String, Value>,
3785 prop_manager: &PropertyManager,
3786 params: &HashMap<String, Value>,
3787 ctx: Option<&QueryContext>,
3788 ) -> Result<Vec<Value>> {
3789 let mut key_vals = Vec::new();
3790 for expr in group_by {
3791 key_vals.push(
3792 self.evaluate_expr(expr, row, prop_manager, params, ctx)
3793 .await?,
3794 );
3795 }
3796 Ok(key_vals)
3797 }
3798
3799 pub(crate) async fn update_accumulators(
3801 &self,
3802 accs: &mut [Accumulator],
3803 aggregates: &[Expr],
3804 row: &HashMap<String, Value>,
3805 prop_manager: &PropertyManager,
3806 params: &HashMap<String, Value>,
3807 ctx: Option<&QueryContext>,
3808 ) -> Result<()> {
3809 for (i, agg_expr) in aggregates.iter().enumerate() {
3810 if let Expr::FunctionCall { args, .. } = agg_expr {
3811 let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3812 let val = if is_wildcard {
3813 Value::Null
3814 } else {
3815 self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3816 .await?
3817 };
3818 accs[i].update(&val, is_wildcard);
3819 }
3820 }
3821 Ok(())
3822 }
3823
3824 pub(crate) async fn execute_recursive_cte(
3826 &self,
3827 cte_name: &str,
3828 initial: LogicalPlan,
3829 recursive: LogicalPlan,
3830 prop_manager: &PropertyManager,
3831 params: &HashMap<String, Value>,
3832 ctx: Option<&QueryContext>,
3833 ) -> Result<Vec<HashMap<String, Value>>> {
3834 use std::collections::HashSet;
3835
3836 pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3839 let mut pairs: Vec<_> = row.iter().collect();
3840 pairs.sort_by(|a, b| a.0.cmp(b.0));
3841 format!("{pairs:?}")
3842 }
3843
3844 let mut working_table = self
3846 .execute_subplan(initial, prop_manager, params, ctx)
3847 .await?;
3848 let mut result_table = working_table.clone();
3849
3850 let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3852
3853 let max_iterations = self.config.max_recursive_cte_iterations;
3856 for _iteration in 0..max_iterations {
3857 if let Some(ctx) = ctx {
3859 ctx.check_timeout()?;
3860 }
3861
3862 if working_table.is_empty() {
3863 break;
3864 }
3865
3866 let working_val = Value::List(
3868 working_table
3869 .iter()
3870 .map(|row| {
3871 if row.len() == 1 {
3872 row.values().next().unwrap().clone()
3873 } else {
3874 Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3875 }
3876 })
3877 .collect(),
3878 );
3879
3880 let mut next_params = params.clone();
3881 next_params.insert(cte_name.to_string(), working_val);
3882
3883 let next_result = self
3885 .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3886 .await?;
3887
3888 if next_result.is_empty() {
3889 break;
3890 }
3891
3892 let new_rows: Vec<_> = next_result
3894 .into_iter()
3895 .filter(|row| {
3896 let key = row_key(row);
3897 seen.insert(key) })
3899 .collect();
3900
3901 if new_rows.is_empty() {
3902 break;
3904 }
3905
3906 result_table.extend(new_rows.clone());
3907 working_table = new_rows;
3908 }
3909
3910 let final_list = Value::List(
3912 result_table
3913 .into_iter()
3914 .map(|row| {
3915 if row.len() == 1 {
3927 row.values().next().unwrap().clone()
3928 } else {
3929 Value::Map(row.into_iter().collect())
3930 }
3931 })
3932 .collect(),
3933 );
3934
3935 let mut final_row = HashMap::new();
3936 final_row.insert(cte_name.to_string(), final_list);
3937 Ok(vec![final_row])
3938 }
3939
3940 const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3942
3943 pub(crate) async fn execute_sort(
3944 &self,
3945 rows: Vec<HashMap<String, Value>>,
3946 order_by: &[uni_cypher::ast::SortItem],
3947 prop_manager: &PropertyManager,
3948 params: &HashMap<String, Value>,
3949 ctx: Option<&QueryContext>,
3950 ) -> Result<Vec<HashMap<String, Value>>> {
3951 if let Some(ctx) = ctx {
3953 ctx.check_timeout()?;
3954 }
3955
3956 let mut rows_with_keys = Vec::with_capacity(rows.len());
3957 for (idx, row) in rows.into_iter().enumerate() {
3958 if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3960 && let Some(ctx) = ctx
3961 {
3962 ctx.check_timeout()?;
3963 }
3964
3965 let mut keys = Vec::new();
3966 for item in order_by {
3967 let val = row
3968 .get(&item.expr.to_string_repr())
3969 .cloned()
3970 .unwrap_or(Value::Null);
3971 let val = if val.is_null() {
3972 self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3973 .await
3974 .unwrap_or(Value::Null)
3975 } else {
3976 val
3977 };
3978 keys.push(val);
3979 }
3980 rows_with_keys.push((row, keys));
3981 }
3982
3983 if let Some(ctx) = ctx {
3985 ctx.check_timeout()?;
3986 }
3987
3988 rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3989
3990 Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3991 }
3992
3993 pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3995 aggregates
3996 .iter()
3997 .map(|expr| {
3998 if let Expr::FunctionCall { name, distinct, .. } = expr {
3999 Accumulator::new(name, *distinct)
4000 } else {
4001 Accumulator::new("COUNT", false)
4002 }
4003 })
4004 .collect()
4005 }
4006
4007 pub(crate) fn build_aggregate_result(
4009 group_by: &[Expr],
4010 aggregates: &[Expr],
4011 key_vals: &[Value],
4012 accs: &[Accumulator],
4013 ) -> HashMap<String, Value> {
4014 let mut res_row = HashMap::new();
4015 for (i, expr) in group_by.iter().enumerate() {
4016 res_row.insert(expr.to_string_repr(), key_vals[i].clone());
4017 }
4018 for (i, expr) in aggregates.iter().enumerate() {
4019 let col_name = crate::query::planner::aggregate_column_name(expr);
4021 res_row.insert(col_name, accs[i].finish());
4022 }
4023 res_row
4024 }
4025
4026 pub(crate) fn compare_sort_keys(
4028 a_keys: &[Value],
4029 b_keys: &[Value],
4030 order_by: &[uni_cypher::ast::SortItem],
4031 ) -> std::cmp::Ordering {
4032 for (i, item) in order_by.iter().enumerate() {
4033 let order = Self::compare_values(&a_keys[i], &b_keys[i]);
4034 if order != std::cmp::Ordering::Equal {
4035 return if item.ascending {
4036 order
4037 } else {
4038 order.reverse()
4039 };
4040 }
4041 }
4042 std::cmp::Ordering::Equal
4043 }
4044
4045 pub(crate) async fn execute_backup(
4049 &self,
4050 destination: &str,
4051 _options: &HashMap<String, Value>,
4052 ) -> Result<Vec<HashMap<String, Value>>> {
4053 if let Some(writer_arc) = &self.writer {
4055 let writer: &uni_store::Writer = writer_arc.as_ref();
4056 writer.flush_to_l1(None).await?;
4057 }
4058
4059 let snapshot_manager = self.storage.snapshot_manager();
4061 let snapshot = snapshot_manager
4062 .load_latest_snapshot()
4063 .await?
4064 .ok_or_else(|| anyhow!("No snapshot found"))?;
4065
4066 if is_cloud_url(destination) {
4068 self.backup_to_cloud(destination, &snapshot.snapshot_id)
4069 .await?;
4070 } else {
4071 let validated_dest = self.validate_path(destination)?;
4073 self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
4074 .await?;
4075 }
4076
4077 let mut res = HashMap::new();
4078 res.insert(
4079 "status".to_string(),
4080 Value::String("Backup completed".to_string()),
4081 );
4082 res.insert(
4083 "snapshot_id".to_string(),
4084 Value::String(snapshot.snapshot_id),
4085 );
4086 Ok(vec![res])
4087 }
4088
4089 async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
4091 let source_path = std::path::Path::new(self.storage.base_path());
4092
4093 if !dest_path.exists() {
4094 std::fs::create_dir_all(dest_path)?;
4095 }
4096
4097 if source_path.exists() {
4099 Self::copy_dir_all(source_path, dest_path)?;
4100 }
4101
4102 let schema_manager = self.storage.schema_manager();
4104 let dest_catalog = dest_path.join("catalog");
4105 if !dest_catalog.exists() {
4106 std::fs::create_dir_all(&dest_catalog)?;
4107 }
4108
4109 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4110 std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
4111
4112 Ok(())
4113 }
4114
4115 async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
4119 use object_store::ObjectStore;
4120 use object_store::ObjectStoreExt;
4121 use object_store::local::LocalFileSystem;
4122 use object_store::path::Path as ObjPath;
4123
4124 let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
4125 let source_path = std::path::Path::new(self.storage.base_path());
4126
4127 let src_store: Arc<dyn ObjectStore> =
4129 Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
4130
4131 let catalog_src = ObjPath::from("catalog");
4133 let catalog_dst = if dest_prefix.as_ref().is_empty() {
4134 ObjPath::from("catalog")
4135 } else {
4136 ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
4137 };
4138 copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
4139
4140 let storage_src = ObjPath::from("storage");
4142 let storage_dst = if dest_prefix.as_ref().is_empty() {
4143 ObjPath::from("storage")
4144 } else {
4145 ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
4146 };
4147 copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
4148
4149 let schema_manager = self.storage.schema_manager();
4151 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
4152 let schema_path = if dest_prefix.as_ref().is_empty() {
4153 ObjPath::from("catalog/schema.json")
4154 } else {
4155 ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
4156 };
4157 dest_store
4158 .put(&schema_path, bytes::Bytes::from(schema_content).into())
4159 .await?;
4160
4161 Ok(())
4162 }
4163
4164 const MAX_BACKUP_DEPTH: usize = 100;
4169
4170 const MAX_BACKUP_FILES: usize = 100_000;
4175
4176 pub(crate) fn copy_dir_all(
4184 src: &std::path::Path,
4185 dst: &std::path::Path,
4186 ) -> std::io::Result<()> {
4187 let mut file_count = 0usize;
4188 Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
4189 }
4190
4191 pub(crate) fn copy_dir_all_impl(
4193 src: &std::path::Path,
4194 dst: &std::path::Path,
4195 depth: usize,
4196 file_count: &mut usize,
4197 ) -> std::io::Result<()> {
4198 if depth >= Self::MAX_BACKUP_DEPTH {
4199 return Err(std::io::Error::new(
4200 std::io::ErrorKind::InvalidInput,
4201 format!(
4202 "Maximum backup depth {} exceeded at {:?}",
4203 Self::MAX_BACKUP_DEPTH,
4204 src
4205 ),
4206 ));
4207 }
4208
4209 std::fs::create_dir_all(dst)?;
4210
4211 for entry in std::fs::read_dir(src)? {
4212 if *file_count >= Self::MAX_BACKUP_FILES {
4213 return Err(std::io::Error::new(
4214 std::io::ErrorKind::InvalidInput,
4215 format!(
4216 "Maximum backup file count {} exceeded",
4217 Self::MAX_BACKUP_FILES
4218 ),
4219 ));
4220 }
4221 *file_count += 1;
4222
4223 let entry = entry?;
4224 let metadata = entry.metadata()?;
4225
4226 if metadata.file_type().is_symlink() {
4228 continue;
4230 }
4231
4232 let dst_path = dst.join(entry.file_name());
4233 if metadata.is_dir() {
4234 Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4235 } else {
4236 std::fs::copy(entry.path(), dst_path)?;
4237 }
4238 }
4239 Ok(())
4240 }
4241
4242 pub(crate) async fn execute_copy(
4243 &self,
4244 target: &str,
4245 source: &str,
4246 options: &HashMap<String, Value>,
4247 prop_manager: &PropertyManager,
4248 ) -> Result<Vec<HashMap<String, Value>>> {
4249 let format = options
4250 .get("format")
4251 .and_then(|v| v.as_str())
4252 .unwrap_or_else(|| {
4253 if source.ends_with(".parquet") {
4254 "parquet"
4255 } else {
4256 "csv"
4257 }
4258 });
4259
4260 match format.to_lowercase().as_str() {
4261 "csv" => self.execute_csv_import(target, source, options).await,
4262 "parquet" => {
4263 self.execute_parquet_import(target, source, options, prop_manager)
4264 .await
4265 }
4266 _ => Err(anyhow!("Unsupported format: {}", format)),
4267 }
4268 }
4269
4270 pub(crate) async fn execute_csv_import(
4271 &self,
4272 target: &str,
4273 source: &str,
4274 options: &HashMap<String, Value>,
4275 ) -> Result<Vec<HashMap<String, Value>>> {
4276 let validated_source = self.validate_path(source)?;
4278
4279 let writer_lock = self
4280 .writer
4281 .as_ref()
4282 .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4283
4284 let schema = self.storage.schema_manager().schema();
4285
4286 let label_meta = schema.labels.get(target);
4288 let edge_meta = schema.edge_types.get(target);
4289
4290 if label_meta.is_none() && edge_meta.is_none() {
4291 return Err(anyhow!("Target '{}' not found in schema", target));
4292 }
4293
4294 let delimiter_str = options
4296 .get("delimiter")
4297 .and_then(|v| v.as_str())
4298 .unwrap_or(",");
4299 let delimiter = if delimiter_str.is_empty() {
4300 b','
4301 } else {
4302 delimiter_str.as_bytes()[0]
4303 };
4304 let has_header = options
4305 .get("header")
4306 .and_then(|v| v.as_bool())
4307 .unwrap_or(true);
4308
4309 let mut rdr = csv::ReaderBuilder::new()
4310 .delimiter(delimiter)
4311 .has_headers(has_header)
4312 .from_path(&validated_source)?;
4313
4314 let headers = rdr.headers()?.clone();
4315 let mut count = 0;
4316
4317 let writer: &uni_store::Writer = writer_lock.as_ref();
4318
4319 const CSV_ID_CHUNK: usize = 256;
4323
4324 if label_meta.is_some() {
4325 let target_props = schema
4326 .properties
4327 .get(target)
4328 .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4329
4330 let mut vid_chunk: std::collections::VecDeque<Vid> =
4331 std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4332 for result in rdr.records() {
4333 let record = result?;
4334 let mut props = HashMap::new();
4335
4336 for (i, header) in headers.iter().enumerate() {
4337 if let Some(val_str) = record.get(i)
4338 && let Some(prop_meta) = target_props.get(header)
4339 {
4340 let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4341 props.insert(header.to_string(), val);
4342 }
4343 }
4344
4345 if vid_chunk.is_empty() {
4346 vid_chunk.extend(writer.allocate_vids(CSV_ID_CHUNK).await?);
4347 }
4348 let vid = vid_chunk.pop_front().unwrap();
4349 writer
4350 .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4351 .await?;
4352 count += 1;
4353 }
4354 } else if let Some(meta) = edge_meta {
4355 let type_id = meta.id;
4356 let target_props = schema
4357 .properties
4358 .get(target)
4359 .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4360
4361 let src_col = options
4364 .get("src_col")
4365 .and_then(|v| v.as_str())
4366 .unwrap_or("_src");
4367 let dst_col = options
4368 .get("dst_col")
4369 .and_then(|v| v.as_str())
4370 .unwrap_or("_dst");
4371
4372 let mut eid_chunk: std::collections::VecDeque<Eid> =
4373 std::collections::VecDeque::with_capacity(CSV_ID_CHUNK);
4374 for result in rdr.records() {
4375 let record = result?;
4376 let mut props = HashMap::new();
4377 let mut src_vid = None;
4378 let mut dst_vid = None;
4379
4380 for (i, header) in headers.iter().enumerate() {
4381 if let Some(val_str) = record.get(i) {
4382 if header == src_col {
4383 src_vid =
4384 Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4385 } else if header == dst_col {
4386 dst_vid =
4387 Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
4388 } else if let Some(prop_meta) = target_props.get(header) {
4389 let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
4390 props.insert(header.to_string(), val);
4391 }
4392 }
4393 }
4394
4395 let src =
4396 src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4397 let dst = dst_vid
4398 .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;
4399
4400 if eid_chunk.is_empty() {
4401 eid_chunk.extend(writer.allocate_eids(CSV_ID_CHUNK).await?);
4402 }
4403 let eid = eid_chunk.pop_front().unwrap();
4404 writer
4405 .insert_edge(
4406 src,
4407 dst,
4408 type_id,
4409 eid,
4410 props,
4411 Some(target.to_string()),
4412 None,
4413 )
4414 .await?;
4415 count += 1;
4416 }
4417 }
4418
4419 let mut res = HashMap::new();
4420 res.insert("count".to_string(), Value::Int(count as i64));
4421 Ok(vec![res])
4422 }
4423
4424 pub(crate) async fn execute_parquet_import(
4428 &self,
4429 target: &str,
4430 source: &str,
4431 options: &HashMap<String, Value>,
4432 _prop_manager: &PropertyManager,
4433 ) -> Result<Vec<HashMap<String, Value>>> {
4434 let writer_lock = self
4435 .writer
4436 .as_ref()
4437 .ok_or_else(|| anyhow!("COPY requires a Writer"))?;
4438
4439 let schema = self.storage.schema_manager().schema();
4440
4441 let label_meta = schema.labels.get(target);
4443 let edge_meta = schema.edge_types.get(target);
4444
4445 if label_meta.is_none() && edge_meta.is_none() {
4446 return Err(anyhow!("Target '{}' not found in schema", target));
4447 }
4448
4449 let reader = if is_cloud_url(source) {
4451 self.open_parquet_from_cloud(source).await?
4452 } else {
4453 let validated_source = self.validate_path(source)?;
4455 let file = std::fs::File::open(&validated_source)?;
4456 let builder =
4457 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
4458 builder.build()?
4459 };
4460 let mut reader = reader;
4461
4462 let mut count = 0;
4463 let writer: &uni_store::Writer = writer_lock.as_ref();
4464
4465 if label_meta.is_some() {
4466 let target_props = schema
4467 .properties
4468 .get(target)
4469 .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;
4470
4471 for batch in reader.by_ref() {
4472 let batch = batch?;
4473 let num_rows = batch.num_rows();
4474 let vids = writer.allocate_vids(num_rows).await?;
4476 for (row, &vid) in vids.iter().enumerate().take(num_rows) {
4477 let mut props = HashMap::new();
4478 for field in batch.schema().fields() {
4479 let name = field.name();
4480 if target_props.contains_key(name) {
4481 let col = batch.column_by_name(name).unwrap();
4482 if !col.is_null(row) {
4483 let data_type = target_props.get(name).map(|pm| &pm.r#type);
4485 let val =
4486 arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
4487 props.insert(name.clone(), val);
4488 }
4489 }
4490 }
4491 writer
4492 .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
4493 .await?;
4494 count += 1;
4495 }
4496 }
4497 } else if let Some(meta) = edge_meta {
4498 let type_id = meta.id;
4499 let target_props = schema
4500 .properties
4501 .get(target)
4502 .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;
4503
4504 let src_col = options
4505 .get("src_col")
4506 .and_then(|v| v.as_str())
4507 .unwrap_or("_src");
4508 let dst_col = options
4509 .get("dst_col")
4510 .and_then(|v| v.as_str())
4511 .unwrap_or("_dst");
4512
4513 for batch in reader {
4514 let batch = batch?;
4515 let num_rows = batch.num_rows();
4516 let eids = writer.allocate_eids(num_rows).await?;
4518 for (row, &eid) in eids.iter().enumerate().take(num_rows) {
4519 let mut props = HashMap::new();
4520 let mut src_vid = None;
4521 let mut dst_vid = None;
4522
4523 for field in batch.schema().fields() {
4524 let name = field.name();
4525 let col = batch.column_by_name(name).unwrap();
4526 if col.is_null(row) {
4527 continue;
4528 }
4529
4530 if name == src_col {
4531 let val = Self::arrow_to_value(col.as_ref(), row);
4532 src_vid = Some(Self::vid_from_value(&val)?);
4533 } else if name == dst_col {
4534 let val = Self::arrow_to_value(col.as_ref(), row);
4535 dst_vid = Some(Self::vid_from_value(&val)?);
4536 } else if let Some(pm) = target_props.get(name) {
4537 let val =
4539 arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
4540 props.insert(name.clone(), val);
4541 }
4542 }
4543
4544 let src = src_vid
4545 .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
4546 let dst = dst_vid.ok_or_else(|| {
4547 anyhow!("Missing destination VID in column '{}'", dst_col)
4548 })?;
4549
4550 writer
4551 .insert_edge(
4552 src,
4553 dst,
4554 type_id,
4555 eid,
4556 props,
4557 Some(target.to_string()),
4558 None,
4559 )
4560 .await?;
4561 count += 1;
4562 }
4563 }
4564 }
4565
4566 let mut res = HashMap::new();
4567 res.insert("count".to_string(), Value::Int(count as i64));
4568 Ok(vec![res])
4569 }
4570
4571 async fn open_parquet_from_cloud(
4575 &self,
4576 source_url: &str,
4577 ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4578 use object_store::ObjectStoreExt;
4579
4580 let (store, path) = build_store_from_url(source_url)?;
4581
4582 let bytes = store.get(&path).await?.bytes().await?;
4584
4585 let reader = bytes::Bytes::from(bytes.to_vec());
4587 let builder =
4588 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4589 Ok(builder.build()?)
4590 }
4591
4592 pub(crate) async fn scan_edge_type(
4593 &self,
4594 edge_type: &str,
4595 ctx: Option<&QueryContext>,
4596 ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4597 let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4598
4599 self.scan_edge_type_l2(edge_type, &mut edges).await?;
4601
4602 self.scan_edge_type_l1(edge_type, &mut edges).await?;
4604
4605 if let Some(ctx) = ctx {
4607 self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4608 self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4609 }
4610
4611 Ok(edges
4612 .into_iter()
4613 .map(|(eid, (src, dst))| (eid, src, dst))
4614 .collect())
4615 }
4616
4617 pub(crate) async fn scan_edge_type_l2(
4622 &self,
4623 _edge_type: &str,
4624 _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4625 ) -> Result<()> {
4626 Ok(())
4629 }
4630
4631 pub(crate) async fn scan_edge_type_l1(
4633 &self,
4634 edge_type: &str,
4635 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4636 ) -> Result<()> {
4637 if let Ok(Some(batch)) = self
4638 .storage
4639 .scan_delta_table(
4640 edge_type,
4641 "fwd",
4642 &["eid", "src_vid", "dst_vid", "op", "_version"],
4643 None,
4644 )
4645 .await
4646 {
4647 let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4649 HashMap::new();
4650
4651 self.process_delta_batch(&batch, &mut versioned_ops)?;
4652
4653 for (eid, (_, op, src, dst)) in versioned_ops {
4655 if op == 0 {
4656 edges.insert(eid, (src, dst));
4657 } else if op == 1 {
4658 edges.remove(&eid);
4659 }
4660 }
4661 }
4662 Ok(())
4663 }
4664
4665 pub(crate) fn process_delta_batch(
4667 &self,
4668 batch: &arrow_array::RecordBatch,
4669 versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4670 ) -> Result<()> {
4671 use arrow_array::UInt64Array;
4672 let eid_col = batch
4673 .column_by_name("eid")
4674 .ok_or(anyhow!("Missing eid"))?
4675 .as_any()
4676 .downcast_ref::<UInt64Array>()
4677 .ok_or(anyhow!("Invalid eid"))?;
4678 let src_col = batch
4679 .column_by_name("src_vid")
4680 .ok_or(anyhow!("Missing src_vid"))?
4681 .as_any()
4682 .downcast_ref::<UInt64Array>()
4683 .ok_or(anyhow!("Invalid src_vid"))?;
4684 let dst_col = batch
4685 .column_by_name("dst_vid")
4686 .ok_or(anyhow!("Missing dst_vid"))?
4687 .as_any()
4688 .downcast_ref::<UInt64Array>()
4689 .ok_or(anyhow!("Invalid dst_vid"))?;
4690 let op_col = batch
4691 .column_by_name("op")
4692 .ok_or(anyhow!("Missing op"))?
4693 .as_any()
4694 .downcast_ref::<arrow_array::UInt8Array>()
4695 .ok_or(anyhow!("Invalid op"))?;
4696 let version_col = batch
4697 .column_by_name("_version")
4698 .ok_or(anyhow!("Missing _version"))?
4699 .as_any()
4700 .downcast_ref::<UInt64Array>()
4701 .ok_or(anyhow!("Invalid _version"))?;
4702
4703 for i in 0..batch.num_rows() {
4704 let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4705 let version = version_col.value(i);
4706 let op = op_col.value(i);
4707 let src = Vid::from(src_col.value(i));
4708 let dst = Vid::from(dst_col.value(i));
4709
4710 match versioned_ops.entry(eid) {
4711 std::collections::hash_map::Entry::Vacant(e) => {
4712 e.insert((version, op, src, dst));
4713 }
4714 std::collections::hash_map::Entry::Occupied(mut e) => {
4715 if version > e.get().0 {
4716 e.insert((version, op, src, dst));
4717 }
4718 }
4719 }
4720 }
4721 Ok(())
4722 }
4723
4724 pub(crate) fn scan_edge_type_l0(
4726 &self,
4727 edge_type: &str,
4728 ctx: &QueryContext,
4729 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4730 ) {
4731 let schema = self.storage.schema_manager().schema();
4732 let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4733
4734 if let Some(type_id) = type_id {
4735 self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4737
4738 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4740 self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4741 }
4742
4743 for pending_l0_arc in &ctx.pending_flush_l0s {
4745 self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4746 }
4747 }
4748 }
4749
4750 pub(crate) fn scan_single_l0(
4752 &self,
4753 l0: &uni_store::runtime::L0Buffer,
4754 type_id: u32,
4755 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4756 ) {
4757 for edge_entry in l0.graph.edges() {
4758 if edge_entry.edge_type == type_id {
4759 edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4760 }
4761 }
4762 let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4764 for eid in eids_to_check {
4765 if l0.is_tombstoned(eid) {
4766 edges.remove(&eid);
4767 }
4768 }
4769 }
4770
4771 pub(crate) fn filter_tombstoned_vertex_edges(
4773 &self,
4774 ctx: &QueryContext,
4775 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4776 ) {
4777 let l0 = ctx.l0.read();
4778 let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4779
4780 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4782 let tx_l0 = tx_l0_arc.read();
4783 all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4784 }
4785
4786 for pending_l0_arc in &ctx.pending_flush_l0s {
4788 let pending_l0 = pending_l0_arc.read();
4789 all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4790 }
4791
4792 edges.retain(|_, (src, dst)| {
4793 !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4794 });
4795 }
4796
4797 pub(crate) async fn execute_project(
4799 &self,
4800 input_rows: Vec<HashMap<String, Value>>,
4801 projections: &[(Expr, Option<String>)],
4802 prop_manager: &PropertyManager,
4803 params: &HashMap<String, Value>,
4804 ctx: Option<&QueryContext>,
4805 ) -> Result<Vec<HashMap<String, Value>>> {
4806 let mut results = Vec::new();
4807 for m in input_rows {
4808 let mut row = HashMap::new();
4809 for (expr, alias) in projections {
4810 let val = self
4811 .evaluate_expr(expr, &m, prop_manager, params, ctx)
4812 .await?;
4813 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4814 row.insert(name, val);
4815 }
4816 results.push(row);
4817 }
4818 Ok(results)
4819 }
4820
4821 pub(crate) async fn execute_unwind(
4823 &self,
4824 input_rows: Vec<HashMap<String, Value>>,
4825 expr: &Expr,
4826 variable: &str,
4827 prop_manager: &PropertyManager,
4828 params: &HashMap<String, Value>,
4829 ctx: Option<&QueryContext>,
4830 ) -> Result<Vec<HashMap<String, Value>>> {
4831 let mut results = Vec::new();
4832 for row in input_rows {
4833 let val = self
4834 .evaluate_expr(expr, &row, prop_manager, params, ctx)
4835 .await?;
4836 if let Value::List(items) = val {
4837 for item in items {
4838 let mut new_row = row.clone();
4839 new_row.insert(variable.to_string(), item);
4840 results.push(new_row);
4841 }
4842 }
4843 }
4844 Ok(results)
4845 }
4846
4847 pub(crate) async fn execute_apply(
4849 &self,
4850 input_rows: Vec<HashMap<String, Value>>,
4851 subquery: &LogicalPlan,
4852 input_filter: Option<&Expr>,
4853 prop_manager: &PropertyManager,
4854 params: &HashMap<String, Value>,
4855 ctx: Option<&QueryContext>,
4856 ) -> Result<Vec<HashMap<String, Value>>> {
4857 let mut filtered_rows = input_rows;
4858
4859 if let Some(filter) = input_filter {
4860 let mut filtered = Vec::new();
4861 for row in filtered_rows {
4862 let res = self
4863 .evaluate_expr(filter, &row, prop_manager, params, ctx)
4864 .await?;
4865 if res.as_bool().unwrap_or(false) {
4866 filtered.push(row);
4867 }
4868 }
4869 filtered_rows = filtered;
4870 }
4871
4872 if filtered_rows.is_empty() {
4875 let sub_rows = self
4876 .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4877 .await?;
4878 return Ok(sub_rows);
4879 }
4880
4881 let mut results = Vec::new();
4882 for row in filtered_rows {
4883 let mut sub_params = params.clone();
4884 sub_params.extend(row.clone());
4885
4886 let sub_rows = self
4887 .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4888 .await?;
4889
4890 for sub_row in sub_rows {
4891 let mut new_row = row.clone();
4892 new_row.extend(sub_row);
4893 results.push(new_row);
4894 }
4895 }
4896 Ok(results)
4897 }
4898
4899 pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4901 let schema = self.storage.schema_manager().schema();
4902 let mut rows = Vec::new();
4903 for idx in &schema.indexes {
4904 let (name, type_str, details) = match idx {
4905 uni_common::core::schema::IndexDefinition::Vector(c) => (
4906 c.name.clone(),
4907 "VECTOR",
4908 format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4909 ),
4910 uni_common::core::schema::IndexDefinition::FullText(c) => (
4911 c.name.clone(),
4912 "FULLTEXT",
4913 format!("on {}:{:?}", c.label, c.properties),
4914 ),
4915 uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4916 cfg.name.clone(),
4917 "SCALAR",
4918 format!(":{}({:?})", cfg.label, cfg.properties),
4919 ),
4920 _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4921 };
4922
4923 if let Some(f) = filter
4924 && f != type_str
4925 {
4926 continue;
4927 }
4928
4929 let mut row = HashMap::new();
4930 row.insert("name".to_string(), Value::String(name));
4931 row.insert("type".to_string(), Value::String(type_str.to_string()));
4932 row.insert("details".to_string(), Value::String(details));
4933 rows.push(row);
4934 }
4935 rows
4936 }
4937
4938 pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4939 let mut row = HashMap::new();
4940 row.insert("name".to_string(), Value::String("uni".to_string()));
4941 vec![row]
4943 }
4944
4945 pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4946 vec![]
4948 }
4949
4950 pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4951 let snapshot = self
4952 .storage
4953 .snapshot_manager()
4954 .load_latest_snapshot()
4955 .await?;
4956 let mut results = Vec::new();
4957
4958 if let Some(snap) = snapshot {
4959 for (label, s) in &snap.vertices {
4960 let mut row = HashMap::new();
4961 row.insert("type".to_string(), Value::String("Label".to_string()));
4962 row.insert("name".to_string(), Value::String(label.clone()));
4963 row.insert("count".to_string(), Value::Int(s.count as i64));
4964 results.push(row);
4965 }
4966 for (edge, s) in &snap.edges {
4967 let mut row = HashMap::new();
4968 row.insert("type".to_string(), Value::String("Edge".to_string()));
4969 row.insert("name".to_string(), Value::String(edge.clone()));
4970 row.insert("count".to_string(), Value::Int(s.count as i64));
4971 results.push(row);
4972 }
4973 }
4974
4975 Ok(results)
4976 }
4977
4978 pub(crate) fn execute_show_constraints(
4979 &self,
4980 clause: ShowConstraints,
4981 ) -> Vec<HashMap<String, Value>> {
4982 let schema = self.storage.schema_manager().schema();
4983 let mut rows = Vec::new();
4984 for c in &schema.constraints {
4985 if let Some(target) = &clause.target {
4986 match (target, &c.target) {
4987 (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4988 (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4989 if e1 == e2 => {}
4990 _ => continue,
4991 }
4992 }
4993
4994 let mut row = HashMap::new();
4995 row.insert("name".to_string(), Value::String(c.name.clone()));
4996 let type_str = match c.constraint_type {
4997 ConstraintType::Unique { .. } => "UNIQUE",
4998 ConstraintType::Exists { .. } => "EXISTS",
4999 ConstraintType::Check { .. } => "CHECK",
5000 _ => "UNKNOWN",
5001 };
5002 row.insert("type".to_string(), Value::String(type_str.to_string()));
5003
5004 let target_str = match &c.target {
5005 ConstraintTarget::Label(l) => format!("(:{})", l),
5006 ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
5007 _ => "UNKNOWN".to_string(),
5008 };
5009 row.insert("target".to_string(), Value::String(target_str));
5010
5011 rows.push(row);
5012 }
5013 rows
5014 }
5015
5016 pub(crate) async fn execute_cross_join(
5018 &self,
5019 left: Box<LogicalPlan>,
5020 right: Box<LogicalPlan>,
5021 prop_manager: &PropertyManager,
5022 params: &HashMap<String, Value>,
5023 ctx: Option<&QueryContext>,
5024 ) -> Result<Vec<HashMap<String, Value>>> {
5025 let left_rows = self
5026 .execute_subplan(*left, prop_manager, params, ctx)
5027 .await?;
5028 let right_rows = self
5029 .execute_subplan(*right, prop_manager, params, ctx)
5030 .await?;
5031
5032 let mut results = Vec::new();
5033 for l in &left_rows {
5034 for r in &right_rows {
5035 let mut combined = l.clone();
5036 combined.extend(r.clone());
5037 results.push(combined);
5038 }
5039 }
5040 Ok(results)
5041 }
5042
5043 pub(crate) async fn execute_union(
5045 &self,
5046 left: Box<LogicalPlan>,
5047 right: Box<LogicalPlan>,
5048 all: bool,
5049 prop_manager: &PropertyManager,
5050 params: &HashMap<String, Value>,
5051 ctx: Option<&QueryContext>,
5052 ) -> Result<Vec<HashMap<String, Value>>> {
5053 let mut left_rows = self
5054 .execute_subplan(*left, prop_manager, params, ctx)
5055 .await?;
5056 let mut right_rows = self
5057 .execute_subplan(*right, prop_manager, params, ctx)
5058 .await?;
5059
5060 left_rows.append(&mut right_rows);
5061
5062 if !all {
5063 let mut seen = HashSet::new();
5064 left_rows.retain(|row| {
5065 let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
5066 let key = format!("{sorted_row:?}");
5067 seen.insert(key)
5068 });
5069 }
5070 Ok(left_rows)
5071 }
5072
5073 pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
5075 let schema = self.storage.schema_manager().schema();
5076 schema.indexes.iter().any(|idx| match idx {
5077 uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
5078 uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
5079 uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
5080 _ => false,
5081 })
5082 }
5083
5084 pub(crate) async fn execute_export(
5085 &self,
5086 target: &str,
5087 source: &str,
5088 options: &HashMap<String, Value>,
5089 prop_manager: &PropertyManager,
5090 ctx: Option<&QueryContext>,
5091 ) -> Result<Vec<HashMap<String, Value>>> {
5092 let format = options
5093 .get("format")
5094 .and_then(|v| v.as_str())
5095 .unwrap_or("csv")
5096 .to_lowercase();
5097
5098 match format.as_str() {
5099 "csv" => {
5100 self.execute_csv_export(target, source, options, prop_manager, ctx)
5101 .await
5102 }
5103 "parquet" => {
5104 self.execute_parquet_export(target, source, options, prop_manager, ctx)
5105 .await
5106 }
5107 _ => Err(anyhow!("Unsupported export format: {}", format)),
5108 }
5109 }
5110
5111 pub(crate) async fn execute_csv_export(
5112 &self,
5113 target: &str,
5114 source: &str,
5115 options: &HashMap<String, Value>,
5116 prop_manager: &PropertyManager,
5117 ctx: Option<&QueryContext>,
5118 ) -> Result<Vec<HashMap<String, Value>>> {
5119 let validated_dest = self.validate_path(source)?;
5121
5122 let schema = self.storage.schema_manager().schema();
5123 let label_meta = schema.labels.get(target);
5124 let edge_meta = schema.edge_types.get(target);
5125
5126 if label_meta.is_none() && edge_meta.is_none() {
5127 return Err(anyhow!("Target '{}' not found in schema", target));
5128 }
5129
5130 let delimiter_str = options
5131 .get("delimiter")
5132 .and_then(|v| v.as_str())
5133 .unwrap_or(",");
5134 let delimiter = if delimiter_str.is_empty() {
5135 b','
5136 } else {
5137 delimiter_str.as_bytes()[0]
5138 };
5139 let has_header = options
5140 .get("header")
5141 .and_then(|v| v.as_bool())
5142 .unwrap_or(true);
5143
5144 let mut wtr = csv::WriterBuilder::new()
5145 .delimiter(delimiter)
5146 .from_path(&validated_dest)?;
5147
5148 let mut count = 0;
5149 let empty_props = HashMap::new();
5151
5152 if let Some(meta) = label_meta {
5153 let label_id = meta.id;
5154 let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5155 let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5156 prop_names.sort();
5157
5158 let mut headers = vec!["_vid".to_string()];
5159 headers.extend(prop_names.clone());
5160
5161 if has_header {
5162 wtr.write_record(&headers)?;
5163 }
5164
5165 let vids = self
5166 .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5167 .await?;
5168
5169 for vid in vids {
5170 let props = prop_manager
5171 .get_all_vertex_props_with_ctx(vid, ctx)
5172 .await?
5173 .unwrap_or_default();
5174
5175 let mut row = Vec::with_capacity(headers.len());
5176 row.push(vid.to_string());
5177 for p_name in &prop_names {
5178 let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5179 row.push(self.format_csv_value(val));
5180 }
5181 wtr.write_record(&row)?;
5182 count += 1;
5183 }
5184 } else if let Some(meta) = edge_meta {
5185 let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
5186 let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
5187 prop_names.sort();
5188
5189 let mut headers = vec![
5191 "_eid".to_string(),
5192 "_src".to_string(),
5193 "_dst".to_string(),
5194 "_type".to_string(),
5195 ];
5196 headers.extend(prop_names.clone());
5197
5198 if has_header {
5199 wtr.write_record(&headers)?;
5200 }
5201
5202 let edges = self.scan_edge_type(target, ctx).await?;
5203
5204 for (eid, src, dst) in edges {
5205 let props = prop_manager
5206 .get_all_edge_props_with_ctx(eid, ctx)
5207 .await?
5208 .unwrap_or_default();
5209
5210 let mut row = Vec::with_capacity(headers.len());
5211 row.push(eid.to_string());
5212 row.push(src.to_string());
5213 row.push(dst.to_string());
5214 row.push(meta.id.to_string());
5215
5216 for p_name in &prop_names {
5217 let val = props.get(p_name).cloned().unwrap_or(Value::Null);
5218 row.push(self.format_csv_value(val));
5219 }
5220 wtr.write_record(&row)?;
5221 count += 1;
5222 }
5223 }
5224
5225 wtr.flush()?;
5226 let mut res = HashMap::new();
5227 res.insert("count".to_string(), Value::Int(count as i64));
5228 Ok(vec![res])
5229 }
5230
5231 pub(crate) async fn execute_parquet_export(
5235 &self,
5236 target: &str,
5237 destination: &str,
5238 _options: &HashMap<String, Value>,
5239 prop_manager: &PropertyManager,
5240 ctx: Option<&QueryContext>,
5241 ) -> Result<Vec<HashMap<String, Value>>> {
5242 let schema_manager = self.storage.schema_manager();
5243 let schema = schema_manager.schema();
5244 let label_meta = schema.labels.get(target);
5245 let edge_meta = schema.edge_types.get(target);
5246
5247 if label_meta.is_none() && edge_meta.is_none() {
5248 return Err(anyhow!("Target '{}' not found in schema", target));
5249 }
5250
5251 let arrow_schema = if label_meta.is_some() {
5252 let dataset = self.storage.vertex_dataset(target)?;
5253 dataset.get_arrow_schema(&schema)?
5254 } else {
5255 let dataset = self.storage.edge_dataset(target, "", "")?;
5257 dataset.get_arrow_schema(&schema)?
5258 };
5259
5260 let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();
5261
5262 if let Some(meta) = label_meta {
5263 let label_id = meta.id;
5264 let vids = self
5265 .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5266 .await?;
5267
5268 for vid in vids {
5269 let mut props = prop_manager
5270 .get_all_vertex_props_with_ctx(vid, ctx)
5271 .await?
5272 .unwrap_or_default();
5273
5274 props.insert(
5275 "_vid".to_string(),
5276 uni_common::Value::Int(vid.as_u64() as i64),
5277 );
5278 if !props.contains_key("_uid") {
5279 props.insert(
5280 "_uid".to_string(),
5281 uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
5282 );
5283 }
5284 props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5285 props.insert("_version".to_string(), uni_common::Value::Int(1));
5286 rows.push(props);
5287 }
5288 } else if edge_meta.is_some() {
5289 let edges = self.scan_edge_type(target, ctx).await?;
5290 for (eid, src, dst) in edges {
5291 let mut props = prop_manager
5292 .get_all_edge_props_with_ctx(eid, ctx)
5293 .await?
5294 .unwrap_or_default();
5295
5296 props.insert(
5297 "eid".to_string(),
5298 uni_common::Value::Int(eid.as_u64() as i64),
5299 );
5300 props.insert(
5301 "src_vid".to_string(),
5302 uni_common::Value::Int(src.as_u64() as i64),
5303 );
5304 props.insert(
5305 "dst_vid".to_string(),
5306 uni_common::Value::Int(dst.as_u64() as i64),
5307 );
5308 props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5309 props.insert("_version".to_string(), uni_common::Value::Int(1));
5310 rows.push(props);
5311 }
5312 }
5313
5314 if is_cloud_url(destination) {
5316 self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
5317 .await?;
5318 } else {
5319 let validated_dest = self.validate_path(destination)?;
5321 let file = std::fs::File::create(&validated_dest)?;
5322 let mut writer =
5323 parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;
5324
5325 if !rows.is_empty() {
5327 let batch = self.rows_to_batch(&rows, &arrow_schema)?;
5328 writer.write(&batch)?;
5329 }
5330
5331 writer.close()?;
5332 }
5333
5334 let mut res = HashMap::new();
5335 res.insert("count".to_string(), Value::Int(rows.len() as i64));
5336 Ok(vec![res])
5337 }
5338
5339 async fn write_parquet_to_cloud(
5341 &self,
5342 dest_url: &str,
5343 rows: &[HashMap<String, uni_common::Value>],
5344 arrow_schema: &arrow_schema::Schema,
5345 ) -> Result<()> {
5346 use object_store::ObjectStoreExt;
5347
5348 let (store, path) = build_store_from_url(dest_url)?;
5349
5350 let mut buffer = Vec::new();
5352 {
5353 let mut writer = parquet::arrow::ArrowWriter::try_new(
5354 &mut buffer,
5355 Arc::new(arrow_schema.clone()),
5356 None,
5357 )?;
5358
5359 if !rows.is_empty() {
5360 let batch = self.rows_to_batch(rows, arrow_schema)?;
5361 writer.write(&batch)?;
5362 }
5363
5364 writer.close()?;
5365 }
5366
5367 store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5369
5370 Ok(())
5371 }
5372
5373 pub(crate) fn rows_to_batch(
5374 &self,
5375 rows: &[HashMap<String, uni_common::Value>],
5376 schema: &arrow_schema::Schema,
5377 ) -> Result<RecordBatch> {
5378 let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5379
5380 for field in schema.fields() {
5381 let name = field.name();
5382 let dt = field.data_type();
5383
5384 let values: Vec<uni_common::Value> = rows
5385 .iter()
5386 .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5387 .collect();
5388 let array = self.values_to_array(&values, dt)?;
5389 columns.push(array);
5390 }
5391
5392 Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5393 }
5394
    /// Converts a column of values into an Arrow array of data type `dt`.
    ///
    /// This is a thin delegation to `arrow_convert::values_to_array`; its
    /// result, including any error, is returned unchanged.
    pub(crate) fn values_to_array(
        &self,
        values: &[uni_common::Value],
        dt: &arrow_schema::DataType,
    ) -> Result<Arc<dyn Array>> {
        arrow_convert::values_to_array(values, dt)
    }
5404
5405 pub(crate) fn format_csv_value(&self, val: Value) -> String {
5406 match val {
5407 Value::Null => "".to_string(),
5408 Value::String(s) => s,
5409 Value::Int(i) => i.to_string(),
5410 Value::Float(f) => f.to_string(),
5411 Value::Bool(b) => b.to_string(),
5412 _ => format!("{val}"),
5413 }
5414 }
5415
5416 pub(crate) fn parse_csv_value(
5417 &self,
5418 s: &str,
5419 data_type: &uni_common::core::schema::DataType,
5420 prop_name: &str,
5421 ) -> Result<Value> {
5422 if s.is_empty() || s.to_lowercase() == "null" {
5423 return Ok(Value::Null);
5424 }
5425
5426 use uni_common::core::schema::DataType;
5427 match data_type {
5428 DataType::String => Ok(Value::String(s.to_string())),
5429 DataType::Int32 | DataType::Int64 => {
5430 let i = s.parse::<i64>().map_err(|_| {
5431 anyhow!(
5432 "Failed to parse integer for property '{}': {}",
5433 prop_name,
5434 s
5435 )
5436 })?;
5437 Ok(Value::Int(i))
5438 }
5439 DataType::Float32 | DataType::Float64 => {
5440 let f = s.parse::<f64>().map_err(|_| {
5441 anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5442 })?;
5443 Ok(Value::Float(f))
5444 }
5445 DataType::Bool => {
5446 let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5447 anyhow!(
5448 "Failed to parse boolean for property '{}': {}",
5449 prop_name,
5450 s
5451 )
5452 })?;
5453 Ok(Value::Bool(b))
5454 }
5455 DataType::CypherValue => {
5456 let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5457 anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5458 })?;
5459 Ok(Value::from(json_val))
5460 }
5461 DataType::Vector { .. } => {
5462 let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5463 anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5464 })?;
5465 Ok(Value::Vector(v))
5466 }
5467 _ => Ok(Value::String(s.to_string())),
5468 }
5469 }
5470
5471 pub(crate) async fn detach_delete_vertex(
5472 &self,
5473 vid: Vid,
5474 writer: &Writer,
5475 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5476 ) -> Result<()> {
5477 let schema = self.storage.schema_manager().schema();
5478 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5479
5480 let out_graph = self
5482 .storage
5483 .load_subgraph_cached(
5484 &[vid],
5485 &edge_type_ids,
5486 1,
5487 uni_store::runtime::Direction::Outgoing,
5488 Some(writer.l0_manager.get_current()),
5489 )
5490 .await?;
5491
5492 for edge in out_graph.edges() {
5493 writer
5494 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5495 .await?;
5496 }
5497
5498 let in_graph = self
5500 .storage
5501 .load_subgraph_cached(
5502 &[vid],
5503 &edge_type_ids,
5504 1,
5505 uni_store::runtime::Direction::Incoming,
5506 Some(writer.l0_manager.get_current()),
5507 )
5508 .await?;
5509
5510 for edge in in_graph.edges() {
5511 writer
5512 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5513 .await?;
5514 }
5515
5516 Ok(())
5517 }
5518
5519 async fn batch_load_incident_edges(
5530 &self,
5531 vids: &[Vid],
5532 writer: &Writer,
5533 ) -> Result<(
5534 uni_store::runtime::working_graph::WorkingGraph,
5535 uni_store::runtime::working_graph::WorkingGraph,
5536 )> {
5537 let schema = self.storage.schema_manager().schema();
5538 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5539 let l0 = Some(writer.l0_manager.get_current());
5540
5541 let out_graph = self
5542 .storage
5543 .load_subgraph_cached(
5544 vids,
5545 &edge_type_ids,
5546 1,
5547 uni_store::runtime::Direction::Outgoing,
5548 l0.clone(),
5549 )
5550 .await?;
5551 let in_graph = self
5552 .storage
5553 .load_subgraph_cached(
5554 vids,
5555 &edge_type_ids,
5556 1,
5557 uni_store::runtime::Direction::Incoming,
5558 l0,
5559 )
5560 .await?;
5561 Ok((out_graph, in_graph))
5562 }
5563
5564 pub(crate) async fn batch_detach_delete_vertices(
5566 &self,
5567 vids: &[Vid],
5568 labels_per_vid: Vec<Option<Vec<String>>>,
5569 writer: &Writer,
5570 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5571 ) -> Result<()> {
5572 let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5573
5574 for edge in out_graph.edges() {
5575 writer
5576 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5577 .await?;
5578 }
5579 for edge in in_graph.edges() {
5580 writer
5581 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5582 .await?;
5583 }
5584
5585 for (vid, labels) in vids.iter().zip(labels_per_vid) {
5586 writer.delete_vertex(*vid, labels, tx_l0).await?;
5587 }
5588
5589 Ok(())
5590 }
5591
5592 pub(crate) async fn batch_check_vertices_have_no_edges(
5610 &self,
5611 vids: &[Vid],
5612 writer: &Writer,
5613 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5614 ) -> Result<()> {
5615 if vids.is_empty() {
5616 return Ok(());
5617 }
5618
5619 let mut tombstoned_eids: std::collections::HashSet<uni_common::core::id::Eid> =
5620 std::collections::HashSet::new();
5621 {
5622 let writer_l0 = writer.l0_manager.get_current();
5623 let guard = writer_l0.read();
5624 for &eid in guard.tombstones.keys() {
5625 tombstoned_eids.insert(eid);
5626 }
5627 }
5628 if let Some(tx) = tx_l0 {
5629 let guard = tx.read();
5630 for &eid in guard.tombstones.keys() {
5631 tombstoned_eids.insert(eid);
5632 }
5633 }
5634
5635 let (out_graph, in_graph) = self.batch_load_incident_edges(vids, writer).await?;
5636
5637 for edge in out_graph.edges().chain(in_graph.edges()) {
5638 if !tombstoned_eids.contains(&edge.eid) {
5639 return Err(anyhow!(
5640 "ConstraintVerificationFailed: DeleteConnectedNode - Cannot delete node {}, because it still has relationships. To delete the node and its relationships, use DETACH DELETE.",
5641 edge.src_vid
5642 ));
5643 }
5644 }
5645 Ok(())
5646 }
5647}