1use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7 eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16 match val {
17 Value::Temporal(tv) => {
18 use uni_common::TemporalValue;
19 match tv {
20 TemporalValue::DateTime {
21 nanos_since_epoch, ..
22 }
23 | TemporalValue::LocalDateTime {
24 nanos_since_epoch, ..
25 } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26 TemporalValue::Date { days_since_epoch } => {
27 chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28 }
29 _ => None,
30 }
31 }
32 Value::String(s) => parse_datetime_utc(s).ok(),
33 _ => None,
34 }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46 BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47 ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54use uni_store::storage::index_manager::IndexManager;
55
56use crate::query::df_graph::L0Context;
58use crate::query::df_planner::HybridPhysicalPlanner;
59use datafusion::physical_plan::ExecutionPlanProperties;
60use datafusion::prelude::SessionContext;
61use parking_lot::RwLock as SyncRwLock;
62
63use arrow_array::{Array, RecordBatch};
64use csv;
65use parquet;
66
67use super::core::*;
68
/// Number of system (non-property) columns an edge row always carries
/// (presumably `_eid`, `_src`, `_dst`, `_type` and one more — TODO confirm
/// against the scan schema). `hydrate_entity_if_needed` treats a row with at
/// most this many fields as "no properties were pushed down".
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
/// Vertex counterpart of [`EDGE_SYSTEM_FIELD_COUNT`] (presumably `_vid`,
/// `_labels` plus one more — TODO confirm against the scan schema).
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
73
74fn collect_l0_vids(
79 ctx: Option<&QueryContext>,
80 extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
81) -> Vec<Vid> {
82 let mut vids = Vec::new();
83 if let Some(ctx) = ctx {
84 vids.extend(extractor(&ctx.l0.read()));
85 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
86 vids.extend(extractor(&tx_l0_arc.read()));
87 }
88 for pending_l0_arc in &ctx.pending_flush_l0s {
89 vids.extend(extractor(&pending_l0_arc.read()));
90 }
91 }
92 vids
93}
94
95async fn hydrate_entity_if_needed(
104 map: &mut HashMap<String, Value>,
105 prop_manager: &PropertyManager,
106 ctx: Option<&QueryContext>,
107) {
108 if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
110 if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
111 tracing::debug!(
112 "Pushdown fallback: hydrating edge {} at execution time",
113 eid_u64
114 );
115 if let Ok(Some(props)) = prop_manager
116 .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
117 .await
118 {
119 for (key, value) in props {
120 map.entry(key).or_insert(value);
121 }
122 }
123 } else {
124 tracing::trace!(
125 "Pushdown success: edge {} already has {} properties",
126 eid_u64,
127 map.len() - EDGE_SYSTEM_FIELD_COUNT
128 );
129 }
130 return;
131 }
132
133 if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
135 if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
136 tracing::debug!(
137 "Pushdown fallback: hydrating vertex {} at execution time",
138 vid_u64
139 );
140 if let Ok(Some(props)) = prop_manager
141 .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
142 .await
143 {
144 for (key, value) in props {
145 map.entry(key).or_insert(value);
146 }
147 }
148 } else {
149 tracing::trace!(
150 "Pushdown success: vertex {} already has {} properties",
151 vid_u64,
152 map.len() - VERTEX_SYSTEM_FIELD_COUNT
153 );
154 }
155 }
156}
157
158impl Executor {
159 async fn verify_and_filter_candidates(
164 &self,
165 mut candidates: Vec<Vid>,
166 variable: &str,
167 filter: Option<&Expr>,
168 ctx: Option<&QueryContext>,
169 prop_manager: &PropertyManager,
170 params: &HashMap<String, Value>,
171 ) -> Result<Vec<Vid>> {
172 candidates.sort_unstable();
173 candidates.dedup();
174
175 let mut verified_vids = Vec::new();
176 for vid in candidates {
177 let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
178 continue; };
180
181 if let Some(expr) = filter {
182 let mut props_map: HashMap<String, Value> = props;
183 props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
184
185 let mut row = HashMap::new();
186 row.insert(variable.to_string(), Value::Map(props_map));
187
188 let res = self
189 .evaluate_expr(expr, &row, prop_manager, params, ctx)
190 .await?;
191 if res.as_bool().unwrap_or(false) {
192 verified_vids.push(vid);
193 }
194 } else {
195 verified_vids.push(vid);
196 }
197 }
198
199 Ok(verified_vids)
200 }
201
    /// Scans durable (LanceDB) storage for vertex ids of `label_id`.
    ///
    /// When possible, `filter` is compiled to a Lance SQL predicate and
    /// pushed down together with the `_deleted = false` tombstone check;
    /// otherwise only the tombstone check is applied and callers re-verify
    /// (see `verify_and_filter_candidates`). A missing dataset (label known
    /// to the schema but never flushed) yields an empty result, not an error.
    pub(crate) async fn scan_storage_candidates(
        &self,
        label_id: u16,
        variable: &str,
        filter: Option<&Expr>,
    ) -> Result<Vec<Vid>> {
        let schema = self.storage.schema_manager().schema();
        let label_name = schema
            .label_name_by_id(label_id)
            .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;

        let ds = self.storage.vertex_dataset(label_name)?;
        let lancedb_store = self.storage.lancedb_store();

        match ds.open_lancedb(lancedb_store).await {
            Ok(table) => {
                use arrow_array::UInt64Array;
                use futures::TryStreamExt;
                use lancedb::query::{ExecutableQuery, QueryBase, Select};

                let mut query = table.query();

                // Best-effort pushdown: only applied when the whole filter
                // translates to Lance SQL.
                let empty_props = std::collections::HashMap::new();
                let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
                if let Some(expr) = filter
                    && let Some(sql) = LanceFilterGenerator::generate(
                        std::slice::from_ref(expr),
                        variable,
                        Some(label_props),
                    )
                {
                    query = query.only_if(format!("_deleted = false AND ({})", sql));
                } else {
                    query = query.only_if("_deleted = false");
                }

                // Only `_vid` is needed here; properties are fetched later.
                let query = query.select(Select::columns(&["_vid"]));
                let stream = query.execute().await?;
                let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;

                let mut vids = Vec::new();
                for batch in batches {
                    let vid_col = batch
                        .column_by_name("_vid")
                        .ok_or(anyhow!("Missing _vid"))?
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or(anyhow!("Invalid _vid"))?;
                    for i in 0..batch.num_rows() {
                        vids.push(Vid::from(vid_col.value(i)));
                    }
                }
                Ok(vids)
            }
            Err(e) => {
                // A missing table is expected for labels with no flushed
                // data. NOTE(review): matching on error text is brittle —
                // confirm the store exposes no typed "not found" variant.
                let err_msg = e.to_string().to_lowercase();
                if err_msg.contains("not found")
                    || err_msg.contains("does not exist")
                    || err_msg.contains("no such file")
                    || err_msg.contains("object not found")
                {
                    Ok(Vec::new())
                } else {
                    Err(e)
                }
            }
        }
    }
278
279 pub(crate) async fn scan_label_with_filter(
280 &self,
281 label_id: u16,
282 variable: &str,
283 filter: Option<&Expr>,
284 ctx: Option<&QueryContext>,
285 prop_manager: &PropertyManager,
286 params: &HashMap<String, Value>,
287 ) -> Result<Vec<Vid>> {
288 let mut candidates = self
289 .scan_storage_candidates(label_id, variable, filter)
290 .await?;
291
292 let schema = self.storage.schema_manager().schema();
294 if let Some(label_name) = schema.label_name_by_id(label_id) {
295 candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
296 }
297
298 self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
299 .await
300 }
301
302 pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
303 if let Value::Node(node) = val {
305 return Ok(node.vid);
306 }
307 if let Value::Map(map) = val
309 && let Some(vid_val) = map.get("_vid")
310 && let Some(v) = vid_val.as_u64()
311 {
312 return Ok(Vid::from(v));
313 }
314 if let Some(s) = val.as_str()
316 && let Ok(id) = s.parse::<u64>()
317 {
318 return Ok(Vid::new(id));
319 }
320 if let Some(v) = val.as_u64() {
322 return Ok(Vid::from(v));
323 }
324 Err(anyhow!("Invalid Vid format: {:?}", val))
325 }
326
327 fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
333 for val in row.values() {
334 if let Ok(vid) = Self::vid_from_value(val)
335 && vid == target_vid
336 {
337 return val.clone();
338 }
339 }
340 Value::Map(HashMap::from([(
342 "_vid".to_string(),
343 Value::Int(target_vid.as_u64() as i64),
344 )]))
345 }
346
    /// Builds a fresh DataFusion session plus the hybrid planner wired to
    /// this executor's storage, current L0 buffers, registries, and params.
    ///
    /// Returns the session (behind a lock, shared with the planner), the
    /// planner itself, and the dedicated `PropertyManager` created for this
    /// plan's lifetime.
    pub async fn create_datafusion_planner(
        &self,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<(
        Arc<SyncRwLock<SessionContext>>,
        HybridPhysicalPlanner,
        Arc<PropertyManager>,
    )> {
        // Snapshot L0 state so the plan sees a consistent view of
        // unflushed writes.
        let query_ctx = self.get_context().await;
        let l0_context = match query_ctx {
            Some(ref ctx) => L0Context::from_query_context(ctx),
            None => L0Context::empty(),
        };

        // Fresh manager (same storage/schema, same cache size) so the
        // planner can own an Arc independent of the borrowed argument.
        let prop_manager_arc = Arc::new(PropertyManager::new(
            self.storage.clone(),
            self.storage.schema_manager_arc(),
            prop_manager.cache_size(),
        ));

        let session = SessionContext::new();
        crate::query::df_udfs::register_cypher_udfs(&session)?;
        let session_ctx = Arc::new(SyncRwLock::new(session));

        let mut planner = HybridPhysicalPlanner::with_l0_context(
            session_ctx.clone(),
            self.storage.clone(),
            l0_context,
            prop_manager_arc.clone(),
            self.storage.schema_manager().schema(),
            params.clone(),
            HashMap::new(),
        );

        // Optional integrations are attached only when configured.
        planner = planner.with_algo_registry(self.algo_registry.clone());
        if let Some(ref registry) = self.procedure_registry {
            planner = planner.with_procedure_registry(registry.clone());
        }
        if let Some(ref xervo_runtime) = self.xervo_runtime {
            planner = planner.with_xervo_runtime(xervo_runtime.clone());
        }

        Ok((session_ctx, planner, prop_manager_arc))
    }
396
397 pub fn collect_batches(
399 session_ctx: &Arc<SyncRwLock<SessionContext>>,
400 execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
401 ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
402 Box::pin(async move {
403 use futures::TryStreamExt;
404
405 let task_ctx = session_ctx.read().task_ctx();
406 let partition_count = execution_plan.output_partitioning().partition_count();
407 let mut all_batches = Vec::new();
408 for partition in 0..partition_count {
409 let stream = execution_plan.execute(partition, task_ctx.clone())?;
410 let batches: Vec<RecordBatch> = stream.try_collect().await?;
411 all_batches.extend(batches);
412 }
413 Ok(all_batches)
414 })
415 }
416
    /// Executes `plan` through the DataFusion engine and returns the raw
    /// record batches.
    ///
    /// Plans containing mutations additionally get a `MutationContext`
    /// (writer + params + query context) attached to the planner; a
    /// missing `Writer` is an error for those plans.
    pub async fn execute_datafusion(
        &self,
        plan: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<Vec<RecordBatch>> {
        let (session_ctx, mut planner, prop_manager_arc) =
            self.create_datafusion_planner(prop_manager, params).await?;

        if Self::contains_write_operations(&plan) {
            let writer = self
                .writer
                .as_ref()
                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
                .clone();
            let query_ctx = self.get_context().await;

            // Writes are expected to run inside a query context; its
            // absence indicates a caller bug, not a user error.
            debug_assert!(
                query_ctx.is_some(),
                "BUG: query_ctx is None for write operation"
            );

            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
                executor: self.clone(),
                writer,
                prop_manager: prop_manager_arc,
                params: params.clone(),
                query_ctx,
            });
            planner = planner.with_mutation_context(mutation_ctx);
            tracing::debug!(
                plan_type = Self::get_plan_type(&plan),
                "Mutation routed to DataFusion engine"
            );
        }

        let execution_plan = planner.plan(&plan)?;
        Self::collect_batches(&session_ctx, execution_plan).await
    }
461
    /// Runs the read portion of a MERGE, forcing every `merge_variables`
    /// binding to carry all of its properties (`"*"` projection) so the
    /// later match/create decision sees complete entities.
    ///
    /// Flat `var.prop` columns coming back from DataFusion are re-nested
    /// into a `Value::Map` per merge variable.
    pub(crate) async fn execute_merge_read_plan(
        &self,
        plan: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        merge_variables: Vec<String>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let (session_ctx, planner, _prop_manager_arc) =
            self.create_datafusion_planner(prop_manager, params).await?;

        // Request every property ("*") for each merge variable.
        let extra: HashMap<String, HashSet<String>> = merge_variables
            .iter()
            .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
            .collect();
        let execution_plan = planner.plan_with_properties(&plan, extra)?;
        let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;

        let flat_rows = self.record_batches_to_rows(all_batches)?;

        // Re-nest "var.prop" columns under "var" for any merge variable
        // that does not already have a bound value.
        let rows = flat_rows
            .into_iter()
            .map(|mut row| {
                for var in &merge_variables {
                    if row.contains_key(var) {
                        continue;
                    }
                    let prefix = format!("{}.", var);
                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    if !dotted_keys.is_empty() {
                        let mut map = HashMap::new();
                        for key in dotted_keys {
                            let prop_name = key[prefix.len()..].to_string();
                            if let Some(val) = row.remove(&key) {
                                map.insert(prop_name, val);
                            }
                        }
                        row.insert(var.clone(), Value::Map(map));
                    }
                }
                row
            })
            .collect();

        Ok(rows)
    }
522
    /// Converts Arrow record batches into row maps, reconstructing entity
    /// bindings that DataFusion flattened into `var.prop` columns.
    ///
    /// Per-row post-processing:
    /// 1. Columns with a datetime/time struct Arrow type are decoded with
    ///    the matching type hint; `cv_encoded` columns are parsed as JSON;
    ///    path-shaped maps are normalized.
    /// 2. Variables that came back as a string placeholder but have a
    ///    sibling `var._vid` column are rebuilt into a map from their
    ///    dotted columns.
    /// 3. Bare map-valued variables absorb their `var._vid` / `var._labels`
    ///    / `var._eid` / `var._type` helper columns, and remaining `var.*`
    ///    helpers are dropped.
    fn record_batches_to_rows(
        &self,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut rows = Vec::new();

        for batch in batches {
            let num_rows = batch.num_rows();
            let schema = batch.schema();

            for row_idx in 0..num_rows {
                let mut row = HashMap::new();

                for (col_idx, field) in schema.fields().iter().enumerate() {
                    let column = batch.column(col_idx);
                    // Datetime/time struct columns need a type hint so the
                    // converter produces the right temporal Value.
                    let data_type =
                        if uni_common::core::schema::is_datetime_struct(field.data_type()) {
                            Some(&uni_common::DataType::DateTime)
                        } else if uni_common::core::schema::is_time_struct(field.data_type()) {
                            Some(&uni_common::DataType::Time)
                        } else {
                            None
                        };
                    let mut value =
                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);

                    // Columns flagged `cv_encoded` in field metadata hold
                    // JSON-serialized complex values; decode them back.
                    if field
                        .metadata()
                        .get("cv_encoded")
                        .is_some_and(|v| v == "true")
                        && let Value::String(s) = &value
                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
                    {
                        value = Value::from(parsed);
                    }

                    value = Self::normalize_path_if_needed(value);

                    row.insert(field.name().clone(), value);
                }

                // Map-valued bindings that may need helper columns merged in.
                let bare_vars: Vec<String> = row
                    .keys()
                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
                    .cloned()
                    .collect();

                // String placeholders with a sibling `var._vid` column:
                // these must be rebuilt into maps from their dotted columns.
                let vid_placeholder_vars: Vec<String> = row
                    .keys()
                    .filter(|k| {
                        !k.contains('.')
                            && matches!(row.get(*k), Some(Value::String(_)))
                            && row.contains_key(&format!("{}._vid", k))
                    })
                    .cloned()
                    .collect();

                for var in &vid_placeholder_vars {
                    let prefix = format!("{}.", var);
                    let mut map = HashMap::new();

                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();

                    for key in &dotted_keys {
                        let prop_name = &key[prefix.len()..];
                        if let Some(val) = row.remove(key) {
                            map.insert(prop_name.to_string(), val);
                        }
                    }

                    // Replace the string placeholder with the nested map.
                    row.insert(var.clone(), Value::Map(map));
                }

                for var in &bare_vars {
                    // Node helpers: overwrite `_vid` / `_labels` in the map.
                    let vid_key = format!("{}._vid", var);
                    let labels_key = format!("{}._labels", var);

                    let vid_val = row.remove(&vid_key);
                    let labels_val = row.remove(&labels_key);

                    if let Some(Value::Map(map)) = row.get_mut(var) {
                        if let Some(v) = vid_val {
                            map.insert("_vid".to_string(), v);
                        }
                        if let Some(v) = labels_val {
                            map.insert("_labels".to_string(), v);
                        }
                    }

                    // Edge helpers: only fill in when absent (`or_insert`).
                    let eid_key = format!("{}._eid", var);
                    let type_key = format!("{}._type", var);

                    let eid_val = row.remove(&eid_key);
                    let type_val = row.remove(&type_key);

                    if (eid_val.is_some() || type_val.is_some())
                        && let Some(Value::Map(map)) = row.get_mut(var)
                    {
                        if let Some(v) = eid_val {
                            map.entry("_eid".to_string()).or_insert(v);
                        }
                        if let Some(v) = type_val {
                            map.entry("_type".to_string()).or_insert(v);
                        }
                    }

                    // Drop any remaining `var.*` helper columns.
                    let prefix = format!("{}.", var);
                    let helper_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    for key in helper_keys {
                        row.remove(&key);
                    }
                }

                rows.push(row);
            }
        }

        Ok(rows)
    }
679
680 fn normalize_path_if_needed(value: Value) -> Value {
685 match value {
686 Value::Map(map)
687 if map.contains_key("nodes")
688 && (map.contains_key("relationships") || map.contains_key("edges")) =>
689 {
690 Self::normalize_path_map(map)
691 }
692 other => other,
693 }
694 }
695
696 fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
698 if let Some(Value::List(nodes)) = map.remove("nodes") {
700 let normalized_nodes: Vec<Value> = nodes
701 .into_iter()
702 .map(|n| {
703 if let Value::Map(node_map) = n {
704 Self::normalize_path_node_map(node_map)
705 } else {
706 n
707 }
708 })
709 .collect();
710 map.insert("nodes".to_string(), Value::List(normalized_nodes));
711 }
712
713 let rels_key = if map.contains_key("relationships") {
715 "relationships"
716 } else {
717 "edges"
718 };
719 if let Some(Value::List(rels)) = map.remove(rels_key) {
720 let normalized_rels: Vec<Value> = rels
721 .into_iter()
722 .map(|r| {
723 if let Value::Map(rel_map) = r {
724 Self::normalize_path_edge_map(rel_map)
725 } else {
726 r
727 }
728 })
729 .collect();
730 map.insert("relationships".to_string(), Value::List(normalized_rels));
731 }
732
733 Value::Map(map)
734 }
735
736 fn value_to_id_string(val: Value) -> String {
738 match val {
739 Value::Int(n) => n.to_string(),
740 Value::Float(n) => n.to_string(),
741 Value::String(s) => s,
742 other => other.to_string(),
743 }
744 }
745
746 fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
749 if let Some(val) = map.remove(src_key) {
750 map.insert(
751 dst_key.to_string(),
752 Value::String(Self::value_to_id_string(val)),
753 );
754 }
755 }
756
757 fn ensure_properties_map(map: &mut HashMap<String, Value>) {
759 match map.get("properties") {
760 Some(props) if !props.is_null() => {}
761 _ => {
762 map.insert("properties".to_string(), Value::Map(HashMap::new()));
763 }
764 }
765 }
766
767 fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
769 Self::stringify_map_field(&mut map, "_vid", "_id");
770 Self::ensure_properties_map(&mut map);
771 Value::Map(map)
772 }
773
774 fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
776 Self::stringify_map_field(&mut map, "_eid", "_id");
777 Self::stringify_map_field(&mut map, "_src", "_src");
778 Self::stringify_map_field(&mut map, "_dst", "_dst");
779
780 if let Some(type_name) = map.remove("_type_name") {
781 map.insert("_type".to_string(), type_name);
782 }
783
784 Self::ensure_properties_map(&mut map);
785 Value::Map(map)
786 }
787
788 #[instrument(
789 skip(self, prop_manager, params),
790 fields(rows_returned, duration_ms),
791 level = "info"
792 )]
793 pub fn execute<'a>(
794 &'a self,
795 plan: LogicalPlan,
796 prop_manager: &'a PropertyManager,
797 params: &'a HashMap<String, Value>,
798 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
799 Box::pin(async move {
800 let query_type = Self::get_plan_type(&plan);
801 let ctx = self.get_context().await;
802 let start = Instant::now();
803
804 let res = if Self::is_ddl_or_admin(&plan) {
807 self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
808 .await
809 } else {
810 let batches = self
811 .execute_datafusion(plan.clone(), prop_manager, params)
812 .await?;
813 self.record_batches_to_rows(batches)
814 };
815
816 let duration = start.elapsed();
817 metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
818 .record(duration.as_secs_f64());
819
820 tracing::Span::current().record("duration_ms", duration.as_millis());
821 match &res {
822 Ok(rows) => {
823 tracing::Span::current().record("rows_returned", rows.len());
824 metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
825 .increment(rows.len() as u64);
826 }
827 Err(e) => {
828 let error_type = if e.to_string().contains("timed out") {
829 "timeout"
830 } else if e.to_string().contains("syntax") {
831 "syntax"
832 } else {
833 "execution"
834 };
835 metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
836 }
837 }
838
839 res
840 })
841 }
842
    /// Maps a plan to a coarse, static label used as the `query_type`
    /// dimension on metrics and tracing. Only the root node is inspected;
    /// anything not listed reports as "other".
    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
        match plan {
            LogicalPlan::Scan { .. } => "read_scan",
            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
            LogicalPlan::Traverse { .. } => "read_traverse",
            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
            LogicalPlan::ScanAll { .. } => "read_scan_all",
            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
            LogicalPlan::VectorKnn { .. } => "read_vector",
            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
            LogicalPlan::Merge { .. } => "write_merge",
            LogicalPlan::Delete { .. } => "write_delete",
            LogicalPlan::Set { .. } => "write_set",
            LogicalPlan::Remove { .. } => "write_remove",
            LogicalPlan::ProcedureCall { .. } => "call",
            LogicalPlan::Copy { .. } => "copy",
            LogicalPlan::Backup { .. } => "backup",
            _ => "other",
        }
    }
863
    /// Returns the direct child plans of `plan` (empty for leaves).
    ///
    /// Used by the recursive predicates (`is_ddl_or_admin`,
    /// `contains_write_operations`) to walk the plan tree; any new plan
    /// variant with children must be listed here or those checks will not
    /// descend into it.
    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
        match plan {
            // Single-input operators.
            LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Distinct { input }
            | LogicalPlan::Aggregate { input, .. }
            | LogicalPlan::Window { input, .. }
            | LogicalPlan::Unwind { input, .. }
            | LogicalPlan::Filter { input, .. }
            | LogicalPlan::Create { input, .. }
            | LogicalPlan::CreateBatch { input, .. }
            | LogicalPlan::Set { input, .. }
            | LogicalPlan::Remove { input, .. }
            | LogicalPlan::Delete { input, .. }
            | LogicalPlan::Merge { input, .. }
            | LogicalPlan::Foreach { input, .. }
            | LogicalPlan::Traverse { input, .. }
            | LogicalPlan::TraverseMainByType { input, .. }
            | LogicalPlan::BindZeroLengthPath { input, .. }
            | LogicalPlan::BindPath { input, .. }
            | LogicalPlan::ShortestPath { input, .. }
            | LogicalPlan::AllShortestPaths { input, .. }
            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],

            // Two-input operators.
            LogicalPlan::Apply {
                input, subquery, ..
            }
            | LogicalPlan::SubqueryCall { input, subquery } => {
                vec![input.as_ref(), subquery.as_ref()]
            }
            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
                vec![left.as_ref(), right.as_ref()]
            }
            LogicalPlan::RecursiveCTE {
                initial, recursive, ..
            } => vec![initial.as_ref(), recursive.as_ref()],
            LogicalPlan::QuantifiedPattern {
                input,
                pattern_plan,
                ..
            } => vec![input.as_ref(), pattern_plan.as_ref()],

            // Leaves (scans, DDL, admin, …).
            _ => vec![],
        }
    }
922
    /// True when `plan` (or any descendant) is a DDL/admin statement that
    /// must bypass the DataFusion engine and run through `execute_subplan`.
    fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
        match plan {
            // Schema DDL.
            LogicalPlan::CreateLabel(_)
            | LogicalPlan::CreateEdgeType(_)
            | LogicalPlan::AlterLabel(_)
            | LogicalPlan::AlterEdgeType(_)
            | LogicalPlan::DropLabel(_)
            | LogicalPlan::DropEdgeType(_)
            | LogicalPlan::CreateConstraint(_)
            | LogicalPlan::DropConstraint(_)
            | LogicalPlan::ShowConstraints(_) => true,

            // Index DDL.
            LogicalPlan::CreateVectorIndex { .. }
            | LogicalPlan::CreateFullTextIndex { .. }
            | LogicalPlan::CreateScalarIndex { .. }
            | LogicalPlan::CreateJsonFtsIndex { .. }
            | LogicalPlan::DropIndex { .. }
            | LogicalPlan::ShowIndexes { .. } => true,

            // Admin / transaction control / data movement.
            LogicalPlan::ShowDatabase
            | LogicalPlan::ShowConfig
            | LogicalPlan::ShowStatistics
            | LogicalPlan::Vacuum
            | LogicalPlan::Checkpoint
            | LogicalPlan::Begin
            | LogicalPlan::Commit
            | LogicalPlan::Rollback
            | LogicalPlan::Copy { .. }
            | LogicalPlan::CopyTo { .. }
            | LogicalPlan::CopyFrom { .. }
            | LogicalPlan::Backup { .. }
            | LogicalPlan::Explain { .. } => true,

            // Procedures run through DataFusion only when whitelisted.
            LogicalPlan::ProcedureCall { procedure_name, .. } => {
                !Self::is_df_eligible_procedure(procedure_name)
            }

            // Otherwise, recurse: a DDL node anywhere forces this path.
            _ => Self::plan_children(plan)
                .iter()
                .any(|child| Self::is_ddl_or_admin(child)),
        }
    }
977
978 fn is_df_eligible_procedure(name: &str) -> bool {
984 matches!(
985 name,
986 "uni.schema.labels"
987 | "uni.schema.edgeTypes"
988 | "uni.schema.relationshipTypes"
989 | "uni.schema.indexes"
990 | "uni.schema.constraints"
991 | "uni.schema.labelInfo"
992 | "uni.vector.query"
993 | "uni.fts.query"
994 | "uni.search"
995 ) || name.starts_with("uni.algo.")
996 }
997
998 fn contains_write_operations(plan: &LogicalPlan) -> bool {
1005 match plan {
1006 LogicalPlan::Create { .. }
1007 | LogicalPlan::CreateBatch { .. }
1008 | LogicalPlan::Merge { .. }
1009 | LogicalPlan::Delete { .. }
1010 | LogicalPlan::Set { .. }
1011 | LogicalPlan::Remove { .. }
1012 | LogicalPlan::Foreach { .. } => true,
1013 _ => Self::plan_children(plan)
1014 .iter()
1015 .any(|child| Self::contains_write_operations(child)),
1016 }
1017 }
1018
    /// Streaming entry point. The query context is resolved first (as a
    /// one-item stream so the whole pipeline stays lazy), then the plan
    /// executes via the same DDL/DataFusion routing as [`Executor::execute`].
    /// All result rows are currently yielded as a single chunk.
    pub fn execute_stream(
        self,
        plan: LogicalPlan,
        prop_manager: Arc<PropertyManager>,
        params: HashMap<String, Value>,
    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
        let this = self;
        let this_for_ctx = this.clone();

        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });

        ctx_stream
            .flat_map(move |ctx| {
                // Clone the captures so the closure can be invoked per item.
                let plan = plan.clone();
                let this = this.clone();
                let prop_manager = prop_manager.clone();
                let params = params.clone();

                let fut = async move {
                    if Self::is_ddl_or_admin(&plan) {
                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
                            .await
                    } else {
                        let batches = this
                            .execute_datafusion(plan, &prop_manager, &params)
                            .await?;
                        this.record_batches_to_rows(batches)
                    }
                };
                stream::once(fut).boxed()
            })
            .boxed()
    }
1056
    /// Converts a single Arrow cell to a [`Value`] with no datetime/time
    /// struct hint (see `record_batches_to_rows` for the hinted path).
    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
        arrow_convert::arrow_to_value(col, row, None)
    }
1062
1063 pub(crate) fn evaluate_expr<'a>(
1064 &'a self,
1065 expr: &'a Expr,
1066 row: &'a HashMap<String, Value>,
1067 prop_manager: &'a PropertyManager,
1068 params: &'a HashMap<String, Value>,
1069 ctx: Option<&'a QueryContext>,
1070 ) -> BoxFuture<'a, Result<Value>> {
1071 let this = self;
1072 Box::pin(async move {
1073 let repr = expr.to_string_repr();
1075 if let Some(val) = row.get(&repr) {
1076 return Ok(val.clone());
1077 }
1078
1079 match expr {
1080 Expr::PatternComprehension { .. } => {
1081 Err(anyhow::anyhow!(
1083 "Pattern comprehensions are handled by DataFusion executor"
1084 ))
1085 }
1086 Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1087 "COLLECT subqueries not yet supported in executor"
1088 )),
1089 Expr::Variable(name) => {
1090 if let Some(val) = row.get(name) {
1091 Ok(val.clone())
1092 } else if let Some(vid_val) = row.get(&format!("{}._vid", name)) {
1093 Ok(vid_val.clone())
1097 } else {
1098 Ok(params.get(name).cloned().unwrap_or(Value::Null))
1099 }
1100 }
1101 Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1102 Expr::Property(var_expr, prop_name) => {
1103 if let Expr::Variable(var_name) = var_expr.as_ref() {
1107 let flat_key = format!("{}.{}", var_name, prop_name);
1108 if let Some(val) = row.get(flat_key.as_str()) {
1109 return Ok(val.clone());
1110 }
1111 }
1112
1113 let base_val = this
1114 .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1115 .await?;
1116
1117 if (prop_name == "_vid" || prop_name == "_id")
1119 && let Ok(vid) = Self::vid_from_value(&base_val)
1120 {
1121 return Ok(Value::Int(vid.as_u64() as i64));
1122 }
1123
1124 if let Value::Node(node) = &base_val {
1126 if prop_name == "_vid" || prop_name == "_id" {
1128 return Ok(Value::Int(node.vid.as_u64() as i64));
1129 }
1130 if prop_name == "_labels" {
1131 return Ok(Value::List(
1132 node.labels
1133 .iter()
1134 .map(|l| Value::String(l.clone()))
1135 .collect(),
1136 ));
1137 }
1138 if let Some(val) = node.properties.get(prop_name.as_str()) {
1140 return Ok(val.clone());
1141 }
1142 if let Ok(val) = prop_manager
1144 .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1145 .await
1146 {
1147 return Ok(val);
1148 }
1149 return Ok(Value::Null);
1150 }
1151
1152 if let Value::Edge(edge) = &base_val {
1154 if prop_name == "_eid" || prop_name == "_id" {
1156 return Ok(Value::Int(edge.eid.as_u64() as i64));
1157 }
1158 if prop_name == "_type" {
1159 return Ok(Value::String(edge.edge_type.clone()));
1160 }
1161 if prop_name == "_src" {
1162 return Ok(Value::Int(edge.src.as_u64() as i64));
1163 }
1164 if prop_name == "_dst" {
1165 return Ok(Value::Int(edge.dst.as_u64() as i64));
1166 }
1167 if let Some(val) = edge.properties.get(prop_name.as_str()) {
1169 return Ok(val.clone());
1170 }
1171 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1173 {
1174 return Ok(val);
1175 }
1176 return Ok(Value::Null);
1177 }
1178
1179 if let Value::Map(map) = &base_val {
1182 if let Some(val) = map.get(prop_name.as_str()) {
1184 return Ok(val.clone());
1185 }
1186 if let Some(Value::Map(props)) = map.get("properties")
1188 && let Some(val) = props.get(prop_name.as_str())
1189 {
1190 return Ok(val.clone());
1191 }
1192 let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1194 map.get("_id")
1195 .and_then(|v| v.as_str())
1196 .and_then(|s| s.parse::<u64>().ok())
1197 });
1198 if let Some(id) = vid_opt {
1199 let vid = Vid::from(id);
1200 if let Ok(val) = prop_manager
1201 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1202 .await
1203 {
1204 return Ok(val);
1205 }
1206 } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1207 let eid = uni_common::core::id::Eid::from(id);
1208 if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1209 return Ok(val);
1210 }
1211 }
1212 return Ok(Value::Null);
1213 }
1214
1215 if let Ok(vid) = Self::vid_from_value(&base_val) {
1217 return prop_manager
1218 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1219 .await;
1220 }
1221
1222 if base_val.is_null() {
1223 return Ok(Value::Null);
1224 }
1225
1226 {
1228 use crate::query::datetime::{
1229 eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1230 is_duration_string, is_temporal_accessor, is_temporal_string,
1231 };
1232
1233 if let Value::Temporal(tv) = &base_val {
1235 if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1236 if is_duration_accessor(prop_name) {
1237 return eval_duration_accessor(
1239 &base_val.to_string(),
1240 prop_name,
1241 );
1242 }
1243 } else if is_temporal_accessor(prop_name) {
1244 return eval_temporal_accessor(&base_val.to_string(), prop_name);
1245 }
1246 }
1247
1248 if let Value::String(s) = &base_val {
1250 if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1251 return eval_temporal_accessor(s, prop_name);
1252 }
1253 if is_duration_string(s) && is_duration_accessor(prop_name) {
1254 return eval_duration_accessor(s, prop_name);
1255 }
1256 }
1257 }
1258
1259 Err(anyhow!(
1260 "Cannot access property '{}' on {:?}",
1261 prop_name,
1262 base_val
1263 ))
1264 }
                // Dynamic subscript: list[index], map[key], node[key], edge[key].
                Expr::ArrayIndex {
                    array: arr_expr,
                    index: idx_expr,
                } => {
                    let arr_val = this
                        .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
                        .await?;
                    let idx_val = this
                        .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
                        .await?;

                    // Lists: integer index; negative counts from the end.
                    // Out-of-range yields NULL, non-integer index is a type error.
                    if let Value::List(arr) = &arr_val {
                        if let Some(i) = idx_val.as_i64() {
                            let idx = if i < 0 {
                                let positive_idx = arr.len() as i64 + i;
                                if positive_idx < 0 {
                                    return Ok(Value::Null); }
                                positive_idx as usize
                            } else {
                                i as usize
                            };
                            if idx < arr.len() {
                                return Ok(arr[idx].clone());
                            }
                            return Ok(Value::Null);
                        } else if idx_val.is_null() {
                            return Ok(Value::Null);
                        } else {
                            return Err(anyhow::anyhow!(
                                "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
                                idx_val
                            ));
                        }
                    }
                    // Maps: string key; missing key yields NULL.
                    if let Value::Map(map) = &arr_val {
                        if let Some(key) = idx_val.as_str() {
                            return Ok(map.get(key).cloned().unwrap_or(Value::Null));
                        } else if !idx_val.is_null() {
                            return Err(anyhow::anyhow!(
                                "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
                                idx_val
                            ));
                        }
                    }
                    // Nodes: check in-memory properties first, then storage.
                    if let Value::Node(node) = &arr_val {
                        if let Some(key) = idx_val.as_str() {
                            if let Some(val) = node.properties.get(key) {
                                return Ok(val.clone());
                            }
                            if let Ok(val) = prop_manager
                                .get_vertex_prop_with_ctx(node.vid, key, ctx)
                                .await
                            {
                                return Ok(val);
                            }
                            return Ok(Value::Null);
                        } else if !idx_val.is_null() {
                            return Err(anyhow::anyhow!(
                                "TypeError: Node index must be a string, got: {:?}",
                                idx_val
                            ));
                        }
                    }
                    // Edges: same cached-then-storage lookup as nodes.
                    if let Value::Edge(edge) = &arr_val {
                        if let Some(key) = idx_val.as_str() {
                            if let Some(val) = edge.properties.get(key) {
                                return Ok(val.clone());
                            }
                            if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
                                return Ok(val);
                            }
                            return Ok(Value::Null);
                        } else if !idx_val.is_null() {
                            return Err(anyhow::anyhow!(
                                "TypeError: Edge index must be a string, got: {:?}",
                                idx_val
                            ));
                        }
                    }
                    // Values that merely carry a vertex id (e.g. a map with `_vid`)
                    // also support string-keyed property lookup.
                    if let Ok(vid) = Self::vid_from_value(&arr_val)
                        && let Some(key) = idx_val.as_str()
                    {
                        if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
                        {
                            return Ok(val);
                        }
                        return Ok(Value::Null);
                    }
                    // NULL base propagates NULL; anything else is not indexable.
                    if arr_val.is_null() {
                        return Ok(Value::Null);
                    }
                    Err(anyhow!(
                        "TypeError: InvalidArgumentType - cannot index into {:?}",
                        arr_val
                    ))
                }
                // list[start..end] slicing: negative bounds count from the end,
                // bounds are clamped to the list length, NULL bound => NULL result.
                Expr::ArraySlice { array, start, end } => {
                    let arr_val = this
                        .evaluate_expr(array, row, prop_manager, params, ctx)
                        .await?;

                    if let Value::List(arr) = &arr_val {
                        let len = arr.len();

                        // Missing start bound defaults to 0.
                        let start_idx = if let Some(s) = start {
                            let v = this
                                .evaluate_expr(s, row, prop_manager, params, ctx)
                                .await?;
                            if v.is_null() {
                                return Ok(Value::Null);
                            }
                            let raw = v.as_i64().unwrap_or(0);
                            if raw < 0 {
                                (len as i64 + raw).max(0) as usize
                            } else {
                                (raw as usize).min(len)
                            }
                        } else {
                            0
                        };

                        // Missing end bound defaults to the list length.
                        let end_idx = if let Some(e) = end {
                            let v = this
                                .evaluate_expr(e, row, prop_manager, params, ctx)
                                .await?;
                            if v.is_null() {
                                return Ok(Value::Null);
                            }
                            let raw = v.as_i64().unwrap_or(len as i64);
                            if raw < 0 {
                                (len as i64 + raw).max(0) as usize
                            } else {
                                (raw as usize).min(len)
                            }
                        } else {
                            len
                        };

                        // Empty or inverted range yields an empty list.
                        if start_idx >= end_idx {
                            return Ok(Value::List(vec![]));
                        }
                        let end_idx = end_idx.min(len);
                        return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
                    }

                    if arr_val.is_null() {
                        return Ok(Value::Null);
                    }
                    Err(anyhow!("Cannot slice {:?}", arr_val))
                }
1428 Expr::Literal(lit) => Ok(lit.to_value()),
1429 Expr::List(items) => {
1430 let mut vals = Vec::new();
1431 for item in items {
1432 vals.push(
1433 this.evaluate_expr(item, row, prop_manager, params, ctx)
1434 .await?,
1435 );
1436 }
1437 Ok(Value::List(vals))
1438 }
1439 Expr::Map(items) => {
1440 let mut map = HashMap::new();
1441 for (key, value_expr) in items {
1442 let val = this
1443 .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1444 .await?;
1445 map.insert(key.clone(), val);
1446 }
1447 Ok(Value::Map(map))
1448 }
                // EXISTS { ... }: plan and execute the subquery with the current
                // row's variables in scope. Planning or execution failure is
                // deliberately treated as "no match" (false), not as an error.
                Expr::Exists { query, .. } => {
                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());
                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();

                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
                        Ok(plan) => {
                            // Outer row values are passed down as parameters.
                            let mut sub_params = params.clone();
                            sub_params.extend(row.clone());

                            match this.execute(plan, prop_manager, &sub_params).await {
                                Ok(results) => Ok(Value::Bool(!results.is_empty())),
                                Err(e) => {
                                    log::debug!("EXISTS subquery execution failed: {}", e);
                                    Ok(Value::Bool(false))
                                }
                            }
                        }
                        Err(e) => {
                            log::debug!("EXISTS subquery planning failed: {}", e);
                            Ok(Value::Bool(false))
                        }
                    }
                }
                // COUNT { ... }: like EXISTS but returns the result-row count.
                // Unlike EXISTS, failures here propagate as errors.
                Expr::CountSubquery(query) => {
                    let planner = QueryPlanner::new(this.storage.schema_manager().schema());

                    let vars_in_scope: Vec<String> = row.keys().cloned().collect();

                    match planner.plan_with_scope(*query.clone(), vars_in_scope) {
                        Ok(plan) => {
                            let mut sub_params = params.clone();
                            sub_params.extend(row.clone());

                            match this.execute(plan, prop_manager, &sub_params).await {
                                Ok(results) => Ok(Value::from(results.len() as i64)),
                                Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
                            }
                        }
                        Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
                    }
                }
                // ALL/ANY/SINGLE/NONE(var IN list WHERE pred): count the elements
                // that satisfy the predicate, then apply the quantifier rule.
                Expr::Quantifier {
                    quantifier,
                    variable,
                    list,
                    predicate,
                } => {
                    let list_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;

                    // NULL list propagates NULL.
                    if list_val.is_null() {
                        return Ok(Value::Null);
                    }

                    let items = match list_val {
                        Value::List(arr) => arr,
                        _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
                    };

                    let mut satisfied_count = 0;
                    for item in &items {
                        // Bind the iteration variable on top of the outer row.
                        let mut item_row = row.clone();
                        item_row.insert(variable.clone(), item.clone());

                        let pred_result = this
                            .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
                            .await?;

                        // Only an explicit boolean `true` counts; NULL and false do not.
                        if let Value::Bool(true) = pred_result {
                            satisfied_count += 1;
                        }
                    }

                    let result = match quantifier {
                        Quantifier::All => satisfied_count == items.len(),
                        Quantifier::Any => satisfied_count > 0,
                        Quantifier::Single => satisfied_count == 1,
                        Quantifier::None => satisfied_count == 0,
                    };

                    Ok(Value::Bool(result))
                }
                // [var IN list WHERE pred | expr]: filter then map each element.
                Expr::ListComprehension {
                    variable,
                    list,
                    where_clause,
                    map_expr,
                } => {
                    let list_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;

                    if list_val.is_null() {
                        return Ok(Value::Null);
                    }

                    let items = match list_val {
                        Value::List(arr) => arr,
                        _ => {
                            return Err(anyhow!(
                                "List comprehension expects a list, got: {:?}",
                                list_val
                            ));
                        }
                    };

                    let mut results = Vec::new();
                    for item in &items {
                        let mut item_row = row.clone();
                        item_row.insert(variable.clone(), item.clone());

                        // WHERE filter: anything other than `true` skips the item.
                        if let Some(predicate) = where_clause {
                            let pred_result = this
                                .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
                                .await?;

                            if !matches!(pred_result, Value::Bool(true)) {
                                continue;
                            }
                        }

                        let mapped_val = this
                            .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
                            .await?;
                        results.push(mapped_val);
                    }

                    Ok(Value::List(results))
                }
                // Binary operators. AND/OR short-circuit only on a definite
                // false/true left operand; otherwise both sides are evaluated and
                // delegated to eval_binary_op, which handles NULL propagation.
                Expr::BinaryOp { left, op, right } => {
                    match op {
                        BinaryOp::And => {
                            let l_val = this
                                .evaluate_expr(left, row, prop_manager, params, ctx)
                                .await?;
                            if let Some(false) = l_val.as_bool() {
                                return Ok(Value::Bool(false));
                            }
                            let r_val = this
                                .evaluate_expr(right, row, prop_manager, params, ctx)
                                .await?;
                            eval_binary_op(&l_val, op, &r_val)
                        }
                        BinaryOp::Or => {
                            let l_val = this
                                .evaluate_expr(left, row, prop_manager, params, ctx)
                                .await?;
                            if let Some(true) = l_val.as_bool() {
                                return Ok(Value::Bool(true));
                            }
                            let r_val = this
                                .evaluate_expr(right, row, prop_manager, params, ctx)
                                .await?;
                            eval_binary_op(&l_val, op, &r_val)
                        }
                        _ => {
                            let l_val = this
                                .evaluate_expr(left, row, prop_manager, params, ctx)
                                .await?;
                            let r_val = this
                                .evaluate_expr(right, row, prop_manager, params, ctx)
                                .await?;
                            eval_binary_op(&l_val, op, &r_val)
                        }
                    }
                }
                // `x IN list`: delegated to eval_in_op.
                Expr::In { expr, list } => {
                    let l_val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    let r_val = this
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;
                    eval_in_op(&l_val, &r_val)
                }
                // Unary NOT (NULL-preserving) and numeric negation.
                Expr::UnaryOp { op, expr } => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    match op {
                        UnaryOp::Not => {
                            match val.as_bool() {
                                Some(b) => Ok(Value::Bool(!b)),
                                None if val.is_null() => Ok(Value::Null),
                                None => Err(anyhow!(
                                    "InvalidArgumentType: NOT requires a boolean argument"
                                )),
                            }
                        }
                        UnaryOp::Neg => {
                            // NOTE(review): `-i` panics (debug) / wraps (release)
                            // for i64::MIN — confirm whether checked negation is wanted.
                            if let Some(i) = val.as_i64() {
                                Ok(Value::Int(-i))
                            } else if let Some(f) = val.as_f64() {
                                Ok(Value::Float(-f))
                            } else {
                                Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
                            }
                        }
                    }
                }
                // IS NULL / IS NOT NULL always return a boolean, never NULL.
                Expr::IsNull(expr) => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    Ok(Value::Bool(val.is_null()))
                }
                Expr::IsNotNull(expr) => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    Ok(Value::Bool(!val.is_null()))
                }
                // IS UNIQUE is only meaningful inside constraint DDL.
                Expr::IsUnique(_) => {
                    Err(anyhow!(
                        "IS UNIQUE can only be used in constraint definitions"
                    ))
                }
                // CASE expression. The "simple" form compares a base value with
                // each WHEN by equality; the "searched" form evaluates each WHEN
                // as a predicate. Falls through to ELSE, then to NULL.
                Expr::Case {
                    expr,
                    when_then,
                    else_expr,
                } => {
                    if let Some(base_expr) = expr {
                        let base_val = this
                            .evaluate_expr(base_expr, row, prop_manager, params, ctx)
                            .await?;
                        for (w, t) in when_then {
                            let w_val = this
                                .evaluate_expr(w, row, prop_manager, params, ctx)
                                .await?;
                            if base_val == w_val {
                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
                            }
                        }
                    } else {
                        for (w, t) in when_then {
                            let w_val = this
                                .evaluate_expr(w, row, prop_manager, params, ctx)
                                .await?;
                            if w_val.as_bool() == Some(true) {
                                return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
                            }
                        }
                    }
                    if let Some(e) = else_expr {
                        return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
                    }
                    Ok(Value::Null)
                }
                // `*` carries no value of its own in this evaluator.
                Expr::Wildcard => Ok(Value::Null),
1741 Expr::FunctionCall { name, args, .. } => {
1742 if name.eq_ignore_ascii_case("ID") {
1744 if args.len() != 1 {
1745 return Err(anyhow!("id() requires exactly 1 argument"));
1746 }
1747 let val = this
1748 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1749 .await?;
1750 if let Value::Map(map) = &val {
1751 if let Some(vid_val) = map.get("_vid") {
1753 return Ok(vid_val.clone());
1754 }
1755 if let Some(eid_val) = map.get("_eid") {
1757 return Ok(eid_val.clone());
1758 }
1759 if let Some(id_val) = map.get("_id") {
1761 return Ok(id_val.clone());
1762 }
1763 }
1764 return Ok(Value::Null);
1765 }
1766
1767 if name.eq_ignore_ascii_case("ELEMENTID") {
1769 if args.len() != 1 {
1770 return Err(anyhow!("elementId() requires exactly 1 argument"));
1771 }
1772 let val = this
1773 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1774 .await?;
1775 if let Value::Map(map) = &val {
1776 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1779 return Ok(Value::String(vid_val.to_string()));
1780 }
1781 if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1784 return Ok(Value::String(eid_val.to_string()));
1785 }
1786 }
1787 return Ok(Value::Null);
1788 }
1789
1790 if name.eq_ignore_ascii_case("TYPE") {
1792 if args.len() != 1 {
1793 return Err(anyhow!("type() requires exactly 1 argument"));
1794 }
1795 let val = this
1796 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1797 .await?;
1798 if let Value::Map(map) = &val
1799 && let Some(type_val) = map.get("_type")
1800 {
1801 if let Some(type_id) =
1803 type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1804 {
1805 if let Some(name) = this
1806 .storage
1807 .schema_manager()
1808 .edge_type_name_by_id_unified(type_id)
1809 {
1810 return Ok(Value::String(name));
1811 }
1812 } else if let Some(name) = type_val.as_str() {
1813 return Ok(Value::String(name.to_string()));
1814 }
1815 }
1816 return Ok(Value::Null);
1817 }
1818
1819 if name.eq_ignore_ascii_case("LABELS") {
1821 if args.len() != 1 {
1822 return Err(anyhow!("labels() requires exactly 1 argument"));
1823 }
1824 let val = this
1825 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1826 .await?;
1827 if let Value::Map(map) = &val
1828 && let Some(labels_val) = map.get("_labels")
1829 {
1830 return Ok(labels_val.clone());
1831 }
1832 return Ok(Value::Null);
1833 }
1834
1835 if name.eq_ignore_ascii_case("PROPERTIES") {
1837 if args.len() != 1 {
1838 return Err(anyhow!("properties() requires exactly 1 argument"));
1839 }
1840 let val = this
1841 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1842 .await?;
1843 if let Value::Map(map) = &val {
1844 let mut props = HashMap::new();
1846 for (k, v) in map.iter() {
1847 if !k.starts_with('_') {
1848 props.insert(k.clone(), v.clone());
1849 }
1850 }
1851 return Ok(Value::Map(props));
1852 }
1853 return Ok(Value::Null);
1854 }
1855
1856 if name.eq_ignore_ascii_case("STARTNODE") {
1858 if args.len() != 1 {
1859 return Err(anyhow!("startNode() requires exactly 1 argument"));
1860 }
1861 let val = this
1862 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1863 .await?;
1864 if let Value::Edge(edge) = &val {
1865 return Ok(Self::find_node_by_vid(row, edge.src));
1866 }
1867 if let Value::Map(map) = &val {
1868 if let Some(start_node) = map.get("_startNode") {
1869 return Ok(start_node.clone());
1870 }
1871 if let Some(src_vid) = map.get("_src_vid") {
1872 return Ok(Value::Map(HashMap::from([(
1873 "_vid".to_string(),
1874 src_vid.clone(),
1875 )])));
1876 }
1877 if let Some(src_id) = map.get("_src")
1879 && let Some(u) = src_id.as_u64()
1880 {
1881 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1882 }
1883 }
1884 return Ok(Value::Null);
1885 }
1886
1887 if name.eq_ignore_ascii_case("ENDNODE") {
1889 if args.len() != 1 {
1890 return Err(anyhow!("endNode() requires exactly 1 argument"));
1891 }
1892 let val = this
1893 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1894 .await?;
1895 if let Value::Edge(edge) = &val {
1896 return Ok(Self::find_node_by_vid(row, edge.dst));
1897 }
1898 if let Value::Map(map) = &val {
1899 if let Some(end_node) = map.get("_endNode") {
1900 return Ok(end_node.clone());
1901 }
1902 if let Some(dst_vid) = map.get("_dst_vid") {
1903 return Ok(Value::Map(HashMap::from([(
1904 "_vid".to_string(),
1905 dst_vid.clone(),
1906 )])));
1907 }
1908 if let Some(dst_id) = map.get("_dst")
1910 && let Some(u) = dst_id.as_u64()
1911 {
1912 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1913 }
1914 }
1915 return Ok(Value::Null);
1916 }
1917
1918 if name.eq_ignore_ascii_case("HASLABEL") {
1921 if args.len() != 2 {
1922 return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
1923 }
1924 let node_val = this
1925 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1926 .await?;
1927 let label_val = this
1928 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1929 .await?;
1930
1931 let label_to_check = label_val.as_str().ok_or_else(|| {
1932 anyhow!("Second argument to hasLabel must be a string")
1933 })?;
1934
1935 let has_label = match &node_val {
1936 Value::Map(map) if map.contains_key("_vid") => {
1938 if let Some(Value::List(labels_arr)) = map.get("_labels") {
1939 labels_arr
1940 .iter()
1941 .any(|l| l.as_str() == Some(label_to_check))
1942 } else {
1943 false
1944 }
1945 }
1946 Value::Map(map) => {
1948 if let Some(Value::List(labels_arr)) = map.get("_labels") {
1949 labels_arr
1950 .iter()
1951 .any(|l| l.as_str() == Some(label_to_check))
1952 } else {
1953 false
1954 }
1955 }
1956 _ => false,
1957 };
1958 return Ok(Value::Bool(has_label));
1959 }
1960
1961 if matches!(
1964 name.to_uppercase().as_str(),
1965 "ANY" | "ALL" | "NONE" | "SINGLE"
1966 ) {
1967 return Err(anyhow!(
1968 "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
1969 name.to_lowercase()
1970 ));
1971 }
1972
1973 if name.eq_ignore_ascii_case("COALESCE") {
1975 for arg in args {
1976 let val = this
1977 .evaluate_expr(arg, row, prop_manager, params, ctx)
1978 .await?;
1979 if !val.is_null() {
1980 return Ok(val);
1981 }
1982 }
1983 return Ok(Value::Null);
1984 }
1985
1986 if name.eq_ignore_ascii_case("vector_similarity") {
1988 if args.len() != 2 {
1989 return Err(anyhow!("vector_similarity takes 2 arguments"));
1990 }
1991 let v1 = this
1992 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1993 .await?;
1994 let v2 = this
1995 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1996 .await?;
1997 return eval_vector_similarity(&v1, &v2);
1998 }
1999
                    // uni.temporal.validAt(entity, startProp, endProp, time):
                    // true when startProp <= time < endProp (half-open interval);
                    // a missing/NULL end bound means "still valid".
                    if name.eq_ignore_ascii_case("uni.temporal.validAt")
                        || name.eq_ignore_ascii_case("uni.validAt")
                        || name.eq_ignore_ascii_case("validAt")
                    {
                        if args.len() != 4 {
                            return Err(anyhow!("validAt requires 4 arguments"));
                        }
                        let node_val = this
                            .evaluate_expr(&args[0], row, prop_manager, params, ctx)
                            .await?;
                        let start_prop = this
                            .evaluate_expr(&args[1], row, prop_manager, params, ctx)
                            .await?
                            .as_str()
                            .ok_or(anyhow!("start_prop must be string"))?
                            .to_string();
                        let end_prop = this
                            .evaluate_expr(&args[2], row, prop_manager, params, ctx)
                            .await?
                            .as_str()
                            .ok_or(anyhow!("end_prop must be string"))?
                            .to_string();
                        let time_val = this
                            .evaluate_expr(&args[3], row, prop_manager, params, ctx)
                            .await?;

                        let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
                            anyhow!("time argument must be a datetime value or string")
                        })?;

                        // Resolve the start-bound property: direct vid, then a map
                        // carrying _vid/_eid, then an inline map value. Any other
                        // entity shape is simply "not valid".
                        let valid_from_val: Option<Value> = if let Ok(vid) =
                            Self::vid_from_value(&node_val)
                        {
                            prop_manager
                                .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
                                .await
                                .ok()
                        } else if let Value::Map(map) = &node_val {
                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
                                let vid = Vid::from(vid_val);
                                prop_manager
                                    .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
                                    .await
                                    .ok()
                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
                                let eid = uni_common::core::id::Eid::from(eid_val);
                                prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
                            } else {
                                map.get(&start_prop).cloned()
                            }
                        } else {
                            return Ok(Value::Bool(false));
                        };

                        // Missing or NULL start bound => never valid.
                        let valid_from = match valid_from_val {
                            Some(ref v) => match value_to_datetime_utc(v) {
                                Some(dt) => dt,
                                None if v.is_null() => return Ok(Value::Bool(false)),
                                None => {
                                    return Err(anyhow!(
                                        "Property {} must be a datetime value or string",
                                        start_prop
                                    ));
                                }
                            },
                            None => return Ok(Value::Bool(false)),
                        };

                        // Resolve the end-bound property with the same fallbacks.
                        let valid_to_val: Option<Value> = if let Ok(vid) =
                            Self::vid_from_value(&node_val)
                        {
                            prop_manager
                                .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
                                .await
                                .ok()
                        } else if let Value::Map(map) = &node_val {
                            if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
                                let vid = Vid::from(vid_val);
                                prop_manager
                                    .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
                                    .await
                                    .ok()
                            } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
                                let eid = uni_common::core::id::Eid::from(eid_val);
                                prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
                            } else {
                                map.get(&end_prop).cloned()
                            }
                        } else {
                            return Ok(Value::Bool(false));
                        };

                        // Missing or NULL end bound => open-ended validity.
                        let valid_to = match valid_to_val {
                            Some(ref v) => match value_to_datetime_utc(v) {
                                Some(dt) => Some(dt),
                                None if v.is_null() => None,
                                None => {
                                    return Err(anyhow!(
                                        "Property {} must be a datetime value or null",
                                        end_prop
                                    ));
                                }
                            },
                            None => None,
                        };

                        let is_valid = valid_from <= query_time
                            && valid_to.map(|vt| query_time < vt).unwrap_or(true);
                        return Ok(Value::Bool(is_valid));
                    }

                    // Generic scalar function: evaluate every argument (hydrating
                    // entity maps with their stored properties first), then
                    // dispatch by name.
                    let mut evaluated_args = Vec::with_capacity(args.len());
                    for arg in args {
                        let mut val = this
                            .evaluate_expr(arg, row, prop_manager, params, ctx)
                            .await?;

                        if let Value::Map(ref mut map) = val {
                            hydrate_entity_if_needed(map, prop_manager, ctx).await;
                        }

                        evaluated_args.push(val);
                    }
                    eval_scalar_function(name, &evaluated_args)
                }
                // reduce(acc = init, var IN list | expr): left fold over the list,
                // re-binding both the accumulator and element each iteration.
                Expr::Reduce {
                    accumulator,
                    init,
                    variable,
                    list,
                    expr,
                } => {
                    let mut acc = self
                        .evaluate_expr(init, row, prop_manager, params, ctx)
                        .await?;
                    let list_val = self
                        .evaluate_expr(list, row, prop_manager, params, ctx)
                        .await?;

                    if let Value::List(items) = list_val {
                        for item in items {
                            // Each step sees the current accumulator and element
                            // layered over the outer row.
                            let mut scope = row.clone();
                            scope.insert(accumulator.clone(), acc.clone());
                            scope.insert(variable.clone(), item);

                            acc = self
                                .evaluate_expr(expr, &scope, prop_manager, params, ctx)
                                .await?;
                        }
                    } else {
                        return Err(anyhow!("REDUCE list argument must evaluate to a list"));
                    }
                    Ok(acc)
                }
                // VALID_AT is rewritten to a validAt() function call during
                // planning; reaching this arm indicates a planner bug.
                Expr::ValidAt { .. } => {
                    Err(anyhow!(
                        "VALID_AT expression should have been transformed to function call in planner"
                    ))
                }

                // `x:Label` checks. Edges match a single type name; nodes must
                // carry every requested label. NULL input propagates NULL.
                Expr::LabelCheck { expr, labels } => {
                    let val = this
                        .evaluate_expr(expr, row, prop_manager, params, ctx)
                        .await?;
                    match &val {
                        Value::Null => Ok(Value::Null),
                        Value::Map(map) => {
                            // Heuristic: maps carrying edge markers are edges.
                            let is_edge = map.contains_key("_eid")
                                || map.contains_key("_type_name")
                                || (map.contains_key("_type") && !map.contains_key("_vid"));

                            if is_edge {
                                // An edge has exactly one type, so requiring more
                                // than one label can never match.
                                if labels.len() > 1 {
                                    return Ok(Value::Bool(false));
                                }
                                let label_to_check = &labels[0];
                                let has_type = if let Some(Value::String(t)) = map.get("_type_name")
                                {
                                    t == label_to_check
                                } else if let Some(Value::String(t)) = map.get("_type") {
                                    t == label_to_check
                                } else {
                                    false
                                };
                                Ok(Value::Bool(has_type))
                            } else {
                                // Node path: every requested label must appear in
                                // the `_labels` list.
                                let has_all = labels.iter().all(|label_to_check| {
                                    if let Some(Value::List(labels_arr)) = map.get("_labels") {
                                        labels_arr
                                            .iter()
                                            .any(|l| l.as_str() == Some(label_to_check.as_str()))
                                    } else {
                                        false
                                    }
                                });
                                Ok(Value::Bool(has_all))
                            }
                        }
                        _ => Ok(Value::Bool(false)),
                    }
                }
2221
                // Map projection: base{.prop, .*, key: expr, variable}.
                Expr::MapProjection { base, items } => {
                    let base_value = this
                        .evaluate_expr(base, row, prop_manager, params, ctx)
                        .await?;

                    let properties = match &base_value {
                        Value::Map(map) => map,
                        _ => {
                            return Err(anyhow!(
                                "Map projection requires object, got {:?}",
                                base_value
                            ));
                        }
                    };

                    let mut result_map = HashMap::new();

                    for item in items {
                        match item {
                            // `.prop`: copy a single property if present.
                            MapProjectionItem::Property(prop) => {
                                if let Some(value) = properties.get(prop.as_str()) {
                                    result_map.insert(prop.clone(), value.clone());
                                }
                            }
                            // `.*`: copy every user property, skipping internal
                            // `_`-prefixed bookkeeping keys.
                            MapProjectionItem::AllProperties => {
                                for (key, value) in properties.iter() {
                                    if !key.starts_with('_') {
                                        result_map.insert(key.clone(), value.clone());
                                    }
                                }
                            }
                            // `key: expr`: evaluated against the outer row.
                            MapProjectionItem::LiteralEntry(key, expr) => {
                                let value = this
                                    .evaluate_expr(expr, row, prop_manager, params, ctx)
                                    .await?;
                                result_map.insert(key.clone(), value);
                            }
                            // bare variable: pulled from the current row.
                            MapProjectionItem::Variable(var_name) => {
                                if let Some(value) = row.get(var_name.as_str()) {
                                    result_map.insert(var_name.clone(), value.clone());
                                }
                            }
                        }
                    }

                    Ok(Value::Map(result_map))
                }
2273 }
2274 })
2275 }
2276
2277 pub(crate) fn execute_subplan<'a>(
2278 &'a self,
2279 plan: LogicalPlan,
2280 prop_manager: &'a PropertyManager,
2281 params: &'a HashMap<String, Value>,
2282 ctx: Option<&'a QueryContext>,
2283 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2284 Box::pin(async move {
2285 if let Some(ctx) = ctx {
2286 ctx.check_timeout()?;
2287 }
2288 match plan {
2289 LogicalPlan::Union { left, right, all } => {
2290 self.execute_union(left, right, all, prop_manager, params, ctx)
2291 .await
2292 }
2293 LogicalPlan::CreateVectorIndex {
2294 config,
2295 if_not_exists,
2296 } => {
2297 if if_not_exists && self.index_exists_by_name(&config.name) {
2298 return Ok(vec![]);
2299 }
2300 let idx_mgr = IndexManager::new(
2301 self.storage.base_path(),
2302 self.storage.schema_manager_arc(),
2303 self.storage.lancedb_store_arc(),
2304 );
2305 idx_mgr.create_vector_index(config).await?;
2306 Ok(vec![])
2307 }
2308 LogicalPlan::CreateFullTextIndex {
2309 config,
2310 if_not_exists,
2311 } => {
2312 if if_not_exists && self.index_exists_by_name(&config.name) {
2313 return Ok(vec![]);
2314 }
2315 let idx_mgr = IndexManager::new(
2316 self.storage.base_path(),
2317 self.storage.schema_manager_arc(),
2318 self.storage.lancedb_store_arc(),
2319 );
2320 idx_mgr.create_fts_index(config).await?;
2321 Ok(vec![])
2322 }
                // CREATE INDEX on scalar properties. Expression properties such as
                // `lower(name)` are materialized as generated columns first, and
                // the index is then built over the generated column instead.
                LogicalPlan::CreateScalarIndex {
                    mut config,
                    if_not_exists,
                } => {
                    if if_not_exists && self.index_exists_by_name(&config.name) {
                        return Ok(vec![]);
                    }

                    let mut modified_properties = Vec::new();

                    for prop in &config.properties {
                        // A parenthesized property name is treated as an expression.
                        if prop.contains('(') && prop.contains(')') {
                            let gen_col = SchemaManager::generated_column_name(prop);

                            let sm = self.storage.schema_manager_arc();
                            // Best-effort: the generated column may already exist.
                            if let Err(e) = sm.add_generated_property(
                                &config.label,
                                &gen_col,
                                DataType::String, prop.clone(),
                            ) {
                                log::warn!("Failed to add generated property (might exist): {}", e);
                            }

                            modified_properties.push(gen_col);
                        } else {
                            modified_properties.push(prop.clone());
                        }
                    }

                    config.properties = modified_properties;

                    let idx_mgr = IndexManager::new(
                        self.storage.base_path(),
                        self.storage.schema_manager_arc(),
                        self.storage.lancedb_store_arc(),
                    );
                    idx_mgr.create_scalar_index(config).await?;
                    Ok(vec![])
                }
2367 LogicalPlan::CreateJsonFtsIndex {
2368 config,
2369 if_not_exists,
2370 } => {
2371 if if_not_exists && self.index_exists_by_name(&config.name) {
2372 return Ok(vec![]);
2373 }
2374 let idx_mgr = IndexManager::new(
2375 self.storage.base_path(),
2376 self.storage.schema_manager_arc(),
2377 self.storage.lancedb_store_arc(),
2378 );
2379 idx_mgr.create_json_fts_index(config).await?;
2380 Ok(vec![])
2381 }
2382 LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2383 LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2384 LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2385 LogicalPlan::Vacuum => {
2386 self.execute_vacuum().await?;
2387 Ok(vec![])
2388 }
2389 LogicalPlan::Checkpoint => {
2390 self.execute_checkpoint().await?;
2391 Ok(vec![])
2392 }
2393 LogicalPlan::CopyTo {
2394 label,
2395 path,
2396 format,
2397 options,
2398 } => {
2399 let count = self
2400 .execute_copy_to(&label, &path, &format, &options)
2401 .await?;
2402 let mut result = HashMap::new();
2403 result.insert("count".to_string(), Value::Int(count as i64));
2404 Ok(vec![result])
2405 }
2406 LogicalPlan::CopyFrom {
2407 label,
2408 path,
2409 format,
2410 options,
2411 } => {
2412 let count = self
2413 .execute_copy_from(&label, &path, &format, &options)
2414 .await?;
2415 let mut result = HashMap::new();
2416 result.insert("count".to_string(), Value::Int(count as i64));
2417 Ok(vec![result])
2418 }
2419 LogicalPlan::CreateLabel(clause) => {
2420 self.execute_create_label(clause).await?;
2421 Ok(vec![])
2422 }
2423 LogicalPlan::CreateEdgeType(clause) => {
2424 self.execute_create_edge_type(clause).await?;
2425 Ok(vec![])
2426 }
2427 LogicalPlan::AlterLabel(clause) => {
2428 self.execute_alter_label(clause).await?;
2429 Ok(vec![])
2430 }
2431 LogicalPlan::AlterEdgeType(clause) => {
2432 self.execute_alter_edge_type(clause).await?;
2433 Ok(vec![])
2434 }
2435 LogicalPlan::DropLabel(clause) => {
2436 self.execute_drop_label(clause).await?;
2437 Ok(vec![])
2438 }
2439 LogicalPlan::DropEdgeType(clause) => {
2440 self.execute_drop_edge_type(clause).await?;
2441 Ok(vec![])
2442 }
2443 LogicalPlan::CreateConstraint(clause) => {
2444 self.execute_create_constraint(clause).await?;
2445 Ok(vec![])
2446 }
2447 LogicalPlan::DropConstraint(clause) => {
2448 self.execute_drop_constraint(clause).await?;
2449 Ok(vec![])
2450 }
2451 LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2452 LogicalPlan::DropIndex { name, if_exists } => {
2453 let idx_mgr = IndexManager::new(
2454 self.storage.base_path(),
2455 self.storage.schema_manager_arc(),
2456 self.storage.lancedb_store_arc(),
2457 );
2458 match idx_mgr.drop_index(&name).await {
2459 Ok(_) => Ok(vec![]),
2460 Err(e) => {
2461 if if_exists && e.to_string().contains("not found") {
2462 Ok(vec![])
2463 } else {
2464 Err(e)
2465 }
2466 }
2467 }
2468 }
2469 LogicalPlan::ShowIndexes { filter } => {
2470 Ok(self.execute_show_indexes(filter.as_deref()))
2471 }
2472 LogicalPlan::Scan { .. }
2475 | LogicalPlan::ExtIdLookup { .. }
2476 | LogicalPlan::ScanAll { .. }
2477 | LogicalPlan::ScanMainByLabels { .. }
2478 | LogicalPlan::Traverse { .. }
2479 | LogicalPlan::TraverseMainByType { .. } => {
2480 let batches = self.execute_datafusion(plan, prop_manager, params).await?;
2481 self.record_batches_to_rows(batches)
2482 }
                // Row-wise predicate filter. The OPTIONAL MATCH path groups rows
                // by their non-optional bindings so a group whose rows all fail
                // the predicate still emits one row with the optional variables
                // set to NULL (OPTIONAL MATCH ... WHERE semantics).
                LogicalPlan::Filter {
                    input,
                    predicate,
                    optional_variables,
                } => {
                    let input_matches = self
                        .execute_subplan(*input, prop_manager, params, ctx)
                        .await?;

                    tracing::debug!(
                        "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
                        predicate,
                        input_matches.len(),
                        optional_variables
                    );

                    if !optional_variables.is_empty() {
                        // A key is "optional" if it is the optional variable itself
                        // or a `var.`-prefixed projection of it.
                        let is_optional_key = |k: &str| -> bool {
                            optional_variables.contains(k)
                                || optional_variables
                                    .iter()
                                    .any(|var| k.starts_with(&format!("{}.", var)))
                        };

                        // NOTE(review): the `__` check is subsumed by the `_` check.
                        let is_internal_key =
                            |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };

                        // Grouping columns = every non-optional, non-internal key,
                        // taken from the first row's shape.
                        let non_optional_vars: Vec<String> = input_matches
                            .first()
                            .map(|row| {
                                row.keys()
                                    .filter(|k| !is_optional_key(k) && !is_internal_key(k))
                                    .cloned()
                                    .collect()
                            })
                            .unwrap_or_default();

                        let mut groups: std::collections::HashMap<
                            Vec<u8>,
                            Vec<HashMap<String, Value>>,
                        > = std::collections::HashMap::new();

                        for row in &input_matches {
                            // Group key: Debug-formatted values joined with '|'.
                            // NOTE(review): values containing '|' could collide —
                            // confirm this cannot occur for grouping columns.
                            let key: Vec<u8> = non_optional_vars
                                .iter()
                                .map(|var| {
                                    row.get(var).map(|v| format!("{:?}", v)).unwrap_or_default()
                                })
                                .collect::<Vec<_>>()
                                .join("|")
                                .into_bytes();

                            groups.entry(key).or_default().push(row.clone());
                        }

                        let mut filtered = Vec::new();
                        for (_key, group_rows) in groups {
                            let mut group_passed = Vec::new();

                            for row in &group_rows {
                                // Rows whose optional side is already NULL pass
                                // through unfiltered (the predicate cannot apply).
                                let has_null_optional = optional_variables.iter().any(|var| {
                                    let direct_null =
                                        matches!(row.get(var), Some(Value::Null) | None);
                                    let prefixed_null = row
                                        .keys()
                                        .filter(|k| k.starts_with(&format!("{}.", var)))
                                        .any(|k| matches!(row.get(k), Some(Value::Null)));
                                    direct_null || prefixed_null
                                });

                                if has_null_optional {
                                    group_passed.push(row.clone());
                                    continue;
                                }

                                let res = self
                                    .evaluate_expr(&predicate, row, prop_manager, params, ctx)
                                    .await?;

                                if res.as_bool().unwrap_or(false) {
                                    group_passed.push(row.clone());
                                }
                            }

                            if group_passed.is_empty() {
                                // Whole group filtered out: keep one representative
                                // row with the optional columns nulled.
                                if let Some(template) = group_rows.first() {
                                    let mut null_row = HashMap::new();
                                    for (k, v) in template {
                                        if is_optional_key(k) {
                                            null_row.insert(k.clone(), Value::Null);
                                        } else {
                                            null_row.insert(k.clone(), v.clone());
                                        }
                                    }
                                    filtered.push(null_row);
                                }
                            } else {
                                filtered.extend(group_passed);
                            }
                        }

                        tracing::debug!(
                            "Filter (OPTIONAL): {} input rows -> {} output rows",
                            input_matches.len(),
                            filtered.len()
                        );

                        return Ok(filtered);
                    }

                    // Non-optional path: keep rows whose predicate is truthy.
                    let mut filtered = Vec::new();
                    for row in input_matches.iter() {
                        let res = self
                            .evaluate_expr(&predicate, row, prop_manager, params, ctx)
                            .await?;

                        let passes = res.as_bool().unwrap_or(false);

                        if passes {
                            filtered.push(row.clone());
                        }
                    }

                    tracing::debug!(
                        "Filter: {} input rows -> {} output rows",
                        input_matches.len(),
                        filtered.len()
                    );

                    Ok(filtered)
                }
2628 LogicalPlan::ProcedureCall {
2629 procedure_name,
2630 arguments,
2631 yield_items,
2632 } => {
2633 let yield_names: Vec<String> =
2634 yield_items.iter().map(|(n, _)| n.clone()).collect();
2635 let results = self
2636 .execute_procedure(
2637 &procedure_name,
2638 &arguments,
2639 &yield_names,
2640 prop_manager,
2641 params,
2642 ctx,
2643 )
2644 .await?;
2645
2646 let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2652 if !has_aliases {
2653 Ok(results)
2656 } else {
2657 let mut aliased_results = Vec::with_capacity(results.len());
2658 for row in results {
2659 let mut new_row = HashMap::new();
2660 for (name, alias) in &yield_items {
2661 let col_name = alias.as_ref().unwrap_or(name);
2662 let val = row.get(name).cloned().unwrap_or(Value::Null);
2663 new_row.insert(col_name.clone(), val);
2664 }
2665 aliased_results.push(new_row);
2666 }
2667 Ok(aliased_results)
2668 }
2669 }
2670 LogicalPlan::VectorKnn { .. } => {
2671 unreachable!("VectorKnn is handled by DataFusion engine")
2672 }
2673 LogicalPlan::InvertedIndexLookup { .. } => {
2674 unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2675 }
2676 LogicalPlan::Sort { input, order_by } => {
2677 let rows = self
2678 .execute_subplan(*input, prop_manager, params, ctx)
2679 .await?;
2680 self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2681 .await
2682 }
2683 LogicalPlan::Limit { input, skip, fetch } => {
2684 let rows = self
2685 .execute_subplan(*input, prop_manager, params, ctx)
2686 .await?;
2687 let skip = skip.unwrap_or(0);
2688 let take = fetch.unwrap_or(usize::MAX);
2689 Ok(rows.into_iter().skip(skip).take(take).collect())
2690 }
2691 LogicalPlan::Aggregate {
2692 input,
2693 group_by,
2694 aggregates,
2695 } => {
2696 let rows = self
2697 .execute_subplan(*input, prop_manager, params, ctx)
2698 .await?;
2699 self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2700 .await
2701 }
2702 LogicalPlan::Window {
2703 input,
2704 window_exprs,
2705 } => {
2706 let rows = self
2707 .execute_subplan(*input, prop_manager, params, ctx)
2708 .await?;
2709 self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2710 .await
2711 }
2712 LogicalPlan::Project { input, projections } => {
2713 let matches = self
2714 .execute_subplan(*input, prop_manager, params, ctx)
2715 .await?;
2716 self.execute_project(matches, &projections, prop_manager, params, ctx)
2717 .await
2718 }
2719 LogicalPlan::Distinct { input } => {
2720 let rows = self
2721 .execute_subplan(*input, prop_manager, params, ctx)
2722 .await?;
2723 let mut seen = std::collections::HashSet::new();
2724 let mut result = Vec::new();
2725 for row in rows {
2726 let key = Self::canonical_row_key(&row);
2727 if seen.insert(key) {
2728 result.push(row);
2729 }
2730 }
2731 Ok(result)
2732 }
2733 LogicalPlan::Unwind {
2734 input,
2735 expr,
2736 variable,
2737 } => {
2738 let input_rows = self
2739 .execute_subplan(*input, prop_manager, params, ctx)
2740 .await?;
2741 self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2742 .await
2743 }
2744 LogicalPlan::Apply {
2745 input,
2746 subquery,
2747 input_filter,
2748 } => {
2749 let input_rows = self
2750 .execute_subplan(*input, prop_manager, params, ctx)
2751 .await?;
2752 self.execute_apply(
2753 input_rows,
2754 &subquery,
2755 input_filter.as_ref(),
2756 prop_manager,
2757 params,
2758 ctx,
2759 )
2760 .await
2761 }
2762 LogicalPlan::SubqueryCall { input, subquery } => {
2763 let input_rows = self
2764 .execute_subplan(*input, prop_manager, params, ctx)
2765 .await?;
2766 self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2769 .await
2770 }
2771 LogicalPlan::RecursiveCTE {
2772 cte_name,
2773 initial,
2774 recursive,
2775 } => {
2776 self.execute_recursive_cte(
2777 &cte_name,
2778 *initial,
2779 *recursive,
2780 prop_manager,
2781 params,
2782 ctx,
2783 )
2784 .await
2785 }
2786 LogicalPlan::CrossJoin { left, right } => {
2787 self.execute_cross_join(left, right, prop_manager, params, ctx)
2788 .await
2789 }
2790 LogicalPlan::Set { .. }
2791 | LogicalPlan::Remove { .. }
2792 | LogicalPlan::Merge { .. }
2793 | LogicalPlan::Create { .. }
2794 | LogicalPlan::CreateBatch { .. } => {
2795 unreachable!("mutations are handled by DataFusion engine")
2796 }
2797 LogicalPlan::Delete { .. } => {
2798 unreachable!("mutations are handled by DataFusion engine")
2799 }
2800 LogicalPlan::Begin => {
2801 if let Some(writer_lock) = &self.writer {
2802 let mut writer = writer_lock.write().await;
2803 writer.begin_transaction()?;
2804 } else {
2805 return Err(anyhow!("Transaction requires a Writer"));
2806 }
2807 Ok(vec![HashMap::new()])
2808 }
2809 LogicalPlan::Commit => {
2810 if let Some(writer_lock) = &self.writer {
2811 let mut writer = writer_lock.write().await;
2812 writer.commit_transaction().await?;
2813 } else {
2814 return Err(anyhow!("Transaction requires a Writer"));
2815 }
2816 Ok(vec![HashMap::new()])
2817 }
2818 LogicalPlan::Rollback => {
2819 if let Some(writer_lock) = &self.writer {
2820 let mut writer = writer_lock.write().await;
2821 writer.rollback_transaction()?;
2822 } else {
2823 return Err(anyhow!("Transaction requires a Writer"));
2824 }
2825 Ok(vec![HashMap::new()])
2826 }
2827 LogicalPlan::Copy {
2828 target,
2829 source,
2830 is_export,
2831 options,
2832 } => {
2833 if is_export {
2834 self.execute_export(&target, &source, &options, prop_manager, ctx)
2835 .await
2836 } else {
2837 self.execute_copy(&target, &source, &options, prop_manager)
2838 .await
2839 }
2840 }
2841 LogicalPlan::Backup {
2842 destination,
2843 options,
2844 } => self.execute_backup(&destination, &options).await,
2845 LogicalPlan::Explain { plan } => {
2846 let plan_str = format!("{:#?}", plan);
2847 let mut row = HashMap::new();
2848 row.insert("plan".to_string(), Value::String(plan_str));
2849 Ok(vec![row])
2850 }
2851 LogicalPlan::ShortestPath { .. } => {
2852 unreachable!("ShortestPath is handled by DataFusion engine")
2853 }
2854 LogicalPlan::AllShortestPaths { .. } => {
2855 unreachable!("AllShortestPaths is handled by DataFusion engine")
2856 }
2857 LogicalPlan::Foreach { .. } => {
2858 unreachable!("mutations are handled by DataFusion engine")
2859 }
2860 LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2861 LogicalPlan::BindZeroLengthPath { .. } => {
2862 unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2863 }
2864 LogicalPlan::BindPath { .. } => {
2865 unreachable!("BindPath is handled by DataFusion engine")
2866 }
2867 LogicalPlan::QuantifiedPattern { .. } => {
2868 unreachable!("QuantifiedPattern is handled by DataFusion engine")
2869 }
2870 LogicalPlan::LocyProgram { .. }
2871 | LogicalPlan::LocyFold { .. }
2872 | LogicalPlan::LocyBestBy { .. }
2873 | LogicalPlan::LocyPriority { .. }
2874 | LogicalPlan::LocyDerivedScan { .. }
2875 | LogicalPlan::LocyProject { .. } => {
2876 unreachable!("Locy operators are handled by DataFusion engine")
2877 }
2878 }
2879 })
2880 }
2881
    /// Executes one mutation plan from a FOREACH body against `scope`,
    /// using an already-locked `writer`.
    ///
    /// Only SET, REMOVE, DELETE, CREATE/CreateBatch, MERGE and nested
    /// FOREACH plans are accepted; any other plan kind is an error.
    /// Nested FOREACH recurses through `Box::pin` (async recursion needs
    /// a boxed future), giving each list element its own cloned scope.
    pub(crate) async fn execute_foreach_body_plan(
        &self,
        plan: LogicalPlan,
        scope: &mut HashMap<String, Value>,
        writer: &mut uni_store::runtime::writer::Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<()> {
        match plan {
            LogicalPlan::Set { items, .. } => {
                self.execute_set_items_locked(&items, scope, writer, prop_manager, params, ctx)
                    .await?;
            }
            LogicalPlan::Remove { items, .. } => {
                self.execute_remove_items_locked(&items, scope, writer, prop_manager, ctx)
                    .await?;
            }
            LogicalPlan::Delete { items, detach, .. } => {
                // Each DELETE item is evaluated in the current scope and
                // deleted individually (with DETACH when requested).
                for expr in &items {
                    let val = self
                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
                        .await?;
                    self.execute_delete_item_locked(&val, detach, writer)
                        .await?;
                }
            }
            LogicalPlan::Create { pattern, .. } => {
                self.execute_create_pattern(&pattern, scope, writer, prop_manager, params, ctx)
                    .await?;
            }
            LogicalPlan::CreateBatch { patterns, .. } => {
                for pattern in &patterns {
                    self.execute_create_pattern(pattern, scope, writer, prop_manager, params, ctx)
                        .await?;
                }
            }
            LogicalPlan::Merge {
                pattern,
                on_match: _,
                on_create,
                ..
            } => {
                // NOTE(review): inside FOREACH, MERGE unconditionally creates
                // the pattern and ON MATCH items are ignored — confirm this
                // simplification is intentional.
                self.execute_create_pattern(&pattern, scope, writer, prop_manager, params, ctx)
                    .await?;
                if let Some(on_create_clause) = on_create {
                    self.execute_set_items_locked(
                        &on_create_clause.items,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
            }
            LogicalPlan::Foreach {
                variable,
                list,
                body,
                ..
            } => {
                let list_val = self
                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
                    .await?;
                // A NULL list is a no-op; any other non-list value is an error.
                let items = match list_val {
                    Value::List(arr) => arr,
                    Value::Null => return Ok(()),
                    _ => return Err(anyhow!("FOREACH requires a list")),
                };
                for item in items {
                    // Fresh scope per element so bindings do not leak
                    // between iterations.
                    let mut nested_scope = scope.clone();
                    nested_scope.insert(variable.clone(), item);
                    for nested_plan in &body {
                        Box::pin(self.execute_foreach_body_plan(
                            nested_plan.clone(),
                            &mut nested_scope,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                        ))
                        .await?;
                    }
                }
            }
            _ => {
                return Err(anyhow!(
                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
                ));
            }
        }
        Ok(())
    }
2981
2982 fn canonical_row_key(row: &HashMap<String, Value>) -> String {
2983 let mut pairs: Vec<_> = row.iter().collect();
2984 pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
2985
2986 pairs
2987 .into_iter()
2988 .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
2989 .collect::<Vec<_>>()
2990 .join("|")
2991 }
2992
    /// Produces a canonical string key for a single value, used by DISTINCT
    /// and grouping so that logically-equal values collide.
    ///
    /// Normalizations applied:
    /// - Integral floats share the integer key space (`1.0` == `1`).
    /// - NaN and +/- infinity get dedicated keys.
    /// - Strings that parse as temporal literals share the key space with
    ///   `Value::Temporal` (via `temporal_string_key`).
    /// - Map keys and node labels are sorted so ordering is irrelevant.
    fn canonical_value_key(v: &Value) -> String {
        match v {
            Value::Null => "null".to_string(),
            Value::Bool(b) => format!("b:{b}"),
            Value::Int(i) => format!("n:{i}"),
            Value::Float(f) => {
                if f.is_nan() {
                    "nan".to_string()
                } else if f.is_infinite() {
                    if f.is_sign_positive() {
                        "inf:+".to_string()
                    } else {
                        "inf:-".to_string()
                    }
                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
                    // Collapse integral floats into the integer key space so
                    // 1.0 and 1 compare equal under DISTINCT.
                    format!("n:{}", *f as i64)
                } else {
                    format!("f:{f}")
                }
            }
            Value::String(s) => {
                // Temporal-looking strings are normalized so they collide
                // with equivalent Value::Temporal values.
                if let Some(k) = Self::temporal_string_key(s) {
                    format!("temporal:{k}")
                } else {
                    format!("s:{s}")
                }
            }
            Value::Bytes(b) => format!("bytes:{:?}", b),
            Value::List(items) => format!(
                "list:[{}]",
                items
                    .iter()
                    .map(Self::canonical_value_key)
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Map(map) => {
                // Sort map entries by key so insertion order never matters.
                let mut pairs: Vec<_> = map.iter().collect();
                pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
                format!(
                    "map:{{{}}}",
                    pairs
                        .into_iter()
                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
                        .collect::<Vec<_>>()
                        .join(",")
                )
            }
            Value::Node(n) => {
                // Labels are sorted so label order never affects identity.
                let mut labels = n.labels.clone();
                labels.sort();
                format!(
                    "node:{}:{}:{}",
                    n.vid.as_u64(),
                    labels.join(":"),
                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
                )
            }
            Value::Edge(e) => format!(
                "edge:{}:{}:{}:{}:{}",
                e.eid.as_u64(),
                e.edge_type,
                e.src.as_u64(),
                e.dst.as_u64(),
                Self::canonical_value_key(&Value::Map(e.properties.clone()))
            ),
            Value::Path(p) => format!(
                "path:nodes=[{}];edges=[{}]",
                p.nodes
                    .iter()
                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
                    .collect::<Vec<_>>()
                    .join(","),
                p.edges
                    .iter()
                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Vector(vs) => format!("vec:{:?}", vs),
            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
            // Fallback: Debug representation for any variant not listed above.
            _ => format!("{v:?}"),
        }
    }
3077
3078 fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3079 match t {
3080 uni_common::TemporalValue::Date { days_since_epoch } => {
3081 format!("date:{days_since_epoch}")
3082 }
3083 uni_common::TemporalValue::LocalTime {
3084 nanos_since_midnight,
3085 } => format!("localtime:{nanos_since_midnight}"),
3086 uni_common::TemporalValue::Time {
3087 nanos_since_midnight,
3088 offset_seconds,
3089 } => {
3090 let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3091 format!("time:{utc_nanos}")
3092 }
3093 uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3094 format!("localdatetime:{nanos_since_epoch}")
3095 }
3096 uni_common::TemporalValue::DateTime {
3097 nanos_since_epoch, ..
3098 } => format!("datetime:{nanos_since_epoch}"),
3099 uni_common::TemporalValue::Duration {
3100 months,
3101 days,
3102 nanos,
3103 } => format!("duration:{months}:{days}:{nanos}"),
3104 }
3105 }
3106
3107 fn temporal_string_key(s: &str) -> Option<String> {
3108 let fn_name = match classify_temporal(s)? {
3109 uni_common::TemporalType::Date => "DATE",
3110 uni_common::TemporalType::LocalTime => "LOCALTIME",
3111 uni_common::TemporalType::Time => "TIME",
3112 uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3113 uni_common::TemporalType::DateTime => "DATETIME",
3114 uni_common::TemporalType::Duration => "DURATION",
3115 };
3116 match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3117 Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3118 _ => None,
3119 }
3120 }
3121
    /// Number of input rows processed between cooperative timeout checks in
    /// `execute_aggregate`.
    pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3125
3126 pub(crate) async fn execute_aggregate(
3127 &self,
3128 rows: Vec<HashMap<String, Value>>,
3129 group_by: &[Expr],
3130 aggregates: &[Expr],
3131 prop_manager: &PropertyManager,
3132 params: &HashMap<String, Value>,
3133 ctx: Option<&QueryContext>,
3134 ) -> Result<Vec<HashMap<String, Value>>> {
3135 if let Some(ctx) = ctx {
3137 ctx.check_timeout()?;
3138 }
3139
3140 let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3141
3142 if rows.is_empty() {
3145 if group_by.is_empty() {
3146 let accs = Self::create_accumulators(aggregates);
3147 let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3148 return Ok(vec![row]);
3149 }
3150 return Ok(vec![]);
3151 }
3152
3153 for (idx, row) in rows.into_iter().enumerate() {
3154 if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3156 && let Some(ctx) = ctx
3157 {
3158 ctx.check_timeout()?;
3159 }
3160
3161 let key_vals = self
3162 .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3163 .await?;
3164 let key_str = format!(
3167 "[{}]",
3168 key_vals
3169 .iter()
3170 .map(Self::canonical_value_key)
3171 .collect::<Vec<_>>()
3172 .join(",")
3173 );
3174
3175 let entry = groups
3176 .entry(key_str)
3177 .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3178
3179 self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3180 .await?;
3181 }
3182
3183 let results = groups
3184 .values()
3185 .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3186 .collect();
3187
3188 Ok(results)
3189 }
3190
    /// Evaluates window (OVER-clause) function expressions and appends one
    /// result column per expression onto each input row.
    ///
    /// Handles ROW_NUMBER, RANK, DENSE_RANK, LAG, LEAD, NTILE, FIRST_VALUE,
    /// LAST_VALUE and NTH_VALUE (the set validated against
    /// `WINDOW_FUNCTIONS`). Rows are bucketed by the PARTITION BY keys,
    /// ordered within each partition by ORDER BY, and the function value is
    /// computed per row position. The output column name is the expression's
    /// string representation.
    pub(crate) async fn execute_window(
        &self,
        mut rows: Vec<HashMap<String, Value>>,
        window_exprs: &[Expr],
        _prop_manager: &PropertyManager,
        _params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Cooperative timeout check before any work.
        if let Some(ctx) = ctx {
            ctx.check_timeout()?;
        }

        // Nothing to compute: pass rows through unchanged.
        if rows.is_empty() || window_exprs.is_empty() {
            return Ok(rows);
        }

        for window_expr in window_exprs {
            // Every window expression must be `fn(args) OVER (spec)`.
            let Expr::FunctionCall {
                name,
                args,
                window_spec: Some(window_spec),
                ..
            } = window_expr
            else {
                return Err(anyhow!(
                    "Window expression must be a FunctionCall with OVER clause: {:?}",
                    window_expr
                ));
            };

            let name_upper = name.to_uppercase();

            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
                return Err(anyhow!(
                    "Unsupported window function: {}. Supported functions: {}",
                    name,
                    WINDOW_FUNCTIONS.join(", ")
                ));
            }

            // Partition key -> indices of member rows. An empty PARTITION BY
            // puts every row into a single partition (empty key).
            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();

            for (row_idx, row) in rows.iter().enumerate() {
                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
                    vec![]
                } else {
                    window_spec
                        .partition_by
                        .iter()
                        .map(|expr| self.evaluate_simple_expr(expr, row))
                        .collect::<Result<Vec<_>>>()?
                };

                partition_map
                    .entry(partition_key)
                    .or_default()
                    .push(row_idx);
            }

            for (_partition_key, row_indices) in partition_map.iter_mut() {
                // Order rows inside the partition by the ORDER BY items.
                // Evaluation errors contribute no ordering (treated as equal).
                if !window_spec.order_by.is_empty() {
                    row_indices.sort_by(|&a, &b| {
                        for sort_item in &window_spec.order_by {
                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);

                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
                                let cmp = Executor::compare_values(&va, &vb);
                                let cmp = if sort_item.ascending {
                                    cmp
                                } else {
                                    cmp.reverse()
                                };
                                if cmp != std::cmp::Ordering::Equal {
                                    return cmp;
                                }
                            }
                        }
                        std::cmp::Ordering::Equal
                    });
                }

                // `position` is the 0-based position within the sorted
                // partition.
                for (position, &row_idx) in row_indices.iter().enumerate() {
                    let window_value = match name_upper.as_str() {
                        "ROW_NUMBER" => Value::from((position + 1) as i64),
                        "RANK" => {
                            // RANK: peer rows (equal sort keys) share the rank
                            // of the first row of their group; gaps follow.
                            let rank = if position == 0 {
                                1i64
                            } else {
                                let prev_row_idx = row_indices[position - 1];
                                let same_as_prev = self.rows_have_same_sort_keys(
                                    &window_spec.order_by,
                                    &rows,
                                    row_idx,
                                    prev_row_idx,
                                );

                                if same_as_prev {
                                    // Walk back to the start of the peer group.
                                    let mut group_start = position - 1;
                                    while group_start > 0 {
                                        let curr_idx = row_indices[group_start];
                                        let prev_idx = row_indices[group_start - 1];
                                        if !self.rows_have_same_sort_keys(
                                            &window_spec.order_by,
                                            &rows,
                                            curr_idx,
                                            prev_idx,
                                        ) {
                                            break;
                                        }
                                        group_start -= 1;
                                    }
                                    (group_start + 1) as i64
                                } else {
                                    (position + 1) as i64
                                }
                            };
                            Value::from(rank)
                        }
                        "DENSE_RANK" => {
                            // DENSE_RANK: 1 + number of distinct-key
                            // transitions in the prefix (no gaps).
                            // NOTE: O(position) work per row, i.e. O(n^2)
                            // per partition in the worst case.
                            let mut dense_rank = 1i64;
                            for i in 0..position {
                                let curr_idx = row_indices[i + 1];
                                let prev_idx = row_indices[i];
                                if !self.rows_have_same_sort_keys(
                                    &window_spec.order_by,
                                    &rows,
                                    curr_idx,
                                    prev_idx,
                                ) {
                                    dense_rank += 1;
                                }
                            }
                            Value::from(dense_rank)
                        }
                        "LAG" => {
                            // Value from `offset` rows earlier in the
                            // partition, or the default when out of range.
                            let (value_expr, offset, default_value) =
                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;

                            if position >= offset {
                                let target_idx = row_indices[position - offset];
                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
                            } else {
                                default_value
                            }
                        }
                        "LEAD" => {
                            // Value from `offset` rows later in the
                            // partition, or the default when out of range.
                            let (value_expr, offset, default_value) =
                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;

                            if position + offset < row_indices.len() {
                                let target_idx = row_indices[position + offset];
                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
                            } else {
                                default_value
                            }
                        }
                        "NTILE" => {
                            let num_buckets_expr = args.first().ok_or_else(|| {
                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
                            })?;
                            let num_buckets_val =
                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
                                anyhow!(
                                    "NTILE argument must be an integer, got: {:?}",
                                    num_buckets_val
                                )
                            })?;

                            if num_buckets <= 0 {
                                return Err(anyhow!(
                                    "NTILE bucket count must be positive, got: {}",
                                    num_buckets
                                ));
                            }

                            let num_buckets = num_buckets as usize;
                            let partition_size = row_indices.len();

                            // The first `extra_rows` buckets hold one extra
                            // row (base_size + 1); the rest hold base_size.
                            let base_size = partition_size / num_buckets;
                            let extra_rows = partition_size % num_buckets;

                            let bucket = if position < extra_rows * (base_size + 1) {
                                position / (base_size + 1) + 1
                            } else {
                                // base_size > 0 here: if base_size were 0,
                                // every position would satisfy the branch
                                // above, so this division is safe.
                                let adjusted_position = position - extra_rows * (base_size + 1);
                                extra_rows + (adjusted_position / base_size) + 1
                            };

                            Value::from(bucket as i64)
                        }
                        "FIRST_VALUE" => {
                            let value_expr = args.first().ok_or_else(|| {
                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
                            })?;

                            if row_indices.is_empty() {
                                Value::Null
                            } else {
                                let first_idx = row_indices[0];
                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
                            }
                        }
                        "LAST_VALUE" => {
                            // NOTE: evaluated over the whole partition, not a
                            // moving frame.
                            let value_expr = args.first().ok_or_else(|| {
                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
                            })?;

                            if row_indices.is_empty() {
                                Value::Null
                            } else {
                                let last_idx = row_indices[row_indices.len() - 1];
                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
                            }
                        }
                        "NTH_VALUE" => {
                            if args.len() != 2 {
                                return Err(anyhow!(
                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
                                ));
                            }

                            let value_expr = &args[0];
                            let n_expr = &args[1];

                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
                            let n = n_val.as_i64().ok_or_else(|| {
                                anyhow!(
                                    "NTH_VALUE second argument must be an integer, got: {:?}",
                                    n_val
                                )
                            })?;

                            if n <= 0 {
                                return Err(anyhow!(
                                    "NTH_VALUE position must be positive, got: {}",
                                    n
                                ));
                            }

                            // n is 1-based; out-of-range positions yield NULL.
                            let nth_index = (n - 1) as usize;
                            if nth_index < row_indices.len() {
                                let nth_idx = row_indices[nth_index];
                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
                            } else {
                                Value::Null
                            }
                        }
                        _ => unreachable!("Window function {} already validated", name),
                    };

                    // Column name is the canonical textual form of the whole
                    // window expression.
                    let col_name = window_expr.to_string_repr();
                    rows[row_idx].insert(col_name, window_value);
                }
            }
        }

        Ok(rows)
    }
3480
3481 fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3486 match expr {
3487 Expr::Variable(name) => row
3488 .get(name)
3489 .cloned()
3490 .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3491 Expr::Property(base, prop) => {
3492 let base_val = self.evaluate_simple_expr(base, row)?;
3493 if let Value::Map(map) = base_val {
3494 map.get(prop)
3495 .cloned()
3496 .ok_or_else(|| anyhow!("Property not found: {}", prop))
3497 } else {
3498 Err(anyhow!("Cannot access property on non-object"))
3499 }
3500 }
3501 Expr::Literal(lit) => Ok(lit.to_value()),
3502 _ => Err(anyhow!(
3503 "Unsupported expression in window function: {:?}",
3504 expr
3505 )),
3506 }
3507 }
3508
3509 fn rows_have_same_sort_keys(
3511 &self,
3512 order_by: &[uni_cypher::ast::SortItem],
3513 rows: &[HashMap<String, Value>],
3514 idx_a: usize,
3515 idx_b: usize,
3516 ) -> bool {
3517 order_by.iter().all(|sort_item| {
3518 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3519 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3520 matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3521 })
3522 }
3523
3524 fn extract_lag_lead_params<'a>(
3526 &self,
3527 func_name: &str,
3528 args: &'a [Expr],
3529 row: &HashMap<String, Value>,
3530 ) -> Result<(&'a Expr, usize, Value)> {
3531 let value_expr = args.first().ok_or_else(|| {
3532 anyhow!(
3533 "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3534 func_name,
3535 func_name
3536 )
3537 })?;
3538
3539 let offset = if let Some(offset_expr) = args.get(1) {
3540 let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3541 offset_val.as_i64().ok_or_else(|| {
3542 anyhow!(
3543 "{} offset must be an integer, got: {:?}",
3544 func_name,
3545 offset_val
3546 )
3547 })? as usize
3548 } else {
3549 1
3550 };
3551
3552 let default_value = if let Some(default_expr) = args.get(2) {
3553 self.evaluate_simple_expr(default_expr, row)?
3554 } else {
3555 Value::Null
3556 };
3557
3558 Ok((value_expr, offset, default_value))
3559 }
3560
3561 pub(crate) async fn evaluate_group_keys(
3563 &self,
3564 group_by: &[Expr],
3565 row: &HashMap<String, Value>,
3566 prop_manager: &PropertyManager,
3567 params: &HashMap<String, Value>,
3568 ctx: Option<&QueryContext>,
3569 ) -> Result<Vec<Value>> {
3570 let mut key_vals = Vec::new();
3571 for expr in group_by {
3572 key_vals.push(
3573 self.evaluate_expr(expr, row, prop_manager, params, ctx)
3574 .await?,
3575 );
3576 }
3577 Ok(key_vals)
3578 }
3579
3580 pub(crate) async fn update_accumulators(
3582 &self,
3583 accs: &mut [Accumulator],
3584 aggregates: &[Expr],
3585 row: &HashMap<String, Value>,
3586 prop_manager: &PropertyManager,
3587 params: &HashMap<String, Value>,
3588 ctx: Option<&QueryContext>,
3589 ) -> Result<()> {
3590 for (i, agg_expr) in aggregates.iter().enumerate() {
3591 if let Expr::FunctionCall { args, .. } = agg_expr {
3592 let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3593 let val = if is_wildcard {
3594 Value::Null
3595 } else {
3596 self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3597 .await?
3598 };
3599 accs[i].update(&val, is_wildcard);
3600 }
3601 }
3602 Ok(())
3603 }
3604
3605 pub(crate) async fn execute_recursive_cte(
3607 &self,
3608 cte_name: &str,
3609 initial: LogicalPlan,
3610 recursive: LogicalPlan,
3611 prop_manager: &PropertyManager,
3612 params: &HashMap<String, Value>,
3613 ctx: Option<&QueryContext>,
3614 ) -> Result<Vec<HashMap<String, Value>>> {
3615 use std::collections::HashSet;
3616
3617 pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3620 let mut pairs: Vec<_> = row.iter().collect();
3621 pairs.sort_by(|a, b| a.0.cmp(b.0));
3622 format!("{:?}", pairs)
3623 }
3624
3625 let mut working_table = self
3627 .execute_subplan(initial, prop_manager, params, ctx)
3628 .await?;
3629 let mut result_table = working_table.clone();
3630
3631 let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3633
3634 let max_iterations = 1000;
3638 for _iteration in 0..max_iterations {
3639 if let Some(ctx) = ctx {
3641 ctx.check_timeout()?;
3642 }
3643
3644 if working_table.is_empty() {
3645 break;
3646 }
3647
3648 let working_val = Value::List(
3650 working_table
3651 .iter()
3652 .map(|row| {
3653 if row.len() == 1 {
3654 row.values().next().unwrap().clone()
3655 } else {
3656 Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3657 }
3658 })
3659 .collect(),
3660 );
3661
3662 let mut next_params = params.clone();
3663 next_params.insert(cte_name.to_string(), working_val);
3664
3665 let next_result = self
3667 .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3668 .await?;
3669
3670 if next_result.is_empty() {
3671 break;
3672 }
3673
3674 let new_rows: Vec<_> = next_result
3676 .into_iter()
3677 .filter(|row| {
3678 let key = row_key(row);
3679 seen.insert(key) })
3681 .collect();
3682
3683 if new_rows.is_empty() {
3684 break;
3686 }
3687
3688 result_table.extend(new_rows.clone());
3689 working_table = new_rows;
3690 }
3691
3692 let final_list = Value::List(
3694 result_table
3695 .into_iter()
3696 .map(|row| {
3697 if row.len() == 1 {
3709 row.values().next().unwrap().clone()
3710 } else {
3711 Value::Map(row.into_iter().collect())
3712 }
3713 })
3714 .collect(),
3715 );
3716
3717 let mut final_row = HashMap::new();
3718 final_row.insert(cte_name.to_string(), final_list);
3719 Ok(vec![final_row])
3720 }
3721
    /// Number of input rows processed between cooperative timeout checks in
    /// `execute_sort`.
    const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3724
3725 pub(crate) async fn execute_sort(
3726 &self,
3727 rows: Vec<HashMap<String, Value>>,
3728 order_by: &[uni_cypher::ast::SortItem],
3729 prop_manager: &PropertyManager,
3730 params: &HashMap<String, Value>,
3731 ctx: Option<&QueryContext>,
3732 ) -> Result<Vec<HashMap<String, Value>>> {
3733 if let Some(ctx) = ctx {
3735 ctx.check_timeout()?;
3736 }
3737
3738 let mut rows_with_keys = Vec::with_capacity(rows.len());
3739 for (idx, row) in rows.into_iter().enumerate() {
3740 if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3742 && let Some(ctx) = ctx
3743 {
3744 ctx.check_timeout()?;
3745 }
3746
3747 let mut keys = Vec::new();
3748 for item in order_by {
3749 let val = row
3750 .get(&item.expr.to_string_repr())
3751 .cloned()
3752 .unwrap_or(Value::Null);
3753 let val = if val.is_null() {
3754 self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3755 .await
3756 .unwrap_or(Value::Null)
3757 } else {
3758 val
3759 };
3760 keys.push(val);
3761 }
3762 rows_with_keys.push((row, keys));
3763 }
3764
3765 if let Some(ctx) = ctx {
3767 ctx.check_timeout()?;
3768 }
3769
3770 rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3771
3772 Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3773 }
3774
3775 pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3777 aggregates
3778 .iter()
3779 .map(|expr| {
3780 if let Expr::FunctionCall { name, distinct, .. } = expr {
3781 Accumulator::new(name, *distinct)
3782 } else {
3783 Accumulator::new("COUNT", false)
3784 }
3785 })
3786 .collect()
3787 }
3788
3789 pub(crate) fn build_aggregate_result(
3791 group_by: &[Expr],
3792 aggregates: &[Expr],
3793 key_vals: &[Value],
3794 accs: &[Accumulator],
3795 ) -> HashMap<String, Value> {
3796 let mut res_row = HashMap::new();
3797 for (i, expr) in group_by.iter().enumerate() {
3798 res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3799 }
3800 for (i, expr) in aggregates.iter().enumerate() {
3801 let col_name = crate::query::planner::aggregate_column_name(expr);
3803 res_row.insert(col_name, accs[i].finish());
3804 }
3805 res_row
3806 }
3807
3808 pub(crate) fn compare_sort_keys(
3810 a_keys: &[Value],
3811 b_keys: &[Value],
3812 order_by: &[uni_cypher::ast::SortItem],
3813 ) -> std::cmp::Ordering {
3814 for (i, item) in order_by.iter().enumerate() {
3815 let order = Self::compare_values(&a_keys[i], &b_keys[i]);
3816 if order != std::cmp::Ordering::Equal {
3817 return if item.ascending {
3818 order
3819 } else {
3820 order.reverse()
3821 };
3822 }
3823 }
3824 std::cmp::Ordering::Equal
3825 }
3826
    /// Executes a BACKUP statement: flushes pending writes to L1, loads the
    /// latest snapshot, then copies the store either to a cloud object store
    /// (cloud URLs) or to a validated local directory.
    ///
    /// Returns a single row with `status` and `snapshot_id` columns.
    /// `_options` is currently unused.
    pub(crate) async fn execute_backup(
        &self,
        destination: &str,
        _options: &HashMap<String, Value>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Make buffered writes durable before copying.
        if let Some(writer_arc) = &self.writer {
            let mut writer = writer_arc.write().await;
            writer.flush_to_l1(None).await?;
        }

        // A backup requires at least one snapshot to exist.
        let snapshot_manager = self.storage.snapshot_manager();
        let snapshot = snapshot_manager
            .load_latest_snapshot()
            .await?
            .ok_or_else(|| anyhow!("No snapshot found"))?;

        if is_cloud_url(destination) {
            self.backup_to_cloud(destination, &snapshot.snapshot_id)
                .await?;
        } else {
            // Local destinations go through validate_path first
            // (presumably to constrain where backups may be written —
            // see `validate_path`).
            let validated_dest = self.validate_path(destination)?;
            self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
                .await?;
        }

        let mut res = HashMap::new();
        res.insert(
            "status".to_string(),
            Value::String("Backup completed".to_string()),
        );
        res.insert(
            "snapshot_id".to_string(),
            Value::String(snapshot.snapshot_id),
        );
        Ok(vec![res])
    }
3870
3871 async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
3873 let source_path = std::path::Path::new(self.storage.base_path());
3874
3875 if !dest_path.exists() {
3876 std::fs::create_dir_all(dest_path)?;
3877 }
3878
3879 if source_path.exists() {
3881 Self::copy_dir_all(source_path, dest_path)?;
3882 }
3883
3884 let schema_manager = self.storage.schema_manager();
3886 let dest_catalog = dest_path.join("catalog");
3887 if !dest_catalog.exists() {
3888 std::fs::create_dir_all(&dest_catalog)?;
3889 }
3890
3891 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3892 std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
3893
3894 Ok(())
3895 }
3896
3897 async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
3901 use object_store::ObjectStore;
3902 use object_store::local::LocalFileSystem;
3903 use object_store::path::Path as ObjPath;
3904
3905 let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
3906 let source_path = std::path::Path::new(self.storage.base_path());
3907
3908 let src_store: Arc<dyn ObjectStore> =
3910 Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
3911
3912 let catalog_src = ObjPath::from("catalog");
3914 let catalog_dst = if dest_prefix.as_ref().is_empty() {
3915 ObjPath::from("catalog")
3916 } else {
3917 ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
3918 };
3919 copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
3920
3921 let storage_src = ObjPath::from("storage");
3923 let storage_dst = if dest_prefix.as_ref().is_empty() {
3924 ObjPath::from("storage")
3925 } else {
3926 ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
3927 };
3928 copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
3929
3930 let schema_manager = self.storage.schema_manager();
3932 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3933 let schema_path = if dest_prefix.as_ref().is_empty() {
3934 ObjPath::from("catalog/schema.json")
3935 } else {
3936 ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
3937 };
3938 dest_store
3939 .put(&schema_path, bytes::Bytes::from(schema_content).into())
3940 .await?;
3941
3942 Ok(())
3943 }
3944
    /// Maximum directory recursion depth tolerated by the local backup
    /// copy; exceeding it aborts the copy with `InvalidInput`.
    const MAX_BACKUP_DEPTH: usize = 100;

    /// Maximum number of directory entries the local backup copy will
    /// visit; exceeding it aborts the copy with `InvalidInput`.
    const MAX_BACKUP_FILES: usize = 100_000;
3957 pub(crate) fn copy_dir_all(
3965 src: &std::path::Path,
3966 dst: &std::path::Path,
3967 ) -> std::io::Result<()> {
3968 let mut file_count = 0usize;
3969 Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
3970 }
3971
3972 pub(crate) fn copy_dir_all_impl(
3974 src: &std::path::Path,
3975 dst: &std::path::Path,
3976 depth: usize,
3977 file_count: &mut usize,
3978 ) -> std::io::Result<()> {
3979 if depth >= Self::MAX_BACKUP_DEPTH {
3980 return Err(std::io::Error::new(
3981 std::io::ErrorKind::InvalidInput,
3982 format!(
3983 "Maximum backup depth {} exceeded at {:?}",
3984 Self::MAX_BACKUP_DEPTH,
3985 src
3986 ),
3987 ));
3988 }
3989
3990 std::fs::create_dir_all(dst)?;
3991
3992 for entry in std::fs::read_dir(src)? {
3993 if *file_count >= Self::MAX_BACKUP_FILES {
3994 return Err(std::io::Error::new(
3995 std::io::ErrorKind::InvalidInput,
3996 format!(
3997 "Maximum backup file count {} exceeded",
3998 Self::MAX_BACKUP_FILES
3999 ),
4000 ));
4001 }
4002 *file_count += 1;
4003
4004 let entry = entry?;
4005 let metadata = entry.metadata()?;
4006
4007 if metadata.file_type().is_symlink() {
4009 continue;
4011 }
4012
4013 let dst_path = dst.join(entry.file_name());
4014 if metadata.is_dir() {
4015 Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4016 } else {
4017 std::fs::copy(entry.path(), dst_path)?;
4018 }
4019 }
4020 Ok(())
4021 }
4022
4023 pub(crate) async fn execute_copy(
4024 &self,
4025 target: &str,
4026 source: &str,
4027 options: &HashMap<String, Value>,
4028 prop_manager: &PropertyManager,
4029 ) -> Result<Vec<HashMap<String, Value>>> {
4030 let format = options
4031 .get("format")
4032 .and_then(|v| v.as_str())
4033 .unwrap_or_else(|| {
4034 if source.ends_with(".parquet") {
4035 "parquet"
4036 } else {
4037 "csv"
4038 }
4039 });
4040
4041 match format.to_lowercase().as_str() {
4042 "csv" => self.execute_csv_import(target, source, options).await,
4043 "parquet" => {
4044 self.execute_parquet_import(target, source, options, prop_manager)
4045 .await
4046 }
4047 _ => Err(anyhow!("Unsupported format: {}", format)),
4048 }
4049 }
4050
    /// Imports a CSV file into vertices (when `target` names a label) or
    /// edges (when it names an edge type).
    ///
    /// Options: `delimiter` (first byte; default `,`), `header` (default
    /// true), and for edges `src_col`/`dst_col` (defaults `_src`/`_dst`)
    /// naming the endpoint VID columns. Columns whose header is not a
    /// declared property of `target` are silently skipped. Returns one row
    /// with the `count` of imported entities.
    ///
    /// NOTE(review): with `header = false`, `rdr.headers()` still yields the
    /// first record, which is then treated as column names — confirm that
    /// headerless import is actually supported by callers.
    pub(crate) async fn execute_csv_import(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let validated_source = self.validate_path(source)?;

        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;

        let schema = self.storage.schema_manager().schema();

        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Only the first byte of a multi-character delimiter option is used.
        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut rdr = csv::ReaderBuilder::new()
            .delimiter(delimiter)
            .has_headers(has_header)
            .from_path(&validated_source)?;

        let headers = rdr.headers()?.clone();
        let mut count = 0;

        // Hold the writer lock for the whole import so rows are appended
        // atomically with respect to other writers.
        let mut writer = writer_lock.write().await;

        if label_meta.is_some() {
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;

            // Vertex import: each record becomes one vertex with a fresh VID.
            for result in rdr.records() {
                let record = result?;
                let mut props = HashMap::new();

                for (i, header) in headers.iter().enumerate() {
                    if let Some(val_str) = record.get(i)
                        && let Some(prop_meta) = target_props.get(header)
                    {
                        // Parse the raw cell according to the declared type.
                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
                        props.insert(header.to_string(), val);
                    }
                }

                let vid = writer.next_vid().await?;
                writer
                    .insert_vertex_with_labels(vid, props, &[target.to_string()])
                    .await?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            let type_id = meta.id;
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;

            let src_col = options
                .get("src_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_src");
            let dst_col = options
                .get("dst_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_dst");

            // Edge import: endpoint columns are resolved to VIDs; remaining
            // recognized columns become edge properties.
            for result in rdr.records() {
                let record = result?;
                let mut props = HashMap::new();
                let mut src_vid = None;
                let mut dst_vid = None;

                for (i, header) in headers.iter().enumerate() {
                    if let Some(val_str) = record.get(i) {
                        if header == src_col {
                            src_vid =
                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
                        } else if header == dst_col {
                            dst_vid =
                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
                        } else if let Some(prop_meta) = target_props.get(header) {
                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
                            props.insert(header.to_string(), val);
                        }
                    }
                }

                let src =
                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
                let dst = dst_vid
                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;

                let eid = writer.next_eid(type_id).await?;
                writer
                    .insert_edge(src, dst, type_id, eid, props, Some(target.to_string()))
                    .await?;
                count += 1;
            }
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4181
    /// Imports a Parquet file into vertices (when `target` names a label)
    /// or edges (when it names an edge type).
    ///
    /// The source may be a cloud URL (downloaded via the object store) or a
    /// validated local path. Columns not declared as properties of `target`
    /// are skipped; null cells are skipped. For edges, `src_col`/`dst_col`
    /// options (defaults `_src`/`_dst`) name the endpoint VID columns.
    /// Returns one row with the `count` of imported entities.
    pub(crate) async fn execute_parquet_import(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
        _prop_manager: &PropertyManager,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;

        let schema = self.storage.schema_manager().schema();

        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        let reader = if is_cloud_url(source) {
            self.open_parquet_from_cloud(source).await?
        } else {
            let validated_source = self.validate_path(source)?;
            let file = std::fs::File::open(&validated_source)?;
            let builder =
                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
            builder.build()?
        };
        let mut reader = reader;

        let mut count = 0;
        // Hold the writer lock across the whole import.
        let mut writer = writer_lock.write().await;

        if label_meta.is_some() {
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;

            // Vertex import: one vertex per row, fresh VID per row.
            for batch in reader.by_ref() {
                let batch = batch?;
                for row in 0..batch.num_rows() {
                    let mut props = HashMap::new();
                    for field in batch.schema().fields() {
                        let name = field.name();
                        if target_props.contains_key(name) {
                            let col = batch.column_by_name(name).unwrap();
                            if !col.is_null(row) {
                                // Convert the Arrow cell using the declared
                                // property type as a hint.
                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
                                let val =
                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
                                props.insert(name.clone(), val);
                            }
                        }
                    }
                    let vid = writer.next_vid().await?;
                    writer
                        .insert_vertex_with_labels(vid, props, &[target.to_string()])
                        .await?;
                    count += 1;
                }
            }
        } else if let Some(meta) = edge_meta {
            let type_id = meta.id;
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;

            let src_col = options
                .get("src_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_src");
            let dst_col = options
                .get("dst_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_dst");

            // Edge import: endpoint columns become VIDs, remaining declared
            // columns become edge properties.
            for batch in reader {
                let batch = batch?;
                for row in 0..batch.num_rows() {
                    let mut props = HashMap::new();
                    let mut src_vid = None;
                    let mut dst_vid = None;

                    for field in batch.schema().fields() {
                        let name = field.name();
                        let col = batch.column_by_name(name).unwrap();
                        if col.is_null(row) {
                            continue;
                        }

                        if name == src_col {
                            let val = Self::arrow_to_value(col.as_ref(), row);
                            src_vid = Some(Self::vid_from_value(&val)?);
                        } else if name == dst_col {
                            let val = Self::arrow_to_value(col.as_ref(), row);
                            dst_vid = Some(Self::vid_from_value(&val)?);
                        } else if let Some(pm) = target_props.get(name) {
                            let val =
                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
                            props.insert(name.clone(), val);
                        }
                    }

                    let src = src_vid
                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
                    let dst = dst_vid.ok_or_else(|| {
                        anyhow!("Missing destination VID in column '{}'", dst_col)
                    })?;

                    let eid = writer.next_eid(type_id).await?;
                    writer
                        .insert_edge(src, dst, type_id, eid, props, Some(target.to_string()))
                        .await?;
                    count += 1;
                }
            }
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4316
4317 async fn open_parquet_from_cloud(
4321 &self,
4322 source_url: &str,
4323 ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4324 use object_store::ObjectStore;
4325
4326 let (store, path) = build_store_from_url(source_url)?;
4327
4328 let bytes = store.get(&path).await?.bytes().await?;
4330
4331 let reader = bytes::Bytes::from(bytes.to_vec());
4333 let builder =
4334 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4335 Ok(builder.build()?)
4336 }
4337
4338 pub(crate) async fn scan_edge_type(
4339 &self,
4340 edge_type: &str,
4341 ctx: Option<&QueryContext>,
4342 ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4343 let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4344
4345 self.scan_edge_type_l2(edge_type, &mut edges).await?;
4347
4348 self.scan_edge_type_l1(edge_type, &mut edges).await?;
4350
4351 if let Some(ctx) = ctx {
4353 self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4354 self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4355 }
4356
4357 Ok(edges
4358 .into_iter()
4359 .map(|(eid, (src, dst))| (eid, src, dst))
4360 .collect())
4361 }
4362
    /// L2 contribution to an edge-type scan.
    ///
    /// Currently a no-op: nothing is read into `_edges` here. The method is
    /// kept so the layer merge in `scan_edge_type` stays uniform.
    /// NOTE(review): L2 edges appear to be surfaced through another path —
    /// confirm before relying on this scan for L2-resident data.
    pub(crate) async fn scan_edge_type_l2(
        &self,
        _edge_type: &str,
        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
    ) -> Result<()> {
        Ok(())
    }
4376
    /// Merges the L1 delta table for `edge_type` into `edges`.
    ///
    /// Reads `(eid, src_vid, dst_vid, op, _version)` from the "fwd" delta
    /// dataset, keeps only the highest-`_version` record per eid
    /// (last-write-wins, via `process_delta_batch`), then applies it:
    /// `op == 0` upserts the edge, `op == 1` removes it.
    ///
    /// Failures to open the dataset/table or to execute the query are
    /// deliberately swallowed (best-effort: a missing delta table simply
    /// contributes nothing); only batch-decoding errors propagate.
    pub(crate) async fn scan_edge_type_l1(
        &self,
        edge_type: &str,
        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
    ) -> Result<()> {
        use futures::TryStreamExt;
        use lancedb::query::{ExecutableQuery, QueryBase, Select};

        if let Ok(ds) = self.storage.delta_dataset(edge_type, "fwd") {
            let lancedb_store = self.storage.lancedb_store();
            if let Ok(table) = ds.open_lancedb(lancedb_store).await {
                let query = table.query().select(Select::Columns(vec![
                    "eid".into(),
                    "src_vid".into(),
                    "dst_vid".into(),
                    "op".into(),
                    "_version".into(),
                ]));

                if let Ok(stream) = query.execute().await {
                    // Best-effort collect: a failed stream yields no batches.
                    let batches: Vec<arrow_array::RecordBatch> =
                        stream.try_collect().await.unwrap_or_default();

                    // eid -> (version, op, src, dst); highest version wins.
                    let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
                        HashMap::new();

                    for batch in batches {
                        self.process_delta_batch(&batch, &mut versioned_ops)?;
                    }

                    for (eid, (_, op, src, dst)) in versioned_ops {
                        if op == 0 {
                            edges.insert(eid, (src, dst));
                        } else if op == 1 {
                            edges.remove(&eid);
                        }
                    }
                }
            }
        }
        Ok(())
    }
4422
4423 pub(crate) fn process_delta_batch(
4425 &self,
4426 batch: &arrow_array::RecordBatch,
4427 versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4428 ) -> Result<()> {
4429 use arrow_array::UInt64Array;
4430 let eid_col = batch
4431 .column_by_name("eid")
4432 .ok_or(anyhow!("Missing eid"))?
4433 .as_any()
4434 .downcast_ref::<UInt64Array>()
4435 .ok_or(anyhow!("Invalid eid"))?;
4436 let src_col = batch
4437 .column_by_name("src_vid")
4438 .ok_or(anyhow!("Missing src_vid"))?
4439 .as_any()
4440 .downcast_ref::<UInt64Array>()
4441 .ok_or(anyhow!("Invalid src_vid"))?;
4442 let dst_col = batch
4443 .column_by_name("dst_vid")
4444 .ok_or(anyhow!("Missing dst_vid"))?
4445 .as_any()
4446 .downcast_ref::<UInt64Array>()
4447 .ok_or(anyhow!("Invalid dst_vid"))?;
4448 let op_col = batch
4449 .column_by_name("op")
4450 .ok_or(anyhow!("Missing op"))?
4451 .as_any()
4452 .downcast_ref::<arrow_array::UInt8Array>()
4453 .ok_or(anyhow!("Invalid op"))?;
4454 let version_col = batch
4455 .column_by_name("_version")
4456 .ok_or(anyhow!("Missing _version"))?
4457 .as_any()
4458 .downcast_ref::<UInt64Array>()
4459 .ok_or(anyhow!("Invalid _version"))?;
4460
4461 for i in 0..batch.num_rows() {
4462 let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4463 let version = version_col.value(i);
4464 let op = op_col.value(i);
4465 let src = Vid::from(src_col.value(i));
4466 let dst = Vid::from(dst_col.value(i));
4467
4468 match versioned_ops.entry(eid) {
4469 std::collections::hash_map::Entry::Vacant(e) => {
4470 e.insert((version, op, src, dst));
4471 }
4472 std::collections::hash_map::Entry::Occupied(mut e) => {
4473 if version > e.get().0 {
4474 e.insert((version, op, src, dst));
4475 }
4476 }
4477 }
4478 }
4479 Ok(())
4480 }
4481
4482 pub(crate) fn scan_edge_type_l0(
4484 &self,
4485 edge_type: &str,
4486 ctx: &QueryContext,
4487 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4488 ) {
4489 let schema = self.storage.schema_manager().schema();
4490 let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4491
4492 if let Some(type_id) = type_id {
4493 self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4495
4496 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4498 self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4499 }
4500
4501 for pending_l0_arc in &ctx.pending_flush_l0s {
4503 self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4504 }
4505 }
4506 }
4507
4508 pub(crate) fn scan_single_l0(
4510 &self,
4511 l0: &uni_store::runtime::L0Buffer,
4512 type_id: u32,
4513 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4514 ) {
4515 for edge_entry in l0.graph.edges() {
4516 if edge_entry.edge_type == type_id {
4517 edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4518 }
4519 }
4520 let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4522 for eid in eids_to_check {
4523 if l0.is_tombstoned(eid) {
4524 edges.remove(&eid);
4525 }
4526 }
4527 }
4528
4529 pub(crate) fn filter_tombstoned_vertex_edges(
4531 &self,
4532 ctx: &QueryContext,
4533 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4534 ) {
4535 let l0 = ctx.l0.read();
4536 let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4537
4538 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4540 let tx_l0 = tx_l0_arc.read();
4541 all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4542 }
4543
4544 for pending_l0_arc in &ctx.pending_flush_l0s {
4546 let pending_l0 = pending_l0_arc.read();
4547 all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4548 }
4549
4550 edges.retain(|_, (src, dst)| {
4551 !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4552 });
4553 }
4554
4555 pub(crate) async fn execute_project(
4557 &self,
4558 input_rows: Vec<HashMap<String, Value>>,
4559 projections: &[(Expr, Option<String>)],
4560 prop_manager: &PropertyManager,
4561 params: &HashMap<String, Value>,
4562 ctx: Option<&QueryContext>,
4563 ) -> Result<Vec<HashMap<String, Value>>> {
4564 let mut results = Vec::new();
4565 for m in input_rows {
4566 let mut row = HashMap::new();
4567 for (expr, alias) in projections {
4568 let val = self
4569 .evaluate_expr(expr, &m, prop_manager, params, ctx)
4570 .await?;
4571 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4572 row.insert(name, val);
4573 }
4574 results.push(row);
4575 }
4576 Ok(results)
4577 }
4578
4579 pub(crate) async fn execute_unwind(
4581 &self,
4582 input_rows: Vec<HashMap<String, Value>>,
4583 expr: &Expr,
4584 variable: &str,
4585 prop_manager: &PropertyManager,
4586 params: &HashMap<String, Value>,
4587 ctx: Option<&QueryContext>,
4588 ) -> Result<Vec<HashMap<String, Value>>> {
4589 let mut results = Vec::new();
4590 for row in input_rows {
4591 let val = self
4592 .evaluate_expr(expr, &row, prop_manager, params, ctx)
4593 .await?;
4594 if let Value::List(items) = val {
4595 for item in items {
4596 let mut new_row = row.clone();
4597 new_row.insert(variable.to_string(), item);
4598 results.push(new_row);
4599 }
4600 }
4601 }
4602 Ok(results)
4603 }
4604
    /// Executes a correlated subquery (APPLY) once per input row.
    ///
    /// Input rows are first filtered by `input_filter` (non-true / non-bool
    /// results drop the row). Each surviving row's bindings are merged into
    /// the subquery parameters, and each subquery result row is merged back
    /// onto the input row.
    ///
    /// NOTE(review): when no input rows survive, the subquery still runs
    /// once with only the outer params and its rows are returned un-joined —
    /// confirm this matches the intended CALL-subquery semantics.
    pub(crate) async fn execute_apply(
        &self,
        input_rows: Vec<HashMap<String, Value>>,
        subquery: &LogicalPlan,
        input_filter: Option<&Expr>,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut filtered_rows = input_rows;

        if let Some(filter) = input_filter {
            let mut filtered = Vec::new();
            for row in filtered_rows {
                let res = self
                    .evaluate_expr(filter, &row, prop_manager, params, ctx)
                    .await?;
                if res.as_bool().unwrap_or(false) {
                    filtered.push(row);
                }
            }
            filtered_rows = filtered;
        }

        if filtered_rows.is_empty() {
            let sub_rows = self
                .execute_subplan(subquery.clone(), prop_manager, params, ctx)
                .await?;
            return Ok(sub_rows);
        }

        let mut results = Vec::new();
        for row in filtered_rows {
            // Expose the outer row's bindings to the subquery as params.
            let mut sub_params = params.clone();
            sub_params.extend(row.clone());

            let sub_rows = self
                .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
                .await?;

            // Subquery columns overwrite outer columns on name collision.
            for sub_row in sub_rows {
                let mut new_row = row.clone();
                new_row.extend(sub_row);
                results.push(new_row);
            }
        }
        Ok(results)
    }
4656
    /// `SHOW INDEXES`: one row per index in the schema with `name`, `type`
    /// (`VECTOR` / `FULLTEXT` / `SCALAR`) and a human-readable `details`
    /// string. When `filter` is given, only indexes whose type string
    /// matches it exactly are returned.
    pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
        let schema = self.storage.schema_manager().schema();
        let mut rows = Vec::new();
        for idx in &schema.indexes {
            let (name, type_str, details) = match idx {
                uni_common::core::schema::IndexDefinition::Vector(c) => (
                    c.name.clone(),
                    "VECTOR",
                    format!("{:?} on {}.{}", c.index_type, c.label, c.property),
                ),
                uni_common::core::schema::IndexDefinition::FullText(c) => (
                    c.name.clone(),
                    "FULLTEXT",
                    format!("on {}:{:?}", c.label, c.properties),
                ),
                uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
                    cfg.name.clone(),
                    "SCALAR",
                    format!(":{}({:?})", cfg.label, cfg.properties),
                ),
                // Future index kinds still produce a row rather than erroring.
                _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
            };

            // Exact (case-sensitive) match against the type string.
            if let Some(f) = filter
                && f != type_str
            {
                continue;
            }

            let mut row = HashMap::new();
            row.insert("name".to_string(), Value::String(name));
            row.insert("type".to_string(), Value::String(type_str.to_string()));
            row.insert("details".to_string(), Value::String(details));
            rows.push(row);
        }
        rows
    }
4695
4696 pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4697 let mut row = HashMap::new();
4698 row.insert("name".to_string(), Value::String("uni".to_string()));
4699 vec![row]
4701 }
4702
4703 pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4704 vec![]
4706 }
4707
4708 pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4709 let snapshot = self
4710 .storage
4711 .snapshot_manager()
4712 .load_latest_snapshot()
4713 .await?;
4714 let mut results = Vec::new();
4715
4716 if let Some(snap) = snapshot {
4717 for (label, s) in &snap.vertices {
4718 let mut row = HashMap::new();
4719 row.insert("type".to_string(), Value::String("Label".to_string()));
4720 row.insert("name".to_string(), Value::String(label.clone()));
4721 row.insert("count".to_string(), Value::Int(s.count as i64));
4722 results.push(row);
4723 }
4724 for (edge, s) in &snap.edges {
4725 let mut row = HashMap::new();
4726 row.insert("type".to_string(), Value::String("Edge".to_string()));
4727 row.insert("name".to_string(), Value::String(edge.clone()));
4728 row.insert("count".to_string(), Value::Int(s.count as i64));
4729 results.push(row);
4730 }
4731 }
4732
4733 Ok(results)
4734 }
4735
    /// `SHOW CONSTRAINTS`: one row per schema constraint with `name`,
    /// `type` (`UNIQUE` / `EXISTS` / `CHECK`) and `target` (`(:Label)` or
    /// `[:EdgeType]`). When the clause names a target, only constraints on
    /// that exact label/edge type are returned.
    pub(crate) fn execute_show_constraints(
        &self,
        clause: ShowConstraints,
    ) -> Vec<HashMap<String, Value>> {
        let schema = self.storage.schema_manager().schema();
        let mut rows = Vec::new();
        for c in &schema.constraints {
            // Filter by target when one was specified: the AST target kind
            // and name must both match the schema constraint's target.
            if let Some(target) = &clause.target {
                match (target, &c.target) {
                    (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
                    (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
                        if e1 == e2 => {}
                    _ => continue,
                }
            }

            let mut row = HashMap::new();
            row.insert("name".to_string(), Value::String(c.name.clone()));
            let type_str = match c.constraint_type {
                ConstraintType::Unique { .. } => "UNIQUE",
                ConstraintType::Exists { .. } => "EXISTS",
                ConstraintType::Check { .. } => "CHECK",
                _ => "UNKNOWN",
            };
            row.insert("type".to_string(), Value::String(type_str.to_string()));

            let target_str = match &c.target {
                ConstraintTarget::Label(l) => format!("(:{})", l),
                ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
                _ => "UNKNOWN".to_string(),
            };
            row.insert("target".to_string(), Value::String(target_str));

            rows.push(row);
        }
        rows
    }
4773
4774 pub(crate) async fn execute_cross_join(
4776 &self,
4777 left: Box<LogicalPlan>,
4778 right: Box<LogicalPlan>,
4779 prop_manager: &PropertyManager,
4780 params: &HashMap<String, Value>,
4781 ctx: Option<&QueryContext>,
4782 ) -> Result<Vec<HashMap<String, Value>>> {
4783 let left_rows = self
4784 .execute_subplan(*left, prop_manager, params, ctx)
4785 .await?;
4786 let right_rows = self
4787 .execute_subplan(*right, prop_manager, params, ctx)
4788 .await?;
4789
4790 let mut results = Vec::new();
4791 for l in &left_rows {
4792 for r in &right_rows {
4793 let mut combined = l.clone();
4794 combined.extend(r.clone());
4795 results.push(combined);
4796 }
4797 }
4798 Ok(results)
4799 }
4800
    /// `UNION` / `UNION ALL` of two sub-plans: left rows followed by right
    /// rows. Without `ALL`, duplicate rows are removed, keeping the first
    /// occurrence.
    pub(crate) async fn execute_union(
        &self,
        left: Box<LogicalPlan>,
        right: Box<LogicalPlan>,
        all: bool,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut left_rows = self
            .execute_subplan(*left, prop_manager, params, ctx)
            .await?;
        let mut right_rows = self
            .execute_subplan(*right, prop_manager, params, ctx)
            .await?;

        left_rows.append(&mut right_rows);

        if !all {
            // Dedup key: Debug-format of the row re-sorted into a BTreeMap,
            // so HashMap iteration order cannot affect equality. Relies on
            // Value's Debug output being stable for equal values.
            let mut seen = HashSet::new();
            left_rows.retain(|row| {
                let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
                let key = format!("{:?}", sorted_row);
                seen.insert(key)
            });
        }
        Ok(left_rows)
    }
4830
4831 pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
4833 let schema = self.storage.schema_manager().schema();
4834 schema.indexes.iter().any(|idx| match idx {
4835 uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
4836 uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
4837 uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
4838 _ => false,
4839 })
4840 }
4841
4842 pub(crate) async fn execute_export(
4843 &self,
4844 target: &str,
4845 source: &str,
4846 options: &HashMap<String, Value>,
4847 prop_manager: &PropertyManager,
4848 ctx: Option<&QueryContext>,
4849 ) -> Result<Vec<HashMap<String, Value>>> {
4850 let format = options
4851 .get("format")
4852 .and_then(|v| v.as_str())
4853 .unwrap_or("csv")
4854 .to_lowercase();
4855
4856 match format.as_str() {
4857 "csv" => {
4858 self.execute_csv_export(target, source, options, prop_manager, ctx)
4859 .await
4860 }
4861 "parquet" => {
4862 self.execute_parquet_export(target, source, options, prop_manager, ctx)
4863 .await
4864 }
4865 _ => Err(anyhow!("Unsupported export format: {}", format)),
4866 }
4867 }
4868
    /// Exports all entities of `target` (a label or edge type) to a CSV
    /// file at `source` (the destination path; validated first).
    ///
    /// Vertex rows carry `_vid` plus the label's properties in sorted name
    /// order; edge rows carry `_eid`, `_src`, `_dst`, `_type` plus the edge
    /// type's properties. Options: `delimiter` (first byte; default `,`)
    /// and `header` (default true). Missing property values are written as
    /// formatted `Null`. Returns one row with the exported `count`.
    pub(crate) async fn execute_csv_export(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let validated_dest = self.validate_path(source)?;

        let schema = self.storage.schema_manager().schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Only the first byte of a multi-character delimiter option is used.
        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut wtr = csv::WriterBuilder::new()
            .delimiter(delimiter)
            .from_path(&validated_dest)?;

        let mut count = 0;
        let empty_props = HashMap::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            // Sort property names so the column order is deterministic.
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            let mut headers = vec!["_vid".to_string()];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(vid.to_string());
                for p_name in &prop_names {
                    // Absent properties export as formatted Null.
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            let mut headers = vec![
                "_eid".to_string(),
                "_src".to_string(),
                "_dst".to_string(),
                "_type".to_string(),
            ];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            let edges = self.scan_edge_type(target, ctx).await?;

            for (eid, src, dst) in edges {
                let props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(eid.to_string());
                row.push(src.to_string());
                row.push(dst.to_string());
                // `_type` column carries the numeric edge-type id.
                row.push(meta.id.to_string());

                for p_name in &prop_names {
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        }

        wtr.flush()?;
        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4988
4989 pub(crate) async fn execute_parquet_export(
4993 &self,
4994 target: &str,
4995 destination: &str,
4996 _options: &HashMap<String, Value>,
4997 prop_manager: &PropertyManager,
4998 ctx: Option<&QueryContext>,
4999 ) -> Result<Vec<HashMap<String, Value>>> {
5000 let schema_manager = self.storage.schema_manager();
5001 let schema = schema_manager.schema();
5002 let label_meta = schema.labels.get(target);
5003 let edge_meta = schema.edge_types.get(target);
5004
5005 if label_meta.is_none() && edge_meta.is_none() {
5006 return Err(anyhow!("Target '{}' not found in schema", target));
5007 }
5008
5009 let arrow_schema = if label_meta.is_some() {
5010 let dataset = self.storage.vertex_dataset(target)?;
5011 dataset.get_arrow_schema(&schema)?
5012 } else {
5013 let dataset = self.storage.edge_dataset(target, "", "")?;
5015 dataset.get_arrow_schema(&schema)?
5016 };
5017
5018 let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();
5019
5020 if let Some(meta) = label_meta {
5021 let label_id = meta.id;
5022 let vids = self
5023 .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
5024 .await?;
5025
5026 for vid in vids {
5027 let mut props = prop_manager
5028 .get_all_vertex_props_with_ctx(vid, ctx)
5029 .await?
5030 .unwrap_or_default();
5031
5032 props.insert(
5033 "_vid".to_string(),
5034 uni_common::Value::Int(vid.as_u64() as i64),
5035 );
5036 if !props.contains_key("_uid") {
5037 props.insert(
5038 "_uid".to_string(),
5039 uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
5040 );
5041 }
5042 props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5043 props.insert("_version".to_string(), uni_common::Value::Int(1));
5044 rows.push(props);
5045 }
5046 } else if edge_meta.is_some() {
5047 let edges = self.scan_edge_type(target, ctx).await?;
5048 for (eid, src, dst) in edges {
5049 let mut props = prop_manager
5050 .get_all_edge_props_with_ctx(eid, ctx)
5051 .await?
5052 .unwrap_or_default();
5053
5054 props.insert(
5055 "eid".to_string(),
5056 uni_common::Value::Int(eid.as_u64() as i64),
5057 );
5058 props.insert(
5059 "src_vid".to_string(),
5060 uni_common::Value::Int(src.as_u64() as i64),
5061 );
5062 props.insert(
5063 "dst_vid".to_string(),
5064 uni_common::Value::Int(dst.as_u64() as i64),
5065 );
5066 props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
5067 props.insert("_version".to_string(), uni_common::Value::Int(1));
5068 rows.push(props);
5069 }
5070 }
5071
5072 if is_cloud_url(destination) {
5074 self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
5075 .await?;
5076 } else {
5077 let validated_dest = self.validate_path(destination)?;
5079 let file = std::fs::File::create(&validated_dest)?;
5080 let mut writer =
5081 parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;
5082
5083 if !rows.is_empty() {
5085 let batch = self.rows_to_batch(&rows, &arrow_schema)?;
5086 writer.write(&batch)?;
5087 }
5088
5089 writer.close()?;
5090 }
5091
5092 let mut res = HashMap::new();
5093 res.insert("count".to_string(), Value::Int(rows.len() as i64));
5094 Ok(vec![res])
5095 }
5096
5097 async fn write_parquet_to_cloud(
5099 &self,
5100 dest_url: &str,
5101 rows: &[HashMap<String, uni_common::Value>],
5102 arrow_schema: &arrow_schema::Schema,
5103 ) -> Result<()> {
5104 use object_store::ObjectStore;
5105
5106 let (store, path) = build_store_from_url(dest_url)?;
5107
5108 let mut buffer = Vec::new();
5110 {
5111 let mut writer = parquet::arrow::ArrowWriter::try_new(
5112 &mut buffer,
5113 Arc::new(arrow_schema.clone()),
5114 None,
5115 )?;
5116
5117 if !rows.is_empty() {
5118 let batch = self.rows_to_batch(rows, arrow_schema)?;
5119 writer.write(&batch)?;
5120 }
5121
5122 writer.close()?;
5123 }
5124
5125 store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5127
5128 Ok(())
5129 }
5130
5131 pub(crate) fn rows_to_batch(
5132 &self,
5133 rows: &[HashMap<String, uni_common::Value>],
5134 schema: &arrow_schema::Schema,
5135 ) -> Result<RecordBatch> {
5136 let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5137
5138 for field in schema.fields() {
5139 let name = field.name();
5140 let dt = field.data_type();
5141
5142 let values: Vec<uni_common::Value> = rows
5143 .iter()
5144 .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5145 .collect();
5146 let array = self.values_to_array(&values, dt)?;
5147 columns.push(array);
5148 }
5149
5150 Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5151 }
5152
    /// Convert a slice of property values into a single Arrow array of type
    /// `dt`. Thin delegation to the shared `arrow_convert` helper so callers
    /// inside the executor don't need to import it directly.
    pub(crate) fn values_to_array(
        &self,
        values: &[uni_common::Value],
        dt: &arrow_schema::DataType,
    ) -> Result<Arc<dyn Array>> {
        arrow_convert::values_to_array(values, dt)
    }
5162
5163 pub(crate) fn format_csv_value(&self, val: Value) -> String {
5164 match val {
5165 Value::Null => "".to_string(),
5166 Value::String(s) => s,
5167 Value::Int(i) => i.to_string(),
5168 Value::Float(f) => f.to_string(),
5169 Value::Bool(b) => b.to_string(),
5170 _ => format!("{}", val),
5171 }
5172 }
5173
5174 pub(crate) fn parse_csv_value(
5175 &self,
5176 s: &str,
5177 data_type: &uni_common::core::schema::DataType,
5178 prop_name: &str,
5179 ) -> Result<Value> {
5180 if s.is_empty() || s.to_lowercase() == "null" {
5181 return Ok(Value::Null);
5182 }
5183
5184 use uni_common::core::schema::DataType;
5185 match data_type {
5186 DataType::String => Ok(Value::String(s.to_string())),
5187 DataType::Int32 | DataType::Int64 => {
5188 let i = s.parse::<i64>().map_err(|_| {
5189 anyhow!(
5190 "Failed to parse integer for property '{}': {}",
5191 prop_name,
5192 s
5193 )
5194 })?;
5195 Ok(Value::Int(i))
5196 }
5197 DataType::Float32 | DataType::Float64 => {
5198 let f = s.parse::<f64>().map_err(|_| {
5199 anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5200 })?;
5201 Ok(Value::Float(f))
5202 }
5203 DataType::Bool => {
5204 let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5205 anyhow!(
5206 "Failed to parse boolean for property '{}': {}",
5207 prop_name,
5208 s
5209 )
5210 })?;
5211 Ok(Value::Bool(b))
5212 }
5213 DataType::CypherValue => {
5214 let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5215 anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5216 })?;
5217 Ok(Value::from(json_val))
5218 }
5219 DataType::Vector { .. } => {
5220 let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5221 anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5222 })?;
5223 Ok(Value::Vector(v))
5224 }
5225 _ => Ok(Value::String(s.to_string())),
5226 }
5227 }
5228
5229 pub(crate) async fn detach_delete_vertex(&self, vid: Vid, writer: &mut Writer) -> Result<()> {
5230 let schema = self.storage.schema_manager().schema();
5231 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5232
5233 let out_graph = self
5235 .storage
5236 .load_subgraph_cached(
5237 &[vid],
5238 &edge_type_ids,
5239 1,
5240 uni_store::runtime::Direction::Outgoing,
5241 Some(writer.l0_manager.get_current()),
5242 )
5243 .await?;
5244
5245 for edge in out_graph.edges() {
5246 writer
5247 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5248 .await?;
5249 }
5250
5251 let in_graph = self
5253 .storage
5254 .load_subgraph_cached(
5255 &[vid],
5256 &edge_type_ids,
5257 1,
5258 uni_store::runtime::Direction::Incoming,
5259 Some(writer.l0_manager.get_current()),
5260 )
5261 .await?;
5262
5263 for edge in in_graph.edges() {
5264 writer
5265 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5266 .await?;
5267 }
5268
5269 Ok(())
5270 }
5271
5272 pub(crate) async fn batch_detach_delete_vertices(
5274 &self,
5275 vids: &[Vid],
5276 labels_per_vid: Vec<Option<Vec<String>>>,
5277 writer: &mut Writer,
5278 ) -> Result<()> {
5279 let schema = self.storage.schema_manager().schema();
5280 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5281
5282 let out_graph = self
5284 .storage
5285 .load_subgraph_cached(
5286 vids,
5287 &edge_type_ids,
5288 1,
5289 uni_store::runtime::Direction::Outgoing,
5290 Some(writer.l0_manager.get_current()),
5291 )
5292 .await?;
5293
5294 for edge in out_graph.edges() {
5295 writer
5296 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5297 .await?;
5298 }
5299
5300 let in_graph = self
5302 .storage
5303 .load_subgraph_cached(
5304 vids,
5305 &edge_type_ids,
5306 1,
5307 uni_store::runtime::Direction::Incoming,
5308 Some(writer.l0_manager.get_current()),
5309 )
5310 .await?;
5311
5312 for edge in in_graph.edges() {
5313 writer
5314 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5315 .await?;
5316 }
5317
5318 for (vid, labels) in vids.iter().zip(labels_per_vid) {
5320 writer.delete_vertex(*vid, labels).await?;
5321 }
5322
5323 Ok(())
5324 }
5325}