1use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7 eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16 match val {
17 Value::Temporal(tv) => {
18 use uni_common::TemporalValue;
19 match tv {
20 TemporalValue::DateTime {
21 nanos_since_epoch, ..
22 }
23 | TemporalValue::LocalDateTime {
24 nanos_since_epoch, ..
25 } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26 TemporalValue::Date { days_since_epoch } => {
27 chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28 }
29 _ => None,
30 }
31 }
32 Value::String(s) => parse_datetime_utc(s).ok(),
33 _ => None,
34 }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46 BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47 ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54
55use crate::query::df_graph::L0Context;
57use crate::query::df_planner::HybridPhysicalPlanner;
58use datafusion::physical_plan::ExecutionPlanProperties;
59use datafusion::prelude::SessionContext;
60use parking_lot::RwLock as SyncRwLock;
61
62use arrow_array::{Array, RecordBatch};
63use csv;
64use parquet;
65
66use super::core::*;
67
/// Hydration threshold for edge rows: a row with at most this many fields is
/// assumed to carry only system columns and is re-hydrated in
/// `hydrate_entity_if_needed`.
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
/// Hydration threshold for vertex rows (see `hydrate_entity_if_needed`).
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
72
73fn collect_l0_vids(
78 ctx: Option<&QueryContext>,
79 extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
80) -> Vec<Vid> {
81 let mut vids = Vec::new();
82 if let Some(ctx) = ctx {
83 vids.extend(extractor(&ctx.l0.read()));
84 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
85 vids.extend(extractor(&tx_l0_arc.read()));
86 }
87 for pending_l0_arc in &ctx.pending_flush_l0s {
88 vids.extend(extractor(&pending_l0_arc.read()));
89 }
90 }
91 vids
92}
93
94async fn hydrate_entity_if_needed(
103 map: &mut HashMap<String, Value>,
104 prop_manager: &PropertyManager,
105 ctx: Option<&QueryContext>,
106) {
107 if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
109 if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
110 tracing::debug!(
111 "Pushdown fallback: hydrating edge {} at execution time",
112 eid_u64
113 );
114 if let Ok(Some(props)) = prop_manager
115 .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
116 .await
117 {
118 for (key, value) in props {
119 map.entry(key).or_insert(value);
120 }
121 }
122 } else {
123 tracing::trace!(
124 "Pushdown success: edge {} already has {} properties",
125 eid_u64,
126 map.len() - EDGE_SYSTEM_FIELD_COUNT
127 );
128 }
129 return;
130 }
131
132 if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
134 if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
135 tracing::debug!(
136 "Pushdown fallback: hydrating vertex {} at execution time",
137 vid_u64
138 );
139 if let Ok(Some(props)) = prop_manager
140 .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
141 .await
142 {
143 for (key, value) in props {
144 map.entry(key).or_insert(value);
145 }
146 }
147 } else {
148 tracing::trace!(
149 "Pushdown success: vertex {} already has {} properties",
150 vid_u64,
151 map.len() - VERTEX_SYSTEM_FIELD_COUNT
152 );
153 }
154 }
155}
156
157impl Executor {
158 async fn verify_and_filter_candidates(
163 &self,
164 mut candidates: Vec<Vid>,
165 variable: &str,
166 filter: Option<&Expr>,
167 ctx: Option<&QueryContext>,
168 prop_manager: &PropertyManager,
169 params: &HashMap<String, Value>,
170 ) -> Result<Vec<Vid>> {
171 candidates.sort_unstable();
172 candidates.dedup();
173
174 let mut verified_vids = Vec::new();
175 for vid in candidates {
176 let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
177 continue; };
179
180 if let Some(expr) = filter {
181 let mut props_map: HashMap<String, Value> = props;
182 props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
183
184 let mut row = HashMap::new();
185 row.insert(variable.to_string(), Value::Map(props_map));
186
187 let res = self
188 .evaluate_expr(expr, &row, prop_manager, params, ctx)
189 .await?;
190 if res.as_bool().unwrap_or(false) {
191 verified_vids.push(vid);
192 }
193 } else {
194 verified_vids.push(vid);
195 }
196 }
197
198 Ok(verified_vids)
199 }
200
201 pub(crate) async fn scan_storage_candidates(
202 &self,
203 label_id: u16,
204 variable: &str,
205 filter: Option<&Expr>,
206 ) -> Result<Vec<Vid>> {
207 let schema = self.storage.schema_manager().schema();
208 let label_name = schema
209 .label_name_by_id(label_id)
210 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
211
212 let empty_props = std::collections::HashMap::new();
214 let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
215 let filter_sql = filter.and_then(|expr| {
216 LanceFilterGenerator::generate(std::slice::from_ref(expr), variable, Some(label_props))
217 });
218
219 self.storage
220 .scan_vertex_candidates(label_name, filter_sql.as_deref())
221 .await
222 }
223
224 pub(crate) async fn scan_label_with_filter(
225 &self,
226 label_id: u16,
227 variable: &str,
228 filter: Option<&Expr>,
229 ctx: Option<&QueryContext>,
230 prop_manager: &PropertyManager,
231 params: &HashMap<String, Value>,
232 ) -> Result<Vec<Vid>> {
233 let mut candidates = self
234 .scan_storage_candidates(label_id, variable, filter)
235 .await?;
236
237 let schema = self.storage.schema_manager().schema();
239 if let Some(label_name) = schema.label_name_by_id(label_id) {
240 candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
241 }
242
243 self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
244 .await
245 }
246
247 pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
248 if let Value::Node(node) = val {
250 return Ok(node.vid);
251 }
252 if let Value::Map(map) = val
254 && let Some(vid_val) = map.get("_vid")
255 && let Some(v) = vid_val.as_u64()
256 {
257 return Ok(Vid::from(v));
258 }
259 if let Some(s) = val.as_str()
261 && let Ok(id) = s.parse::<u64>()
262 {
263 return Ok(Vid::new(id));
264 }
265 if let Some(v) = val.as_u64() {
267 return Ok(Vid::from(v));
268 }
269 Err(anyhow!("Invalid Vid format: {:?}", val))
270 }
271
272 fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
278 for val in row.values() {
279 if let Ok(vid) = Self::vid_from_value(val)
280 && vid == target_vid
281 {
282 return val.clone();
283 }
284 }
285 Value::Map(HashMap::from([(
287 "_vid".to_string(),
288 Value::Int(target_vid.as_u64() as i64),
289 )]))
290 }
291
    /// Builds the DataFusion execution environment for one query: a fresh
    /// `SessionContext` with Cypher (and any custom) UDFs registered, a
    /// `HybridPhysicalPlanner` wired to storage and L0 state, and a shared
    /// `PropertyManager` returned as an `Arc` for reuse by the caller.
    pub async fn create_datafusion_planner(
        &self,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<(
        Arc<SyncRwLock<SessionContext>>,
        HybridPhysicalPlanner,
        Arc<PropertyManager>,
    )> {
        // Snapshot unflushed L0 state so the planner can see in-flight writes.
        let query_ctx = self.get_context().await;
        let l0_context = match query_ctx {
            Some(ref ctx) => L0Context::from_query_context(ctx),
            None => L0Context::empty(),
        };

        // Fresh manager over the same storage, reusing the caller's cache size.
        let prop_manager_arc = Arc::new(PropertyManager::new(
            self.storage.clone(),
            self.storage.schema_manager_arc(),
            prop_manager.cache_size(),
        ));

        let session = SessionContext::new();
        crate::query::df_udfs::register_cypher_udfs(&session)?;
        if let Some(ref registry) = self.custom_function_registry {
            crate::query::df_udfs::register_custom_udfs(&session, registry)?;
        }
        let session_ctx = Arc::new(SyncRwLock::new(session));

        let mut planner = HybridPhysicalPlanner::with_l0_context(
            session_ctx.clone(),
            self.storage.clone(),
            l0_context,
            prop_manager_arc.clone(),
            self.storage.schema_manager().schema(),
            params.clone(),
            HashMap::new(),
        );

        // Optional capabilities are attached only when configured on self.
        planner = planner.with_algo_registry(self.algo_registry.clone());
        if let Some(ref registry) = self.procedure_registry {
            planner = planner.with_procedure_registry(registry.clone());
        }
        if let Some(ref xervo_runtime) = self.xervo_runtime {
            planner = planner.with_xervo_runtime(xervo_runtime.clone());
        }

        Ok((session_ctx, planner, prop_manager_arc))
    }
344
345 pub fn collect_batches(
347 session_ctx: &Arc<SyncRwLock<SessionContext>>,
348 execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
349 ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
350 Box::pin(async move {
351 use futures::TryStreamExt;
352
353 let task_ctx = session_ctx.read().task_ctx();
354 let partition_count = execution_plan.output_partitioning().partition_count();
355 let mut all_batches = Vec::new();
356 for partition in 0..partition_count {
357 let stream = execution_plan.execute(partition, task_ctx.clone())?;
358 let batches: Vec<RecordBatch> = stream.try_collect().await?;
359 all_batches.extend(batches);
360 }
361 Ok(all_batches)
362 })
363 }
364
365 pub async fn execute_datafusion(
370 &self,
371 plan: LogicalPlan,
372 prop_manager: &PropertyManager,
373 params: &HashMap<String, Value>,
374 ) -> Result<Vec<RecordBatch>> {
375 let (batches, _plan) = self
376 .execute_datafusion_with_plan(plan, prop_manager, params)
377 .await?;
378 Ok(batches)
379 }
380
    /// Runs `plan` through the DataFusion engine, returning both the result
    /// batches and the physical plan (useful for EXPLAIN-style inspection).
    /// Write operations additionally require a configured `Writer` and get a
    /// mutation context attached to the planner.
    pub async fn execute_datafusion_with_plan(
        &self,
        plan: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<(
        Vec<RecordBatch>,
        Arc<dyn datafusion::physical_plan::ExecutionPlan>,
    )> {
        let (session_ctx, mut planner, prop_manager_arc) =
            self.create_datafusion_planner(prop_manager, params).await?;

        // Mutating plans need a mutation context so physical operators can
        // write through the Writer and see transactional L0 state.
        if Self::contains_write_operations(&plan) {
            let writer = self
                .writer
                .as_ref()
                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
                .clone();
            let query_ctx = self.get_context().await;

            debug_assert!(
                query_ctx.is_some(),
                "BUG: query_ctx is None for write operation"
            );

            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
                executor: self.clone(),
                writer,
                prop_manager: prop_manager_arc,
                params: params.clone(),
                query_ctx,
                tx_l0_override: self.transaction_l0_override.clone(),
            });
            planner = planner.with_mutation_context(mutation_ctx);
            tracing::debug!(
                plan_type = Self::get_plan_type(&plan),
                "Mutation routed to DataFusion engine"
            );
        }

        let execution_plan = planner.plan(&plan)?;
        let plan_clone = Arc::clone(&execution_plan);
        let result = Self::collect_batches(&session_ctx, execution_plan).await;

        // Warnings are drained even when collection failed, so they are not
        // lost on error paths.
        let graph_warnings = planner.graph_ctx().take_warnings();
        if !graph_warnings.is_empty()
            && let Ok(mut w) = self.warnings.lock()
        {
            w.extend(graph_warnings);
        }

        result.map(|batches| (batches, plan_clone))
    }
442
443 pub(crate) async fn execute_merge_read_plan(
449 &self,
450 plan: LogicalPlan,
451 prop_manager: &PropertyManager,
452 params: &HashMap<String, Value>,
453 merge_variables: Vec<String>,
454 ) -> Result<Vec<HashMap<String, Value>>> {
455 let (session_ctx, planner, _prop_manager_arc) =
456 self.create_datafusion_planner(prop_manager, params).await?;
457
458 let extra: HashMap<String, HashSet<String>> = merge_variables
461 .iter()
462 .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
463 .collect();
464 let execution_plan = planner.plan_with_properties(&plan, extra)?;
465 let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;
466
467 let flat_rows = self.record_batches_to_rows(all_batches)?;
469
470 let rows = flat_rows
473 .into_iter()
474 .map(|mut row| {
475 for var in &merge_variables {
476 if row.contains_key(var) {
478 continue;
479 }
480 let prefix = format!("{}.", var);
481 let dotted_keys: Vec<String> = row
482 .keys()
483 .filter(|k| k.starts_with(&prefix))
484 .cloned()
485 .collect();
486 if !dotted_keys.is_empty() {
487 let mut map = HashMap::new();
488 for key in dotted_keys {
489 let prop_name = key[prefix.len()..].to_string();
490 if let Some(val) = row.remove(&key) {
491 map.insert(prop_name, val);
492 }
493 }
494 row.insert(var.clone(), Value::Map(map));
495 }
496 }
497 row
498 })
499 .collect();
500
501 Ok(rows)
502 }
503
    /// Flattens Arrow record batches into row maps keyed by column name, then
    /// re-assembles entity structure: dotted `var.prop` columns are folded
    /// into per-variable maps and system helper columns (`_vid`, `_labels`,
    /// `_eid`, `_type`) are merged into their owning entity map.
    pub(crate) fn record_batches_to_rows(
        &self,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut rows = Vec::new();

        for batch in batches {
            let num_rows = batch.num_rows();
            let schema = batch.schema();

            for row_idx in 0..num_rows {
                let mut row = HashMap::new();

                for (col_idx, field) in schema.fields().iter().enumerate() {
                    let column = batch.column(col_idx);
                    // Datetime/time columns are struct-encoded; pass a type
                    // hint so the conversion yields a temporal Value.
                    let data_type =
                        if uni_common::core::schema::is_datetime_struct(field.data_type()) {
                            Some(&uni_common::DataType::DateTime)
                        } else if uni_common::core::schema::is_time_struct(field.data_type()) {
                            Some(&uni_common::DataType::Time)
                        } else {
                            None
                        };
                    let mut value =
                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);

                    // Columns flagged "cv_encoded" carry JSON-encoded complex
                    // values; decode them back into structured Values.
                    if field
                        .metadata()
                        .get("cv_encoded")
                        .is_some_and(|v| v == "true")
                        && let Value::String(s) = &value
                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
                    {
                        value = Value::from(parsed);
                    }

                    value = Self::normalize_path_if_needed(value);

                    row.insert(field.name().clone(), value);
                }

                // Variables already materialized as maps in the batch output.
                let bare_vars: Vec<String> = row
                    .keys()
                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
                    .cloned()
                    .collect();

                // Variables whose bare column is only a placeholder string but
                // which have flattened `var._vid` (and friends) columns.
                let vid_placeholder_vars: Vec<String> = row
                    .keys()
                    .filter(|k| {
                        !k.contains('.')
                            && matches!(row.get(*k), Some(Value::String(_)))
                            && row.contains_key(&format!("{}._vid", k))
                    })
                    .cloned()
                    .collect();

                // Rebuild each placeholder variable from its dotted columns,
                // replacing the placeholder string with the assembled map.
                for var in &vid_placeholder_vars {
                    let prefix = format!("{}.", var);
                    let mut map = HashMap::new();

                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();

                    for key in &dotted_keys {
                        let prop_name = &key[prefix.len()..];
                        if let Some(val) = row.remove(key) {
                            map.insert(prop_name.to_string(), val);
                        }
                    }

                    row.insert(var.clone(), Value::Map(map));
                }

                // Fold helper columns into already-map-shaped variables.
                for var in &bare_vars {
                    let vid_key = format!("{}._vid", var);
                    let labels_key = format!("{}._labels", var);

                    let vid_val = row.remove(&vid_key);
                    let labels_val = row.remove(&labels_key);

                    if let Some(Value::Map(map)) = row.get_mut(var) {
                        if let Some(v) = vid_val {
                            map.insert("_vid".to_string(), v);
                        }
                        if let Some(v) = labels_val {
                            map.insert("_labels".to_string(), v);
                        }
                    }

                    // Edge helpers use or_insert: values the map already
                    // carries win over the helper columns.
                    let eid_key = format!("{}._eid", var);
                    let type_key = format!("{}._type", var);

                    let eid_val = row.remove(&eid_key);
                    let type_val = row.remove(&type_key);

                    if (eid_val.is_some() || type_val.is_some())
                        && let Some(Value::Map(map)) = row.get_mut(var)
                    {
                        if let Some(v) = eid_val {
                            map.entry("_eid".to_string()).or_insert(v);
                        }
                        if let Some(v) = type_val {
                            map.entry("_type".to_string()).or_insert(v);
                        }
                    }

                    // Drop any remaining `var.*` helper columns from the row.
                    let prefix = format!("{}.", var);
                    let helper_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    for key in helper_keys {
                        row.remove(&key);
                    }
                }

                rows.push(row);
            }
        }

        Ok(rows)
    }
660
661 fn normalize_path_if_needed(value: Value) -> Value {
666 match value {
667 Value::Map(map)
668 if map.contains_key("nodes")
669 && (map.contains_key("relationships") || map.contains_key("edges")) =>
670 {
671 Self::normalize_path_map(map)
672 }
673 other => other,
674 }
675 }
676
677 fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
679 if let Some(Value::List(nodes)) = map.remove("nodes") {
681 let normalized_nodes: Vec<Value> = nodes
682 .into_iter()
683 .map(|n| {
684 if let Value::Map(node_map) = n {
685 Self::normalize_path_node_map(node_map)
686 } else {
687 n
688 }
689 })
690 .collect();
691 map.insert("nodes".to_string(), Value::List(normalized_nodes));
692 }
693
694 let rels_key = if map.contains_key("relationships") {
696 "relationships"
697 } else {
698 "edges"
699 };
700 if let Some(Value::List(rels)) = map.remove(rels_key) {
701 let normalized_rels: Vec<Value> = rels
702 .into_iter()
703 .map(|r| {
704 if let Value::Map(rel_map) = r {
705 Self::normalize_path_edge_map(rel_map)
706 } else {
707 r
708 }
709 })
710 .collect();
711 map.insert("relationships".to_string(), Value::List(normalized_rels));
712 }
713
714 Value::Map(map)
715 }
716
717 fn value_to_id_string(val: Value) -> String {
719 match val {
720 Value::Int(n) => n.to_string(),
721 Value::Float(n) => n.to_string(),
722 Value::String(s) => s,
723 other => other.to_string(),
724 }
725 }
726
727 fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
730 if let Some(val) = map.remove(src_key) {
731 map.insert(
732 dst_key.to_string(),
733 Value::String(Self::value_to_id_string(val)),
734 );
735 }
736 }
737
738 fn ensure_properties_map(map: &mut HashMap<String, Value>) {
740 match map.get("properties") {
741 Some(props) if !props.is_null() => {}
742 _ => {
743 map.insert("properties".to_string(), Value::Map(HashMap::new()));
744 }
745 }
746 }
747
748 fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
750 Self::stringify_map_field(&mut map, "_vid", "_id");
751 Self::ensure_properties_map(&mut map);
752 Value::Map(map)
753 }
754
755 fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
757 Self::stringify_map_field(&mut map, "_eid", "_id");
758 Self::stringify_map_field(&mut map, "_src", "_src");
759 Self::stringify_map_field(&mut map, "_dst", "_dst");
760
761 if let Some(type_name) = map.remove("_type_name") {
762 map.insert("_type".to_string(), type_name);
763 }
764
765 Self::ensure_properties_map(&mut map);
766 Value::Map(map)
767 }
768
    /// Top-level query entry point: routes DDL/admin plans to the legacy
    /// subplan executor and everything else through DataFusion, recording
    /// duration, row-count, and error metrics plus tracing span fields.
    #[instrument(
        skip(self, prop_manager, params),
        fields(rows_returned, duration_ms),
        level = "info"
    )]
    pub fn execute<'a>(
        &'a self,
        plan: LogicalPlan,
        prop_manager: &'a PropertyManager,
        params: &'a HashMap<String, Value>,
    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
        Box::pin(async move {
            let query_type = Self::get_plan_type(&plan);
            let ctx = self.get_context().await;
            let start = Instant::now();

            let res = if Self::is_ddl_or_admin(&plan) {
                self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
                    .await
            } else {
                // NOTE(review): the `?` below returns early on a DataFusion
                // failure, skipping the error-metrics branch further down —
                // confirm this is intended.
                let batches = self
                    .execute_datafusion(plan.clone(), prop_manager, params)
                    .await?;
                self.record_batches_to_rows(batches)
            };

            let duration = start.elapsed();
            metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
                .record(duration.as_secs_f64());

            tracing::Span::current().record("duration_ms", duration.as_millis());
            match &res {
                Ok(rows) => {
                    tracing::Span::current().record("rows_returned", rows.len());
                    metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
                        .increment(rows.len() as u64);
                }
                Err(e) => {
                    // Coarse error classification (string sniffing) used only
                    // for metric labels.
                    let error_type = if e.to_string().contains("timed out") {
                        "timeout"
                    } else if e.to_string().contains("syntax") {
                        "syntax"
                    } else {
                        "execution"
                    };
                    metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
                }
            }

            res
        })
    }
823
824 fn get_plan_type(plan: &LogicalPlan) -> &'static str {
825 match plan {
826 LogicalPlan::Scan { .. } => "read_scan",
827 LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
828 LogicalPlan::Traverse { .. } => "read_traverse",
829 LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
830 LogicalPlan::ScanAll { .. } => "read_scan_all",
831 LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
832 LogicalPlan::VectorKnn { .. } => "read_vector",
833 LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
834 LogicalPlan::Merge { .. } => "write_merge",
835 LogicalPlan::Delete { .. } => "write_delete",
836 LogicalPlan::Set { .. } => "write_set",
837 LogicalPlan::Remove { .. } => "write_remove",
838 LogicalPlan::ProcedureCall { .. } => "call",
839 LogicalPlan::Copy { .. } => "copy",
840 LogicalPlan::Backup { .. } => "backup",
841 _ => "other",
842 }
843 }
844
    /// Lists the direct child plans of a node, enabling generic recursive
    /// traversal (used by `is_ddl_or_admin` and `contains_write_operations`).
    /// Leaf nodes return an empty vector.
    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
        match plan {
            // Single-input operators.
            LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Distinct { input }
            | LogicalPlan::Aggregate { input, .. }
            | LogicalPlan::Window { input, .. }
            | LogicalPlan::Unwind { input, .. }
            | LogicalPlan::Filter { input, .. }
            | LogicalPlan::Create { input, .. }
            | LogicalPlan::CreateBatch { input, .. }
            | LogicalPlan::Set { input, .. }
            | LogicalPlan::Remove { input, .. }
            | LogicalPlan::Delete { input, .. }
            | LogicalPlan::Merge { input, .. }
            | LogicalPlan::Foreach { input, .. }
            | LogicalPlan::Traverse { input, .. }
            | LogicalPlan::TraverseMainByType { input, .. }
            | LogicalPlan::BindZeroLengthPath { input, .. }
            | LogicalPlan::BindPath { input, .. }
            | LogicalPlan::ShortestPath { input, .. }
            | LogicalPlan::AllShortestPaths { input, .. }
            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],

            // Two-input operators.
            LogicalPlan::Apply {
                input, subquery, ..
            }
            | LogicalPlan::SubqueryCall { input, subquery } => {
                vec![input.as_ref(), subquery.as_ref()]
            }
            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
                vec![left.as_ref(), right.as_ref()]
            }
            LogicalPlan::RecursiveCTE {
                initial, recursive, ..
            } => vec![initial.as_ref(), recursive.as_ref()],
            LogicalPlan::QuantifiedPattern {
                input,
                pattern_plan,
                ..
            } => vec![input.as_ref(), pattern_plan.as_ref()],

            // Everything else is a leaf.
            _ => vec![],
        }
    }
903
904 pub(crate) fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
911 match plan {
912 LogicalPlan::CreateLabel(_)
914 | LogicalPlan::CreateEdgeType(_)
915 | LogicalPlan::AlterLabel(_)
916 | LogicalPlan::AlterEdgeType(_)
917 | LogicalPlan::DropLabel(_)
918 | LogicalPlan::DropEdgeType(_)
919 | LogicalPlan::CreateConstraint(_)
920 | LogicalPlan::DropConstraint(_)
921 | LogicalPlan::ShowConstraints(_) => true,
922
923 LogicalPlan::CreateVectorIndex { .. }
925 | LogicalPlan::CreateFullTextIndex { .. }
926 | LogicalPlan::CreateScalarIndex { .. }
927 | LogicalPlan::CreateJsonFtsIndex { .. }
928 | LogicalPlan::DropIndex { .. }
929 | LogicalPlan::ShowIndexes { .. } => true,
930
931 LogicalPlan::ShowDatabase
933 | LogicalPlan::ShowConfig
934 | LogicalPlan::ShowStatistics
935 | LogicalPlan::Vacuum
936 | LogicalPlan::Checkpoint
937 | LogicalPlan::Copy { .. }
938 | LogicalPlan::CopyTo { .. }
939 | LogicalPlan::CopyFrom { .. }
940 | LogicalPlan::Backup { .. }
941 | LogicalPlan::Explain { .. } => true,
942
943 LogicalPlan::ProcedureCall { procedure_name, .. } => {
946 !Self::is_df_eligible_procedure(procedure_name)
947 }
948
949 _ => Self::plan_children(plan)
951 .iter()
952 .any(|child| Self::is_ddl_or_admin(child)),
953 }
954 }
955
956 fn is_df_eligible_procedure(name: &str) -> bool {
962 matches!(
963 name,
964 "uni.schema.labels"
965 | "uni.schema.edgeTypes"
966 | "uni.schema.relationshipTypes"
967 | "uni.schema.indexes"
968 | "uni.schema.constraints"
969 | "uni.schema.labelInfo"
970 | "uni.vector.query"
971 | "uni.fts.query"
972 | "uni.search"
973 ) || name.starts_with("uni.algo.")
974 }
975
976 fn contains_write_operations(plan: &LogicalPlan) -> bool {
983 match plan {
984 LogicalPlan::Create { .. }
985 | LogicalPlan::CreateBatch { .. }
986 | LogicalPlan::Merge { .. }
987 | LogicalPlan::Delete { .. }
988 | LogicalPlan::Set { .. }
989 | LogicalPlan::Remove { .. }
990 | LogicalPlan::Foreach { .. } => true,
991 _ => Self::plan_children(plan)
992 .iter()
993 .any(|child| Self::contains_write_operations(child)),
994 }
995 }
996
    /// Streaming variant of [`Self::execute`]: resolves the query context as
    /// the first stream stage, then yields the full result set as a single
    /// stream item. Routing mirrors `execute` (DDL/admin → legacy subplan
    /// executor; everything else → DataFusion).
    pub fn execute_stream(
        self,
        plan: LogicalPlan,
        prop_manager: Arc<PropertyManager>,
        params: HashMap<String, Value>,
    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
        let this = self;
        let this_for_ctx = this.clone();

        // Stage one: fetch the context asynchronously inside the stream.
        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });

        ctx_stream
            .flat_map(move |ctx| {
                // The closure may be invoked again, so captured state is
                // cloned per item rather than moved.
                let plan = plan.clone();
                let this = this.clone();
                let prop_manager = prop_manager.clone();
                let params = params.clone();

                let fut = async move {
                    if Self::is_ddl_or_admin(&plan) {
                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
                            .await
                    } else {
                        let batches = this
                            .execute_datafusion(plan, &prop_manager, &params)
                            .await?;
                        this.record_batches_to_rows(batches)
                    }
                };
                stream::once(fut).boxed()
            })
            .boxed()
    }
1034
    /// Converts one cell of an Arrow column to a [`Value`], with no schema
    /// type hint (temporal struct columns need the hinted variant instead).
    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
        arrow_convert::arrow_to_value(col, row, None)
    }
1040
1041 pub(crate) fn evaluate_expr<'a>(
1042 &'a self,
1043 expr: &'a Expr,
1044 row: &'a HashMap<String, Value>,
1045 prop_manager: &'a PropertyManager,
1046 params: &'a HashMap<String, Value>,
1047 ctx: Option<&'a QueryContext>,
1048 ) -> BoxFuture<'a, Result<Value>> {
1049 let this = self;
1050 Box::pin(async move {
1051 let repr = expr.to_string_repr();
1053 if let Some(val) = row.get(&repr) {
1054 return Ok(val.clone());
1055 }
1056
1057 match expr {
1058 Expr::PatternComprehension { .. } => {
1059 Err(anyhow::anyhow!(
1061 "Pattern comprehensions are handled by DataFusion executor"
1062 ))
1063 }
1064 Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1065 "COLLECT subqueries not yet supported in executor"
1066 )),
1067 Expr::Variable(name) => {
1068 if let Some(val) = row.get(name) {
1069 Ok(val.clone())
1070 } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1071 Ok(vid_val.clone())
1075 } else {
1076 Ok(params.get(name).cloned().unwrap_or(Value::Null))
1077 }
1078 }
1079 Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1080 Expr::Property(var_expr, prop_name) => {
1081 if let Expr::Variable(var_name) = var_expr.as_ref() {
1085 let flat_key = format!("{}.{}", var_name, prop_name);
1086 if let Some(val) = row.get(flat_key.as_str()) {
1087 return Ok(val.clone());
1088 }
1089 }
1090
1091 let base_val = this
1092 .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1093 .await?;
1094
1095 if (prop_name == "_vid" || prop_name == "_id")
1097 && let Ok(vid) = Self::vid_from_value(&base_val)
1098 {
1099 return Ok(Value::Int(vid.as_u64() as i64));
1100 }
1101
1102 if let Value::Node(node) = &base_val {
1104 if prop_name == "_vid" || prop_name == "_id" {
1106 return Ok(Value::Int(node.vid.as_u64() as i64));
1107 }
1108 if prop_name == "_labels" {
1109 return Ok(Value::List(
1110 node.labels
1111 .iter()
1112 .map(|l| Value::String(l.clone()))
1113 .collect(),
1114 ));
1115 }
1116 if let Some(val) = node.properties.get(prop_name.as_str()) {
1118 return Ok(val.clone());
1119 }
1120 if let Ok(val) = prop_manager
1122 .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1123 .await
1124 {
1125 return Ok(val);
1126 }
1127 return Ok(Value::Null);
1128 }
1129
1130 if let Value::Edge(edge) = &base_val {
1132 if prop_name == "_eid" || prop_name == "_id" {
1134 return Ok(Value::Int(edge.eid.as_u64() as i64));
1135 }
1136 if prop_name == "_type" {
1137 return Ok(Value::String(edge.edge_type.clone()));
1138 }
1139 if prop_name == "_src" {
1140 return Ok(Value::Int(edge.src.as_u64() as i64));
1141 }
1142 if prop_name == "_dst" {
1143 return Ok(Value::Int(edge.dst.as_u64() as i64));
1144 }
1145 if let Some(val) = edge.properties.get(prop_name.as_str()) {
1147 return Ok(val.clone());
1148 }
1149 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1151 {
1152 return Ok(val);
1153 }
1154 return Ok(Value::Null);
1155 }
1156
1157 if let Value::Map(map) = &base_val {
1160 if let Some(val) = map.get(prop_name.as_str()) {
1162 return Ok(val.clone());
1163 }
1164 if let Some(Value::Map(props)) = map.get("properties")
1166 && let Some(val) = props.get(prop_name.as_str())
1167 {
1168 return Ok(val.clone());
1169 }
1170 let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1172 map.get("_id")
1173 .and_then(|v| v.as_str())
1174 .and_then(|s| s.parse::<u64>().ok())
1175 });
1176 if let Some(id) = vid_opt {
1177 let vid = Vid::from(id);
1178 if let Ok(val) = prop_manager
1179 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1180 .await
1181 {
1182 return Ok(val);
1183 }
1184 } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1185 let eid = uni_common::core::id::Eid::from(id);
1186 if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1187 return Ok(val);
1188 }
1189 }
1190 return Ok(Value::Null);
1191 }
1192
1193 if let Ok(vid) = Self::vid_from_value(&base_val) {
1195 return prop_manager
1196 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1197 .await;
1198 }
1199
1200 if base_val.is_null() {
1201 return Ok(Value::Null);
1202 }
1203
1204 {
1206 use crate::query::datetime::{
1207 eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1208 is_duration_string, is_temporal_accessor, is_temporal_string,
1209 };
1210
1211 if let Value::Temporal(tv) = &base_val {
1213 if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1214 if is_duration_accessor(prop_name) {
1215 return eval_duration_accessor(
1217 &base_val.to_string(),
1218 prop_name,
1219 );
1220 }
1221 } else if is_temporal_accessor(prop_name) {
1222 return eval_temporal_accessor(&base_val.to_string(), prop_name);
1223 }
1224 }
1225
1226 if let Value::String(s) = &base_val {
1228 if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1229 return eval_temporal_accessor(s, prop_name);
1230 }
1231 if is_duration_string(s) && is_duration_accessor(prop_name) {
1232 return eval_duration_accessor(s, prop_name);
1233 }
1234 }
1235 }
1236
1237 Err(anyhow!(
1238 "Cannot access property '{}' on {:?}",
1239 prop_name,
1240 base_val
1241 ))
1242 }
                Expr::ArrayIndex {
                    array: arr_expr,
                    index: idx_expr,
                } => {
                    // container[index] — supports lists (integer index, negatives
                    // count from the end), maps (string key), and node/edge
                    // property lookup by property name.
                    let arr_val = this
                        .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
                        .await?;
                    let idx_val = this
                        .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
                        .await?;

                    if let Value::List(arr) = &arr_val {
                        if let Some(i) = idx_val.as_i64() {
                            let idx = if i < 0 {
                                // Negative index counts back from the end; an index
                                // before the start yields NULL rather than an error.
                                let positive_idx = arr.len() as i64 + i;
                                if positive_idx < 0 {
                                    return Ok(Value::Null); }
                                positive_idx as usize
                            } else {
                                i as usize
                            };
                            if idx < arr.len() {
                                return Ok(arr[idx].clone());
                            }
                            // Out-of-bounds reads are NULL, not errors.
                            return Ok(Value::Null);
                        } else if idx_val.is_null() {
                            // NULL index propagates to a NULL result.
                            return Ok(Value::Null);
                        } else {
                            return Err(anyhow::anyhow!(
                                "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
                                idx_val
                            ));
                        }
                    }
                    if let Value::Map(map) = &arr_val {
                        // Map access: missing key is NULL; non-string, non-null key
                        // is a type error; NULL key falls through to the NULL check.
                        if let Some(key) = idx_val.as_str() {
                            return Ok(map.get(key).cloned().unwrap_or(Value::Null));
                        } else if !idx_val.is_null() {
                            return Err(anyhow::anyhow!(
                                "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
                                idx_val
                            ));
                        }
                    }
                    if let Value::Node(node) = &arr_val {
                        if let Some(key) = idx_val.as_str() {
                            // Prefer properties already materialized on the node,
                            // then fall back to a property-manager lookup; a
                            // missing property is NULL.
                            if let Some(val) = node.properties.get(key) {
                                return Ok(val.clone());
                            }
                            if let Ok(val) = prop_manager
                                .get_vertex_prop_with_ctx(node.vid, key, ctx)
                                .await
                            {
                                return Ok(val);
                            }
                            return Ok(Value::Null);
                        } else if !idx_val.is_null() {
                            return Err(anyhow::anyhow!(
                                "TypeError: Node index must be a string, got: {:?}",
                                idx_val
                            ));
                        }
                    }
                    if let Value::Edge(edge) = &arr_val {
                        if let Some(key) = idx_val.as_str() {
                            // Same two-step lookup as for nodes: in-memory
                            // properties first, then storage.
                            if let Some(val) = edge.properties.get(key) {
                                return Ok(val.clone());
                            }
                            if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
                                return Ok(val);
                            }
                            return Ok(Value::Null);
                        } else if !idx_val.is_null() {
                            return Err(anyhow::anyhow!(
                                "TypeError: Edge index must be a string, got: {:?}",
                                idx_val
                            ));
                        }
                    }
                    // Any other value that can be resolved to a vertex id still
                    // supports property access by string key.
                    if let Ok(vid) = Self::vid_from_value(&arr_val)
                        && let Some(key) = idx_val.as_str()
                    {
                        if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
                        {
                            return Ok(val);
                        }
                        return Ok(Value::Null);
                    }
                    if arr_val.is_null() {
                        return Ok(Value::Null);
                    }
                    Err(anyhow!(
                        "TypeError: InvalidArgumentType - cannot index into {:?}",
                        arr_val
                    ))
                }
                Expr::ArraySlice { array, start, end } => {
                    // list[start..end] slicing: negative bounds count from the
                    // end, a NULL bound makes the whole expression NULL, and
                    // out-of-range bounds are clamped instead of erroring.
                    let arr_val = this
                        .evaluate_expr(array, row, prop_manager, params, ctx)
                        .await?;

                    if let Value::List(arr) = &arr_val {
                        let len = arr.len();

                        // Resolve the start bound; defaults to 0 when omitted.
                        let start_idx = if let Some(s) = start {
                            let v = this
                                .evaluate_expr(s, row, prop_manager, params, ctx)
                                .await?;
                            if v.is_null() {
                                // NULL bound propagates to a NULL result.
                                return Ok(Value::Null);
                            }
                            let raw = v.as_i64().unwrap_or(0);
                            if raw < 0 {
                                // Negative bound counts back from the end, clamped at 0.
                                (len as i64 + raw).max(0) as usize
                            } else {
                                (raw as usize).min(len)
                            }
                        } else {
                            0
                        };

                        // Resolve the end bound; defaults to len (slice to the end).
                        let end_idx = if let Some(e) = end {
                            let v = this
                                .evaluate_expr(e, row, prop_manager, params, ctx)
                                .await?;
                            if v.is_null() {
                                return Ok(Value::Null);
                            }
                            let raw = v.as_i64().unwrap_or(len as i64);
                            if raw < 0 {
                                (len as i64 + raw).max(0) as usize
                            } else {
                                (raw as usize).min(len)
                            }
                        } else {
                            len
                        };

                        // Empty or inverted range yields an empty list, not an error.
                        if start_idx >= end_idx {
                            return Ok(Value::List(vec![]));
                        }
                        let end_idx = end_idx.min(len);
                        return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
                    }

                    // Slicing NULL yields NULL; any other type is an error.
                    if arr_val.is_null() {
                        return Ok(Value::Null);
                    }
                    Err(anyhow!("Cannot slice {:?}", arr_val))
                }
1406 Expr::Literal(lit) => Ok(lit.to_value()),
1407 Expr::List(items) => {
1408 let mut vals = Vec::new();
1409 for item in items {
1410 vals.push(
1411 this.evaluate_expr(item, row, prop_manager, params, ctx)
1412 .await?,
1413 );
1414 }
1415 Ok(Value::List(vals))
1416 }
1417 Expr::Map(items) => {
1418 let mut map = HashMap::new();
1419 for (key, value_expr) in items {
1420 let val = this
1421 .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1422 .await?;
1423 map.insert(key.clone(), val);
1424 }
1425 Ok(Value::Map(map))
1426 }
1427 Expr::Exists { query, .. } => {
1428 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1430 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1431
1432 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1433 Ok(plan) => {
1434 let mut sub_params = params.clone();
1435 sub_params.extend(row.clone());
1436
1437 match this.execute(plan, prop_manager, &sub_params).await {
1438 Ok(results) => Ok(Value::Bool(!results.is_empty())),
1439 Err(e) => {
1440 log::debug!("EXISTS subquery execution failed: {}", e);
1441 Ok(Value::Bool(false))
1442 }
1443 }
1444 }
1445 Err(e) => {
1446 log::debug!("EXISTS subquery planning failed: {}", e);
1447 Ok(Value::Bool(false))
1448 }
1449 }
1450 }
1451 Expr::CountSubquery(query) => {
1452 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1454
1455 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1456
1457 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1458 Ok(plan) => {
1459 let mut sub_params = params.clone();
1460 sub_params.extend(row.clone());
1461
1462 match this.execute(plan, prop_manager, &sub_params).await {
1463 Ok(results) => Ok(Value::from(results.len() as i64)),
1464 Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1465 }
1466 }
1467 Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1468 }
1469 }
                Expr::Quantifier {
                    quantifier,
                    variable,
                    list,
                    predicate,
                } => {
                    // ALL/ANY/SINGLE/NONE over a list: bind `variable` to each
                    // element, evaluate the predicate, and count strict matches.
                    // Only Bool(true) counts as satisfied — false and NULL do not.
                    let list_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;

                    if list_val.is_null() {
                        // A quantifier over NULL is NULL.
                        return Ok(Value::Null);
                    }

                    let items = match list_val {
                        Value::List(arr) => arr,
                        _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
                    };

                    let mut satisfied_count = 0;
                    for item in &items {
                        // Evaluate the predicate in a scope extended with the
                        // bound element; the outer row is not mutated.
                        let mut item_row = row.clone();
                        item_row.insert(variable.clone(), item.clone());

                        let pred_result = this
                            .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
                            .await?;

                        if let Value::Bool(true) = pred_result {
                            satisfied_count += 1;
                        }
                    }

                    // Map the satisfied count onto the quantifier semantics.
                    // Note: ALL over an empty list is vacuously true.
                    let result = match quantifier {
                        Quantifier::All => satisfied_count == items.len(),
                        Quantifier::Any => satisfied_count > 0,
                        Quantifier::Single => satisfied_count == 1,
                        Quantifier::None => satisfied_count == 0,
                    };

                    Ok(Value::Bool(result))
                }
1532 Expr::ListComprehension {
1533 variable,
1534 list,
1535 where_clause,
1536 map_expr,
1537 } => {
1538 let list_val = this
1545 .evaluate_expr(list, row, prop_manager, params, ctx)
1546 .await?;
1547
1548 if list_val.is_null() {
1550 return Ok(Value::Null);
1551 }
1552
1553 let items = match list_val {
1555 Value::List(arr) => arr,
1556 _ => {
1557 return Err(anyhow!(
1558 "List comprehension expects a list, got: {:?}",
1559 list_val
1560 ));
1561 }
1562 };
1563
1564 let mut results = Vec::new();
1566 for item in &items {
1567 let mut item_row = row.clone();
1569 item_row.insert(variable.clone(), item.clone());
1570
1571 if let Some(predicate) = where_clause {
1573 let pred_result = this
1574 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1575 .await?;
1576
1577 if !matches!(pred_result, Value::Bool(true)) {
1579 continue;
1580 }
1581 }
1582
1583 let mapped_val = this
1585 .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1586 .await?;
1587 results.push(mapped_val);
1588 }
1589
1590 Ok(Value::List(results))
1591 }
1592 Expr::BinaryOp { left, op, right } => {
1593 match op {
1595 BinaryOp::And => {
1596 let l_val = this
1597 .evaluate_expr(left, row, prop_manager, params, ctx)
1598 .await?;
1599 if let Some(false) = l_val.as_bool() {
1601 return Ok(Value::Bool(false));
1602 }
1603 let r_val = this
1604 .evaluate_expr(right, row, prop_manager, params, ctx)
1605 .await?;
1606 eval_binary_op(&l_val, op, &r_val)
1607 }
1608 BinaryOp::Or => {
1609 let l_val = this
1610 .evaluate_expr(left, row, prop_manager, params, ctx)
1611 .await?;
1612 if let Some(true) = l_val.as_bool() {
1614 return Ok(Value::Bool(true));
1615 }
1616 let r_val = this
1617 .evaluate_expr(right, row, prop_manager, params, ctx)
1618 .await?;
1619 eval_binary_op(&l_val, op, &r_val)
1620 }
1621 _ => {
1622 let l_val = this
1624 .evaluate_expr(left, row, prop_manager, params, ctx)
1625 .await?;
1626 let r_val = this
1627 .evaluate_expr(right, row, prop_manager, params, ctx)
1628 .await?;
1629 eval_binary_op(&l_val, op, &r_val)
1630 }
1631 }
1632 }
1633 Expr::In { expr, list } => {
1634 let l_val = this
1635 .evaluate_expr(expr, row, prop_manager, params, ctx)
1636 .await?;
1637 let r_val = this
1638 .evaluate_expr(list, row, prop_manager, params, ctx)
1639 .await?;
1640 eval_in_op(&l_val, &r_val)
1641 }
                Expr::UnaryOp { op, expr } => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    match op {
                        UnaryOp::Not => {
                            // Three-valued logic: NOT NULL is NULL; NOT on a
                            // non-boolean, non-null value is a type error.
                            match val.as_bool() {
                                Some(b) => Ok(Value::Bool(!b)),
                                None if val.is_null() => Ok(Value::Null),
                                None => Err(anyhow!(
                                    "InvalidArgumentType: NOT requires a boolean argument"
                                )),
                            }
                        }
                        UnaryOp::Neg => {
                            // Integer negation tried first so integers keep their
                            // type; floats fall through to the f64 branch.
                            // NOTE(review): `-i` overflows for i64::MIN — confirm
                            // whether checked negation is wanted here.
                            if let Some(i) = val.as_i64() {
                                Ok(Value::Int(-i))
                            } else if let Some(f) = val.as_f64() {
                                Ok(Value::Float(-f))
                            } else {
                                Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
                            }
                        }
                    }
                }
1668 Expr::IsNull(expr) => {
1669 let val = this
1670 .evaluate_expr(expr, row, prop_manager, params, ctx)
1671 .await?;
1672 Ok(Value::Bool(val.is_null()))
1673 }
1674 Expr::IsNotNull(expr) => {
1675 let val = this
1676 .evaluate_expr(expr, row, prop_manager, params, ctx)
1677 .await?;
1678 Ok(Value::Bool(!val.is_null()))
1679 }
1680 Expr::IsUnique(_) => {
1681 Err(anyhow!(
1683 "IS UNIQUE can only be used in constraint definitions"
1684 ))
1685 }
1686 Expr::Case {
1687 expr,
1688 when_then,
1689 else_expr,
1690 } => {
1691 if let Some(base_expr) = expr {
1692 let base_val = this
1693 .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1694 .await?;
1695 for (w, t) in when_then {
1696 let w_val = this
1697 .evaluate_expr(w, row, prop_manager, params, ctx)
1698 .await?;
1699 if base_val == w_val {
1700 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1701 }
1702 }
1703 } else {
1704 for (w, t) in when_then {
1705 let w_val = this
1706 .evaluate_expr(w, row, prop_manager, params, ctx)
1707 .await?;
1708 if w_val.as_bool() == Some(true) {
1709 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1710 }
1711 }
1712 }
1713 if let Some(e) = else_expr {
1714 return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1715 }
1716 Ok(Value::Null)
1717 }
1718 Expr::Wildcard => Ok(Value::Null),
1719 Expr::FunctionCall { name, args, .. } => {
1720 if name.eq_ignore_ascii_case("ID") {
1722 if args.len() != 1 {
1723 return Err(anyhow!("id() requires exactly 1 argument"));
1724 }
1725 let val = this
1726 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1727 .await?;
1728 if let Value::Map(map) = &val {
1729 if let Some(vid_val) = map.get("_vid") {
1731 return Ok(vid_val.clone());
1732 }
1733 if let Some(eid_val) = map.get("_eid") {
1735 return Ok(eid_val.clone());
1736 }
1737 if let Some(id_val) = map.get("_id") {
1739 return Ok(id_val.clone());
1740 }
1741 }
1742 return Ok(Value::Null);
1743 }
1744
1745 if name.eq_ignore_ascii_case("ELEMENTID") {
1747 if args.len() != 1 {
1748 return Err(anyhow!("elementId() requires exactly 1 argument"));
1749 }
1750 let val = this
1751 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1752 .await?;
1753 if let Value::Map(map) = &val {
1754 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1757 return Ok(Value::String(vid_val.to_string()));
1758 }
1759 if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1762 return Ok(Value::String(eid_val.to_string()));
1763 }
1764 }
1765 return Ok(Value::Null);
1766 }
1767
1768 if name.eq_ignore_ascii_case("TYPE") {
1770 if args.len() != 1 {
1771 return Err(anyhow!("type() requires exactly 1 argument"));
1772 }
1773 let val = this
1774 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1775 .await?;
1776 if let Value::Map(map) = &val
1777 && let Some(type_val) = map.get("_type")
1778 {
1779 if let Some(type_id) =
1781 type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1782 {
1783 if let Some(name) = this
1784 .storage
1785 .schema_manager()
1786 .edge_type_name_by_id_unified(type_id)
1787 {
1788 return Ok(Value::String(name));
1789 }
1790 } else if let Some(name) = type_val.as_str() {
1791 return Ok(Value::String(name.to_string()));
1792 }
1793 }
1794 return Ok(Value::Null);
1795 }
1796
1797 if name.eq_ignore_ascii_case("LABELS") {
1799 if args.len() != 1 {
1800 return Err(anyhow!("labels() requires exactly 1 argument"));
1801 }
1802 let val = this
1803 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1804 .await?;
1805 if let Value::Map(map) = &val
1806 && let Some(labels_val) = map.get("_labels")
1807 {
1808 return Ok(labels_val.clone());
1809 }
1810 return Ok(Value::Null);
1811 }
1812
1813 if name.eq_ignore_ascii_case("PROPERTIES") {
1815 if args.len() != 1 {
1816 return Err(anyhow!("properties() requires exactly 1 argument"));
1817 }
1818 let val = this
1819 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1820 .await?;
1821 if let Value::Map(map) = &val {
1822 let mut props = HashMap::new();
1824 for (k, v) in map.iter() {
1825 if !k.starts_with('_') {
1826 props.insert(k.clone(), v.clone());
1827 }
1828 }
1829 return Ok(Value::Map(props));
1830 }
1831 return Ok(Value::Null);
1832 }
1833
1834 if name.eq_ignore_ascii_case("STARTNODE") {
1836 if args.len() != 1 {
1837 return Err(anyhow!("startNode() requires exactly 1 argument"));
1838 }
1839 let val = this
1840 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1841 .await?;
1842 if let Value::Edge(edge) = &val {
1843 return Ok(Self::find_node_by_vid(row, edge.src));
1844 }
1845 if let Value::Map(map) = &val {
1846 if let Some(start_node) = map.get("_startNode") {
1847 return Ok(start_node.clone());
1848 }
1849 if let Some(src_vid) = map.get("_src_vid") {
1850 return Ok(Value::Map(HashMap::from([(
1851 "_vid".to_string(),
1852 src_vid.clone(),
1853 )])));
1854 }
1855 if let Some(src_id) = map.get("_src")
1857 && let Some(u) = src_id.as_u64()
1858 {
1859 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1860 }
1861 }
1862 return Ok(Value::Null);
1863 }
1864
1865 if name.eq_ignore_ascii_case("ENDNODE") {
1867 if args.len() != 1 {
1868 return Err(anyhow!("endNode() requires exactly 1 argument"));
1869 }
1870 let val = this
1871 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1872 .await?;
1873 if let Value::Edge(edge) = &val {
1874 return Ok(Self::find_node_by_vid(row, edge.dst));
1875 }
1876 if let Value::Map(map) = &val {
1877 if let Some(end_node) = map.get("_endNode") {
1878 return Ok(end_node.clone());
1879 }
1880 if let Some(dst_vid) = map.get("_dst_vid") {
1881 return Ok(Value::Map(HashMap::from([(
1882 "_vid".to_string(),
1883 dst_vid.clone(),
1884 )])));
1885 }
1886 if let Some(dst_id) = map.get("_dst")
1888 && let Some(u) = dst_id.as_u64()
1889 {
1890 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1891 }
1892 }
1893 return Ok(Value::Null);
1894 }
1895
1896 if name.eq_ignore_ascii_case("HASLABEL") {
1899 if args.len() != 2 {
1900 return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
1901 }
1902 let node_val = this
1903 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1904 .await?;
1905 let label_val = this
1906 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1907 .await?;
1908
1909 let label_to_check = label_val.as_str().ok_or_else(|| {
1910 anyhow!("Second argument to hasLabel must be a string")
1911 })?;
1912
1913 let has_label = match &node_val {
1914 Value::Map(map) if map.contains_key("_vid") => {
1916 if let Some(Value::List(labels_arr)) = map.get("_labels") {
1917 labels_arr
1918 .iter()
1919 .any(|l| l.as_str() == Some(label_to_check))
1920 } else {
1921 false
1922 }
1923 }
1924 Value::Map(map) => {
1926 if let Some(Value::List(labels_arr)) = map.get("_labels") {
1927 labels_arr
1928 .iter()
1929 .any(|l| l.as_str() == Some(label_to_check))
1930 } else {
1931 false
1932 }
1933 }
1934 _ => false,
1935 };
1936 return Ok(Value::Bool(has_label));
1937 }
1938
1939 if matches!(
1942 name.to_uppercase().as_str(),
1943 "ANY" | "ALL" | "NONE" | "SINGLE"
1944 ) {
1945 return Err(anyhow!(
1946 "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
1947 name.to_lowercase()
1948 ));
1949 }
1950
1951 if name.eq_ignore_ascii_case("COALESCE") {
1953 for arg in args {
1954 let val = this
1955 .evaluate_expr(arg, row, prop_manager, params, ctx)
1956 .await?;
1957 if !val.is_null() {
1958 return Ok(val);
1959 }
1960 }
1961 return Ok(Value::Null);
1962 }
1963
1964 if name.eq_ignore_ascii_case("vector_similarity") {
1966 if args.len() != 2 {
1967 return Err(anyhow!("vector_similarity takes 2 arguments"));
1968 }
1969 let v1 = this
1970 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1971 .await?;
1972 let v2 = this
1973 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1974 .await?;
1975 return eval_vector_similarity(&v1, &v2);
1976 }
1977
1978 if name.eq_ignore_ascii_case("uni.temporal.validAt")
1980 || name.eq_ignore_ascii_case("uni.validAt")
1981 || name.eq_ignore_ascii_case("validAt")
1982 {
1983 if args.len() != 4 {
1984 return Err(anyhow!("validAt requires 4 arguments"));
1985 }
1986 let node_val = this
1987 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1988 .await?;
1989 let start_prop = this
1990 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1991 .await?
1992 .as_str()
1993 .ok_or(anyhow!("start_prop must be string"))?
1994 .to_string();
1995 let end_prop = this
1996 .evaluate_expr(&args[2], row, prop_manager, params, ctx)
1997 .await?
1998 .as_str()
1999 .ok_or(anyhow!("end_prop must be string"))?
2000 .to_string();
2001 let time_val = this
2002 .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2003 .await?;
2004
2005 let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2006 anyhow!("time argument must be a datetime value or string")
2007 })?;
2008
2009 let valid_from_val: Option<Value> = if let Ok(vid) =
2011 Self::vid_from_value(&node_val)
2012 {
2013 prop_manager
2015 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2016 .await
2017 .ok()
2018 } else if let Value::Map(map) = &node_val {
2019 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2021 let vid = Vid::from(vid_val);
2022 prop_manager
2023 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2024 .await
2025 .ok()
2026 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2027 let eid = uni_common::core::id::Eid::from(eid_val);
2029 prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2030 } else {
2031 map.get(&start_prop).cloned()
2033 }
2034 } else {
2035 return Ok(Value::Bool(false));
2036 };
2037
2038 let valid_from = match valid_from_val {
2039 Some(ref v) => match value_to_datetime_utc(v) {
2040 Some(dt) => dt,
2041 None if v.is_null() => return Ok(Value::Bool(false)),
2042 None => {
2043 return Err(anyhow!(
2044 "Property {} must be a datetime value or string",
2045 start_prop
2046 ));
2047 }
2048 },
2049 None => return Ok(Value::Bool(false)),
2050 };
2051
2052 let valid_to_val: Option<Value> = if let Ok(vid) =
2053 Self::vid_from_value(&node_val)
2054 {
2055 prop_manager
2057 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2058 .await
2059 .ok()
2060 } else if let Value::Map(map) = &node_val {
2061 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2063 let vid = Vid::from(vid_val);
2064 prop_manager
2065 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2066 .await
2067 .ok()
2068 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2069 let eid = uni_common::core::id::Eid::from(eid_val);
2071 prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2072 } else {
2073 map.get(&end_prop).cloned()
2075 }
2076 } else {
2077 return Ok(Value::Bool(false));
2078 };
2079
2080 let valid_to = match valid_to_val {
2081 Some(ref v) => match value_to_datetime_utc(v) {
2082 Some(dt) => Some(dt),
2083 None if v.is_null() => None,
2084 None => {
2085 return Err(anyhow!(
2086 "Property {} must be a datetime value or null",
2087 end_prop
2088 ));
2089 }
2090 },
2091 None => None,
2092 };
2093
2094 let is_valid = valid_from <= query_time
2095 && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2096 return Ok(Value::Bool(is_valid));
2097 }
2098
2099 let mut evaluated_args = Vec::with_capacity(args.len());
2101 for arg in args {
2102 let mut val = this
2103 .evaluate_expr(arg, row, prop_manager, params, ctx)
2104 .await?;
2105
2106 if let Value::Map(ref mut map) = val {
2109 hydrate_entity_if_needed(map, prop_manager, ctx).await;
2110 }
2111
2112 evaluated_args.push(val);
2113 }
2114 eval_scalar_function(
2115 name,
2116 &evaluated_args,
2117 self.custom_function_registry.as_deref(),
2118 )
2119 }
                Expr::Reduce {
                    accumulator,
                    init,
                    variable,
                    list,
                    expr,
                } => {
                    // REDUCE(acc = init, x IN list | expr): a left fold. The
                    // accumulator starts at `init` and is rebound, together with
                    // the element variable, in a fresh scope on every iteration.
                    let mut acc = self
                        .evaluate_expr(init, row, prop_manager, params, ctx)
                        .await?;
                    let list_val = self
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;

                    if let Value::List(items) = list_val {
                        for item in items {
                            // Clone the row so the reduce-local bindings do not
                            // leak into (or clobber) the surrounding scope.
                            let mut scope = row.clone();
                            scope.insert(accumulator.clone(), acc.clone());
                            scope.insert(variable.clone(), item);

                            acc = self
                                .evaluate_expr(expr, &scope, prop_manager, params, ctx)
                                .await?;
                        }
                    } else {
                        return Err(anyhow!("REDUCE list argument must evaluate to a list"));
                    }
                    Ok(acc)
                }
2152 Expr::ValidAt { .. } => {
2153 Err(anyhow!(
2155 "VALID_AT expression should have been transformed to function call in planner"
2156 ))
2157 }
2158
                Expr::LabelCheck { expr, labels } => {
                    // `x:Label1:Label2` — checks node labels, or the relationship
                    // type when the value looks like an edge.
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    match &val {
                        Value::Null => Ok(Value::Null),
                        Value::Map(map) => {
                            // Heuristic: treat the map as an edge when it carries
                            // edge-only metadata (_eid, _type_name, or a _type
                            // without a _vid).
                            let is_edge = map.contains_key("_eid")
                                || map.contains_key("_type_name")
                                || (map.contains_key("_type") && !map.contains_key("_vid"));

                            if is_edge {
                                // An edge has exactly one type, so requesting
                                // multiple labels can never match.
                                if labels.len() > 1 {
                                    return Ok(Value::Bool(false));
                                }
                                let label_to_check = &labels[0];
                                let has_type = if let Some(Value::String(t)) = map.get("_type_name")
                                {
                                    t == label_to_check
                                } else if let Some(Value::String(t)) = map.get("_type") {
                                    t == label_to_check
                                } else {
                                    false
                                };
                                Ok(Value::Bool(has_type))
                            } else {
                                // Node: every requested label must appear in _labels.
                                let has_all = labels.iter().all(|label_to_check| {
                                    if let Some(Value::List(labels_arr)) = map.get("_labels") {
                                        labels_arr
                                            .iter()
                                            .any(|l| l.as_str() == Some(label_to_check.as_str()))
                                    } else {
                                        false
                                    }
                                });
                                Ok(Value::Bool(has_all))
                            }
                        }
                        // Non-map, non-null values never carry labels.
                        _ => Ok(Value::Bool(false)),
                    }
                }
2203
2204 Expr::MapProjection { base, items } => {
2205 let base_value = this
2206 .evaluate_expr(base, row, prop_manager, params, ctx)
2207 .await?;
2208
2209 let properties = match &base_value {
2211 Value::Map(map) => map,
2212 _ => {
2213 return Err(anyhow!(
2214 "Map projection requires object, got {:?}",
2215 base_value
2216 ));
2217 }
2218 };
2219
2220 let mut result_map = HashMap::new();
2221
2222 for item in items {
2223 match item {
2224 MapProjectionItem::Property(prop) => {
2225 if let Some(value) = properties.get(prop.as_str()) {
2226 result_map.insert(prop.clone(), value.clone());
2227 }
2228 }
2229 MapProjectionItem::AllProperties => {
2230 for (key, value) in properties.iter() {
2232 if !key.starts_with('_') {
2233 result_map.insert(key.clone(), value.clone());
2234 }
2235 }
2236 }
2237 MapProjectionItem::LiteralEntry(key, expr) => {
2238 let value = this
2239 .evaluate_expr(expr, row, prop_manager, params, ctx)
2240 .await?;
2241 result_map.insert(key.clone(), value);
2242 }
2243 MapProjectionItem::Variable(var_name) => {
2244 if let Some(value) = row.get(var_name.as_str()) {
2247 result_map.insert(var_name.clone(), value.clone());
2248 }
2249 }
2250 }
2251 }
2252
2253 Ok(Value::Map(result_map))
2254 }
2255 }
2256 })
2257 }
2258
2259 pub(crate) fn execute_subplan<'a>(
2260 &'a self,
2261 plan: LogicalPlan,
2262 prop_manager: &'a PropertyManager,
2263 params: &'a HashMap<String, Value>,
2264 ctx: Option<&'a QueryContext>,
2265 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2266 Box::pin(async move {
2267 if let Some(ctx) = ctx {
2268 ctx.check_timeout()?;
2269 }
2270 match plan {
2271 LogicalPlan::Union { left, right, all } => {
2272 self.execute_union(left, right, all, prop_manager, params, ctx)
2273 .await
2274 }
2275 LogicalPlan::CreateVectorIndex {
2276 config,
2277 if_not_exists,
2278 } => {
2279 if if_not_exists && self.index_exists_by_name(&config.name) {
2280 return Ok(vec![]);
2281 }
2282 let idx_mgr = self.storage.index_manager();
2283 idx_mgr.create_vector_index(config).await?;
2284 Ok(vec![])
2285 }
2286 LogicalPlan::CreateFullTextIndex {
2287 config,
2288 if_not_exists,
2289 } => {
2290 if if_not_exists && self.index_exists_by_name(&config.name) {
2291 return Ok(vec![]);
2292 }
2293 let idx_mgr = self.storage.index_manager();
2294 idx_mgr.create_fts_index(config).await?;
2295 Ok(vec![])
2296 }
2297 LogicalPlan::CreateScalarIndex {
2298 mut config,
2299 if_not_exists,
2300 } => {
2301 if if_not_exists && self.index_exists_by_name(&config.name) {
2302 return Ok(vec![]);
2303 }
2304
2305 let mut modified_properties = Vec::new();
2307
2308 for prop in &config.properties {
2309 if prop.contains('(') && prop.contains(')') {
2311 let gen_col = SchemaManager::generated_column_name(prop);
2312
2313 let sm = self.storage.schema_manager_arc();
2315 if let Err(e) = sm.add_generated_property(
2316 &config.label,
2317 &gen_col,
2318 DataType::String, prop.clone(),
2320 ) {
2321 log::warn!("Failed to add generated property (might exist): {}", e);
2322 }
2323
2324 modified_properties.push(gen_col);
2325 } else {
2326 modified_properties.push(prop.clone());
2328 }
2329 }
2330
2331 config.properties = modified_properties;
2332
2333 let idx_mgr = self.storage.index_manager();
2334 idx_mgr.create_scalar_index(config).await?;
2335 Ok(vec![])
2336 }
2337 LogicalPlan::CreateJsonFtsIndex {
2338 config,
2339 if_not_exists,
2340 } => {
2341 if if_not_exists && self.index_exists_by_name(&config.name) {
2342 return Ok(vec![]);
2343 }
2344 let idx_mgr = self.storage.index_manager();
2345 idx_mgr.create_json_fts_index(config).await?;
2346 Ok(vec![])
2347 }
2348 LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2349 LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2350 LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2351 LogicalPlan::Vacuum => {
2352 self.execute_vacuum().await?;
2353 Ok(vec![])
2354 }
2355 LogicalPlan::Checkpoint => {
2356 self.execute_checkpoint().await?;
2357 Ok(vec![])
2358 }
2359 LogicalPlan::CopyTo {
2360 label,
2361 path,
2362 format,
2363 options,
2364 } => {
2365 let count = self
2366 .execute_copy_to(&label, &path, &format, &options)
2367 .await?;
2368 let mut result = HashMap::new();
2369 result.insert("count".to_string(), Value::Int(count as i64));
2370 Ok(vec![result])
2371 }
2372 LogicalPlan::CopyFrom {
2373 label,
2374 path,
2375 format,
2376 options,
2377 } => {
2378 let count = self
2379 .execute_copy_from(&label, &path, &format, &options)
2380 .await?;
2381 let mut result = HashMap::new();
2382 result.insert("count".to_string(), Value::Int(count as i64));
2383 Ok(vec![result])
2384 }
2385 LogicalPlan::CreateLabel(clause) => {
2386 self.execute_create_label(clause).await?;
2387 Ok(vec![])
2388 }
2389 LogicalPlan::CreateEdgeType(clause) => {
2390 self.execute_create_edge_type(clause).await?;
2391 Ok(vec![])
2392 }
2393 LogicalPlan::AlterLabel(clause) => {
2394 self.execute_alter_label(clause).await?;
2395 Ok(vec![])
2396 }
2397 LogicalPlan::AlterEdgeType(clause) => {
2398 self.execute_alter_edge_type(clause).await?;
2399 Ok(vec![])
2400 }
2401 LogicalPlan::DropLabel(clause) => {
2402 self.execute_drop_label(clause).await?;
2403 Ok(vec![])
2404 }
2405 LogicalPlan::DropEdgeType(clause) => {
2406 self.execute_drop_edge_type(clause).await?;
2407 Ok(vec![])
2408 }
2409 LogicalPlan::CreateConstraint(clause) => {
2410 self.execute_create_constraint(clause).await?;
2411 Ok(vec![])
2412 }
2413 LogicalPlan::DropConstraint(clause) => {
2414 self.execute_drop_constraint(clause).await?;
2415 Ok(vec![])
2416 }
2417 LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2418 LogicalPlan::DropIndex { name, if_exists } => {
2419 let idx_mgr = self.storage.index_manager();
2420 match idx_mgr.drop_index(&name).await {
2421 Ok(_) => Ok(vec![]),
2422 Err(e) => {
2423 if if_exists && e.to_string().contains("not found") {
2424 Ok(vec![])
2425 } else {
2426 Err(e)
2427 }
2428 }
2429 }
2430 }
2431 LogicalPlan::ShowIndexes { filter } => {
2432 Ok(self.execute_show_indexes(filter.as_deref()))
2433 }
2434 LogicalPlan::Scan { .. }
2437 | LogicalPlan::ExtIdLookup { .. }
2438 | LogicalPlan::ScanAll { .. }
2439 | LogicalPlan::ScanMainByLabels { .. }
2440 | LogicalPlan::Traverse { .. }
2441 | LogicalPlan::TraverseMainByType { .. } => {
2442 let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2443 self.record_batches_to_rows(batches)
2444 }
2445 LogicalPlan::Filter {
2446 input,
2447 predicate,
2448 optional_variables,
2449 } => {
2450 let input_matches = self
2451 .execute_subplan(*input, prop_manager, params, ctx)
2452 .await?;
2453
2454 tracing::debug!(
2455 "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2456 predicate,
2457 input_matches.len(),
2458 optional_variables
2459 );
2460
2461 if !optional_variables.is_empty() {
2465 let is_optional_key = |k: &str| -> bool {
2468 optional_variables.contains(k)
2469 || optional_variables
2470 .iter()
2471 .any(|var| k.starts_with(&format!("{}.", var)))
2472 };
2473
2474 let is_internal_key =
2476 |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2477
2478 let non_optional_vars: Vec<String> = input_matches
2480 .first()
2481 .map(|row| {
2482 row.keys()
2483 .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2484 .cloned()
2485 .collect()
2486 })
2487 .unwrap_or_default();
2488
2489 let mut groups: std::collections::HashMap<
2491 Vec<u8>,
2492 Vec<HashMap<String, Value>>,
2493 > = std::collections::HashMap::new();
2494
2495 for row in &input_matches {
2496 let key: Vec<u8> = non_optional_vars
2498 .iter()
2499 .map(|var| {
2500 row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
2501 })
2502 .collect::<Vec<_>>()
2503 .join("|")
2504 .into_bytes();
2505
2506 groups.entry(key).or_default().push(row.clone());
2507 }
2508
2509 let mut filtered = Vec::new();
2510 for (_key, group_rows) in groups {
2511 let mut group_passed = Vec::new();
2512
2513 for row in &group_rows {
2514 let has_null_optional = optional_variables.iter().any(|var| {
2516 let direct_null =
2518 matches!(row.get(var), Some(Value::Null) | None);
2519 let prefixed_null = row
2520 .keys()
2521 .filter(|k| k.starts_with(&format!("{}.", var)))
2522 .any(|k| matches!(row.get(k), Some(Value::Null)));
2523 direct_null || prefixed_null
2524 });
2525
2526 if has_null_optional {
2527 group_passed.push(row.clone());
2528 continue;
2529 }
2530
2531 let res = self
2532 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2533 .await?;
2534
2535 if res.as_bool().unwrap_or(false) {
2536 group_passed.push(row.clone());
2537 }
2538 }
2539
2540 if group_passed.is_empty() {
2541 if let Some(template) = group_rows.first() {
2544 let mut null_row = HashMap::new();
2545 for (k, v) in template {
2546 if is_optional_key(k) {
2547 null_row.insert(k.clone(), Value::Null);
2548 } else {
2549 null_row.insert(k.clone(), v.clone());
2550 }
2551 }
2552 filtered.push(null_row);
2553 }
2554 } else {
2555 filtered.extend(group_passed);
2556 }
2557 }
2558
2559 tracing::debug!(
2560 "Filter (OPTIONAL): {} input rows -> {} output rows",
2561 input_matches.len(),
2562 filtered.len()
2563 );
2564
2565 return Ok(filtered);
2566 }
2567
2568 let mut filtered = Vec::new();
2570 for row in input_matches.iter() {
2571 let res = self
2572 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2573 .await?;
2574
2575 let passes = res.as_bool().unwrap_or(false);
2576
2577 if passes {
2578 filtered.push(row.clone());
2579 }
2580 }
2581
2582 tracing::debug!(
2583 "Filter: {} input rows -> {} output rows",
2584 input_matches.len(),
2585 filtered.len()
2586 );
2587
2588 Ok(filtered)
2589 }
2590 LogicalPlan::ProcedureCall {
2591 procedure_name,
2592 arguments,
2593 yield_items,
2594 } => {
2595 let yield_names: Vec<String> =
2596 yield_items.iter().map(|(n, _)| n.clone()).collect();
2597 let results = self
2598 .execute_procedure(
2599 &procedure_name,
2600 &arguments,
2601 &yield_names,
2602 prop_manager,
2603 params,
2604 ctx,
2605 )
2606 .await?;
2607
2608 let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2614 if !has_aliases {
2615 Ok(results)
2618 } else {
2619 let mut aliased_results = Vec::with_capacity(results.len());
2620 for row in results {
2621 let mut new_row = HashMap::new();
2622 for (name, alias) in &yield_items {
2623 let col_name = alias.as_ref().unwrap_or(name);
2624 let val = row.get(name).cloned().unwrap_or(Value::Null);
2625 new_row.insert(col_name.clone(), val);
2626 }
2627 aliased_results.push(new_row);
2628 }
2629 Ok(aliased_results)
2630 }
2631 }
2632 LogicalPlan::VectorKnn { .. } => {
2633 unreachable!("VectorKnn is handled by DataFusion engine")
2634 }
2635 LogicalPlan::InvertedIndexLookup { .. } => {
2636 unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2637 }
2638 LogicalPlan::Sort { input, order_by } => {
2639 let rows = self
2640 .execute_subplan(*input, prop_manager, params, ctx)
2641 .await?;
2642 self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2643 .await
2644 }
2645 LogicalPlan::Limit { input, skip, fetch } => {
2646 let rows = self
2647 .execute_subplan(*input, prop_manager, params, ctx)
2648 .await?;
2649 let skip = skip.unwrap_or(0);
2650 let take = fetch.unwrap_or(usize::MAX);
2651 Ok(rows.into_iter().skip(skip).take(take).collect())
2652 }
2653 LogicalPlan::Aggregate {
2654 input,
2655 group_by,
2656 aggregates,
2657 } => {
2658 let rows = self
2659 .execute_subplan(*input, prop_manager, params, ctx)
2660 .await?;
2661 self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2662 .await
2663 }
2664 LogicalPlan::Window {
2665 input,
2666 window_exprs,
2667 } => {
2668 let rows = self
2669 .execute_subplan(*input, prop_manager, params, ctx)
2670 .await?;
2671 self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2672 .await
2673 }
2674 LogicalPlan::Project { input, projections } => {
2675 let matches = self
2676 .execute_subplan(*input, prop_manager, params, ctx)
2677 .await?;
2678 self.execute_project(matches, &projections, prop_manager, params, ctx)
2679 .await
2680 }
2681 LogicalPlan::Distinct { input } => {
2682 let rows = self
2683 .execute_subplan(*input, prop_manager, params, ctx)
2684 .await?;
2685 let mut seen = std::collections::HashSet::new();
2686 let mut result = Vec::new();
2687 for row in rows {
2688 let key = Self::canonical_row_key(&row);
2689 if seen.insert(key) {
2690 result.push(row);
2691 }
2692 }
2693 Ok(result)
2694 }
2695 LogicalPlan::Unwind {
2696 input,
2697 expr,
2698 variable,
2699 } => {
2700 let input_rows = self
2701 .execute_subplan(*input, prop_manager, params, ctx)
2702 .await?;
2703 self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2704 .await
2705 }
2706 LogicalPlan::Apply {
2707 input,
2708 subquery,
2709 input_filter,
2710 } => {
2711 let input_rows = self
2712 .execute_subplan(*input, prop_manager, params, ctx)
2713 .await?;
2714 self.execute_apply(
2715 input_rows,
2716 &subquery,
2717 input_filter.as_ref(),
2718 prop_manager,
2719 params,
2720 ctx,
2721 )
2722 .await
2723 }
2724 LogicalPlan::SubqueryCall { input, subquery } => {
2725 let input_rows = self
2726 .execute_subplan(*input, prop_manager, params, ctx)
2727 .await?;
2728 self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2731 .await
2732 }
2733 LogicalPlan::RecursiveCTE {
2734 cte_name,
2735 initial,
2736 recursive,
2737 } => {
2738 self.execute_recursive_cte(
2739 &cte_name,
2740 *initial,
2741 *recursive,
2742 prop_manager,
2743 params,
2744 ctx,
2745 )
2746 .await
2747 }
2748 LogicalPlan::CrossJoin { left, right } => {
2749 self.execute_cross_join(left, right, prop_manager, params, ctx)
2750 .await
2751 }
2752 LogicalPlan::Set { .. }
2753 | LogicalPlan::Remove { .. }
2754 | LogicalPlan::Merge { .. }
2755 | LogicalPlan::Create { .. }
2756 | LogicalPlan::CreateBatch { .. } => {
2757 unreachable!("mutations are handled by DataFusion engine")
2758 }
2759 LogicalPlan::Delete { .. } => {
2760 unreachable!("mutations are handled by DataFusion engine")
2761 }
2762 LogicalPlan::Copy {
2763 target,
2764 source,
2765 is_export,
2766 options,
2767 } => {
2768 if is_export {
2769 self.execute_export(&target, &source, &options, prop_manager, ctx)
2770 .await
2771 } else {
2772 self.execute_copy(&target, &source, &options, prop_manager)
2773 .await
2774 }
2775 }
2776 LogicalPlan::Backup {
2777 destination,
2778 options,
2779 } => self.execute_backup(&destination, &options).await,
2780 LogicalPlan::Explain { plan } => {
2781 let plan_str = format!("{:#?}", plan);
2782 let mut row = HashMap::new();
2783 row.insert("plan".to_string(), Value::String(plan_str));
2784 Ok(vec![row])
2785 }
2786 LogicalPlan::ShortestPath { .. } => {
2787 unreachable!("ShortestPath is handled by DataFusion engine")
2788 }
2789 LogicalPlan::AllShortestPaths { .. } => {
2790 unreachable!("AllShortestPaths is handled by DataFusion engine")
2791 }
2792 LogicalPlan::Foreach { .. } => {
2793 unreachable!("mutations are handled by DataFusion engine")
2794 }
2795 LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2796 LogicalPlan::BindZeroLengthPath { .. } => {
2797 unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2798 }
2799 LogicalPlan::BindPath { .. } => {
2800 unreachable!("BindPath is handled by DataFusion engine")
2801 }
2802 LogicalPlan::QuantifiedPattern { .. } => {
2803 unreachable!("QuantifiedPattern is handled by DataFusion engine")
2804 }
2805 LogicalPlan::LocyProgram { .. }
2806 | LogicalPlan::LocyFold { .. }
2807 | LogicalPlan::LocyBestBy { .. }
2808 | LogicalPlan::LocyPriority { .. }
2809 | LogicalPlan::LocyDerivedScan { .. }
2810 | LogicalPlan::LocyProject { .. } => {
2811 unreachable!("Locy operators are handled by DataFusion engine")
2812 }
2813 }
2814 })
2815 }
2816
    /// Executes a single mutation plan from a FOREACH body against a mutable
    /// variable `scope`, re-using the already-held `writer` lock.
    ///
    /// Only SET, REMOVE, DELETE, CREATE, CREATE-batch, MERGE and nested
    /// FOREACH are accepted; any other operator returns an error. MERGE here
    /// is executed as CREATE followed by the ON CREATE items, and ON MATCH is
    /// discarded — NOTE(review): no match attempt is made on this path;
    /// confirm this is the intended FOREACH-MERGE semantics.
    ///
    /// `tx_l0` is the transaction-local L0 buffer, threaded through to every
    /// locked mutation helper so writes stay inside the transaction.
    #[expect(clippy::too_many_arguments)]
    pub(crate) async fn execute_foreach_body_plan(
        &self,
        plan: LogicalPlan,
        scope: &mut HashMap<String, Value>,
        writer: &mut uni_store::runtime::writer::Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
        tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    ) -> Result<()> {
        match plan {
            LogicalPlan::Set { items, .. } => {
                self.execute_set_items_locked(
                    &items,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
            }
            LogicalPlan::Remove { items, .. } => {
                self.execute_remove_items_locked(&items, scope, writer, prop_manager, ctx, tx_l0)
                    .await?;
            }
            LogicalPlan::Delete { items, detach, .. } => {
                // Each DELETE expression is evaluated against the current
                // scope, then deleted one at a time.
                for expr in &items {
                    let val = self
                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
                        .await?;
                    self.execute_delete_item_locked(&val, detach, writer, tx_l0)
                        .await?;
                }
            }
            LogicalPlan::Create { pattern, .. } => {
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
            }
            LogicalPlan::CreateBatch { patterns, .. } => {
                for pattern in &patterns {
                    self.execute_create_pattern(
                        pattern,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                    )
                    .await?;
                }
            }
            LogicalPlan::Merge {
                pattern,
                on_match: _,
                on_create,
                ..
            } => {
                // MERGE in FOREACH: create the pattern, then apply ON CREATE
                // items (ON MATCH is intentionally ignored on this path).
                self.execute_create_pattern(
                    &pattern,
                    scope,
                    writer,
                    prop_manager,
                    params,
                    ctx,
                    tx_l0,
                )
                .await?;
                if let Some(on_create_clause) = on_create {
                    self.execute_set_items_locked(
                        &on_create_clause.items,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                        tx_l0,
                    )
                    .await?;
                }
            }
            LogicalPlan::Foreach {
                variable,
                list,
                body,
                ..
            } => {
                let list_val = self
                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
                    .await?;
                // NULL list is a silent no-op; any non-list value is an error.
                let items = match list_val {
                    Value::List(arr) => arr,
                    Value::Null => return Ok(()),
                    _ => return Err(anyhow!("FOREACH requires a list")),
                };
                for item in items {
                    // Each element runs in a cloned scope with the loop
                    // variable bound, so iterations do not leak bindings.
                    let mut nested_scope = scope.clone();
                    nested_scope.insert(variable.clone(), item);
                    for nested_plan in &body {
                        // Box::pin breaks the infinitely-sized future cycle
                        // created by async recursion.
                        Box::pin(self.execute_foreach_body_plan(
                            nested_plan.clone(),
                            &mut nested_scope,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                            tx_l0,
                        ))
                        .await?;
                    }
                }
            }
            _ => {
                return Err(anyhow!(
                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
                ));
            }
        }
        Ok(())
    }
2952
2953 fn canonical_row_key(row: &HashMap<String, Value>) -> String {
2954 let mut pairs: Vec<_> = row.iter().collect();
2955 pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
2956
2957 pairs
2958 .into_iter()
2959 .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
2960 .collect::<Vec<_>>()
2961 .join("|")
2962 }
2963
    /// Maps a `Value` to a canonical string key used by DISTINCT / GROUP BY
    /// so that values which should be treated as equal collide:
    /// - integral floats share the `n:` namespace with ints (`1.0` == `1`),
    /// - temporal-looking strings key like the parsed temporal value,
    /// - maps/nodes sort their keys/labels so order does not affect identity.
    fn canonical_value_key(v: &Value) -> String {
        match v {
            Value::Null => "null".to_string(),
            Value::Bool(b) => format!("b:{b}"),
            Value::Int(i) => format!("n:{i}"),
            Value::Float(f) => {
                // NaN and infinities get dedicated keys; they cannot be
                // folded into the integer namespace.
                if f.is_nan() {
                    "nan".to_string()
                } else if f.is_infinite() {
                    if f.is_sign_positive() {
                        "inf:+".to_string()
                    } else {
                        "inf:-".to_string()
                    }
                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
                    // Integral float in i64 range: collapse into the same
                    // "n:" namespace as Value::Int.
                    format!("n:{}", *f as i64)
                } else {
                    format!("f:{f}")
                }
            }
            Value::String(s) => {
                // Strings that parse as temporal literals key like the
                // corresponding temporal value (e.g. "2020-01-01" == DATE).
                if let Some(k) = Self::temporal_string_key(s) {
                    format!("temporal:{k}")
                } else {
                    format!("s:{s}")
                }
            }
            Value::Bytes(b) => format!("bytes:{:?}", b),
            Value::List(items) => format!(
                "list:[{}]",
                items
                    .iter()
                    .map(Self::canonical_value_key)
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Map(map) => {
                // Sort entries by key so logically-equal maps collide.
                let mut pairs: Vec<_> = map.iter().collect();
                pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
                format!(
                    "map:{{{}}}",
                    pairs
                        .into_iter()
                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
                        .collect::<Vec<_>>()
                        .join(",")
                )
            }
            Value::Node(n) => {
                // Labels are sorted so label order does not affect identity.
                let mut labels = n.labels.clone();
                labels.sort();
                format!(
                    "node:{}:{}:{}",
                    n.vid.as_u64(),
                    labels.join(":"),
                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
                )
            }
            Value::Edge(e) => format!(
                "edge:{}:{}:{}:{}:{}",
                e.eid.as_u64(),
                e.edge_type,
                e.src.as_u64(),
                e.dst.as_u64(),
                Self::canonical_value_key(&Value::Map(e.properties.clone()))
            ),
            Value::Path(p) => format!(
                "path:nodes=[{}];edges=[{}]",
                p.nodes
                    .iter()
                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
                    .collect::<Vec<_>>()
                    .join(","),
                p.edges
                    .iter()
                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Vector(vs) => format!("vec:{:?}", vs),
            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
            // Fallback for variants without a dedicated canonical form:
            // Debug output is stable within a single process run.
            _ => format!("{v:?}"),
        }
    }
3048
3049 fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3050 match t {
3051 uni_common::TemporalValue::Date { days_since_epoch } => {
3052 format!("date:{days_since_epoch}")
3053 }
3054 uni_common::TemporalValue::LocalTime {
3055 nanos_since_midnight,
3056 } => format!("localtime:{nanos_since_midnight}"),
3057 uni_common::TemporalValue::Time {
3058 nanos_since_midnight,
3059 offset_seconds,
3060 } => {
3061 let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3062 format!("time:{utc_nanos}")
3063 }
3064 uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3065 format!("localdatetime:{nanos_since_epoch}")
3066 }
3067 uni_common::TemporalValue::DateTime {
3068 nanos_since_epoch, ..
3069 } => format!("datetime:{nanos_since_epoch}"),
3070 uni_common::TemporalValue::Duration {
3071 months,
3072 days,
3073 nanos,
3074 } => format!("duration:{months}:{days}:{nanos}"),
3075 uni_common::TemporalValue::Btic { lo, hi, meta } => {
3076 format!("btic:{lo}:{hi}:{meta}")
3077 }
3078 }
3079 }
3080
3081 fn temporal_string_key(s: &str) -> Option<String> {
3082 let fn_name = match classify_temporal(s)? {
3083 uni_common::TemporalType::Date => "DATE",
3084 uni_common::TemporalType::LocalTime => "LOCALTIME",
3085 uni_common::TemporalType::Time => "TIME",
3086 uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3087 uni_common::TemporalType::DateTime => "DATETIME",
3088 uni_common::TemporalType::Duration => "DURATION",
3089 uni_common::TemporalType::Btic => return None, };
3091 match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3092 Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3093 _ => None,
3094 }
3095 }
3096
    /// Interval (in input rows) between cooperative query-timeout checks
    /// inside `execute_aggregate`, so long scans still honor the deadline.
    pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3100
    /// Hash-aggregates `rows` by the GROUP BY expressions and folds each row
    /// into per-group accumulators, one per aggregate expression.
    ///
    /// Zero input rows with no GROUP BY still produce one row of identity
    /// values (e.g. COUNT = 0); zero rows with a GROUP BY produce nothing.
    /// Output group order is unspecified (HashMap iteration order).
    pub(crate) async fn execute_aggregate(
        &self,
        rows: Vec<HashMap<String, Value>>,
        group_by: &[Expr],
        aggregates: &[Expr],
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        if let Some(ctx) = ctx {
            ctx.check_timeout()?;
        }

        // canonical key string -> (original key values, accumulators)
        let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();

        if rows.is_empty() {
            // Global aggregate over zero rows still yields one identity row.
            if group_by.is_empty() {
                let accs = Self::create_accumulators(aggregates);
                let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
                return Ok(vec![row]);
            }
            return Ok(vec![]);
        }

        for (idx, row) in rows.into_iter().enumerate() {
            // Re-check the deadline periodically on large inputs.
            if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
                && let Some(ctx) = ctx
            {
                ctx.check_timeout()?;
            }

            let key_vals = self
                .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
                .await?;
            // Canonical keys make e.g. 1 and 1.0 land in the same group.
            let key_str = format!(
                "[{}]",
                key_vals
                    .iter()
                    .map(Self::canonical_value_key)
                    .collect::<Vec<_>>()
                    .join(",")
            );

            let entry = groups
                .entry(key_str)
                .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));

            self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
                .await?;
        }

        let results = groups
            .values()
            .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
            .collect();

        Ok(results)
    }
3165
    /// Evaluates window (OVER) functions against an in-memory row set.
    ///
    /// For each window expression: rows are partitioned by the PARTITION BY
    /// keys, each partition is optionally sorted by ORDER BY, and the
    /// function's value is written back onto each row under the expression's
    /// string representation so later projection can find it.
    ///
    /// NOTE(review): FIRST_VALUE / LAST_VALUE / NTH_VALUE operate over the
    /// whole partition, not a sliding frame — confirm no frame clause is
    /// expected here.
    pub(crate) async fn execute_window(
        &self,
        mut rows: Vec<HashMap<String, Value>>,
        window_exprs: &[Expr],
        _prop_manager: &PropertyManager,
        _params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        if let Some(ctx) = ctx {
            ctx.check_timeout()?;
        }

        if rows.is_empty() || window_exprs.is_empty() {
            return Ok(rows);
        }

        for window_expr in window_exprs {
            // Window expressions must be function calls with an OVER clause.
            let Expr::FunctionCall {
                name,
                args,
                window_spec: Some(window_spec),
                ..
            } = window_expr
            else {
                return Err(anyhow!(
                    "Window expression must be a FunctionCall with OVER clause: {:?}",
                    window_expr
                ));
            };

            let name_upper = name.to_uppercase();

            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
                return Err(anyhow!(
                    "Unsupported window function: {}. Supported functions: {}",
                    name,
                    WINDOW_FUNCTIONS.join(", ")
                ));
            }

            // Bucket row indices by their PARTITION BY key values; an empty
            // PARTITION BY puts every row in one partition (empty key).
            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();

            for (row_idx, row) in rows.iter().enumerate() {
                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
                    vec![]
                } else {
                    window_spec
                        .partition_by
                        .iter()
                        .map(|expr| self.evaluate_simple_expr(expr, row))
                        .collect::<Result<Vec<_>>>()?
                };

                partition_map
                    .entry(partition_key)
                    .or_default()
                    .push(row_idx);
            }

            for (_partition_key, row_indices) in partition_map.iter_mut() {
                // Sort each partition by the ORDER BY items; rows whose keys
                // fail to evaluate compare as equal (stable-ish fallback).
                if !window_spec.order_by.is_empty() {
                    row_indices.sort_by(|&a, &b| {
                        for sort_item in &window_spec.order_by {
                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);

                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
                                let cmp = Executor::compare_values(&va, &vb);
                                let cmp = if sort_item.ascending {
                                    cmp
                                } else {
                                    cmp.reverse()
                                };
                                if cmp != std::cmp::Ordering::Equal {
                                    return cmp;
                                }
                            }
                        }
                        std::cmp::Ordering::Equal
                    });
                }

                // `position` is the row's 0-based slot within its partition.
                for (position, &row_idx) in row_indices.iter().enumerate() {
                    let window_value = match name_upper.as_str() {
                        "ROW_NUMBER" => Value::from((position + 1) as i64),
                        "RANK" => {
                            // RANK: tied rows share the rank of the tie
                            // group's first row; gaps follow ties.
                            let rank = if position == 0 {
                                1i64
                            } else {
                                let prev_row_idx = row_indices[position - 1];
                                let same_as_prev = self.rows_have_same_sort_keys(
                                    &window_spec.order_by,
                                    &rows,
                                    row_idx,
                                    prev_row_idx,
                                );

                                if same_as_prev {
                                    // Walk back to the start of the tie group.
                                    let mut group_start = position - 1;
                                    while group_start > 0 {
                                        let curr_idx = row_indices[group_start];
                                        let prev_idx = row_indices[group_start - 1];
                                        if !self.rows_have_same_sort_keys(
                                            &window_spec.order_by,
                                            &rows,
                                            curr_idx,
                                            prev_idx,
                                        ) {
                                            break;
                                        }
                                        group_start -= 1;
                                    }
                                    (group_start + 1) as i64
                                } else {
                                    (position + 1) as i64
                                }
                            };
                            Value::from(rank)
                        }
                        "DENSE_RANK" => {
                            // DENSE_RANK: count distinct sort-key groups among
                            // the preceding rows (no gaps after ties).
                            let mut dense_rank = 1i64;
                            for i in 0..position {
                                let curr_idx = row_indices[i + 1];
                                let prev_idx = row_indices[i];
                                if !self.rows_have_same_sort_keys(
                                    &window_spec.order_by,
                                    &rows,
                                    curr_idx,
                                    prev_idx,
                                ) {
                                    dense_rank += 1;
                                }
                            }
                            Value::from(dense_rank)
                        }
                        "LAG" => {
                            // Value from `offset` rows earlier in the
                            // partition, or the default when out of range.
                            let (value_expr, offset, default_value) =
                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;

                            if position >= offset {
                                let target_idx = row_indices[position - offset];
                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
                            } else {
                                default_value
                            }
                        }
                        "LEAD" => {
                            // Value from `offset` rows later in the partition.
                            let (value_expr, offset, default_value) =
                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;

                            if position + offset < row_indices.len() {
                                let target_idx = row_indices[position + offset];
                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
                            } else {
                                default_value
                            }
                        }
                        "NTILE" => {
                            let num_buckets_expr = args.first().ok_or_else(|| {
                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
                            })?;
                            let num_buckets_val =
                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
                                anyhow!(
                                    "NTILE argument must be an integer, got: {:?}",
                                    num_buckets_val
                                )
                            })?;

                            if num_buckets <= 0 {
                                return Err(anyhow!(
                                    "NTILE bucket count must be positive, got: {}",
                                    num_buckets
                                ));
                            }

                            let num_buckets = num_buckets as usize;
                            let partition_size = row_indices.len();

                            // The first `extra_rows` buckets get one extra
                            // row each (standard NTILE distribution). When
                            // base_size is 0 every position falls in the
                            // first branch, so no division by zero.
                            let base_size = partition_size / num_buckets;
                            let extra_rows = partition_size % num_buckets;

                            let bucket = if position < extra_rows * (base_size + 1) {
                                position / (base_size + 1) + 1
                            } else {
                                let adjusted_position = position - extra_rows * (base_size + 1);
                                extra_rows + (adjusted_position / base_size) + 1
                            };

                            Value::from(bucket as i64)
                        }
                        "FIRST_VALUE" => {
                            let value_expr = args.first().ok_or_else(|| {
                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
                            })?;

                            // Defensive: partitions are never empty here.
                            if row_indices.is_empty() {
                                Value::Null
                            } else {
                                let first_idx = row_indices[0];
                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
                            }
                        }
                        "LAST_VALUE" => {
                            let value_expr = args.first().ok_or_else(|| {
                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
                            })?;

                            // Whole-partition semantics: always the final row.
                            if row_indices.is_empty() {
                                Value::Null
                            } else {
                                let last_idx = row_indices[row_indices.len() - 1];
                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
                            }
                        }
                        "NTH_VALUE" => {
                            if args.len() != 2 {
                                return Err(anyhow!(
                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
                                ));
                            }

                            let value_expr = &args[0];
                            let n_expr = &args[1];

                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
                            let n = n_val.as_i64().ok_or_else(|| {
                                anyhow!(
                                    "NTH_VALUE second argument must be an integer, got: {:?}",
                                    n_val
                                )
                            })?;

                            if n <= 0 {
                                return Err(anyhow!(
                                    "NTH_VALUE position must be positive, got: {}",
                                    n
                                ));
                            }

                            // n is 1-based; out-of-range yields NULL.
                            let nth_index = (n - 1) as usize;
                            if nth_index < row_indices.len() {
                                let nth_idx = row_indices[nth_index];
                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
                            } else {
                                Value::Null
                            }
                        }
                        _ => unreachable!("Window function {} already validated", name),
                    };

                    // Store the result under the expression's textual form so
                    // a later Project step can reference it by name.
                    let col_name = window_expr.to_string_repr();
                    rows[row_idx].insert(col_name, window_value);
                }
            }
        }

        Ok(rows)
    }
3455
3456 fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3461 match expr {
3462 Expr::Variable(name) => row
3463 .get(name)
3464 .cloned()
3465 .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3466 Expr::Property(base, prop) => {
3467 let base_val = self.evaluate_simple_expr(base, row)?;
3468 if let Value::Map(map) = base_val {
3469 map.get(prop)
3470 .cloned()
3471 .ok_or_else(|| anyhow!("Property not found: {}", prop))
3472 } else {
3473 Err(anyhow!("Cannot access property on non-object"))
3474 }
3475 }
3476 Expr::Literal(lit) => Ok(lit.to_value()),
3477 _ => Err(anyhow!(
3478 "Unsupported expression in window function: {:?}",
3479 expr
3480 )),
3481 }
3482 }
3483
3484 fn rows_have_same_sort_keys(
3486 &self,
3487 order_by: &[uni_cypher::ast::SortItem],
3488 rows: &[HashMap<String, Value>],
3489 idx_a: usize,
3490 idx_b: usize,
3491 ) -> bool {
3492 order_by.iter().all(|sort_item| {
3493 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3494 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3495 matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3496 })
3497 }
3498
3499 fn extract_lag_lead_params<'a>(
3501 &self,
3502 func_name: &str,
3503 args: &'a [Expr],
3504 row: &HashMap<String, Value>,
3505 ) -> Result<(&'a Expr, usize, Value)> {
3506 let value_expr = args.first().ok_or_else(|| {
3507 anyhow!(
3508 "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3509 func_name,
3510 func_name
3511 )
3512 })?;
3513
3514 let offset = if let Some(offset_expr) = args.get(1) {
3515 let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3516 offset_val.as_i64().ok_or_else(|| {
3517 anyhow!(
3518 "{} offset must be an integer, got: {:?}",
3519 func_name,
3520 offset_val
3521 )
3522 })? as usize
3523 } else {
3524 1
3525 };
3526
3527 let default_value = if let Some(default_expr) = args.get(2) {
3528 self.evaluate_simple_expr(default_expr, row)?
3529 } else {
3530 Value::Null
3531 };
3532
3533 Ok((value_expr, offset, default_value))
3534 }
3535
3536 pub(crate) async fn evaluate_group_keys(
3538 &self,
3539 group_by: &[Expr],
3540 row: &HashMap<String, Value>,
3541 prop_manager: &PropertyManager,
3542 params: &HashMap<String, Value>,
3543 ctx: Option<&QueryContext>,
3544 ) -> Result<Vec<Value>> {
3545 let mut key_vals = Vec::new();
3546 for expr in group_by {
3547 key_vals.push(
3548 self.evaluate_expr(expr, row, prop_manager, params, ctx)
3549 .await?,
3550 );
3551 }
3552 Ok(key_vals)
3553 }
3554
3555 pub(crate) async fn update_accumulators(
3557 &self,
3558 accs: &mut [Accumulator],
3559 aggregates: &[Expr],
3560 row: &HashMap<String, Value>,
3561 prop_manager: &PropertyManager,
3562 params: &HashMap<String, Value>,
3563 ctx: Option<&QueryContext>,
3564 ) -> Result<()> {
3565 for (i, agg_expr) in aggregates.iter().enumerate() {
3566 if let Expr::FunctionCall { args, .. } = agg_expr {
3567 let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3568 let val = if is_wildcard {
3569 Value::Null
3570 } else {
3571 self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3572 .await?
3573 };
3574 accs[i].update(&val, is_wildcard);
3575 }
3576 }
3577 Ok(())
3578 }
3579
    /// Executes a recursive CTE: seeds the working table from `initial`,
    /// then repeatedly runs `recursive` with the previous iteration's rows
    /// bound to `cte_name` as a list parameter, keeping only rows never seen
    /// before (UNION-style dedup via a debug-formatted row key).
    ///
    /// Iteration stops when the recursive step yields no new rows or after
    /// the configured `max_recursive_cte_iterations` cap. The result is one
    /// row mapping `cte_name` to the accumulated list; single-column rows
    /// are unwrapped to their bare value, multi-column rows become maps.
    pub(crate) async fn execute_recursive_cte(
        &self,
        cte_name: &str,
        initial: LogicalPlan,
        recursive: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        use std::collections::HashSet;

        // Stable dedup key: sort columns by name, then Debug-format pairs.
        pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
            let mut pairs: Vec<_> = row.iter().collect();
            pairs.sort_by(|a, b| a.0.cmp(b.0));
            format!("{pairs:?}")
        }

        let mut working_table = self
            .execute_subplan(initial, prop_manager, params, ctx)
            .await?;
        let mut result_table = working_table.clone();

        let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();

        // Hard cap prevents non-converging CTEs from looping forever.
        let max_iterations = self.config.max_recursive_cte_iterations;
        for _iteration in 0..max_iterations {
            if let Some(ctx) = ctx {
                ctx.check_timeout()?;
            }

            if working_table.is_empty() {
                break;
            }

            // Expose the previous iteration's rows to the recursive plan as
            // a list parameter named after the CTE.
            let working_val = Value::List(
                working_table
                    .iter()
                    .map(|row| {
                        if row.len() == 1 {
                            row.values().next().unwrap().clone()
                        } else {
                            Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
                        }
                    })
                    .collect(),
            );

            let mut next_params = params.clone();
            next_params.insert(cte_name.to_string(), working_val);

            let next_result = self
                .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
                .await?;

            if next_result.is_empty() {
                break;
            }

            // Fixpoint check: keep only rows not produced in any prior pass.
            let new_rows: Vec<_> = next_result
                .into_iter()
                .filter(|row| {
                    let key = row_key(row);
                    seen.insert(key)
                })
                .collect();

            if new_rows.is_empty() {
                break;
            }

            result_table.extend(new_rows.clone());
            working_table = new_rows;
        }

        let final_list = Value::List(
            result_table
                .into_iter()
                .map(|row| {
                    if row.len() == 1 {
                        row.values().next().unwrap().clone()
                    } else {
                        Value::Map(row.into_iter().collect())
                    }
                })
                .collect(),
        );

        let mut final_row = HashMap::new();
        final_row.insert(cte_name.to_string(), final_list);
        Ok(vec![final_row])
    }
3695
    /// Interval (in rows) between cooperative query-timeout checks while
    /// materializing sort keys in `execute_sort`.
    const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3698
    /// Sorts `rows` by the ORDER BY items.
    ///
    /// Sort keys are materialized per row up front: first by looking up an
    /// already-projected column named after the expression's string form,
    /// then — if missing or NULL — by evaluating the expression directly.
    /// Evaluation errors degrade to NULL rather than failing the sort.
    pub(crate) async fn execute_sort(
        &self,
        rows: Vec<HashMap<String, Value>>,
        order_by: &[uni_cypher::ast::SortItem],
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        if let Some(ctx) = ctx {
            ctx.check_timeout()?;
        }

        let mut rows_with_keys = Vec::with_capacity(rows.len());
        for (idx, row) in rows.into_iter().enumerate() {
            // Periodic deadline check while building keys for large inputs.
            if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
                && let Some(ctx) = ctx
            {
                ctx.check_timeout()?;
            }

            let mut keys = Vec::new();
            for item in order_by {
                // Prefer a projected column matching the expression's text.
                let val = row
                    .get(&item.expr.to_string_repr())
                    .cloned()
                    .unwrap_or(Value::Null);
                let val = if val.is_null() {
                    // Fall back to full evaluation; failures sort as NULL.
                    self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
                        .await
                        .unwrap_or(Value::Null)
                } else {
                    val
                };
                keys.push(val);
            }
            rows_with_keys.push((row, keys));
        }

        if let Some(ctx) = ctx {
            ctx.check_timeout()?;
        }

        rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));

        Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
    }
3748
3749 pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3751 aggregates
3752 .iter()
3753 .map(|expr| {
3754 if let Expr::FunctionCall { name, distinct, .. } = expr {
3755 Accumulator::new(name, *distinct)
3756 } else {
3757 Accumulator::new("COUNT", false)
3758 }
3759 })
3760 .collect()
3761 }
3762
3763 pub(crate) fn build_aggregate_result(
3765 group_by: &[Expr],
3766 aggregates: &[Expr],
3767 key_vals: &[Value],
3768 accs: &[Accumulator],
3769 ) -> HashMap<String, Value> {
3770 let mut res_row = HashMap::new();
3771 for (i, expr) in group_by.iter().enumerate() {
3772 res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3773 }
3774 for (i, expr) in aggregates.iter().enumerate() {
3775 let col_name = crate::query::planner::aggregate_column_name(expr);
3777 res_row.insert(col_name, accs[i].finish());
3778 }
3779 res_row
3780 }
3781
3782 pub(crate) fn compare_sort_keys(
3784 a_keys: &[Value],
3785 b_keys: &[Value],
3786 order_by: &[uni_cypher::ast::SortItem],
3787 ) -> std::cmp::Ordering {
3788 for (i, item) in order_by.iter().enumerate() {
3789 let order = Self::compare_values(&a_keys[i], &b_keys[i]);
3790 if order != std::cmp::Ordering::Equal {
3791 return if item.ascending {
3792 order
3793 } else {
3794 order.reverse()
3795 };
3796 }
3797 }
3798 std::cmp::Ordering::Equal
3799 }
3800
    /// Executes `BACKUP TO <destination>`: flushes pending writes to L1,
    /// requires an existing snapshot, then copies the store either to a
    /// cloud object store (URL detected by `is_cloud_url`) or to a locally
    /// validated path. Returns a single status row with the snapshot id.
    pub(crate) async fn execute_backup(
        &self,
        destination: &str,
        _options: &HashMap<String, Value>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Make in-memory data durable before copying any files.
        if let Some(writer_arc) = &self.writer {
            let mut writer = writer_arc.write().await;
            writer.flush_to_l1(None).await?;
        }

        let snapshot_manager = self.storage.snapshot_manager();
        let snapshot = snapshot_manager
            .load_latest_snapshot()
            .await?
            .ok_or_else(|| anyhow!("No snapshot found"))?;

        if is_cloud_url(destination) {
            self.backup_to_cloud(destination, &snapshot.snapshot_id)
                .await?;
        } else {
            // validate_path presumably rejects unsafe/escaping destinations
            // before anything is written — confirm against its definition.
            let validated_dest = self.validate_path(destination)?;
            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
                .await?;
        }

        let mut res = HashMap::new();
        res.insert(
            "status".to_string(),
            Value::String("Backup completed".to_string()),
        );
        res.insert(
            "snapshot_id".to_string(),
            Value::String(snapshot.snapshot_id),
        );
        Ok(vec![res])
    }
3844
    /// Copies the whole storage base directory plus a freshly serialized
    /// schema manifest into `dest_path`.
    ///
    /// The snapshot id is currently unused: the full base path is copied,
    /// not one specific snapshot. NOTE(review): this performs blocking
    /// `std::fs` calls inside an async fn, which can stall the runtime for
    /// large stores — consider moving the copy onto a blocking thread.
    async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
        let source_path = std::path::Path::new(self.storage.base_path());

        if !dest_path.exists() {
            std::fs::create_dir_all(dest_path)?;
        }

        if source_path.exists() {
            Self::copy_dir_all(source_path, dest_path)?;
        }

        // Always (re)write the schema so the backup is self-describing even
        // if the tree copy above did not include a catalog.
        let schema_manager = self.storage.schema_manager();
        let dest_catalog = dest_path.join("catalog");
        if !dest_catalog.exists() {
            std::fs::create_dir_all(&dest_catalog)?;
        }

        let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
        std::fs::write(dest_catalog.join("schema.json"), schema_content)?;

        Ok(())
    }
3870
3871 async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
3875 use object_store::ObjectStore;
3876 use object_store::local::LocalFileSystem;
3877 use object_store::path::Path as ObjPath;
3878
3879 let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
3880 let source_path = std::path::Path::new(self.storage.base_path());
3881
3882 let src_store: Arc<dyn ObjectStore> =
3884 Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
3885
3886 let catalog_src = ObjPath::from("catalog");
3888 let catalog_dst = if dest_prefix.as_ref().is_empty() {
3889 ObjPath::from("catalog")
3890 } else {
3891 ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
3892 };
3893 copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
3894
3895 let storage_src = ObjPath::from("storage");
3897 let storage_dst = if dest_prefix.as_ref().is_empty() {
3898 ObjPath::from("storage")
3899 } else {
3900 ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
3901 };
3902 copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
3903
3904 let schema_manager = self.storage.schema_manager();
3906 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3907 let schema_path = if dest_prefix.as_ref().is_empty() {
3908 ObjPath::from("catalog/schema.json")
3909 } else {
3910 ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
3911 };
3912 dest_store
3913 .put(&schema_path, bytes::Bytes::from(schema_content).into())
3914 .await?;
3915
3916 Ok(())
3917 }
3918
    /// Maximum directory recursion depth allowed while copying a backup tree;
    /// guards against pathological nesting (e.g. symlink-created cycles that
    /// slipped past the symlink skip).
    const MAX_BACKUP_DEPTH: usize = 100;

    /// Maximum number of directory entries visited while copying a backup
    /// tree; bounds the total work of a single local backup.
    const MAX_BACKUP_FILES: usize = 100_000;
3930
3931 pub(crate) fn copy_dir_all(
3939 src: &std::path::Path,
3940 dst: &std::path::Path,
3941 ) -> std::io::Result<()> {
3942 let mut file_count = 0usize;
3943 Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
3944 }
3945
    /// Recursive worker for [`copy_dir_all`].
    ///
    /// Copies the contents of `src` into `dst`, creating directories as
    /// needed. `depth` tracks recursion depth; `file_count` is a counter
    /// shared across the whole traversal. Note that the counter is
    /// incremented for every directory entry seen — including directories
    /// and symlinks — before the symlink skip, so the limit bounds total
    /// entries visited, not just regular files copied.
    ///
    /// Errors with `InvalidInput` when either `MAX_BACKUP_DEPTH` or
    /// `MAX_BACKUP_FILES` is exceeded; symlinks are skipped (not followed).
    pub(crate) fn copy_dir_all_impl(
        src: &std::path::Path,
        dst: &std::path::Path,
        depth: usize,
        file_count: &mut usize,
    ) -> std::io::Result<()> {
        if depth >= Self::MAX_BACKUP_DEPTH {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!(
                    "Maximum backup depth {} exceeded at {:?}",
                    Self::MAX_BACKUP_DEPTH,
                    src
                ),
            ));
        }

        std::fs::create_dir_all(dst)?;

        for entry in std::fs::read_dir(src)? {
            // Enforce the entry budget before doing any work on this entry.
            if *file_count >= Self::MAX_BACKUP_FILES {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    format!(
                        "Maximum backup file count {} exceeded",
                        Self::MAX_BACKUP_FILES
                    ),
                ));
            }
            *file_count += 1;

            let entry = entry?;
            let metadata = entry.metadata()?;

            // Symlinks are neither followed nor copied.
            if metadata.file_type().is_symlink() {
                continue;
            }

            let dst_path = dst.join(entry.file_name());
            if metadata.is_dir() {
                Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
            } else {
                std::fs::copy(entry.path(), dst_path)?;
            }
        }
        Ok(())
    }
3996
3997 pub(crate) async fn execute_copy(
3998 &self,
3999 target: &str,
4000 source: &str,
4001 options: &HashMap<String, Value>,
4002 prop_manager: &PropertyManager,
4003 ) -> Result<Vec<HashMap<String, Value>>> {
4004 let format = options
4005 .get("format")
4006 .and_then(|v| v.as_str())
4007 .unwrap_or_else(|| {
4008 if source.ends_with(".parquet") {
4009 "parquet"
4010 } else {
4011 "csv"
4012 }
4013 });
4014
4015 match format.to_lowercase().as_str() {
4016 "csv" => self.execute_csv_import(target, source, options).await,
4017 "parquet" => {
4018 self.execute_parquet_import(target, source, options, prop_manager)
4019 .await
4020 }
4021 _ => Err(anyhow!("Unsupported format: {}", format)),
4022 }
4023 }
4024
    /// Import rows from a local CSV file into a vertex label or edge type.
    ///
    /// `target` must name an existing label or edge type in the schema.
    /// Recognized options: `delimiter` (first byte only, default `,`),
    /// `header` (bool, default true), and for edge imports `src_col` /
    /// `dst_col` (defaults `_src` / `_dst`) naming the endpoint-VID columns.
    /// CSV columns that do not match a declared property are silently skipped.
    ///
    /// Returns a single row `{ "count": <number of inserted records> }`.
    pub(crate) async fn execute_csv_import(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Reject paths outside the allowed area before touching the filesystem.
        let validated_source = self.validate_path(source)?;

        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;

        let schema = self.storage.schema_manager().schema();

        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Only the first byte of the delimiter option is honored.
        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut rdr = csv::ReaderBuilder::new()
            .delimiter(delimiter)
            .has_headers(has_header)
            .from_path(&validated_source)?;

        let headers = rdr.headers()?.clone();
        let mut count = 0;

        // Hold the write lock for the entire import so records land atomically
        // with respect to other writers.
        let mut writer = writer_lock.write().await;

        if label_meta.is_some() {
            // Vertex import: each record becomes one vertex with the target label.
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;

            for result in rdr.records() {
                let record = result?;
                let mut props = HashMap::new();

                for (i, header) in headers.iter().enumerate() {
                    if let Some(val_str) = record.get(i)
                        && let Some(prop_meta) = target_props.get(header)
                    {
                        // Parse the raw string according to the declared type.
                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
                        props.insert(header.to_string(), val);
                    }
                }

                let vid = writer.next_vid().await?;
                writer
                    .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
                    .await?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            // Edge import: endpoint columns supply src/dst VIDs; all other
            // declared-property columns become edge properties.
            let type_id = meta.id;
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;

            let src_col = options
                .get("src_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_src");
            let dst_col = options
                .get("dst_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_dst");

            for result in rdr.records() {
                let record = result?;
                let mut props = HashMap::new();
                let mut src_vid = None;
                let mut dst_vid = None;

                for (i, header) in headers.iter().enumerate() {
                    if let Some(val_str) = record.get(i) {
                        if header == src_col {
                            src_vid =
                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
                        } else if header == dst_col {
                            dst_vid =
                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
                        } else if let Some(prop_meta) = target_props.get(header) {
                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
                            props.insert(header.to_string(), val);
                        }
                    }
                }

                // Both endpoints are mandatory for every edge record.
                let src =
                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
                let dst = dst_vid
                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;

                let eid = writer.next_eid(type_id).await?;
                writer
                    .insert_edge(
                        src,
                        dst,
                        type_id,
                        eid,
                        props,
                        Some(target.to_string()),
                        None,
                    )
                    .await?;
                count += 1;
            }
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4163
    /// Import rows from a Parquet file (local path or cloud URL) into a
    /// vertex label or edge type.
    ///
    /// `target` must name an existing label or edge type. For edge imports
    /// the `src_col` / `dst_col` options (defaults `_src` / `_dst`) name the
    /// endpoint-VID columns. Parquet columns that do not match a declared
    /// property are skipped; null cells are skipped per-row.
    ///
    /// Returns a single row `{ "count": <number of inserted records> }`.
    pub(crate) async fn execute_parquet_import(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
        _prop_manager: &PropertyManager,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;

        let schema = self.storage.schema_manager().schema();

        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Cloud sources are buffered in memory; local files stream from disk.
        let reader = if is_cloud_url(source) {
            self.open_parquet_from_cloud(source).await?
        } else {
            let validated_source = self.validate_path(source)?;
            let file = std::fs::File::open(&validated_source)?;
            let builder =
                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
            builder.build()?
        };
        let mut reader = reader;

        let mut count = 0;
        // Hold the write lock for the entire import.
        let mut writer = writer_lock.write().await;

        if label_meta.is_some() {
            // Vertex import: every non-null column matching a declared
            // property becomes a vertex property.
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;

            for batch in reader.by_ref() {
                let batch = batch?;
                for row in 0..batch.num_rows() {
                    let mut props = HashMap::new();
                    for field in batch.schema().fields() {
                        let name = field.name();
                        if target_props.contains_key(name) {
                            let col = batch.column_by_name(name).unwrap();
                            if !col.is_null(row) {
                                // Convert with the declared property type as a hint.
                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
                                let val =
                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
                                props.insert(name.clone(), val);
                            }
                        }
                    }
                    let vid = writer.next_vid().await?;
                    writer
                        .insert_vertex_with_labels(vid, props, &[target.to_string()], None)
                        .await?;
                    count += 1;
                }
            }
        } else if let Some(meta) = edge_meta {
            let type_id = meta.id;
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;

            let src_col = options
                .get("src_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_src");
            let dst_col = options
                .get("dst_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_dst");

            for batch in reader {
                let batch = batch?;
                for row in 0..batch.num_rows() {
                    let mut props = HashMap::new();
                    let mut src_vid = None;
                    let mut dst_vid = None;

                    for field in batch.schema().fields() {
                        let name = field.name();
                        let col = batch.column_by_name(name).unwrap();
                        if col.is_null(row) {
                            continue;
                        }

                        if name == src_col {
                            let val = Self::arrow_to_value(col.as_ref(), row);
                            src_vid = Some(Self::vid_from_value(&val)?);
                        } else if name == dst_col {
                            let val = Self::arrow_to_value(col.as_ref(), row);
                            dst_vid = Some(Self::vid_from_value(&val)?);
                        } else if let Some(pm) = target_props.get(name) {
                            let val =
                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
                            props.insert(name.clone(), val);
                        }
                    }

                    // Both endpoints are mandatory for every edge row.
                    let src = src_vid
                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
                    let dst = dst_vid.ok_or_else(|| {
                        anyhow!("Missing destination VID in column '{}'", dst_col)
                    })?;

                    let eid = writer.next_eid(type_id).await?;
                    writer
                        .insert_edge(
                            src,
                            dst,
                            type_id,
                            eid,
                            props,
                            Some(target.to_string()),
                            None,
                        )
                        .await?;
                    count += 1;
                }
            }
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4306
4307 async fn open_parquet_from_cloud(
4311 &self,
4312 source_url: &str,
4313 ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4314 use object_store::ObjectStore;
4315
4316 let (store, path) = build_store_from_url(source_url)?;
4317
4318 let bytes = store.get(&path).await?.bytes().await?;
4320
4321 let reader = bytes::Bytes::from(bytes.to_vec());
4323 let builder =
4324 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4325 Ok(builder.build()?)
4326 }
4327
4328 pub(crate) async fn scan_edge_type(
4329 &self,
4330 edge_type: &str,
4331 ctx: Option<&QueryContext>,
4332 ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4333 let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4334
4335 self.scan_edge_type_l2(edge_type, &mut edges).await?;
4337
4338 self.scan_edge_type_l1(edge_type, &mut edges).await?;
4340
4341 if let Some(ctx) = ctx {
4343 self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4344 self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4345 }
4346
4347 Ok(edges
4348 .into_iter()
4349 .map(|(eid, (src, dst))| (eid, src, dst))
4350 .collect())
4351 }
4352
    /// L2 scan placeholder: currently a no-op that leaves `edges` untouched.
    /// Kept so [`scan_edge_type`] can layer the storage tiers uniformly.
    pub(crate) async fn scan_edge_type_l2(
        &self,
        _edge_type: &str,
        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
    ) -> Result<()> {
        Ok(())
    }
4366
4367 pub(crate) async fn scan_edge_type_l1(
4369 &self,
4370 edge_type: &str,
4371 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4372 ) -> Result<()> {
4373 if let Ok(Some(batch)) = self
4374 .storage
4375 .scan_delta_table(
4376 edge_type,
4377 "fwd",
4378 &["eid", "src_vid", "dst_vid", "op", "_version"],
4379 None,
4380 )
4381 .await
4382 {
4383 let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
4385 HashMap::new();
4386
4387 self.process_delta_batch(&batch, &mut versioned_ops)?;
4388
4389 for (eid, (_, op, src, dst)) in versioned_ops {
4391 if op == 0 {
4392 edges.insert(eid, (src, dst));
4393 } else if op == 1 {
4394 edges.remove(&eid);
4395 }
4396 }
4397 }
4398 Ok(())
4399 }
4400
4401 pub(crate) fn process_delta_batch(
4403 &self,
4404 batch: &arrow_array::RecordBatch,
4405 versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4406 ) -> Result<()> {
4407 use arrow_array::UInt64Array;
4408 let eid_col = batch
4409 .column_by_name("eid")
4410 .ok_or(anyhow!("Missing eid"))?
4411 .as_any()
4412 .downcast_ref::<UInt64Array>()
4413 .ok_or(anyhow!("Invalid eid"))?;
4414 let src_col = batch
4415 .column_by_name("src_vid")
4416 .ok_or(anyhow!("Missing src_vid"))?
4417 .as_any()
4418 .downcast_ref::<UInt64Array>()
4419 .ok_or(anyhow!("Invalid src_vid"))?;
4420 let dst_col = batch
4421 .column_by_name("dst_vid")
4422 .ok_or(anyhow!("Missing dst_vid"))?
4423 .as_any()
4424 .downcast_ref::<UInt64Array>()
4425 .ok_or(anyhow!("Invalid dst_vid"))?;
4426 let op_col = batch
4427 .column_by_name("op")
4428 .ok_or(anyhow!("Missing op"))?
4429 .as_any()
4430 .downcast_ref::<arrow_array::UInt8Array>()
4431 .ok_or(anyhow!("Invalid op"))?;
4432 let version_col = batch
4433 .column_by_name("_version")
4434 .ok_or(anyhow!("Missing _version"))?
4435 .as_any()
4436 .downcast_ref::<UInt64Array>()
4437 .ok_or(anyhow!("Invalid _version"))?;
4438
4439 for i in 0..batch.num_rows() {
4440 let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4441 let version = version_col.value(i);
4442 let op = op_col.value(i);
4443 let src = Vid::from(src_col.value(i));
4444 let dst = Vid::from(dst_col.value(i));
4445
4446 match versioned_ops.entry(eid) {
4447 std::collections::hash_map::Entry::Vacant(e) => {
4448 e.insert((version, op, src, dst));
4449 }
4450 std::collections::hash_map::Entry::Occupied(mut e) => {
4451 if version > e.get().0 {
4452 e.insert((version, op, src, dst));
4453 }
4454 }
4455 }
4456 }
4457 Ok(())
4458 }
4459
4460 pub(crate) fn scan_edge_type_l0(
4462 &self,
4463 edge_type: &str,
4464 ctx: &QueryContext,
4465 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4466 ) {
4467 let schema = self.storage.schema_manager().schema();
4468 let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4469
4470 if let Some(type_id) = type_id {
4471 self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4473
4474 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4476 self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4477 }
4478
4479 for pending_l0_arc in &ctx.pending_flush_l0s {
4481 self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4482 }
4483 }
4484 }
4485
4486 pub(crate) fn scan_single_l0(
4488 &self,
4489 l0: &uni_store::runtime::L0Buffer,
4490 type_id: u32,
4491 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4492 ) {
4493 for edge_entry in l0.graph.edges() {
4494 if edge_entry.edge_type == type_id {
4495 edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4496 }
4497 }
4498 let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4500 for eid in eids_to_check {
4501 if l0.is_tombstoned(eid) {
4502 edges.remove(&eid);
4503 }
4504 }
4505 }
4506
4507 pub(crate) fn filter_tombstoned_vertex_edges(
4509 &self,
4510 ctx: &QueryContext,
4511 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4512 ) {
4513 let l0 = ctx.l0.read();
4514 let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4515
4516 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4518 let tx_l0 = tx_l0_arc.read();
4519 all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4520 }
4521
4522 for pending_l0_arc in &ctx.pending_flush_l0s {
4524 let pending_l0 = pending_l0_arc.read();
4525 all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4526 }
4527
4528 edges.retain(|_, (src, dst)| {
4529 !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4530 });
4531 }
4532
4533 pub(crate) async fn execute_project(
4535 &self,
4536 input_rows: Vec<HashMap<String, Value>>,
4537 projections: &[(Expr, Option<String>)],
4538 prop_manager: &PropertyManager,
4539 params: &HashMap<String, Value>,
4540 ctx: Option<&QueryContext>,
4541 ) -> Result<Vec<HashMap<String, Value>>> {
4542 let mut results = Vec::new();
4543 for m in input_rows {
4544 let mut row = HashMap::new();
4545 for (expr, alias) in projections {
4546 let val = self
4547 .evaluate_expr(expr, &m, prop_manager, params, ctx)
4548 .await?;
4549 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4550 row.insert(name, val);
4551 }
4552 results.push(row);
4553 }
4554 Ok(results)
4555 }
4556
4557 pub(crate) async fn execute_unwind(
4559 &self,
4560 input_rows: Vec<HashMap<String, Value>>,
4561 expr: &Expr,
4562 variable: &str,
4563 prop_manager: &PropertyManager,
4564 params: &HashMap<String, Value>,
4565 ctx: Option<&QueryContext>,
4566 ) -> Result<Vec<HashMap<String, Value>>> {
4567 let mut results = Vec::new();
4568 for row in input_rows {
4569 let val = self
4570 .evaluate_expr(expr, &row, prop_manager, params, ctx)
4571 .await?;
4572 if let Value::List(items) = val {
4573 for item in items {
4574 let mut new_row = row.clone();
4575 new_row.insert(variable.to_string(), item);
4576 results.push(new_row);
4577 }
4578 }
4579 }
4580 Ok(results)
4581 }
4582
    /// APPLY (correlated subquery): run `subquery` once per surviving input
    /// row, with the row's bindings merged into the subquery parameters, and
    /// join each produced sub-row back onto its driving row (sub-row columns
    /// overwrite clashing keys).
    ///
    /// When `input_filter` is present, driving rows whose filter does not
    /// evaluate to true are dropped first. When no rows survive, the
    /// subquery still runs once with the original parameters and its rows
    /// are returned unjoined — the "no incoming row" case.
    pub(crate) async fn execute_apply(
        &self,
        input_rows: Vec<HashMap<String, Value>>,
        subquery: &LogicalPlan,
        input_filter: Option<&Expr>,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut filtered_rows = input_rows;

        // Pre-filter the driving rows; non-boolean results count as false.
        if let Some(filter) = input_filter {
            let mut filtered = Vec::new();
            for row in filtered_rows {
                let res = self
                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
                    .await?;
                if res.as_bool().unwrap_or(false) {
                    filtered.push(row);
                }
            }
            filtered_rows = filtered;
        }

        // No driving rows: run the subquery once, uncorrelated.
        if filtered_rows.is_empty() {
            let sub_rows = self
                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
                .await?;
            return Ok(sub_rows);
        }

        let mut results = Vec::new();
        for row in filtered_rows {
            // Expose the driving row's bindings to the subquery as parameters.
            let mut sub_params = params.clone();
            sub_params.extend(row.clone());

            let sub_rows = self
                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
                .await?;

            for sub_row in sub_rows {
                let mut new_row = row.clone();
                new_row.extend(sub_row);
                results.push(new_row);
            }
        }
        Ok(results)
    }
4634
4635 pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4637 let schema = self.storage.schema_manager().schema();
4638 let mut rows = Vec::new();
4639 for idx in &schema.indexes {
4640 let (name, type_str, details) = match idx {
4641 uni_common::core::schema::IndexDefinition::Vector(c) => (
4642 c.name.clone(),
4643 "VECTOR",
4644 format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4645 ),
4646 uni_common::core::schema::IndexDefinition::FullText(c) => (
4647 c.name.clone(),
4648 "FULLTEXT",
4649 format!("on {}:{:?}", c.label, c.properties),
4650 ),
4651 uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4652 cfg.name.clone(),
4653 "SCALAR",
4654 format!(":{}({:?})", cfg.label, cfg.properties),
4655 ),
4656 _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4657 };
4658
4659 if let Some(f) = filter
4660 && f != type_str
4661 {
4662 continue;
4663 }
4664
4665 let mut row = HashMap::new();
4666 row.insert("name".to_string(), Value::String(name));
4667 row.insert("type".to_string(), Value::String(type_str.to_string()));
4668 row.insert("details".to_string(), Value::String(details));
4669 rows.push(row);
4670 }
4671 rows
4672 }
4673
4674 pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4675 let mut row = HashMap::new();
4676 row.insert("name".to_string(), Value::String("uni".to_string()));
4677 vec![row]
4679 }
4680
4681 pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4682 vec![]
4684 }
4685
4686 pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4687 let snapshot = self
4688 .storage
4689 .snapshot_manager()
4690 .load_latest_snapshot()
4691 .await?;
4692 let mut results = Vec::new();
4693
4694 if let Some(snap) = snapshot {
4695 for (label, s) in &snap.vertices {
4696 let mut row = HashMap::new();
4697 row.insert("type".to_string(), Value::String("Label".to_string()));
4698 row.insert("name".to_string(), Value::String(label.clone()));
4699 row.insert("count".to_string(), Value::Int(s.count as i64));
4700 results.push(row);
4701 }
4702 for (edge, s) in &snap.edges {
4703 let mut row = HashMap::new();
4704 row.insert("type".to_string(), Value::String("Edge".to_string()));
4705 row.insert("name".to_string(), Value::String(edge.clone()));
4706 row.insert("count".to_string(), Value::Int(s.count as i64));
4707 results.push(row);
4708 }
4709 }
4710
4711 Ok(results)
4712 }
4713
4714 pub(crate) fn execute_show_constraints(
4715 &self,
4716 clause: ShowConstraints,
4717 ) -> Vec<HashMap<String, Value>> {
4718 let schema = self.storage.schema_manager().schema();
4719 let mut rows = Vec::new();
4720 for c in &schema.constraints {
4721 if let Some(target) = &clause.target {
4722 match (target, &c.target) {
4723 (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4724 (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4725 if e1 == e2 => {}
4726 _ => continue,
4727 }
4728 }
4729
4730 let mut row = HashMap::new();
4731 row.insert("name".to_string(), Value::String(c.name.clone()));
4732 let type_str = match c.constraint_type {
4733 ConstraintType::Unique { .. } => "UNIQUE",
4734 ConstraintType::Exists { .. } => "EXISTS",
4735 ConstraintType::Check { .. } => "CHECK",
4736 _ => "UNKNOWN",
4737 };
4738 row.insert("type".to_string(), Value::String(type_str.to_string()));
4739
4740 let target_str = match &c.target {
4741 ConstraintTarget::Label(l) => format!("(:{})", l),
4742 ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4743 _ => "UNKNOWN".to_string(),
4744 };
4745 row.insert("target".to_string(), Value::String(target_str));
4746
4747 rows.push(row);
4748 }
4749 rows
4750 }
4751
4752 pub(crate) async fn execute_cross_join(
4754 &self,
4755 left: Box<LogicalPlan>,
4756 right: Box<LogicalPlan>,
4757 prop_manager: &PropertyManager,
4758 params: &HashMap<String, Value>,
4759 ctx: Option<&QueryContext>,
4760 ) -> Result<Vec<HashMap<String, Value>>> {
4761 let left_rows = self
4762 .execute_subplan(*left, prop_manager, params, ctx)
4763 .await?;
4764 let right_rows = self
4765 .execute_subplan(*right, prop_manager, params, ctx)
4766 .await?;
4767
4768 let mut results = Vec::new();
4769 for l in &left_rows {
4770 for r in &right_rows {
4771 let mut combined = l.clone();
4772 combined.extend(r.clone());
4773 results.push(combined);
4774 }
4775 }
4776 Ok(results)
4777 }
4778
4779 pub(crate) async fn execute_union(
4781 &self,
4782 left: Box<LogicalPlan>,
4783 right: Box<LogicalPlan>,
4784 all: bool,
4785 prop_manager: &PropertyManager,
4786 params: &HashMap<String, Value>,
4787 ctx: Option<&QueryContext>,
4788 ) -> Result<Vec<HashMap<String, Value>>> {
4789 let mut left_rows = self
4790 .execute_subplan(*left, prop_manager, params, ctx)
4791 .await?;
4792 let mut right_rows = self
4793 .execute_subplan(*right, prop_manager, params, ctx)
4794 .await?;
4795
4796 left_rows.append(&mut right_rows);
4797
4798 if !all {
4799 let mut seen = HashSet::new();
4800 left_rows.retain(|row| {
4801 let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
4802 let key = format!("{sorted_row:?}");
4803 seen.insert(key)
4804 });
4805 }
4806 Ok(left_rows)
4807 }
4808
4809 pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
4811 let schema = self.storage.schema_manager().schema();
4812 schema.indexes.iter().any(|idx| match idx {
4813 uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
4814 uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
4815 uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
4816 _ => false,
4817 })
4818 }
4819
4820 pub(crate) async fn execute_export(
4821 &self,
4822 target: &str,
4823 source: &str,
4824 options: &HashMap<String, Value>,
4825 prop_manager: &PropertyManager,
4826 ctx: Option<&QueryContext>,
4827 ) -> Result<Vec<HashMap<String, Value>>> {
4828 let format = options
4829 .get("format")
4830 .and_then(|v| v.as_str())
4831 .unwrap_or("csv")
4832 .to_lowercase();
4833
4834 match format.as_str() {
4835 "csv" => {
4836 self.execute_csv_export(target, source, options, prop_manager, ctx)
4837 .await
4838 }
4839 "parquet" => {
4840 self.execute_parquet_export(target, source, options, prop_manager, ctx)
4841 .await
4842 }
4843 _ => Err(anyhow!("Unsupported export format: {}", format)),
4844 }
4845 }
4846
    /// Export a vertex label or edge type to a local CSV file.
    ///
    /// NOTE(review): the output path is the `source` parameter here — verify
    /// against `execute_export`'s argument order before renaming.
    ///
    /// Options: `delimiter` (first byte only, default `,`) and `header`
    /// (default true). Vertex rows lead with `_vid`; edge rows lead with
    /// `_eid,_src,_dst,_type`. Property columns follow in sorted name order.
    /// Returns a single row `{ "count": <exported rows> }`.
    pub(crate) async fn execute_csv_export(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Validate the destination path before creating any file.
        let validated_dest = self.validate_path(source)?;

        let schema = self.storage.schema_manager().schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Only the first byte of the delimiter option is honored.
        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut wtr = csv::WriterBuilder::new()
            .delimiter(delimiter)
            .from_path(&validated_dest)?;

        let mut count = 0;
        // Fallback for targets without declared properties.
        let empty_props = HashMap::new();

        if let Some(meta) = label_meta {
            // Vertex export: `_vid` column plus sorted property columns.
            let label_id = meta.id;
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            let mut headers = vec!["_vid".to_string()];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(vid.to_string());
                for p_name in &prop_names {
                    // Missing properties are exported as Null's CSV form.
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            // Edge export: identity/endpoint columns plus sorted properties.
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            let mut headers = vec![
                "_eid".to_string(),
                "_src".to_string(),
                "_dst".to_string(),
                "_type".to_string(),
            ];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            let edges = self.scan_edge_type(target, ctx).await?;

            for (eid, src, dst) in edges {
                let props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(eid.to_string());
                row.push(src.to_string());
                row.push(dst.to_string());
                // `_type` is the numeric edge-type id, not the type name.
                row.push(meta.id.to_string());

                for p_name in &prop_names {
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        }

        wtr.flush()?;
        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4966
    /// Export a vertex label or edge type to a Parquet file, locally or to
    /// a cloud object store depending on the destination URL shape.
    ///
    /// The Arrow schema comes from the target's dataset, so rows are padded
    /// with the internal columns that schema requires (`_vid`/`_uid`/
    /// `_deleted`/`_version` for vertices; `eid`/`src_vid`/`dst_vid`/
    /// `_deleted`/`_version` for edges). Returns `{ "count": <rows> }`.
    pub(crate) async fn execute_parquet_export(
        &self,
        target: &str,
        destination: &str,
        _options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let schema_manager = self.storage.schema_manager();
        let schema = schema_manager.schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // The dataset dictates the Arrow schema the output file must match.
        let arrow_schema = if label_meta.is_some() {
            let dataset = self.storage.vertex_dataset(target)?;
            dataset.get_arrow_schema(&schema)?
        } else {
            let dataset = self.storage.edge_dataset(target, "", "")?;
            dataset.get_arrow_schema(&schema)?
        };

        let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let mut props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                props.insert(
                    "_vid".to_string(),
                    uni_common::Value::Int(vid.as_u64() as i64),
                );
                // Fill `_uid` with a 32-int zero placeholder when absent so
                // the row satisfies the dataset's Arrow schema.
                if !props.contains_key("_uid") {
                    props.insert(
                        "_uid".to_string(),
                        uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
                    );
                }
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        } else if edge_meta.is_some() {
            let edges = self.scan_edge_type(target, ctx).await?;
            for (eid, src, dst) in edges {
                let mut props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                props.insert(
                    "eid".to_string(),
                    uni_common::Value::Int(eid.as_u64() as i64),
                );
                props.insert(
                    "src_vid".to_string(),
                    uni_common::Value::Int(src.as_u64() as i64),
                );
                props.insert(
                    "dst_vid".to_string(),
                    uni_common::Value::Int(dst.as_u64() as i64),
                );
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        }

        if is_cloud_url(destination) {
            self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
                .await?;
        } else {
            let validated_dest = self.validate_path(destination)?;
            let file = std::fs::File::create(&validated_dest)?;
            let mut writer =
                parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;

            // An empty export still produces a valid (row-less) Parquet file.
            if !rows.is_empty() {
                let batch = self.rows_to_batch(&rows, &arrow_schema)?;
                writer.write(&batch)?;
            }

            writer.close()?;
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(rows.len() as i64));
        Ok(vec![res])
    }
5074
5075 async fn write_parquet_to_cloud(
5077 &self,
5078 dest_url: &str,
5079 rows: &[HashMap<String, uni_common::Value>],
5080 arrow_schema: &arrow_schema::Schema,
5081 ) -> Result<()> {
5082 use object_store::ObjectStore;
5083
5084 let (store, path) = build_store_from_url(dest_url)?;
5085
5086 let mut buffer = Vec::new();
5088 {
5089 let mut writer = parquet::arrow::ArrowWriter::try_new(
5090 &mut buffer,
5091 Arc::new(arrow_schema.clone()),
5092 None,
5093 )?;
5094
5095 if !rows.is_empty() {
5096 let batch = self.rows_to_batch(rows, arrow_schema)?;
5097 writer.write(&batch)?;
5098 }
5099
5100 writer.close()?;
5101 }
5102
5103 store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5105
5106 Ok(())
5107 }
5108
5109 pub(crate) fn rows_to_batch(
5110 &self,
5111 rows: &[HashMap<String, uni_common::Value>],
5112 schema: &arrow_schema::Schema,
5113 ) -> Result<RecordBatch> {
5114 let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5115
5116 for field in schema.fields() {
5117 let name = field.name();
5118 let dt = field.data_type();
5119
5120 let values: Vec<uni_common::Value> = rows
5121 .iter()
5122 .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5123 .collect();
5124 let array = self.values_to_array(&values, dt)?;
5125 columns.push(array);
5126 }
5127
5128 Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5129 }
5130
    /// Convert a slice of runtime `Value`s into a typed Arrow array of `dt`.
    ///
    /// Thin wrapper around [`arrow_convert::values_to_array`]; all conversion
    /// logic (and any error for unsupported type/value pairs) lives there.
    pub(crate) fn values_to_array(
        &self,
        values: &[uni_common::Value],
        dt: &arrow_schema::DataType,
    ) -> Result<Arc<dyn Array>> {
        arrow_convert::values_to_array(values, dt)
    }
5140
5141 pub(crate) fn format_csv_value(&self, val: Value) -> String {
5142 match val {
5143 Value::Null => "".to_string(),
5144 Value::String(s) => s,
5145 Value::Int(i) => i.to_string(),
5146 Value::Float(f) => f.to_string(),
5147 Value::Bool(b) => b.to_string(),
5148 _ => format!("{val}"),
5149 }
5150 }
5151
5152 pub(crate) fn parse_csv_value(
5153 &self,
5154 s: &str,
5155 data_type: &uni_common::core::schema::DataType,
5156 prop_name: &str,
5157 ) -> Result<Value> {
5158 if s.is_empty() || s.to_lowercase() == "null" {
5159 return Ok(Value::Null);
5160 }
5161
5162 use uni_common::core::schema::DataType;
5163 match data_type {
5164 DataType::String => Ok(Value::String(s.to_string())),
5165 DataType::Int32 | DataType::Int64 => {
5166 let i = s.parse::<i64>().map_err(|_| {
5167 anyhow!(
5168 "Failed to parse integer for property '{}': {}",
5169 prop_name,
5170 s
5171 )
5172 })?;
5173 Ok(Value::Int(i))
5174 }
5175 DataType::Float32 | DataType::Float64 => {
5176 let f = s.parse::<f64>().map_err(|_| {
5177 anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5178 })?;
5179 Ok(Value::Float(f))
5180 }
5181 DataType::Bool => {
5182 let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5183 anyhow!(
5184 "Failed to parse boolean for property '{}': {}",
5185 prop_name,
5186 s
5187 )
5188 })?;
5189 Ok(Value::Bool(b))
5190 }
5191 DataType::CypherValue => {
5192 let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5193 anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5194 })?;
5195 Ok(Value::from(json_val))
5196 }
5197 DataType::Vector { .. } => {
5198 let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5199 anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5200 })?;
5201 Ok(Value::Vector(v))
5202 }
5203 _ => Ok(Value::String(s.to_string())),
5204 }
5205 }
5206
5207 pub(crate) async fn detach_delete_vertex(
5208 &self,
5209 vid: Vid,
5210 writer: &mut Writer,
5211 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5212 ) -> Result<()> {
5213 let schema = self.storage.schema_manager().schema();
5214 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5215
5216 let out_graph = self
5218 .storage
5219 .load_subgraph_cached(
5220 &[vid],
5221 &edge_type_ids,
5222 1,
5223 uni_store::runtime::Direction::Outgoing,
5224 Some(writer.l0_manager.get_current()),
5225 )
5226 .await?;
5227
5228 for edge in out_graph.edges() {
5229 writer
5230 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5231 .await?;
5232 }
5233
5234 let in_graph = self
5236 .storage
5237 .load_subgraph_cached(
5238 &[vid],
5239 &edge_type_ids,
5240 1,
5241 uni_store::runtime::Direction::Incoming,
5242 Some(writer.l0_manager.get_current()),
5243 )
5244 .await?;
5245
5246 for edge in in_graph.edges() {
5247 writer
5248 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5249 .await?;
5250 }
5251
5252 Ok(())
5253 }
5254
5255 pub(crate) async fn batch_detach_delete_vertices(
5257 &self,
5258 vids: &[Vid],
5259 labels_per_vid: Vec<Option<Vec<String>>>,
5260 writer: &mut Writer,
5261 tx_l0: Option<&Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
5262 ) -> Result<()> {
5263 let schema = self.storage.schema_manager().schema();
5264 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5265
5266 let out_graph = self
5268 .storage
5269 .load_subgraph_cached(
5270 vids,
5271 &edge_type_ids,
5272 1,
5273 uni_store::runtime::Direction::Outgoing,
5274 Some(writer.l0_manager.get_current()),
5275 )
5276 .await?;
5277
5278 for edge in out_graph.edges() {
5279 writer
5280 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5281 .await?;
5282 }
5283
5284 let in_graph = self
5286 .storage
5287 .load_subgraph_cached(
5288 vids,
5289 &edge_type_ids,
5290 1,
5291 uni_store::runtime::Direction::Incoming,
5292 Some(writer.l0_manager.get_current()),
5293 )
5294 .await?;
5295
5296 for edge in in_graph.edges() {
5297 writer
5298 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type, tx_l0)
5299 .await?;
5300 }
5301
5302 for (vid, labels) in vids.iter().zip(labels_per_vid) {
5304 writer.delete_vertex(*vid, labels, tx_l0).await?;
5305 }
5306
5307 Ok(())
5308 }
5309}