1use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7 eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16 match val {
17 Value::Temporal(tv) => {
18 use uni_common::TemporalValue;
19 match tv {
20 TemporalValue::DateTime {
21 nanos_since_epoch, ..
22 }
23 | TemporalValue::LocalDateTime {
24 nanos_since_epoch, ..
25 } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26 TemporalValue::Date { days_since_epoch } => {
27 chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28 }
29 _ => None,
30 }
31 }
32 Value::String(s) => parse_datetime_utc(s).ok(),
33 _ => None,
34 }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46 BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47 ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54
55use crate::query::df_graph::L0Context;
57use crate::query::df_planner::HybridPhysicalPlanner;
58use datafusion::physical_plan::ExecutionPlanProperties;
59use datafusion::prelude::SessionContext;
60use parking_lot::RwLock as SyncRwLock;
61
62use arrow_array::{Array, RecordBatch};
63use csv;
64use parquet;
65
66use super::core::*;
67
// Rows carrying at most this many entries are assumed to hold only system
// columns (no user properties) and are hydrated at execution time — see
// `hydrate_entity_if_needed`. NOTE(review): the exact system column sets are
// produced by the pushdown layer; confirm these counts stay in sync with it.
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
72
73fn collect_l0_vids(
78 ctx: Option<&QueryContext>,
79 extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
80) -> Vec<Vid> {
81 let mut vids = Vec::new();
82 if let Some(ctx) = ctx {
83 vids.extend(extractor(&ctx.l0.read()));
84 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
85 vids.extend(extractor(&tx_l0_arc.read()));
86 }
87 for pending_l0_arc in &ctx.pending_flush_l0s {
88 vids.extend(extractor(&pending_l0_arc.read()));
89 }
90 }
91 vids
92}
93
94async fn hydrate_entity_if_needed(
103 map: &mut HashMap<String, Value>,
104 prop_manager: &PropertyManager,
105 ctx: Option<&QueryContext>,
106) {
107 if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
109 if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
110 tracing::debug!(
111 "Pushdown fallback: hydrating edge {} at execution time",
112 eid_u64
113 );
114 if let Ok(Some(props)) = prop_manager
115 .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
116 .await
117 {
118 for (key, value) in props {
119 map.entry(key).or_insert(value);
120 }
121 }
122 } else {
123 tracing::trace!(
124 "Pushdown success: edge {} already has {} properties",
125 eid_u64,
126 map.len() - EDGE_SYSTEM_FIELD_COUNT
127 );
128 }
129 return;
130 }
131
132 if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
134 if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
135 tracing::debug!(
136 "Pushdown fallback: hydrating vertex {} at execution time",
137 vid_u64
138 );
139 if let Ok(Some(props)) = prop_manager
140 .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
141 .await
142 {
143 for (key, value) in props {
144 map.entry(key).or_insert(value);
145 }
146 }
147 } else {
148 tracing::trace!(
149 "Pushdown success: vertex {} already has {} properties",
150 vid_u64,
151 map.len() - VERTEX_SYSTEM_FIELD_COUNT
152 );
153 }
154 }
155}
156
157impl Executor {
158 async fn verify_and_filter_candidates(
163 &self,
164 mut candidates: Vec<Vid>,
165 variable: &str,
166 filter: Option<&Expr>,
167 ctx: Option<&QueryContext>,
168 prop_manager: &PropertyManager,
169 params: &HashMap<String, Value>,
170 ) -> Result<Vec<Vid>> {
171 candidates.sort_unstable();
172 candidates.dedup();
173
174 let mut verified_vids = Vec::new();
175 for vid in candidates {
176 let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
177 continue; };
179
180 if let Some(expr) = filter {
181 let mut props_map: HashMap<String, Value> = props;
182 props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
183
184 let mut row = HashMap::new();
185 row.insert(variable.to_string(), Value::Map(props_map));
186
187 let res = self
188 .evaluate_expr(expr, &row, prop_manager, params, ctx)
189 .await?;
190 if res.as_bool().unwrap_or(false) {
191 verified_vids.push(vid);
192 }
193 } else {
194 verified_vids.push(vid);
195 }
196 }
197
198 Ok(verified_vids)
199 }
200
201 pub(crate) async fn scan_storage_candidates(
202 &self,
203 label_id: u16,
204 variable: &str,
205 filter: Option<&Expr>,
206 ) -> Result<Vec<Vid>> {
207 let schema = self.storage.schema_manager().schema();
208 let label_name = schema
209 .label_name_by_id(label_id)
210 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
211
212 let empty_props = std::collections::HashMap::new();
214 let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
215 let filter_sql = filter.and_then(|expr| {
216 LanceFilterGenerator::generate(std::slice::from_ref(expr), variable, Some(label_props))
217 });
218
219 self.storage
220 .scan_vertex_candidates(label_name, filter_sql.as_deref())
221 .await
222 }
223
224 pub(crate) async fn scan_label_with_filter(
225 &self,
226 label_id: u16,
227 variable: &str,
228 filter: Option<&Expr>,
229 ctx: Option<&QueryContext>,
230 prop_manager: &PropertyManager,
231 params: &HashMap<String, Value>,
232 ) -> Result<Vec<Vid>> {
233 let mut candidates = self
234 .scan_storage_candidates(label_id, variable, filter)
235 .await?;
236
237 let schema = self.storage.schema_manager().schema();
239 if let Some(label_name) = schema.label_name_by_id(label_id) {
240 candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
241 }
242
243 self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
244 .await
245 }
246
247 pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
248 if let Value::Node(node) = val {
250 return Ok(node.vid);
251 }
252 if let Value::Map(map) = val
254 && let Some(vid_val) = map.get("_vid")
255 && let Some(v) = vid_val.as_u64()
256 {
257 return Ok(Vid::from(v));
258 }
259 if let Some(s) = val.as_str()
261 && let Ok(id) = s.parse::<u64>()
262 {
263 return Ok(Vid::new(id));
264 }
265 if let Some(v) = val.as_u64() {
267 return Ok(Vid::from(v));
268 }
269 Err(anyhow!("Invalid Vid format: {:?}", val))
270 }
271
272 fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
278 for val in row.values() {
279 if let Ok(vid) = Self::vid_from_value(val)
280 && vid == target_vid
281 {
282 return val.clone();
283 }
284 }
285 Value::Map(HashMap::from([(
287 "_vid".to_string(),
288 Value::Int(target_vid.as_u64() as i64),
289 )]))
290 }
291
    /// Build the DataFusion execution environment for the hybrid engine:
    /// a fresh `SessionContext` with Cypher (and any custom) UDFs registered,
    /// a `HybridPhysicalPlanner` wired to the current L0 buffers, and a new
    /// `PropertyManager` sharing this executor's storage.
    pub async fn create_datafusion_planner(
        &self,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<(
        Arc<SyncRwLock<SessionContext>>,
        HybridPhysicalPlanner,
        Arc<PropertyManager>,
    )> {
        // Snapshot L0 state so the planner sees unflushed writes.
        let query_ctx = self.get_context().await;
        let l0_context = match query_ctx {
            Some(ref ctx) => L0Context::from_query_context(ctx),
            None => L0Context::empty(),
        };

        // A dedicated PropertyManager (same cache size as the caller's) so
        // the planner can own an Arc without borrowing the argument.
        let prop_manager_arc = Arc::new(PropertyManager::new(
            self.storage.clone(),
            self.storage.schema_manager_arc(),
            prop_manager.cache_size(),
        ));

        // Register UDFs before the session is shared behind the lock.
        let session = SessionContext::new();
        crate::query::df_udfs::register_cypher_udfs(&session)?;
        if let Some(ref registry) = self.custom_function_registry {
            crate::query::df_udfs::register_custom_udfs(&session, registry)?;
        }
        let session_ctx = Arc::new(SyncRwLock::new(session));

        let mut planner = HybridPhysicalPlanner::with_l0_context(
            session_ctx.clone(),
            self.storage.clone(),
            l0_context,
            prop_manager_arc.clone(),
            self.storage.schema_manager().schema(),
            params.clone(),
            HashMap::new(),
        );

        // Optional capabilities: graph algorithms, stored procedures, Xervo.
        planner = planner.with_algo_registry(self.algo_registry.clone());
        if let Some(ref registry) = self.procedure_registry {
            planner = planner.with_procedure_registry(registry.clone());
        }
        if let Some(ref xervo_runtime) = self.xervo_runtime {
            planner = planner.with_xervo_runtime(xervo_runtime.clone());
        }

        Ok((session_ctx, planner, prop_manager_arc))
    }
344
345 pub fn collect_batches(
347 session_ctx: &Arc<SyncRwLock<SessionContext>>,
348 execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
349 ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
350 Box::pin(async move {
351 use futures::TryStreamExt;
352
353 let task_ctx = session_ctx.read().task_ctx();
354 let partition_count = execution_plan.output_partitioning().partition_count();
355 let mut all_batches = Vec::new();
356 for partition in 0..partition_count {
357 let stream = execution_plan.execute(partition, task_ctx.clone())?;
358 let batches: Vec<RecordBatch> = stream.try_collect().await?;
359 all_batches.extend(batches);
360 }
361 Ok(all_batches)
362 })
363 }
364
365 pub async fn execute_datafusion(
370 &self,
371 plan: LogicalPlan,
372 prop_manager: &PropertyManager,
373 params: &HashMap<String, Value>,
374 ) -> Result<Vec<RecordBatch>> {
375 let (batches, _plan) = self
376 .execute_datafusion_with_plan(plan, prop_manager, params)
377 .await?;
378 Ok(batches)
379 }
380
    /// Plan and execute `plan` on the DataFusion engine, returning both the
    /// collected batches and the physical plan (useful for EXPLAIN/metrics).
    ///
    /// Plans containing write operations require a configured `Writer`; a
    /// `MutationContext` is attached to the planner so mutating operators can
    /// reach the writer, property manager, params, and transaction L0
    /// override.
    pub async fn execute_datafusion_with_plan(
        &self,
        plan: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<(
        Vec<RecordBatch>,
        Arc<dyn datafusion::physical_plan::ExecutionPlan>,
    )> {
        let (session_ctx, mut planner, prop_manager_arc) =
            self.create_datafusion_planner(prop_manager, params).await?;

        if Self::contains_write_operations(&plan) {
            let writer = self
                .writer
                .as_ref()
                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
                .clone();
            let query_ctx = self.get_context().await;

            // Writes are expected to always run inside a query context.
            debug_assert!(
                query_ctx.is_some(),
                "BUG: query_ctx is None for write operation"
            );

            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
                executor: self.clone(),
                writer,
                prop_manager: prop_manager_arc,
                params: params.clone(),
                query_ctx,
                tx_l0_override: self.transaction_l0_override.clone(),
            });
            planner = planner.with_mutation_context(mutation_ctx);
            tracing::debug!(
                plan_type = Self::get_plan_type(&plan),
                "Mutation routed to DataFusion engine"
            );
        }

        let execution_plan = planner.plan(&plan)?;
        let plan_clone = Arc::clone(&execution_plan);
        let result = Self::collect_batches(&session_ctx, execution_plan).await;

        // Drain planner warnings into the executor's warning list even when
        // execution failed, so callers still see them.
        let graph_warnings = planner.graph_ctx().take_warnings();
        if !graph_warnings.is_empty()
            && let Ok(mut w) = self.warnings.lock()
        {
            w.extend(graph_warnings);
        }

        result.map(|batches| (batches, plan_clone))
    }
442
443 pub(crate) async fn execute_merge_read_plan(
449 &self,
450 plan: LogicalPlan,
451 prop_manager: &PropertyManager,
452 params: &HashMap<String, Value>,
453 merge_variables: Vec<String>,
454 ) -> Result<Vec<HashMap<String, Value>>> {
455 let (session_ctx, planner, _prop_manager_arc) =
456 self.create_datafusion_planner(prop_manager, params).await?;
457
458 let extra: HashMap<String, HashSet<String>> = merge_variables
461 .iter()
462 .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
463 .collect();
464 let execution_plan = planner.plan_with_properties(&plan, extra)?;
465 let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
466
467 let flat_rows = self.record_batches_to_rows(all_batches)?;
469
470 let rows = flat_rows
473 .into_iter()
474 .map(|mut row| {
475 for var in &merge_variables {
476 if row.contains_key(var) {
478 continue;
479 }
480 let prefix = format!("{}.", var);
481 let dotted_keys: Vec<String> = row
482 .keys()
483 .filter(|k| k.starts_with(&prefix))
484 .cloned()
485 .collect();
486 if !dotted_keys.is_empty() {
487 let mut map = HashMap::new();
488 for key in dotted_keys {
489 let prop_name = key[prefix.len()..].to_string();
490 if let Some(val) = row.remove(&key) {
491 map.insert(prop_name, val);
492 }
493 }
494 row.insert(var.clone(), Value::Map(map));
495 }
496 }
497 row
498 })
499 .collect();
500
501 Ok(rows)
502 }
503
    /// Convert Arrow `RecordBatch`es into row maps of Cypher `Value`s,
    /// undoing the flattening the DataFusion pipeline applies to entities:
    ///
    /// * decodes `cv_encoded` JSON string columns back into structured values,
    /// * normalizes path-shaped maps,
    /// * rebuilds maps for variables whose value is a string placeholder but
    ///   whose properties arrived as separate `var.prop` columns,
    /// * merges `_vid`/`_labels`/`_eid`/`_type` helper columns into entity
    ///   maps and strips the leftover dotted helper columns.
    pub(crate) fn record_batches_to_rows(
        &self,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut rows = Vec::new();

        for batch in batches {
            let num_rows = batch.num_rows();
            let schema = batch.schema();

            for row_idx in 0..num_rows {
                let mut row = HashMap::new();

                for (col_idx, field) in schema.fields().iter().enumerate() {
                    let column = batch.column(col_idx);
                    // Datetime/time struct columns need an explicit logical
                    // type hint for the Arrow-to-Value conversion.
                    let data_type =
                        if uni_common::core::schema::is_datetime_struct(field.data_type()) {
                            Some(&uni_common::DataType::DateTime)
                        } else if uni_common::core::schema::is_time_struct(field.data_type()) {
                            Some(&uni_common::DataType::Time)
                        } else {
                            None
                        };
                    let mut value =
                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);

                    // Columns flagged `cv_encoded` in field metadata carry
                    // JSON-serialized complex values; decode them back.
                    if field
                        .metadata()
                        .get("cv_encoded")
                        .is_some_and(|v| v == "true")
                        && let Value::String(s) = &value
                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
                    {
                        value = Value::from(parsed);
                    }

                    value = Self::normalize_path_if_needed(value);

                    row.insert(field.name().clone(), value);
                }

                // Bare variables already materialized as maps (nodes/edges).
                let bare_vars: Vec<String> = row
                    .keys()
                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
                    .cloned()
                    .collect();

                // Variables whose value is a string placeholder while their
                // properties arrived as flat `var.prop` columns (detected via
                // the presence of a `var._vid` column).
                let vid_placeholder_vars: Vec<String> = row
                    .keys()
                    .filter(|k| {
                        !k.contains('.')
                            && matches!(row.get(*k), Some(Value::String(_)))
                            && row.contains_key(&format!("{}._vid", k))
                    })
                    .cloned()
                    .collect();

                // Replace each placeholder with a map rebuilt from its dotted
                // columns; the dotted columns are consumed in the process.
                for var in &vid_placeholder_vars {
                    let prefix = format!("{}.", var);
                    let mut map = HashMap::new();

                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();

                    for key in &dotted_keys {
                        let prop_name = &key[prefix.len()..];
                        if let Some(val) = row.remove(key) {
                            map.insert(prop_name.to_string(), val);
                        }
                    }

                    row.insert(var.clone(), Value::Map(map));
                }

                // Fold system helper columns into the entity maps, then drop
                // any remaining dotted helpers for each variable.
                for var in &bare_vars {
                    let vid_key = format!("{}._vid", var);
                    let labels_key = format!("{}._labels", var);

                    let vid_val = row.remove(&vid_key);
                    let labels_val = row.remove(&labels_key);

                    // Vertex helpers overwrite any existing `_vid`/`_labels`.
                    if let Some(Value::Map(map)) = row.get_mut(var) {
                        if let Some(v) = vid_val {
                            map.insert("_vid".to_string(), v);
                        }
                        if let Some(v) = labels_val {
                            map.insert("_labels".to_string(), v);
                        }
                    }

                    let eid_key = format!("{}._eid", var);
                    let type_key = format!("{}._type", var);

                    let eid_val = row.remove(&eid_key);
                    let type_val = row.remove(&type_key);

                    // Edge helpers use entry().or_insert, so values already
                    // present in the map are not overwritten.
                    if (eid_val.is_some() || type_val.is_some())
                        && let Some(Value::Map(map)) = row.get_mut(var)
                    {
                        if let Some(v) = eid_val {
                            map.entry("_eid".to_string()).or_insert(v);
                        }
                        if let Some(v) = type_val {
                            map.entry("_type".to_string()).or_insert(v);
                        }
                    }

                    // Remove any leftover `var.*` helper columns.
                    let prefix = format!("{}.", var);
                    let helper_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    for key in helper_keys {
                        row.remove(&key);
                    }
                }

                rows.push(row);
            }
        }

        Ok(rows)
    }
660
661 fn normalize_path_if_needed(value: Value) -> Value {
666 match value {
667 Value::Map(map)
668 if map.contains_key("nodes")
669 && (map.contains_key("relationships") || map.contains_key("edges")) =>
670 {
671 Self::normalize_path_map(map)
672 }
673 other => other,
674 }
675 }
676
677 fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
679 if let Some(Value::List(nodes)) = map.remove("nodes") {
681 let normalized_nodes: Vec<Value> = nodes
682 .into_iter()
683 .map(|n| {
684 if let Value::Map(node_map) = n {
685 Self::normalize_path_node_map(node_map)
686 } else {
687 n
688 }
689 })
690 .collect();
691 map.insert("nodes".to_string(), Value::List(normalized_nodes));
692 }
693
694 let rels_key = if map.contains_key("relationships") {
696 "relationships"
697 } else {
698 "edges"
699 };
700 if let Some(Value::List(rels)) = map.remove(rels_key) {
701 let normalized_rels: Vec<Value> = rels
702 .into_iter()
703 .map(|r| {
704 if let Value::Map(rel_map) = r {
705 Self::normalize_path_edge_map(rel_map)
706 } else {
707 r
708 }
709 })
710 .collect();
711 map.insert("relationships".to_string(), Value::List(normalized_rels));
712 }
713
714 Value::Map(map)
715 }
716
717 fn value_to_id_string(val: Value) -> String {
719 match val {
720 Value::Int(n) => n.to_string(),
721 Value::Float(n) => n.to_string(),
722 Value::String(s) => s,
723 other => other.to_string(),
724 }
725 }
726
727 fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
730 if let Some(val) = map.remove(src_key) {
731 map.insert(
732 dst_key.to_string(),
733 Value::String(Self::value_to_id_string(val)),
734 );
735 }
736 }
737
738 fn ensure_properties_map(map: &mut HashMap<String, Value>) {
740 match map.get("properties") {
741 Some(props) if !props.is_null() => {}
742 _ => {
743 map.insert("properties".to_string(), Value::Map(HashMap::new()));
744 }
745 }
746 }
747
    /// Normalize one path node: `_vid` becomes a string `_id`, and a
    /// `properties` map is guaranteed to exist.
    fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
        Self::stringify_map_field(&mut map, "_vid", "_id");
        Self::ensure_properties_map(&mut map);
        Value::Map(map)
    }
754
    /// Normalize one path edge: `_eid` becomes a string `_id`, `_src`/`_dst`
    /// are stringified in place, `_type_name` is renamed to `_type`, and a
    /// `properties` map is guaranteed to exist.
    fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
        Self::stringify_map_field(&mut map, "_eid", "_id");
        // Same source and destination key: stringifies the value in place.
        Self::stringify_map_field(&mut map, "_src", "_src");
        Self::stringify_map_field(&mut map, "_dst", "_dst");

        if let Some(type_name) = map.remove("_type_name") {
            map.insert("_type".to_string(), type_name);
        }

        Self::ensure_properties_map(&mut map);
        Value::Map(map)
    }
768
769 #[instrument(
770 skip(self, prop_manager, params),
771 fields(rows_returned, duration_ms),
772 level = "info"
773 )]
774 pub fn execute<'a>(
775 &'a self,
776 plan: LogicalPlan,
777 prop_manager: &'a PropertyManager,
778 params: &'a HashMap<String, Value>,
779 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
780 Box::pin(async move {
781 let query_type = Self::get_plan_type(&plan);
782 let ctx = self.get_context().await;
783 let start = Instant::now();
784
785 let res = if Self::is_ddl_or_admin(&plan) {
788 self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
789 .await
790 } else {
791 let batches = self
792 .execute_datafusion(plan.clone(), prop_manager, params)
793 .await?;
794 self.record_batches_to_rows(batches)
795 };
796
797 let duration = start.elapsed();
798 metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
799 .record(duration.as_secs_f64());
800
801 tracing::Span::current().record("duration_ms", duration.as_millis());
802 match &res {
803 Ok(rows) => {
804 tracing::Span::current().record("rows_returned", rows.len());
805 metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
806 .increment(rows.len() as u64);
807 }
808 Err(e) => {
809 let error_type = if e.to_string().contains("timed out") {
810 "timeout"
811 } else if e.to_string().contains("syntax") {
812 "syntax"
813 } else {
814 "execution"
815 };
816 metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
817 }
818 }
819
820 res
821 })
822 }
823
    /// Map a plan's root node to the static query-type label used as a
    /// metrics dimension (`uni_query_*` metrics) in `execute`.
    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
        match plan {
            LogicalPlan::Scan { .. } => "read_scan",
            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
            LogicalPlan::Traverse { .. } => "read_traverse",
            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
            LogicalPlan::ScanAll { .. } => "read_scan_all",
            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
            LogicalPlan::VectorKnn { .. } => "read_vector",
            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
            LogicalPlan::Merge { .. } => "write_merge",
            LogicalPlan::Delete { .. } => "write_delete",
            LogicalPlan::Set { .. } => "write_set",
            LogicalPlan::Remove { .. } => "write_remove",
            LogicalPlan::ProcedureCall { .. } => "call",
            LogicalPlan::Copy { .. } => "copy",
            LogicalPlan::Backup { .. } => "backup",
            // Only the root node is classified; wrapped plans fall here.
            _ => "other",
        }
    }
844
    /// Return the direct child plans of `plan`, used by the recursive
    /// predicates `is_ddl_or_admin` and `contains_write_operations`.
    ///
    /// Unary operators yield their single input (Explain wraps its target as
    /// `plan`); binary operators yield both sides; leaves yield nothing.
    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
        match plan {
            // Unary operators: one `input` child.
            LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Distinct { input }
            | LogicalPlan::Aggregate { input, .. }
            | LogicalPlan::Window { input, .. }
            | LogicalPlan::Unwind { input, .. }
            | LogicalPlan::Filter { input, .. }
            | LogicalPlan::Create { input, .. }
            | LogicalPlan::CreateBatch { input, .. }
            | LogicalPlan::Set { input, .. }
            | LogicalPlan::Remove { input, .. }
            | LogicalPlan::Delete { input, .. }
            | LogicalPlan::Merge { input, .. }
            | LogicalPlan::Foreach { input, .. }
            | LogicalPlan::Traverse { input, .. }
            | LogicalPlan::TraverseMainByType { input, .. }
            | LogicalPlan::BindZeroLengthPath { input, .. }
            | LogicalPlan::BindPath { input, .. }
            | LogicalPlan::ShortestPath { input, .. }
            | LogicalPlan::AllShortestPaths { input, .. }
            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],

            // Binary operators: input plus a second plan.
            LogicalPlan::Apply {
                input, subquery, ..
            }
            | LogicalPlan::SubqueryCall { input, subquery } => {
                vec![input.as_ref(), subquery.as_ref()]
            }
            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
                vec![left.as_ref(), right.as_ref()]
            }
            LogicalPlan::RecursiveCTE {
                initial, recursive, ..
            } => vec![initial.as_ref(), recursive.as_ref()],
            LogicalPlan::QuantifiedPattern {
                input,
                pattern_plan,
                ..
            } => vec![input.as_ref(), pattern_plan.as_ref()],

            // Leaves (scans, DDL, admin commands, ...): no children.
            _ => vec![],
        }
    }
903
    /// True when the plan must run on the legacy subplan executor rather than
    /// DataFusion: schema DDL, index DDL, admin/maintenance commands, and
    /// procedure calls DataFusion cannot serve. Recurses into children so a
    /// wrapped node (e.g. a Filter over a ProcedureCall) is still detected.
    pub(crate) fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
        match plan {
            // Schema and constraint DDL.
            LogicalPlan::CreateLabel(_)
            | LogicalPlan::CreateEdgeType(_)
            | LogicalPlan::AlterLabel(_)
            | LogicalPlan::AlterEdgeType(_)
            | LogicalPlan::DropLabel(_)
            | LogicalPlan::DropEdgeType(_)
            | LogicalPlan::CreateConstraint(_)
            | LogicalPlan::DropConstraint(_)
            | LogicalPlan::ShowConstraints(_) => true,

            // Index DDL and inspection.
            LogicalPlan::CreateVectorIndex { .. }
            | LogicalPlan::CreateFullTextIndex { .. }
            | LogicalPlan::CreateScalarIndex { .. }
            | LogicalPlan::CreateJsonFtsIndex { .. }
            | LogicalPlan::DropIndex { .. }
            | LogicalPlan::ShowIndexes { .. } => true,

            // Admin, maintenance, import/export, and EXPLAIN.
            LogicalPlan::ShowDatabase
            | LogicalPlan::ShowConfig
            | LogicalPlan::ShowStatistics
            | LogicalPlan::Vacuum
            | LogicalPlan::Checkpoint
            | LogicalPlan::Copy { .. }
            | LogicalPlan::CopyTo { .. }
            | LogicalPlan::CopyFrom { .. }
            | LogicalPlan::Backup { .. }
            | LogicalPlan::Explain { .. } => true,

            // Only DataFusion-eligible procedures stay on the DF path.
            LogicalPlan::ProcedureCall { procedure_name, .. } => {
                !Self::is_df_eligible_procedure(procedure_name)
            }

            _ => Self::plan_children(plan)
                .iter()
                .any(|child| Self::is_ddl_or_admin(child)),
        }
    }
955
956 fn is_df_eligible_procedure(name: &str) -> bool {
962 matches!(
963 name,
964 "uni.schema.labels"
965 | "uni.schema.edgeTypes"
966 | "uni.schema.relationshipTypes"
967 | "uni.schema.indexes"
968 | "uni.schema.constraints"
969 | "uni.schema.labelInfo"
970 | "uni.vector.query"
971 | "uni.fts.query"
972 | "uni.search"
973 ) || name.starts_with("uni.algo.")
974 }
975
    /// True when the plan (or any descendant) mutates graph data — such plans
    /// require a `Writer` and get a `MutationContext` attached during
    /// DataFusion execution (see `execute_datafusion_with_plan`).
    fn contains_write_operations(plan: &LogicalPlan) -> bool {
        match plan {
            LogicalPlan::Create { .. }
            | LogicalPlan::CreateBatch { .. }
            | LogicalPlan::Merge { .. }
            | LogicalPlan::Delete { .. }
            | LogicalPlan::Set { .. }
            | LogicalPlan::Remove { .. }
            | LogicalPlan::Foreach { .. } => true,
            // Recurse: a write may be wrapped (e.g. under Project/Limit).
            _ => Self::plan_children(plan)
                .iter()
                .any(|child| Self::contains_write_operations(child)),
        }
    }
996
    /// Streaming variant of [`Self::execute`]: resolves the query context
    /// first, then yields the complete result set as a single stream item.
    ///
    /// Routing mirrors `execute`: DDL/admin plans use the legacy subplan
    /// executor, everything else runs through DataFusion. No per-query
    /// metrics are recorded on this path.
    pub fn execute_stream(
        self,
        plan: LogicalPlan,
        prop_manager: Arc<PropertyManager>,
        params: HashMap<String, Value>,
    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
        let this = self;
        // Separate clone for the context fetch so `this` can move into the
        // flat_map closure below.
        let this_for_ctx = this.clone();

        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });

        ctx_stream
            .flat_map(move |ctx| {
                let plan = plan.clone();
                let this = this.clone();
                let prop_manager = prop_manager.clone();
                let params = params.clone();

                let fut = async move {
                    if Self::is_ddl_or_admin(&plan) {
                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
                            .await
                    } else {
                        let batches = this
                            .execute_datafusion(plan, &prop_manager, &params)
                            .await?;
                        this.record_batches_to_rows(batches)
                    }
                };
                stream::once(fut).boxed()
            })
            .boxed()
    }
1034
    /// Convert a single Arrow cell to a `Value` with no logical type hint
    /// (thin wrapper over `arrow_convert::arrow_to_value`).
    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
        arrow_convert::arrow_to_value(col, row, None)
    }
1040
1041 pub(crate) fn evaluate_expr<'a>(
1042 &'a self,
1043 expr: &'a Expr,
1044 row: &'a HashMap<String, Value>,
1045 prop_manager: &'a PropertyManager,
1046 params: &'a HashMap<String, Value>,
1047 ctx: Option<&'a QueryContext>,
1048 ) -> BoxFuture<'a, Result<Value>> {
1049 let this = self;
1050 Box::pin(async move {
1051 let repr = expr.to_string_repr();
1053 if let Some(val) = row.get(&repr) {
1054 return Ok(val.clone());
1055 }
1056
1057 match expr {
1058 Expr::PatternComprehension { .. } => {
1059 Err(anyhow::anyhow!(
1061 "Pattern comprehensions are handled by DataFusion executor"
1062 ))
1063 }
1064 Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1065 "COLLECT subqueries not yet supported in executor"
1066 )),
1067 Expr::Variable(name) => {
1068 if let Some(val) = row.get(name) {
1069 Ok(val.clone())
1070 } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1071 Ok(vid_val.clone())
1075 } else {
1076 Ok(params.get(name).cloned().unwrap_or(Value::Null))
1077 }
1078 }
1079 Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1080 Expr::Property(var_expr, prop_name) => {
1081 if let Expr::Variable(var_name) = var_expr.as_ref() {
1085 let flat_key = format!("{}.{}", var_name, prop_name);
1086 if let Some(val) = row.get(flat_key.as_str()) {
1087 return Ok(val.clone());
1088 }
1089 }
1090
1091 let base_val = this
1092 .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1093 .await?;
1094
1095 if (prop_name == "_vid" || prop_name == "_id")
1097 && let Ok(vid) = Self::vid_from_value(&base_val)
1098 {
1099 return Ok(Value::Int(vid.as_u64() as i64));
1100 }
1101
1102 if let Value::Node(node) = &base_val {
1104 if prop_name == "_vid" || prop_name == "_id" {
1106 return Ok(Value::Int(node.vid.as_u64() as i64));
1107 }
1108 if prop_name == "_labels" {
1109 return Ok(Value::List(
1110 node.labels
1111 .iter()
1112 .map(|l| Value::String(l.clone()))
1113 .collect(),
1114 ));
1115 }
1116 if let Some(val) = node.properties.get(prop_name.as_str()) {
1118 return Ok(val.clone());
1119 }
1120 if let Ok(val) = prop_manager
1122 .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1123 .await
1124 {
1125 return Ok(val);
1126 }
1127 return Ok(Value::Null);
1128 }
1129
1130 if let Value::Edge(edge) = &base_val {
1132 if prop_name == "_eid" || prop_name == "_id" {
1134 return Ok(Value::Int(edge.eid.as_u64() as i64));
1135 }
1136 if prop_name == "_type" {
1137 return Ok(Value::String(edge.edge_type.clone()));
1138 }
1139 if prop_name == "_src" {
1140 return Ok(Value::Int(edge.src.as_u64() as i64));
1141 }
1142 if prop_name == "_dst" {
1143 return Ok(Value::Int(edge.dst.as_u64() as i64));
1144 }
1145 if let Some(val) = edge.properties.get(prop_name.as_str()) {
1147 return Ok(val.clone());
1148 }
1149 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1151 {
1152 return Ok(val);
1153 }
1154 return Ok(Value::Null);
1155 }
1156
1157 if let Value::Map(map) = &base_val {
1160 if let Some(val) = map.get(prop_name.as_str()) {
1162 return Ok(val.clone());
1163 }
1164 if let Some(Value::Map(props)) = map.get("properties")
1166 && let Some(val) = props.get(prop_name.as_str())
1167 {
1168 return Ok(val.clone());
1169 }
1170 let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1172 map.get("_id")
1173 .and_then(|v| v.as_str())
1174 .and_then(|s| s.parse::<u64>().ok())
1175 });
1176 if let Some(id) = vid_opt {
1177 let vid = Vid::from(id);
1178 if let Ok(val) = prop_manager
1179 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1180 .await
1181 {
1182 return Ok(val);
1183 }
1184 } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1185 let eid = uni_common::core::id::Eid::from(id);
1186 if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1187 return Ok(val);
1188 }
1189 }
1190 return Ok(Value::Null);
1191 }
1192
1193 if let Ok(vid) = Self::vid_from_value(&base_val) {
1195 return prop_manager
1196 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1197 .await;
1198 }
1199
1200 if base_val.is_null() {
1201 return Ok(Value::Null);
1202 }
1203
1204 {
1206 use crate::query::datetime::{
1207 eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1208 is_duration_string, is_temporal_accessor, is_temporal_string,
1209 };
1210
1211 if let Value::Temporal(tv) = &base_val {
1213 if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1214 if is_duration_accessor(prop_name) {
1215 return eval_duration_accessor(
1217 &base_val.to_string(),
1218 prop_name,
1219 );
1220 }
1221 } else if is_temporal_accessor(prop_name) {
1222 return eval_temporal_accessor(&base_val.to_string(), prop_name);
1223 }
1224 }
1225
1226 if let Value::String(s) = &base_val {
1228 if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1229 return eval_temporal_accessor(s, prop_name);
1230 }
1231 if is_duration_string(s) && is_duration_accessor(prop_name) {
1232 return eval_duration_accessor(s, prop_name);
1233 }
1234 }
1235 }
1236
1237 Err(anyhow!(
1238 "Cannot access property '{}' on {:?}",
1239 prop_name,
1240 base_val
1241 ))
1242 }
1243 Expr::ArrayIndex {
1244 array: arr_expr,
1245 index: idx_expr,
1246 } => {
1247 let arr_val = this
1248 .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1249 .await?;
1250 let idx_val = this
1251 .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1252 .await?;
1253
1254 if let Value::List(arr) = &arr_val {
1255 if let Some(i) = idx_val.as_i64() {
1257 let idx = if i < 0 {
1258 let positive_idx = arr.len() as i64 + i;
1260 if positive_idx < 0 {
1261 return Ok(Value::Null); }
1263 positive_idx as usize
1264 } else {
1265 i as usize
1266 };
1267 if idx < arr.len() {
1268 return Ok(arr[idx].clone());
1269 }
1270 return Ok(Value::Null);
1271 } else if idx_val.is_null() {
1272 return Ok(Value::Null);
1273 } else {
1274 return Err(anyhow::anyhow!(
1275 "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1276 idx_val
1277 ));
1278 }
1279 }
1280 if let Value::Map(map) = &arr_val {
1281 if let Some(key) = idx_val.as_str() {
1282 return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1283 } else if !idx_val.is_null() {
1284 return Err(anyhow::anyhow!(
1285 "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1286 idx_val
1287 ));
1288 }
1289 }
1290 if let Value::Node(node) = &arr_val {
1292 if let Some(key) = idx_val.as_str() {
1293 if let Some(val) = node.properties.get(key) {
1295 return Ok(val.clone());
1296 }
1297 if let Ok(val) = prop_manager
1299 .get_vertex_prop_with_ctx(node.vid, key, ctx)
1300 .await
1301 {
1302 return Ok(val);
1303 }
1304 return Ok(Value::Null);
1305 } else if !idx_val.is_null() {
1306 return Err(anyhow::anyhow!(
1307 "TypeError: Node index must be a string, got: {:?}",
1308 idx_val
1309 ));
1310 }
1311 }
1312 if let Value::Edge(edge) = &arr_val {
1314 if let Some(key) = idx_val.as_str() {
1315 if let Some(val) = edge.properties.get(key) {
1317 return Ok(val.clone());
1318 }
1319 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1321 return Ok(val);
1322 }
1323 return Ok(Value::Null);
1324 } else if !idx_val.is_null() {
1325 return Err(anyhow::anyhow!(
1326 "TypeError: Edge index must be a string, got: {:?}",
1327 idx_val
1328 ));
1329 }
1330 }
1331 if let Ok(vid) = Self::vid_from_value(&arr_val)
1333 && let Some(key) = idx_val.as_str()
1334 {
1335 if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1336 {
1337 return Ok(val);
1338 }
1339 return Ok(Value::Null);
1340 }
1341 if arr_val.is_null() {
1342 return Ok(Value::Null);
1343 }
1344 Err(anyhow!(
1345 "TypeError: InvalidArgumentType - cannot index into {:?}",
1346 arr_val
1347 ))
1348 }
1349 Expr::ArraySlice { array, start, end } => {
1350 let arr_val = this
1351 .evaluate_expr(array, row, prop_manager, params, ctx)
1352 .await?;
1353
1354 if let Value::List(arr) = &arr_val {
1355 let len = arr.len();
1356
1357 let start_idx = if let Some(s) = start {
1359 let v = this
1360 .evaluate_expr(s, row, prop_manager, params, ctx)
1361 .await?;
1362 if v.is_null() {
1363 return Ok(Value::Null);
1364 }
1365 let raw = v.as_i64().unwrap_or(0);
1366 if raw < 0 {
1367 (len as i64 + raw).max(0) as usize
1368 } else {
1369 (raw as usize).min(len)
1370 }
1371 } else {
1372 0
1373 };
1374
1375 let end_idx = if let Some(e) = end {
1377 let v = this
1378 .evaluate_expr(e, row, prop_manager, params, ctx)
1379 .await?;
1380 if v.is_null() {
1381 return Ok(Value::Null);
1382 }
1383 let raw = v.as_i64().unwrap_or(len as i64);
1384 if raw < 0 {
1385 (len as i64 + raw).max(0) as usize
1386 } else {
1387 (raw as usize).min(len)
1388 }
1389 } else {
1390 len
1391 };
1392
1393 if start_idx >= end_idx {
1395 return Ok(Value::List(vec![]));
1396 }
1397 let end_idx = end_idx.min(len);
1398 return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1399 }
1400
1401 if arr_val.is_null() {
1402 return Ok(Value::Null);
1403 }
1404 Err(anyhow!("Cannot slice {:?}", arr_val))
1405 }
1406 Expr::Literal(lit) => Ok(lit.to_value()),
1407 Expr::List(items) => {
1408 let mut vals = Vec::new();
1409 for item in items {
1410 vals.push(
1411 this.evaluate_expr(item, row, prop_manager, params, ctx)
1412 .await?,
1413 );
1414 }
1415 Ok(Value::List(vals))
1416 }
1417 Expr::Map(items) => {
1418 let mut map = HashMap::new();
1419 for (key, value_expr) in items {
1420 let val = this
1421 .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1422 .await?;
1423 map.insert(key.clone(), val);
1424 }
1425 Ok(Value::Map(map))
1426 }
1427 Expr::Exists { query, .. } => {
1428 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1430 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1431
1432 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1433 Ok(plan) => {
1434 let mut sub_params = params.clone();
1435 sub_params.extend(row.clone());
1436
1437 match this.execute(plan, prop_manager, &sub_params).await {
1438 Ok(results) => Ok(Value::Bool(!results.is_empty())),
1439 Err(e) => {
1440 log::debug!("EXISTS subquery execution failed: {}", e);
1441 Ok(Value::Bool(false))
1442 }
1443 }
1444 }
1445 Err(e) => {
1446 log::debug!("EXISTS subquery planning failed: {}", e);
1447 Ok(Value::Bool(false))
1448 }
1449 }
1450 }
1451 Expr::CountSubquery(query) => {
1452 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1454
1455 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1456
1457 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1458 Ok(plan) => {
1459 let mut sub_params = params.clone();
1460 sub_params.extend(row.clone());
1461
1462 match this.execute(plan, prop_manager, &sub_params).await {
1463 Ok(results) => Ok(Value::from(results.len() as i64)),
1464 Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1465 }
1466 }
1467 Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1468 }
1469 }
1470 Expr::Quantifier {
1471 quantifier,
1472 variable,
1473 list,
1474 predicate,
1475 } => {
1476 let list_val = this
1490 .evaluate_expr(list, row, prop_manager, params, ctx)
1491 .await?;
1492
1493 if list_val.is_null() {
1495 return Ok(Value::Null);
1496 }
1497
1498 let items = match list_val {
1500 Value::List(arr) => arr,
1501 _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1502 };
1503
1504 let mut satisfied_count = 0;
1506 for item in &items {
1507 let mut item_row = row.clone();
1509 item_row.insert(variable.clone(), item.clone());
1510
1511 let pred_result = this
1513 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1514 .await?;
1515
1516 if let Value::Bool(true) = pred_result {
1518 satisfied_count += 1;
1519 }
1520 }
1521
1522 let result = match quantifier {
1524 Quantifier::All => satisfied_count == items.len(),
1525 Quantifier::Any => satisfied_count > 0,
1526 Quantifier::Single => satisfied_count == 1,
1527 Quantifier::None => satisfied_count == 0,
1528 };
1529
1530 Ok(Value::Bool(result))
1531 }
1532 Expr::ListComprehension {
1533 variable,
1534 list,
1535 where_clause,
1536 map_expr,
1537 } => {
1538 let list_val = this
1545 .evaluate_expr(list, row, prop_manager, params, ctx)
1546 .await?;
1547
1548 if list_val.is_null() {
1550 return Ok(Value::Null);
1551 }
1552
1553 let items = match list_val {
1555 Value::List(arr) => arr,
1556 _ => {
1557 return Err(anyhow!(
1558 "List comprehension expects a list, got: {:?}",
1559 list_val
1560 ));
1561 }
1562 };
1563
1564 let mut results = Vec::new();
1566 for item in &items {
1567 let mut item_row = row.clone();
1569 item_row.insert(variable.clone(), item.clone());
1570
1571 if let Some(predicate) = where_clause {
1573 let pred_result = this
1574 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1575 .await?;
1576
1577 if !matches!(pred_result, Value::Bool(true)) {
1579 continue;
1580 }
1581 }
1582
1583 let mapped_val = this
1585 .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1586 .await?;
1587 results.push(mapped_val);
1588 }
1589
1590 Ok(Value::List(results))
1591 }
1592 Expr::BinaryOp { left, op, right } => {
1593 match op {
1595 BinaryOp::And => {
1596 let l_val = this
1597 .evaluate_expr(left, row, prop_manager, params, ctx)
1598 .await?;
1599 if let Some(false) = l_val.as_bool() {
1601 return Ok(Value::Bool(false));
1602 }
1603 let r_val = this
1604 .evaluate_expr(right, row, prop_manager, params, ctx)
1605 .await?;
1606 eval_binary_op(&l_val, op, &r_val)
1607 }
1608 BinaryOp::Or => {
1609 let l_val = this
1610 .evaluate_expr(left, row, prop_manager, params, ctx)
1611 .await?;
1612 if let Some(true) = l_val.as_bool() {
1614 return Ok(Value::Bool(true));
1615 }
1616 let r_val = this
1617 .evaluate_expr(right, row, prop_manager, params, ctx)
1618 .await?;
1619 eval_binary_op(&l_val, op, &r_val)
1620 }
1621 _ => {
1622 let l_val = this
1624 .evaluate_expr(left, row, prop_manager, params, ctx)
1625 .await?;
1626 let r_val = this
1627 .evaluate_expr(right, row, prop_manager, params, ctx)
1628 .await?;
1629 eval_binary_op(&l_val, op, &r_val)
1630 }
1631 }
1632 }
1633 Expr::In { expr, list } => {
1634 let l_val = this
1635 .evaluate_expr(expr, row, prop_manager, params, ctx)
1636 .await?;
1637 let r_val = this
1638 .evaluate_expr(list, row, prop_manager, params, ctx)
1639 .await?;
1640 eval_in_op(&l_val, &r_val)
1641 }
1642 Expr::UnaryOp { op, expr } => {
1643 let val = this
1644 .evaluate_expr(expr, row, prop_manager, params, ctx)
1645 .await?;
1646 match op {
1647 UnaryOp::Not => {
1648 match val.as_bool() {
1650 Some(b) => Ok(Value::Bool(!b)),
1651 None if val.is_null() => Ok(Value::Null),
1652 None => Err(anyhow!(
1653 "InvalidArgumentType: NOT requires a boolean argument"
1654 )),
1655 }
1656 }
1657 UnaryOp::Neg => {
1658 if let Some(i) = val.as_i64() {
1659 Ok(Value::Int(-i))
1660 } else if let Some(f) = val.as_f64() {
1661 Ok(Value::Float(-f))
1662 } else {
1663 Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1664 }
1665 }
1666 }
1667 }
1668 Expr::IsNull(expr) => {
1669 let val = this
1670 .evaluate_expr(expr, row, prop_manager, params, ctx)
1671 .await?;
1672 Ok(Value::Bool(val.is_null()))
1673 }
1674 Expr::IsNotNull(expr) => {
1675 let val = this
1676 .evaluate_expr(expr, row, prop_manager, params, ctx)
1677 .await?;
1678 Ok(Value::Bool(!val.is_null()))
1679 }
1680 Expr::IsUnique(_) => {
1681 Err(anyhow!(
1683 "IS UNIQUE can only be used in constraint definitions"
1684 ))
1685 }
1686 Expr::Case {
1687 expr,
1688 when_then,
1689 else_expr,
1690 } => {
1691 if let Some(base_expr) = expr {
1692 let base_val = this
1693 .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1694 .await?;
1695 for (w, t) in when_then {
1696 let w_val = this
1697 .evaluate_expr(w, row, prop_manager, params, ctx)
1698 .await?;
1699 if base_val == w_val {
1700 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1701 }
1702 }
1703 } else {
1704 for (w, t) in when_then {
1705 let w_val = this
1706 .evaluate_expr(w, row, prop_manager, params, ctx)
1707 .await?;
1708 if w_val.as_bool() == Some(true) {
1709 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1710 }
1711 }
1712 }
1713 if let Some(e) = else_expr {
1714 return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1715 }
1716 Ok(Value::Null)
1717 }
1718 Expr::Wildcard => Ok(Value::Null),
1719 Expr::FunctionCall { name, args, .. } => {
1720 if name.eq_ignore_ascii_case("ID") {
1722 if args.len() != 1 {
1723 return Err(anyhow!("id() requires exactly 1 argument"));
1724 }
1725 let val = this
1726 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1727 .await?;
1728 if let Value::Map(map) = &val {
1729 if let Some(vid_val) = map.get("_vid") {
1731 return Ok(vid_val.clone());
1732 }
1733 if let Some(eid_val) = map.get("_eid") {
1735 return Ok(eid_val.clone());
1736 }
1737 if let Some(id_val) = map.get("_id") {
1739 return Ok(id_val.clone());
1740 }
1741 }
1742 return Ok(Value::Null);
1743 }
1744
1745 if name.eq_ignore_ascii_case("ELEMENTID") {
1747 if args.len() != 1 {
1748 return Err(anyhow!("elementId() requires exactly 1 argument"));
1749 }
1750 let val = this
1751 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1752 .await?;
1753 if let Value::Map(map) = &val {
1754 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1757 return Ok(Value::String(vid_val.to_string()));
1758 }
1759 if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1762 return Ok(Value::String(eid_val.to_string()));
1763 }
1764 }
1765 return Ok(Value::Null);
1766 }
1767
1768 if name.eq_ignore_ascii_case("TYPE") {
1770 if args.len() != 1 {
1771 return Err(anyhow!("type() requires exactly 1 argument"));
1772 }
1773 let val = this
1774 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1775 .await?;
1776 if let Value::Map(map) = &val
1777 && let Some(type_val) = map.get("_type")
1778 {
1779 if let Some(type_id) =
1781 type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1782 {
1783 if let Some(name) = this
1784 .storage
1785 .schema_manager()
1786 .edge_type_name_by_id_unified(type_id)
1787 {
1788 return Ok(Value::String(name));
1789 }
1790 } else if let Some(name) = type_val.as_str() {
1791 return Ok(Value::String(name.to_string()));
1792 }
1793 }
1794 return Ok(Value::Null);
1795 }
1796
1797 if name.eq_ignore_ascii_case("LABELS") {
1799 if args.len() != 1 {
1800 return Err(anyhow!("labels() requires exactly 1 argument"));
1801 }
1802 let val = this
1803 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1804 .await?;
1805 if let Value::Map(map) = &val
1806 && let Some(labels_val) = map.get("_labels")
1807 {
1808 return Ok(labels_val.clone());
1809 }
1810 return Ok(Value::Null);
1811 }
1812
1813 if name.eq_ignore_ascii_case("PROPERTIES") {
1815 if args.len() != 1 {
1816 return Err(anyhow!("properties() requires exactly 1 argument"));
1817 }
1818 let val = this
1819 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1820 .await?;
1821 if let Value::Map(map) = &val {
1822 let mut props = HashMap::new();
1824 for (k, v) in map.iter() {
1825 if !k.starts_with('_') {
1826 props.insert(k.clone(), v.clone());
1827 }
1828 }
1829 return Ok(Value::Map(props));
1830 }
1831 return Ok(Value::Null);
1832 }
1833
1834 if name.eq_ignore_ascii_case("STARTNODE") {
1836 if args.len() != 1 {
1837 return Err(anyhow!("startNode() requires exactly 1 argument"));
1838 }
1839 let val = this
1840 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1841 .await?;
1842 if let Value::Edge(edge) = &val {
1843 return Ok(Self::find_node_by_vid(row, edge.src));
1844 }
1845 if let Value::Map(map) = &val {
1846 if let Some(start_node) = map.get("_startNode") {
1847 return Ok(start_node.clone());
1848 }
1849 if let Some(src_vid) = map.get("_src_vid") {
1850 return Ok(Value::Map(HashMap::from([(
1851 "_vid".to_string(),
1852 src_vid.clone(),
1853 )])));
1854 }
1855 if let Some(src_id) = map.get("_src")
1857 && let Some(u) = src_id.as_u64()
1858 {
1859 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1860 }
1861 }
1862 return Ok(Value::Null);
1863 }
1864
1865 if name.eq_ignore_ascii_case("ENDNODE") {
1867 if args.len() != 1 {
1868 return Err(anyhow!("endNode() requires exactly 1 argument"));
1869 }
1870 let val = this
1871 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1872 .await?;
1873 if let Value::Edge(edge) = &val {
1874 return Ok(Self::find_node_by_vid(row, edge.dst));
1875 }
1876 if let Value::Map(map) = &val {
1877 if let Some(end_node) = map.get("_endNode") {
1878 return Ok(end_node.clone());
1879 }
1880 if let Some(dst_vid) = map.get("_dst_vid") {
1881 return Ok(Value::Map(HashMap::from([(
1882 "_vid".to_string(),
1883 dst_vid.clone(),
1884 )])));
1885 }
1886 if let Some(dst_id) = map.get("_dst")
1888 && let Some(u) = dst_id.as_u64()
1889 {
1890 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1891 }
1892 }
1893 return Ok(Value::Null);
1894 }
1895
1896 if name.eq_ignore_ascii_case("HASLABEL") {
1899 if args.len() != 2 {
1900 return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
1901 }
1902 let node_val = this
1903 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1904 .await?;
1905 let label_val = this
1906 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1907 .await?;
1908
1909 let label_to_check = label_val.as_str().ok_or_else(|| {
1910 anyhow!("Second argument to hasLabel must be a string")
1911 })?;
1912
1913 let has_label = match &node_val {
1914 Value::Map(map) if map.contains_key("_vid") => {
1916 if let Some(Value::List(labels_arr)) = map.get("_labels") {
1917 labels_arr
1918 .iter()
1919 .any(|l| l.as_str() == Some(label_to_check))
1920 } else {
1921 false
1922 }
1923 }
1924 Value::Map(map) => {
1926 if let Some(Value::List(labels_arr)) = map.get("_labels") {
1927 labels_arr
1928 .iter()
1929 .any(|l| l.as_str() == Some(label_to_check))
1930 } else {
1931 false
1932 }
1933 }
1934 _ => false,
1935 };
1936 return Ok(Value::Bool(has_label));
1937 }
1938
1939 if matches!(
1942 name.to_uppercase().as_str(),
1943 "ANY" | "ALL" | "NONE" | "SINGLE"
1944 ) {
1945 return Err(anyhow!(
1946 "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
1947 name.to_lowercase()
1948 ));
1949 }
1950
1951 if name.eq_ignore_ascii_case("COALESCE") {
1953 for arg in args {
1954 let val = this
1955 .evaluate_expr(arg, row, prop_manager, params, ctx)
1956 .await?;
1957 if !val.is_null() {
1958 return Ok(val);
1959 }
1960 }
1961 return Ok(Value::Null);
1962 }
1963
1964 if name.eq_ignore_ascii_case("vector_similarity") {
1966 if args.len() != 2 {
1967 return Err(anyhow!("vector_similarity takes 2 arguments"));
1968 }
1969 let v1 = this
1970 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1971 .await?;
1972 let v2 = this
1973 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1974 .await?;
1975 return eval_vector_similarity(&v1, &v2);
1976 }
1977
1978 if name.eq_ignore_ascii_case("uni.temporal.validAt")
1980 || name.eq_ignore_ascii_case("uni.validAt")
1981 || name.eq_ignore_ascii_case("validAt")
1982 {
1983 if args.len() != 4 {
1984 return Err(anyhow!("validAt requires 4 arguments"));
1985 }
1986 let node_val = this
1987 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1988 .await?;
1989 let start_prop = this
1990 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1991 .await?
1992 .as_str()
1993 .ok_or(anyhow!("start_prop must be string"))?
1994 .to_string();
1995 let end_prop = this
1996 .evaluate_expr(&args[2], row, prop_manager, params, ctx)
1997 .await?
1998 .as_str()
1999 .ok_or(anyhow!("end_prop must be string"))?
2000 .to_string();
2001 let time_val = this
2002 .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2003 .await?;
2004
2005 let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2006 anyhow!("time argument must be a datetime value or string")
2007 })?;
2008
2009 let valid_from_val: Option<Value> = if let Ok(vid) =
2011 Self::vid_from_value(&node_val)
2012 {
2013 prop_manager
2015 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2016 .await
2017 .ok()
2018 } else if let Value::Map(map) = &node_val {
2019 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2021 let vid = Vid::from(vid_val);
2022 prop_manager
2023 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2024 .await
2025 .ok()
2026 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2027 let eid = uni_common::core::id::Eid::from(eid_val);
2029 prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2030 } else {
2031 map.get(&start_prop).cloned()
2033 }
2034 } else {
2035 return Ok(Value::Bool(false));
2036 };
2037
2038 let valid_from = match valid_from_val {
2039 Some(ref v) => match value_to_datetime_utc(v) {
2040 Some(dt) => dt,
2041 None if v.is_null() => return Ok(Value::Bool(false)),
2042 None => {
2043 return Err(anyhow!(
2044 "Property {} must be a datetime value or string",
2045 start_prop
2046 ));
2047 }
2048 },
2049 None => return Ok(Value::Bool(false)),
2050 };
2051
2052 let valid_to_val: Option<Value> = if let Ok(vid) =
2053 Self::vid_from_value(&node_val)
2054 {
2055 prop_manager
2057 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2058 .await
2059 .ok()
2060 } else if let Value::Map(map) = &node_val {
2061 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2063 let vid = Vid::from(vid_val);
2064 prop_manager
2065 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2066 .await
2067 .ok()
2068 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2069 let eid = uni_common::core::id::Eid::from(eid_val);
2071 prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2072 } else {
2073 map.get(&end_prop).cloned()
2075 }
2076 } else {
2077 return Ok(Value::Bool(false));
2078 };
2079
2080 let valid_to = match valid_to_val {
2081 Some(ref v) => match value_to_datetime_utc(v) {
2082 Some(dt) => Some(dt),
2083 None if v.is_null() => None,
2084 None => {
2085 return Err(anyhow!(
2086 "Property {} must be a datetime value or null",
2087 end_prop
2088 ));
2089 }
2090 },
2091 None => None,
2092 };
2093
2094 let is_valid = valid_from <= query_time
2095 && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2096 return Ok(Value::Bool(is_valid));
2097 }
2098
2099 let mut evaluated_args = Vec::with_capacity(args.len());
2101 for arg in args {
2102 let mut val = this
2103 .evaluate_expr(arg, row, prop_manager, params, ctx)
2104 .await?;
2105
2106 if let Value::Map(ref mut map) = val {
2109 hydrate_entity_if_needed(map, prop_manager, ctx).await;
2110 }
2111
2112 evaluated_args.push(val);
2113 }
2114 eval_scalar_function(
2115 name,
2116 &evaluated_args,
2117 self.custom_function_registry.as_deref(),
2118 )
2119 }
2120 Expr::Reduce {
2121 accumulator,
2122 init,
2123 variable,
2124 list,
2125 expr,
2126 } => {
2127 let mut acc = self
2128 .evaluate_expr(init, row, prop_manager, params, ctx)
2129 .await?;
2130 let list_val = self
2131 .evaluate_expr(list, row, prop_manager, params, ctx)
2132 .await?;
2133
2134 if let Value::List(items) = list_val {
2135 for item in items {
2136 let mut scope = row.clone();
2140 scope.insert(accumulator.clone(), acc.clone());
2141 scope.insert(variable.clone(), item);
2142
2143 acc = self
2144 .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2145 .await?;
2146 }
2147 } else {
2148 return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2149 }
2150 Ok(acc)
2151 }
2152 Expr::ValidAt { .. } => {
2153 Err(anyhow!(
2155 "VALID_AT expression should have been transformed to function call in planner"
2156 ))
2157 }
2158
2159 Expr::LabelCheck { expr, labels } => {
2160 let val = this
2161 .evaluate_expr(expr, row, prop_manager, params, ctx)
2162 .await?;
2163 match &val {
2164 Value::Null => Ok(Value::Null),
2165 Value::Map(map) => {
2166 let is_edge = map.contains_key("_eid")
2168 || map.contains_key("_type_name")
2169 || (map.contains_key("_type") && !map.contains_key("_vid"));
2170
2171 if is_edge {
2172 if labels.len() > 1 {
2174 return Ok(Value::Bool(false));
2175 }
2176 let label_to_check = &labels[0];
2177 let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2178 {
2179 t == label_to_check
2180 } else if let Some(Value::String(t)) = map.get("_type") {
2181 t == label_to_check
2182 } else {
2183 false
2184 };
2185 Ok(Value::Bool(has_type))
2186 } else {
2187 let has_all = labels.iter().all(|label_to_check| {
2189 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2190 labels_arr
2191 .iter()
2192 .any(|l| l.as_str() == Some(label_to_check.as_str()))
2193 } else {
2194 false
2195 }
2196 });
2197 Ok(Value::Bool(has_all))
2198 }
2199 }
2200 _ => Ok(Value::Bool(false)),
2201 }
2202 }
2203
2204 Expr::MapProjection { base, items } => {
2205 let base_value = this
2206 .evaluate_expr(base, row, prop_manager, params, ctx)
2207 .await?;
2208
2209 let properties = match &base_value {
2211 Value::Map(map) => map,
2212 _ => {
2213 return Err(anyhow!(
2214 "Map projection requires object, got {:?}",
2215 base_value
2216 ));
2217 }
2218 };
2219
2220 let mut result_map = HashMap::new();
2221
2222 for item in items {
2223 match item {
2224 MapProjectionItem::Property(prop) => {
2225 if let Some(value) = properties.get(prop.as_str()) {
2226 result_map.insert(prop.clone(), value.clone());
2227 }
2228 }
2229 MapProjectionItem::AllProperties => {
2230 for (key, value) in properties.iter() {
2232 if !key.starts_with('_') {
2233 result_map.insert(key.clone(), value.clone());
2234 }
2235 }
2236 }
2237 MapProjectionItem::LiteralEntry(key, expr) => {
2238 let value = this
2239 .evaluate_expr(expr, row, prop_manager, params, ctx)
2240 .await?;
2241 result_map.insert(key.clone(), value);
2242 }
2243 MapProjectionItem::Variable(var_name) => {
2244 if let Some(value) = row.get(var_name.as_str()) {
2247 result_map.insert(var_name.clone(), value.clone());
2248 }
2249 }
2250 }
2251 }
2252
2253 Ok(Value::Map(result_map))
2254 }
2255 }
2256 })
2257 }
2258
2259 pub(crate) fn execute_subplan<'a>(
2260 &'a self,
2261 plan: LogicalPlan,
2262 prop_manager: &'a PropertyManager,
2263 params: &'a HashMap<String, Value>,
2264 ctx: Option<&'a QueryContext>,
2265 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2266 Box::pin(async move {
2267 if let Some(ctx) = ctx {
2268 ctx.check_timeout()?;
2269 }
2270 match plan {
2271 LogicalPlan::Union { left, right, all } => {
2272 self.execute_union(left, right, all, prop_manager, params, ctx)
2273 .await
2274 }
2275 LogicalPlan::CreateVectorIndex {
2276 config,
2277 if_not_exists,
2278 } => {
2279 if if_not_exists && self.index_exists_by_name(&config.name) {
2280 return Ok(vec![]);
2281 }
2282 let idx_mgr = self.storage.index_manager();
2283 idx_mgr.create_vector_index(config).await?;
2284 Ok(vec![])
2285 }
2286 LogicalPlan::CreateFullTextIndex {
2287 config,
2288 if_not_exists,
2289 } => {
2290 if if_not_exists && self.index_exists_by_name(&config.name) {
2291 return Ok(vec![]);
2292 }
2293 let idx_mgr = self.storage.index_manager();
2294 idx_mgr.create_fts_index(config).await?;
2295 Ok(vec![])
2296 }
2297 LogicalPlan::CreateScalarIndex {
2298 mut config,
2299 if_not_exists,
2300 } => {
2301 if if_not_exists && self.index_exists_by_name(&config.name) {
2302 return Ok(vec![]);
2303 }
2304
2305 let mut modified_properties = Vec::new();
2307
2308 for prop in &config.properties {
2309 if prop.contains('(') && prop.contains(')') {
2311 let gen_col = SchemaManager::generated_column_name(prop);
2312
2313 let sm = self.storage.schema_manager_arc();
2315 if let Err(e) = sm.add_generated_property(
2316 &config.label,
2317 &gen_col,
2318 DataType::String, prop.clone(),
2320 ) {
2321 log::warn!("Failed to add generated property (might exist): {}", e);
2322 }
2323
2324 modified_properties.push(gen_col);
2325 } else {
2326 modified_properties.push(prop.clone());
2328 }
2329 }
2330
2331 config.properties = modified_properties;
2332
2333 let idx_mgr = self.storage.index_manager();
2334 idx_mgr.create_scalar_index(config).await?;
2335 Ok(vec![])
2336 }
2337 LogicalPlan::CreateJsonFtsIndex {
2338 config,
2339 if_not_exists,
2340 } => {
2341 if if_not_exists && self.index_exists_by_name(&config.name) {
2342 return Ok(vec![]);
2343 }
2344 let idx_mgr = self.storage.index_manager();
2345 idx_mgr.create_json_fts_index(config).await?;
2346 Ok(vec![])
2347 }
2348 LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2349 LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2350 LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2351 LogicalPlan::Vacuum => {
2352 self.execute_vacuum().await?;
2353 Ok(vec![])
2354 }
2355 LogicalPlan::Checkpoint => {
2356 self.execute_checkpoint().await?;
2357 Ok(vec![])
2358 }
2359 LogicalPlan::CopyTo {
2360 label,
2361 path,
2362 format,
2363 options,
2364 } => {
2365 let count = self
2366 .execute_copy_to(&label, &path, &format, &options)
2367 .await?;
2368 let mut result = HashMap::new();
2369 result.insert("count".to_string(), Value::Int(count as i64));
2370 Ok(vec![result])
2371 }
2372 LogicalPlan::CopyFrom {
2373 label,
2374 path,
2375 format,
2376 options,
2377 } => {
2378 let count = self
2379 .execute_copy_from(&label, &path, &format, &options)
2380 .await?;
2381 let mut result = HashMap::new();
2382 result.insert("count".to_string(), Value::Int(count as i64));
2383 Ok(vec![result])
2384 }
2385 LogicalPlan::CreateLabel(clause) => {
2386 self.execute_create_label(clause).await?;
2387 Ok(vec![])
2388 }
2389 LogicalPlan::CreateEdgeType(clause) => {
2390 self.execute_create_edge_type(clause).await?;
2391 Ok(vec![])
2392 }
2393 LogicalPlan::AlterLabel(clause) => {
2394 self.execute_alter_label(clause).await?;
2395 Ok(vec![])
2396 }
2397 LogicalPlan::AlterEdgeType(clause) => {
2398 self.execute_alter_edge_type(clause).await?;
2399 Ok(vec![])
2400 }
2401 LogicalPlan::DropLabel(clause) => {
2402 self.execute_drop_label(clause).await?;
2403 Ok(vec![])
2404 }
2405 LogicalPlan::DropEdgeType(clause) => {
2406 self.execute_drop_edge_type(clause).await?;
2407 Ok(vec![])
2408 }
2409 LogicalPlan::CreateConstraint(clause) => {
2410 self.execute_create_constraint(clause).await?;
2411 Ok(vec![])
2412 }
2413 LogicalPlan::DropConstraint(clause) => {
2414 self.execute_drop_constraint(clause).await?;
2415 Ok(vec![])
2416 }
2417 LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2418 LogicalPlan::DropIndex { name, if_exists } => {
2419 let idx_mgr = self.storage.index_manager();
2420 match idx_mgr.drop_index(&name).await {
2421 Ok(_) => Ok(vec![]),
2422 Err(e) => {
2423 if if_exists && e.to_string().contains("not found") {
2424 Ok(vec![])
2425 } else {
2426 Err(e)
2427 }
2428 }
2429 }
2430 }
2431 LogicalPlan::ShowIndexes { filter } => {
2432 Ok(self.execute_show_indexes(filter.as_deref()))
2433 }
2434 LogicalPlan::Scan { .. }
2437 | LogicalPlan::ExtIdLookup { .. }
2438 | LogicalPlan::ScanAll { .. }
2439 | LogicalPlan::ScanMainByLabels { .. }
2440 | LogicalPlan::Traverse { .. }
2441 | LogicalPlan::TraverseMainByType { .. } => {
2442 let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2443 self.record_batches_to_rows(batches)
2444 }
2445 LogicalPlan::Filter {
2446 input,
2447 predicate,
2448 optional_variables,
2449 } => {
2450 let input_matches = self
2451 .execute_subplan(*input, prop_manager, params, ctx)
2452 .await?;
2453
2454 tracing::debug!(
2455 "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2456 predicate,
2457 input_matches.len(),
2458 optional_variables
2459 );
2460
2461 if !optional_variables.is_empty() {
2465 let is_optional_key = |k: &str| -> bool {
2468 optional_variables.contains(k)
2469 || optional_variables
2470 .iter()
2471 .any(|var| k.starts_with(&format!("{}.", var)))
2472 };
2473
2474 let is_internal_key =
2476 |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2477
2478 let non_optional_vars: Vec<String> = input_matches
2480 .first()
2481 .map(|row| {
2482 row.keys()
2483 .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2484 .cloned()
2485 .collect()
2486 })
2487 .unwrap_or_default();
2488
2489 let mut groups: std::collections::HashMap<
2491 Vec<u8>,
2492 Vec<HashMap<String, Value>>,
2493 > = std::collections::HashMap::new();
2494
2495 for row in &input_matches {
2496 let key: Vec<u8> = non_optional_vars
2498 .iter()
2499 .map(|var| {
2500 row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
2501 })
2502 .collect::<Vec<_>>()
2503 .join("|")
2504 .into_bytes();
2505
2506 groups.entry(key).or_default().push(row.clone());
2507 }
2508
2509 let mut filtered = Vec::new();
2510 for (_key, group_rows) in groups {
2511 let mut group_passed = Vec::new();
2512
2513 for row in &group_rows {
2514 let has_null_optional = optional_variables.iter().any(|var| {
2516 let direct_null =
2518 matches!(row.get(var), Some(Value::Null) | None);
2519 let prefixed_null = row
2520 .keys()
2521 .filter(|k| k.starts_with(&format!("{}.", var)))
2522 .any(|k| matches!(row.get(k), Some(Value::Null)));
2523 direct_null || prefixed_null
2524 });
2525
2526 if has_null_optional {
2527 group_passed.push(row.clone());
2528 continue;
2529 }
2530
2531 let res = self
2532 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2533 .await?;
2534
2535 if res.as_bool().unwrap_or(false) {
2536 group_passed.push(row.clone());
2537 }
2538 }
2539
2540 if group_passed.is_empty() {
2541 if let Some(template) = group_rows.first() {
2544 let mut null_row = HashMap::new();
2545 for (k, v) in template {
2546 if is_optional_key(k) {
2547 null_row.insert(k.clone(), Value::Null);
2548 } else {
2549 null_row.insert(k.clone(), v.clone());
2550 }
2551 }
2552 filtered.push(null_row);
2553 }
2554 } else {
2555 filtered.extend(group_passed);
2556 }
2557 }
2558
2559 tracing::debug!(
2560 "Filter (OPTIONAL): {} input rows -> {} output rows",
2561 input_matches.len(),
2562 filtered.len()
2563 );
2564
2565 return Ok(filtered);
2566 }
2567
2568 let mut filtered = Vec::new();
2570 for row in input_matches.iter() {
2571 let res = self
2572 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2573 .await?;
2574
2575 let passes = res.as_bool().unwrap_or(false);
2576
2577 if passes {
2578 filtered.push(row.clone());
2579 }
2580 }
2581
2582 tracing::debug!(
2583 "Filter: {} input rows -> {} output rows",
2584 input_matches.len(),
2585 filtered.len()
2586 );
2587
2588 Ok(filtered)
2589 }
2590 LogicalPlan::ProcedureCall {
2591 procedure_name,
2592 arguments,
2593 yield_items,
2594 } => {
2595 let yield_names: Vec<String> =
2596 yield_items.iter().map(|(n, _)| n.clone()).collect();
2597 let results = self
2598 .execute_procedure(
2599 &procedure_name,
2600 &arguments,
2601 &yield_names,
2602 prop_manager,
2603 params,
2604 ctx,
2605 )
2606 .await?;
2607
2608 let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2614 if !has_aliases {
2615 Ok(results)
2618 } else {
2619 let mut aliased_results = Vec::with_capacity(results.len());
2620 for row in results {
2621 let mut new_row = HashMap::new();
2622 for (name, alias) in &yield_items {
2623 let col_name = alias.as_ref().unwrap_or(name);
2624 let val = row.get(name).cloned().unwrap_or(Value::Null);
2625 new_row.insert(col_name.clone(), val);
2626 }
2627 aliased_results.push(new_row);
2628 }
2629 Ok(aliased_results)
2630 }
2631 }
2632 LogicalPlan::VectorKnn { .. } => {
2633 unreachable!("VectorKnn is handled by DataFusion engine")
2634 }
2635 LogicalPlan::InvertedIndexLookup { .. } => {
2636 unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2637 }
2638 LogicalPlan::Sort { input, order_by } => {
2639 let rows = self
2640 .execute_subplan(*input, prop_manager, params, ctx)
2641 .await?;
2642 self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2643 .await
2644 }
2645 LogicalPlan::Limit { input, skip, fetch } => {
2646 let rows = self
2647 .execute_subplan(*input, prop_manager, params, ctx)
2648 .await?;
2649 let skip = skip.unwrap_or(0);
2650 let take = fetch.unwrap_or(usize::MAX);
2651 Ok(rows.into_iter().skip(skip).take(take).collect())
2652 }
2653 LogicalPlan::Aggregate {
2654 input,
2655 group_by,
2656 aggregates,
2657 } => {
2658 let rows = self
2659 .execute_subplan(*input, prop_manager, params, ctx)
2660 .await?;
2661 self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2662 .await
2663 }
2664 LogicalPlan::Window {
2665 input,
2666 window_exprs,
2667 } => {
2668 let rows = self
2669 .execute_subplan(*input, prop_manager, params, ctx)
2670 .await?;
2671 self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2672 .await
2673 }
2674 LogicalPlan::Project { input, projections } => {
2675 let matches = self
2676 .execute_subplan(*input, prop_manager, params, ctx)
2677 .await?;
2678 self.execute_project(matches, &projections, prop_manager, params, ctx)
2679 .await
2680 }
2681 LogicalPlan::Distinct { input } => {
2682 let rows = self
2683 .execute_subplan(*input, prop_manager, params, ctx)
2684 .await?;
2685 let mut seen = std::collections::HashSet::new();
2686 let mut result = Vec::new();
2687 for row in rows {
2688 let key = Self::canonical_row_key(&row);
2689 if seen.insert(key) {
2690 result.push(row);
2691 }
2692 }
2693 Ok(result)
2694 }
2695 LogicalPlan::Unwind {
2696 input,
2697 expr,
2698 variable,
2699 } => {
2700 let input_rows = self
2701 .execute_subplan(*input, prop_manager, params, ctx)
2702 .await?;
2703 self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2704 .await
2705 }
2706 LogicalPlan::Apply {
2707 input,
2708 subquery,
2709 input_filter,
2710 } => {
2711 let input_rows = self
2712 .execute_subplan(*input, prop_manager, params, ctx)
2713 .await?;
2714 self.execute_apply(
2715 input_rows,
2716 &subquery,
2717 input_filter.as_ref(),
2718 prop_manager,
2719 params,
2720 ctx,
2721 )
2722 .await
2723 }
2724 LogicalPlan::SubqueryCall { input, subquery } => {
2725 let input_rows = self
2726 .execute_subplan(*input, prop_manager, params, ctx)
2727 .await?;
2728 self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2731 .await
2732 }
2733 LogicalPlan::RecursiveCTE {
2734 cte_name,
2735 initial,
2736 recursive,
2737 } => {
2738 self.execute_recursive_cte(
2739 &cte_name,
2740 *initial,
2741 *recursive,
2742 prop_manager,
2743 params,
2744 ctx,
2745 )
2746 .await
2747 }
2748 LogicalPlan::CrossJoin { left, right } => {
2749 self.execute_cross_join(left, right, prop_manager, params, ctx)
2750 .await
2751 }
2752 LogicalPlan::Set { .. }
2753 | LogicalPlan::Remove { .. }
2754 | LogicalPlan::Merge { .. }
2755 | LogicalPlan::Create { .. }
2756 | LogicalPlan::CreateBatch { .. } => {
2757 unreachable!("mutations are handled by DataFusion engine")
2758 }
2759 LogicalPlan::Delete { .. } => {
2760 unreachable!("mutations are handled by DataFusion engine")
2761 }
2762 LogicalPlan::Copy {
2763 target,
2764 source,
2765 is_export,
2766 options,
2767 } => {
2768 if is_export {
2769 self.execute_export(&target, &source, &options, prop_manager, ctx)
2770 .await
2771 } else {
2772 self.execute_copy(&target, &source, &options, prop_manager)
2773 .await
2774 }
2775 }
2776 LogicalPlan::Backup {
2777 destination,
2778 options,
2779 } => self.execute_backup(&destination, &options).await,
2780 LogicalPlan::Explain { plan } => {
2781 let plan_str = format!("{:#?}", plan);
2782 let mut row = HashMap::new();
2783 row.insert("plan".to_string(), Value::String(plan_str));
2784 Ok(vec![row])
2785 }
2786 LogicalPlan::ShortestPath { .. } => {
2787 unreachable!("ShortestPath is handled by DataFusion engine")
2788 }
2789 LogicalPlan::AllShortestPaths { .. } => {
2790 unreachable!("AllShortestPaths is handled by DataFusion engine")
2791 }
2792 LogicalPlan::Foreach { .. } => {
2793 unreachable!("mutations are handled by DataFusion engine")
2794 }
2795 LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2796 LogicalPlan::BindZeroLengthPath { .. } => {
2797 unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2798 }
2799 LogicalPlan::BindPath { .. } => {
2800 unreachable!("BindPath is handled by DataFusion engine")
2801 }
2802 LogicalPlan::QuantifiedPattern { .. } => {
2803 unreachable!("QuantifiedPattern is handled by DataFusion engine")
2804 }
2805 LogicalPlan::LocyProgram { .. }
2806 | LogicalPlan::LocyFold { .. }
2807 | LogicalPlan::LocyBestBy { .. }
2808 | LogicalPlan::LocyPriority { .. }
2809 | LogicalPlan::LocyDerivedScan { .. }
2810 | LogicalPlan::LocyProject { .. } => {
2811 unreachable!("Locy operators are handled by DataFusion engine")
2812 }
2813 }
2814 })
2815 }
2816
    #[expect(clippy::too_many_arguments)]
    /// Executes a single mutation plan node that appears inside a FOREACH body,
    /// against an already-locked `writer` (and optional transaction-scoped L0
    /// buffer `tx_l0`).
    ///
    /// Only mutation operators are allowed here: SET, REMOVE, DELETE, CREATE,
    /// CREATE (batch), MERGE, and nested FOREACH; any other plan node is
    /// rejected with an error.
    ///
    /// `scope` holds the variable bindings visible to the body (including the
    /// FOREACH loop variable) and is mutated in place by the item handlers.
    pub(crate) async fn execute_foreach_body_plan(
        &self,
        plan: LogicalPlan,
        scope: &mut HashMap<String, Value>,
        writer: &mut uni_store::runtime::writer::Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    ) -> Result<()> {
        match plan {
            LogicalPlan::Set { items, .. } => {
                self.execute_set_items_locked(
                    &items,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
            }
            LogicalPlan::Remove { items, .. } => {
                self.execute_remove_items_locked(&items, scope, writer, prop_manager, ctx, tx_l0)
                    .await?;
            }
            LogicalPlan::Delete { items, detach, .. } => {
                // Each DELETE item is an expression (node/edge/path variable);
                // evaluate it first, then delete the resulting entity.
                for expr in &items {
                    let val = self
                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
                        .await?;
                    self.execute_delete_item_locked(&val, detach, writer, tx_l0)
                        .await?;
                }
            }
            LogicalPlan::Create { pattern, .. } => {
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
            }
            LogicalPlan::CreateBatch { patterns, .. } => {
                for pattern in &patterns {
                    self.execute_create_pattern(
                        pattern,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                    )
                    .await?;
                }
            }
            LogicalPlan::Merge {
                pattern,
                on_match: _,
                on_create,
                ..
            } => {
                // NOTE(review): inside FOREACH the MERGE is executed as a plain
                // CREATE (no match attempt), so `on_match` is intentionally
                // discarded and only ON CREATE SET is applied — confirm this is
                // the intended FOREACH-MERGE semantics.
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
                if let Some(on_create_clause) = on_create {
                    self.execute_set_items_locked(
                        &on_create_clause.items,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                    )
                    .await?;
                }
            }
            LogicalPlan::Foreach {
                variable,
                list,
                body,
                ..
            } => {
                let list_val = self
                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
                    .await?;
                let items = match list_val {
                    Value::List(arr) => arr,
                    // FOREACH over NULL is a no-op, not an error.
                    Value::Null => return Ok(()),
                    _ => return Err(anyhow!("FOREACH requires a list")),
                };
                for item in items {
                    // Each iteration gets its own scope so the loop variable
                    // (and any bindings made by the body) do not leak outward.
                    let mut nested_scope = scope.clone();
                    nested_scope.insert(variable.clone(), item);
                    for nested_plan in &body {
                        // Box::pin breaks the infinitely-sized future that the
                        // async recursion would otherwise produce.
                        Box::pin(self.execute_foreach_body_plan(
                            nested_plan.clone(),
                            &mut nested_scope,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0,
                        ))
                        .await?;
                    }
                }
            }
            _ => {
                return Err(anyhow!(
                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
                ));
            }
        }
        Ok(())
    }
2952
2953 fn canonical_row_key(row: &HashMap<String, Value>) -> String {
2954 let mut pairs: Vec<_> = row.iter().collect();
2955 pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
2956
2957 pairs
2958 .into_iter()
2959 .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
2960 .collect::<Vec<_>>()
2961 .join("|")
2962 }
2963
    /// Renders a value as canonical, type-tagged key text for DISTINCT and
    /// GROUP BY bucketing.
    ///
    /// Canonicalization rules worth noting:
    /// - Integral floats within i64 range share the `n:` prefix with ints,
    ///   so `1` and `1.0` land in the same bucket.
    /// - NaN and ±infinity get dedicated keys (all NaNs collapse to one key).
    /// - Strings that parse as temporal literals are normalized through
    ///   `temporal_string_key`, so e.g. equivalent datetime spellings match.
    /// - Map keys are sorted, making map keys insertion-order independent.
    /// - Node labels are sorted; node/edge keys include id, type info, and
    ///   canonicalized properties.
    fn canonical_value_key(v: &Value) -> String {
        match v {
            Value::Null => "null".to_string(),
            Value::Bool(b) => format!("b:{b}"),
            Value::Int(i) => format!("n:{i}"),
            Value::Float(f) => {
                if f.is_nan() {
                    "nan".to_string()
                } else if f.is_infinite() {
                    if f.is_sign_positive() {
                        "inf:+".to_string()
                    } else {
                        "inf:-".to_string()
                    }
                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
                    // Integral float: use the integer key so 1.0 == 1.
                    format!("n:{}", *f as i64)
                } else {
                    format!("f:{f}")
                }
            }
            Value::String(s) => {
                // Temporal-looking strings are normalized so they compare
                // equal to actual temporal values with the same instant.
                if let Some(k) = Self::temporal_string_key(s) {
                    format!("temporal:{k}")
                } else {
                    format!("s:{s}")
                }
            }
            Value::Bytes(b) => format!("bytes:{:?}", b),
            Value::List(items) => format!(
                "list:[{}]",
                items
                    .iter()
                    .map(Self::canonical_value_key)
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Map(map) => {
                // Sort entries so map keys are insertion-order independent.
                let mut pairs: Vec<_> = map.iter().collect();
                pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
                format!(
                    "map:{{{}}}",
                    pairs
                        .into_iter()
                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
                        .collect::<Vec<_>>()
                        .join(",")
                )
            }
            Value::Node(n) => {
                // Sort labels so label order does not affect the key.
                let mut labels = n.labels.clone();
                labels.sort();
                format!(
                    "node:{}:{}:{}",
                    n.vid.as_u64(),
                    labels.join(":"),
                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
                )
            }
            Value::Edge(e) => format!(
                "edge:{}:{}:{}:{}:{}",
                e.eid.as_u64(),
                e.edge_type,
                e.src.as_u64(),
                e.dst.as_u64(),
                Self::canonical_value_key(&Value::Map(e.properties.clone()))
            ),
            Value::Path(p) => format!(
                "path:nodes=[{}];edges=[{}]",
                p.nodes
                    .iter()
                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
                    .collect::<Vec<_>>()
                    .join(","),
                p.edges
                    .iter()
                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Vector(vs) => format!("vec:{:?}", vs),
            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
            // Fallback for any variant not handled above: Debug formatting is
            // deterministic even if not canonical across representations.
            _ => format!("{v:?}"),
        }
    }
3048
3049 fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3050 match t {
3051 uni_common::TemporalValue::Date { days_since_epoch } => {
3052 format!("date:{days_since_epoch}")
3053 }
3054 uni_common::TemporalValue::LocalTime {
3055 nanos_since_midnight,
3056 } => format!("localtime:{nanos_since_midnight}"),
3057 uni_common::TemporalValue::Time {
3058 nanos_since_midnight,
3059 offset_seconds,
3060 } => {
3061 let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3062 format!("time:{utc_nanos}")
3063 }
3064 uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3065 format!("localdatetime:{nanos_since_epoch}")
3066 }
3067 uni_common::TemporalValue::DateTime {
3068 nanos_since_epoch, ..
3069 } => format!("datetime:{nanos_since_epoch}"),
3070 uni_common::TemporalValue::Duration {
3071 months,
3072 days,
3073 nanos,
3074 } => format!("duration:{months}:{days}:{nanos}"),
3075 }
3076 }
3077
3078 fn temporal_string_key(s: &str) -> Option<String> {
3079 let fn_name = match classify_temporal(s)? {
3080 uni_common::TemporalType::Date => "DATE",
3081 uni_common::TemporalType::LocalTime => "LOCALTIME",
3082 uni_common::TemporalType::Time => "TIME",
3083 uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3084 uni_common::TemporalType::DateTime => "DATETIME",
3085 uni_common::TemporalType::Duration => "DURATION",
3086 };
3087 match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3088 Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3089 _ => None,
3090 }
3091 }
3092
    /// Number of input rows processed between query-timeout checks during
    /// aggregation (checking on every row would be needlessly expensive).
    pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3096
3097 pub(crate) async fn execute_aggregate(
3098 &self,
3099 rows: Vec<HashMap<String, Value>>,
3100 group_by: &[Expr],
3101 aggregates: &[Expr],
3102 prop_manager: &PropertyManager,
3103 params: &HashMap<String, Value>,
3104 ctx: Option<&QueryContext>,
3105 ) -> Result<Vec<HashMap<String, Value>>> {
3106 if let Some(ctx) = ctx {
3108 ctx.check_timeout()?;
3109 }
3110
3111 let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3112
3113 if rows.is_empty() {
3116 if group_by.is_empty() {
3117 let accs = Self::create_accumulators(aggregates);
3118 let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3119 return Ok(vec![row]);
3120 }
3121 return Ok(vec![]);
3122 }
3123
3124 for (idx, row) in rows.into_iter().enumerate() {
3125 if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3127 && let Some(ctx) = ctx
3128 {
3129 ctx.check_timeout()?;
3130 }
3131
3132 let key_vals = self
3133 .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3134 .await?;
3135 let key_str = format!(
3138 "[{}]",
3139 key_vals
3140 .iter()
3141 .map(Self::canonical_value_key)
3142 .collect::<Vec<_>>()
3143 .join(",")
3144 );
3145
3146 let entry = groups
3147 .entry(key_str)
3148 .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3149
3150 self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3151 .await?;
3152 }
3153
3154 let results = groups
3155 .values()
3156 .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3157 .collect();
3158
3159 Ok(results)
3160 }
3161
    /// Evaluates window functions (`... OVER (PARTITION BY ... ORDER BY ...)`)
    /// over materialized rows, adding one output column per window expression.
    ///
    /// For each window expression: rows are partitioned by the PARTITION BY
    /// key, each partition is sorted by the ORDER BY items, and the function
    /// value is computed per row from its position within the partition. The
    /// result is written into the row under the expression's textual name.
    ///
    /// Window arguments are evaluated with `evaluate_simple_expr`, i.e. only
    /// variables, map property access, and literals are supported here
    /// (`_prop_manager`/`_params` are unused for that reason).
    pub(crate) async fn execute_window(
        &self,
        mut rows: Vec<HashMap<String, Value>>,
        window_exprs: &[Expr],
        _prop_manager: &PropertyManager,
        _params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        if let Some(ctx) = ctx {
            ctx.check_timeout()?;
        }

        // Nothing to do without rows or window expressions.
        if rows.is_empty() || window_exprs.is_empty() {
            return Ok(rows);
        }

        for window_expr in window_exprs {
            let Expr::FunctionCall {
                name,
                args,
                window_spec: Some(window_spec),
                ..
            } = window_expr
            else {
                return Err(anyhow!(
                    "Window expression must be a FunctionCall with OVER clause: {:?}",
                    window_expr
                ));
            };

            let name_upper = name.to_uppercase();

            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
                return Err(anyhow!(
                    "Unsupported window function: {}. Supported functions: {}",
                    name,
                    WINDOW_FUNCTIONS.join(", ")
                ));
            }

            // Partition key -> indices (into `rows`) of that partition's rows.
            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();

            for (row_idx, row) in rows.iter().enumerate() {
                // No PARTITION BY: everything shares one (empty-key) partition.
                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
                    vec![]
                } else {
                    window_spec
                        .partition_by
                        .iter()
                        .map(|expr| self.evaluate_simple_expr(expr, row))
                        .collect::<Result<Vec<_>>>()?
                };

                partition_map
                    .entry(partition_key)
                    .or_default()
                    .push(row_idx);
            }

            for (_partition_key, row_indices) in partition_map.iter_mut() {
                // Order rows inside the partition by the ORDER BY items.
                // Rows whose sort key fails to evaluate compare as equal.
                if !window_spec.order_by.is_empty() {
                    row_indices.sort_by(|&a, &b| {
                        for sort_item in &window_spec.order_by {
                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);

                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
                                let cmp = Executor::compare_values(&va, &vb);
                                let cmp = if sort_item.ascending {
                                    cmp
                                } else {
                                    cmp.reverse()
                                };
                                if cmp != std::cmp::Ordering::Equal {
                                    return cmp;
                                }
                            }
                        }
                        std::cmp::Ordering::Equal
                    });
                }

                // `position` is the 0-based rank position within the sorted
                // partition; `row_idx` indexes back into `rows`.
                for (position, &row_idx) in row_indices.iter().enumerate() {
                    let window_value = match name_upper.as_str() {
                        "ROW_NUMBER" => Value::from((position + 1) as i64),
                        "RANK" => {
                            // RANK: tied rows share the rank of the first row
                            // of their tie group; gaps follow ties.
                            let rank = if position == 0 {
                                1i64
                            } else {
                                let prev_row_idx = row_indices[position - 1];
                                let same_as_prev = self.rows_have_same_sort_keys(
                                    &window_spec.order_by,
                                    &rows,
                                    row_idx,
                                    prev_row_idx,
                                );

                                if same_as_prev {
                                    // Walk backwards to the first row of the
                                    // current tie group; its 1-based position
                                    // is the shared rank.
                                    let mut group_start = position - 1;
                                    while group_start > 0 {
                                        let curr_idx = row_indices[group_start];
                                        let prev_idx = row_indices[group_start - 1];
                                        if !self.rows_have_same_sort_keys(
                                            &window_spec.order_by,
                                            &rows,
                                            curr_idx,
                                            prev_idx,
                                        ) {
                                            break;
                                        }
                                        group_start -= 1;
                                    }
                                    (group_start + 1) as i64
                                } else {
                                    (position + 1) as i64
                                }
                            };
                            Value::from(rank)
                        }
                        "DENSE_RANK" => {
                            // DENSE_RANK: 1 + number of sort-key changes among
                            // the rows up to and including this position.
                            let mut dense_rank = 1i64;
                            for i in 0..position {
                                let curr_idx = row_indices[i + 1];
                                let prev_idx = row_indices[i];
                                if !self.rows_have_same_sort_keys(
                                    &window_spec.order_by,
                                    &rows,
                                    curr_idx,
                                    prev_idx,
                                ) {
                                    dense_rank += 1;
                                }
                            }
                            Value::from(dense_rank)
                        }
                        "LAG" => {
                            let (value_expr, offset, default_value) =
                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;

                            // Look `offset` rows back; fall back to the default
                            // when that runs off the start of the partition.
                            if position >= offset {
                                let target_idx = row_indices[position - offset];
                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
                            } else {
                                default_value
                            }
                        }
                        "LEAD" => {
                            let (value_expr, offset, default_value) =
                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;

                            // Look `offset` rows ahead; fall back to the default
                            // when that runs off the end of the partition.
                            if position + offset < row_indices.len() {
                                let target_idx = row_indices[position + offset];
                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
                            } else {
                                default_value
                            }
                        }
                        "NTILE" => {
                            let num_buckets_expr = args.first().ok_or_else(|| {
                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
                            })?;
                            let num_buckets_val =
                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
                                anyhow!(
                                    "NTILE argument must be an integer, got: {:?}",
                                    num_buckets_val
                                )
                            })?;

                            if num_buckets <= 0 {
                                return Err(anyhow!(
                                    "NTILE bucket count must be positive, got: {}",
                                    num_buckets
                                ));
                            }

                            let num_buckets = num_buckets as usize;
                            let partition_size = row_indices.len();

                            // SQL NTILE: the first `extra_rows` buckets get one
                            // extra row each (base_size + 1), the rest base_size.
                            let base_size = partition_size / num_buckets;
                            let extra_rows = partition_size % num_buckets;

                            // When base_size == 0 every row falls in the first
                            // branch (position < extra_rows), so the division
                            // below cannot be by zero.
                            let bucket = if position < extra_rows * (base_size + 1) {
                                position / (base_size + 1) + 1
                            } else {
                                let adjusted_position = position - extra_rows * (base_size + 1);
                                extra_rows + (adjusted_position / base_size) + 1
                            };

                            Value::from(bucket as i64)
                        }
                        "FIRST_VALUE" => {
                            let value_expr = args.first().ok_or_else(|| {
                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
                            })?;

                            if row_indices.is_empty() {
                                Value::Null
                            } else {
                                let first_idx = row_indices[0];
                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
                            }
                        }
                        "LAST_VALUE" => {
                            let value_expr = args.first().ok_or_else(|| {
                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
                            })?;

                            // NOTE: this uses the whole partition as the frame
                            // (last row overall), not the default SQL frame
                            // ending at the current row.
                            if row_indices.is_empty() {
                                Value::Null
                            } else {
                                let last_idx = row_indices[row_indices.len() - 1];
                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
                            }
                        }
                        "NTH_VALUE" => {
                            if args.len() != 2 {
                                return Err(anyhow!(
                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
                                ));
                            }

                            let value_expr = &args[0];
                            let n_expr = &args[1];

                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
                            let n = n_val.as_i64().ok_or_else(|| {
                                anyhow!(
                                    "NTH_VALUE second argument must be an integer, got: {:?}",
                                    n_val
                                )
                            })?;

                            if n <= 0 {
                                return Err(anyhow!(
                                    "NTH_VALUE position must be positive, got: {}",
                                    n
                                ));
                            }

                            // 1-based n -> 0-based index; out-of-range is Null.
                            let nth_index = (n - 1) as usize;
                            if nth_index < row_indices.len() {
                                let nth_idx = row_indices[nth_index];
                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
                            } else {
                                Value::Null
                            }
                        }
                        _ => unreachable!("Window function {} already validated", name),
                    };

                    // Column name is the full textual form of the window
                    // expression, matching what projection will look up.
                    let col_name = window_expr.to_string_repr();
                    rows[row_idx].insert(col_name, window_value);
                }
            }
        }

        Ok(rows)
    }
3451
3452 fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3457 match expr {
3458 Expr::Variable(name) => row
3459 .get(name)
3460 .cloned()
3461 .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3462 Expr::Property(base, prop) => {
3463 let base_val = self.evaluate_simple_expr(base, row)?;
3464 if let Value::Map(map) = base_val {
3465 map.get(prop)
3466 .cloned()
3467 .ok_or_else(|| anyhow!("Property not found: {}", prop))
3468 } else {
3469 Err(anyhow!("Cannot access property on non-object"))
3470 }
3471 }
3472 Expr::Literal(lit) => Ok(lit.to_value()),
3473 _ => Err(anyhow!(
3474 "Unsupported expression in window function: {:?}",
3475 expr
3476 )),
3477 }
3478 }
3479
3480 fn rows_have_same_sort_keys(
3482 &self,
3483 order_by: &[uni_cypher::ast::SortItem],
3484 rows: &[HashMap<String, Value>],
3485 idx_a: usize,
3486 idx_b: usize,
3487 ) -> bool {
3488 order_by.iter().all(|sort_item| {
3489 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3490 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3491 matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3492 })
3493 }
3494
3495 fn extract_lag_lead_params<'a>(
3497 &self,
3498 func_name: &str,
3499 args: &'a [Expr],
3500 row: &HashMap<String, Value>,
3501 ) -> Result<(&'a Expr, usize, Value)> {
3502 let value_expr = args.first().ok_or_else(|| {
3503 anyhow!(
3504 "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3505 func_name,
3506 func_name
3507 )
3508 })?;
3509
3510 let offset = if let Some(offset_expr) = args.get(1) {
3511 let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3512 offset_val.as_i64().ok_or_else(|| {
3513 anyhow!(
3514 "{} offset must be an integer, got: {:?}",
3515 func_name,
3516 offset_val
3517 )
3518 })? as usize
3519 } else {
3520 1
3521 };
3522
3523 let default_value = if let Some(default_expr) = args.get(2) {
3524 self.evaluate_simple_expr(default_expr, row)?
3525 } else {
3526 Value::Null
3527 };
3528
3529 Ok((value_expr, offset, default_value))
3530 }
3531
3532 pub(crate) async fn evaluate_group_keys(
3534 &self,
3535 group_by: &[Expr],
3536 row: &HashMap<String, Value>,
3537 prop_manager: &PropertyManager,
3538 params: &HashMap<String, Value>,
3539 ctx: Option<&QueryContext>,
3540 ) -> Result<Vec<Value>> {
3541 let mut key_vals = Vec::new();
3542 for expr in group_by {
3543 key_vals.push(
3544 self.evaluate_expr(expr, row, prop_manager, params, ctx)
3545 .await?,
3546 );
3547 }
3548 Ok(key_vals)
3549 }
3550
3551 pub(crate) async fn update_accumulators(
3553 &self,
3554 accs: &mut [Accumulator],
3555 aggregates: &[Expr],
3556 row: &HashMap<String, Value>,
3557 prop_manager: &PropertyManager,
3558 params: &HashMap<String, Value>,
3559 ctx: Option<&QueryContext>,
3560 ) -> Result<()> {
3561 for (i, agg_expr) in aggregates.iter().enumerate() {
3562 if let Expr::FunctionCall { args, .. } = agg_expr {
3563 let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3564 let val = if is_wildcard {
3565 Value::Null
3566 } else {
3567 self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3568 .await?
3569 };
3570 accs[i].update(&val, is_wildcard);
3571 }
3572 }
3573 Ok(())
3574 }
3575
    /// Evaluates a recursive CTE to a fixed point.
    ///
    /// Semi-naive style: starting from the `initial` plan's rows, the
    /// `recursive` plan is re-executed repeatedly with the previous
    /// iteration's rows bound as a parameter named `cte_name`. Only rows
    /// never seen before (deduplicated by a Debug-formatted, key-sorted
    /// representation) are carried forward. Iteration stops when no new rows
    /// appear, the working table empties, or the configured iteration cap is
    /// hit. The accumulated rows are returned as a single row holding a list
    /// under `cte_name` (single-column rows are unwrapped to bare values,
    /// multi-column rows become maps).
    pub(crate) async fn execute_recursive_cte(
        &self,
        cte_name: &str,
        initial: LogicalPlan,
        recursive: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        use std::collections::HashSet;

        // Deduplication key: sort entries by column name so HashMap iteration
        // order cannot produce distinct keys for equal rows.
        pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
            let mut pairs: Vec<_> = row.iter().collect();
            pairs.sort_by(|a, b| a.0.cmp(b.0));
            format!("{pairs:?}")
        }

        let mut working_table = self
            .execute_subplan(initial, prop_manager, params, ctx)
            .await?;
        let mut result_table = working_table.clone();

        // Seed the seen-set with the initial rows so they cannot re-enter.
        let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();

        // Hard cap prevents a non-converging recursion from looping forever.
        let max_iterations = self.config.max_recursive_cte_iterations;
        for _iteration in 0..max_iterations {
            if let Some(ctx) = ctx {
                ctx.check_timeout()?;
            }

            if working_table.is_empty() {
                break;
            }

            // Expose the previous iteration's rows to the recursive plan as a
            // list parameter: single-column rows as bare values, wider rows
            // as maps.
            let working_val = Value::List(
                working_table
                    .iter()
                    .map(|row| {
                        if row.len() == 1 {
                            row.values().next().unwrap().clone()
                        } else {
                            Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
                        }
                    })
                    .collect(),
            );

            let mut next_params = params.clone();
            next_params.insert(cte_name.to_string(), working_val);

            let next_result = self
                .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
                .await?;

            if next_result.is_empty() {
                break;
            }

            // Keep only rows never produced before; `insert` returning true
            // means the key was new.
            let new_rows: Vec<_> = next_result
                .into_iter()
                .filter(|row| {
                    let key = row_key(row);
                    seen.insert(key)
                })
                .collect();

            if new_rows.is_empty() {
                break;
            }

            result_table.extend(new_rows.clone());
            working_table = new_rows;
        }

        // Package all accumulated rows as one list value under the CTE name.
        let final_list = Value::List(
            result_table
                .into_iter()
                .map(|row| {
                    if row.len() == 1 {
                        row.values().next().unwrap().clone()
                    } else {
                        Value::Map(row.into_iter().collect())
                    }
                })
                .collect(),
        );

        let mut final_row = HashMap::new();
        final_row.insert(cte_name.to_string(), final_list);
        Ok(vec![final_row])
    }
3691
    /// Number of rows between query-timeout checks while extracting sort keys.
    const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3694
3695 pub(crate) async fn execute_sort(
3696 &self,
3697 rows: Vec<HashMap<String, Value>>,
3698 order_by: &[uni_cypher::ast::SortItem],
3699 prop_manager: &PropertyManager,
3700 params: &HashMap<String, Value>,
3701 ctx: Option<&QueryContext>,
3702 ) -> Result<Vec<HashMap<String, Value>>> {
3703 if let Some(ctx) = ctx {
3705 ctx.check_timeout()?;
3706 }
3707
3708 let mut rows_with_keys = Vec::with_capacity(rows.len());
3709 for (idx, row) in rows.into_iter().enumerate() {
3710 if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3712 && let Some(ctx) = ctx
3713 {
3714 ctx.check_timeout()?;
3715 }
3716
3717 let mut keys = Vec::new();
3718 for item in order_by {
3719 let val = row
3720 .get(&item.expr.to_string_repr())
3721 .cloned()
3722 .unwrap_or(Value::Null);
3723 let val = if val.is_null() {
3724 self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3725 .await
3726 .unwrap_or(Value::Null)
3727 } else {
3728 val
3729 };
3730 keys.push(val);
3731 }
3732 rows_with_keys.push((row, keys));
3733 }
3734
3735 if let Some(ctx) = ctx {
3737 ctx.check_timeout()?;
3738 }
3739
3740 rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3741
3742 Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3743 }
3744
3745 pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3747 aggregates
3748 .iter()
3749 .map(|expr| {
3750 if let Expr::FunctionCall { name, distinct, .. } = expr {
3751 Accumulator::new(name, *distinct)
3752 } else {
3753 Accumulator::new("COUNT", false)
3754 }
3755 })
3756 .collect()
3757 }
3758
3759 pub(crate) fn build_aggregate_result(
3761 group_by: &[Expr],
3762 aggregates: &[Expr],
3763 key_vals: &[Value],
3764 accs: &[Accumulator],
3765 ) -> HashMap<String, Value> {
3766 let mut res_row = HashMap::new();
3767 for (i, expr) in group_by.iter().enumerate() {
3768 res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3769 }
3770 for (i, expr) in aggregates.iter().enumerate() {
3771 let col_name = crate::query::planner::aggregate_column_name(expr);
3773 res_row.insert(col_name, accs[i].finish());
3774 }
3775 res_row
3776 }
3777
3778 pub(crate) fn compare_sort_keys(
3780 a_keys: &[Value],
3781 b_keys: &[Value],
3782 order_by: &[uni_cypher::ast::SortItem],
3783 ) -> std::cmp::Ordering {
3784 for (i, item) in order_by.iter().enumerate() {
3785 let order = Self::compare_values(&a_keys[i], &b_keys[i]);
3786 if order != std::cmp::Ordering::Equal {
3787 return if item.ascending {
3788 order
3789 } else {
3790 order.reverse()
3791 };
3792 }
3793 }
3794 std::cmp::Ordering::Equal
3795 }
3796
    /// Executes a BACKUP statement: flushes buffered writes, resolves the
    /// latest snapshot, and copies the store to `destination` (cloud URL or
    /// validated local path). Returns a single status row with the snapshot
    /// id. `_options` is currently unused.
    pub(crate) async fn execute_backup(
        &self,
        destination: &str,
        _options: &HashMap<String, Value>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Flush pending writes into L1 first so the backup sees current data.
        if let Some(writer_arc) = &self.writer {
            let mut writer = writer_arc.write().await;
            writer.flush_to_l1(None).await?;
        }

        // A backup requires at least one snapshot to exist.
        let snapshot_manager = self.storage.snapshot_manager();
        let snapshot = snapshot_manager
            .load_latest_snapshot()
            .await?
            .ok_or_else(|| anyhow!("No snapshot found"))?;

        if is_cloud_url(destination) {
            self.backup_to_cloud(destination, &snapshot.snapshot_id)
                .await?;
        } else {
            // Local paths are validated (e.g. against traversal) before use.
            let validated_dest = self.validate_path(destination)?;
            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
                .await?;
        }

        let mut res = HashMap::new();
        res.insert(
            "status".to_string(),
            Value::String("Backup completed".to_string()),
        );
        res.insert(
            "snapshot_id".to_string(),
            Value::String(snapshot.snapshot_id),
        );
        Ok(vec![res])
    }
3840
    /// Copies the whole local store directory into `dest_path`, then writes a
    /// pretty-printed `catalog/schema.json` on top. `_snapshot_id` is unused;
    /// the entire base directory (not a single snapshot) is copied.
    ///
    /// NOTE(review): this async fn performs only blocking `std::fs` calls —
    /// consider `tokio::fs`/`spawn_blocking` if backups run on the runtime.
    async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
        let source_path = std::path::Path::new(self.storage.base_path());

        if !dest_path.exists() {
            std::fs::create_dir_all(dest_path)?;
        }

        // Recursive copy with depth/file-count limits (see copy_dir_all).
        if source_path.exists() {
            Self::copy_dir_all(source_path, dest_path)?;
        }

        let schema_manager = self.storage.schema_manager();
        let dest_catalog = dest_path.join("catalog");
        if !dest_catalog.exists() {
            std::fs::create_dir_all(&dest_catalog)?;
        }

        // Serialize the current schema alongside the copied data so the
        // backup is self-describing.
        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
        std::fs::write(dest_catalog.join("schema.json"), schema_content)?;

        Ok(())
    }
3866
    /// Copies the store's `catalog/` and `storage/` prefixes from the local
    /// filesystem to the object store addressed by `dest_url`, then uploads a
    /// fresh `catalog/schema.json`. `_snapshot_id` is unused; the whole base
    /// directory is copied.
    async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
        use object_store::ObjectStore;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjPath;

        // The URL yields both the destination store and an optional key
        // prefix to nest everything under.
        let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
        let source_path = std::path::Path::new(self.storage.base_path());

        // Treat the local base directory as a source object store so the
        // same prefix-copy helper works for both sides.
        let src_store: Arc<dyn ObjectStore> =
            Arc::new(LocalFileSystem::new_with_prefix(source_path)?);

        let catalog_src = ObjPath::from("catalog");
        let catalog_dst = if dest_prefix.as_ref().is_empty() {
            ObjPath::from("catalog")
        } else {
            ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
        };
        copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;

        let storage_src = ObjPath::from("storage");
        let storage_dst = if dest_prefix.as_ref().is_empty() {
            ObjPath::from("storage")
        } else {
            ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
        };
        copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;

        // Upload the current schema last so the backup is self-describing
        // (overwrites whatever schema.json the prefix copy brought over).
        let schema_manager = self.storage.schema_manager();
        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
        let schema_path = if dest_prefix.as_ref().is_empty() {
            ObjPath::from("catalog/schema.json")
        } else {
            ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
        };
        dest_store
            .put(&schema_path, bytes::Bytes::from(schema_content).into())
            .await?;

        Ok(())
    }
3914
    /// Maximum directory recursion depth for a local backup copy; guards the
    /// traversal against pathologically deep (or cyclic) directory trees.
    const MAX_BACKUP_DEPTH: usize = 100;

    /// Maximum number of directory entries copied during a local backup;
    /// guards against runaway copies of unexpectedly large trees.
    const MAX_BACKUP_FILES: usize = 100_000;
3926
3927 pub(crate) fn copy_dir_all(
3935 src: &std::path::Path,
3936 dst: &std::path::Path,
3937 ) -> std::io::Result<()> {
3938 let mut file_count = 0usize;
3939 Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
3940 }
3941
3942 pub(crate) fn copy_dir_all_impl(
3944 src: &std::path::Path,
3945 dst: &std::path::Path,
3946 depth: usize,
3947 file_count: &mut usize,
3948 ) -> std::io::Result<()> {
3949 if depth >= Self::MAX_BACKUP_DEPTH {
3950 return Err(std::io::Error::new(
3951 std::io::ErrorKind::InvalidInput,
3952 format!(
3953 "Maximum backup depth {} exceeded at {:?}",
3954 Self::MAX_BACKUP_DEPTH,
3955 src
3956 ),
3957 ));
3958 }
3959
3960 std::fs::create_dir_all(dst)?;
3961
3962 for entry in std::fs::read_dir(src)? {
3963 if *file_count >= Self::MAX_BACKUP_FILES {
3964 return Err(std::io::Error::new(
3965 std::io::ErrorKind::InvalidInput,
3966 format!(
3967 "Maximum backup file count {} exceeded",
3968 Self::MAX_BACKUP_FILES
3969 ),
3970 ));
3971 }
3972 *file_count += 1;
3973
3974 let entry = entry?;
3975 let metadata = entry.metadata()?;
3976
3977 if metadata.file_type().is_symlink() {
3979 continue;
3981 }
3982
3983 let dst_path = dst.join(entry.file_name());
3984 if metadata.is_dir() {
3985 Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
3986 } else {
3987 std::fs::copy(entry.path(), dst_path)?;
3988 }
3989 }
3990 Ok(())
3991 }
3992
3993 pub(crate) async fn execute_copy(
3994 &self,
3995 target: &str,
3996 source: &str,
3997 options: &HashMap<String, Value>,
3998 prop_manager: &PropertyManager,
3999 ) -> Result<Vec<HashMap<String, Value>>> {
4000 let format = options
4001 .get("format")
4002 .and_then(|v| v.as_str())
4003 .unwrap_or_else(|| {
4004 if source.ends_with(".parquet") {
4005 "parquet"
4006 } else {
4007 "csv"
4008 }
4009 });
4010
4011 match format.to_lowercase().as_str() {
4012 "csv" => self.execute_csv_import(target, source, options).await,
4013 "parquet" => {
4014 self.execute_parquet_import(target, source, options, prop_manager)
4015 .await
4016 }
4017 _ => Err(anyhow!("Unsupported format: {}", format)),
4018 }
4019 }
4020
    /// Imports a CSV file into the vertex label or edge type named `target`.
    ///
    /// Options: `delimiter` (first byte of the string; default `,`),
    /// `header` (default true), and — for edges — `src_col`/`dst_col`
    /// naming the endpoint columns (defaults `_src`/`_dst`).
    ///
    /// CSV columns that do not match a schema property of `target` are
    /// ignored silently. Returns one row with the number of records written.
    ///
    /// NOTE(review): when `header` is false, `rdr.headers()` yields the first
    /// data record as header names — confirm headerless import is intended to
    /// work this way.
    pub(crate) async fn execute_csv_import(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let validated_source = self.validate_path(source)?;

        // Imports require a writable engine instance.
        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;

        let schema = self.storage.schema_manager().schema();

        // `target` must resolve to either a vertex label or an edge type.
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut rdr = csv::ReaderBuilder::new()
            .delimiter(delimiter)
            .has_headers(has_header)
            .from_path(&validated_source)?;

        let headers = rdr.headers()?.clone();
        let mut count = 0;

        // Hold the writer lock for the duration of the import.
        let mut writer = writer_lock.write().await;

        if label_meta.is_some() {
            // Vertex import: each record becomes one new vertex.
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;

            for result in rdr.records() {
                let record = result?;
                let mut props = HashMap::new();

                // Keep only columns that exist in the label's schema,
                // parsing each value to its declared property type.
                for (i, header) in headers.iter().enumerate() {
                    if let Some(val_str) = record.get(i)
                        && let Some(prop_meta) = target_props.get(header)
                    {
                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
                        props.insert(header.to_string(), val);
                    }
                }

                let vid = writer.next_vid().await?;
                writer
                    .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
                    .await?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            // Edge import: endpoint columns yield the src/dst VIDs, all other
            // schema-known columns become edge properties.
            let type_id = meta.id;
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;

            let src_col = options
                .get("src_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_src");
            let dst_col = options
                .get("dst_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_dst");

            for result in rdr.records() {
                let record = result?;
                let mut props = HashMap::new();
                let mut src_vid = None;
                let mut dst_vid = None;

                for (i, header) in headers.iter().enumerate() {
                    if let Some(val_str) = record.get(i) {
                        if header == src_col {
                            src_vid =
                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
                        } else if header == dst_col {
                            dst_vid =
                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
                        } else if let Some(prop_meta) = target_props.get(header) {
                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
                            props.insert(header.to_string(), val);
                        }
                    }
                }

                // Both endpoints are mandatory for every edge record.
                let src =
                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
                let dst = dst_vid
                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;

                let eid = writer.next_eid(type_id).await?;
                writer
                    .insert_edge(
                        src,
                        dst,
                        type_id,
                        eid,
                        props,
                        Some(target.to_string()),
                        None,
                    )
                    .await?;
                count += 1;
            }
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4159
    /// Imports a Parquet file into the vertex label or edge type `target`.
    ///
    /// `source` may be a cloud URL (object fetched fully into memory) or a
    /// validated local path. Columns not present in `target`'s schema — other
    /// than the edge endpoint columns `src_col`/`dst_col` (defaults
    /// `_src`/`_dst`) — are ignored; null cells are skipped. Returns one row
    /// with the number of records written. `_prop_manager` is unused.
    pub(crate) async fn execute_parquet_import(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
        _prop_manager: &PropertyManager,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Imports require a writable engine instance.
        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;

        let schema = self.storage.schema_manager().schema();

        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        let reader = if is_cloud_url(source) {
            self.open_parquet_from_cloud(source).await?
        } else {
            let validated_source = self.validate_path(source)?;
            let file = std::fs::File::open(&validated_source)?;
            let builder =
                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
            builder.build()?
        };
        let mut reader = reader;

        let mut count = 0;
        // Hold the writer lock for the duration of the import.
        let mut writer = writer_lock.write().await;

        if label_meta.is_some() {
            // Vertex import: each Parquet row becomes one new vertex.
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;

            for batch in reader.by_ref() {
                let batch = batch?;
                for row in 0..batch.num_rows() {
                    let mut props = HashMap::new();
                    for field in batch.schema().fields() {
                        let name = field.name();
                        // Only schema-known, non-null cells become properties.
                        if target_props.contains_key(name) {
                            let col = batch.column_by_name(name).unwrap();
                            if !col.is_null(row) {
                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
                                let val =
                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
                                props.insert(name.clone(), val);
                            }
                        }
                    }
                    let vid = writer.next_vid().await?;
                    writer
                        .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
                        .await?;
                    count += 1;
                }
            }
        } else if let Some(meta) = edge_meta {
            // Edge import: endpoint columns yield src/dst VIDs, remaining
            // schema-known columns become edge properties.
            let type_id = meta.id;
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;

            let src_col = options
                .get("src_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_src");
            let dst_col = options
                .get("dst_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_dst");

            for batch in reader {
                let batch = batch?;
                for row in 0..batch.num_rows() {
                    let mut props = HashMap::new();
                    let mut src_vid = None;
                    let mut dst_vid = None;

                    for field in batch.schema().fields() {
                        let name = field.name();
                        let col = batch.column_by_name(name).unwrap();
                        if col.is_null(row) {
                            continue;
                        }

                        if name == src_col {
                            let val = Self::arrow_to_value(col.as_ref(), row);
                            src_vid = Some(Self::vid_from_value(&val)?);
                        } else if name == dst_col {
                            let val = Self::arrow_to_value(col.as_ref(), row);
                            dst_vid = Some(Self::vid_from_value(&val)?);
                        } else if let Some(pm) = target_props.get(name) {
                            let val =
                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
                            props.insert(name.clone(), val);
                        }
                    }

                    // Both endpoints are mandatory for every edge row.
                    let src = src_vid
                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
                    let dst = dst_vid.ok_or_else(|| {
                        anyhow!("Missing destination VID in column '{}'", dst_col)
                    })?;

                    let eid = writer.next_eid(type_id).await?;
                    writer
                        .insert_edge(
                            src,
                            dst,
                            type_id,
                            eid,
                            props,
                            Some(target.to_string()),
                            None,
                        )
                        .await?;
                    count += 1;
                }
            }
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4302
4303 async fn open_parquet_from_cloud(
4307 &self,
4308 source_url: &str,
4309 ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4310 use object_store::ObjectStore;
4311
4312 let (store, path) = build_store_from_url(source_url)?;
4313
4314 let bytes = store.get(&path).await?.bytes().await?;
4316
4317 let reader = bytes::Bytes::from(bytes.to_vec());
4319 let builder =
4320 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4321 Ok(builder.build()?)
4322 }
4323
4324 pub(crate) async fn scan_edge_type(
4325 &self,
4326 edge_type: &str,
4327 ctx: Option<&QueryContext>,
4328 ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4329 let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4330
4331 self.scan_edge_type_l2(edge_type, &mut edges).await?;
4333
4334 self.scan_edge_type_l1(edge_type, &mut edges).await?;
4336
4337 if let Some(ctx) = ctx {
4339 self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4340 self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4341 }
4342
4343 Ok(edges
4344 .into_iter()
4345 .map(|(eid, (src, dst))| (eid, src, dst))
4346 .collect())
4347 }
4348
    /// L2 (compacted storage) contribution to an edge-type scan.
    ///
    /// Currently a no-op placeholder: nothing is read from L2 here.
    /// NOTE(review): confirm whether L2 edge scanning is handled elsewhere
    /// or still pending implementation.
    pub(crate) async fn scan_edge_type_l2(
        &self,
        _edge_type: &str,
        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
    ) -> Result<()> {
        Ok(())
    }
4362
4363 pub(crate) async fn scan_edge_type_l1(
4365 &self,
4366 edge_type: &str,
4367 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4368 ) -> Result<()> {
4369 if let Ok(Some(batch)) = self
4370 .storage
4371 .scan_delta_table(
4372 edge_type,
4373 "fwd",
4374 &["eid", "src_vid", "dst_vid", "op", "_version"],
4375 None,
4376 )
4377 .await
4378 {
4379 let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4381 HashMap::new();
4382
4383 self.process_delta_batch(&batch, &mut versioned_ops)?;
4384
4385 for (eid, (_, op, src, dst)) in versioned_ops {
4387 if op == 0 {
4388 edges.insert(eid, (src, dst));
4389 } else if op == 1 {
4390 edges.remove(&eid);
4391 }
4392 }
4393 }
4394 Ok(())
4395 }
4396
4397 pub(crate) fn process_delta_batch(
4399 &self,
4400 batch: &arrow_array::RecordBatch,
4401 versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4402 ) -> Result<()> {
4403 use arrow_array::UInt64Array;
4404 let eid_col = batch
4405 .column_by_name("eid")
4406 .ok_or(anyhow!("Missing eid"))?
4407 .as_any()
4408 .downcast_ref::<UInt64Array>()
4409 .ok_or(anyhow!("Invalid eid"))?;
4410 let src_col = batch
4411 .column_by_name("src_vid")
4412 .ok_or(anyhow!("Missing src_vid"))?
4413 .as_any()
4414 .downcast_ref::<UInt64Array>()
4415 .ok_or(anyhow!("Invalid src_vid"))?;
4416 let dst_col = batch
4417 .column_by_name("dst_vid")
4418 .ok_or(anyhow!("Missing dst_vid"))?
4419 .as_any()
4420 .downcast_ref::<UInt64Array>()
4421 .ok_or(anyhow!("Invalid dst_vid"))?;
4422 let op_col = batch
4423 .column_by_name("op")
4424 .ok_or(anyhow!("Missing op"))?
4425 .as_any()
4426 .downcast_ref::<arrow_array::UInt8Array>()
4427 .ok_or(anyhow!("Invalid op"))?;
4428 let version_col = batch
4429 .column_by_name("_version")
4430 .ok_or(anyhow!("Missing _version"))?
4431 .as_any()
4432 .downcast_ref::<UInt64Array>()
4433 .ok_or(anyhow!("Invalid _version"))?;
4434
4435 for i in 0..batch.num_rows() {
4436 let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4437 let version = version_col.value(i);
4438 let op = op_col.value(i);
4439 let src = Vid::from(src_col.value(i));
4440 let dst = Vid::from(dst_col.value(i));
4441
4442 match versioned_ops.entry(eid) {
4443 std::collections::hash_map::Entry::Vacant(e) => {
4444 e.insert((version, op, src, dst));
4445 }
4446 std::collections::hash_map::Entry::Occupied(mut e) => {
4447 if version > e.get().0 {
4448 e.insert((version, op, src, dst));
4449 }
4450 }
4451 }
4452 }
4453 Ok(())
4454 }
4455
4456 pub(crate) fn scan_edge_type_l0(
4458 &self,
4459 edge_type: &str,
4460 ctx: &QueryContext,
4461 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4462 ) {
4463 let schema = self.storage.schema_manager().schema();
4464 let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4465
4466 if let Some(type_id) = type_id {
4467 self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4469
4470 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4472 self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4473 }
4474
4475 for pending_l0_arc in &ctx.pending_flush_l0s {
4477 self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4478 }
4479 }
4480 }
4481
4482 pub(crate) fn scan_single_l0(
4484 &self,
4485 l0: &uni_store::runtime::L0Buffer,
4486 type_id: u32,
4487 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4488 ) {
4489 for edge_entry in l0.graph.edges() {
4490 if edge_entry.edge_type == type_id {
4491 edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4492 }
4493 }
4494 let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4496 for eid in eids_to_check {
4497 if l0.is_tombstoned(eid) {
4498 edges.remove(&eid);
4499 }
4500 }
4501 }
4502
4503 pub(crate) fn filter_tombstoned_vertex_edges(
4505 &self,
4506 ctx: &QueryContext,
4507 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4508 ) {
4509 let l0 = ctx.l0.read();
4510 let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4511
4512 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4514 let tx_l0 = tx_l0_arc.read();
4515 all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4516 }
4517
4518 for pending_l0_arc in &ctx.pending_flush_l0s {
4520 let pending_l0 = pending_l0_arc.read();
4521 all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4522 }
4523
4524 edges.retain(|_, (src, dst)| {
4525 !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4526 });
4527 }
4528
4529 pub(crate) async fn execute_project(
4531 &self,
4532 input_rows: Vec<HashMap<String, Value>>,
4533 projections: &[(Expr, Option<String>)],
4534 prop_manager: &PropertyManager,
4535 params: &HashMap<String, Value>,
4536 ctx: Option<&QueryContext>,
4537 ) -> Result<Vec<HashMap<String, Value>>> {
4538 let mut results = Vec::new();
4539 for m in input_rows {
4540 let mut row = HashMap::new();
4541 for (expr, alias) in projections {
4542 let val = self
4543 .evaluate_expr(expr, &m, prop_manager, params, ctx)
4544 .await?;
4545 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4546 row.insert(name, val);
4547 }
4548 results.push(row);
4549 }
4550 Ok(results)
4551 }
4552
    /// UNWIND operator: expands a list-valued expression into one output row
    /// per element, binding each element to `variable` alongside the input
    /// row's existing bindings.
    ///
    /// NOTE(review): rows whose expression evaluates to a non-list value
    /// (including null) are dropped silently — confirm this matches the
    /// intended Cypher UNWIND semantics for scalar values.
    pub(crate) async fn execute_unwind(
        &self,
        input_rows: Vec<HashMap<String, Value>>,
        expr: &Expr,
        variable: &str,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut results = Vec::new();
        for row in input_rows {
            let val = self
                .evaluate_expr(expr, &row, prop_manager, params, ctx)
                .await?;
            if let Value::List(items) = val {
                // One output row per list element, preserving prior bindings.
                for item in items {
                    let mut new_row = row.clone();
                    new_row.insert(variable.to_string(), item);
                    results.push(new_row);
                }
            }
        }
        Ok(results)
    }
4578
    /// APPLY (correlated subquery): runs `subquery` once per input row with
    /// that row's bindings merged into the parameter map, then joins each
    /// subquery row back onto its input row.
    ///
    /// An optional `input_filter` is evaluated first; input rows that do not
    /// evaluate to true are excluded (non-boolean results count as false).
    ///
    /// NOTE(review): when no rows remain after filtering, the subquery is
    /// still executed once with the original params and its rows returned
    /// unjoined — presumably unit-cardinality semantics for a standalone
    /// CALL; confirm against the planner's expectations.
    pub(crate) async fn execute_apply(
        &self,
        input_rows: Vec<HashMap<String, Value>>,
        subquery: &LogicalPlan,
        input_filter: Option<&Expr>,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut filtered_rows = input_rows;

        if let Some(filter) = input_filter {
            let mut filtered = Vec::new();
            for row in filtered_rows {
                let res = self
                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
                    .await?;
                if res.as_bool().unwrap_or(false) {
                    filtered.push(row);
                }
            }
            filtered_rows = filtered;
        }

        // Empty input: run the subquery once, uncorrelated.
        if filtered_rows.is_empty() {
            let sub_rows = self
                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
                .await?;
            return Ok(sub_rows);
        }

        let mut results = Vec::new();
        for row in filtered_rows {
            // Expose the outer row's bindings to the subquery as parameters.
            let mut sub_params = params.clone();
            sub_params.extend(row.clone());

            let sub_rows = self
                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
                .await?;

            // Subquery bindings overwrite outer bindings on name collision.
            for sub_row in sub_rows {
                let mut new_row = row.clone();
                new_row.extend(sub_row);
                results.push(new_row);
            }
        }
        Ok(results)
    }
4630
4631 pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4633 let schema = self.storage.schema_manager().schema();
4634 let mut rows = Vec::new();
4635 for idx in &schema.indexes {
4636 let (name, type_str, details) = match idx {
4637 uni_common::core::schema::IndexDefinition::Vector(c) => (
4638 c.name.clone(),
4639 "VECTOR",
4640 format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4641 ),
4642 uni_common::core::schema::IndexDefinition::FullText(c) => (
4643 c.name.clone(),
4644 "FULLTEXT",
4645 format!("on {}:{:?}", c.label, c.properties),
4646 ),
4647 uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4648 cfg.name.clone(),
4649 "SCALAR",
4650 format!(":{}({:?})", cfg.label, cfg.properties),
4651 ),
4652 _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4653 };
4654
4655 if let Some(f) = filter
4656 && f != type_str
4657 {
4658 continue;
4659 }
4660
4661 let mut row = HashMap::new();
4662 row.insert("name".to_string(), Value::String(name));
4663 row.insert("type".to_string(), Value::String(type_str.to_string()));
4664 row.insert("details".to_string(), Value::String(details));
4665 rows.push(row);
4666 }
4667 rows
4668 }
4669
4670 pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4671 let mut row = HashMap::new();
4672 row.insert("name".to_string(), Value::String("uni".to_string()));
4673 vec![row]
4675 }
4676
    /// SHOW CONFIG handler.
    ///
    /// Currently returns no rows: no runtime configuration is exposed yet.
    pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
        vec![]
    }
4681
4682 pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4683 let snapshot = self
4684 .storage
4685 .snapshot_manager()
4686 .load_latest_snapshot()
4687 .await?;
4688 let mut results = Vec::new();
4689
4690 if let Some(snap) = snapshot {
4691 for (label, s) in &snap.vertices {
4692 let mut row = HashMap::new();
4693 row.insert("type".to_string(), Value::String("Label".to_string()));
4694 row.insert("name".to_string(), Value::String(label.clone()));
4695 row.insert("count".to_string(), Value::Int(s.count as i64));
4696 results.push(row);
4697 }
4698 for (edge, s) in &snap.edges {
4699 let mut row = HashMap::new();
4700 row.insert("type".to_string(), Value::String("Edge".to_string()));
4701 row.insert("name".to_string(), Value::String(edge.clone()));
4702 row.insert("count".to_string(), Value::Int(s.count as i64));
4703 results.push(row);
4704 }
4705 }
4706
4707 Ok(results)
4708 }
4709
4710 pub(crate) fn execute_show_constraints(
4711 &self,
4712 clause: ShowConstraints,
4713 ) -> Vec<HashMap<String, Value>> {
4714 let schema = self.storage.schema_manager().schema();
4715 let mut rows = Vec::new();
4716 for c in &schema.constraints {
4717 if let Some(target) = &clause.target {
4718 match (target, &c.target) {
4719 (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4720 (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4721 if e1 == e2 => {}
4722 _ => continue,
4723 }
4724 }
4725
4726 let mut row = HashMap::new();
4727 row.insert("name".to_string(), Value::String(c.name.clone()));
4728 let type_str = match c.constraint_type {
4729 ConstraintType::Unique { .. } => "UNIQUE",
4730 ConstraintType::Exists { .. } => "EXISTS",
4731 ConstraintType::Check { .. } => "CHECK",
4732 _ => "UNKNOWN",
4733 };
4734 row.insert("type".to_string(), Value::String(type_str.to_string()));
4735
4736 let target_str = match &c.target {
4737 ConstraintTarget::Label(l) => format!("(:{})", l),
4738 ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4739 _ => "UNKNOWN".to_string(),
4740 };
4741 row.insert("target".to_string(), Value::String(target_str));
4742
4743 rows.push(row);
4744 }
4745 rows
4746 }
4747
4748 pub(crate) async fn execute_cross_join(
4750 &self,
4751 left: Box<LogicalPlan>,
4752 right: Box<LogicalPlan>,
4753 prop_manager: &PropertyManager,
4754 params: &HashMap<String, Value>,
4755 ctx: Option<&QueryContext>,
4756 ) -> Result<Vec<HashMap<String, Value>>> {
4757 let left_rows = self
4758 .execute_subplan(*left, prop_manager, params, ctx)
4759 .await?;
4760 let right_rows = self
4761 .execute_subplan(*right, prop_manager, params, ctx)
4762 .await?;
4763
4764 let mut results = Vec::new();
4765 for l in &left_rows {
4766 for r in &right_rows {
4767 let mut combined = l.clone();
4768 combined.extend(r.clone());
4769 results.push(combined);
4770 }
4771 }
4772 Ok(results)
4773 }
4774
4775 pub(crate) async fn execute_union(
4777 &self,
4778 left: Box<LogicalPlan>,
4779 right: Box<LogicalPlan>,
4780 all: bool,
4781 prop_manager: &PropertyManager,
4782 params: &HashMap<String, Value>,
4783 ctx: Option<&QueryContext>,
4784 ) -> Result<Vec<HashMap<String, Value>>> {
4785 let mut left_rows = self
4786 .execute_subplan(*left, prop_manager, params, ctx)
4787 .await?;
4788 let mut right_rows = self
4789 .execute_subplan(*right, prop_manager, params, ctx)
4790 .await?;
4791
4792 left_rows.append(&mut right_rows);
4793
4794 if !all {
4795 let mut seen = HashSet::new();
4796 left_rows.retain(|row| {
4797 let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
4798 let key = format!("{sorted_row:?}");
4799 seen.insert(key)
4800 });
4801 }
4802 Ok(left_rows)
4803 }
4804
4805 pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
4807 let schema = self.storage.schema_manager().schema();
4808 schema.indexes.iter().any(|idx| match idx {
4809 uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
4810 uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
4811 uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
4812 _ => false,
4813 })
4814 }
4815
4816 pub(crate) async fn execute_export(
4817 &self,
4818 target: &str,
4819 source: &str,
4820 options: &HashMap<String, Value>,
4821 prop_manager: &PropertyManager,
4822 ctx: Option<&QueryContext>,
4823 ) -> Result<Vec<HashMap<String, Value>>> {
4824 let format = options
4825 .get("format")
4826 .and_then(|v| v.as_str())
4827 .unwrap_or("csv")
4828 .to_lowercase();
4829
4830 match format.as_str() {
4831 "csv" => {
4832 self.execute_csv_export(target, source, options, prop_manager, ctx)
4833 .await
4834 }
4835 "parquet" => {
4836 self.execute_parquet_export(target, source, options, prop_manager, ctx)
4837 .await
4838 }
4839 _ => Err(anyhow!("Unsupported export format: {}", format)),
4840 }
4841 }
4842
    /// Exports the vertex label or edge type `target` to a CSV file.
    ///
    /// Columns are the sorted schema property names prefixed by the system
    /// columns: `_vid` for vertices, or `_eid`/`_src`/`_dst`/`_type` for
    /// edges. Options: `delimiter` (first byte; default `,`) and `header`
    /// (default true). Returns one row with the number of records written.
    ///
    /// NOTE(review): the parameter is named `source` but is used as the
    /// destination path — consider renaming for clarity.
    pub(crate) async fn execute_csv_export(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let validated_dest = self.validate_path(source)?;

        let schema = self.storage.schema_manager().schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut wtr = csv::WriterBuilder::new()
            .delimiter(delimiter)
            .from_path(&validated_dest)?;

        let mut count = 0;
        // Fallback when the target has no properties registered.
        let empty_props = HashMap::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            // Sort property names so the column order is deterministic.
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            let mut headers = vec!["_vid".to_string()];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                // Missing properties are emitted as Null-formatted cells.
                let mut row = Vec::with_capacity(headers.len());
                row.push(vid.to_string());
                for p_name in &prop_names {
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            // Same layout for edges, with the four edge system columns first.
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            let mut headers = vec![
                "_eid".to_string(),
                "_src".to_string(),
                "_dst".to_string(),
                "_type".to_string(),
            ];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            let edges = self.scan_edge_type(target, ctx).await?;

            for (eid, src, dst) in edges {
                let props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(eid.to_string());
                row.push(src.to_string());
                row.push(dst.to_string());
                // `_type` holds the numeric edge-type id, not the name.
                row.push(meta.id.to_string());

                for p_name in &prop_names {
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        }

        wtr.flush()?;
        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4962
    /// Exports the vertex label or edge type `target` to a Parquet file.
    ///
    /// Rows are materialised fully in memory, with system columns filled in
    /// (`_vid`/`_uid`/`_deleted`/`_version` for vertices,
    /// `eid`/`src_vid`/`dst_vid`/`_deleted`/`_version` for edges) so they fit
    /// the dataset's Arrow schema. `destination` may be a cloud URL or a
    /// validated local path. Returns one row with the exported row count.
    /// `_options` is currently unused.
    pub(crate) async fn execute_parquet_export(
        &self,
        target: &str,
        destination: &str,
        _options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let schema_manager = self.storage.schema_manager();
        let schema = schema_manager.schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // The output file uses the same Arrow schema as the target's dataset.
        let arrow_schema = if label_meta.is_some() {
            let dataset = self.storage.vertex_dataset(target)?;
            dataset.get_arrow_schema(&schema)?
        } else {
            let dataset = self.storage.edge_dataset(target, "", "")?;
            dataset.get_arrow_schema(&schema)?
        };

        let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let mut props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                props.insert(
                    "_vid".to_string(),
                    uni_common::Value::Int(vid.as_u64() as i64),
                );
                // Fill a placeholder _uid (32 zero ints) when absent so the
                // row matches the Arrow schema.
                if !props.contains_key("_uid") {
                    props.insert(
                        "_uid".to_string(),
                        uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
                    );
                }
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        } else if edge_meta.is_some() {
            let edges = self.scan_edge_type(target, ctx).await?;
            for (eid, src, dst) in edges {
                let mut props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                props.insert(
                    "eid".to_string(),
                    uni_common::Value::Int(eid.as_u64() as i64),
                );
                props.insert(
                    "src_vid".to_string(),
                    uni_common::Value::Int(src.as_u64() as i64),
                );
                props.insert(
                    "dst_vid".to_string(),
                    uni_common::Value::Int(dst.as_u64() as i64),
                );
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        }

        if is_cloud_url(destination) {
            self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
                .await?;
        } else {
            // Local destinations go through path validation first.
            let validated_dest = self.validate_path(destination)?;
            let file = std::fs::File::create(&validated_dest)?;
            let mut writer =
                parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;

            if !rows.is_empty() {
                let batch = self.rows_to_batch(&rows, &arrow_schema)?;
                writer.write(&batch)?;
            }

            writer.close()?;
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(rows.len() as i64));
        Ok(vec![res])
    }
5070
5071 async fn write_parquet_to_cloud(
5073 &self,
5074 dest_url: &str,
5075 rows: &[HashMap<String, uni_common::Value>],
5076 arrow_schema: &arrow_schema::Schema,
5077 ) -> Result<()> {
5078 use object_store::ObjectStore;
5079
5080 let (store, path) = build_store_from_url(dest_url)?;
5081
5082 let mut buffer = Vec::new();
5084 {
5085 let mut writer = parquet::arrow::ArrowWriter::try_new(
5086 &mut buffer,
5087 Arc::new(arrow_schema.clone()),
5088 None,
5089 )?;
5090
5091 if !rows.is_empty() {
5092 let batch = self.rows_to_batch(rows, arrow_schema)?;
5093 writer.write(&batch)?;
5094 }
5095
5096 writer.close()?;
5097 }
5098
5099 store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5101
5102 Ok(())
5103 }
5104
5105 pub(crate) fn rows_to_batch(
5106 &self,
5107 rows: &[HashMap<String, uni_common::Value>],
5108 schema: &arrow_schema::Schema,
5109 ) -> Result<RecordBatch> {
5110 let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5111
5112 for field in schema.fields() {
5113 let name = field.name();
5114 let dt = field.data_type();
5115
5116 let values: Vec<uni_common::Value> = rows
5117 .iter()
5118 .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5119 .collect();
5120 let array = self.values_to_array(&values, dt)?;
5121 columns.push(array);
5122 }
5123
5124 Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5125 }
5126
    /// Convert a slice of runtime `Value`s into an Arrow array of type `dt`.
    ///
    /// Thin delegation to the shared `arrow_convert` helper; kept as a method
    /// so callers in this module (e.g. `rows_to_batch`) have a single local
    /// conversion entry point.
    pub(crate) fn values_to_array(
        &self,
        values: &[uni_common::Value],
        dt: &arrow_schema::DataType,
    ) -> Result<Arc<dyn Array>> {
        arrow_convert::values_to_array(values, dt)
    }
5136
5137 pub(crate) fn format_csv_value(&self, val: Value) -> String {
5138 match val {
5139 Value::Null => "".to_string(),
5140 Value::String(s) => s,
5141 Value::Int(i) => i.to_string(),
5142 Value::Float(f) => f.to_string(),
5143 Value::Bool(b) => b.to_string(),
5144 _ => format!("{val}"),
5145 }
5146 }
5147
5148 pub(crate) fn parse_csv_value(
5149 &self,
5150 s: &str,
5151 data_type: &uni_common::core::schema::DataType,
5152 prop_name: &str,
5153 ) -> Result<Value> {
5154 if s.is_empty() || s.to_lowercase() == "null" {
5155 return Ok(Value::Null);
5156 }
5157
5158 use uni_common::core::schema::DataType;
5159 match data_type {
5160 DataType::String => Ok(Value::String(s.to_string())),
5161 DataType::Int32 | DataType::Int64 => {
5162 let i = s.parse::<i64>().map_err(|_| {
5163 anyhow!(
5164 "Failed to parse integer for property '{}': {}",
5165 prop_name,
5166 s
5167 )
5168 })?;
5169 Ok(Value::Int(i))
5170 }
5171 DataType::Float32 | DataType::Float64 => {
5172 let f = s.parse::<f64>().map_err(|_| {
5173 anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5174 })?;
5175 Ok(Value::Float(f))
5176 }
5177 DataType::Bool => {
5178 let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5179 anyhow!(
5180 "Failed to parse boolean for property '{}': {}",
5181 prop_name,
5182 s
5183 )
5184 })?;
5185 Ok(Value::Bool(b))
5186 }
5187 DataType::CypherValue => {
5188 let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5189 anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5190 })?;
5191 Ok(Value::from(json_val))
5192 }
5193 DataType::Vector { .. } => {
5194 let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5195 anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5196 })?;
5197 Ok(Value::Vector(v))
5198 }
5199 _ => Ok(Value::String(s.to_string())),
5200 }
5201 }
5202
5203 pub(crate) async fn detach_delete_vertex(
5204 &self,
5205 vid: Vid,
5206 writer: &mut Writer,
5207 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5208 ) -> Result<()> {
5209 let schema = self.storage.schema_manager().schema();
5210 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5211
5212 let out_graph = self
5214 .storage
5215 .load_subgraph_cached(
5216 &[vid],
5217 &edge_type_ids,
5218 1,
5219 uni_store::runtime::Direction::Outgoing,
5220 Some(writer.l0_manager.get_current()),
5221 )
5222 .await?;
5223
5224 for edge in out_graph.edges() {
5225 writer
5226 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5227 .await?;
5228 }
5229
5230 let in_graph = self
5232 .storage
5233 .load_subgraph_cached(
5234 &[vid],
5235 &edge_type_ids,
5236 1,
5237 uni_store::runtime::Direction::Incoming,
5238 Some(writer.l0_manager.get_current()),
5239 )
5240 .await?;
5241
5242 for edge in in_graph.edges() {
5243 writer
5244 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5245 .await?;
5246 }
5247
5248 Ok(())
5249 }
5250
5251 pub(crate) async fn batch_detach_delete_vertices(
5253 &self,
5254 vids: &[Vid],
5255 labels_per_vid: Vec<Option<Vec<String>>>,
5256 writer: &mut Writer,
5257 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5258 ) -> Result<()> {
5259 let schema = self.storage.schema_manager().schema();
5260 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5261
5262 let out_graph = self
5264 .storage
5265 .load_subgraph_cached(
5266 vids,
5267 &edge_type_ids,
5268 1,
5269 uni_store::runtime::Direction::Outgoing,
5270 Some(writer.l0_manager.get_current()),
5271 )
5272 .await?;
5273
5274 for edge in out_graph.edges() {
5275 writer
5276 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5277 .await?;
5278 }
5279
5280 let in_graph = self
5282 .storage
5283 .load_subgraph_cached(
5284 vids,
5285 &edge_type_ids,
5286 1,
5287 uni_store::runtime::Direction::Incoming,
5288 Some(writer.l0_manager.get_current()),
5289 )
5290 .await?;
5291
5292 for edge in in_graph.edges() {
5293 writer
5294 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5295 .await?;
5296 }
5297
5298 for (vid, labels) in vids.iter().zip(labels_per_vid) {
5300 writer.delete_vertex(*vid, labels, tx_l0).await?;
5301 }
5302
5303 Ok(())
5304 }
5305}