1use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7 eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16 match val {
17 Value::Temporal(tv) => {
18 use uni_common::TemporalValue;
19 match tv {
20 TemporalValue::DateTime {
21 nanos_since_epoch, ..
22 }
23 | TemporalValue::LocalDateTime {
24 nanos_since_epoch, ..
25 } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26 TemporalValue::Date { days_since_epoch } => {
27 chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28 }
29 _ => None,
30 }
31 }
32 Value::String(s) => parse_datetime_utc(s).ok(),
33 _ => None,
34 }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46 BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47 ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54use uni_store::storage::index_manager::IndexManager;
55
56use crate::query::df_graph::L0Context;
58use crate::query::df_planner::HybridPhysicalPlanner;
59use datafusion::physical_plan::ExecutionPlanProperties;
60use datafusion::prelude::SessionContext;
61use parking_lot::RwLock as SyncRwLock;
62
63use arrow_array::{Array, RecordBatch};
64use csv;
65use parquet;
66
67use super::core::*;
68
// Count of system (non-user-property) fields an edge row carries; a row with
// no more than this many entries is considered unhydrated and triggers a
// property fetch in `hydrate_entity_if_needed`. NOTE(review): assumed to be
// _eid/_src/_dst/_type plus one more — confirm against the edge schema.
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
// Count of system fields a vertex row carries (same hydration heuristic).
// NOTE(review): presumably _vid/_labels plus one more — confirm against the
// vertex schema.
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
73
74fn collect_l0_vids(
79 ctx: Option<&QueryContext>,
80 extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
81) -> Vec<Vid> {
82 let mut vids = Vec::new();
83 if let Some(ctx) = ctx {
84 vids.extend(extractor(&ctx.l0.read()));
85 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
86 vids.extend(extractor(&tx_l0_arc.read()));
87 }
88 for pending_l0_arc in &ctx.pending_flush_l0s {
89 vids.extend(extractor(&pending_l0_arc.read()));
90 }
91 }
92 vids
93}
94
/// Ensures an entity row (edge or vertex) carries its full property set.
///
/// Rows produced by storage pushdown may contain only system fields
/// (`_eid`/`_vid` etc.). When the map holds no more entries than the
/// system-field count, the remaining properties are fetched via
/// `prop_manager` and merged in without overwriting existing keys.
/// Rows already carrying extra properties are left untouched.
async fn hydrate_entity_if_needed(
    map: &mut HashMap<String, Value>,
    prop_manager: &PropertyManager,
    ctx: Option<&QueryContext>,
) {
    // Edge rows are identified by a numeric `_eid` field.
    if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
        if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
            tracing::debug!(
                "Pushdown fallback: hydrating edge {} at execution time",
                eid_u64
            );
            if let Ok(Some(props)) = prop_manager
                .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
                .await
            {
                // `or_insert` keeps any value already present on the row.
                for (key, value) in props {
                    map.entry(key).or_insert(value);
                }
            }
        } else {
            tracing::trace!(
                "Pushdown success: edge {} already has {} properties",
                eid_u64,
                map.len() - EDGE_SYSTEM_FIELD_COUNT
            );
        }
        // An edge row is never also a vertex row — stop here.
        return;
    }

    // Vertex rows are identified by a numeric `_vid` field.
    if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
        if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
            tracing::debug!(
                "Pushdown fallback: hydrating vertex {} at execution time",
                vid_u64
            );
            if let Ok(Some(props)) = prop_manager
                .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
                .await
            {
                for (key, value) in props {
                    map.entry(key).or_insert(value);
                }
            }
        } else {
            tracing::trace!(
                "Pushdown success: vertex {} already has {} properties",
                vid_u64,
                map.len() - VERTEX_SYSTEM_FIELD_COUNT
            );
        }
    }
}
157
158impl Executor {
159 async fn verify_and_filter_candidates(
164 &self,
165 mut candidates: Vec<Vid>,
166 variable: &str,
167 filter: Option<&Expr>,
168 ctx: Option<&QueryContext>,
169 prop_manager: &PropertyManager,
170 params: &HashMap<String, Value>,
171 ) -> Result<Vec<Vid>> {
172 candidates.sort_unstable();
173 candidates.dedup();
174
175 let mut verified_vids = Vec::new();
176 for vid in candidates {
177 let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
178 continue; };
180
181 if let Some(expr) = filter {
182 let mut props_map: HashMap<String, Value> = props;
183 props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
184
185 let mut row = HashMap::new();
186 row.insert(variable.to_string(), Value::Map(props_map));
187
188 let res = self
189 .evaluate_expr(expr, &row, prop_manager, params, ctx)
190 .await?;
191 if res.as_bool().unwrap_or(false) {
192 verified_vids.push(vid);
193 }
194 } else {
195 verified_vids.push(vid);
196 }
197 }
198
199 Ok(verified_vids)
200 }
201
    /// Scans persisted vertex storage for candidate vids of `label_id`,
    /// pushing `filter` down to LanceDB as a SQL predicate when possible.
    ///
    /// Returns only storage-resident candidates; unflushed L0 vertices are
    /// layered on by the caller (see `scan_label_with_filter`). A table that
    /// does not exist yet (label never flushed) yields an empty vec instead
    /// of an error.
    pub(crate) async fn scan_storage_candidates(
        &self,
        label_id: u16,
        variable: &str,
        filter: Option<&Expr>,
    ) -> Result<Vec<Vid>> {
        let schema = self.storage.schema_manager().schema();
        let label_name = schema
            .label_name_by_id(label_id)
            .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;

        let ds = self.storage.vertex_dataset(label_name)?;
        let lancedb_store = self.storage.lancedb_store();

        match ds.open_lancedb(lancedb_store).await {
            Ok(table) => {
                use arrow_array::UInt64Array;
                use futures::TryStreamExt;
                use lancedb::query::{ExecutableQuery, QueryBase, Select};

                let mut query = table.query();

                // Translate the Cypher filter into a Lance SQL predicate when
                // the generator supports it; otherwise exclude only tombstoned
                // rows and let `verify_and_filter_candidates` re-check the
                // filter in memory.
                let empty_props = std::collections::HashMap::new();
                let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
                if let Some(expr) = filter
                    && let Some(sql) = LanceFilterGenerator::generate(
                        std::slice::from_ref(expr),
                        variable,
                        Some(label_props),
                    )
                {
                    query = query.only_if(format!("_deleted = false AND ({})", sql));
                } else {
                    query = query.only_if("_deleted = false");
                }

                // Only the id column is needed at this stage.
                let query = query.select(Select::columns(&["_vid"]));
                let stream = query.execute().await?;
                let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;

                let mut vids = Vec::new();
                for batch in batches {
                    let vid_col = batch
                        .column_by_name("_vid")
                        .ok_or(anyhow!("Missing _vid"))?
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or(anyhow!("Invalid _vid"))?;
                    for i in 0..batch.num_rows() {
                        vids.push(Vid::from(vid_col.value(i)));
                    }
                }
                Ok(vids)
            }
            Err(e) => {
                // Treat "table missing" as an empty result set; propagate any
                // other open failure. Matching on the lowercased message is a
                // best-effort heuristic over several backend error shapes.
                let err_msg = e.to_string().to_lowercase();
                if err_msg.contains("not found")
                    || err_msg.contains("does not exist")
                    || err_msg.contains("no such file")
                    || err_msg.contains("object not found")
                {
                    Ok(Vec::new())
                } else {
                    Err(e)
                }
            }
        }
    }
278
279 pub(crate) async fn scan_label_with_filter(
280 &self,
281 label_id: u16,
282 variable: &str,
283 filter: Option<&Expr>,
284 ctx: Option<&QueryContext>,
285 prop_manager: &PropertyManager,
286 params: &HashMap<String, Value>,
287 ) -> Result<Vec<Vid>> {
288 let mut candidates = self
289 .scan_storage_candidates(label_id, variable, filter)
290 .await?;
291
292 let schema = self.storage.schema_manager().schema();
294 if let Some(label_name) = schema.label_name_by_id(label_id) {
295 candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
296 }
297
298 self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
299 .await
300 }
301
302 pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
303 if let Value::Node(node) = val {
305 return Ok(node.vid);
306 }
307 if let Value::Map(map) = val
309 && let Some(vid_val) = map.get("_vid")
310 && let Some(v) = vid_val.as_u64()
311 {
312 return Ok(Vid::from(v));
313 }
314 if let Some(s) = val.as_str()
316 && let Ok(id) = s.parse::<u64>()
317 {
318 return Ok(Vid::new(id));
319 }
320 if let Some(v) = val.as_u64() {
322 return Ok(Vid::from(v));
323 }
324 Err(anyhow!("Invalid Vid format: {:?}", val))
325 }
326
327 fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
333 for val in row.values() {
334 if let Ok(vid) = Self::vid_from_value(val)
335 && vid == target_vid
336 {
337 return val.clone();
338 }
339 }
340 Value::Map(HashMap::from([(
342 "_vid".to_string(),
343 Value::Int(target_vid.as_u64() as i64),
344 )]))
345 }
346
    /// Builds a fresh DataFusion session plus the hybrid physical planner
    /// wired to the current L0 (unflushed) state.
    ///
    /// Returns the session context, the planner, and a newly constructed
    /// `PropertyManager` (sharing this executor's storage/schema and the
    /// cache size of `prop_manager`) that the planner also holds as an `Arc`.
    pub async fn create_datafusion_planner(
        &self,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<(
        Arc<SyncRwLock<SessionContext>>,
        HybridPhysicalPlanner,
        Arc<PropertyManager>,
    )> {
        // Snapshot the L0 buffers so the planner sees in-flight writes.
        let query_ctx = self.get_context().await;
        let l0_context = match query_ctx {
            Some(ref ctx) => L0Context::from_query_context(ctx),
            None => L0Context::empty(),
        };

        let prop_manager_arc = Arc::new(PropertyManager::new(
            self.storage.clone(),
            self.storage.schema_manager_arc(),
            prop_manager.cache_size(),
        ));

        // Fresh session per call, with Cypher UDFs registered before use.
        let session = SessionContext::new();
        crate::query::df_udfs::register_cypher_udfs(&session)?;
        let session_ctx = Arc::new(SyncRwLock::new(session));

        let mut planner = HybridPhysicalPlanner::with_l0_context(
            session_ctx.clone(),
            self.storage.clone(),
            l0_context,
            prop_manager_arc.clone(),
            self.storage.schema_manager().schema(),
            params.clone(),
            HashMap::new(),
        );

        // Optional registries/runtimes are attached only when configured.
        planner = planner.with_algo_registry(self.algo_registry.clone());
        if let Some(ref registry) = self.procedure_registry {
            planner = planner.with_procedure_registry(registry.clone());
        }
        if let Some(ref xervo_runtime) = self.xervo_runtime {
            planner = planner.with_xervo_runtime(xervo_runtime.clone());
        }

        Ok((session_ctx, planner, prop_manager_arc))
    }
396
397 pub fn collect_batches(
399 session_ctx: &Arc<SyncRwLock<SessionContext>>,
400 execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
401 ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
402 Box::pin(async move {
403 use futures::TryStreamExt;
404
405 let task_ctx = session_ctx.read().task_ctx();
406 let partition_count = execution_plan.output_partitioning().partition_count();
407 let mut all_batches = Vec::new();
408 for partition in 0..partition_count {
409 let stream = execution_plan.execute(partition, task_ctx.clone())?;
410 let batches: Vec<RecordBatch> = stream.try_collect().await?;
411 all_batches.extend(batches);
412 }
413 Ok(all_batches)
414 })
415 }
416
    /// Plans and runs `plan` on the DataFusion engine, returning raw record
    /// batches.
    ///
    /// Plans containing writes require `self.writer`; a mutation context is
    /// attached to the planner so DML nodes can apply changes. Planner
    /// warnings are drained into `self.warnings` even when execution fails.
    pub async fn execute_datafusion(
        &self,
        plan: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<Vec<RecordBatch>> {
        let (session_ctx, mut planner, prop_manager_arc) =
            self.create_datafusion_planner(prop_manager, params).await?;

        if Self::contains_write_operations(&plan) {
            let writer = self
                .writer
                .as_ref()
                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
                .clone();
            let query_ctx = self.get_context().await;

            debug_assert!(
                query_ctx.is_some(),
                "BUG: query_ctx is None for write operation"
            );

            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
                executor: self.clone(),
                writer,
                prop_manager: prop_manager_arc,
                params: params.clone(),
                query_ctx,
            });
            planner = planner.with_mutation_context(mutation_ctx);
            tracing::debug!(
                plan_type = Self::get_plan_type(&plan),
                "Mutation routed to DataFusion engine"
            );
        }

        let execution_plan = planner.plan(&plan)?;
        let result = Self::collect_batches(&session_ctx, execution_plan).await;

        // Surface planner-side warnings regardless of execution outcome.
        let graph_warnings = planner.graph_ctx().take_warnings();
        if !graph_warnings.is_empty()
            && let Ok(mut w) = self.warnings.lock()
        {
            w.extend(graph_warnings);
        }

        result
    }
471
    /// Executes the read (match) half of a MERGE, forcing full property
    /// projection (`"*"`) for every merge variable so matched entities come
    /// back as complete rows.
    ///
    /// Dotted columns (`var.prop`) left flat by the projection are folded
    /// back into a `Value::Map` under `var` when no bare `var` column exists.
    pub(crate) async fn execute_merge_read_plan(
        &self,
        plan: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        merge_variables: Vec<String>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let (session_ctx, planner, _prop_manager_arc) =
            self.create_datafusion_planner(prop_manager, params).await?;

        // Request every property for each merge variable.
        let extra: HashMap<String, HashSet<String>> = merge_variables
            .iter()
            .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
            .collect();
        let execution_plan = planner.plan_with_properties(&plan, extra)?;
        let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;

        let flat_rows = self.record_batches_to_rows(all_batches)?;

        // Fold flat `var.prop` columns into a single map under `var`, but only
        // when the bare variable column is absent (it wins if present).
        let rows = flat_rows
            .into_iter()
            .map(|mut row| {
                for var in &merge_variables {
                    if row.contains_key(var) {
                        continue;
                    }
                    let prefix = format!("{}.", var);
                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    if !dotted_keys.is_empty() {
                        let mut map = HashMap::new();
                        for key in dotted_keys {
                            let prop_name = key[prefix.len()..].to_string();
                            if let Some(val) = row.remove(&key) {
                                map.insert(prop_name, val);
                            }
                        }
                        row.insert(var.clone(), Value::Map(map));
                    }
                }
                row
            })
            .collect();

        Ok(rows)
    }
532
    /// Converts Arrow record batches into row maps keyed by column name,
    /// reassembling entity variables from their flattened helper columns.
    ///
    /// Per cell: datetime/time struct columns get a type hint for conversion,
    /// `cv_encoded` string columns are decoded from JSON, and path-shaped maps
    /// are normalized. Per row: `var._prop` helper columns are folded back
    /// into the `var` map (or used to build it when `var` is a placeholder
    /// string), then removed.
    fn record_batches_to_rows(
        &self,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut rows = Vec::new();

        for batch in batches {
            let num_rows = batch.num_rows();
            let schema = batch.schema();

            for row_idx in 0..num_rows {
                let mut row = HashMap::new();

                for (col_idx, field) in schema.fields().iter().enumerate() {
                    let column = batch.column(col_idx);
                    // Hint the converter for temporal struct encodings.
                    let data_type =
                        if uni_common::core::schema::is_datetime_struct(field.data_type()) {
                            Some(&uni_common::DataType::DateTime)
                        } else if uni_common::core::schema::is_time_struct(field.data_type()) {
                            Some(&uni_common::DataType::Time)
                        } else {
                            None
                        };
                    let mut value =
                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);

                    // Columns flagged `cv_encoded` carry JSON-in-a-string;
                    // decode them back into structured values.
                    if field
                        .metadata()
                        .get("cv_encoded")
                        .is_some_and(|v| v == "true")
                        && let Value::String(s) = &value
                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
                    {
                        value = Value::from(parsed);
                    }

                    value = Self::normalize_path_if_needed(value);

                    row.insert(field.name().clone(), value);
                }

                // Variables already materialized as maps: these absorb their
                // helper columns below.
                let bare_vars: Vec<String> = row
                    .keys()
                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
                    .cloned()
                    .collect();

                // Variables present only as a placeholder string alongside a
                // `var._vid` column: rebuild the map from dotted columns.
                let vid_placeholder_vars: Vec<String> = row
                    .keys()
                    .filter(|k| {
                        !k.contains('.')
                            && matches!(row.get(*k), Some(Value::String(_)))
                            && row.contains_key(&format!("{}._vid", k))
                    })
                    .cloned()
                    .collect();

                for var in &vid_placeholder_vars {
                    let prefix = format!("{}.", var);
                    let mut map = HashMap::new();

                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();

                    for key in &dotted_keys {
                        let prop_name = &key[prefix.len()..];
                        if let Some(val) = row.remove(key) {
                            map.insert(prop_name.to_string(), val);
                        }
                    }

                    // Replace the placeholder string with the rebuilt map.
                    row.insert(var.clone(), Value::Map(map));
                }

                for var in &bare_vars {
                    // Node system fields: merged in unconditionally.
                    let vid_key = format!("{}._vid", var);
                    let labels_key = format!("{}._labels", var);

                    let vid_val = row.remove(&vid_key);
                    let labels_val = row.remove(&labels_key);

                    if let Some(Value::Map(map)) = row.get_mut(var) {
                        if let Some(v) = vid_val {
                            map.insert("_vid".to_string(), v);
                        }
                        if let Some(v) = labels_val {
                            map.insert("_labels".to_string(), v);
                        }
                    }

                    // Edge system fields: merged without overwriting existing
                    // entries (`entry(..).or_insert`).
                    let eid_key = format!("{}._eid", var);
                    let type_key = format!("{}._type", var);

                    let eid_val = row.remove(&eid_key);
                    let type_val = row.remove(&type_key);

                    if (eid_val.is_some() || type_val.is_some())
                        && let Some(Value::Map(map)) = row.get_mut(var)
                    {
                        if let Some(v) = eid_val {
                            map.entry("_eid".to_string()).or_insert(v);
                        }
                        if let Some(v) = type_val {
                            map.entry("_type".to_string()).or_insert(v);
                        }
                    }

                    // Drop any remaining `var.*` helper columns from the row.
                    let prefix = format!("{}.", var);
                    let helper_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    for key in helper_keys {
                        row.remove(&key);
                    }
                }

                rows.push(row);
            }
        }

        Ok(rows)
    }
689
690 fn normalize_path_if_needed(value: Value) -> Value {
695 match value {
696 Value::Map(map)
697 if map.contains_key("nodes")
698 && (map.contains_key("relationships") || map.contains_key("edges")) =>
699 {
700 Self::normalize_path_map(map)
701 }
702 other => other,
703 }
704 }
705
706 fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
708 if let Some(Value::List(nodes)) = map.remove("nodes") {
710 let normalized_nodes: Vec<Value> = nodes
711 .into_iter()
712 .map(|n| {
713 if let Value::Map(node_map) = n {
714 Self::normalize_path_node_map(node_map)
715 } else {
716 n
717 }
718 })
719 .collect();
720 map.insert("nodes".to_string(), Value::List(normalized_nodes));
721 }
722
723 let rels_key = if map.contains_key("relationships") {
725 "relationships"
726 } else {
727 "edges"
728 };
729 if let Some(Value::List(rels)) = map.remove(rels_key) {
730 let normalized_rels: Vec<Value> = rels
731 .into_iter()
732 .map(|r| {
733 if let Value::Map(rel_map) = r {
734 Self::normalize_path_edge_map(rel_map)
735 } else {
736 r
737 }
738 })
739 .collect();
740 map.insert("relationships".to_string(), Value::List(normalized_rels));
741 }
742
743 Value::Map(map)
744 }
745
746 fn value_to_id_string(val: Value) -> String {
748 match val {
749 Value::Int(n) => n.to_string(),
750 Value::Float(n) => n.to_string(),
751 Value::String(s) => s,
752 other => other.to_string(),
753 }
754 }
755
756 fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
759 if let Some(val) = map.remove(src_key) {
760 map.insert(
761 dst_key.to_string(),
762 Value::String(Self::value_to_id_string(val)),
763 );
764 }
765 }
766
767 fn ensure_properties_map(map: &mut HashMap<String, Value>) {
769 match map.get("properties") {
770 Some(props) if !props.is_null() => {}
771 _ => {
772 map.insert("properties".to_string(), Value::Map(HashMap::new()));
773 }
774 }
775 }
776
    /// Normalizes one path node: exposes the vertex id as a string under
    /// `_id` and guarantees a (possibly empty) "properties" map.
    fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
        Self::stringify_map_field(&mut map, "_vid", "_id");
        Self::ensure_properties_map(&mut map);
        Value::Map(map)
    }
783
    /// Normalizes one path relationship: id and endpoint fields become
    /// strings (`_eid` renamed to `_id`; `_src`/`_dst` stringified in place),
    /// `_type_name` is renamed to `_type`, and a "properties" map is
    /// guaranteed to exist.
    fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
        Self::stringify_map_field(&mut map, "_eid", "_id");
        Self::stringify_map_field(&mut map, "_src", "_src");
        Self::stringify_map_field(&mut map, "_dst", "_dst");

        if let Some(type_name) = map.remove("_type_name") {
            map.insert("_type".to_string(), type_name);
        }

        Self::ensure_properties_map(&mut map);
        Value::Map(map)
    }
797
798 #[instrument(
799 skip(self, prop_manager, params),
800 fields(rows_returned, duration_ms),
801 level = "info"
802 )]
803 pub fn execute<'a>(
804 &'a self,
805 plan: LogicalPlan,
806 prop_manager: &'a PropertyManager,
807 params: &'a HashMap<String, Value>,
808 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
809 Box::pin(async move {
810 let query_type = Self::get_plan_type(&plan);
811 let ctx = self.get_context().await;
812 let start = Instant::now();
813
814 let res = if Self::is_ddl_or_admin(&plan) {
817 self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
818 .await
819 } else {
820 let batches = self
821 .execute_datafusion(plan.clone(), prop_manager, params)
822 .await?;
823 self.record_batches_to_rows(batches)
824 };
825
826 let duration = start.elapsed();
827 metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
828 .record(duration.as_secs_f64());
829
830 tracing::Span::current().record("duration_ms", duration.as_millis());
831 match &res {
832 Ok(rows) => {
833 tracing::Span::current().record("rows_returned", rows.len());
834 metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
835 .increment(rows.len() as u64);
836 }
837 Err(e) => {
838 let error_type = if e.to_string().contains("timed out") {
839 "timeout"
840 } else if e.to_string().contains("syntax") {
841 "syntax"
842 } else {
843 "execution"
844 };
845 metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
846 }
847 }
848
849 res
850 })
851 }
852
    /// Maps a plan variant to the static label used as the `query_type`
    /// metrics tag (and in mutation-routing log lines).
    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
        match plan {
            LogicalPlan::Scan { .. } => "read_scan",
            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
            LogicalPlan::Traverse { .. } => "read_traverse",
            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
            LogicalPlan::ScanAll { .. } => "read_scan_all",
            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
            LogicalPlan::VectorKnn { .. } => "read_vector",
            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
            LogicalPlan::Merge { .. } => "write_merge",
            LogicalPlan::Delete { .. } => "write_delete",
            LogicalPlan::Set { .. } => "write_set",
            LogicalPlan::Remove { .. } => "write_remove",
            LogicalPlan::ProcedureCall { .. } => "call",
            LogicalPlan::Copy { .. } => "copy",
            LogicalPlan::Backup { .. } => "backup",
            // Only the top-level plan node is classified; wrappers such as
            // Project/Filter over a scan fall in here.
            _ => "other",
        }
    }
873
    /// Returns the direct child plans of `plan`, used by the recursive
    /// predicates `is_ddl_or_admin` and `contains_write_operations`.
    ///
    /// Leaf variants (and any variant without a nested plan) yield an empty
    /// vec, which terminates the recursion.
    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
        match plan {
            // Single-input wrappers.
            LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Distinct { input }
            | LogicalPlan::Aggregate { input, .. }
            | LogicalPlan::Window { input, .. }
            | LogicalPlan::Unwind { input, .. }
            | LogicalPlan::Filter { input, .. }
            | LogicalPlan::Create { input, .. }
            | LogicalPlan::CreateBatch { input, .. }
            | LogicalPlan::Set { input, .. }
            | LogicalPlan::Remove { input, .. }
            | LogicalPlan::Delete { input, .. }
            | LogicalPlan::Merge { input, .. }
            | LogicalPlan::Foreach { input, .. }
            | LogicalPlan::Traverse { input, .. }
            | LogicalPlan::TraverseMainByType { input, .. }
            | LogicalPlan::BindZeroLengthPath { input, .. }
            | LogicalPlan::BindPath { input, .. }
            | LogicalPlan::ShortestPath { input, .. }
            | LogicalPlan::AllShortestPaths { input, .. }
            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],

            // Two-input variants.
            LogicalPlan::Apply {
                input, subquery, ..
            }
            | LogicalPlan::SubqueryCall { input, subquery } => {
                vec![input.as_ref(), subquery.as_ref()]
            }
            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
                vec![left.as_ref(), right.as_ref()]
            }
            LogicalPlan::RecursiveCTE {
                initial, recursive, ..
            } => vec![initial.as_ref(), recursive.as_ref()],
            LogicalPlan::QuantifiedPattern {
                input,
                pattern_plan,
                ..
            } => vec![input.as_ref(), pattern_plan.as_ref()],

            // Leaves / variants with no nested plan.
            _ => vec![],
        }
    }
932
    /// True when `plan` (or any descendant) must run on the legacy subplan
    /// executor rather than DataFusion: schema DDL, index DDL, admin and
    /// transaction commands, and procedure calls that are not DF-eligible.
    fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
        match plan {
            // Schema DDL.
            LogicalPlan::CreateLabel(_)
            | LogicalPlan::CreateEdgeType(_)
            | LogicalPlan::AlterLabel(_)
            | LogicalPlan::AlterEdgeType(_)
            | LogicalPlan::DropLabel(_)
            | LogicalPlan::DropEdgeType(_)
            | LogicalPlan::CreateConstraint(_)
            | LogicalPlan::DropConstraint(_)
            | LogicalPlan::ShowConstraints(_) => true,

            // Index DDL.
            LogicalPlan::CreateVectorIndex { .. }
            | LogicalPlan::CreateFullTextIndex { .. }
            | LogicalPlan::CreateScalarIndex { .. }
            | LogicalPlan::CreateJsonFtsIndex { .. }
            | LogicalPlan::DropIndex { .. }
            | LogicalPlan::ShowIndexes { .. } => true,

            // Admin, transaction-control, and data-movement commands.
            LogicalPlan::ShowDatabase
            | LogicalPlan::ShowConfig
            | LogicalPlan::ShowStatistics
            | LogicalPlan::Vacuum
            | LogicalPlan::Checkpoint
            | LogicalPlan::Begin
            | LogicalPlan::Commit
            | LogicalPlan::Rollback
            | LogicalPlan::Copy { .. }
            | LogicalPlan::CopyTo { .. }
            | LogicalPlan::CopyFrom { .. }
            | LogicalPlan::Backup { .. }
            | LogicalPlan::Explain { .. } => true,

            // Procedures run on DataFusion only when explicitly allow-listed.
            LogicalPlan::ProcedureCall { procedure_name, .. } => {
                !Self::is_df_eligible_procedure(procedure_name)
            }

            // Anything else is admin only if a nested plan is.
            _ => Self::plan_children(plan)
                .iter()
                .any(|child| Self::is_ddl_or_admin(child)),
        }
    }
987
988 fn is_df_eligible_procedure(name: &str) -> bool {
994 matches!(
995 name,
996 "uni.schema.labels"
997 | "uni.schema.edgeTypes"
998 | "uni.schema.relationshipTypes"
999 | "uni.schema.indexes"
1000 | "uni.schema.constraints"
1001 | "uni.schema.labelInfo"
1002 | "uni.vector.query"
1003 | "uni.fts.query"
1004 | "uni.search"
1005 ) || name.starts_with("uni.algo.")
1006 }
1007
    /// True when `plan` or any descendant mutates data (CREATE / MERGE /
    /// DELETE / SET / REMOVE / FOREACH). Used by `execute_datafusion` to
    /// decide whether a Writer and mutation context are required.
    fn contains_write_operations(plan: &LogicalPlan) -> bool {
        match plan {
            LogicalPlan::Create { .. }
            | LogicalPlan::CreateBatch { .. }
            | LogicalPlan::Merge { .. }
            | LogicalPlan::Delete { .. }
            | LogicalPlan::Set { .. }
            | LogicalPlan::Remove { .. }
            | LogicalPlan::Foreach { .. } => true,
            // Recurse into nested plans (projections, unions, subqueries…).
            _ => Self::plan_children(plan)
                .iter()
                .any(|child| Self::contains_write_operations(child)),
        }
    }
1028
    /// Streaming variant of `execute`: resolves the query context first, then
    /// yields the complete result set as a single stream item.
    ///
    /// NOTE(review): the result is materialized in one chunk rather than
    /// streamed row-by-row; callers get a one-element stream.
    pub fn execute_stream(
        self,
        plan: LogicalPlan,
        prop_manager: Arc<PropertyManager>,
        params: HashMap<String, Value>,
    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
        let this = self;
        let this_for_ctx = this.clone();

        // Stage 1: fetch the query context asynchronously.
        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });

        // Stage 2: run the plan with that context, mirroring `execute`'s
        // DDL-vs-DataFusion routing.
        ctx_stream
            .flat_map(move |ctx| {
                let plan = plan.clone();
                let this = this.clone();
                let prop_manager = prop_manager.clone();
                let params = params.clone();

                let fut = async move {
                    if Self::is_ddl_or_admin(&plan) {
                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
                            .await
                    } else {
                        let batches = this
                            .execute_datafusion(plan, &prop_manager, &params)
                            .await?;
                        this.record_batches_to_rows(batches)
                    }
                };
                stream::once(fut).boxed()
            })
            .boxed()
    }
1066
    /// Converts a single Arrow cell to a [`Value`] with no temporal type hint
    /// (thin wrapper over `arrow_convert::arrow_to_value`).
    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
        arrow_convert::arrow_to_value(col, row, None)
    }
1072
1073 pub(crate) fn evaluate_expr<'a>(
1074 &'a self,
1075 expr: &'a Expr,
1076 row: &'a HashMap<String, Value>,
1077 prop_manager: &'a PropertyManager,
1078 params: &'a HashMap<String, Value>,
1079 ctx: Option<&'a QueryContext>,
1080 ) -> BoxFuture<'a, Result<Value>> {
1081 let this = self;
1082 Box::pin(async move {
1083 let repr = expr.to_string_repr();
1085 if let Some(val) = row.get(&repr) {
1086 return Ok(val.clone());
1087 }
1088
1089 match expr {
1090 Expr::PatternComprehension { .. } => {
1091 Err(anyhow::anyhow!(
1093 "Pattern comprehensions are handled by DataFusion executor"
1094 ))
1095 }
1096 Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1097 "COLLECT subqueries not yet supported in executor"
1098 )),
1099 Expr::Variable(name) => {
1100 if let Some(val) = row.get(name) {
1101 Ok(val.clone())
1102 } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1103 Ok(vid_val.clone())
1107 } else {
1108 Ok(params.get(name).cloned().unwrap_or(Value::Null))
1109 }
1110 }
1111 Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1112 Expr::Property(var_expr, prop_name) => {
1113 if let Expr::Variable(var_name) = var_expr.as_ref() {
1117 let flat_key = format!("{}.{}", var_name, prop_name);
1118 if let Some(val) = row.get(flat_key.as_str()) {
1119 return Ok(val.clone());
1120 }
1121 }
1122
1123 let base_val = this
1124 .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1125 .await?;
1126
1127 if (prop_name == "_vid" || prop_name == "_id")
1129 && let Ok(vid) = Self::vid_from_value(&base_val)
1130 {
1131 return Ok(Value::Int(vid.as_u64() as i64));
1132 }
1133
1134 if let Value::Node(node) = &base_val {
1136 if prop_name == "_vid" || prop_name == "_id" {
1138 return Ok(Value::Int(node.vid.as_u64() as i64));
1139 }
1140 if prop_name == "_labels" {
1141 return Ok(Value::List(
1142 node.labels
1143 .iter()
1144 .map(|l| Value::String(l.clone()))
1145 .collect(),
1146 ));
1147 }
1148 if let Some(val) = node.properties.get(prop_name.as_str()) {
1150 return Ok(val.clone());
1151 }
1152 if let Ok(val) = prop_manager
1154 .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1155 .await
1156 {
1157 return Ok(val);
1158 }
1159 return Ok(Value::Null);
1160 }
1161
1162 if let Value::Edge(edge) = &base_val {
1164 if prop_name == "_eid" || prop_name == "_id" {
1166 return Ok(Value::Int(edge.eid.as_u64() as i64));
1167 }
1168 if prop_name == "_type" {
1169 return Ok(Value::String(edge.edge_type.clone()));
1170 }
1171 if prop_name == "_src" {
1172 return Ok(Value::Int(edge.src.as_u64() as i64));
1173 }
1174 if prop_name == "_dst" {
1175 return Ok(Value::Int(edge.dst.as_u64() as i64));
1176 }
1177 if let Some(val) = edge.properties.get(prop_name.as_str()) {
1179 return Ok(val.clone());
1180 }
1181 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1183 {
1184 return Ok(val);
1185 }
1186 return Ok(Value::Null);
1187 }
1188
1189 if let Value::Map(map) = &base_val {
1192 if let Some(val) = map.get(prop_name.as_str()) {
1194 return Ok(val.clone());
1195 }
1196 if let Some(Value::Map(props)) = map.get("properties")
1198 && let Some(val) = props.get(prop_name.as_str())
1199 {
1200 return Ok(val.clone());
1201 }
1202 let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1204 map.get("_id")
1205 .and_then(|v| v.as_str())
1206 .and_then(|s| s.parse::<u64>().ok())
1207 });
1208 if let Some(id) = vid_opt {
1209 let vid = Vid::from(id);
1210 if let Ok(val) = prop_manager
1211 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1212 .await
1213 {
1214 return Ok(val);
1215 }
1216 } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1217 let eid = uni_common::core::id::Eid::from(id);
1218 if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1219 return Ok(val);
1220 }
1221 }
1222 return Ok(Value::Null);
1223 }
1224
1225 if let Ok(vid) = Self::vid_from_value(&base_val) {
1227 return prop_manager
1228 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1229 .await;
1230 }
1231
1232 if base_val.is_null() {
1233 return Ok(Value::Null);
1234 }
1235
1236 {
1238 use crate::query::datetime::{
1239 eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1240 is_duration_string, is_temporal_accessor, is_temporal_string,
1241 };
1242
1243 if let Value::Temporal(tv) = &base_val {
1245 if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1246 if is_duration_accessor(prop_name) {
1247 return eval_duration_accessor(
1249 &base_val.to_string(),
1250 prop_name,
1251 );
1252 }
1253 } else if is_temporal_accessor(prop_name) {
1254 return eval_temporal_accessor(&base_val.to_string(), prop_name);
1255 }
1256 }
1257
1258 if let Value::String(s) = &base_val {
1260 if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1261 return eval_temporal_accessor(s, prop_name);
1262 }
1263 if is_duration_string(s) && is_duration_accessor(prop_name) {
1264 return eval_duration_accessor(s, prop_name);
1265 }
1266 }
1267 }
1268
1269 Err(anyhow!(
1270 "Cannot access property '{}' on {:?}",
1271 prop_name,
1272 base_val
1273 ))
1274 }
                // `expr[idx]` — dynamic subscript. The container decides the
                // semantics: lists take integer indices (negative counts from the
                // end), maps/nodes/edges take string keys. A Null container yields
                // Null; a wrongly-typed index is a TypeError.
                // NOTE(review): a Null index on a map/node/edge falls through all
                // the `if` blocks to the final "cannot index into" error, whereas a
                // Null index on a list returns Null — confirm this asymmetry is
                // intended.
                Expr::ArrayIndex {
                    array: arr_expr,
                    index: idx_expr,
                } => {
                    let arr_val = this
                        .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
                        .await?;
                    let idx_val = this
                        .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
                        .await?;

                    // List subscript: integer index, negative indexes from the end.
                    if let Value::List(arr) = &arr_val {
                        if let Some(i) = idx_val.as_i64() {
                            let idx = if i < 0 {
                                // Translate the negative index; still negative means
                                // out of range on the left -> Null, not an error.
                                let positive_idx = arr.len() as i64 + i;
                                if positive_idx < 0 {
                                    return Ok(Value::Null); }
                                positive_idx as usize
                            } else {
                                i as usize
                            };
                            if idx < arr.len() {
                                return Ok(arr[idx].clone());
                            }
                            // Out of range on the right -> Null as well.
                            return Ok(Value::Null);
                        } else if idx_val.is_null() {
                            return Ok(Value::Null);
                        } else {
                            return Err(anyhow::anyhow!(
                                "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
                                idx_val
                            ));
                        }
                    }
                    // Map subscript: string key; a missing key yields Null.
                    if let Value::Map(map) = &arr_val {
                        if let Some(key) = idx_val.as_str() {
                            return Ok(map.get(key).cloned().unwrap_or(Value::Null));
                        } else if !idx_val.is_null() {
                            return Err(anyhow::anyhow!(
                                "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
                                idx_val
                            ));
                        }
                    }
                    // Node subscript: check the materialized properties first, then
                    // fall back to a property-store lookup by vid.
                    if let Value::Node(node) = &arr_val {
                        if let Some(key) = idx_val.as_str() {
                            if let Some(val) = node.properties.get(key) {
                                return Ok(val.clone());
                            }
                            if let Ok(val) = prop_manager
                                .get_vertex_prop_with_ctx(node.vid, key, ctx)
                                .await
                            {
                                return Ok(val);
                            }
                            return Ok(Value::Null);
                        } else if !idx_val.is_null() {
                            return Err(anyhow::anyhow!(
                                "TypeError: Node index must be a string, got: {:?}",
                                idx_val
                            ));
                        }
                    }
                    // Edge subscript: same pattern as nodes, keyed by eid.
                    if let Value::Edge(edge) = &arr_val {
                        if let Some(key) = idx_val.as_str() {
                            if let Some(val) = edge.properties.get(key) {
                                return Ok(val.clone());
                            }
                            if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
                                return Ok(val);
                            }
                            return Ok(Value::Null);
                        } else if !idx_val.is_null() {
                            return Err(anyhow::anyhow!(
                                "TypeError: Edge index must be a string, got: {:?}",
                                idx_val
                            ));
                        }
                    }
                    // Any other value that carries a vertex id is treated like a
                    // node property lookup.
                    if let Ok(vid) = Self::vid_from_value(&arr_val)
                        && let Some(key) = idx_val.as_str()
                    {
                        if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
                        {
                            return Ok(val);
                        }
                        return Ok(Value::Null);
                    }
                    if arr_val.is_null() {
                        return Ok(Value::Null);
                    }
                    Err(anyhow!(
                        "TypeError: InvalidArgumentType - cannot index into {:?}",
                        arr_val
                    ))
                }
                // `expr[start..end]` — list slicing with Python-style negative
                // bounds, clamped to the list length. A Null list or a Null bound
                // yields Null; an empty or inverted range yields an empty list.
                Expr::ArraySlice { array, start, end } => {
                    let arr_val = this
                        .evaluate_expr(array, row, prop_manager, params, ctx)
                        .await?;

                    if let Value::List(arr) = &arr_val {
                        let len = arr.len();

                        // Missing start defaults to 0; negative counts from the end.
                        // NOTE(review): a non-integer start silently becomes 0 via
                        // `unwrap_or(0)` — confirm this should not be a TypeError.
                        let start_idx = if let Some(s) = start {
                            let v = this
                                .evaluate_expr(s, row, prop_manager, params, ctx)
                                .await?;
                            if v.is_null() {
                                return Ok(Value::Null);
                            }
                            let raw = v.as_i64().unwrap_or(0);
                            if raw < 0 {
                                (len as i64 + raw).max(0) as usize
                            } else {
                                (raw as usize).min(len)
                            }
                        } else {
                            0
                        };

                        // Missing end defaults to len (slice to the end of the list).
                        let end_idx = if let Some(e) = end {
                            let v = this
                                .evaluate_expr(e, row, prop_manager, params, ctx)
                                .await?;
                            if v.is_null() {
                                return Ok(Value::Null);
                            }
                            let raw = v.as_i64().unwrap_or(len as i64);
                            if raw < 0 {
                                (len as i64 + raw).max(0) as usize
                            } else {
                                (raw as usize).min(len)
                            }
                        } else {
                            len
                        };

                        if start_idx >= end_idx {
                            return Ok(Value::List(vec![]));
                        }
                        // Both bounds are already clamped above; this re-clamp is a
                        // belt-and-braces guard against a slice panic.
                        let end_idx = end_idx.min(len);
                        return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
                    }

                    if arr_val.is_null() {
                        return Ok(Value::Null);
                    }
                    Err(anyhow!("Cannot slice {:?}", arr_val))
                }
                // Constant literal: converted directly to a runtime value.
                Expr::Literal(lit) => Ok(lit.to_value()),
                // List constructor: evaluate each element expression in order.
                Expr::List(items) => {
                    let mut vals = Vec::new();
                    for item in items {
                        vals.push(
                            this.evaluate_expr(item, row, prop_manager, params, ctx)
                                .await?,
                        );
                    }
                    Ok(Value::List(vals))
                }
                // Map constructor: evaluate each entry's value expression; keys are
                // taken verbatim from the AST.
                Expr::Map(items) => {
                    let mut map = HashMap::new();
                    for (key, value_expr) in items {
                        let val = this
                            .evaluate_expr(value_expr, row, prop_manager, params, ctx)
                            .await?;
                        map.insert(key.clone(), val);
                    }
                    Ok(Value::Map(map))
                }
                // EXISTS { <subquery> }: plan and run the subquery with the current
                // row's variables in scope (injected as extra parameters); true iff
                // it produced at least one row. Planning/execution failures are
                // deliberately soft — they log at debug level and evaluate to false
                // rather than aborting the outer query.
                Expr::Exists { query, .. } => {
                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();

                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
                        Ok(plan) => {
                            let mut sub_params = params.clone();
                            sub_params.extend(row.clone());

                            match this.execute(plan, prop_manager, &sub_params).await {
                                Ok(results) => Ok(Value::Bool(!results.is_empty())),
                                Err(e) => {
                                    log::debug!("EXISTS subquery execution failed: {}", e);
                                    Ok(Value::Bool(false))
                                }
                            }
                        }
                        Err(e) => {
                            log::debug!("EXISTS subquery planning failed: {}", e);
                            Ok(Value::Bool(false))
                        }
                    }
                }
                // COUNT { <subquery> }: same scoping mechanism as EXISTS, but
                // returns the row count. Unlike EXISTS, failures are hard errors.
                Expr::CountSubquery(query) => {
                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());

                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();

                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
                        Ok(plan) => {
                            let mut sub_params = params.clone();
                            sub_params.extend(row.clone());

                            match this.execute(plan, prop_manager, &sub_params).await {
                                Ok(results) => Ok(Value::from(results.len() as i64)),
                                Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
                            }
                        }
                        Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
                    }
                }
                // Quantified predicate over a list: all(..)/any(..)/single(..)/
                // none(..). The predicate is evaluated once per element with the
                // loop variable bound; only a strict Bool(true) counts as satisfied
                // (Null or non-boolean results count as not satisfied).
                Expr::Quantifier {
                    quantifier,
                    variable,
                    list,
                    predicate,
                } => {
                    let list_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;

                    // A Null list propagates to Null (three-valued logic).
                    if list_val.is_null() {
                        return Ok(Value::Null);
                    }

                    let items = match list_val {
                        Value::List(arr) => arr,
                        _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
                    };

                    let mut satisfied_count = 0;
                    for item in &items {
                        // Bind the loop variable in a copy of the row scope.
                        let mut item_row = row.clone();
                        item_row.insert(variable.clone(), item.clone());

                        let pred_result = this
                            .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
                            .await?;

                        if let Value::Bool(true) = pred_result {
                            satisfied_count += 1;
                        }
                    }

                    let result = match quantifier {
                        Quantifier::All => satisfied_count == items.len(),
                        Quantifier::Any => satisfied_count > 0,
                        Quantifier::Single => satisfied_count == 1,
                        Quantifier::None => satisfied_count == 0,
                    };

                    Ok(Value::Bool(result))
                }
                // List comprehension: [x IN list WHERE pred | map_expr]. Filters by
                // the optional WHERE (only strict Bool(true) passes) and applies
                // the map expression to each surviving element.
                Expr::ListComprehension {
                    variable,
                    list,
                    where_clause,
                    map_expr,
                } => {
                    let list_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;

                    if list_val.is_null() {
                        return Ok(Value::Null);
                    }

                    let items = match list_val {
                        Value::List(arr) => arr,
                        _ => {
                            return Err(anyhow!(
                                "List comprehension expects a list, got: {:?}",
                                list_val
                            ));
                        }
                    };

                    let mut results = Vec::new();
                    for item in &items {
                        let mut item_row = row.clone();
                        item_row.insert(variable.clone(), item.clone());

                        if let Some(predicate) = where_clause {
                            let pred_result = this
                                .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
                                .await?;

                            // Null/false/non-boolean all filter the element out.
                            if !matches!(pred_result, Value::Bool(true)) {
                                continue;
                            }
                        }

                        let mapped_val = this
                            .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
                            .await?;
                        results.push(mapped_val);
                    }

                    Ok(Value::List(results))
                }
                // Binary operators. AND/OR get a short-circuit on a definite
                // false/true left operand; in every other case both sides are
                // evaluated and eval_binary_op (which owns the Null/three-valued
                // logic) decides the result.
                Expr::BinaryOp { left, op, right } => {
                    match op {
                        BinaryOp::And => {
                            let l_val = this
                                .evaluate_expr(left, row, prop_manager, params, ctx)
                                .await?;
                            // A definite false short-circuits; Null does not — the
                            // right side may still determine a Null/false outcome.
                            if let Some(false) = l_val.as_bool() {
                                return Ok(Value::Bool(false));
                            }
                            let r_val = this
                                .evaluate_expr(right, row, prop_manager, params, ctx)
                                .await?;
                            eval_binary_op(&l_val, op, &r_val)
                        }
                        BinaryOp::Or => {
                            let l_val = this
                                .evaluate_expr(left, row, prop_manager, params, ctx)
                                .await?;
                            // A definite true short-circuits, mirroring AND above.
                            if let Some(true) = l_val.as_bool() {
                                return Ok(Value::Bool(true));
                            }
                            let r_val = this
                                .evaluate_expr(right, row, prop_manager, params, ctx)
                                .await?;
                            eval_binary_op(&l_val, op, &r_val)
                        }
                        _ => {
                            // All other operators are strict in both operands.
                            let l_val = this
                                .evaluate_expr(left, row, prop_manager, params, ctx)
                                .await?;
                            let r_val = this
                                .evaluate_expr(right, row, prop_manager, params, ctx)
                                .await?;
                            eval_binary_op(&l_val, op, &r_val)
                        }
                    }
                }
                // `expr IN list` — membership test delegated to eval_in_op.
                Expr::In { expr, list } => {
                    let l_val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    let r_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;
                    eval_in_op(&l_val, &r_val)
                }
                // Unary operators: logical NOT (Null-propagating) and numeric
                // negation.
                Expr::UnaryOp { op, expr } => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    match op {
                        UnaryOp::Not => {
                            match val.as_bool() {
                                Some(b) => Ok(Value::Bool(!b)),
                                // NOT NULL is NULL (three-valued logic).
                                None if val.is_null() => Ok(Value::Null),
                                None => Err(anyhow!(
                                    "InvalidArgumentType: NOT requires a boolean argument"
                                )),
                            }
                        }
                        UnaryOp::Neg => {
                            // NOTE(review): `-i` overflows for i64::MIN (panics in
                            // debug builds, wraps in release) — consider checked_neg.
                            if let Some(i) = val.as_i64() {
                                Ok(Value::Int(-i))
                            } else if let Some(f) = val.as_f64() {
                                Ok(Value::Float(-f))
                            } else {
                                Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
                            }
                        }
                    }
                }
                // IS NULL / IS NOT NULL always yield a definite boolean, never Null.
                Expr::IsNull(expr) => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    Ok(Value::Bool(val.is_null()))
                }
                Expr::IsNotNull(expr) => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    Ok(Value::Bool(!val.is_null()))
                }
                // IS UNIQUE is only meaningful inside a constraint definition;
                // reaching it during expression evaluation is a user error.
                Expr::IsUnique(_) => {
                    Err(anyhow!(
                        "IS UNIQUE can only be used in constraint definitions"
                    ))
                }
                // CASE expression, both forms: the "simple" form compares the base
                // value against each WHEN by equality; the "searched" form (no base
                // expression) takes the first WHEN that evaluates to true. Falls
                // back to ELSE, or to Null when no ELSE was given.
                Expr::Case {
                    expr,
                    when_then,
                    else_expr,
                } => {
                    if let Some(base_expr) = expr {
                        let base_val = this
                            .evaluate_expr(base_expr, row, prop_manager, params, ctx)
                            .await?;
                        for (w, t) in when_then {
                            let w_val = this
                                .evaluate_expr(w, row, prop_manager, params, ctx)
                                .await?;
                            if base_val == w_val {
                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
                            }
                        }
                    } else {
                        for (w, t) in when_then {
                            let w_val = this
                                .evaluate_expr(w, row, prop_manager, params, ctx)
                                .await?;
                            if w_val.as_bool() == Some(true) {
                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
                            }
                        }
                    }
                    if let Some(e) = else_expr {
                        return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
                    }
                    Ok(Value::Null)
                }
                // `*` carries no value of its own at expression level.
                Expr::Wildcard => Ok(Value::Null),
                // Function calls. Built-ins that need row/storage access are
                // special-cased in this arm; anything unrecognized falls through to
                // the pure scalar-function evaluator at the bottom of the arm.
                Expr::FunctionCall { name, args, .. } => {
                    // id(entity): internal numeric id, read from the entity map's
                    // _vid / _eid / _id marker keys; Null for anything else.
                    if name.eq_ignore_ascii_case("ID") {
                        if args.len() != 1 {
                            return Err(anyhow!("id() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        if let Value::Map(map) = &val {
                            if let Some(vid_val) = map.get("_vid") {
                                return Ok(vid_val.clone());
                            }
                            if let Some(eid_val) = map.get("_eid") {
                                return Ok(eid_val.clone());
                            }
                            if let Some(id_val) = map.get("_id") {
                                return Ok(id_val.clone());
                            }
                        }
                        return Ok(Value::Null);
                    }

                    // elementId(entity): like id() but rendered as a string.
                    if name.eq_ignore_ascii_case("ELEMENTID") {
                        if args.len() != 1 {
                            return Err(anyhow!("elementId() requires exactly 1 argument"));
                        }
                        let val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        if let Value::Map(map) = &val {
                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
                                return Ok(Value::String(vid_val.to_string()));
                            }
                            if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
                                return Ok(Value::String(eid_val.to_string()));
                            }
                        }
                        return Ok(Value::Null);
                    }
1799
1800 if name.eq_ignore_ascii_case("TYPE") {
1802 if args.len() != 1 {
1803 return Err(anyhow!("type() requires exactly 1 argument"));
1804 }
1805 let val = this
1806 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1807 .await?;
1808 if let Value::Map(map) = &val
1809 && let Some(type_val) = map.get("_type")
1810 {
1811 if let Some(type_id) =
1813 type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1814 {
1815 if let Some(name) = this
1816 .storage
1817 .schema_manager()
1818 .edge_type_name_by_id_unified(type_id)
1819 {
1820 return Ok(Value::String(name));
1821 }
1822 } else if let Some(name) = type_val.as_str() {
1823 return Ok(Value::String(name.to_string()));
1824 }
1825 }
1826 return Ok(Value::Null);
1827 }
1828
1829 if name.eq_ignore_ascii_case("LABELS") {
1831 if args.len() != 1 {
1832 return Err(anyhow!("labels() requires exactly 1 argument"));
1833 }
1834 let val = this
1835 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1836 .await?;
1837 if let Value::Map(map) = &val
1838 && let Some(labels_val) = map.get("_labels")
1839 {
1840 return Ok(labels_val.clone());
1841 }
1842 return Ok(Value::Null);
1843 }
1844
1845 if name.eq_ignore_ascii_case("PROPERTIES") {
1847 if args.len() != 1 {
1848 return Err(anyhow!("properties() requires exactly 1 argument"));
1849 }
1850 let val = this
1851 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1852 .await?;
1853 if let Value::Map(map) = &val {
1854 let mut props = HashMap::new();
1856 for (k, v) in map.iter() {
1857 if !k.starts_with('_') {
1858 props.insert(k.clone(), v.clone());
1859 }
1860 }
1861 return Ok(Value::Map(props));
1862 }
1863 return Ok(Value::Null);
1864 }
1865
1866 if name.eq_ignore_ascii_case("STARTNODE") {
1868 if args.len() != 1 {
1869 return Err(anyhow!("startNode() requires exactly 1 argument"));
1870 }
1871 let val = this
1872 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1873 .await?;
1874 if let Value::Edge(edge) = &val {
1875 return Ok(Self::find_node_by_vid(row, edge.src));
1876 }
1877 if let Value::Map(map) = &val {
1878 if let Some(start_node) = map.get("_startNode") {
1879 return Ok(start_node.clone());
1880 }
1881 if let Some(src_vid) = map.get("_src_vid") {
1882 return Ok(Value::Map(HashMap::from([(
1883 "_vid".to_string(),
1884 src_vid.clone(),
1885 )])));
1886 }
1887 if let Some(src_id) = map.get("_src")
1889 && let Some(u) = src_id.as_u64()
1890 {
1891 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1892 }
1893 }
1894 return Ok(Value::Null);
1895 }
1896
1897 if name.eq_ignore_ascii_case("ENDNODE") {
1899 if args.len() != 1 {
1900 return Err(anyhow!("endNode() requires exactly 1 argument"));
1901 }
1902 let val = this
1903 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1904 .await?;
1905 if let Value::Edge(edge) = &val {
1906 return Ok(Self::find_node_by_vid(row, edge.dst));
1907 }
1908 if let Value::Map(map) = &val {
1909 if let Some(end_node) = map.get("_endNode") {
1910 return Ok(end_node.clone());
1911 }
1912 if let Some(dst_vid) = map.get("_dst_vid") {
1913 return Ok(Value::Map(HashMap::from([(
1914 "_vid".to_string(),
1915 dst_vid.clone(),
1916 )])));
1917 }
1918 if let Some(dst_id) = map.get("_dst")
1920 && let Some(u) = dst_id.as_u64()
1921 {
1922 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1923 }
1924 }
1925 return Ok(Value::Null);
1926 }
1927
                    // hasLabel(node, label): check the entity map's _labels list
                    // for an exact string match; false for non-map values.
                    if name.eq_ignore_ascii_case("HASLABEL") {
                        if args.len() != 2 {
                            return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
                        }
                        let node_val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        let label_val = this
                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
                            .await?;

                        let label_to_check = label_val.as_str().ok_or_else(|| {
                            anyhow!("Second argument to hasLabel must be a string")
                        })?;

                        // NOTE(review): the two Map arms below have identical
                        // bodies — the `contains_key("_vid")` guard adds nothing
                        // and the arms could be merged into one.
                        let has_label = match &node_val {
                            Value::Map(map) if map.contains_key("_vid") => {
                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
                                    labels_arr
                                        .iter()
                                        .any(|l| l.as_str() == Some(label_to_check))
                                } else {
                                    false
                                }
                            }
                            Value::Map(map) => {
                                if let Some(Value::List(labels_arr)) = map.get("_labels") {
                                    labels_arr
                                        .iter()
                                        .any(|l| l.as_str() == Some(label_to_check))
                                } else {
                                    false
                                }
                            }
                            _ => false,
                        };
                        return Ok(Value::Bool(has_label));
                    }

                    // Quantifier functions in function-call position are not
                    // supported — they are only handled as Expr::Quantifier nodes.
                    if matches!(
                        name.to_uppercase().as_str(),
                        "ANY" | "ALL" | "NONE" | "SINGLE"
                    ) {
                        return Err(anyhow!(
                            "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
                            name.to_lowercase()
                        ));
                    }
1982
                    // coalesce(a, b, ...): first non-null argument, evaluated
                    // lazily left-to-right; Null when every argument is null.
                    if name.eq_ignore_ascii_case("COALESCE") {
                        for arg in args {
                            let val = this
                                .evaluate_expr(arg, row, prop_manager, params, ctx)
                                .await?;
                            if !val.is_null() {
                                return Ok(val);
                            }
                        }
                        return Ok(Value::Null);
                    }

                    // vector_similarity(v1, v2): similarity computation delegated
                    // to the vector helper in expr_eval.
                    if name.eq_ignore_ascii_case("vector_similarity") {
                        if args.len() != 2 {
                            return Err(anyhow!("vector_similarity takes 2 arguments"));
                        }
                        let v1 = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        let v2 = this
                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
                            .await?;
                        return eval_vector_similarity(&v1, &v2);
                    }
2009
2010 if name.eq_ignore_ascii_case("uni.temporal.validAt")
2012 || name.eq_ignore_ascii_case("uni.validAt")
2013 || name.eq_ignore_ascii_case("validAt")
2014 {
2015 if args.len() != 4 {
2016 return Err(anyhow!("validAt requires 4 arguments"));
2017 }
2018 let node_val = this
2019 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2020 .await?;
2021 let start_prop = this
2022 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2023 .await?
2024 .as_str()
2025 .ok_or(anyhow!("start_prop must be string"))?
2026 .to_string();
2027 let end_prop = this
2028 .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2029 .await?
2030 .as_str()
2031 .ok_or(anyhow!("end_prop must be string"))?
2032 .to_string();
2033 let time_val = this
2034 .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2035 .await?;
2036
2037 let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2038 anyhow!("time argument must be a datetime value or string")
2039 })?;
2040
2041 let valid_from_val: Option<Value> = if let Ok(vid) =
2043 Self::vid_from_value(&node_val)
2044 {
2045 prop_manager
2047 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2048 .await
2049 .ok()
2050 } else if let Value::Map(map) = &node_val {
2051 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2053 let vid = Vid::from(vid_val);
2054 prop_manager
2055 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2056 .await
2057 .ok()
2058 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2059 let eid = uni_common::core::id::Eid::from(eid_val);
2061 prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2062 } else {
2063 map.get(&start_prop).cloned()
2065 }
2066 } else {
2067 return Ok(Value::Bool(false));
2068 };
2069
2070 let valid_from = match valid_from_val {
2071 Some(ref v) => match value_to_datetime_utc(v) {
2072 Some(dt) => dt,
2073 None if v.is_null() => return Ok(Value::Bool(false)),
2074 None => {
2075 return Err(anyhow!(
2076 "Property {} must be a datetime value or string",
2077 start_prop
2078 ));
2079 }
2080 },
2081 None => return Ok(Value::Bool(false)),
2082 };
2083
2084 let valid_to_val: Option<Value> = if let Ok(vid) =
2085 Self::vid_from_value(&node_val)
2086 {
2087 prop_manager
2089 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2090 .await
2091 .ok()
2092 } else if let Value::Map(map) = &node_val {
2093 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2095 let vid = Vid::from(vid_val);
2096 prop_manager
2097 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2098 .await
2099 .ok()
2100 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2101 let eid = uni_common::core::id::Eid::from(eid_val);
2103 prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2104 } else {
2105 map.get(&end_prop).cloned()
2107 }
2108 } else {
2109 return Ok(Value::Bool(false));
2110 };
2111
2112 let valid_to = match valid_to_val {
2113 Some(ref v) => match value_to_datetime_utc(v) {
2114 Some(dt) => Some(dt),
2115 None if v.is_null() => None,
2116 None => {
2117 return Err(anyhow!(
2118 "Property {} must be a datetime value or null",
2119 end_prop
2120 ));
2121 }
2122 },
2123 None => None,
2124 };
2125
2126 let is_valid = valid_from <= query_time
2127 && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2128 return Ok(Value::Bool(is_valid));
2129 }
2130
                    // Fallback: evaluate all arguments eagerly, hydrate any entity
                    // maps (fill in missing properties from storage) so scalar
                    // functions see complete values, then dispatch by name.
                    let mut evaluated_args = Vec::with_capacity(args.len());
                    for arg in args {
                        let mut val = this
                            .evaluate_expr(arg, row, prop_manager, params, ctx)
                            .await?;

                        if let Value::Map(ref mut map) = val {
                            hydrate_entity_if_needed(map, prop_manager, ctx).await;
                        }

                        evaluated_args.push(val);
                    }
                    eval_scalar_function(name, &evaluated_args)
                }
                // reduce(acc = init, x IN list | expr): a classic left fold. Each
                // iteration re-binds both the accumulator and the element variable
                // in a fresh copy of the row scope.
                Expr::Reduce {
                    accumulator,
                    init,
                    variable,
                    list,
                    expr,
                } => {
                    // NOTE(review): this arm uses `self` where the rest of the
                    // match uses `this` — harmless if both are in scope, but worth
                    // unifying for consistency.
                    let mut acc = self
                        .evaluate_expr(init, row, prop_manager, params, ctx)
                        .await?;
                    let list_val = self
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;

                    if let Value::List(items) = list_val {
                        for item in items {
                            let mut scope = row.clone();
                            scope.insert(accumulator.clone(), acc.clone());
                            scope.insert(variable.clone(), item);

                            acc = self
                                .evaluate_expr(expr, &scope, prop_manager, params, ctx)
                                .await?;
                        }
                    } else {
                        // NOTE(review): a Null list errors here instead of
                        // propagating Null the way other list expressions in this
                        // match do — confirm this is intended.
                        return Err(anyhow!("REDUCE list argument must evaluate to a list"));
                    }
                    Ok(acc)
                }
                // VALID_AT is rewritten into a validAt() function call by the
                // planner; hitting the raw AST node here means that rewrite was
                // skipped upstream.
                Expr::ValidAt { .. } => {
                    Err(anyhow!(
                        "VALID_AT expression should have been transformed to function call in planner"
                    ))
                }
2186
                // `expr:Label` checks. Edge maps (detected via _eid/_type markers)
                // match against their single relationship type; node maps must
                // carry every listed label in _labels. Null input stays Null.
                Expr::LabelCheck { expr, labels } => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    match &val {
                        Value::Null => Ok(Value::Null),
                        Value::Map(map) => {
                            // Heuristic: an edge map carries _eid, or a _type
                            // without a _vid; anything else is treated as a node.
                            let is_edge = map.contains_key("_eid")
                                || map.contains_key("_type_name")
                                || (map.contains_key("_type") && !map.contains_key("_vid"));

                            if is_edge {
                                // An edge has exactly one type, so more than one
                                // requested label can never all match.
                                if labels.len() > 1 {
                                    return Ok(Value::Bool(false));
                                }
                                let label_to_check = &labels[0];
                                let has_type = if let Some(Value::String(t)) = map.get("_type_name")
                                {
                                    t == label_to_check
                                } else if let Some(Value::String(t)) = map.get("_type") {
                                    t == label_to_check
                                } else {
                                    false
                                };
                                Ok(Value::Bool(has_type))
                            } else {
                                // Node: every requested label must appear in _labels.
                                let has_all = labels.iter().all(|label_to_check| {
                                    if let Some(Value::List(labels_arr)) = map.get("_labels") {
                                        labels_arr
                                            .iter()
                                            .any(|l| l.as_str() == Some(label_to_check.as_str()))
                                    } else {
                                        false
                                    }
                                });
                                Ok(Value::Bool(has_all))
                            }
                        }
                        _ => Ok(Value::Bool(false)),
                    }
                }
2231
                // Map projection: `base { .prop, .*, key: expr, variable }`.
                // Builds a new map from the base entity's property map.
                Expr::MapProjection { base, items } => {
                    let base_value = this
                        .evaluate_expr(base, row, prop_manager, params, ctx)
                        .await?;

                    let properties = match &base_value {
                        Value::Map(map) => map,
                        _ => {
                            return Err(anyhow!(
                                "Map projection requires object, got {:?}",
                                base_value
                            ));
                        }
                    };

                    let mut result_map = HashMap::new();

                    for item in items {
                        match item {
                            // `.prop` — copy a single property when present.
                            MapProjectionItem::Property(prop) => {
                                if let Some(value) = properties.get(prop.as_str()) {
                                    result_map.insert(prop.clone(), value.clone());
                                }
                            }
                            // `.*` — copy all user-visible properties, skipping
                            // internal underscore-prefixed bookkeeping keys.
                            MapProjectionItem::AllProperties => {
                                for (key, value) in properties.iter() {
                                    if !key.starts_with('_') {
                                        result_map.insert(key.clone(), value.clone());
                                    }
                                }
                            }
                            // `key: expr` — evaluated against the outer row scope,
                            // not the base entity.
                            MapProjectionItem::LiteralEntry(key, expr) => {
                                let value = this
                                    .evaluate_expr(expr, row, prop_manager, params, ctx)
                                    .await?;
                                result_map.insert(key.clone(), value);
                            }
                            // bare `variable` — pulled straight from the row.
                            MapProjectionItem::Variable(var_name) => {
                                if let Some(value) = row.get(var_name.as_str()) {
                                    result_map.insert(var_name.clone(), value.clone());
                                }
                            }
                        }
                    }

                    Ok(Value::Map(result_map))
                }
2283 }
2284 })
2285 }
2286
    /// Executes a [`LogicalPlan`] node, dispatching on the plan variant.
    ///
    /// Returns the result rows as variable-name -> value maps. The future is
    /// boxed because plan execution recurses (e.g. `Filter` re-enters this
    /// function for its input), which a plain `async fn` cannot express.
    ///
    /// When a [`QueryContext`] is supplied, the per-query timeout is checked
    /// up front before any work is done.
    pub(crate) fn execute_subplan<'a>(
        &'a self,
        plan: LogicalPlan,
        prop_manager: &'a PropertyManager,
        params: &'a HashMap<String, Value>,
        ctx: Option<&'a QueryContext>,
    ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
        Box::pin(async move {
            if let Some(ctx) = ctx {
                ctx.check_timeout()?;
            }
2298 match plan {
2299 LogicalPlan::Union { left, right, all } => {
2300 self.execute_union(left, right, all, prop_manager, params, ctx)
2301 .await
2302 }
2303 LogicalPlan::CreateVectorIndex {
2304 config,
2305 if_not_exists,
2306 } => {
2307 if if_not_exists && self.index_exists_by_name(&config.name) {
2308 return Ok(vec![]);
2309 }
2310 let idx_mgr = IndexManager::new(
2311 self.storage.base_path(),
2312 self.storage.schema_manager_arc(),
2313 self.storage.lancedb_store_arc(),
2314 );
2315 idx_mgr.create_vector_index(config).await?;
2316 Ok(vec![])
2317 }
2318 LogicalPlan::CreateFullTextIndex {
2319 config,
2320 if_not_exists,
2321 } => {
2322 if if_not_exists && self.index_exists_by_name(&config.name) {
2323 return Ok(vec![]);
2324 }
2325 let idx_mgr = IndexManager::new(
2326 self.storage.base_path(),
2327 self.storage.schema_manager_arc(),
2328 self.storage.lancedb_store_arc(),
2329 );
2330 idx_mgr.create_fts_index(config).await?;
2331 Ok(vec![])
2332 }
                // CREATE INDEX on scalar properties. Expression properties (those
                // written as `func(prop)`) are first materialized as generated
                // columns, and the index is then built over the generated column
                // name instead of the raw expression.
                LogicalPlan::CreateScalarIndex {
                    mut config,
                    if_not_exists,
                } => {
                    if if_not_exists && self.index_exists_by_name(&config.name) {
                        return Ok(vec![]);
                    }

                    let mut modified_properties = Vec::new();

                    for prop in &config.properties {
                        // Anything containing parentheses is treated as an
                        // expression index.
                        if prop.contains('(') && prop.contains(')') {
                            let gen_col = SchemaManager::generated_column_name(prop);

                            let sm = self.storage.schema_manager_arc();
                            // Best-effort: the generated column may already exist
                            // from an earlier CREATE INDEX on the same expression.
                            if let Err(e) = sm.add_generated_property(
                                &config.label,
                                &gen_col,
                                DataType::String, prop.clone(),
                            ) {
                                log::warn!("Failed to add generated property (might exist): {}", e);
                            }

                            modified_properties.push(gen_col);
                        } else {
                            modified_properties.push(prop.clone());
                        }
                    }

                    config.properties = modified_properties;

                    let idx_mgr = IndexManager::new(
                        self.storage.base_path(),
                        self.storage.schema_manager_arc(),
                        self.storage.lancedb_store_arc(),
                    );
                    idx_mgr.create_scalar_index(config).await?;
                    Ok(vec![])
                }
2377 LogicalPlan::CreateJsonFtsIndex {
2378 config,
2379 if_not_exists,
2380 } => {
2381 if if_not_exists && self.index_exists_by_name(&config.name) {
2382 return Ok(vec![]);
2383 }
2384 let idx_mgr = IndexManager::new(
2385 self.storage.base_path(),
2386 self.storage.schema_manager_arc(),
2387 self.storage.lancedb_store_arc(),
2388 );
2389 idx_mgr.create_json_fts_index(config).await?;
2390 Ok(vec![])
2391 }
2392 LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2393 LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2394 LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2395 LogicalPlan::Vacuum => {
2396 self.execute_vacuum().await?;
2397 Ok(vec![])
2398 }
2399 LogicalPlan::Checkpoint => {
2400 self.execute_checkpoint().await?;
2401 Ok(vec![])
2402 }
2403 LogicalPlan::CopyTo {
2404 label,
2405 path,
2406 format,
2407 options,
2408 } => {
2409 let count = self
2410 .execute_copy_to(&label, &path, &format, &options)
2411 .await?;
2412 let mut result = HashMap::new();
2413 result.insert("count".to_string(), Value::Int(count as i64));
2414 Ok(vec![result])
2415 }
2416 LogicalPlan::CopyFrom {
2417 label,
2418 path,
2419 format,
2420 options,
2421 } => {
2422 let count = self
2423 .execute_copy_from(&label, &path, &format, &options)
2424 .await?;
2425 let mut result = HashMap::new();
2426 result.insert("count".to_string(), Value::Int(count as i64));
2427 Ok(vec![result])
2428 }
2429 LogicalPlan::CreateLabel(clause) => {
2430 self.execute_create_label(clause).await?;
2431 Ok(vec![])
2432 }
2433 LogicalPlan::CreateEdgeType(clause) => {
2434 self.execute_create_edge_type(clause).await?;
2435 Ok(vec![])
2436 }
2437 LogicalPlan::AlterLabel(clause) => {
2438 self.execute_alter_label(clause).await?;
2439 Ok(vec![])
2440 }
2441 LogicalPlan::AlterEdgeType(clause) => {
2442 self.execute_alter_edge_type(clause).await?;
2443 Ok(vec![])
2444 }
2445 LogicalPlan::DropLabel(clause) => {
2446 self.execute_drop_label(clause).await?;
2447 Ok(vec![])
2448 }
2449 LogicalPlan::DropEdgeType(clause) => {
2450 self.execute_drop_edge_type(clause).await?;
2451 Ok(vec![])
2452 }
2453 LogicalPlan::CreateConstraint(clause) => {
2454 self.execute_create_constraint(clause).await?;
2455 Ok(vec![])
2456 }
2457 LogicalPlan::DropConstraint(clause) => {
2458 self.execute_drop_constraint(clause).await?;
2459 Ok(vec![])
2460 }
2461 LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2462 LogicalPlan::DropIndex { name, if_exists } => {
2463 let idx_mgr = IndexManager::new(
2464 self.storage.base_path(),
2465 self.storage.schema_manager_arc(),
2466 self.storage.lancedb_store_arc(),
2467 );
2468 match idx_mgr.drop_index(&name).await {
2469 Ok(_) => Ok(vec![]),
2470 Err(e) => {
2471 if if_exists && e.to_string().contains("not found") {
2472 Ok(vec![])
2473 } else {
2474 Err(e)
2475 }
2476 }
2477 }
2478 }
2479 LogicalPlan::ShowIndexes { filter } => {
2480 Ok(self.execute_show_indexes(filter.as_deref()))
2481 }
2482 LogicalPlan::Scan { .. }
2485 | LogicalPlan::ExtIdLookup { .. }
2486 | LogicalPlan::ScanAll { .. }
2487 | LogicalPlan::ScanMainByLabels { .. }
2488 | LogicalPlan::Traverse { .. }
2489 | LogicalPlan::TraverseMainByType { .. } => {
2490 let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2491 self.record_batches_to_rows(batches)
2492 }
                // Row filter. The plain path evaluates the predicate per row; the
                // OPTIONAL MATCH path (non-empty `optional_variables`) additionally
                // implements left-outer-join semantics: rows are grouped by their
                // non-optional bindings, and a group in which no row passes the
                // predicate is kept as a single row with its optional bindings
                // nulled out.
                LogicalPlan::Filter {
                    input,
                    predicate,
                    optional_variables,
                } => {
                    let input_matches = self
                        .execute_subplan(*input, prop_manager, params, ctx)
                        .await?;

                    tracing::debug!(
                        "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
                        predicate,
                        input_matches.len(),
                        optional_variables
                    );

                    if !optional_variables.is_empty() {
                        // A key belongs to the optional side when it is the
                        // variable itself or a `var.prop` projection of it.
                        let is_optional_key = |k: &str| -> bool {
                            optional_variables.contains(k)
                                || optional_variables
                                    .iter()
                                    .any(|var| k.starts_with(&format!("{}.", var)))
                        };

                        // NOTE(review): `starts_with("__")` is subsumed by
                        // `starts_with("_")` — the first clause is dead.
                        let is_internal_key =
                            |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };

                        // Grouping key = all non-optional, non-internal variables,
                        // inferred from the first row's shape.
                        let non_optional_vars: Vec<String> = input_matches
                            .first()
                            .map(|row| {
                                row.keys()
                                    .filter(|k| !is_optional_key(k) && !is_internal_key(k))
                                    .cloned()
                                    .collect()
                            })
                            .unwrap_or_default();

                        let mut groups: std::collections::HashMap<
                            Vec<u8>,
                            Vec<HashMap<String, Value>>,
                        > = std::collections::HashMap::new();

                        for row in &input_matches {
                            // Key rows by the Debug rendering of their non-optional
                            // values. NOTE(review): '|'-joined Debug strings could
                            // collide for values that themselves contain '|' — low
                            // risk, but a length-prefixed encoding would be airtight.
                            let key: Vec<u8> = non_optional_vars
                                .iter()
                                .map(|var| {
                                    row.get(var).map(|v| format!("{v:?}")).unwrap_or_default()
                                })
                                .collect::<Vec<_>>()
                                .join("|")
                                .into_bytes();

                            groups.entry(key).or_default().push(row.clone());
                        }

                        let mut filtered = Vec::new();
                        for (_key, group_rows) in groups {
                            let mut group_passed = Vec::new();

                            for row in &group_rows {
                                // A row whose optional side is already Null (the
                                // OPTIONAL MATCH found nothing) passes through
                                // without evaluating the predicate at all.
                                let has_null_optional = optional_variables.iter().any(|var| {
                                    let direct_null =
                                        matches!(row.get(var), Some(Value::Null) | None);
                                    let prefixed_null = row
                                        .keys()
                                        .filter(|k| k.starts_with(&format!("{}.", var)))
                                        .any(|k| matches!(row.get(k), Some(Value::Null)));
                                    direct_null || prefixed_null
                                });

                                if has_null_optional {
                                    group_passed.push(row.clone());
                                    continue;
                                }

                                let res = self
                                    .evaluate_expr(&predicate, row, prop_manager, params, ctx)
                                    .await?;

                                if res.as_bool().unwrap_or(false) {
                                    group_passed.push(row.clone());
                                }
                            }

                            if group_passed.is_empty() {
                                // No row in the group survived: emit one row with
                                // the optional bindings nulled (outer-join padding).
                                if let Some(template) = group_rows.first() {
                                    let mut null_row = HashMap::new();
                                    for (k, v) in template {
                                        if is_optional_key(k) {
                                            null_row.insert(k.clone(), Value::Null);
                                        } else {
                                            null_row.insert(k.clone(), v.clone());
                                        }
                                    }
                                    filtered.push(null_row);
                                }
                            } else {
                                filtered.extend(group_passed);
                            }
                        }

                        tracing::debug!(
                            "Filter (OPTIONAL): {} input rows -> {} output rows",
                            input_matches.len(),
                            filtered.len()
                        );

                        return Ok(filtered);
                    }

                    // Plain filter: keep rows whose predicate is definitely true
                    // (Null and non-boolean results are treated as false).
                    let mut filtered = Vec::new();
                    for row in input_matches.iter() {
                        let res = self
                            .evaluate_expr(&predicate, row, prop_manager, params, ctx)
                            .await?;

                        let passes = res.as_bool().unwrap_or(false);

                        if passes {
                            filtered.push(row.clone());
                        }
                    }

                    tracing::debug!(
                        "Filter: {} input rows -> {} output rows",
                        input_matches.len(),
                        filtered.len()
                    );

                    Ok(filtered)
                }
2638 LogicalPlan::ProcedureCall {
2639 procedure_name,
2640 arguments,
2641 yield_items,
2642 } => {
2643 let yield_names: Vec<String> =
2644 yield_items.iter().map(|(n, _)| n.clone()).collect();
2645 let results = self
2646 .execute_procedure(
2647 &procedure_name,
2648 &arguments,
2649 &yield_names,
2650 prop_manager,
2651 params,
2652 ctx,
2653 )
2654 .await?;
2655
2656 let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2662 if !has_aliases {
2663 Ok(results)
2666 } else {
2667 let mut aliased_results = Vec::with_capacity(results.len());
2668 for row in results {
2669 let mut new_row = HashMap::new();
2670 for (name, alias) in &yield_items {
2671 let col_name = alias.as_ref().unwrap_or(name);
2672 let val = row.get(name).cloned().unwrap_or(Value::Null);
2673 new_row.insert(col_name.clone(), val);
2674 }
2675 aliased_results.push(new_row);
2676 }
2677 Ok(aliased_results)
2678 }
2679 }
2680 LogicalPlan::VectorKnn { .. } => {
2681 unreachable!("VectorKnn is handled by DataFusion engine")
2682 }
2683 LogicalPlan::InvertedIndexLookup { .. } => {
2684 unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2685 }
2686 LogicalPlan::Sort { input, order_by } => {
2687 let rows = self
2688 .execute_subplan(*input, prop_manager, params, ctx)
2689 .await?;
2690 self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2691 .await
2692 }
2693 LogicalPlan::Limit { input, skip, fetch } => {
2694 let rows = self
2695 .execute_subplan(*input, prop_manager, params, ctx)
2696 .await?;
2697 let skip = skip.unwrap_or(0);
2698 let take = fetch.unwrap_or(usize::MAX);
2699 Ok(rows.into_iter().skip(skip).take(take).collect())
2700 }
2701 LogicalPlan::Aggregate {
2702 input,
2703 group_by,
2704 aggregates,
2705 } => {
2706 let rows = self
2707 .execute_subplan(*input, prop_manager, params, ctx)
2708 .await?;
2709 self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2710 .await
2711 }
2712 LogicalPlan::Window {
2713 input,
2714 window_exprs,
2715 } => {
2716 let rows = self
2717 .execute_subplan(*input, prop_manager, params, ctx)
2718 .await?;
2719 self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2720 .await
2721 }
2722 LogicalPlan::Project { input, projections } => {
2723 let matches = self
2724 .execute_subplan(*input, prop_manager, params, ctx)
2725 .await?;
2726 self.execute_project(matches, &projections, prop_manager, params, ctx)
2727 .await
2728 }
2729 LogicalPlan::Distinct { input } => {
2730 let rows = self
2731 .execute_subplan(*input, prop_manager, params, ctx)
2732 .await?;
2733 let mut seen = std::collections::HashSet::new();
2734 let mut result = Vec::new();
2735 for row in rows {
2736 let key = Self::canonical_row_key(&row);
2737 if seen.insert(key) {
2738 result.push(row);
2739 }
2740 }
2741 Ok(result)
2742 }
2743 LogicalPlan::Unwind {
2744 input,
2745 expr,
2746 variable,
2747 } => {
2748 let input_rows = self
2749 .execute_subplan(*input, prop_manager, params, ctx)
2750 .await?;
2751 self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2752 .await
2753 }
2754 LogicalPlan::Apply {
2755 input,
2756 subquery,
2757 input_filter,
2758 } => {
2759 let input_rows = self
2760 .execute_subplan(*input, prop_manager, params, ctx)
2761 .await?;
2762 self.execute_apply(
2763 input_rows,
2764 &subquery,
2765 input_filter.as_ref(),
2766 prop_manager,
2767 params,
2768 ctx,
2769 )
2770 .await
2771 }
2772 LogicalPlan::SubqueryCall { input, subquery } => {
2773 let input_rows = self
2774 .execute_subplan(*input, prop_manager, params, ctx)
2775 .await?;
2776 self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2779 .await
2780 }
2781 LogicalPlan::RecursiveCTE {
2782 cte_name,
2783 initial,
2784 recursive,
2785 } => {
2786 self.execute_recursive_cte(
2787 &cte_name,
2788 *initial,
2789 *recursive,
2790 prop_manager,
2791 params,
2792 ctx,
2793 )
2794 .await
2795 }
2796 LogicalPlan::CrossJoin { left, right } => {
2797 self.execute_cross_join(left, right, prop_manager, params, ctx)
2798 .await
2799 }
2800 LogicalPlan::Set { .. }
2801 | LogicalPlan::Remove { .. }
2802 | LogicalPlan::Merge { .. }
2803 | LogicalPlan::Create { .. }
2804 | LogicalPlan::CreateBatch { .. } => {
2805 unreachable!("mutations are handled by DataFusion engine")
2806 }
2807 LogicalPlan::Delete { .. } => {
2808 unreachable!("mutations are handled by DataFusion engine")
2809 }
2810 LogicalPlan::Begin => {
2811 if let Some(writer_lock) = &self.writer {
2812 let mut writer = writer_lock.write().await;
2813 writer.begin_transaction()?;
2814 } else {
2815 return Err(anyhow!("Transaction requires a Writer"));
2816 }
2817 Ok(vec![HashMap::new()])
2818 }
2819 LogicalPlan::Commit => {
2820 if let Some(writer_lock) = &self.writer {
2821 let mut writer = writer_lock.write().await;
2822 writer.commit_transaction().await?;
2823 } else {
2824 return Err(anyhow!("Transaction requires a Writer"));
2825 }
2826 Ok(vec![HashMap::new()])
2827 }
2828 LogicalPlan::Rollback => {
2829 if let Some(writer_lock) = &self.writer {
2830 let mut writer = writer_lock.write().await;
2831 writer.rollback_transaction()?;
2832 } else {
2833 return Err(anyhow!("Transaction requires a Writer"));
2834 }
2835 Ok(vec![HashMap::new()])
2836 }
2837 LogicalPlan::Copy {
2838 target,
2839 source,
2840 is_export,
2841 options,
2842 } => {
2843 if is_export {
2844 self.execute_export(&target, &source, &options, prop_manager, ctx)
2845 .await
2846 } else {
2847 self.execute_copy(&target, &source, &options, prop_manager)
2848 .await
2849 }
2850 }
2851 LogicalPlan::Backup {
2852 destination,
2853 options,
2854 } => self.execute_backup(&destination, &options).await,
2855 LogicalPlan::Explain { plan } => {
2856 let plan_str = format!("{:#?}", plan);
2857 let mut row = HashMap::new();
2858 row.insert("plan".to_string(), Value::String(plan_str));
2859 Ok(vec![row])
2860 }
2861 LogicalPlan::ShortestPath { .. } => {
2862 unreachable!("ShortestPath is handled by DataFusion engine")
2863 }
2864 LogicalPlan::AllShortestPaths { .. } => {
2865 unreachable!("AllShortestPaths is handled by DataFusion engine")
2866 }
2867 LogicalPlan::Foreach { .. } => {
2868 unreachable!("mutations are handled by DataFusion engine")
2869 }
2870 LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2871 LogicalPlan::BindZeroLengthPath { .. } => {
2872 unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2873 }
2874 LogicalPlan::BindPath { .. } => {
2875 unreachable!("BindPath is handled by DataFusion engine")
2876 }
2877 LogicalPlan::QuantifiedPattern { .. } => {
2878 unreachable!("QuantifiedPattern is handled by DataFusion engine")
2879 }
2880 LogicalPlan::LocyProgram { .. }
2881 | LogicalPlan::LocyFold { .. }
2882 | LogicalPlan::LocyBestBy { .. }
2883 | LogicalPlan::LocyPriority { .. }
2884 | LogicalPlan::LocyDerivedScan { .. }
2885 | LogicalPlan::LocyProject { .. } => {
2886 unreachable!("Locy operators are handled by DataFusion engine")
2887 }
2888 }
2889 })
2890 }
2891
    /// Executes one mutation plan from a FOREACH body against the given
    /// variable `scope`, using an exclusive `writer` handle supplied by the
    /// caller (so all mutations of one FOREACH go through the same writer).
    ///
    /// Supported plans: SET, REMOVE, DELETE, CREATE / CreateBatch, MERGE,
    /// and nested FOREACH; any other plan kind is rejected with an error.
    ///
    /// # Errors
    /// Returns an error if a sub-operation fails, if a FOREACH list
    /// expression is neither a list nor NULL, or if the plan kind is
    /// unsupported inside FOREACH.
    pub(crate) async fn execute_foreach_body_plan(
        &self,
        plan: LogicalPlan,
        scope: &mut HashMap<String, Value>,
        writer: &mut uni_store::runtime::writer::Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<()> {
        match plan {
            LogicalPlan::Set { items, .. } => {
                self.execute_set_items_locked(&items, scope, writer, prop_manager, params, ctx)
                    .await?;
            }
            LogicalPlan::Remove { items, .. } => {
                self.execute_remove_items_locked(&items, scope, writer, prop_manager, ctx)
                    .await?;
            }
            LogicalPlan::Delete { items, detach, .. } => {
                // Each DELETE item is evaluated in the current scope (it may
                // reference FOREACH loop variables) before being deleted.
                for expr in &items {
                    let val = self
                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
                        .await?;
                    self.execute_delete_item_locked(&val, detach, writer)
                        .await?;
                }
            }
            LogicalPlan::Create { pattern, .. } => {
                self.execute_create_pattern(&pattern, scope, writer, prop_manager, params, ctx)
                    .await?;
            }
            LogicalPlan::CreateBatch { patterns, .. } => {
                for pattern in &patterns {
                    self.execute_create_pattern(pattern, scope, writer, prop_manager, params, ctx)
                        .await?;
                }
            }
            LogicalPlan::Merge {
                pattern,
                on_match: _,
                on_create,
                ..
            } => {
                // NOTE(review): `on_match` is ignored and the pattern is
                // always created, so MERGE inside FOREACH currently behaves
                // like CREATE (+ ON CREATE SET) — confirm this is intentional.
                self.execute_create_pattern(&pattern, scope, writer, prop_manager, params, ctx)
                    .await?;
                if let Some(on_create_clause) = on_create {
                    self.execute_set_items_locked(
                        &on_create_clause.items,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
            }
            LogicalPlan::Foreach {
                variable,
                list,
                body,
                ..
            } => {
                let list_val = self
                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
                    .await?;
                // A NULL list makes the nested FOREACH a no-op; anything
                // other than a list is a type error.
                let items = match list_val {
                    Value::List(arr) => arr,
                    Value::Null => return Ok(()),
                    _ => return Err(anyhow!("FOREACH requires a list")),
                };
                for item in items {
                    // Each iteration gets its own scope so the loop variable
                    // (and any bindings made inside) do not leak out.
                    let mut nested_scope = scope.clone();
                    nested_scope.insert(variable.clone(), item);
                    for nested_plan in &body {
                        // Box::pin is required for recursion in an async fn.
                        Box::pin(self.execute_foreach_body_plan(
                            nested_plan.clone(),
                            &mut nested_scope,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                        ))
                        .await?;
                    }
                }
            }
            _ => {
                return Err(anyhow!(
                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
                ));
            }
        }
        Ok(())
    }
2991
2992 fn canonical_row_key(row: &HashMap<String, Value>) -> String {
2993 let mut pairs: Vec<_> = row.iter().collect();
2994 pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
2995
2996 pairs
2997 .into_iter()
2998 .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
2999 .collect::<Vec<_>>()
3000 .join("|")
3001 }
3002
    /// Maps a `Value` to a type-tagged canonical string used for DISTINCT,
    /// grouping, and row deduplication.
    ///
    /// The exact output format is a stability contract: two values must get
    /// the same key iff the engine considers them duplicates, so do not
    /// change any of the format strings without auditing all key consumers.
    fn canonical_value_key(v: &Value) -> String {
        match v {
            Value::Null => "null".to_string(),
            Value::Bool(b) => format!("b:{b}"),
            Value::Int(i) => format!("n:{i}"),
            Value::Float(f) => {
                if f.is_nan() {
                    // All NaNs collapse to one key (NaN != NaN in IEEE, but
                    // dedup treats them as the same value).
                    "nan".to_string()
                } else if f.is_infinite() {
                    if f.is_sign_positive() {
                        "inf:+".to_string()
                    } else {
                        "inf:-".to_string()
                    }
                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
                    // Integral floats share the "n:" namespace with Ints so
                    // 2.0 and 2 deduplicate to the same key.
                    format!("n:{}", *f as i64)
                } else {
                    format!("f:{f}")
                }
            }
            Value::String(s) => {
                // Strings that parse as temporal literals canonicalize like
                // the corresponding temporal value (same "temporal:" prefix
                // as Value::Temporal below).
                if let Some(k) = Self::temporal_string_key(s) {
                    format!("temporal:{k}")
                } else {
                    format!("s:{s}")
                }
            }
            Value::Bytes(b) => format!("bytes:{:?}", b),
            Value::List(items) => format!(
                "list:[{}]",
                items
                    .iter()
                    .map(Self::canonical_value_key)
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Map(map) => {
                // Sort map entries so key order never affects the result.
                let mut pairs: Vec<_> = map.iter().collect();
                pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
                format!(
                    "map:{{{}}}",
                    pairs
                        .into_iter()
                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
                        .collect::<Vec<_>>()
                        .join(",")
                )
            }
            Value::Node(n) => {
                // Labels are sorted so label order never affects the key.
                let mut labels = n.labels.clone();
                labels.sort();
                format!(
                    "node:{}:{}:{}",
                    n.vid.as_u64(),
                    labels.join(":"),
                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
                )
            }
            Value::Edge(e) => format!(
                "edge:{}:{}:{}:{}:{}",
                e.eid.as_u64(),
                e.edge_type,
                e.src.as_u64(),
                e.dst.as_u64(),
                Self::canonical_value_key(&Value::Map(e.properties.clone()))
            ),
            Value::Path(p) => format!(
                "path:nodes=[{}];edges=[{}]",
                p.nodes
                    .iter()
                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
                    .collect::<Vec<_>>()
                    .join(","),
                p.edges
                    .iter()
                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Vector(vs) => format!("vec:{:?}", vs),
            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
            // Fallback for any remaining variants: Debug formatting. This is
            // deterministic but format-fragile; new variants should get an
            // explicit arm above.
            _ => format!("{v:?}"),
        }
    }
3087
3088 fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3089 match t {
3090 uni_common::TemporalValue::Date { days_since_epoch } => {
3091 format!("date:{days_since_epoch}")
3092 }
3093 uni_common::TemporalValue::LocalTime {
3094 nanos_since_midnight,
3095 } => format!("localtime:{nanos_since_midnight}"),
3096 uni_common::TemporalValue::Time {
3097 nanos_since_midnight,
3098 offset_seconds,
3099 } => {
3100 let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3101 format!("time:{utc_nanos}")
3102 }
3103 uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3104 format!("localdatetime:{nanos_since_epoch}")
3105 }
3106 uni_common::TemporalValue::DateTime {
3107 nanos_since_epoch, ..
3108 } => format!("datetime:{nanos_since_epoch}"),
3109 uni_common::TemporalValue::Duration {
3110 months,
3111 days,
3112 nanos,
3113 } => format!("duration:{months}:{days}:{nanos}"),
3114 }
3115 }
3116
3117 fn temporal_string_key(s: &str) -> Option<String> {
3118 let fn_name = match classify_temporal(s)? {
3119 uni_common::TemporalType::Date => "DATE",
3120 uni_common::TemporalType::LocalTime => "LOCALTIME",
3121 uni_common::TemporalType::Time => "TIME",
3122 uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3123 uni_common::TemporalType::DateTime => "DATETIME",
3124 uni_common::TemporalType::Duration => "DURATION",
3125 };
3126 match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3127 Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3128 _ => None,
3129 }
3130 }
3131
3132 pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3135
3136 pub(crate) async fn execute_aggregate(
3137 &self,
3138 rows: Vec<HashMap<String, Value>>,
3139 group_by: &[Expr],
3140 aggregates: &[Expr],
3141 prop_manager: &PropertyManager,
3142 params: &HashMap<String, Value>,
3143 ctx: Option<&QueryContext>,
3144 ) -> Result<Vec<HashMap<String, Value>>> {
3145 if let Some(ctx) = ctx {
3147 ctx.check_timeout()?;
3148 }
3149
3150 let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3151
3152 if rows.is_empty() {
3155 if group_by.is_empty() {
3156 let accs = Self::create_accumulators(aggregates);
3157 let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3158 return Ok(vec![row]);
3159 }
3160 return Ok(vec![]);
3161 }
3162
3163 for (idx, row) in rows.into_iter().enumerate() {
3164 if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3166 && let Some(ctx) = ctx
3167 {
3168 ctx.check_timeout()?;
3169 }
3170
3171 let key_vals = self
3172 .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3173 .await?;
3174 let key_str = format!(
3177 "[{}]",
3178 key_vals
3179 .iter()
3180 .map(Self::canonical_value_key)
3181 .collect::<Vec<_>>()
3182 .join(",")
3183 );
3184
3185 let entry = groups
3186 .entry(key_str)
3187 .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3188
3189 self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3190 .await?;
3191 }
3192
3193 let results = groups
3194 .values()
3195 .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3196 .collect();
3197
3198 Ok(results)
3199 }
3200
    /// Evaluates window-function expressions over `rows` and writes each
    /// result back into the source row under the expression's string form.
    ///
    /// For every window expression: rows are partitioned by the PARTITION BY
    /// keys, each partition is sorted by the ORDER BY items, and the
    /// function value is computed per position. Supported functions are
    /// those in `WINDOW_FUNCTIONS` (ROW_NUMBER, RANK, DENSE_RANK, LAG,
    /// LEAD, NTILE, FIRST_VALUE, LAST_VALUE, NTH_VALUE).
    ///
    /// Expressions inside window specs are evaluated with the synchronous
    /// `evaluate_simple_expr` (variables / properties / literals only).
    ///
    /// # Errors
    /// Fails when an expression is not a FunctionCall with an OVER clause,
    /// when the function is unsupported, or when argument validation fails
    /// (e.g. non-positive NTILE bucket count or NTH_VALUE position).
    pub(crate) async fn execute_window(
        &self,
        mut rows: Vec<HashMap<String, Value>>,
        window_exprs: &[Expr],
        _prop_manager: &PropertyManager,
        _params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        if let Some(ctx) = ctx {
            ctx.check_timeout()?;
        }

        if rows.is_empty() || window_exprs.is_empty() {
            return Ok(rows);
        }

        for window_expr in window_exprs {
            // Every window expression must be f(args) OVER (window_spec).
            let Expr::FunctionCall {
                name,
                args,
                window_spec: Some(window_spec),
                ..
            } = window_expr
            else {
                return Err(anyhow!(
                    "Window expression must be a FunctionCall with OVER clause: {:?}",
                    window_expr
                ));
            };

            let name_upper = name.to_uppercase();

            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
                return Err(anyhow!(
                    "Unsupported window function: {}. Supported functions: {}",
                    name,
                    WINDOW_FUNCTIONS.join(", ")
                ));
            }

            // Partition key values -> indices of the rows in that partition.
            // No PARTITION BY means one single partition (empty key).
            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();

            for (row_idx, row) in rows.iter().enumerate() {
                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
                    vec![]
                } else {
                    window_spec
                        .partition_by
                        .iter()
                        .map(|expr| self.evaluate_simple_expr(expr, row))
                        .collect::<Result<Vec<_>>>()?
                };

                partition_map
                    .entry(partition_key)
                    .or_default()
                    .push(row_idx);
            }

            for (_partition_key, row_indices) in partition_map.iter_mut() {
                // Sort row indices within the partition by the ORDER BY
                // items; rows whose keys fail to evaluate compare equal.
                if !window_spec.order_by.is_empty() {
                    row_indices.sort_by(|&a, &b| {
                        for sort_item in &window_spec.order_by {
                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);

                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
                                let cmp = Executor::compare_values(&va, &vb);
                                let cmp = if sort_item.ascending {
                                    cmp
                                } else {
                                    cmp.reverse()
                                };
                                if cmp != std::cmp::Ordering::Equal {
                                    return cmp;
                                }
                            }
                        }
                        std::cmp::Ordering::Equal
                    });
                }

                // `position` is the 0-based rank of the row inside its
                // (sorted) partition.
                for (position, &row_idx) in row_indices.iter().enumerate() {
                    let window_value = match name_upper.as_str() {
                        "ROW_NUMBER" => Value::from((position + 1) as i64),
                        "RANK" => {
                            // Standard RANK: ties share the rank of the
                            // first row of their tie group; ranks have gaps.
                            let rank = if position == 0 {
                                1i64
                            } else {
                                let prev_row_idx = row_indices[position - 1];
                                let same_as_prev = self.rows_have_same_sort_keys(
                                    &window_spec.order_by,
                                    &rows,
                                    row_idx,
                                    prev_row_idx,
                                );

                                if same_as_prev {
                                    // Walk back to the start of the tie
                                    // group; its 1-based position is the rank.
                                    let mut group_start = position - 1;
                                    while group_start > 0 {
                                        let curr_idx = row_indices[group_start];
                                        let prev_idx = row_indices[group_start - 1];
                                        if !self.rows_have_same_sort_keys(
                                            &window_spec.order_by,
                                            &rows,
                                            curr_idx,
                                            prev_idx,
                                        ) {
                                            break;
                                        }
                                        group_start -= 1;
                                    }
                                    (group_start + 1) as i64
                                } else {
                                    (position + 1) as i64
                                }
                            };
                            Value::from(rank)
                        }
                        "DENSE_RANK" => {
                            // DENSE_RANK: count distinct sort-key groups up
                            // to this position (no gaps). O(partition) per
                            // row, so O(n^2) per partition overall.
                            let mut dense_rank = 1i64;
                            for i in 0..position {
                                let curr_idx = row_indices[i + 1];
                                let prev_idx = row_indices[i];
                                if !self.rows_have_same_sort_keys(
                                    &window_spec.order_by,
                                    &rows,
                                    curr_idx,
                                    prev_idx,
                                ) {
                                    dense_rank += 1;
                                }
                            }
                            Value::from(dense_rank)
                        }
                        "LAG" => {
                            // Value from `offset` rows earlier in the
                            // partition, or the default when out of range.
                            let (value_expr, offset, default_value) =
                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;

                            if position >= offset {
                                let target_idx = row_indices[position - offset];
                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
                            } else {
                                default_value
                            }
                        }
                        "LEAD" => {
                            // Value from `offset` rows later in the
                            // partition, or the default when out of range.
                            let (value_expr, offset, default_value) =
                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;

                            if position + offset < row_indices.len() {
                                let target_idx = row_indices[position + offset];
                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
                            } else {
                                default_value
                            }
                        }
                        "NTILE" => {
                            let num_buckets_expr = args.first().ok_or_else(|| {
                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
                            })?;
                            let num_buckets_val =
                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
                                anyhow!(
                                    "NTILE argument must be an integer, got: {:?}",
                                    num_buckets_val
                                )
                            })?;

                            if num_buckets <= 0 {
                                return Err(anyhow!(
                                    "NTILE bucket count must be positive, got: {}",
                                    num_buckets
                                ));
                            }

                            let num_buckets = num_buckets as usize;
                            let partition_size = row_indices.len();

                            // The first `extra_rows` buckets get one extra
                            // row each (standard NTILE distribution).
                            let base_size = partition_size / num_buckets;
                            let extra_rows = partition_size % num_buckets;

                            // When base_size == 0, every position falls in
                            // the first branch, so no division by zero below.
                            let bucket = if position < extra_rows * (base_size + 1) {
                                position / (base_size + 1) + 1
                            } else {
                                let adjusted_position = position - extra_rows * (base_size + 1);
                                extra_rows + (adjusted_position / base_size) + 1
                            };

                            Value::from(bucket as i64)
                        }
                        "FIRST_VALUE" => {
                            let value_expr = args.first().ok_or_else(|| {
                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
                            })?;

                            if row_indices.is_empty() {
                                Value::Null
                            } else {
                                let first_idx = row_indices[0];
                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
                            }
                        }
                        "LAST_VALUE" => {
                            // NOTE: evaluated over the whole partition (no
                            // frame support), so this is the partition's
                            // final row, not a running last value.
                            let value_expr = args.first().ok_or_else(|| {
                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
                            })?;

                            if row_indices.is_empty() {
                                Value::Null
                            } else {
                                let last_idx = row_indices[row_indices.len() - 1];
                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
                            }
                        }
                        "NTH_VALUE" => {
                            if args.len() != 2 {
                                return Err(anyhow!(
                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
                                ));
                            }

                            let value_expr = &args[0];
                            let n_expr = &args[1];

                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
                            let n = n_val.as_i64().ok_or_else(|| {
                                anyhow!(
                                    "NTH_VALUE second argument must be an integer, got: {:?}",
                                    n_val
                                )
                            })?;

                            if n <= 0 {
                                return Err(anyhow!(
                                    "NTH_VALUE position must be positive, got: {}",
                                    n
                                ));
                            }

                            // 1-based position; out-of-range yields NULL.
                            let nth_index = (n - 1) as usize;
                            if nth_index < row_indices.len() {
                                let nth_idx = row_indices[nth_index];
                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
                            } else {
                                Value::Null
                            }
                        }
                        _ => unreachable!("Window function {} already validated", name),
                    };

                    // Store the result under the expression's printable form
                    // so later projections can reference it by name.
                    let col_name = window_expr.to_string_repr();
                    rows[row_idx].insert(col_name, window_value);
                }
            }
        }

        Ok(rows)
    }
3490
3491 fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3496 match expr {
3497 Expr::Variable(name) => row
3498 .get(name)
3499 .cloned()
3500 .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3501 Expr::Property(base, prop) => {
3502 let base_val = self.evaluate_simple_expr(base, row)?;
3503 if let Value::Map(map) = base_val {
3504 map.get(prop)
3505 .cloned()
3506 .ok_or_else(|| anyhow!("Property not found: {}", prop))
3507 } else {
3508 Err(anyhow!("Cannot access property on non-object"))
3509 }
3510 }
3511 Expr::Literal(lit) => Ok(lit.to_value()),
3512 _ => Err(anyhow!(
3513 "Unsupported expression in window function: {:?}",
3514 expr
3515 )),
3516 }
3517 }
3518
3519 fn rows_have_same_sort_keys(
3521 &self,
3522 order_by: &[uni_cypher::ast::SortItem],
3523 rows: &[HashMap<String, Value>],
3524 idx_a: usize,
3525 idx_b: usize,
3526 ) -> bool {
3527 order_by.iter().all(|sort_item| {
3528 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3529 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3530 matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3531 })
3532 }
3533
3534 fn extract_lag_lead_params<'a>(
3536 &self,
3537 func_name: &str,
3538 args: &'a [Expr],
3539 row: &HashMap<String, Value>,
3540 ) -> Result<(&'a Expr, usize, Value)> {
3541 let value_expr = args.first().ok_or_else(|| {
3542 anyhow!(
3543 "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3544 func_name,
3545 func_name
3546 )
3547 })?;
3548
3549 let offset = if let Some(offset_expr) = args.get(1) {
3550 let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3551 offset_val.as_i64().ok_or_else(|| {
3552 anyhow!(
3553 "{} offset must be an integer, got: {:?}",
3554 func_name,
3555 offset_val
3556 )
3557 })? as usize
3558 } else {
3559 1
3560 };
3561
3562 let default_value = if let Some(default_expr) = args.get(2) {
3563 self.evaluate_simple_expr(default_expr, row)?
3564 } else {
3565 Value::Null
3566 };
3567
3568 Ok((value_expr, offset, default_value))
3569 }
3570
3571 pub(crate) async fn evaluate_group_keys(
3573 &self,
3574 group_by: &[Expr],
3575 row: &HashMap<String, Value>,
3576 prop_manager: &PropertyManager,
3577 params: &HashMap<String, Value>,
3578 ctx: Option<&QueryContext>,
3579 ) -> Result<Vec<Value>> {
3580 let mut key_vals = Vec::new();
3581 for expr in group_by {
3582 key_vals.push(
3583 self.evaluate_expr(expr, row, prop_manager, params, ctx)
3584 .await?,
3585 );
3586 }
3587 Ok(key_vals)
3588 }
3589
3590 pub(crate) async fn update_accumulators(
3592 &self,
3593 accs: &mut [Accumulator],
3594 aggregates: &[Expr],
3595 row: &HashMap<String, Value>,
3596 prop_manager: &PropertyManager,
3597 params: &HashMap<String, Value>,
3598 ctx: Option<&QueryContext>,
3599 ) -> Result<()> {
3600 for (i, agg_expr) in aggregates.iter().enumerate() {
3601 if let Expr::FunctionCall { args, .. } = agg_expr {
3602 let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3603 let val = if is_wildcard {
3604 Value::Null
3605 } else {
3606 self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3607 .await?
3608 };
3609 accs[i].update(&val, is_wildcard);
3610 }
3611 }
3612 Ok(())
3613 }
3614
3615 pub(crate) async fn execute_recursive_cte(
3617 &self,
3618 cte_name: &str,
3619 initial: LogicalPlan,
3620 recursive: LogicalPlan,
3621 prop_manager: &PropertyManager,
3622 params: &HashMap<String, Value>,
3623 ctx: Option<&QueryContext>,
3624 ) -> Result<Vec<HashMap<String, Value>>> {
3625 use std::collections::HashSet;
3626
3627 pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3630 let mut pairs: Vec<_> = row.iter().collect();
3631 pairs.sort_by(|a, b| a.0.cmp(b.0));
3632 format!("{pairs:?}")
3633 }
3634
3635 let mut working_table = self
3637 .execute_subplan(initial, prop_manager, params, ctx)
3638 .await?;
3639 let mut result_table = working_table.clone();
3640
3641 let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3643
3644 let max_iterations = 1000;
3648 for _iteration in 0..max_iterations {
3649 if let Some(ctx) = ctx {
3651 ctx.check_timeout()?;
3652 }
3653
3654 if working_table.is_empty() {
3655 break;
3656 }
3657
3658 let working_val = Value::List(
3660 working_table
3661 .iter()
3662 .map(|row| {
3663 if row.len() == 1 {
3664 row.values().next().unwrap().clone()
3665 } else {
3666 Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3667 }
3668 })
3669 .collect(),
3670 );
3671
3672 let mut next_params = params.clone();
3673 next_params.insert(cte_name.to_string(), working_val);
3674
3675 let next_result = self
3677 .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3678 .await?;
3679
3680 if next_result.is_empty() {
3681 break;
3682 }
3683
3684 let new_rows: Vec<_> = next_result
3686 .into_iter()
3687 .filter(|row| {
3688 let key = row_key(row);
3689 seen.insert(key) })
3691 .collect();
3692
3693 if new_rows.is_empty() {
3694 break;
3696 }
3697
3698 result_table.extend(new_rows.clone());
3699 working_table = new_rows;
3700 }
3701
3702 let final_list = Value::List(
3704 result_table
3705 .into_iter()
3706 .map(|row| {
3707 if row.len() == 1 {
3719 row.values().next().unwrap().clone()
3720 } else {
3721 Value::Map(row.into_iter().collect())
3722 }
3723 })
3724 .collect(),
3725 );
3726
3727 let mut final_row = HashMap::new();
3728 final_row.insert(cte_name.to_string(), final_list);
3729 Ok(vec![final_row])
3730 }
3731
3732 const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3734
3735 pub(crate) async fn execute_sort(
3736 &self,
3737 rows: Vec<HashMap<String, Value>>,
3738 order_by: &[uni_cypher::ast::SortItem],
3739 prop_manager: &PropertyManager,
3740 params: &HashMap<String, Value>,
3741 ctx: Option<&QueryContext>,
3742 ) -> Result<Vec<HashMap<String, Value>>> {
3743 if let Some(ctx) = ctx {
3745 ctx.check_timeout()?;
3746 }
3747
3748 let mut rows_with_keys = Vec::with_capacity(rows.len());
3749 for (idx, row) in rows.into_iter().enumerate() {
3750 if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3752 && let Some(ctx) = ctx
3753 {
3754 ctx.check_timeout()?;
3755 }
3756
3757 let mut keys = Vec::new();
3758 for item in order_by {
3759 let val = row
3760 .get(&item.expr.to_string_repr())
3761 .cloned()
3762 .unwrap_or(Value::Null);
3763 let val = if val.is_null() {
3764 self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3765 .await
3766 .unwrap_or(Value::Null)
3767 } else {
3768 val
3769 };
3770 keys.push(val);
3771 }
3772 rows_with_keys.push((row, keys));
3773 }
3774
3775 if let Some(ctx) = ctx {
3777 ctx.check_timeout()?;
3778 }
3779
3780 rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3781
3782 Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3783 }
3784
3785 pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3787 aggregates
3788 .iter()
3789 .map(|expr| {
3790 if let Expr::FunctionCall { name, distinct, .. } = expr {
3791 Accumulator::new(name, *distinct)
3792 } else {
3793 Accumulator::new("COUNT", false)
3794 }
3795 })
3796 .collect()
3797 }
3798
3799 pub(crate) fn build_aggregate_result(
3801 group_by: &[Expr],
3802 aggregates: &[Expr],
3803 key_vals: &[Value],
3804 accs: &[Accumulator],
3805 ) -> HashMap<String, Value> {
3806 let mut res_row = HashMap::new();
3807 for (i, expr) in group_by.iter().enumerate() {
3808 res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3809 }
3810 for (i, expr) in aggregates.iter().enumerate() {
3811 let col_name = crate::query::planner::aggregate_column_name(expr);
3813 res_row.insert(col_name, accs[i].finish());
3814 }
3815 res_row
3816 }
3817
3818 pub(crate) fn compare_sort_keys(
3820 a_keys: &[Value],
3821 b_keys: &[Value],
3822 order_by: &[uni_cypher::ast::SortItem],
3823 ) -> std::cmp::Ordering {
3824 for (i, item) in order_by.iter().enumerate() {
3825 let order = Self::compare_values(&a_keys[i], &b_keys[i]);
3826 if order != std::cmp::Ordering::Equal {
3827 return if item.ascending {
3828 order
3829 } else {
3830 order.reverse()
3831 };
3832 }
3833 }
3834 std::cmp::Ordering::Equal
3835 }
3836
    /// Backs up the store to `destination`, which may be a cloud URL or a
    /// local filesystem path. Returns one status row containing the backed
    /// up snapshot id. `_options` is currently unused.
    ///
    /// # Errors
    /// Fails when no snapshot exists, when local path validation rejects
    /// the destination, or when the flush/copy itself errors.
    pub(crate) async fn execute_backup(
        &self,
        destination: &str,
        _options: &HashMap<String, Value>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Flush buffered writes to L1 first so the snapshot we back up
        // reflects the latest committed state.
        if let Some(writer_arc) = &self.writer {
            let mut writer = writer_arc.write().await;
            writer.flush_to_l1(None).await?;
        }

        let snapshot_manager = self.storage.snapshot_manager();
        let snapshot = snapshot_manager
            .load_latest_snapshot()
            .await?
            .ok_or_else(|| anyhow!("No snapshot found"))?;

        // Dispatch on destination kind; local paths are validated before
        // any filesystem access.
        if is_cloud_url(destination) {
            self.backup_to_cloud(destination, &snapshot.snapshot_id)
                .await?;
        } else {
            let validated_dest = self.validate_path(destination)?;
            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
                .await?;
        }

        let mut res = HashMap::new();
        res.insert(
            "status".to_string(),
            Value::String("Backup completed".to_string()),
        );
        res.insert(
            "snapshot_id".to_string(),
            Value::String(snapshot.snapshot_id),
        );
        Ok(vec![res])
    }
3880
3881 async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
3883 let source_path = std::path::Path::new(self.storage.base_path());
3884
3885 if !dest_path.exists() {
3886 std::fs::create_dir_all(dest_path)?;
3887 }
3888
3889 if source_path.exists() {
3891 Self::copy_dir_all(source_path, dest_path)?;
3892 }
3893
3894 let schema_manager = self.storage.schema_manager();
3896 let dest_catalog = dest_path.join("catalog");
3897 if !dest_catalog.exists() {
3898 std::fs::create_dir_all(&dest_catalog)?;
3899 }
3900
3901 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3902 std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
3903
3904 Ok(())
3905 }
3906
3907 async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
3911 use object_store::ObjectStore;
3912 use object_store::local::LocalFileSystem;
3913 use object_store::path::Path as ObjPath;
3914
3915 let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
3916 let source_path = std::path::Path::new(self.storage.base_path());
3917
3918 let src_store: Arc<dyn ObjectStore> =
3920 Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
3921
3922 let catalog_src = ObjPath::from("catalog");
3924 let catalog_dst = if dest_prefix.as_ref().is_empty() {
3925 ObjPath::from("catalog")
3926 } else {
3927 ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
3928 };
3929 copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
3930
3931 let storage_src = ObjPath::from("storage");
3933 let storage_dst = if dest_prefix.as_ref().is_empty() {
3934 ObjPath::from("storage")
3935 } else {
3936 ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
3937 };
3938 copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
3939
3940 let schema_manager = self.storage.schema_manager();
3942 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3943 let schema_path = if dest_prefix.as_ref().is_empty() {
3944 ObjPath::from("catalog/schema.json")
3945 } else {
3946 ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
3947 };
3948 dest_store
3949 .put(&schema_path, bytes::Bytes::from(schema_content).into())
3950 .await?;
3951
3952 Ok(())
3953 }
3954
    /// Maximum directory recursion depth accepted by `copy_dir_all`; guards
    /// against pathologically deep (or cyclic, via non-symlink means) trees
    /// during backup.
    const MAX_BACKUP_DEPTH: usize = 100;

    /// Maximum number of directory entries `copy_dir_all` will process before
    /// aborting the backup with an error.
    const MAX_BACKUP_FILES: usize = 100_000;
3966
3967 pub(crate) fn copy_dir_all(
3975 src: &std::path::Path,
3976 dst: &std::path::Path,
3977 ) -> std::io::Result<()> {
3978 let mut file_count = 0usize;
3979 Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
3980 }
3981
3982 pub(crate) fn copy_dir_all_impl(
3984 src: &std::path::Path,
3985 dst: &std::path::Path,
3986 depth: usize,
3987 file_count: &mut usize,
3988 ) -> std::io::Result<()> {
3989 if depth >= Self::MAX_BACKUP_DEPTH {
3990 return Err(std::io::Error::new(
3991 std::io::ErrorKind::InvalidInput,
3992 format!(
3993 "Maximum backup depth {} exceeded at {:?}",
3994 Self::MAX_BACKUP_DEPTH,
3995 src
3996 ),
3997 ));
3998 }
3999
4000 std::fs::create_dir_all(dst)?;
4001
4002 for entry in std::fs::read_dir(src)? {
4003 if *file_count >= Self::MAX_BACKUP_FILES {
4004 return Err(std::io::Error::new(
4005 std::io::ErrorKind::InvalidInput,
4006 format!(
4007 "Maximum backup file count {} exceeded",
4008 Self::MAX_BACKUP_FILES
4009 ),
4010 ));
4011 }
4012 *file_count += 1;
4013
4014 let entry = entry?;
4015 let metadata = entry.metadata()?;
4016
4017 if metadata.file_type().is_symlink() {
4019 continue;
4021 }
4022
4023 let dst_path = dst.join(entry.file_name());
4024 if metadata.is_dir() {
4025 Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4026 } else {
4027 std::fs::copy(entry.path(), dst_path)?;
4028 }
4029 }
4030 Ok(())
4031 }
4032
4033 pub(crate) async fn execute_copy(
4034 &self,
4035 target: &str,
4036 source: &str,
4037 options: &HashMap<String, Value>,
4038 prop_manager: &PropertyManager,
4039 ) -> Result<Vec<HashMap<String, Value>>> {
4040 let format = options
4041 .get("format")
4042 .and_then(|v| v.as_str())
4043 .unwrap_or_else(|| {
4044 if source.ends_with(".parquet") {
4045 "parquet"
4046 } else {
4047 "csv"
4048 }
4049 });
4050
4051 match format.to_lowercase().as_str() {
4052 "csv" => self.execute_csv_import(target, source, options).await,
4053 "parquet" => {
4054 self.execute_parquet_import(target, source, options, prop_manager)
4055 .await
4056 }
4057 _ => Err(anyhow!("Unsupported format: {}", format)),
4058 }
4059 }
4060
    /// Imports a CSV file into the vertex label or edge type named `target`.
    ///
    /// Recognized options: `delimiter` (first byte of the string, default
    /// `,`), `header` (bool, default `true`), and — for edge targets —
    /// `src_col` / `dst_col` (default `_src` / `_dst`) naming the columns that
    /// hold endpoint VIDs. CSV columns with no matching schema property are
    /// silently ignored. Returns one row: `{"count": <records imported>}`.
    pub(crate) async fn execute_csv_import(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Reject paths outside the configured sandbox before opening anything.
        let validated_source = self.validate_path(source)?;

        // Imports are write operations; fail early without a writer.
        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;

        let schema = self.storage.schema_manager().schema();

        // `target` may name either a vertex label or an edge type.
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Only the first byte of the delimiter option is used.
        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut rdr = csv::ReaderBuilder::new()
            .delimiter(delimiter)
            .has_headers(has_header)
            .from_path(&validated_source)?;

        // NOTE(review): with `header = false` the csv crate still exposes the
        // first record via `headers()`, so column names come from the first
        // data row — confirm this is the intended behavior.
        let headers = rdr.headers()?.clone();
        let mut count = 0;

        // Hold the writer lock for the whole import so rows land together
        // with respect to other writers.
        let mut writer = writer_lock.write().await;

        if label_meta.is_some() {
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;

            for result in rdr.records() {
                let record = result?;
                let mut props = HashMap::new();

                // Keep only columns that correspond to schema properties.
                for (i, header) in headers.iter().enumerate() {
                    if let Some(val_str) = record.get(i)
                        && let Some(prop_meta) = target_props.get(header)
                    {
                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
                        props.insert(header.to_string(), val);
                    }
                }

                let vid = writer.next_vid().await?;
                writer
                    .insert_vertex_with_labels(vid, props, &[target.to_string()])
                    .await?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            let type_id = meta.id;
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;

            // Endpoint columns are configurable; all others are properties.
            let src_col = options
                .get("src_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_src");
            let dst_col = options
                .get("dst_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_dst");

            for result in rdr.records() {
                let record = result?;
                let mut props = HashMap::new();
                let mut src_vid = None;
                let mut dst_vid = None;

                for (i, header) in headers.iter().enumerate() {
                    if let Some(val_str) = record.get(i) {
                        if header == src_col {
                            src_vid =
                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
                        } else if header == dst_col {
                            dst_vid =
                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
                        } else if let Some(prop_meta) = target_props.get(header) {
                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
                            props.insert(header.to_string(), val);
                        }
                    }
                }

                // Both endpoints are mandatory for an edge row.
                let src =
                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
                let dst = dst_vid
                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;

                let eid = writer.next_eid(type_id).await?;
                writer
                    .insert_edge(src, dst, type_id, eid, props, Some(target.to_string()))
                    .await?;
                count += 1;
            }
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4191
    /// Imports a Parquet file (local path or cloud URL) into the vertex label
    /// or edge type named `target`.
    ///
    /// Options `src_col` / `dst_col` (default `_src` / `_dst`) select the
    /// endpoint-VID columns for edge targets. Columns absent from the schema
    /// are ignored; NULL cells are skipped. Returns one row:
    /// `{"count": <records imported>}`.
    pub(crate) async fn execute_parquet_import(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
        _prop_manager: &PropertyManager,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Imports are write operations; fail early without a writer.
        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;

        let schema = self.storage.schema_manager().schema();

        // `target` may name either a vertex label or an edge type.
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Cloud sources are buffered in memory; local sources read from disk.
        let reader = if is_cloud_url(source) {
            self.open_parquet_from_cloud(source).await?
        } else {
            let validated_source = self.validate_path(source)?;
            let file = std::fs::File::open(&validated_source)?;
            let builder =
                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
            builder.build()?
        };
        let mut reader = reader;

        let mut count = 0;
        // Hold the writer lock for the whole import.
        let mut writer = writer_lock.write().await;

        if label_meta.is_some() {
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;

            for batch in reader.by_ref() {
                let batch = batch?;
                for row in 0..batch.num_rows() {
                    let mut props = HashMap::new();
                    for field in batch.schema().fields() {
                        let name = field.name();
                        if target_props.contains_key(name) {
                            let col = batch.column_by_name(name).unwrap();
                            if !col.is_null(row) {
                                // Convert with the schema-declared type so
                                // typed columns round-trip correctly.
                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
                                let val =
                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
                                props.insert(name.clone(), val);
                            }
                        }
                    }
                    let vid = writer.next_vid().await?;
                    writer
                        .insert_vertex_with_labels(vid, props, &[target.to_string()])
                        .await?;
                    count += 1;
                }
            }
        } else if let Some(meta) = edge_meta {
            let type_id = meta.id;
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;

            // Endpoint columns are configurable; all others are properties.
            let src_col = options
                .get("src_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_src");
            let dst_col = options
                .get("dst_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_dst");

            for batch in reader {
                let batch = batch?;
                for row in 0..batch.num_rows() {
                    let mut props = HashMap::new();
                    let mut src_vid = None;
                    let mut dst_vid = None;

                    for field in batch.schema().fields() {
                        let name = field.name();
                        let col = batch.column_by_name(name).unwrap();
                        if col.is_null(row) {
                            continue;
                        }

                        if name == src_col {
                            // Endpoint columns use the untyped conversion path.
                            let val = Self::arrow_to_value(col.as_ref(), row);
                            src_vid = Some(Self::vid_from_value(&val)?);
                        } else if name == dst_col {
                            let val = Self::arrow_to_value(col.as_ref(), row);
                            dst_vid = Some(Self::vid_from_value(&val)?);
                        } else if let Some(pm) = target_props.get(name) {
                            let val =
                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
                            props.insert(name.clone(), val);
                        }
                    }

                    // Both endpoints are mandatory for an edge row.
                    let src = src_vid
                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
                    let dst = dst_vid.ok_or_else(|| {
                        anyhow!("Missing destination VID in column '{}'", dst_col)
                    })?;

                    let eid = writer.next_eid(type_id).await?;
                    writer
                        .insert_edge(src, dst, type_id, eid, props, Some(target.to_string()))
                        .await?;
                    count += 1;
                }
            }
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4326
4327 async fn open_parquet_from_cloud(
4331 &self,
4332 source_url: &str,
4333 ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4334 use object_store::ObjectStore;
4335
4336 let (store, path) = build_store_from_url(source_url)?;
4337
4338 let bytes = store.get(&path).await?.bytes().await?;
4340
4341 let reader = bytes::Bytes::from(bytes.to_vec());
4343 let builder =
4344 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4345 Ok(builder.build()?)
4346 }
4347
4348 pub(crate) async fn scan_edge_type(
4349 &self,
4350 edge_type: &str,
4351 ctx: Option<&QueryContext>,
4352 ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4353 let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4354
4355 self.scan_edge_type_l2(edge_type, &mut edges).await?;
4357
4358 self.scan_edge_type_l1(edge_type, &mut edges).await?;
4360
4361 if let Some(ctx) = ctx {
4363 self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4364 self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4365 }
4366
4367 Ok(edges
4368 .into_iter()
4369 .map(|(eid, (src, dst))| (eid, src, dst))
4370 .collect())
4371 }
4372
    /// L2 (compacted base layer) contribution to an edge-type scan.
    ///
    /// Currently a no-op placeholder: edge topology is served from L1 deltas
    /// and L0 buffers only. NOTE(review): presumably L2 edge data is either
    /// folded into L1 elsewhere or not yet implemented — confirm before
    /// relying on this for fully-compacted datasets.
    pub(crate) async fn scan_edge_type_l2(
        &self,
        _edge_type: &str,
        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
    ) -> Result<()> {
        Ok(())
    }
4386
    /// L1 (LanceDB delta table) contribution to an edge-type scan.
    ///
    /// Reads the `eid`/`src_vid`/`dst_vid`/`op`/`_version` columns from the
    /// forward ("fwd") delta dataset of `edge_type`, reduces rows to the
    /// highest-versioned op per edge id, then applies the result to `edges`
    /// (op 0 = upsert, op 1 = delete).
    ///
    /// Best-effort by design: a missing dataset, an unopenable table, a
    /// failed query, or a failed stream collect contributes nothing rather
    /// than failing the whole scan.
    pub(crate) async fn scan_edge_type_l1(
        &self,
        edge_type: &str,
        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
    ) -> Result<()> {
        use futures::TryStreamExt;
        use lancedb::query::{ExecutableQuery, QueryBase, Select};

        if let Ok(ds) = self.storage.delta_dataset(edge_type, "fwd") {
            let lancedb_store = self.storage.lancedb_store();
            if let Ok(table) = ds.open_lancedb(lancedb_store).await {
                // Only topology and versioning columns are needed here.
                let query = table.query().select(Select::Columns(vec![
                    "eid".into(),
                    "src_vid".into(),
                    "dst_vid".into(),
                    "op".into(),
                    "_version".into(),
                ]));

                if let Ok(stream) = query.execute().await {
                    let batches: Vec<arrow_array::RecordBatch> =
                        stream.try_collect().await.unwrap_or_default();

                    // eid -> (version, op, src, dst); only the newest version
                    // of each edge survives the reduction.
                    let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
                        HashMap::new();

                    for batch in batches {
                        self.process_delta_batch(&batch, &mut versioned_ops)?;
                    }

                    // Apply the winning op per edge: 0 inserts, 1 removes.
                    for (eid, (_, op, src, dst)) in versioned_ops {
                        if op == 0 {
                            edges.insert(eid, (src, dst));
                        } else if op == 1 {
                            edges.remove(&eid);
                        }
                    }
                }
            }
        }
        Ok(())
    }
4432
4433 pub(crate) fn process_delta_batch(
4435 &self,
4436 batch: &arrow_array::RecordBatch,
4437 versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4438 ) -> Result<()> {
4439 use arrow_array::UInt64Array;
4440 let eid_col = batch
4441 .column_by_name("eid")
4442 .ok_or(anyhow!("Missing eid"))?
4443 .as_any()
4444 .downcast_ref::<UInt64Array>()
4445 .ok_or(anyhow!("Invalid eid"))?;
4446 let src_col = batch
4447 .column_by_name("src_vid")
4448 .ok_or(anyhow!("Missing src_vid"))?
4449 .as_any()
4450 .downcast_ref::<UInt64Array>()
4451 .ok_or(anyhow!("Invalid src_vid"))?;
4452 let dst_col = batch
4453 .column_by_name("dst_vid")
4454 .ok_or(anyhow!("Missing dst_vid"))?
4455 .as_any()
4456 .downcast_ref::<UInt64Array>()
4457 .ok_or(anyhow!("Invalid dst_vid"))?;
4458 let op_col = batch
4459 .column_by_name("op")
4460 .ok_or(anyhow!("Missing op"))?
4461 .as_any()
4462 .downcast_ref::<arrow_array::UInt8Array>()
4463 .ok_or(anyhow!("Invalid op"))?;
4464 let version_col = batch
4465 .column_by_name("_version")
4466 .ok_or(anyhow!("Missing _version"))?
4467 .as_any()
4468 .downcast_ref::<UInt64Array>()
4469 .ok_or(anyhow!("Invalid _version"))?;
4470
4471 for i in 0..batch.num_rows() {
4472 let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4473 let version = version_col.value(i);
4474 let op = op_col.value(i);
4475 let src = Vid::from(src_col.value(i));
4476 let dst = Vid::from(dst_col.value(i));
4477
4478 match versioned_ops.entry(eid) {
4479 std::collections::hash_map::Entry::Vacant(e) => {
4480 e.insert((version, op, src, dst));
4481 }
4482 std::collections::hash_map::Entry::Occupied(mut e) => {
4483 if version > e.get().0 {
4484 e.insert((version, op, src, dst));
4485 }
4486 }
4487 }
4488 }
4489 Ok(())
4490 }
4491
4492 pub(crate) fn scan_edge_type_l0(
4494 &self,
4495 edge_type: &str,
4496 ctx: &QueryContext,
4497 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4498 ) {
4499 let schema = self.storage.schema_manager().schema();
4500 let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4501
4502 if let Some(type_id) = type_id {
4503 self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4505
4506 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4508 self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4509 }
4510
4511 for pending_l0_arc in &ctx.pending_flush_l0s {
4513 self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4514 }
4515 }
4516 }
4517
4518 pub(crate) fn scan_single_l0(
4520 &self,
4521 l0: &uni_store::runtime::L0Buffer,
4522 type_id: u32,
4523 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4524 ) {
4525 for edge_entry in l0.graph.edges() {
4526 if edge_entry.edge_type == type_id {
4527 edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4528 }
4529 }
4530 let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4532 for eid in eids_to_check {
4533 if l0.is_tombstoned(eid) {
4534 edges.remove(&eid);
4535 }
4536 }
4537 }
4538
4539 pub(crate) fn filter_tombstoned_vertex_edges(
4541 &self,
4542 ctx: &QueryContext,
4543 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4544 ) {
4545 let l0 = ctx.l0.read();
4546 let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4547
4548 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4550 let tx_l0 = tx_l0_arc.read();
4551 all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4552 }
4553
4554 for pending_l0_arc in &ctx.pending_flush_l0s {
4556 let pending_l0 = pending_l0_arc.read();
4557 all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4558 }
4559
4560 edges.retain(|_, (src, dst)| {
4561 !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4562 });
4563 }
4564
4565 pub(crate) async fn execute_project(
4567 &self,
4568 input_rows: Vec<HashMap<String, Value>>,
4569 projections: &[(Expr, Option<String>)],
4570 prop_manager: &PropertyManager,
4571 params: &HashMap<String, Value>,
4572 ctx: Option<&QueryContext>,
4573 ) -> Result<Vec<HashMap<String, Value>>> {
4574 let mut results = Vec::new();
4575 for m in input_rows {
4576 let mut row = HashMap::new();
4577 for (expr, alias) in projections {
4578 let val = self
4579 .evaluate_expr(expr, &m, prop_manager, params, ctx)
4580 .await?;
4581 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4582 row.insert(name, val);
4583 }
4584 results.push(row);
4585 }
4586 Ok(results)
4587 }
4588
4589 pub(crate) async fn execute_unwind(
4591 &self,
4592 input_rows: Vec<HashMap<String, Value>>,
4593 expr: &Expr,
4594 variable: &str,
4595 prop_manager: &PropertyManager,
4596 params: &HashMap<String, Value>,
4597 ctx: Option<&QueryContext>,
4598 ) -> Result<Vec<HashMap<String, Value>>> {
4599 let mut results = Vec::new();
4600 for row in input_rows {
4601 let val = self
4602 .evaluate_expr(expr, &row, prop_manager, params, ctx)
4603 .await?;
4604 if let Value::List(items) = val {
4605 for item in items {
4606 let mut new_row = row.clone();
4607 new_row.insert(variable.to_string(), item);
4608 results.push(new_row);
4609 }
4610 }
4611 }
4612 Ok(results)
4613 }
4614
    /// Executes a correlated subquery plan once per (optionally filtered)
    /// input row, merging subquery output back onto the driving row.
    ///
    /// `input_filter`, when present, drops rows that don't evaluate truthy
    /// before the subquery runs. Each surviving row's bindings are injected
    /// into the subquery's parameter map, and every subquery result row is
    /// merged onto a clone of its driving row (subquery keys win on
    /// collision).
    ///
    /// NOTE(review): when no input rows survive, the subquery still executes
    /// once with only the original parameters and its rows are returned
    /// unmerged — presumably to support an uncorrelated/standalone CALL;
    /// confirm this is intended rather than returning zero rows.
    pub(crate) async fn execute_apply(
        &self,
        input_rows: Vec<HashMap<String, Value>>,
        subquery: &LogicalPlan,
        input_filter: Option<&Expr>,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut filtered_rows = input_rows;

        if let Some(filter) = input_filter {
            let mut filtered = Vec::new();
            for row in filtered_rows {
                let res = self
                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
                    .await?;
                // Non-boolean / NULL filter results count as false.
                if res.as_bool().unwrap_or(false) {
                    filtered.push(row);
                }
            }
            filtered_rows = filtered;
        }

        if filtered_rows.is_empty() {
            let sub_rows = self
                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
                .await?;
            return Ok(sub_rows);
        }

        let mut results = Vec::new();
        for row in filtered_rows {
            // Expose the driving row's bindings to the subquery as parameters.
            let mut sub_params = params.clone();
            sub_params.extend(row.clone());

            let sub_rows = self
                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
                .await?;

            for sub_row in sub_rows {
                let mut new_row = row.clone();
                new_row.extend(sub_row);
                results.push(new_row);
            }
        }
        Ok(results)
    }
4666
4667 pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4669 let schema = self.storage.schema_manager().schema();
4670 let mut rows = Vec::new();
4671 for idx in &schema.indexes {
4672 let (name, type_str, details) = match idx {
4673 uni_common::core::schema::IndexDefinition::Vector(c) => (
4674 c.name.clone(),
4675 "VECTOR",
4676 format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4677 ),
4678 uni_common::core::schema::IndexDefinition::FullText(c) => (
4679 c.name.clone(),
4680 "FULLTEXT",
4681 format!("on {}:{:?}", c.label, c.properties),
4682 ),
4683 uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4684 cfg.name.clone(),
4685 "SCALAR",
4686 format!(":{}({:?})", cfg.label, cfg.properties),
4687 ),
4688 _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4689 };
4690
4691 if let Some(f) = filter
4692 && f != type_str
4693 {
4694 continue;
4695 }
4696
4697 let mut row = HashMap::new();
4698 row.insert("name".to_string(), Value::String(name));
4699 row.insert("type".to_string(), Value::String(type_str.to_string()));
4700 row.insert("details".to_string(), Value::String(details));
4701 rows.push(row);
4702 }
4703 rows
4704 }
4705
4706 pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4707 let mut row = HashMap::new();
4708 row.insert("name".to_string(), Value::String("uni".to_string()));
4709 vec![row]
4711 }
4712
4713 pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4714 vec![]
4716 }
4717
4718 pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4719 let snapshot = self
4720 .storage
4721 .snapshot_manager()
4722 .load_latest_snapshot()
4723 .await?;
4724 let mut results = Vec::new();
4725
4726 if let Some(snap) = snapshot {
4727 for (label, s) in &snap.vertices {
4728 let mut row = HashMap::new();
4729 row.insert("type".to_string(), Value::String("Label".to_string()));
4730 row.insert("name".to_string(), Value::String(label.clone()));
4731 row.insert("count".to_string(), Value::Int(s.count as i64));
4732 results.push(row);
4733 }
4734 for (edge, s) in &snap.edges {
4735 let mut row = HashMap::new();
4736 row.insert("type".to_string(), Value::String("Edge".to_string()));
4737 row.insert("name".to_string(), Value::String(edge.clone()));
4738 row.insert("count".to_string(), Value::Int(s.count as i64));
4739 results.push(row);
4740 }
4741 }
4742
4743 Ok(results)
4744 }
4745
4746 pub(crate) fn execute_show_constraints(
4747 &self,
4748 clause: ShowConstraints,
4749 ) -> Vec<HashMap<String, Value>> {
4750 let schema = self.storage.schema_manager().schema();
4751 let mut rows = Vec::new();
4752 for c in &schema.constraints {
4753 if let Some(target) = &clause.target {
4754 match (target, &c.target) {
4755 (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4756 (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4757 if e1 == e2 => {}
4758 _ => continue,
4759 }
4760 }
4761
4762 let mut row = HashMap::new();
4763 row.insert("name".to_string(), Value::String(c.name.clone()));
4764 let type_str = match c.constraint_type {
4765 ConstraintType::Unique { .. } => "UNIQUE",
4766 ConstraintType::Exists { .. } => "EXISTS",
4767 ConstraintType::Check { .. } => "CHECK",
4768 _ => "UNKNOWN",
4769 };
4770 row.insert("type".to_string(), Value::String(type_str.to_string()));
4771
4772 let target_str = match &c.target {
4773 ConstraintTarget::Label(l) => format!("(:{})", l),
4774 ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4775 _ => "UNKNOWN".to_string(),
4776 };
4777 row.insert("target".to_string(), Value::String(target_str));
4778
4779 rows.push(row);
4780 }
4781 rows
4782 }
4783
4784 pub(crate) async fn execute_cross_join(
4786 &self,
4787 left: Box<LogicalPlan>,
4788 right: Box<LogicalPlan>,
4789 prop_manager: &PropertyManager,
4790 params: &HashMap<String, Value>,
4791 ctx: Option<&QueryContext>,
4792 ) -> Result<Vec<HashMap<String, Value>>> {
4793 let left_rows = self
4794 .execute_subplan(*left, prop_manager, params, ctx)
4795 .await?;
4796 let right_rows = self
4797 .execute_subplan(*right, prop_manager, params, ctx)
4798 .await?;
4799
4800 let mut results = Vec::new();
4801 for l in &left_rows {
4802 for r in &right_rows {
4803 let mut combined = l.clone();
4804 combined.extend(r.clone());
4805 results.push(combined);
4806 }
4807 }
4808 Ok(results)
4809 }
4810
4811 pub(crate) async fn execute_union(
4813 &self,
4814 left: Box<LogicalPlan>,
4815 right: Box<LogicalPlan>,
4816 all: bool,
4817 prop_manager: &PropertyManager,
4818 params: &HashMap<String, Value>,
4819 ctx: Option<&QueryContext>,
4820 ) -> Result<Vec<HashMap<String, Value>>> {
4821 let mut left_rows = self
4822 .execute_subplan(*left, prop_manager, params, ctx)
4823 .await?;
4824 let mut right_rows = self
4825 .execute_subplan(*right, prop_manager, params, ctx)
4826 .await?;
4827
4828 left_rows.append(&mut right_rows);
4829
4830 if !all {
4831 let mut seen = HashSet::new();
4832 left_rows.retain(|row| {
4833 let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
4834 let key = format!("{sorted_row:?}");
4835 seen.insert(key)
4836 });
4837 }
4838 Ok(left_rows)
4839 }
4840
4841 pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
4843 let schema = self.storage.schema_manager().schema();
4844 schema.indexes.iter().any(|idx| match idx {
4845 uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
4846 uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
4847 uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
4848 _ => false,
4849 })
4850 }
4851
4852 pub(crate) async fn execute_export(
4853 &self,
4854 target: &str,
4855 source: &str,
4856 options: &HashMap<String, Value>,
4857 prop_manager: &PropertyManager,
4858 ctx: Option<&QueryContext>,
4859 ) -> Result<Vec<HashMap<String, Value>>> {
4860 let format = options
4861 .get("format")
4862 .and_then(|v| v.as_str())
4863 .unwrap_or("csv")
4864 .to_lowercase();
4865
4866 match format.as_str() {
4867 "csv" => {
4868 self.execute_csv_export(target, source, options, prop_manager, ctx)
4869 .await
4870 }
4871 "parquet" => {
4872 self.execute_parquet_export(target, source, options, prop_manager, ctx)
4873 .await
4874 }
4875 _ => Err(anyhow!("Unsupported export format: {}", format)),
4876 }
4877 }
4878
    /// Exports all vertices of a label or all edges of a type to a CSV file.
    ///
    /// Options: `delimiter` (first byte, default `,`) and `header` (bool,
    /// default `true`; controls whether the header row is written). Vertex
    /// rows are `_vid` plus properties in sorted name order; edge rows are
    /// `_eid,_src,_dst,_type` plus sorted properties. Returns one row:
    /// `{"count": <records written>}`.
    pub(crate) async fn execute_csv_export(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // `source` is the destination path here; validate it is in-sandbox.
        let validated_dest = self.validate_path(source)?;

        let schema = self.storage.schema_manager().schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Only the first byte of the delimiter option is used.
        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut wtr = csv::WriterBuilder::new()
            .delimiter(delimiter)
            .from_path(&validated_dest)?;

        let mut count = 0;
        // Fallback for targets with no declared properties.
        let empty_props = HashMap::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            // Sort property names so the column order is deterministic.
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            let mut headers = vec!["_vid".to_string()];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(vid.to_string());
                for p_name in &prop_names {
                    // Missing properties serialize as NULL's CSV form.
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            let mut headers = vec![
                "_eid".to_string(),
                "_src".to_string(),
                "_dst".to_string(),
                "_type".to_string(),
            ];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            let edges = self.scan_edge_type(target, ctx).await?;

            for (eid, src, dst) in edges {
                let props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(eid.to_string());
                row.push(src.to_string());
                row.push(dst.to_string());
                // `_type` carries the numeric edge-type id, not the name.
                row.push(meta.id.to_string());

                for p_name in &prop_names {
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        }

        wtr.flush()?;
        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4998
4999 pub(crate) async fn execute_parquet_export(
5003 &self,
5004 target: &str,
5005 destination: &str,
5006 _options: &HashMap<String, Value>,
5007 prop_manager: &PropertyManager,
5008 ctx: Option<&QueryContext>,
5009 ) -> Result<Vec<HashMap<String, Value>>> {
5010 let schema_manager = self.storage.schema_manager();
5011 let schema = schema_manager.schema();
5012 let label_meta = schema.labels.get(target);
5013 let edge_meta = schema.edge_types.get(target);
5014
5015 if label_meta.is_none() && edge_meta.is_none() {
5016 return Err(anyhow!("Target '{}' not found in schema", target));
5017 }
5018
5019 let arrow_schema = if label_meta.is_some() {
5020 let dataset = self.storage.vertex_dataset(target)?;
5021 dataset.get_arrow_schema(&schema)?
5022 } else {
5023 let dataset = self.storage.edge_dataset(target, "", "")?;
5025 dataset.get_arrow_schema(&schema)?
5026 };
5027
5028 let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();
5029
5030 if let Some(meta) = label_meta {
5031 let label_id = meta.id;
5032 let vids = self
5033 .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5034 .await?;
5035
5036 for vid in vids {
5037 let mut props = prop_manager
5038 .get_all_vertex_props_with_ctx(vid, ctx)
5039 .await?
5040 .unwrap_or_default();
5041
5042 props.insert(
5043 "_vid".to_string(),
5044 uni_common::Value::Int(vid.as_u64() as i64),
5045 );
5046 if !props.contains_key("_uid") {
5047 props.insert(
5048 "_uid".to_string(),
5049 uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
5050 );
5051 }
5052 props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5053 props.insert("_version".to_string(), uni_common::Value::Int(1));
5054 rows.push(props);
5055 }
5056 } else if edge_meta.is_some() {
5057 let edges = self.scan_edge_type(target, ctx).await?;
5058 for (eid, src, dst) in edges {
5059 let mut props = prop_manager
5060 .get_all_edge_props_with_ctx(eid, ctx)
5061 .await?
5062 .unwrap_or_default();
5063
5064 props.insert(
5065 "eid".to_string(),
5066 uni_common::Value::Int(eid.as_u64() as i64),
5067 );
5068 props.insert(
5069 "src_vid".to_string(),
5070 uni_common::Value::Int(src.as_u64() as i64),
5071 );
5072 props.insert(
5073 "dst_vid".to_string(),
5074 uni_common::Value::Int(dst.as_u64() as i64),
5075 );
5076 props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5077 props.insert("_version".to_string(), uni_common::Value::Int(1));
5078 rows.push(props);
5079 }
5080 }
5081
5082 if is_cloud_url(destination) {
5084 self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
5085 .await?;
5086 } else {
5087 let validated_dest = self.validate_path(destination)?;
5089 let file = std::fs::File::create(&validated_dest)?;
5090 let mut writer =
5091 parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;
5092
5093 if !rows.is_empty() {
5095 let batch = self.rows_to_batch(&rows, &arrow_schema)?;
5096 writer.write(&batch)?;
5097 }
5098
5099 writer.close()?;
5100 }
5101
5102 let mut res = HashMap::new();
5103 res.insert("count".to_string(), Value::Int(rows.len() as i64));
5104 Ok(vec![res])
5105 }
5106
5107 async fn write_parquet_to_cloud(
5109 &self,
5110 dest_url: &str,
5111 rows: &[HashMap<String, uni_common::Value>],
5112 arrow_schema: &arrow_schema::Schema,
5113 ) -> Result<()> {
5114 use object_store::ObjectStore;
5115
5116 let (store, path) = build_store_from_url(dest_url)?;
5117
5118 let mut buffer = Vec::new();
5120 {
5121 let mut writer = parquet::arrow::ArrowWriter::try_new(
5122 &mut buffer,
5123 Arc::new(arrow_schema.clone()),
5124 None,
5125 )?;
5126
5127 if !rows.is_empty() {
5128 let batch = self.rows_to_batch(rows, arrow_schema)?;
5129 writer.write(&batch)?;
5130 }
5131
5132 writer.close()?;
5133 }
5134
5135 store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5137
5138 Ok(())
5139 }
5140
5141 pub(crate) fn rows_to_batch(
5142 &self,
5143 rows: &[HashMap<String, uni_common::Value>],
5144 schema: &arrow_schema::Schema,
5145 ) -> Result<RecordBatch> {
5146 let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5147
5148 for field in schema.fields() {
5149 let name = field.name();
5150 let dt = field.data_type();
5151
5152 let values: Vec<uni_common::Value> = rows
5153 .iter()
5154 .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5155 .collect();
5156 let array = self.values_to_array(&values, dt)?;
5157 columns.push(array);
5158 }
5159
5160 Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5161 }
5162
    /// Thin wrapper over [`arrow_convert::values_to_array`]: converts a slice
    /// of runtime `Value`s into a single Arrow array of the given `dt`.
    /// Kept as a method so callers inside the executor need not import the
    /// storage-layer conversion module directly.
    pub(crate) fn values_to_array(
        &self,
        values: &[uni_common::Value],
        dt: &arrow_schema::DataType,
    ) -> Result<Arc<dyn Array>> {
        arrow_convert::values_to_array(values, dt)
    }
5172
5173 pub(crate) fn format_csv_value(&self, val: Value) -> String {
5174 match val {
5175 Value::Null => "".to_string(),
5176 Value::String(s) => s,
5177 Value::Int(i) => i.to_string(),
5178 Value::Float(f) => f.to_string(),
5179 Value::Bool(b) => b.to_string(),
5180 _ => format!("{val}"),
5181 }
5182 }
5183
5184 pub(crate) fn parse_csv_value(
5185 &self,
5186 s: &str,
5187 data_type: &uni_common::core::schema::DataType,
5188 prop_name: &str,
5189 ) -> Result<Value> {
5190 if s.is_empty() || s.to_lowercase() == "null" {
5191 return Ok(Value::Null);
5192 }
5193
5194 use uni_common::core::schema::DataType;
5195 match data_type {
5196 DataType::String => Ok(Value::String(s.to_string())),
5197 DataType::Int32 | DataType::Int64 => {
5198 let i = s.parse::<i64>().map_err(|_| {
5199 anyhow!(
5200 "Failed to parse integer for property '{}': {}",
5201 prop_name,
5202 s
5203 )
5204 })?;
5205 Ok(Value::Int(i))
5206 }
5207 DataType::Float32 | DataType::Float64 => {
5208 let f = s.parse::<f64>().map_err(|_| {
5209 anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5210 })?;
5211 Ok(Value::Float(f))
5212 }
5213 DataType::Bool => {
5214 let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5215 anyhow!(
5216 "Failed to parse boolean for property '{}': {}",
5217 prop_name,
5218 s
5219 )
5220 })?;
5221 Ok(Value::Bool(b))
5222 }
5223 DataType::CypherValue => {
5224 let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5225 anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5226 })?;
5227 Ok(Value::from(json_val))
5228 }
5229 DataType::Vector { .. } => {
5230 let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5231 anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5232 })?;
5233 Ok(Value::Vector(v))
5234 }
5235 _ => Ok(Value::String(s.to_string())),
5236 }
5237 }
5238
5239 pub(crate) async fn detach_delete_vertex(&self, vid: Vid, writer: &mut Writer) -> Result<()> {
5240 let schema = self.storage.schema_manager().schema();
5241 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5242
5243 let out_graph = self
5245 .storage
5246 .load_subgraph_cached(
5247 &[vid],
5248 &edge_type_ids,
5249 1,
5250 uni_store::runtime::Direction::Outgoing,
5251 Some(writer.l0_manager.get_current()),
5252 )
5253 .await?;
5254
5255 for edge in out_graph.edges() {
5256 writer
5257 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5258 .await?;
5259 }
5260
5261 let in_graph = self
5263 .storage
5264 .load_subgraph_cached(
5265 &[vid],
5266 &edge_type_ids,
5267 1,
5268 uni_store::runtime::Direction::Incoming,
5269 Some(writer.l0_manager.get_current()),
5270 )
5271 .await?;
5272
5273 for edge in in_graph.edges() {
5274 writer
5275 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5276 .await?;
5277 }
5278
5279 Ok(())
5280 }
5281
5282 pub(crate) async fn batch_detach_delete_vertices(
5284 &self,
5285 vids: &[Vid],
5286 labels_per_vid: Vec<Option<Vec<String>>>,
5287 writer: &mut Writer,
5288 ) -> Result<()> {
5289 let schema = self.storage.schema_manager().schema();
5290 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5291
5292 let out_graph = self
5294 .storage
5295 .load_subgraph_cached(
5296 vids,
5297 &edge_type_ids,
5298 1,
5299 uni_store::runtime::Direction::Outgoing,
5300 Some(writer.l0_manager.get_current()),
5301 )
5302 .await?;
5303
5304 for edge in out_graph.edges() {
5305 writer
5306 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5307 .await?;
5308 }
5309
5310 let in_graph = self
5312 .storage
5313 .load_subgraph_cached(
5314 vids,
5315 &edge_type_ids,
5316 1,
5317 uni_store::runtime::Direction::Incoming,
5318 Some(writer.l0_manager.get_current()),
5319 )
5320 .await?;
5321
5322 for edge in in_graph.edges() {
5323 writer
5324 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5325 .await?;
5326 }
5327
5328 for (vid, labels) in vids.iter().zip(labels_per_vid) {
5330 writer.delete_vertex(*vid, labels).await?;
5331 }
5332
5333 Ok(())
5334 }
5335}