1use crate::query::WINDOW_FUNCTIONS;
5use crate::query::datetime::{classify_temporal, eval_datetime_function, parse_datetime_utc};
6use crate::query::expr_eval::{
7 eval_binary_op, eval_in_op, eval_scalar_function, eval_vector_similarity,
8};
9use crate::query::planner::{LogicalPlan, QueryPlanner};
10use crate::query::pushdown::LanceFilterGenerator;
11use crate::types::Value;
12use anyhow::{Result, anyhow};
13
14fn value_to_datetime_utc(val: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
16 match val {
17 Value::Temporal(tv) => {
18 use uni_common::TemporalValue;
19 match tv {
20 TemporalValue::DateTime {
21 nanos_since_epoch, ..
22 }
23 | TemporalValue::LocalDateTime {
24 nanos_since_epoch, ..
25 } => Some(chrono::DateTime::from_timestamp_nanos(*nanos_since_epoch)),
26 TemporalValue::Date { days_since_epoch } => {
27 chrono::DateTime::from_timestamp(*days_since_epoch as i64 * 86400, 0)
28 }
29 _ => None,
30 }
31 }
32 Value::String(s) => parse_datetime_utc(s).ok(),
33 _ => None,
34 }
35}
36use futures::future::BoxFuture;
37use futures::stream::{self, BoxStream, StreamExt};
38use metrics;
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Instant;
42use tracing::instrument;
43use uni_common::core::id::{Eid, Vid};
44use uni_common::core::schema::{ConstraintTarget, ConstraintType, DataType, SchemaManager};
45use uni_cypher::ast::{
46 BinaryOp, ConstraintTarget as AstConstraintTarget, Expr, MapProjectionItem, Quantifier,
47 ShowConstraints, UnaryOp,
48};
49use uni_store::QueryContext;
50use uni_store::cloud::{build_store_from_url, copy_store_prefix, is_cloud_url};
51use uni_store::runtime::property_manager::PropertyManager;
52use uni_store::runtime::writer::Writer;
53use uni_store::storage::arrow_convert;
54use uni_store::storage::index_manager::IndexManager;
55
56use crate::query::df_graph::L0Context;
58use crate::query::df_planner::HybridPhysicalPlanner;
59use datafusion::physical_plan::ExecutionPlanProperties;
60use datafusion::prelude::SessionContext;
61use parking_lot::RwLock as SyncRwLock;
62
63use arrow_array::{Array, RecordBatch};
64use csv;
65use parquet;
66
67use super::core::*;
68
/// A pushdown edge row with at most this many columns is assumed to carry
/// only system metadata (no user properties) and must be hydrated.
const EDGE_SYSTEM_FIELD_COUNT: usize = 5;
/// Same threshold for vertex rows — see `hydrate_entity_if_needed`.
const VERTEX_SYSTEM_FIELD_COUNT: usize = 3;
73
74fn collect_l0_vids(
79 ctx: Option<&QueryContext>,
80 extractor: impl Fn(&uni_store::runtime::l0::L0Buffer) -> Vec<Vid>,
81) -> Vec<Vid> {
82 let mut vids = Vec::new();
83 if let Some(ctx) = ctx {
84 vids.extend(extractor(&ctx.l0.read()));
85 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
86 vids.extend(extractor(&tx_l0_arc.read()));
87 }
88 for pending_l0_arc in &ctx.pending_flush_l0s {
89 vids.extend(extractor(&pending_l0_arc.read()));
90 }
91 }
92 vids
93}
94
/// Ensure a row-map for a node or edge carries its full property set.
///
/// Rows produced via storage pushdown may contain only system columns
/// (`_eid`/`_vid` etc.). When the map has no more entries than the
/// system-field threshold, the entity's properties are fetched and merged
/// in; existing keys are never overwritten (`entry().or_insert`).
async fn hydrate_entity_if_needed(
    map: &mut HashMap<String, Value>,
    prop_manager: &PropertyManager,
    ctx: Option<&QueryContext>,
) {
    // Edge rows are identified by the presence of a numeric "_eid".
    if let Some(eid_u64) = map.get("_eid").and_then(|v| v.as_u64()) {
        if map.len() <= EDGE_SYSTEM_FIELD_COUNT {
            tracing::debug!(
                "Pushdown fallback: hydrating edge {} at execution time",
                eid_u64
            );
            if let Ok(Some(props)) = prop_manager
                .get_all_edge_props_with_ctx(Eid::from(eid_u64), ctx)
                .await
            {
                for (key, value) in props {
                    // Keep any value already present — pushdown wins.
                    map.entry(key).or_insert(value);
                }
            }
        } else {
            tracing::trace!(
                "Pushdown success: edge {} already has {} properties",
                eid_u64,
                map.len() - EDGE_SYSTEM_FIELD_COUNT
            );
        }
        return;
    }

    // Vertex rows are identified by "_vid"; same hydrate-or-skip logic.
    if let Some(vid_u64) = map.get("_vid").and_then(|v| v.as_u64()) {
        if map.len() <= VERTEX_SYSTEM_FIELD_COUNT {
            tracing::debug!(
                "Pushdown fallback: hydrating vertex {} at execution time",
                vid_u64
            );
            if let Ok(Some(props)) = prop_manager
                .get_all_vertex_props_with_ctx(Vid::from(vid_u64), ctx)
                .await
            {
                for (key, value) in props {
                    map.entry(key).or_insert(value);
                }
            }
        } else {
            tracing::trace!(
                "Pushdown success: vertex {} already has {} properties",
                vid_u64,
                map.len() - VERTEX_SYSTEM_FIELD_COUNT
            );
        }
    }
}
157
158impl Executor {
159 async fn verify_and_filter_candidates(
164 &self,
165 mut candidates: Vec<Vid>,
166 variable: &str,
167 filter: Option<&Expr>,
168 ctx: Option<&QueryContext>,
169 prop_manager: &PropertyManager,
170 params: &HashMap<String, Value>,
171 ) -> Result<Vec<Vid>> {
172 candidates.sort_unstable();
173 candidates.dedup();
174
175 let mut verified_vids = Vec::new();
176 for vid in candidates {
177 let Some(props) = prop_manager.get_all_vertex_props_with_ctx(vid, ctx).await? else {
178 continue; };
180
181 if let Some(expr) = filter {
182 let mut props_map: HashMap<String, Value> = props;
183 props_map.insert("_vid".to_string(), Value::Int(vid.as_u64() as i64));
184
185 let mut row = HashMap::new();
186 row.insert(variable.to_string(), Value::Map(props_map));
187
188 let res = self
189 .evaluate_expr(expr, &row, prop_manager, params, ctx)
190 .await?;
191 if res.as_bool().unwrap_or(false) {
192 verified_vids.push(vid);
193 }
194 } else {
195 verified_vids.push(vid);
196 }
197 }
198
199 Ok(verified_vids)
200 }
201
202 pub(crate) async fn scan_storage_candidates(
203 &self,
204 label_id: u16,
205 variable: &str,
206 filter: Option<&Expr>,
207 ) -> Result<Vec<Vid>> {
208 let schema = self.storage.schema_manager().schema();
209 let label_name = schema
210 .label_name_by_id(label_id)
211 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
212
213 let ds = self.storage.vertex_dataset(label_name)?;
214 let lancedb_store = self.storage.lancedb_store();
215
216 match ds.open_lancedb(lancedb_store).await {
218 Ok(table) => {
219 use arrow_array::UInt64Array;
220 use futures::TryStreamExt;
221 use lancedb::query::{ExecutableQuery, QueryBase, Select};
222
223 let mut query = table.query();
224
225 let empty_props = std::collections::HashMap::new();
230 let label_props = schema.properties.get(label_name).unwrap_or(&empty_props);
231 if let Some(expr) = filter
232 && let Some(sql) = LanceFilterGenerator::generate(
233 std::slice::from_ref(expr),
234 variable,
235 Some(label_props),
236 )
237 {
238 query = query.only_if(format!("_deleted = false AND ({})", sql));
239 } else {
240 query = query.only_if("_deleted = false");
241 }
242
243 let query = query.select(Select::columns(&["_vid"]));
245 let stream = query.execute().await?;
246 let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
247
248 let mut vids = Vec::new();
249 for batch in batches {
250 let vid_col = batch
251 .column_by_name("_vid")
252 .ok_or(anyhow!("Missing _vid"))?
253 .as_any()
254 .downcast_ref::<UInt64Array>()
255 .ok_or(anyhow!("Invalid _vid"))?;
256 for i in 0..batch.num_rows() {
257 vids.push(Vid::from(vid_col.value(i)));
258 }
259 }
260 Ok(vids)
261 }
262 Err(e) => {
263 let err_msg = e.to_string().to_lowercase();
266 if err_msg.contains("not found")
267 || err_msg.contains("does not exist")
268 || err_msg.contains("no such file")
269 || err_msg.contains("object not found")
270 {
271 Ok(Vec::new())
272 } else {
273 Err(e)
274 }
275 }
276 }
277 }
278
279 pub(crate) async fn scan_label_with_filter(
280 &self,
281 label_id: u16,
282 variable: &str,
283 filter: Option<&Expr>,
284 ctx: Option<&QueryContext>,
285 prop_manager: &PropertyManager,
286 params: &HashMap<String, Value>,
287 ) -> Result<Vec<Vid>> {
288 let mut candidates = self
289 .scan_storage_candidates(label_id, variable, filter)
290 .await?;
291
292 let schema = self.storage.schema_manager().schema();
294 if let Some(label_name) = schema.label_name_by_id(label_id) {
295 candidates.extend(collect_l0_vids(ctx, |l0| l0.vids_for_label(label_name)));
296 }
297
298 self.verify_and_filter_candidates(candidates, variable, filter, ctx, prop_manager, params)
299 .await
300 }
301
302 pub(crate) fn vid_from_value(val: &Value) -> Result<Vid> {
303 if let Value::Node(node) = val {
305 return Ok(node.vid);
306 }
307 if let Value::Map(map) = val
309 && let Some(vid_val) = map.get("_vid")
310 && let Some(v) = vid_val.as_u64()
311 {
312 return Ok(Vid::from(v));
313 }
314 if let Some(s) = val.as_str()
316 && let Ok(id) = s.parse::<u64>()
317 {
318 return Ok(Vid::new(id));
319 }
320 if let Some(v) = val.as_u64() {
322 return Ok(Vid::from(v));
323 }
324 Err(anyhow!("Invalid Vid format: {:?}", val))
325 }
326
327 fn find_node_by_vid(row: &HashMap<String, Value>, target_vid: Vid) -> Value {
333 for val in row.values() {
334 if let Ok(vid) = Self::vid_from_value(val)
335 && vid == target_vid
336 {
337 return val.clone();
338 }
339 }
340 Value::Map(HashMap::from([(
342 "_vid".to_string(),
343 Value::Int(target_vid.as_u64() as i64),
344 )]))
345 }
346
347 pub async fn create_datafusion_planner(
352 &self,
353 prop_manager: &PropertyManager,
354 params: &HashMap<String, Value>,
355 ) -> Result<(
356 Arc<SyncRwLock<SessionContext>>,
357 HybridPhysicalPlanner,
358 Arc<PropertyManager>,
359 )> {
360 let query_ctx = self.get_context().await;
361 let l0_context = match query_ctx {
362 Some(ref ctx) => L0Context::from_query_context(ctx),
363 None => L0Context::empty(),
364 };
365
366 let prop_manager_arc = Arc::new(PropertyManager::new(
367 self.storage.clone(),
368 self.storage.schema_manager_arc(),
369 prop_manager.cache_size(),
370 ));
371
372 let session = SessionContext::new();
373 crate::query::df_udfs::register_cypher_udfs(&session)?;
374 let session_ctx = Arc::new(SyncRwLock::new(session));
375
376 let mut planner = HybridPhysicalPlanner::with_l0_context(
377 session_ctx.clone(),
378 self.storage.clone(),
379 l0_context,
380 prop_manager_arc.clone(),
381 self.storage.schema_manager().schema(),
382 params.clone(),
383 HashMap::new(),
384 );
385
386 planner = planner.with_algo_registry(self.algo_registry.clone());
387 if let Some(ref registry) = self.procedure_registry {
388 planner = planner.with_procedure_registry(registry.clone());
389 }
390 if let Some(ref xervo_runtime) = self.xervo_runtime {
391 planner = planner.with_xervo_runtime(xervo_runtime.clone());
392 }
393
394 Ok((session_ctx, planner, prop_manager_arc))
395 }
396
397 pub fn collect_batches(
399 session_ctx: &Arc<SyncRwLock<SessionContext>>,
400 execution_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
401 ) -> BoxFuture<'_, Result<Vec<RecordBatch>>> {
402 Box::pin(async move {
403 use futures::TryStreamExt;
404
405 let task_ctx = session_ctx.read().task_ctx();
406 let partition_count = execution_plan.output_partitioning().partition_count();
407 let mut all_batches = Vec::new();
408 for partition in 0..partition_count {
409 let stream = execution_plan.execute(partition, task_ctx.clone())?;
410 let batches: Vec<RecordBatch> = stream.try_collect().await?;
411 all_batches.extend(batches);
412 }
413 Ok(all_batches)
414 })
415 }
416
    /// Execute `plan` through the DataFusion engine and return raw batches.
    ///
    /// For plans containing mutations, a `MutationContext` (writer +
    /// executor clone + transaction context) is attached to the planner so
    /// write operators can reach storage.
    pub async fn execute_datafusion(
        &self,
        plan: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
    ) -> Result<Vec<RecordBatch>> {
        let (session_ctx, mut planner, prop_manager_arc) =
            self.create_datafusion_planner(prop_manager, params).await?;

        if Self::contains_write_operations(&plan) {
            // Writes are impossible without a configured Writer.
            let writer = self
                .writer
                .as_ref()
                .ok_or_else(|| anyhow!("Write operations require a Writer"))?
                .clone();
            let query_ctx = self.get_context().await;

            // Writes should always run inside a query context; absence is a
            // logic error upstream, so only assert in debug builds.
            debug_assert!(
                query_ctx.is_some(),
                "BUG: query_ctx is None for write operation"
            );

            let mutation_ctx = Arc::new(crate::query::df_graph::MutationContext {
                executor: self.clone(),
                writer,
                prop_manager: prop_manager_arc,
                params: params.clone(),
                query_ctx,
            });
            planner = planner.with_mutation_context(mutation_ctx);
            tracing::debug!(
                plan_type = Self::get_plan_type(&plan),
                "Mutation routed to DataFusion engine"
            );
        }

        let execution_plan = planner.plan(&plan)?;
        Self::collect_batches(&session_ctx, execution_plan).await
    }
461
    /// Execute the read half of a MERGE through DataFusion, requesting all
    /// properties (`"*"`) for each merge variable so match semantics can be
    /// evaluated, then re-nest flattened `"var.prop"` columns into maps.
    pub(crate) async fn execute_merge_read_plan(
        &self,
        plan: LogicalPlan,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        merge_variables: Vec<String>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let (session_ctx, planner, _prop_manager_arc) =
            self.create_datafusion_planner(prop_manager, params).await?;

        // Force full-property projection for every merge variable.
        let extra: HashMap<String, HashSet<String>> = merge_variables
            .iter()
            .map(|v| (v.clone(), ["*".to_string()].into_iter().collect()))
            .collect();
        let execution_plan = planner.plan_with_properties(&plan, extra)?;
        let all_batches = Self::collect_batches(&session_ctx, execution_plan).await?;

        let flat_rows = self.record_batches_to_rows(all_batches)?;

        // Group leftover "var.prop" columns under a single map entry per
        // variable — unless the variable already came back materialized.
        let rows = flat_rows
            .into_iter()
            .map(|mut row| {
                for var in &merge_variables {
                    if row.contains_key(var) {
                        continue;
                    }
                    let prefix = format!("{}.", var);
                    // Collect keys first: we mutate `row` while draining them.
                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    if !dotted_keys.is_empty() {
                        let mut map = HashMap::new();
                        for key in dotted_keys {
                            let prop_name = key[prefix.len()..].to_string();
                            if let Some(val) = row.remove(&key) {
                                map.insert(prop_name, val);
                            }
                        }
                        row.insert(var.clone(), Value::Map(map));
                    }
                }
                row
            })
            .collect();

        Ok(rows)
    }
522
    /// Flatten Arrow record batches into row maps of [`Value`]s,
    /// re-assembling graph entities that the plan emitted as flattened
    /// `"var.prop"` columns and folding system helper columns back into
    /// their entity maps.
    fn record_batches_to_rows(
        &self,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let mut rows = Vec::new();

        for batch in batches {
            let num_rows = batch.num_rows();
            let schema = batch.schema();

            for row_idx in 0..num_rows {
                let mut row = HashMap::new();

                for (col_idx, field) in schema.fields().iter().enumerate() {
                    let column = batch.column(col_idx);
                    // Datetime/time struct columns need a type hint so the
                    // converter produces temporal values, not raw maps.
                    let data_type =
                        if uni_common::core::schema::is_datetime_struct(field.data_type()) {
                            Some(&uni_common::DataType::DateTime)
                        } else if uni_common::core::schema::is_time_struct(field.data_type()) {
                            Some(&uni_common::DataType::Time)
                        } else {
                            None
                        };
                    let mut value =
                        arrow_convert::arrow_to_value(column.as_ref(), row_idx, data_type);

                    // Columns flagged "cv_encoded" carry JSON-serialized
                    // complex values; decode them back into structured form.
                    if field
                        .metadata()
                        .get("cv_encoded")
                        .is_some_and(|v| v == "true")
                        && let Value::String(s) = &value
                        && let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
                    {
                        value = Value::from(parsed);
                    }

                    value = Self::normalize_path_if_needed(value);

                    row.insert(field.name().clone(), value);
                }

                // Variables already materialized as maps: they only need
                // helper "_vid"/"_labels"/"_eid"/"_type" columns folded in.
                let bare_vars: Vec<String> = row
                    .keys()
                    .filter(|k| !k.contains('.') && matches!(row.get(*k), Some(Value::Map(_))))
                    .cloned()
                    .collect();

                // Variables emitted as a string placeholder plus "var._vid":
                // rebuild the entity map from their dotted columns.
                let vid_placeholder_vars: Vec<String> = row
                    .keys()
                    .filter(|k| {
                        !k.contains('.')
                            && matches!(row.get(*k), Some(Value::String(_)))
                            && row.contains_key(&format!("{}._vid", k))
                    })
                    .cloned()
                    .collect();

                for var in &vid_placeholder_vars {
                    let prefix = format!("{}.", var);
                    let mut map = HashMap::new();

                    // Snapshot matching keys first; we mutate `row` below.
                    let dotted_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();

                    for key in &dotted_keys {
                        let prop_name = &key[prefix.len()..];
                        if let Some(val) = row.remove(key) {
                            map.insert(prop_name.to_string(), val);
                        }
                    }

                    // Replace the string placeholder with the rebuilt map.
                    row.insert(var.clone(), Value::Map(map));
                }

                for var in &bare_vars {
                    let vid_key = format!("{}._vid", var);
                    let labels_key = format!("{}._labels", var);

                    let vid_val = row.remove(&vid_key);
                    let labels_val = row.remove(&labels_key);

                    if let Some(Value::Map(map)) = row.get_mut(var) {
                        if let Some(v) = vid_val {
                            map.insert("_vid".to_string(), v);
                        }
                        if let Some(v) = labels_val {
                            map.insert("_labels".to_string(), v);
                        }
                    }

                    // Edge helper columns are merged without overwriting
                    // values the map may already carry.
                    let eid_key = format!("{}._eid", var);
                    let type_key = format!("{}._type", var);

                    let eid_val = row.remove(&eid_key);
                    let type_val = row.remove(&type_key);

                    if (eid_val.is_some() || type_val.is_some())
                        && let Some(Value::Map(map)) = row.get_mut(var)
                    {
                        if let Some(v) = eid_val {
                            map.entry("_eid".to_string()).or_insert(v);
                        }
                        if let Some(v) = type_val {
                            map.entry("_type".to_string()).or_insert(v);
                        }
                    }

                    // Drop any remaining "var.*" helper columns.
                    let prefix = format!("{}.", var);
                    let helper_keys: Vec<String> = row
                        .keys()
                        .filter(|k| k.starts_with(&prefix))
                        .cloned()
                        .collect();
                    for key in helper_keys {
                        row.remove(&key);
                    }
                }

                rows.push(row);
            }
        }

        Ok(rows)
    }
679
680 fn normalize_path_if_needed(value: Value) -> Value {
685 match value {
686 Value::Map(map)
687 if map.contains_key("nodes")
688 && (map.contains_key("relationships") || map.contains_key("edges")) =>
689 {
690 Self::normalize_path_map(map)
691 }
692 other => other,
693 }
694 }
695
696 fn normalize_path_map(mut map: HashMap<String, Value>) -> Value {
698 if let Some(Value::List(nodes)) = map.remove("nodes") {
700 let normalized_nodes: Vec<Value> = nodes
701 .into_iter()
702 .map(|n| {
703 if let Value::Map(node_map) = n {
704 Self::normalize_path_node_map(node_map)
705 } else {
706 n
707 }
708 })
709 .collect();
710 map.insert("nodes".to_string(), Value::List(normalized_nodes));
711 }
712
713 let rels_key = if map.contains_key("relationships") {
715 "relationships"
716 } else {
717 "edges"
718 };
719 if let Some(Value::List(rels)) = map.remove(rels_key) {
720 let normalized_rels: Vec<Value> = rels
721 .into_iter()
722 .map(|r| {
723 if let Value::Map(rel_map) = r {
724 Self::normalize_path_edge_map(rel_map)
725 } else {
726 r
727 }
728 })
729 .collect();
730 map.insert("relationships".to_string(), Value::List(normalized_rels));
731 }
732
733 Value::Map(map)
734 }
735
736 fn value_to_id_string(val: Value) -> String {
738 match val {
739 Value::Int(n) => n.to_string(),
740 Value::Float(n) => n.to_string(),
741 Value::String(s) => s,
742 other => other.to_string(),
743 }
744 }
745
746 fn stringify_map_field(map: &mut HashMap<String, Value>, src_key: &str, dst_key: &str) {
749 if let Some(val) = map.remove(src_key) {
750 map.insert(
751 dst_key.to_string(),
752 Value::String(Self::value_to_id_string(val)),
753 );
754 }
755 }
756
757 fn ensure_properties_map(map: &mut HashMap<String, Value>) {
759 match map.get("properties") {
760 Some(props) if !props.is_null() => {}
761 _ => {
762 map.insert("properties".to_string(), Value::Map(HashMap::new()));
763 }
764 }
765 }
766
    /// Normalize a path node: expose the vid under the public "_id" key as
    /// a string and guarantee a (possibly empty) properties map.
    fn normalize_path_node_map(mut map: HashMap<String, Value>) -> Value {
        Self::stringify_map_field(&mut map, "_vid", "_id");
        Self::ensure_properties_map(&mut map);
        Value::Map(map)
    }
773
    /// Normalize a path edge: stringify its id/endpoint fields, rename
    /// "_type_name" to "_type", and guarantee a properties map.
    fn normalize_path_edge_map(mut map: HashMap<String, Value>) -> Value {
        Self::stringify_map_field(&mut map, "_eid", "_id");
        // src/dst keep their key but are converted to strings in place.
        Self::stringify_map_field(&mut map, "_src", "_src");
        Self::stringify_map_field(&mut map, "_dst", "_dst");

        if let Some(type_name) = map.remove("_type_name") {
            map.insert("_type".to_string(), type_name);
        }

        Self::ensure_properties_map(&mut map);
        Value::Map(map)
    }
787
788 #[instrument(
789 skip(self, prop_manager, params),
790 fields(rows_returned, duration_ms),
791 level = "info"
792 )]
793 pub fn execute<'a>(
794 &'a self,
795 plan: LogicalPlan,
796 prop_manager: &'a PropertyManager,
797 params: &'a HashMap<String, Value>,
798 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
799 Box::pin(async move {
800 let query_type = Self::get_plan_type(&plan);
801 let ctx = self.get_context().await;
802 let start = Instant::now();
803
804 let res = if Self::is_ddl_or_admin(&plan) {
807 self.execute_subplan(plan, prop_manager, params, ctx.as_ref())
808 .await
809 } else {
810 let batches = self
811 .execute_datafusion(plan.clone(), prop_manager, params)
812 .await?;
813 self.record_batches_to_rows(batches)
814 };
815
816 let duration = start.elapsed();
817 metrics::histogram!("uni_query_duration_seconds", "query_type" => query_type)
818 .record(duration.as_secs_f64());
819
820 tracing::Span::current().record("duration_ms", duration.as_millis());
821 match &res {
822 Ok(rows) => {
823 tracing::Span::current().record("rows_returned", rows.len());
824 metrics::counter!("uni_query_rows_returned_total", "query_type" => query_type)
825 .increment(rows.len() as u64);
826 }
827 Err(e) => {
828 let error_type = if e.to_string().contains("timed out") {
829 "timeout"
830 } else if e.to_string().contains("syntax") {
831 "syntax"
832 } else {
833 "execution"
834 };
835 metrics::counter!("uni_query_errors_total", "query_type" => query_type, "error_type" => error_type).increment(1);
836 }
837 }
838
839 res
840 })
841 }
842
    /// Coarse classification of a plan, used as the `query_type` label on
    /// metrics and tracing spans.
    fn get_plan_type(plan: &LogicalPlan) -> &'static str {
        match plan {
            LogicalPlan::Scan { .. } => "read_scan",
            LogicalPlan::ExtIdLookup { .. } => "read_extid_lookup",
            LogicalPlan::Traverse { .. } => "read_traverse",
            LogicalPlan::TraverseMainByType { .. } => "read_traverse_main",
            LogicalPlan::ScanAll { .. } => "read_scan_all",
            LogicalPlan::ScanMainByLabels { .. } => "read_scan_main",
            LogicalPlan::VectorKnn { .. } => "read_vector",
            LogicalPlan::Create { .. } | LogicalPlan::CreateBatch { .. } => "write_create",
            LogicalPlan::Merge { .. } => "write_merge",
            LogicalPlan::Delete { .. } => "write_delete",
            LogicalPlan::Set { .. } => "write_set",
            LogicalPlan::Remove { .. } => "write_remove",
            LogicalPlan::ProcedureCall { .. } => "call",
            LogicalPlan::Copy { .. } => "copy",
            LogicalPlan::Backup { .. } => "backup",
            // Wrapper/DDL/admin plans all share a catch-all bucket.
            _ => "other",
        }
    }
863
    /// Direct child sub-plans of `plan` (empty for leaves). Used for generic
    /// plan-tree traversal, e.g. by `contains_write_operations`.
    fn plan_children(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
        match plan {
            // Single-input operators.
            LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Distinct { input }
            | LogicalPlan::Aggregate { input, .. }
            | LogicalPlan::Window { input, .. }
            | LogicalPlan::Unwind { input, .. }
            | LogicalPlan::Filter { input, .. }
            | LogicalPlan::Create { input, .. }
            | LogicalPlan::CreateBatch { input, .. }
            | LogicalPlan::Set { input, .. }
            | LogicalPlan::Remove { input, .. }
            | LogicalPlan::Delete { input, .. }
            | LogicalPlan::Merge { input, .. }
            | LogicalPlan::Foreach { input, .. }
            | LogicalPlan::Traverse { input, .. }
            | LogicalPlan::TraverseMainByType { input, .. }
            | LogicalPlan::BindZeroLengthPath { input, .. }
            | LogicalPlan::BindPath { input, .. }
            | LogicalPlan::ShortestPath { input, .. }
            | LogicalPlan::AllShortestPaths { input, .. }
            | LogicalPlan::Explain { plan: input, .. } => vec![input.as_ref()],

            // Two-input operators.
            LogicalPlan::Apply {
                input, subquery, ..
            }
            | LogicalPlan::SubqueryCall { input, subquery } => {
                vec![input.as_ref(), subquery.as_ref()]
            }
            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
                vec![left.as_ref(), right.as_ref()]
            }
            LogicalPlan::RecursiveCTE {
                initial, recursive, ..
            } => vec![initial.as_ref(), recursive.as_ref()],
            LogicalPlan::QuantifiedPattern {
                input,
                pattern_plan,
                ..
            } => vec![input.as_ref(), pattern_plan.as_ref()],

            // Leaves: scans, lookups, DDL, admin.
            _ => vec![],
        }
    }
922
    /// True when `plan` is (or wraps) a DDL / admin / transaction-control
    /// operation that must bypass DataFusion and run on the legacy sub-plan
    /// executor.
    fn is_ddl_or_admin(plan: &LogicalPlan) -> bool {
        match plan {
            // Schema DDL.
            LogicalPlan::CreateLabel(_)
            | LogicalPlan::CreateEdgeType(_)
            | LogicalPlan::AlterLabel(_)
            | LogicalPlan::AlterEdgeType(_)
            | LogicalPlan::DropLabel(_)
            | LogicalPlan::DropEdgeType(_)
            | LogicalPlan::CreateConstraint(_)
            | LogicalPlan::DropConstraint(_)
            | LogicalPlan::ShowConstraints(_) => true,

            // Index DDL.
            LogicalPlan::CreateVectorIndex { .. }
            | LogicalPlan::CreateFullTextIndex { .. }
            | LogicalPlan::CreateScalarIndex { .. }
            | LogicalPlan::CreateJsonFtsIndex { .. }
            | LogicalPlan::DropIndex { .. }
            | LogicalPlan::ShowIndexes { .. } => true,

            // Admin, transaction control, import/export.
            LogicalPlan::ShowDatabase
            | LogicalPlan::ShowConfig
            | LogicalPlan::ShowStatistics
            | LogicalPlan::Vacuum
            | LogicalPlan::Checkpoint
            | LogicalPlan::Begin
            | LogicalPlan::Commit
            | LogicalPlan::Rollback
            | LogicalPlan::Copy { .. }
            | LogicalPlan::CopyTo { .. }
            | LogicalPlan::CopyFrom { .. }
            | LogicalPlan::Backup { .. }
            | LogicalPlan::Explain { .. } => true,

            // Procedures run via the legacy path unless explicitly
            // DataFusion-eligible.
            LogicalPlan::ProcedureCall { procedure_name, .. } => {
                !Self::is_df_eligible_procedure(procedure_name)
            }

            // Wrapper operators: recurse into the input.
            LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Distinct { input }
            | LogicalPlan::Aggregate { input, .. }
            | LogicalPlan::Window { input, .. }
            | LogicalPlan::Unwind { input, .. }
            | LogicalPlan::Filter { input, .. }
            | LogicalPlan::BindZeroLengthPath { input, .. }
            | LogicalPlan::BindPath { input, .. }
            | LogicalPlan::ShortestPath { input, .. }
            | LogicalPlan::AllShortestPaths { input, .. } => Self::is_ddl_or_admin(input),

            // Two-input operators: DDL/admin anywhere taints the whole plan.
            LogicalPlan::Apply {
                input, subquery, ..
            }
            | LogicalPlan::SubqueryCall { input, subquery } => {
                Self::is_ddl_or_admin(input) || Self::is_ddl_or_admin(subquery)
            }
            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
                Self::is_ddl_or_admin(left) || Self::is_ddl_or_admin(right)
            }
            LogicalPlan::RecursiveCTE {
                initial, recursive, ..
            } => Self::is_ddl_or_admin(initial) || Self::is_ddl_or_admin(recursive),
            LogicalPlan::QuantifiedPattern {
                input,
                pattern_plan,
                ..
            } => Self::is_ddl_or_admin(input) || Self::is_ddl_or_admin(pattern_plan),

            // Everything else (reads, writes) runs through DataFusion.
            _ => false,
        }
    }
1007
1008 fn is_df_eligible_procedure(name: &str) -> bool {
1014 matches!(
1015 name,
1016 "uni.schema.labels"
1017 | "uni.schema.edgeTypes"
1018 | "uni.schema.relationshipTypes"
1019 | "uni.schema.indexes"
1020 | "uni.schema.constraints"
1021 | "uni.schema.labelInfo"
1022 | "uni.vector.query"
1023 | "uni.fts.query"
1024 | "uni.search"
1025 ) || name.starts_with("uni.algo.")
1026 }
1027
1028 fn contains_write_operations(plan: &LogicalPlan) -> bool {
1035 match plan {
1036 LogicalPlan::Create { .. }
1037 | LogicalPlan::CreateBatch { .. }
1038 | LogicalPlan::Merge { .. }
1039 | LogicalPlan::Delete { .. }
1040 | LogicalPlan::Set { .. }
1041 | LogicalPlan::Remove { .. }
1042 | LogicalPlan::Foreach { .. } => true,
1043 _ => Self::plan_children(plan)
1044 .iter()
1045 .any(|child| Self::contains_write_operations(child)),
1046 }
1047 }
1048
    /// Streaming variant of `execute`: resolves the query context first,
    /// then yields the full result set as a single stream item.
    pub fn execute_stream(
        self,
        plan: LogicalPlan,
        prop_manager: Arc<PropertyManager>,
        params: HashMap<String, Value>,
    ) -> BoxStream<'static, Result<Vec<HashMap<String, Value>>>> {
        let this = self;
        // Separate clone: the context future owns one executor while the
        // flat_map closure below owns another.
        let this_for_ctx = this.clone();

        let ctx_stream = stream::once(async move { this_for_ctx.get_context().await });

        ctx_stream
            .flat_map(move |ctx| {
                let plan = plan.clone();
                let this = this.clone();
                let prop_manager = prop_manager.clone();
                let params = params.clone();

                let fut = async move {
                    // Same routing as `execute`: DDL/admin via the legacy
                    // sub-plan executor, everything else via DataFusion.
                    if Self::is_ddl_or_admin(&plan) {
                        this.execute_subplan(plan, &prop_manager, &params, ctx.as_ref())
                            .await
                    } else {
                        let batches = this
                            .execute_datafusion(plan, &prop_manager, &params)
                            .await?;
                        this.record_batches_to_rows(batches)
                    }
                };
                stream::once(fut).boxed()
            })
            .boxed()
    }
1086
    /// Convert a single Arrow cell to a [`Value`] with no datetime/time
    /// struct type hint (hint-aware callers use
    /// `arrow_convert::arrow_to_value` directly).
    pub(crate) fn arrow_to_value(col: &dyn Array, row: usize) -> Value {
        arrow_convert::arrow_to_value(col, row, None)
    }
1092
1093 pub(crate) fn evaluate_expr<'a>(
1094 &'a self,
1095 expr: &'a Expr,
1096 row: &'a HashMap<String, Value>,
1097 prop_manager: &'a PropertyManager,
1098 params: &'a HashMap<String, Value>,
1099 ctx: Option<&'a QueryContext>,
1100 ) -> BoxFuture<'a, Result<Value>> {
1101 let this = self;
1102 Box::pin(async move {
1103 let repr = expr.to_string_repr();
1105 if let Some(val) = row.get(&repr) {
1106 return Ok(val.clone());
1107 }
1108
1109 match expr {
1110 Expr::PatternComprehension { .. } => {
1111 Err(anyhow::anyhow!(
1113 "Pattern comprehensions are handled by DataFusion executor"
1114 ))
1115 }
1116 Expr::CollectSubquery(_) => Err(anyhow::anyhow!(
1117 "COLLECT subqueries not yet supported in executor"
1118 )),
1119 Expr::Variable(name) => {
1120 if let Some(val) = row.get(name) {
1121 Ok(val.clone())
1122 } else {
1123 Ok(params.get(name).cloned().unwrap_or(Value::Null))
1124 }
1125 }
1126 Expr::Parameter(name) => Ok(params.get(name).cloned().unwrap_or(Value::Null)),
1127 Expr::Property(var_expr, prop_name) => {
1128 let base_val = this
1129 .evaluate_expr(var_expr, row, prop_manager, params, ctx)
1130 .await?;
1131
1132 if (prop_name == "_vid" || prop_name == "_id")
1134 && let Ok(vid) = Self::vid_from_value(&base_val)
1135 {
1136 return Ok(Value::Int(vid.as_u64() as i64));
1137 }
1138
1139 if let Value::Node(node) = &base_val {
1141 if prop_name == "_vid" || prop_name == "_id" {
1143 return Ok(Value::Int(node.vid.as_u64() as i64));
1144 }
1145 if prop_name == "_labels" {
1146 return Ok(Value::List(
1147 node.labels
1148 .iter()
1149 .map(|l| Value::String(l.clone()))
1150 .collect(),
1151 ));
1152 }
1153 if let Some(val) = node.properties.get(prop_name.as_str()) {
1155 return Ok(val.clone());
1156 }
1157 if let Ok(val) = prop_manager
1159 .get_vertex_prop_with_ctx(node.vid, prop_name, ctx)
1160 .await
1161 {
1162 return Ok(val);
1163 }
1164 return Ok(Value::Null);
1165 }
1166
1167 if let Value::Edge(edge) = &base_val {
1169 if prop_name == "_eid" || prop_name == "_id" {
1171 return Ok(Value::Int(edge.eid.as_u64() as i64));
1172 }
1173 if prop_name == "_type" {
1174 return Ok(Value::String(edge.edge_type.clone()));
1175 }
1176 if prop_name == "_src" {
1177 return Ok(Value::Int(edge.src.as_u64() as i64));
1178 }
1179 if prop_name == "_dst" {
1180 return Ok(Value::Int(edge.dst.as_u64() as i64));
1181 }
1182 if let Some(val) = edge.properties.get(prop_name.as_str()) {
1184 return Ok(val.clone());
1185 }
1186 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, prop_name, ctx).await
1188 {
1189 return Ok(val);
1190 }
1191 return Ok(Value::Null);
1192 }
1193
1194 if let Value::Map(map) = &base_val {
1197 if let Some(val) = map.get(prop_name.as_str()) {
1199 return Ok(val.clone());
1200 }
1201 if let Some(Value::Map(props)) = map.get("properties")
1203 && let Some(val) = props.get(prop_name.as_str())
1204 {
1205 return Ok(val.clone());
1206 }
1207 let vid_opt = map.get("_vid").and_then(|v| v.as_u64()).or_else(|| {
1209 map.get("_id")
1210 .and_then(|v| v.as_str())
1211 .and_then(|s| s.parse::<u64>().ok())
1212 });
1213 if let Some(id) = vid_opt {
1214 let vid = Vid::from(id);
1215 if let Ok(val) = prop_manager
1216 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1217 .await
1218 {
1219 return Ok(val);
1220 }
1221 } else if let Some(id) = map.get("_eid").and_then(|v| v.as_u64()) {
1222 let eid = uni_common::core::id::Eid::from(id);
1223 if let Ok(val) = prop_manager.get_edge_prop(eid, prop_name, ctx).await {
1224 return Ok(val);
1225 }
1226 }
1227 return Ok(Value::Null);
1228 }
1229
1230 if let Ok(vid) = Self::vid_from_value(&base_val) {
1232 return prop_manager
1233 .get_vertex_prop_with_ctx(vid, prop_name, ctx)
1234 .await;
1235 }
1236
1237 if base_val.is_null() {
1238 return Ok(Value::Null);
1239 }
1240
1241 {
1243 use crate::query::datetime::{
1244 eval_duration_accessor, eval_temporal_accessor, is_duration_accessor,
1245 is_duration_string, is_temporal_accessor, is_temporal_string,
1246 };
1247
1248 if let Value::Temporal(tv) = &base_val {
1250 if matches!(tv, uni_common::TemporalValue::Duration { .. }) {
1251 if is_duration_accessor(prop_name) {
1252 return eval_duration_accessor(
1254 &base_val.to_string(),
1255 prop_name,
1256 );
1257 }
1258 } else if is_temporal_accessor(prop_name) {
1259 return eval_temporal_accessor(&base_val.to_string(), prop_name);
1260 }
1261 }
1262
1263 if let Value::String(s) = &base_val {
1265 if is_temporal_string(s) && is_temporal_accessor(prop_name) {
1266 return eval_temporal_accessor(s, prop_name);
1267 }
1268 if is_duration_string(s) && is_duration_accessor(prop_name) {
1269 return eval_duration_accessor(s, prop_name);
1270 }
1271 }
1272 }
1273
1274 Err(anyhow!(
1275 "Cannot access property '{}' on {:?}",
1276 prop_name,
1277 base_val
1278 ))
1279 }
1280 Expr::ArrayIndex {
1281 array: arr_expr,
1282 index: idx_expr,
1283 } => {
1284 let arr_val = this
1285 .evaluate_expr(arr_expr, row, prop_manager, params, ctx)
1286 .await?;
1287 let idx_val = this
1288 .evaluate_expr(idx_expr, row, prop_manager, params, ctx)
1289 .await?;
1290
1291 if let Value::List(arr) = &arr_val {
1292 if let Some(i) = idx_val.as_i64() {
1294 let idx = if i < 0 {
1295 let positive_idx = arr.len() as i64 + i;
1297 if positive_idx < 0 {
1298 return Ok(Value::Null); }
1300 positive_idx as usize
1301 } else {
1302 i as usize
1303 };
1304 if idx < arr.len() {
1305 return Ok(arr[idx].clone());
1306 }
1307 return Ok(Value::Null);
1308 } else if idx_val.is_null() {
1309 return Ok(Value::Null);
1310 } else {
1311 return Err(anyhow::anyhow!(
1312 "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
1313 idx_val
1314 ));
1315 }
1316 }
1317 if let Value::Map(map) = &arr_val {
1318 if let Some(key) = idx_val.as_str() {
1319 return Ok(map.get(key).cloned().unwrap_or(Value::Null));
1320 } else if !idx_val.is_null() {
1321 return Err(anyhow::anyhow!(
1322 "TypeError: InvalidArgumentValue - Map index must be a string, got: {:?}",
1323 idx_val
1324 ));
1325 }
1326 }
1327 if let Value::Node(node) = &arr_val {
1329 if let Some(key) = idx_val.as_str() {
1330 if let Some(val) = node.properties.get(key) {
1332 return Ok(val.clone());
1333 }
1334 if let Ok(val) = prop_manager
1336 .get_vertex_prop_with_ctx(node.vid, key, ctx)
1337 .await
1338 {
1339 return Ok(val);
1340 }
1341 return Ok(Value::Null);
1342 } else if !idx_val.is_null() {
1343 return Err(anyhow::anyhow!(
1344 "TypeError: Node index must be a string, got: {:?}",
1345 idx_val
1346 ));
1347 }
1348 }
1349 if let Value::Edge(edge) = &arr_val {
1351 if let Some(key) = idx_val.as_str() {
1352 if let Some(val) = edge.properties.get(key) {
1354 return Ok(val.clone());
1355 }
1356 if let Ok(val) = prop_manager.get_edge_prop(edge.eid, key, ctx).await {
1358 return Ok(val);
1359 }
1360 return Ok(Value::Null);
1361 } else if !idx_val.is_null() {
1362 return Err(anyhow::anyhow!(
1363 "TypeError: Edge index must be a string, got: {:?}",
1364 idx_val
1365 ));
1366 }
1367 }
1368 if let Ok(vid) = Self::vid_from_value(&arr_val)
1370 && let Some(key) = idx_val.as_str()
1371 {
1372 if let Ok(val) = prop_manager.get_vertex_prop_with_ctx(vid, key, ctx).await
1373 {
1374 return Ok(val);
1375 }
1376 return Ok(Value::Null);
1377 }
1378 if arr_val.is_null() {
1379 return Ok(Value::Null);
1380 }
1381 Err(anyhow!(
1382 "TypeError: InvalidArgumentType - cannot index into {:?}",
1383 arr_val
1384 ))
1385 }
1386 Expr::ArraySlice { array, start, end } => {
1387 let arr_val = this
1388 .evaluate_expr(array, row, prop_manager, params, ctx)
1389 .await?;
1390
1391 if let Value::List(arr) = &arr_val {
1392 let len = arr.len();
1393
1394 let start_idx = if let Some(s) = start {
1396 let v = this
1397 .evaluate_expr(s, row, prop_manager, params, ctx)
1398 .await?;
1399 if v.is_null() {
1400 return Ok(Value::Null);
1401 }
1402 let raw = v.as_i64().unwrap_or(0);
1403 if raw < 0 {
1404 (len as i64 + raw).max(0) as usize
1405 } else {
1406 (raw as usize).min(len)
1407 }
1408 } else {
1409 0
1410 };
1411
1412 let end_idx = if let Some(e) = end {
1414 let v = this
1415 .evaluate_expr(e, row, prop_manager, params, ctx)
1416 .await?;
1417 if v.is_null() {
1418 return Ok(Value::Null);
1419 }
1420 let raw = v.as_i64().unwrap_or(len as i64);
1421 if raw < 0 {
1422 (len as i64 + raw).max(0) as usize
1423 } else {
1424 (raw as usize).min(len)
1425 }
1426 } else {
1427 len
1428 };
1429
1430 if start_idx >= end_idx {
1432 return Ok(Value::List(vec![]));
1433 }
1434 let end_idx = end_idx.min(len);
1435 return Ok(Value::List(arr[start_idx..end_idx].to_vec()));
1436 }
1437
1438 if arr_val.is_null() {
1439 return Ok(Value::Null);
1440 }
1441 Err(anyhow!("Cannot slice {:?}", arr_val))
1442 }
1443 Expr::Literal(lit) => Ok(lit.to_value()),
1444 Expr::List(items) => {
1445 let mut vals = Vec::new();
1446 for item in items {
1447 vals.push(
1448 this.evaluate_expr(item, row, prop_manager, params, ctx)
1449 .await?,
1450 );
1451 }
1452 Ok(Value::List(vals))
1453 }
1454 Expr::Map(items) => {
1455 let mut map = HashMap::new();
1456 for (key, value_expr) in items {
1457 let val = this
1458 .evaluate_expr(value_expr, row, prop_manager, params, ctx)
1459 .await?;
1460 map.insert(key.clone(), val);
1461 }
1462 Ok(Value::Map(map))
1463 }
1464 Expr::Exists { query, .. } => {
1465 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1467 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1468
1469 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1470 Ok(plan) => {
1471 let mut sub_params = params.clone();
1472 sub_params.extend(row.clone());
1473
1474 match this.execute(plan, prop_manager, &sub_params).await {
1475 Ok(results) => Ok(Value::Bool(!results.is_empty())),
1476 Err(e) => {
1477 log::debug!("EXISTS subquery execution failed: {}", e);
1478 Ok(Value::Bool(false))
1479 }
1480 }
1481 }
1482 Err(e) => {
1483 log::debug!("EXISTS subquery planning failed: {}", e);
1484 Ok(Value::Bool(false))
1485 }
1486 }
1487 }
1488 Expr::CountSubquery(query) => {
1489 let planner = QueryPlanner::new(this.storage.schema_manager().schema());
1491
1492 let vars_in_scope: Vec<String> = row.keys().cloned().collect();
1493
1494 match planner.plan_with_scope(*query.clone(), vars_in_scope) {
1495 Ok(plan) => {
1496 let mut sub_params = params.clone();
1497 sub_params.extend(row.clone());
1498
1499 match this.execute(plan, prop_manager, &sub_params).await {
1500 Ok(results) => Ok(Value::from(results.len() as i64)),
1501 Err(e) => Err(anyhow!("Subquery execution failed: {}", e)),
1502 }
1503 }
1504 Err(e) => Err(anyhow!("Subquery planning failed: {}", e)),
1505 }
1506 }
1507 Expr::Quantifier {
1508 quantifier,
1509 variable,
1510 list,
1511 predicate,
1512 } => {
1513 let list_val = this
1527 .evaluate_expr(list, row, prop_manager, params, ctx)
1528 .await?;
1529
1530 if list_val.is_null() {
1532 return Ok(Value::Null);
1533 }
1534
1535 let items = match list_val {
1537 Value::List(arr) => arr,
1538 _ => return Err(anyhow!("Quantifier expects a list, got: {:?}", list_val)),
1539 };
1540
1541 let mut satisfied_count = 0;
1543 for item in &items {
1544 let mut item_row = row.clone();
1546 item_row.insert(variable.clone(), item.clone());
1547
1548 let pred_result = this
1550 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1551 .await?;
1552
1553 if let Value::Bool(true) = pred_result {
1555 satisfied_count += 1;
1556 }
1557 }
1558
1559 let result = match quantifier {
1561 Quantifier::All => satisfied_count == items.len(),
1562 Quantifier::Any => satisfied_count > 0,
1563 Quantifier::Single => satisfied_count == 1,
1564 Quantifier::None => satisfied_count == 0,
1565 };
1566
1567 Ok(Value::Bool(result))
1568 }
1569 Expr::ListComprehension {
1570 variable,
1571 list,
1572 where_clause,
1573 map_expr,
1574 } => {
1575 let list_val = this
1582 .evaluate_expr(list, row, prop_manager, params, ctx)
1583 .await?;
1584
1585 if list_val.is_null() {
1587 return Ok(Value::Null);
1588 }
1589
1590 let items = match list_val {
1592 Value::List(arr) => arr,
1593 _ => {
1594 return Err(anyhow!(
1595 "List comprehension expects a list, got: {:?}",
1596 list_val
1597 ));
1598 }
1599 };
1600
1601 let mut results = Vec::new();
1603 for item in &items {
1604 let mut item_row = row.clone();
1606 item_row.insert(variable.clone(), item.clone());
1607
1608 if let Some(predicate) = where_clause {
1610 let pred_result = this
1611 .evaluate_expr(predicate, &item_row, prop_manager, params, ctx)
1612 .await?;
1613
1614 if !matches!(pred_result, Value::Bool(true)) {
1616 continue;
1617 }
1618 }
1619
1620 let mapped_val = this
1622 .evaluate_expr(map_expr, &item_row, prop_manager, params, ctx)
1623 .await?;
1624 results.push(mapped_val);
1625 }
1626
1627 Ok(Value::List(results))
1628 }
1629 Expr::BinaryOp { left, op, right } => {
1630 match op {
1632 BinaryOp::And => {
1633 let l_val = this
1634 .evaluate_expr(left, row, prop_manager, params, ctx)
1635 .await?;
1636 if let Some(false) = l_val.as_bool() {
1638 return Ok(Value::Bool(false));
1639 }
1640 let r_val = this
1641 .evaluate_expr(right, row, prop_manager, params, ctx)
1642 .await?;
1643 eval_binary_op(&l_val, op, &r_val)
1644 }
1645 BinaryOp::Or => {
1646 let l_val = this
1647 .evaluate_expr(left, row, prop_manager, params, ctx)
1648 .await?;
1649 if let Some(true) = l_val.as_bool() {
1651 return Ok(Value::Bool(true));
1652 }
1653 let r_val = this
1654 .evaluate_expr(right, row, prop_manager, params, ctx)
1655 .await?;
1656 eval_binary_op(&l_val, op, &r_val)
1657 }
1658 _ => {
1659 let l_val = this
1661 .evaluate_expr(left, row, prop_manager, params, ctx)
1662 .await?;
1663 let r_val = this
1664 .evaluate_expr(right, row, prop_manager, params, ctx)
1665 .await?;
1666 eval_binary_op(&l_val, op, &r_val)
1667 }
1668 }
1669 }
1670 Expr::In { expr, list } => {
1671 let l_val = this
1672 .evaluate_expr(expr, row, prop_manager, params, ctx)
1673 .await?;
1674 let r_val = this
1675 .evaluate_expr(list, row, prop_manager, params, ctx)
1676 .await?;
1677 eval_in_op(&l_val, &r_val)
1678 }
1679 Expr::UnaryOp { op, expr } => {
1680 let val = this
1681 .evaluate_expr(expr, row, prop_manager, params, ctx)
1682 .await?;
1683 match op {
1684 UnaryOp::Not => {
1685 match val.as_bool() {
1687 Some(b) => Ok(Value::Bool(!b)),
1688 None if val.is_null() => Ok(Value::Null),
1689 None => Err(anyhow!(
1690 "InvalidArgumentType: NOT requires a boolean argument"
1691 )),
1692 }
1693 }
1694 UnaryOp::Neg => {
1695 if let Some(i) = val.as_i64() {
1696 Ok(Value::Int(-i))
1697 } else if let Some(f) = val.as_f64() {
1698 Ok(Value::Float(-f))
1699 } else {
1700 Err(anyhow!("Cannot negate non-numeric value: {:?}", val))
1701 }
1702 }
1703 }
1704 }
1705 Expr::IsNull(expr) => {
1706 let val = this
1707 .evaluate_expr(expr, row, prop_manager, params, ctx)
1708 .await?;
1709 Ok(Value::Bool(val.is_null()))
1710 }
1711 Expr::IsNotNull(expr) => {
1712 let val = this
1713 .evaluate_expr(expr, row, prop_manager, params, ctx)
1714 .await?;
1715 Ok(Value::Bool(!val.is_null()))
1716 }
1717 Expr::IsUnique(_) => {
1718 Err(anyhow!(
1720 "IS UNIQUE can only be used in constraint definitions"
1721 ))
1722 }
1723 Expr::Case {
1724 expr,
1725 when_then,
1726 else_expr,
1727 } => {
1728 if let Some(base_expr) = expr {
1729 let base_val = this
1730 .evaluate_expr(base_expr, row, prop_manager, params, ctx)
1731 .await?;
1732 for (w, t) in when_then {
1733 let w_val = this
1734 .evaluate_expr(w, row, prop_manager, params, ctx)
1735 .await?;
1736 if base_val == w_val {
1737 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1738 }
1739 }
1740 } else {
1741 for (w, t) in when_then {
1742 let w_val = this
1743 .evaluate_expr(w, row, prop_manager, params, ctx)
1744 .await?;
1745 if w_val.as_bool() == Some(true) {
1746 return this.evaluate_expr(t, row, prop_manager, params, ctx).await;
1747 }
1748 }
1749 }
1750 if let Some(e) = else_expr {
1751 return this.evaluate_expr(e, row, prop_manager, params, ctx).await;
1752 }
1753 Ok(Value::Null)
1754 }
1755 Expr::Wildcard => Ok(Value::Null),
1756 Expr::FunctionCall { name, args, .. } => {
1757 if name.eq_ignore_ascii_case("ID") {
1759 if args.len() != 1 {
1760 return Err(anyhow!("id() requires exactly 1 argument"));
1761 }
1762 let val = this
1763 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1764 .await?;
1765 if let Value::Map(map) = &val {
1766 if let Some(vid_val) = map.get("_vid") {
1768 return Ok(vid_val.clone());
1769 }
1770 if let Some(eid_val) = map.get("_eid") {
1772 return Ok(eid_val.clone());
1773 }
1774 if let Some(id_val) = map.get("_id") {
1776 return Ok(id_val.clone());
1777 }
1778 }
1779 return Ok(Value::Null);
1780 }
1781
1782 if name.eq_ignore_ascii_case("ELEMENTID") {
1784 if args.len() != 1 {
1785 return Err(anyhow!("elementId() requires exactly 1 argument"));
1786 }
1787 let val = this
1788 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1789 .await?;
1790 if let Value::Map(map) = &val {
1791 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
1794 return Ok(Value::String(vid_val.to_string()));
1795 }
1796 if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
1799 return Ok(Value::String(eid_val.to_string()));
1800 }
1801 }
1802 return Ok(Value::Null);
1803 }
1804
1805 if name.eq_ignore_ascii_case("TYPE") {
1807 if args.len() != 1 {
1808 return Err(anyhow!("type() requires exactly 1 argument"));
1809 }
1810 let val = this
1811 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1812 .await?;
1813 if let Value::Map(map) = &val
1814 && let Some(type_val) = map.get("_type")
1815 {
1816 if let Some(type_id) =
1818 type_val.as_u64().and_then(|v| u32::try_from(v).ok())
1819 {
1820 if let Some(name) = this
1821 .storage
1822 .schema_manager()
1823 .edge_type_name_by_id_unified(type_id)
1824 {
1825 return Ok(Value::String(name));
1826 }
1827 } else if let Some(name) = type_val.as_str() {
1828 return Ok(Value::String(name.to_string()));
1829 }
1830 }
1831 return Ok(Value::Null);
1832 }
1833
1834 if name.eq_ignore_ascii_case("LABELS") {
1836 if args.len() != 1 {
1837 return Err(anyhow!("labels() requires exactly 1 argument"));
1838 }
1839 let val = this
1840 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1841 .await?;
1842 if let Value::Map(map) = &val
1843 && let Some(labels_val) = map.get("_labels")
1844 {
1845 return Ok(labels_val.clone());
1846 }
1847 return Ok(Value::Null);
1848 }
1849
1850 if name.eq_ignore_ascii_case("PROPERTIES") {
1852 if args.len() != 1 {
1853 return Err(anyhow!("properties() requires exactly 1 argument"));
1854 }
1855 let val = this
1856 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1857 .await?;
1858 if let Value::Map(map) = &val {
1859 let mut props = HashMap::new();
1861 for (k, v) in map.iter() {
1862 if !k.starts_with('_') {
1863 props.insert(k.clone(), v.clone());
1864 }
1865 }
1866 return Ok(Value::Map(props));
1867 }
1868 return Ok(Value::Null);
1869 }
1870
1871 if name.eq_ignore_ascii_case("STARTNODE") {
1873 if args.len() != 1 {
1874 return Err(anyhow!("startNode() requires exactly 1 argument"));
1875 }
1876 let val = this
1877 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1878 .await?;
1879 if let Value::Edge(edge) = &val {
1880 return Ok(Self::find_node_by_vid(row, edge.src));
1881 }
1882 if let Value::Map(map) = &val {
1883 if let Some(start_node) = map.get("_startNode") {
1884 return Ok(start_node.clone());
1885 }
1886 if let Some(src_vid) = map.get("_src_vid") {
1887 return Ok(Value::Map(HashMap::from([(
1888 "_vid".to_string(),
1889 src_vid.clone(),
1890 )])));
1891 }
1892 if let Some(src_id) = map.get("_src")
1894 && let Some(u) = src_id.as_u64()
1895 {
1896 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1897 }
1898 }
1899 return Ok(Value::Null);
1900 }
1901
1902 if name.eq_ignore_ascii_case("ENDNODE") {
1904 if args.len() != 1 {
1905 return Err(anyhow!("endNode() requires exactly 1 argument"));
1906 }
1907 let val = this
1908 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1909 .await?;
1910 if let Value::Edge(edge) = &val {
1911 return Ok(Self::find_node_by_vid(row, edge.dst));
1912 }
1913 if let Value::Map(map) = &val {
1914 if let Some(end_node) = map.get("_endNode") {
1915 return Ok(end_node.clone());
1916 }
1917 if let Some(dst_vid) = map.get("_dst_vid") {
1918 return Ok(Value::Map(HashMap::from([(
1919 "_vid".to_string(),
1920 dst_vid.clone(),
1921 )])));
1922 }
1923 if let Some(dst_id) = map.get("_dst")
1925 && let Some(u) = dst_id.as_u64()
1926 {
1927 return Ok(Self::find_node_by_vid(row, Vid::new(u)));
1928 }
1929 }
1930 return Ok(Value::Null);
1931 }
1932
1933 if name.eq_ignore_ascii_case("HASLABEL") {
1936 if args.len() != 2 {
1937 return Err(anyhow!("hasLabel() requires exactly 2 arguments"));
1938 }
1939 let node_val = this
1940 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
1941 .await?;
1942 let label_val = this
1943 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
1944 .await?;
1945
1946 let label_to_check = label_val.as_str().ok_or_else(|| {
1947 anyhow!("Second argument to hasLabel must be a string")
1948 })?;
1949
1950 let has_label = match &node_val {
1951 Value::Map(map) if map.contains_key("_vid") => {
1953 if let Some(Value::List(labels_arr)) = map.get("_labels") {
1954 labels_arr
1955 .iter()
1956 .any(|l| l.as_str() == Some(label_to_check))
1957 } else {
1958 false
1959 }
1960 }
1961 Value::Map(map) => {
1963 if let Some(Value::List(labels_arr)) = map.get("_labels") {
1964 labels_arr
1965 .iter()
1966 .any(|l| l.as_str() == Some(label_to_check))
1967 } else {
1968 false
1969 }
1970 }
1971 _ => false,
1972 };
1973 return Ok(Value::Bool(has_label));
1974 }
1975
1976 if matches!(
1979 name.to_uppercase().as_str(),
1980 "ANY" | "ALL" | "NONE" | "SINGLE"
1981 ) {
1982 return Err(anyhow!(
1983 "{}() with list comprehensions is not yet supported. Use MATCH with WHERE instead.",
1984 name.to_lowercase()
1985 ));
1986 }
1987
1988 if name.eq_ignore_ascii_case("COALESCE") {
1990 for arg in args {
1991 let val = this
1992 .evaluate_expr(arg, row, prop_manager, params, ctx)
1993 .await?;
1994 if !val.is_null() {
1995 return Ok(val);
1996 }
1997 }
1998 return Ok(Value::Null);
1999 }
2000
2001 if name.eq_ignore_ascii_case("vector_similarity") {
2003 if args.len() != 2 {
2004 return Err(anyhow!("vector_similarity takes 2 arguments"));
2005 }
2006 let v1 = this
2007 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2008 .await?;
2009 let v2 = this
2010 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2011 .await?;
2012 return eval_vector_similarity(&v1, &v2);
2013 }
2014
2015 if name.eq_ignore_ascii_case("uni.temporal.validAt")
2017 || name.eq_ignore_ascii_case("uni.validAt")
2018 || name.eq_ignore_ascii_case("validAt")
2019 {
2020 if args.len() != 4 {
2021 return Err(anyhow!("validAt requires 4 arguments"));
2022 }
2023 let node_val = this
2024 .evaluate_expr(&args[0], row, prop_manager, params, ctx)
2025 .await?;
2026 let start_prop = this
2027 .evaluate_expr(&args[1], row, prop_manager, params, ctx)
2028 .await?
2029 .as_str()
2030 .ok_or(anyhow!("start_prop must be string"))?
2031 .to_string();
2032 let end_prop = this
2033 .evaluate_expr(&args[2], row, prop_manager, params, ctx)
2034 .await?
2035 .as_str()
2036 .ok_or(anyhow!("end_prop must be string"))?
2037 .to_string();
2038 let time_val = this
2039 .evaluate_expr(&args[3], row, prop_manager, params, ctx)
2040 .await?;
2041
2042 let query_time = value_to_datetime_utc(&time_val).ok_or_else(|| {
2043 anyhow!("time argument must be a datetime value or string")
2044 })?;
2045
2046 let valid_from_val: Option<Value> = if let Ok(vid) =
2048 Self::vid_from_value(&node_val)
2049 {
2050 prop_manager
2052 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2053 .await
2054 .ok()
2055 } else if let Value::Map(map) = &node_val {
2056 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2058 let vid = Vid::from(vid_val);
2059 prop_manager
2060 .get_vertex_prop_with_ctx(vid, &start_prop, ctx)
2061 .await
2062 .ok()
2063 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2064 let eid = uni_common::core::id::Eid::from(eid_val);
2066 prop_manager.get_edge_prop(eid, &start_prop, ctx).await.ok()
2067 } else {
2068 map.get(&start_prop).cloned()
2070 }
2071 } else {
2072 return Ok(Value::Bool(false));
2073 };
2074
2075 let valid_from = match valid_from_val {
2076 Some(ref v) => match value_to_datetime_utc(v) {
2077 Some(dt) => dt,
2078 None if v.is_null() => return Ok(Value::Bool(false)),
2079 None => {
2080 return Err(anyhow!(
2081 "Property {} must be a datetime value or string",
2082 start_prop
2083 ));
2084 }
2085 },
2086 None => return Ok(Value::Bool(false)),
2087 };
2088
2089 let valid_to_val: Option<Value> = if let Ok(vid) =
2090 Self::vid_from_value(&node_val)
2091 {
2092 prop_manager
2094 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2095 .await
2096 .ok()
2097 } else if let Value::Map(map) = &node_val {
2098 if let Some(vid_val) = map.get("_vid").and_then(|v| v.as_u64()) {
2100 let vid = Vid::from(vid_val);
2101 prop_manager
2102 .get_vertex_prop_with_ctx(vid, &end_prop, ctx)
2103 .await
2104 .ok()
2105 } else if let Some(eid_val) = map.get("_eid").and_then(|v| v.as_u64()) {
2106 let eid = uni_common::core::id::Eid::from(eid_val);
2108 prop_manager.get_edge_prop(eid, &end_prop, ctx).await.ok()
2109 } else {
2110 map.get(&end_prop).cloned()
2112 }
2113 } else {
2114 return Ok(Value::Bool(false));
2115 };
2116
2117 let valid_to = match valid_to_val {
2118 Some(ref v) => match value_to_datetime_utc(v) {
2119 Some(dt) => Some(dt),
2120 None if v.is_null() => None,
2121 None => {
2122 return Err(anyhow!(
2123 "Property {} must be a datetime value or null",
2124 end_prop
2125 ));
2126 }
2127 },
2128 None => None,
2129 };
2130
2131 let is_valid = valid_from <= query_time
2132 && valid_to.map(|vt| query_time < vt).unwrap_or(true);
2133 return Ok(Value::Bool(is_valid));
2134 }
2135
2136 let mut evaluated_args = Vec::with_capacity(args.len());
2138 for arg in args {
2139 let mut val = this
2140 .evaluate_expr(arg, row, prop_manager, params, ctx)
2141 .await?;
2142
2143 if let Value::Map(ref mut map) = val {
2146 hydrate_entity_if_needed(map, prop_manager, ctx).await;
2147 }
2148
2149 evaluated_args.push(val);
2150 }
2151 eval_scalar_function(name, &evaluated_args)
2152 }
2153 Expr::Reduce {
2154 accumulator,
2155 init,
2156 variable,
2157 list,
2158 expr,
2159 } => {
2160 let mut acc = self
2161 .evaluate_expr(init, row, prop_manager, params, ctx)
2162 .await?;
2163 let list_val = self
2164 .evaluate_expr(list, row, prop_manager, params, ctx)
2165 .await?;
2166
2167 if let Value::List(items) = list_val {
2168 for item in items {
2169 let mut scope = row.clone();
2173 scope.insert(accumulator.clone(), acc.clone());
2174 scope.insert(variable.clone(), item);
2175
2176 acc = self
2177 .evaluate_expr(expr, &scope, prop_manager, params, ctx)
2178 .await?;
2179 }
2180 } else {
2181 return Err(anyhow!("REDUCE list argument must evaluate to a list"));
2182 }
2183 Ok(acc)
2184 }
2185 Expr::ValidAt { .. } => {
2186 Err(anyhow!(
2188 "VALID_AT expression should have been transformed to function call in planner"
2189 ))
2190 }
2191
2192 Expr::LabelCheck { expr, labels } => {
2193 let val = this
2194 .evaluate_expr(expr, row, prop_manager, params, ctx)
2195 .await?;
2196 match &val {
2197 Value::Null => Ok(Value::Null),
2198 Value::Map(map) => {
2199 let is_edge = map.contains_key("_eid")
2201 || map.contains_key("_type_name")
2202 || (map.contains_key("_type") && !map.contains_key("_vid"));
2203
2204 if is_edge {
2205 if labels.len() > 1 {
2207 return Ok(Value::Bool(false));
2208 }
2209 let label_to_check = &labels[0];
2210 let has_type = if let Some(Value::String(t)) = map.get("_type_name")
2211 {
2212 t == label_to_check
2213 } else if let Some(Value::String(t)) = map.get("_type") {
2214 t == label_to_check
2215 } else {
2216 false
2217 };
2218 Ok(Value::Bool(has_type))
2219 } else {
2220 let has_all = labels.iter().all(|label_to_check| {
2222 if let Some(Value::List(labels_arr)) = map.get("_labels") {
2223 labels_arr
2224 .iter()
2225 .any(|l| l.as_str() == Some(label_to_check.as_str()))
2226 } else {
2227 false
2228 }
2229 });
2230 Ok(Value::Bool(has_all))
2231 }
2232 }
2233 _ => Ok(Value::Bool(false)),
2234 }
2235 }
2236
2237 Expr::MapProjection { base, items } => {
2238 let base_value = this
2239 .evaluate_expr(base, row, prop_manager, params, ctx)
2240 .await?;
2241
2242 let properties = match &base_value {
2244 Value::Map(map) => map,
2245 _ => {
2246 return Err(anyhow!(
2247 "Map projection requires object, got {:?}",
2248 base_value
2249 ));
2250 }
2251 };
2252
2253 let mut result_map = HashMap::new();
2254
2255 for item in items {
2256 match item {
2257 MapProjectionItem::Property(prop) => {
2258 if let Some(value) = properties.get(prop.as_str()) {
2259 result_map.insert(prop.clone(), value.clone());
2260 }
2261 }
2262 MapProjectionItem::AllProperties => {
2263 for (key, value) in properties.iter() {
2265 if !key.starts_with('_') {
2266 result_map.insert(key.clone(), value.clone());
2267 }
2268 }
2269 }
2270 MapProjectionItem::LiteralEntry(key, expr) => {
2271 let value = this
2272 .evaluate_expr(expr, row, prop_manager, params, ctx)
2273 .await?;
2274 result_map.insert(key.clone(), value);
2275 }
2276 MapProjectionItem::Variable(var_name) => {
2277 if let Some(value) = row.get(var_name.as_str()) {
2280 result_map.insert(var_name.clone(), value.clone());
2281 }
2282 }
2283 }
2284 }
2285
2286 Ok(Value::Map(result_map))
2287 }
2288 }
2289 })
2290 }
2291
2292 pub(crate) fn execute_subplan<'a>(
2293 &'a self,
2294 plan: LogicalPlan,
2295 prop_manager: &'a PropertyManager,
2296 params: &'a HashMap<String, Value>,
2297 ctx: Option<&'a QueryContext>,
2298 ) -> BoxFuture<'a, Result<Vec<HashMap<String, Value>>>> {
2299 Box::pin(async move {
2300 if let Some(ctx) = ctx {
2301 ctx.check_timeout()?;
2302 }
2303 match plan {
2304 LogicalPlan::Union { left, right, all } => {
2305 self.execute_union(left, right, all, prop_manager, params, ctx)
2306 .await
2307 }
2308 LogicalPlan::CreateVectorIndex {
2309 config,
2310 if_not_exists,
2311 } => {
2312 if if_not_exists && self.index_exists_by_name(&config.name) {
2313 return Ok(vec![]);
2314 }
2315 let idx_mgr = IndexManager::new(
2316 self.storage.base_path(),
2317 self.storage.schema_manager_arc(),
2318 self.storage.lancedb_store_arc(),
2319 );
2320 idx_mgr.create_vector_index(config).await?;
2321 Ok(vec![])
2322 }
2323 LogicalPlan::CreateFullTextIndex {
2324 config,
2325 if_not_exists,
2326 } => {
2327 if if_not_exists && self.index_exists_by_name(&config.name) {
2328 return Ok(vec![]);
2329 }
2330 let idx_mgr = IndexManager::new(
2331 self.storage.base_path(),
2332 self.storage.schema_manager_arc(),
2333 self.storage.lancedb_store_arc(),
2334 );
2335 idx_mgr.create_fts_index(config).await?;
2336 Ok(vec![])
2337 }
2338 LogicalPlan::CreateScalarIndex {
2339 mut config,
2340 if_not_exists,
2341 } => {
2342 if if_not_exists && self.index_exists_by_name(&config.name) {
2343 return Ok(vec![]);
2344 }
2345
2346 let mut modified_properties = Vec::new();
2348
2349 for prop in &config.properties {
2350 if prop.contains('(') && prop.contains(')') {
2352 let gen_col = SchemaManager::generated_column_name(prop);
2353
2354 let sm = self.storage.schema_manager_arc();
2356 if let Err(e) = sm.add_generated_property(
2357 &config.label,
2358 &gen_col,
2359 DataType::String, prop.clone(),
2361 ) {
2362 log::warn!("Failed to add generated property (might exist): {}", e);
2363 }
2364
2365 modified_properties.push(gen_col);
2366 } else {
2367 modified_properties.push(prop.clone());
2369 }
2370 }
2371
2372 config.properties = modified_properties;
2373
2374 let idx_mgr = IndexManager::new(
2375 self.storage.base_path(),
2376 self.storage.schema_manager_arc(),
2377 self.storage.lancedb_store_arc(),
2378 );
2379 idx_mgr.create_scalar_index(config).await?;
2380 Ok(vec![])
2381 }
2382 LogicalPlan::CreateJsonFtsIndex {
2383 config,
2384 if_not_exists,
2385 } => {
2386 if if_not_exists && self.index_exists_by_name(&config.name) {
2387 return Ok(vec![]);
2388 }
2389 let idx_mgr = IndexManager::new(
2390 self.storage.base_path(),
2391 self.storage.schema_manager_arc(),
2392 self.storage.lancedb_store_arc(),
2393 );
2394 idx_mgr.create_json_fts_index(config).await?;
2395 Ok(vec![])
2396 }
2397 LogicalPlan::ShowDatabase => Ok(self.execute_show_database()),
2398 LogicalPlan::ShowConfig => Ok(self.execute_show_config()),
2399 LogicalPlan::ShowStatistics => self.execute_show_statistics().await,
2400 LogicalPlan::Vacuum => {
2401 self.execute_vacuum().await?;
2402 Ok(vec![])
2403 }
2404 LogicalPlan::Checkpoint => {
2405 self.execute_checkpoint().await?;
2406 Ok(vec![])
2407 }
2408 LogicalPlan::CopyTo {
2409 label,
2410 path,
2411 format,
2412 options,
2413 } => {
2414 let count = self
2415 .execute_copy_to(&label, &path, &format, &options)
2416 .await?;
2417 let mut result = HashMap::new();
2418 result.insert("count".to_string(), Value::Int(count as i64));
2419 Ok(vec![result])
2420 }
2421 LogicalPlan::CopyFrom {
2422 label,
2423 path,
2424 format,
2425 options,
2426 } => {
2427 let count = self
2428 .execute_copy_from(&label, &path, &format, &options)
2429 .await?;
2430 let mut result = HashMap::new();
2431 result.insert("count".to_string(), Value::Int(count as i64));
2432 Ok(vec![result])
2433 }
2434 LogicalPlan::CreateLabel(clause) => {
2435 self.execute_create_label(clause).await?;
2436 Ok(vec![])
2437 }
2438 LogicalPlan::CreateEdgeType(clause) => {
2439 self.execute_create_edge_type(clause).await?;
2440 Ok(vec![])
2441 }
2442 LogicalPlan::AlterLabel(clause) => {
2443 self.execute_alter_label(clause).await?;
2444 Ok(vec![])
2445 }
2446 LogicalPlan::AlterEdgeType(clause) => {
2447 self.execute_alter_edge_type(clause).await?;
2448 Ok(vec![])
2449 }
2450 LogicalPlan::DropLabel(clause) => {
2451 self.execute_drop_label(clause).await?;
2452 Ok(vec![])
2453 }
2454 LogicalPlan::DropEdgeType(clause) => {
2455 self.execute_drop_edge_type(clause).await?;
2456 Ok(vec![])
2457 }
2458 LogicalPlan::CreateConstraint(clause) => {
2459 self.execute_create_constraint(clause).await?;
2460 Ok(vec![])
2461 }
2462 LogicalPlan::DropConstraint(clause) => {
2463 self.execute_drop_constraint(clause).await?;
2464 Ok(vec![])
2465 }
2466 LogicalPlan::ShowConstraints(clause) => Ok(self.execute_show_constraints(clause)),
2467 LogicalPlan::DropIndex { name, if_exists } => {
2468 let idx_mgr = IndexManager::new(
2469 self.storage.base_path(),
2470 self.storage.schema_manager_arc(),
2471 self.storage.lancedb_store_arc(),
2472 );
2473 match idx_mgr.drop_index(&name).await {
2474 Ok(_) => Ok(vec![]),
2475 Err(e) => {
2476 if if_exists && e.to_string().contains("not found") {
2477 Ok(vec![])
2478 } else {
2479 Err(e)
2480 }
2481 }
2482 }
2483 }
2484 LogicalPlan::ShowIndexes { filter } => {
2485 Ok(self.execute_show_indexes(filter.as_deref()))
2486 }
2487 LogicalPlan::Scan { .. } => {
2488 unreachable!("Scan is handled by DataFusion engine")
2489 }
2490 LogicalPlan::ExtIdLookup { .. } => {
2491 unreachable!("ExtIdLookup is handled by DataFusion engine")
2492 }
2493 LogicalPlan::ScanAll { .. } => {
2494 unreachable!("ScanAll is handled by DataFusion engine")
2495 }
2496 LogicalPlan::ScanMainByLabels { .. } => {
2497 unreachable!("ScanMainByLabels is handled by DataFusion engine")
2498 }
2499 LogicalPlan::Traverse { .. } => {
2500 unreachable!("Traverse is handled by DataFusion engine")
2501 }
2502 LogicalPlan::TraverseMainByType { .. } => {
2503 unreachable!("TraverseMainByType is handled by DataFusion engine")
2504 }
2505 LogicalPlan::Filter {
2506 input,
2507 predicate,
2508 optional_variables,
2509 } => {
2510 let input_matches = self
2511 .execute_subplan(*input, prop_manager, params, ctx)
2512 .await?;
2513
2514 tracing::debug!(
2515 "Filter: Evaluating predicate {:?} on {} input rows, optional_vars={:?}",
2516 predicate,
2517 input_matches.len(),
2518 optional_variables
2519 );
2520
2521 if !optional_variables.is_empty() {
2525 let is_optional_key = |k: &str| -> bool {
2528 optional_variables.contains(k)
2529 || optional_variables
2530 .iter()
2531 .any(|var| k.starts_with(&format!("{}.", var)))
2532 };
2533
2534 let is_internal_key =
2536 |k: &str| -> bool { k.starts_with("__") || k.starts_with("_") };
2537
2538 let non_optional_vars: Vec<String> = input_matches
2540 .first()
2541 .map(|row| {
2542 row.keys()
2543 .filter(|k| !is_optional_key(k) && !is_internal_key(k))
2544 .cloned()
2545 .collect()
2546 })
2547 .unwrap_or_default();
2548
2549 let mut groups: std::collections::HashMap<
2551 Vec<u8>,
2552 Vec<HashMap<String, Value>>,
2553 > = std::collections::HashMap::new();
2554
2555 for row in &input_matches {
2556 let key: Vec<u8> = non_optional_vars
2558 .iter()
2559 .map(|var| {
2560 row.get(var).map(|v| format!("{:?}", v)).unwrap_or_default()
2561 })
2562 .collect::<Vec<_>>()
2563 .join("|")
2564 .into_bytes();
2565
2566 groups.entry(key).or_default().push(row.clone());
2567 }
2568
2569 let mut filtered = Vec::new();
2570 for (_key, group_rows) in groups {
2571 let mut group_passed = Vec::new();
2572
2573 for row in &group_rows {
2574 let has_null_optional = optional_variables.iter().any(|var| {
2576 let direct_null =
2578 matches!(row.get(var), Some(Value::Null) | None);
2579 let prefixed_null = row
2580 .keys()
2581 .filter(|k| k.starts_with(&format!("{}.", var)))
2582 .any(|k| matches!(row.get(k), Some(Value::Null)));
2583 direct_null || prefixed_null
2584 });
2585
2586 if has_null_optional {
2587 group_passed.push(row.clone());
2588 continue;
2589 }
2590
2591 let res = self
2592 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2593 .await?;
2594
2595 if res.as_bool().unwrap_or(false) {
2596 group_passed.push(row.clone());
2597 }
2598 }
2599
2600 if group_passed.is_empty() {
2601 if let Some(template) = group_rows.first() {
2604 let mut null_row = HashMap::new();
2605 for (k, v) in template {
2606 if is_optional_key(k) {
2607 null_row.insert(k.clone(), Value::Null);
2608 } else {
2609 null_row.insert(k.clone(), v.clone());
2610 }
2611 }
2612 filtered.push(null_row);
2613 }
2614 } else {
2615 filtered.extend(group_passed);
2616 }
2617 }
2618
2619 tracing::debug!(
2620 "Filter (OPTIONAL): {} input rows -> {} output rows",
2621 input_matches.len(),
2622 filtered.len()
2623 );
2624
2625 return Ok(filtered);
2626 }
2627
2628 let mut filtered = Vec::new();
2630 for row in input_matches.iter() {
2631 let res = self
2632 .evaluate_expr(&predicate, row, prop_manager, params, ctx)
2633 .await?;
2634
2635 let passes = res.as_bool().unwrap_or(false);
2636
2637 if passes {
2638 filtered.push(row.clone());
2639 }
2640 }
2641
2642 tracing::debug!(
2643 "Filter: {} input rows -> {} output rows",
2644 input_matches.len(),
2645 filtered.len()
2646 );
2647
2648 Ok(filtered)
2649 }
2650 LogicalPlan::ProcedureCall {
2651 procedure_name,
2652 arguments,
2653 yield_items,
2654 } => {
2655 let yield_names: Vec<String> =
2656 yield_items.iter().map(|(n, _)| n.clone()).collect();
2657 let results = self
2658 .execute_procedure(
2659 &procedure_name,
2660 &arguments,
2661 &yield_names,
2662 prop_manager,
2663 params,
2664 ctx,
2665 )
2666 .await?;
2667
2668 let has_aliases = yield_items.iter().any(|(_, a)| a.is_some());
2674 if !has_aliases {
2675 Ok(results)
2678 } else {
2679 let mut aliased_results = Vec::with_capacity(results.len());
2680 for row in results {
2681 let mut new_row = HashMap::new();
2682 for (name, alias) in &yield_items {
2683 let col_name = alias.as_ref().unwrap_or(name);
2684 let val = row.get(name).cloned().unwrap_or(Value::Null);
2685 new_row.insert(col_name.clone(), val);
2686 }
2687 aliased_results.push(new_row);
2688 }
2689 Ok(aliased_results)
2690 }
2691 }
2692 LogicalPlan::VectorKnn { .. } => {
2693 unreachable!("VectorKnn is handled by DataFusion engine")
2694 }
2695 LogicalPlan::InvertedIndexLookup { .. } => {
2696 unreachable!("InvertedIndexLookup is handled by DataFusion engine")
2697 }
2698 LogicalPlan::Sort { input, order_by } => {
2699 let rows = self
2700 .execute_subplan(*input, prop_manager, params, ctx)
2701 .await?;
2702 self.execute_sort(rows, &order_by, prop_manager, params, ctx)
2703 .await
2704 }
2705 LogicalPlan::Limit { input, skip, fetch } => {
2706 let rows = self
2707 .execute_subplan(*input, prop_manager, params, ctx)
2708 .await?;
2709 let skip = skip.unwrap_or(0);
2710 let take = fetch.unwrap_or(usize::MAX);
2711 Ok(rows.into_iter().skip(skip).take(take).collect())
2712 }
2713 LogicalPlan::Aggregate {
2714 input,
2715 group_by,
2716 aggregates,
2717 } => {
2718 let rows = self
2719 .execute_subplan(*input, prop_manager, params, ctx)
2720 .await?;
2721 self.execute_aggregate(rows, &group_by, &aggregates, prop_manager, params, ctx)
2722 .await
2723 }
2724 LogicalPlan::Window {
2725 input,
2726 window_exprs,
2727 } => {
2728 let rows = self
2729 .execute_subplan(*input, prop_manager, params, ctx)
2730 .await?;
2731 self.execute_window(rows, &window_exprs, prop_manager, params, ctx)
2732 .await
2733 }
2734 LogicalPlan::Project { input, projections } => {
2735 let matches = self
2736 .execute_subplan(*input, prop_manager, params, ctx)
2737 .await?;
2738 self.execute_project(matches, &projections, prop_manager, params, ctx)
2739 .await
2740 }
2741 LogicalPlan::Distinct { input } => {
2742 let rows = self
2743 .execute_subplan(*input, prop_manager, params, ctx)
2744 .await?;
2745 let mut seen = std::collections::HashSet::new();
2746 let mut result = Vec::new();
2747 for row in rows {
2748 let key = Self::canonical_row_key(&row);
2749 if seen.insert(key) {
2750 result.push(row);
2751 }
2752 }
2753 Ok(result)
2754 }
2755 LogicalPlan::Unwind {
2756 input,
2757 expr,
2758 variable,
2759 } => {
2760 let input_rows = self
2761 .execute_subplan(*input, prop_manager, params, ctx)
2762 .await?;
2763 self.execute_unwind(input_rows, &expr, &variable, prop_manager, params, ctx)
2764 .await
2765 }
2766 LogicalPlan::Apply {
2767 input,
2768 subquery,
2769 input_filter,
2770 } => {
2771 let input_rows = self
2772 .execute_subplan(*input, prop_manager, params, ctx)
2773 .await?;
2774 self.execute_apply(
2775 input_rows,
2776 &subquery,
2777 input_filter.as_ref(),
2778 prop_manager,
2779 params,
2780 ctx,
2781 )
2782 .await
2783 }
2784 LogicalPlan::SubqueryCall { input, subquery } => {
2785 let input_rows = self
2786 .execute_subplan(*input, prop_manager, params, ctx)
2787 .await?;
2788 self.execute_apply(input_rows, &subquery, None, prop_manager, params, ctx)
2791 .await
2792 }
2793 LogicalPlan::RecursiveCTE {
2794 cte_name,
2795 initial,
2796 recursive,
2797 } => {
2798 self.execute_recursive_cte(
2799 &cte_name,
2800 *initial,
2801 *recursive,
2802 prop_manager,
2803 params,
2804 ctx,
2805 )
2806 .await
2807 }
2808 LogicalPlan::CrossJoin { left, right } => {
2809 self.execute_cross_join(left, right, prop_manager, params, ctx)
2810 .await
2811 }
2812 LogicalPlan::Set { .. }
2813 | LogicalPlan::Remove { .. }
2814 | LogicalPlan::Merge { .. }
2815 | LogicalPlan::Create { .. }
2816 | LogicalPlan::CreateBatch { .. } => {
2817 unreachable!("mutations are handled by DataFusion engine")
2818 }
2819 LogicalPlan::Delete { .. } => {
2820 unreachable!("mutations are handled by DataFusion engine")
2821 }
2822 LogicalPlan::Begin => {
2823 if let Some(writer_lock) = &self.writer {
2824 let mut writer = writer_lock.write().await;
2825 writer.begin_transaction()?;
2826 } else {
2827 return Err(anyhow!("Transaction requires a Writer"));
2828 }
2829 Ok(vec![HashMap::new()])
2830 }
2831 LogicalPlan::Commit => {
2832 if let Some(writer_lock) = &self.writer {
2833 let mut writer = writer_lock.write().await;
2834 writer.commit_transaction().await?;
2835 } else {
2836 return Err(anyhow!("Transaction requires a Writer"));
2837 }
2838 Ok(vec![HashMap::new()])
2839 }
2840 LogicalPlan::Rollback => {
2841 if let Some(writer_lock) = &self.writer {
2842 let mut writer = writer_lock.write().await;
2843 writer.rollback_transaction()?;
2844 } else {
2845 return Err(anyhow!("Transaction requires a Writer"));
2846 }
2847 Ok(vec![HashMap::new()])
2848 }
2849 LogicalPlan::Copy {
2850 target,
2851 source,
2852 is_export,
2853 options,
2854 } => {
2855 if is_export {
2856 self.execute_export(&target, &source, &options, prop_manager, ctx)
2857 .await
2858 } else {
2859 self.execute_copy(&target, &source, &options, prop_manager)
2860 .await
2861 }
2862 }
2863 LogicalPlan::Backup {
2864 destination,
2865 options,
2866 } => self.execute_backup(&destination, &options).await,
2867 LogicalPlan::Explain { plan } => {
2868 let plan_str = format!("{:#?}", plan);
2869 let mut row = HashMap::new();
2870 row.insert("plan".to_string(), Value::String(plan_str));
2871 Ok(vec![row])
2872 }
2873 LogicalPlan::ShortestPath { .. } => {
2874 unreachable!("ShortestPath is handled by DataFusion engine")
2875 }
2876 LogicalPlan::AllShortestPaths { .. } => {
2877 unreachable!("AllShortestPaths is handled by DataFusion engine")
2878 }
2879 LogicalPlan::Foreach { .. } => {
2880 unreachable!("mutations are handled by DataFusion engine")
2881 }
2882 LogicalPlan::Empty => Ok(vec![HashMap::new()]),
2883 LogicalPlan::BindZeroLengthPath { .. } => {
2884 unreachable!("BindZeroLengthPath is handled by DataFusion engine")
2885 }
2886 LogicalPlan::BindPath { .. } => {
2887 unreachable!("BindPath is handled by DataFusion engine")
2888 }
2889 LogicalPlan::QuantifiedPattern { .. } => {
2890 unreachable!("QuantifiedPattern is handled by DataFusion engine")
2891 }
2892 LogicalPlan::LocyProgram { .. }
2893 | LogicalPlan::LocyFold { .. }
2894 | LogicalPlan::LocyBestBy { .. }
2895 | LogicalPlan::LocyPriority { .. }
2896 | LogicalPlan::LocyDerivedScan { .. }
2897 | LogicalPlan::LocyProject { .. } => {
2898 unreachable!("Locy operators are handled by DataFusion engine")
2899 }
2900 }
2901 })
2902 }
2903
    /// Executes a single plan node from a FOREACH body against `scope`,
    /// applying mutations through the already-locked `writer`.
    ///
    /// Only mutating operations are legal here: SET, REMOVE, DELETE, CREATE,
    /// MERGE (treated as create-only — see note below), and nested FOREACH.
    /// Any other plan node is rejected with an error.
    ///
    /// NOTE(review): the Merge arm ignores `on_match` and unconditionally
    /// creates the pattern, then runs ON CREATE — i.e. MERGE inside FOREACH
    /// behaves like CREATE. Presumably an intentional simplification; confirm.
    pub(crate) async fn execute_foreach_body_plan(
        &self,
        plan: LogicalPlan,
        scope: &mut HashMap<String, Value>,
        writer: &mut uni_store::runtime::writer::Writer,
        prop_manager: &PropertyManager,
        params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<()> {
        match plan {
            LogicalPlan::Set { items, .. } => {
                self.execute_set_items_locked(&items, scope, writer, prop_manager, params, ctx)
                    .await?;
            }
            LogicalPlan::Remove { items, .. } => {
                self.execute_remove_items_locked(&items, scope, writer, prop_manager, ctx)
                    .await?;
            }
            LogicalPlan::Delete { items, detach, .. } => {
                // Each DELETE target is an expression (usually a variable)
                // evaluated against the current FOREACH scope.
                for expr in &items {
                    let val = self
                        .evaluate_expr(expr, scope, prop_manager, params, ctx)
                        .await?;
                    self.execute_delete_item_locked(&val, detach, writer)
                        .await?;
                }
            }
            LogicalPlan::Create { pattern, .. } => {
                self.execute_create_pattern(&pattern, scope, writer, prop_manager, params, ctx)
                    .await?;
            }
            LogicalPlan::CreateBatch { patterns, .. } => {
                for pattern in &patterns {
                    self.execute_create_pattern(pattern, scope, writer, prop_manager, params, ctx)
                        .await?;
                }
            }
            LogicalPlan::Merge {
                pattern,
                on_match: _,
                on_create,
                ..
            } => {
                // Create-only MERGE: the pattern is created and ON CREATE
                // items applied; ON MATCH is never executed in this path.
                self.execute_create_pattern(&pattern, scope, writer, prop_manager, params, ctx)
                    .await?;
                if let Some(on_create_clause) = on_create {
                    self.execute_set_items_locked(
                        &on_create_clause.items,
                        scope,
                        writer,
                        prop_manager,
                        params,
                        ctx,
                    )
                    .await?;
                }
            }
            LogicalPlan::Foreach {
                variable,
                list,
                body,
                ..
            } => {
                let list_val = self
                    .evaluate_expr(&list, scope, prop_manager, params, ctx)
                    .await?;
                // A NULL list is a no-op; any other non-list value is an error.
                let items = match list_val {
                    Value::List(arr) => arr,
                    Value::Null => return Ok(()),
                    _ => return Err(anyhow!("FOREACH requires a list")),
                };
                for item in items {
                    // Each iteration gets its own scope so the loop variable
                    // does not leak into or overwrite outer bindings.
                    let mut nested_scope = scope.clone();
                    nested_scope.insert(variable.clone(), item);
                    for nested_plan in &body {
                        // Box::pin breaks the infinitely-sized future that
                        // direct async recursion would otherwise create.
                        Box::pin(self.execute_foreach_body_plan(
                            nested_plan.clone(),
                            &mut nested_scope,
                            writer,
                            prop_manager,
                            params,
                            ctx,
                        ))
                        .await?;
                    }
                }
            }
            _ => {
                return Err(anyhow!(
                    "Unsupported operation in FOREACH body: only SET, REMOVE, DELETE, CREATE, MERGE, and nested FOREACH are allowed"
                ));
            }
        }
        Ok(())
    }
3003
3004 fn canonical_row_key(row: &HashMap<String, Value>) -> String {
3005 let mut pairs: Vec<_> = row.iter().collect();
3006 pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
3007
3008 pairs
3009 .into_iter()
3010 .map(|(k, v)| format!("{k}={}", Self::canonical_value_key(v)))
3011 .collect::<Vec<_>>()
3012 .join("|")
3013 }
3014
    /// Encodes a single value as a canonical string key for row comparison
    /// (DISTINCT, grouping). The encoding is tag-prefixed per variant and is
    /// arranged so that values that should compare equal produce identical
    /// strings:
    /// - whole-number floats in i64 range collapse to the integer encoding
    ///   (`n:`), so `2.0` and `2` key identically;
    /// - strings that parse as temporal literals key as their temporal value;
    /// - map/node/edge properties are emitted in sorted key order (and node
    ///   labels sorted), making the key independent of internal ordering.
    fn canonical_value_key(v: &Value) -> String {
        match v {
            Value::Null => "null".to_string(),
            Value::Bool(b) => format!("b:{b}"),
            Value::Int(i) => format!("n:{i}"),
            Value::Float(f) => {
                if f.is_nan() {
                    // All NaNs share one key so NaN rows dedupe together.
                    "nan".to_string()
                } else if f.is_infinite() {
                    if f.is_sign_positive() {
                        "inf:+".to_string()
                    } else {
                        "inf:-".to_string()
                    }
                } else if f.fract() == 0.0 && *f >= i64::MIN as f64 && *f <= i64::MAX as f64 {
                    // Integral float in i64 range: reuse the Int encoding so
                    // 2.0 and 2 produce the same key.
                    format!("n:{}", *f as i64)
                } else {
                    format!("f:{f}")
                }
            }
            Value::String(s) => {
                // Temporal-looking strings key as temporals so e.g. a date
                // arriving as a string compares equal to a Temporal value.
                if let Some(k) = Self::temporal_string_key(s) {
                    format!("temporal:{k}")
                } else {
                    format!("s:{s}")
                }
            }
            Value::Bytes(b) => format!("bytes:{:?}", b),
            Value::List(items) => format!(
                "list:[{}]",
                items
                    .iter()
                    .map(Self::canonical_value_key)
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Map(map) => {
                // Sort entries so the key is independent of map ordering.
                let mut pairs: Vec<_> = map.iter().collect();
                pairs.sort_by(|(lk, _), (rk, _)| lk.cmp(rk));
                format!(
                    "map:{{{}}}",
                    pairs
                        .into_iter()
                        .map(|(k, v)| format!("{k}:{}", Self::canonical_value_key(v)))
                        .collect::<Vec<_>>()
                        .join(",")
                )
            }
            Value::Node(n) => {
                // Label order is not semantically relevant; sort for stability.
                let mut labels = n.labels.clone();
                labels.sort();
                format!(
                    "node:{}:{}:{}",
                    n.vid.as_u64(),
                    labels.join(":"),
                    Self::canonical_value_key(&Value::Map(n.properties.clone()))
                )
            }
            Value::Edge(e) => format!(
                "edge:{}:{}:{}:{}:{}",
                e.eid.as_u64(),
                e.edge_type,
                e.src.as_u64(),
                e.dst.as_u64(),
                Self::canonical_value_key(&Value::Map(e.properties.clone()))
            ),
            Value::Path(p) => format!(
                "path:nodes=[{}];edges=[{}]",
                p.nodes
                    .iter()
                    .map(|n| Self::canonical_value_key(&Value::Node(n.clone())))
                    .collect::<Vec<_>>()
                    .join(","),
                p.edges
                    .iter()
                    .map(|e| Self::canonical_value_key(&Value::Edge(e.clone())))
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            Value::Vector(vs) => format!("vec:{:?}", vs),
            Value::Temporal(t) => format!("temporal:{}", Self::canonical_temporal_key(t)),
            // Fallback for variants without a bespoke encoding; Debug output
            // is stable enough for dedup within a single process.
            _ => format!("{v:?}"),
        }
    }
3099
3100 fn canonical_temporal_key(t: &uni_common::TemporalValue) -> String {
3101 match t {
3102 uni_common::TemporalValue::Date { days_since_epoch } => {
3103 format!("date:{days_since_epoch}")
3104 }
3105 uni_common::TemporalValue::LocalTime {
3106 nanos_since_midnight,
3107 } => format!("localtime:{nanos_since_midnight}"),
3108 uni_common::TemporalValue::Time {
3109 nanos_since_midnight,
3110 offset_seconds,
3111 } => {
3112 let utc_nanos = *nanos_since_midnight - (*offset_seconds as i64 * 1_000_000_000);
3113 format!("time:{utc_nanos}")
3114 }
3115 uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3116 format!("localdatetime:{nanos_since_epoch}")
3117 }
3118 uni_common::TemporalValue::DateTime {
3119 nanos_since_epoch, ..
3120 } => format!("datetime:{nanos_since_epoch}"),
3121 uni_common::TemporalValue::Duration {
3122 months,
3123 days,
3124 nanos,
3125 } => format!("duration:{months}:{days}:{nanos}"),
3126 }
3127 }
3128
3129 fn temporal_string_key(s: &str) -> Option<String> {
3130 let fn_name = match classify_temporal(s)? {
3131 uni_common::TemporalType::Date => "DATE",
3132 uni_common::TemporalType::LocalTime => "LOCALTIME",
3133 uni_common::TemporalType::Time => "TIME",
3134 uni_common::TemporalType::LocalDateTime => "LOCALDATETIME",
3135 uni_common::TemporalType::DateTime => "DATETIME",
3136 uni_common::TemporalType::Duration => "DURATION",
3137 };
3138 match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
3139 Value::Temporal(tv) => Some(Self::canonical_temporal_key(&tv)),
3140 _ => None,
3141 }
3142 }
3143
3144 pub(crate) const AGGREGATE_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3147
3148 pub(crate) async fn execute_aggregate(
3149 &self,
3150 rows: Vec<HashMap<String, Value>>,
3151 group_by: &[Expr],
3152 aggregates: &[Expr],
3153 prop_manager: &PropertyManager,
3154 params: &HashMap<String, Value>,
3155 ctx: Option<&QueryContext>,
3156 ) -> Result<Vec<HashMap<String, Value>>> {
3157 if let Some(ctx) = ctx {
3159 ctx.check_timeout()?;
3160 }
3161
3162 let mut groups: HashMap<String, (Vec<Value>, Vec<Accumulator>)> = HashMap::new();
3163
3164 if rows.is_empty() {
3167 if group_by.is_empty() {
3168 let accs = Self::create_accumulators(aggregates);
3169 let row = Self::build_aggregate_result(group_by, aggregates, &[], &accs);
3170 return Ok(vec![row]);
3171 }
3172 return Ok(vec![]);
3173 }
3174
3175 for (idx, row) in rows.into_iter().enumerate() {
3176 if idx.is_multiple_of(Self::AGGREGATE_TIMEOUT_CHECK_INTERVAL)
3178 && let Some(ctx) = ctx
3179 {
3180 ctx.check_timeout()?;
3181 }
3182
3183 let key_vals = self
3184 .evaluate_group_keys(group_by, &row, prop_manager, params, ctx)
3185 .await?;
3186 let key_str = format!(
3189 "[{}]",
3190 key_vals
3191 .iter()
3192 .map(Self::canonical_value_key)
3193 .collect::<Vec<_>>()
3194 .join(",")
3195 );
3196
3197 let entry = groups
3198 .entry(key_str)
3199 .or_insert_with(|| (key_vals, Self::create_accumulators(aggregates)));
3200
3201 self.update_accumulators(&mut entry.1, aggregates, &row, prop_manager, params, ctx)
3202 .await?;
3203 }
3204
3205 let results = groups
3206 .values()
3207 .map(|(k_vals, accs)| Self::build_aggregate_result(group_by, aggregates, k_vals, accs))
3208 .collect();
3209
3210 Ok(results)
3211 }
3212
    /// Evaluates window-function expressions over `rows`, adding one output
    /// column per expression (named by the expression's string form).
    ///
    /// For each `FunctionCall … OVER (window_spec)` expression:
    /// 1. rows are partitioned by the PARTITION BY keys (evaluated with the
    ///    restricted `evaluate_simple_expr`; an empty PARTITION BY puts all
    ///    rows in one partition),
    /// 2. each partition's row indices are sorted by the ORDER BY items
    ///    (comparisons whose keys fail to evaluate are treated as equal),
    /// 3. the function value is computed per position within the partition.
    ///
    /// Supported functions are those listed in `WINDOW_FUNCTIONS`; anything
    /// else, or a window expression that is not a function call with an OVER
    /// clause, is an error.
    pub(crate) async fn execute_window(
        &self,
        mut rows: Vec<HashMap<String, Value>>,
        window_exprs: &[Expr],
        _prop_manager: &PropertyManager,
        _params: &HashMap<String, Value>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        if let Some(ctx) = ctx {
            ctx.check_timeout()?;
        }

        if rows.is_empty() || window_exprs.is_empty() {
            return Ok(rows);
        }

        for window_expr in window_exprs {
            // A window expression must be a function call carrying an OVER spec.
            let Expr::FunctionCall {
                name,
                args,
                window_spec: Some(window_spec),
                ..
            } = window_expr
            else {
                return Err(anyhow!(
                    "Window expression must be a FunctionCall with OVER clause: {:?}",
                    window_expr
                ));
            };

            let name_upper = name.to_uppercase();

            if !WINDOW_FUNCTIONS.contains(&name_upper.as_str()) {
                return Err(anyhow!(
                    "Unsupported window function: {}. Supported functions: {}",
                    name,
                    WINDOW_FUNCTIONS.join(", ")
                ));
            }

            // Partition key -> indices (into `rows`) of the partition's rows.
            let mut partition_map: HashMap<Vec<Value>, Vec<usize>> = HashMap::new();

            for (row_idx, row) in rows.iter().enumerate() {
                let partition_key: Vec<Value> = if window_spec.partition_by.is_empty() {
                    vec![]
                } else {
                    window_spec
                        .partition_by
                        .iter()
                        .map(|expr| self.evaluate_simple_expr(expr, row))
                        .collect::<Result<Vec<_>>>()?
                };

                partition_map
                    .entry(partition_key)
                    .or_default()
                    .push(row_idx);
            }

            for (_partition_key, row_indices) in partition_map.iter_mut() {
                // Order the partition by the ORDER BY items. Key-evaluation
                // errors leave that comparison Equal (stable sort keeps the
                // rows' relative order).
                if !window_spec.order_by.is_empty() {
                    row_indices.sort_by(|&a, &b| {
                        for sort_item in &window_spec.order_by {
                            let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[a]);
                            let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[b]);

                            if let (Ok(va), Ok(vb)) = (val_a, val_b) {
                                let cmp = Executor::compare_values(&va, &vb);
                                let cmp = if sort_item.ascending {
                                    cmp
                                } else {
                                    cmp.reverse()
                                };
                                if cmp != std::cmp::Ordering::Equal {
                                    return cmp;
                                }
                            }
                        }
                        std::cmp::Ordering::Equal
                    });
                }

                // `position` is the row's 0-based rank within its partition.
                for (position, &row_idx) in row_indices.iter().enumerate() {
                    let window_value = match name_upper.as_str() {
                        "ROW_NUMBER" => Value::from((position + 1) as i64),
                        "RANK" => {
                            // RANK: tied rows share the rank of the first row
                            // of their tie group; gaps follow ties.
                            let rank = if position == 0 {
                                1i64
                            } else {
                                let prev_row_idx = row_indices[position - 1];
                                let same_as_prev = self.rows_have_same_sort_keys(
                                    &window_spec.order_by,
                                    &rows,
                                    row_idx,
                                    prev_row_idx,
                                );

                                if same_as_prev {
                                    // Walk back to the first row of the tie
                                    // group; its 1-based position is the rank.
                                    let mut group_start = position - 1;
                                    while group_start > 0 {
                                        let curr_idx = row_indices[group_start];
                                        let prev_idx = row_indices[group_start - 1];
                                        if !self.rows_have_same_sort_keys(
                                            &window_spec.order_by,
                                            &rows,
                                            curr_idx,
                                            prev_idx,
                                        ) {
                                            break;
                                        }
                                        group_start -= 1;
                                    }
                                    (group_start + 1) as i64
                                } else {
                                    (position + 1) as i64
                                }
                            };
                            Value::from(rank)
                        }
                        "DENSE_RANK" => {
                            // DENSE_RANK: count key transitions among the
                            // rows up to this position (no gaps after ties).
                            let mut dense_rank = 1i64;
                            for i in 0..position {
                                let curr_idx = row_indices[i + 1];
                                let prev_idx = row_indices[i];
                                if !self.rows_have_same_sort_keys(
                                    &window_spec.order_by,
                                    &rows,
                                    curr_idx,
                                    prev_idx,
                                ) {
                                    dense_rank += 1;
                                }
                            }
                            Value::from(dense_rank)
                        }
                        "LAG" => {
                            // Value from `offset` rows earlier in the
                            // partition, or the default when out of range.
                            let (value_expr, offset, default_value) =
                                self.extract_lag_lead_params("LAG", args, &rows[row_idx])?;

                            if position >= offset {
                                let target_idx = row_indices[position - offset];
                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
                            } else {
                                default_value
                            }
                        }
                        "LEAD" => {
                            // Value from `offset` rows later in the
                            // partition, or the default when out of range.
                            let (value_expr, offset, default_value) =
                                self.extract_lag_lead_params("LEAD", args, &rows[row_idx])?;

                            if position + offset < row_indices.len() {
                                let target_idx = row_indices[position + offset];
                                self.evaluate_simple_expr(value_expr, &rows[target_idx])?
                            } else {
                                default_value
                            }
                        }
                        "NTILE" => {
                            let num_buckets_expr = args.first().ok_or_else(|| {
                                anyhow!("NTILE requires 1 argument: NTILE(num_buckets)")
                            })?;
                            let num_buckets_val =
                                self.evaluate_simple_expr(num_buckets_expr, &rows[row_idx])?;
                            let num_buckets = num_buckets_val.as_i64().ok_or_else(|| {
                                anyhow!(
                                    "NTILE argument must be an integer, got: {:?}",
                                    num_buckets_val
                                )
                            })?;

                            if num_buckets <= 0 {
                                return Err(anyhow!(
                                    "NTILE bucket count must be positive, got: {}",
                                    num_buckets
                                ));
                            }

                            let num_buckets = num_buckets as usize;
                            let partition_size = row_indices.len();

                            // The first `extra_rows` buckets get one extra
                            // row each (base_size + 1); the rest get
                            // base_size rows.
                            let base_size = partition_size / num_buckets;
                            let extra_rows = partition_size % num_buckets;

                            // Note: when partition_size < num_buckets,
                            // base_size is 0 but extra_rows == partition_size,
                            // so only the first (non-dividing) branch runs.
                            let bucket = if position < extra_rows * (base_size + 1) {
                                position / (base_size + 1) + 1
                            } else {
                                let adjusted_position = position - extra_rows * (base_size + 1);
                                extra_rows + (adjusted_position / base_size) + 1
                            };

                            Value::from(bucket as i64)
                        }
                        "FIRST_VALUE" => {
                            let value_expr = args.first().ok_or_else(|| {
                                anyhow!("FIRST_VALUE requires 1 argument: FIRST_VALUE(expr)")
                            })?;

                            if row_indices.is_empty() {
                                Value::Null
                            } else {
                                let first_idx = row_indices[0];
                                self.evaluate_simple_expr(value_expr, &rows[first_idx])?
                            }
                        }
                        "LAST_VALUE" => {
                            // Evaluated over the whole partition (no frame
                            // support), so this is the partition's last row.
                            let value_expr = args.first().ok_or_else(|| {
                                anyhow!("LAST_VALUE requires 1 argument: LAST_VALUE(expr)")
                            })?;

                            if row_indices.is_empty() {
                                Value::Null
                            } else {
                                let last_idx = row_indices[row_indices.len() - 1];
                                self.evaluate_simple_expr(value_expr, &rows[last_idx])?
                            }
                        }
                        "NTH_VALUE" => {
                            if args.len() != 2 {
                                return Err(anyhow!(
                                    "NTH_VALUE requires 2 arguments: NTH_VALUE(expr, n)"
                                ));
                            }

                            let value_expr = &args[0];
                            let n_expr = &args[1];

                            let n_val = self.evaluate_simple_expr(n_expr, &rows[row_idx])?;
                            let n = n_val.as_i64().ok_or_else(|| {
                                anyhow!(
                                    "NTH_VALUE second argument must be an integer, got: {:?}",
                                    n_val
                                )
                            })?;

                            if n <= 0 {
                                return Err(anyhow!(
                                    "NTH_VALUE position must be positive, got: {}",
                                    n
                                ));
                            }

                            // n is 1-based; out-of-range positions yield NULL.
                            let nth_index = (n - 1) as usize;
                            if nth_index < row_indices.len() {
                                let nth_idx = row_indices[nth_index];
                                self.evaluate_simple_expr(value_expr, &rows[nth_idx])?
                            } else {
                                Value::Null
                            }
                        }
                        _ => unreachable!("Window function {} already validated", name),
                    };

                    // Output column is named by the expression's string form
                    // so projections can reference it.
                    let col_name = window_expr.to_string_repr();
                    rows[row_idx].insert(col_name, window_value);
                }
            }
        }

        Ok(rows)
    }
3502
3503 fn evaluate_simple_expr(&self, expr: &Expr, row: &HashMap<String, Value>) -> Result<Value> {
3508 match expr {
3509 Expr::Variable(name) => row
3510 .get(name)
3511 .cloned()
3512 .ok_or_else(|| anyhow!("Variable not found: {}", name)),
3513 Expr::Property(base, prop) => {
3514 let base_val = self.evaluate_simple_expr(base, row)?;
3515 if let Value::Map(map) = base_val {
3516 map.get(prop)
3517 .cloned()
3518 .ok_or_else(|| anyhow!("Property not found: {}", prop))
3519 } else {
3520 Err(anyhow!("Cannot access property on non-object"))
3521 }
3522 }
3523 Expr::Literal(lit) => Ok(lit.to_value()),
3524 _ => Err(anyhow!(
3525 "Unsupported expression in window function: {:?}",
3526 expr
3527 )),
3528 }
3529 }
3530
3531 fn rows_have_same_sort_keys(
3533 &self,
3534 order_by: &[uni_cypher::ast::SortItem],
3535 rows: &[HashMap<String, Value>],
3536 idx_a: usize,
3537 idx_b: usize,
3538 ) -> bool {
3539 order_by.iter().all(|sort_item| {
3540 let val_a = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_a]);
3541 let val_b = self.evaluate_simple_expr(&sort_item.expr, &rows[idx_b]);
3542 matches!((val_a, val_b), (Ok(a), Ok(b)) if a == b)
3543 })
3544 }
3545
3546 fn extract_lag_lead_params<'a>(
3548 &self,
3549 func_name: &str,
3550 args: &'a [Expr],
3551 row: &HashMap<String, Value>,
3552 ) -> Result<(&'a Expr, usize, Value)> {
3553 let value_expr = args.first().ok_or_else(|| {
3554 anyhow!(
3555 "{} requires at least 1 argument: {}(expr [, offset [, default]])",
3556 func_name,
3557 func_name
3558 )
3559 })?;
3560
3561 let offset = if let Some(offset_expr) = args.get(1) {
3562 let offset_val = self.evaluate_simple_expr(offset_expr, row)?;
3563 offset_val.as_i64().ok_or_else(|| {
3564 anyhow!(
3565 "{} offset must be an integer, got: {:?}",
3566 func_name,
3567 offset_val
3568 )
3569 })? as usize
3570 } else {
3571 1
3572 };
3573
3574 let default_value = if let Some(default_expr) = args.get(2) {
3575 self.evaluate_simple_expr(default_expr, row)?
3576 } else {
3577 Value::Null
3578 };
3579
3580 Ok((value_expr, offset, default_value))
3581 }
3582
3583 pub(crate) async fn evaluate_group_keys(
3585 &self,
3586 group_by: &[Expr],
3587 row: &HashMap<String, Value>,
3588 prop_manager: &PropertyManager,
3589 params: &HashMap<String, Value>,
3590 ctx: Option<&QueryContext>,
3591 ) -> Result<Vec<Value>> {
3592 let mut key_vals = Vec::new();
3593 for expr in group_by {
3594 key_vals.push(
3595 self.evaluate_expr(expr, row, prop_manager, params, ctx)
3596 .await?,
3597 );
3598 }
3599 Ok(key_vals)
3600 }
3601
3602 pub(crate) async fn update_accumulators(
3604 &self,
3605 accs: &mut [Accumulator],
3606 aggregates: &[Expr],
3607 row: &HashMap<String, Value>,
3608 prop_manager: &PropertyManager,
3609 params: &HashMap<String, Value>,
3610 ctx: Option<&QueryContext>,
3611 ) -> Result<()> {
3612 for (i, agg_expr) in aggregates.iter().enumerate() {
3613 if let Expr::FunctionCall { args, .. } = agg_expr {
3614 let is_wildcard = args.is_empty() || matches!(args[0], Expr::Wildcard);
3615 let val = if is_wildcard {
3616 Value::Null
3617 } else {
3618 self.evaluate_expr(&args[0], row, prop_manager, params, ctx)
3619 .await?
3620 };
3621 accs[i].update(&val, is_wildcard);
3622 }
3623 }
3624 Ok(())
3625 }
3626
3627 pub(crate) async fn execute_recursive_cte(
3629 &self,
3630 cte_name: &str,
3631 initial: LogicalPlan,
3632 recursive: LogicalPlan,
3633 prop_manager: &PropertyManager,
3634 params: &HashMap<String, Value>,
3635 ctx: Option<&QueryContext>,
3636 ) -> Result<Vec<HashMap<String, Value>>> {
3637 use std::collections::HashSet;
3638
3639 pub(crate) fn row_key(row: &HashMap<String, Value>) -> String {
3642 let mut pairs: Vec<_> = row.iter().collect();
3643 pairs.sort_by(|a, b| a.0.cmp(b.0));
3644 format!("{:?}", pairs)
3645 }
3646
3647 let mut working_table = self
3649 .execute_subplan(initial, prop_manager, params, ctx)
3650 .await?;
3651 let mut result_table = working_table.clone();
3652
3653 let mut seen: HashSet<String> = working_table.iter().map(row_key).collect();
3655
3656 let max_iterations = 1000;
3660 for _iteration in 0..max_iterations {
3661 if let Some(ctx) = ctx {
3663 ctx.check_timeout()?;
3664 }
3665
3666 if working_table.is_empty() {
3667 break;
3668 }
3669
3670 let working_val = Value::List(
3672 working_table
3673 .iter()
3674 .map(|row| {
3675 if row.len() == 1 {
3676 row.values().next().unwrap().clone()
3677 } else {
3678 Value::Map(row.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3679 }
3680 })
3681 .collect(),
3682 );
3683
3684 let mut next_params = params.clone();
3685 next_params.insert(cte_name.to_string(), working_val);
3686
3687 let next_result = self
3689 .execute_subplan(recursive.clone(), prop_manager, &next_params, ctx)
3690 .await?;
3691
3692 if next_result.is_empty() {
3693 break;
3694 }
3695
3696 let new_rows: Vec<_> = next_result
3698 .into_iter()
3699 .filter(|row| {
3700 let key = row_key(row);
3701 seen.insert(key) })
3703 .collect();
3704
3705 if new_rows.is_empty() {
3706 break;
3708 }
3709
3710 result_table.extend(new_rows.clone());
3711 working_table = new_rows;
3712 }
3713
3714 let final_list = Value::List(
3716 result_table
3717 .into_iter()
3718 .map(|row| {
3719 if row.len() == 1 {
3731 row.values().next().unwrap().clone()
3732 } else {
3733 Value::Map(row.into_iter().collect())
3734 }
3735 })
3736 .collect(),
3737 );
3738
3739 let mut final_row = HashMap::new();
3740 final_row.insert(cte_name.to_string(), final_list);
3741 Ok(vec![final_row])
3742 }
3743
3744 const SORT_TIMEOUT_CHECK_INTERVAL: usize = 1000;
3746
3747 pub(crate) async fn execute_sort(
3748 &self,
3749 rows: Vec<HashMap<String, Value>>,
3750 order_by: &[uni_cypher::ast::SortItem],
3751 prop_manager: &PropertyManager,
3752 params: &HashMap<String, Value>,
3753 ctx: Option<&QueryContext>,
3754 ) -> Result<Vec<HashMap<String, Value>>> {
3755 if let Some(ctx) = ctx {
3757 ctx.check_timeout()?;
3758 }
3759
3760 let mut rows_with_keys = Vec::with_capacity(rows.len());
3761 for (idx, row) in rows.into_iter().enumerate() {
3762 if idx.is_multiple_of(Self::SORT_TIMEOUT_CHECK_INTERVAL)
3764 && let Some(ctx) = ctx
3765 {
3766 ctx.check_timeout()?;
3767 }
3768
3769 let mut keys = Vec::new();
3770 for item in order_by {
3771 let val = row
3772 .get(&item.expr.to_string_repr())
3773 .cloned()
3774 .unwrap_or(Value::Null);
3775 let val = if val.is_null() {
3776 self.evaluate_expr(&item.expr, &row, prop_manager, params, ctx)
3777 .await
3778 .unwrap_or(Value::Null)
3779 } else {
3780 val
3781 };
3782 keys.push(val);
3783 }
3784 rows_with_keys.push((row, keys));
3785 }
3786
3787 if let Some(ctx) = ctx {
3789 ctx.check_timeout()?;
3790 }
3791
3792 rows_with_keys.sort_by(|a, b| Self::compare_sort_keys(&a.1, &b.1, order_by));
3793
3794 Ok(rows_with_keys.into_iter().map(|(r, _)| r).collect())
3795 }
3796
3797 pub(crate) fn create_accumulators(aggregates: &[Expr]) -> Vec<Accumulator> {
3799 aggregates
3800 .iter()
3801 .map(|expr| {
3802 if let Expr::FunctionCall { name, distinct, .. } = expr {
3803 Accumulator::new(name, *distinct)
3804 } else {
3805 Accumulator::new("COUNT", false)
3806 }
3807 })
3808 .collect()
3809 }
3810
3811 pub(crate) fn build_aggregate_result(
3813 group_by: &[Expr],
3814 aggregates: &[Expr],
3815 key_vals: &[Value],
3816 accs: &[Accumulator],
3817 ) -> HashMap<String, Value> {
3818 let mut res_row = HashMap::new();
3819 for (i, expr) in group_by.iter().enumerate() {
3820 res_row.insert(expr.to_string_repr(), key_vals[i].clone());
3821 }
3822 for (i, expr) in aggregates.iter().enumerate() {
3823 let col_name = crate::query::planner::aggregate_column_name(expr);
3825 res_row.insert(col_name, accs[i].finish());
3826 }
3827 res_row
3828 }
3829
3830 pub(crate) fn compare_sort_keys(
3832 a_keys: &[Value],
3833 b_keys: &[Value],
3834 order_by: &[uni_cypher::ast::SortItem],
3835 ) -> std::cmp::Ordering {
3836 for (i, item) in order_by.iter().enumerate() {
3837 let order = Self::compare_values(&a_keys[i], &b_keys[i]);
3838 if order != std::cmp::Ordering::Equal {
3839 return if item.ascending {
3840 order
3841 } else {
3842 order.reverse()
3843 };
3844 }
3845 }
3846 std::cmp::Ordering::Equal
3847 }
3848
3849 pub(crate) async fn execute_backup(
3853 &self,
3854 destination: &str,
3855 _options: &HashMap<String, Value>,
3856 ) -> Result<Vec<HashMap<String, Value>>> {
3857 if let Some(writer_arc) = &self.writer {
3859 let mut writer = writer_arc.write().await;
3860 writer.flush_to_l1(None).await?;
3861 }
3862
3863 let snapshot_manager = self.storage.snapshot_manager();
3865 let snapshot = snapshot_manager
3866 .load_latest_snapshot()
3867 .await?
3868 .ok_or_else(|| anyhow!("No snapshot found"))?;
3869
3870 if is_cloud_url(destination) {
3872 self.backup_to_cloud(destination, &snapshot.snapshot_id)
3873 .await?;
3874 } else {
3875 let validated_dest = self.validate_path(destination)?;
3877 self.backup_to_local(&validated_dest, &snapshot.snapshot_id)
3878 .await?;
3879 }
3880
3881 let mut res = HashMap::new();
3882 res.insert(
3883 "status".to_string(),
3884 Value::String("Backup completed".to_string()),
3885 );
3886 res.insert(
3887 "snapshot_id".to_string(),
3888 Value::String(snapshot.snapshot_id),
3889 );
3890 Ok(vec![res])
3891 }
3892
3893 async fn backup_to_local(&self, dest_path: &std::path::Path, _snapshot_id: &str) -> Result<()> {
3895 let source_path = std::path::Path::new(self.storage.base_path());
3896
3897 if !dest_path.exists() {
3898 std::fs::create_dir_all(dest_path)?;
3899 }
3900
3901 if source_path.exists() {
3903 Self::copy_dir_all(source_path, dest_path)?;
3904 }
3905
3906 let schema_manager = self.storage.schema_manager();
3908 let dest_catalog = dest_path.join("catalog");
3909 if !dest_catalog.exists() {
3910 std::fs::create_dir_all(&dest_catalog)?;
3911 }
3912
3913 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3914 std::fs::write(dest_catalog.join("schema.json"), schema_content)?;
3915
3916 Ok(())
3917 }
3918
3919 async fn backup_to_cloud(&self, dest_url: &str, _snapshot_id: &str) -> Result<()> {
3923 use object_store::ObjectStore;
3924 use object_store::local::LocalFileSystem;
3925 use object_store::path::Path as ObjPath;
3926
3927 let (dest_store, dest_prefix) = build_store_from_url(dest_url)?;
3928 let source_path = std::path::Path::new(self.storage.base_path());
3929
3930 let src_store: Arc<dyn ObjectStore> =
3932 Arc::new(LocalFileSystem::new_with_prefix(source_path)?);
3933
3934 let catalog_src = ObjPath::from("catalog");
3936 let catalog_dst = if dest_prefix.as_ref().is_empty() {
3937 ObjPath::from("catalog")
3938 } else {
3939 ObjPath::from(format!("{}/catalog", dest_prefix.as_ref()))
3940 };
3941 copy_store_prefix(&src_store, &dest_store, &catalog_src, &catalog_dst).await?;
3942
3943 let storage_src = ObjPath::from("storage");
3945 let storage_dst = if dest_prefix.as_ref().is_empty() {
3946 ObjPath::from("storage")
3947 } else {
3948 ObjPath::from(format!("{}/storage", dest_prefix.as_ref()))
3949 };
3950 copy_store_prefix(&src_store, &dest_store, &storage_src, &storage_dst).await?;
3951
3952 let schema_manager = self.storage.schema_manager();
3954 let schema_content = serde_json::to_string_pretty(&schema_manager.schema())?;
3955 let schema_path = if dest_prefix.as_ref().is_empty() {
3956 ObjPath::from("catalog/schema.json")
3957 } else {
3958 ObjPath::from(format!("{}/catalog/schema.json", dest_prefix.as_ref()))
3959 };
3960 dest_store
3961 .put(&schema_path, bytes::Bytes::from(schema_content).into())
3962 .await?;
3963
3964 Ok(())
3965 }
3966
    /// Maximum directory recursion depth accepted while copying a backup;
    /// `copy_dir_all_impl` errors out past this rather than descending
    /// further into a pathologically deep tree.
    const MAX_BACKUP_DEPTH: usize = 100;

    /// Upper bound on the number of directory entries a backup copy will
    /// visit before aborting with an error.
    const MAX_BACKUP_FILES: usize = 100_000;
3978
3979 pub(crate) fn copy_dir_all(
3987 src: &std::path::Path,
3988 dst: &std::path::Path,
3989 ) -> std::io::Result<()> {
3990 let mut file_count = 0usize;
3991 Self::copy_dir_all_impl(src, dst, 0, &mut file_count)
3992 }
3993
3994 pub(crate) fn copy_dir_all_impl(
3996 src: &std::path::Path,
3997 dst: &std::path::Path,
3998 depth: usize,
3999 file_count: &mut usize,
4000 ) -> std::io::Result<()> {
4001 if depth >= Self::MAX_BACKUP_DEPTH {
4002 return Err(std::io::Error::new(
4003 std::io::ErrorKind::InvalidInput,
4004 format!(
4005 "Maximum backup depth {} exceeded at {:?}",
4006 Self::MAX_BACKUP_DEPTH,
4007 src
4008 ),
4009 ));
4010 }
4011
4012 std::fs::create_dir_all(dst)?;
4013
4014 for entry in std::fs::read_dir(src)? {
4015 if *file_count >= Self::MAX_BACKUP_FILES {
4016 return Err(std::io::Error::new(
4017 std::io::ErrorKind::InvalidInput,
4018 format!(
4019 "Maximum backup file count {} exceeded",
4020 Self::MAX_BACKUP_FILES
4021 ),
4022 ));
4023 }
4024 *file_count += 1;
4025
4026 let entry = entry?;
4027 let metadata = entry.metadata()?;
4028
4029 if metadata.file_type().is_symlink() {
4031 continue;
4033 }
4034
4035 let dst_path = dst.join(entry.file_name());
4036 if metadata.is_dir() {
4037 Self::copy_dir_all_impl(&entry.path(), &dst_path, depth + 1, file_count)?;
4038 } else {
4039 std::fs::copy(entry.path(), dst_path)?;
4040 }
4041 }
4042 Ok(())
4043 }
4044
4045 pub(crate) async fn execute_copy(
4046 &self,
4047 target: &str,
4048 source: &str,
4049 options: &HashMap<String, Value>,
4050 prop_manager: &PropertyManager,
4051 ) -> Result<Vec<HashMap<String, Value>>> {
4052 let format = options
4053 .get("format")
4054 .and_then(|v| v.as_str())
4055 .unwrap_or_else(|| {
4056 if source.ends_with(".parquet") {
4057 "parquet"
4058 } else {
4059 "csv"
4060 }
4061 });
4062
4063 match format.to_lowercase().as_str() {
4064 "csv" => self.execute_csv_import(target, source, options).await,
4065 "parquet" => {
4066 self.execute_parquet_import(target, source, options, prop_manager)
4067 .await
4068 }
4069 _ => Err(anyhow!("Unsupported format: {}", format)),
4070 }
4071 }
4072
    /// COPY ... FROM '<file>.csv': bulk-loads rows from a local CSV file into
    /// either the vertex label or the edge type named `target`.
    ///
    /// Options:
    /// - `delimiter`: field separator; only its first byte is used (default `,`);
    /// - `header`: whether the file starts with a header row (default true);
    /// - `src_col` / `dst_col`: edge imports only — column names holding the
    ///   endpoint vertex ids (default `_src` / `_dst`).
    ///
    /// Columns with no matching property in the schema are silently skipped.
    /// Returns a single row `{count: <rows imported>}`.
    pub(crate) async fn execute_csv_import(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // Reject paths outside the allowed roots before touching the filesystem.
        let validated_source = self.validate_path(source)?;

        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;

        let schema = self.storage.schema_manager().schema();

        // `target` may name either a vertex label or an edge type.
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        // An empty delimiter option falls back to ','.
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut rdr = csv::ReaderBuilder::new()
            .delimiter(delimiter)
            .has_headers(has_header)
            .from_path(&validated_source)?;

        // NOTE(review): with header=false, csv's `headers()` returns the
        // first data record — column names then come from data; confirm
        // that is intended.
        let headers = rdr.headers()?.clone();
        let mut count = 0;

        // Hold the writer lock for the whole import so id allocation and
        // inserts stay contiguous.
        let mut writer = writer_lock.write().await;

        if label_meta.is_some() {
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;

            for result in rdr.records() {
                let record = result?;
                let mut props = HashMap::new();

                // Keep only columns that map to declared properties.
                for (i, header) in headers.iter().enumerate() {
                    if let Some(val_str) = record.get(i)
                        && let Some(prop_meta) = target_props.get(header)
                    {
                        let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
                        props.insert(header.to_string(), val);
                    }
                }

                let vid = writer.next_vid().await?;
                writer
                    .insert_vertex_with_labels(vid, props, &[target.to_string()])
                    .await?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            let type_id = meta.id;
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;

            // Endpoint columns are configurable; every other recognized
            // column becomes an edge property.
            let src_col = options
                .get("src_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_src");
            let dst_col = options
                .get("dst_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_dst");

            for result in rdr.records() {
                let record = result?;
                let mut props = HashMap::new();
                let mut src_vid = None;
                let mut dst_vid = None;

                for (i, header) in headers.iter().enumerate() {
                    if let Some(val_str) = record.get(i) {
                        if header == src_col {
                            src_vid =
                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
                        } else if header == dst_col {
                            dst_vid =
                                Some(Self::vid_from_value(&Value::String(val_str.to_string()))?);
                        } else if let Some(prop_meta) = target_props.get(header) {
                            let val = self.parse_csv_value(val_str, &prop_meta.r#type, header)?;
                            props.insert(header.to_string(), val);
                        }
                    }
                }

                // Both endpoints are mandatory for every edge record.
                let src =
                    src_vid.ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
                let dst = dst_vid
                    .ok_or_else(|| anyhow!("Missing destination VID in column '{}'", dst_col))?;

                let eid = writer.next_eid(type_id).await?;
                writer
                    .insert_edge(src, dst, type_id, eid, props, Some(target.to_string()))
                    .await?;
                count += 1;
            }
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4203
    /// COPY ... FROM '<file>.parquet': bulk-loads record batches from a local
    /// or cloud-hosted Parquet file into a vertex label or an edge type.
    ///
    /// Null cells are skipped. Columns without a matching schema property are
    /// ignored, except the edge endpoint columns named by `src_col`/`dst_col`
    /// (default `_src`/`_dst`). Returns a single `{count: <rows>}` row.
    pub(crate) async fn execute_parquet_import(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
        _prop_manager: &PropertyManager,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let writer_lock = self
            .writer
            .as_ref()
            .ok_or_else(|| anyhow!("COPY requires a Writer"))?;

        let schema = self.storage.schema_manager().schema();

        // `target` may name either a vertex label or an edge type.
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Cloud sources are buffered in memory; local files are validated
        // against the allowed roots and read directly.
        let reader = if is_cloud_url(source) {
            self.open_parquet_from_cloud(source).await?
        } else {
            let validated_source = self.validate_path(source)?;
            let file = std::fs::File::open(&validated_source)?;
            let builder =
                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
            builder.build()?
        };
        let mut reader = reader;

        let mut count = 0;
        // Hold the writer lock for the whole import so id allocation and
        // inserts stay contiguous.
        let mut writer = writer_lock.write().await;

        if label_meta.is_some() {
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for label '{}' not found", target))?;

            for batch in reader.by_ref() {
                let batch = batch?;
                for row in 0..batch.num_rows() {
                    let mut props = HashMap::new();
                    for field in batch.schema().fields() {
                        let name = field.name();
                        // Only import columns declared as properties; skip nulls.
                        if target_props.contains_key(name) {
                            let col = batch.column_by_name(name).unwrap();
                            if !col.is_null(row) {
                                let data_type = target_props.get(name).map(|pm| &pm.r#type);
                                let val =
                                    arrow_convert::arrow_to_value(col.as_ref(), row, data_type);
                                props.insert(name.clone(), val);
                            }
                        }
                    }
                    let vid = writer.next_vid().await?;
                    writer
                        .insert_vertex_with_labels(vid, props, &[target.to_string()])
                        .await?;
                    count += 1;
                }
            }
        } else if let Some(meta) = edge_meta {
            let type_id = meta.id;
            let target_props = schema
                .properties
                .get(target)
                .ok_or_else(|| anyhow!("Properties for edge type '{}' not found", target))?;

            // Endpoint columns are configurable; everything else recognized
            // becomes an edge property.
            let src_col = options
                .get("src_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_src");
            let dst_col = options
                .get("dst_col")
                .and_then(|v| v.as_str())
                .unwrap_or("_dst");

            for batch in reader {
                let batch = batch?;
                for row in 0..batch.num_rows() {
                    let mut props = HashMap::new();
                    let mut src_vid = None;
                    let mut dst_vid = None;

                    for field in batch.schema().fields() {
                        let name = field.name();
                        let col = batch.column_by_name(name).unwrap();
                        if col.is_null(row) {
                            continue;
                        }

                        if name == src_col {
                            let val = Self::arrow_to_value(col.as_ref(), row);
                            src_vid = Some(Self::vid_from_value(&val)?);
                        } else if name == dst_col {
                            let val = Self::arrow_to_value(col.as_ref(), row);
                            dst_vid = Some(Self::vid_from_value(&val)?);
                        } else if let Some(pm) = target_props.get(name) {
                            let val =
                                arrow_convert::arrow_to_value(col.as_ref(), row, Some(&pm.r#type));
                            props.insert(name.clone(), val);
                        }
                    }

                    // Both endpoints are mandatory for every edge row.
                    let src = src_vid
                        .ok_or_else(|| anyhow!("Missing source VID in column '{}'", src_col))?;
                    let dst = dst_vid.ok_or_else(|| {
                        anyhow!("Missing destination VID in column '{}'", dst_col)
                    })?;

                    let eid = writer.next_eid(type_id).await?;
                    writer
                        .insert_edge(src, dst, type_id, eid, props, Some(target.to_string()))
                        .await?;
                    count += 1;
                }
            }
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
4338
4339 async fn open_parquet_from_cloud(
4343 &self,
4344 source_url: &str,
4345 ) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
4346 use object_store::ObjectStore;
4347
4348 let (store, path) = build_store_from_url(source_url)?;
4349
4350 let bytes = store.get(&path).await?.bytes().await?;
4352
4353 let reader = bytes::Bytes::from(bytes.to_vec());
4355 let builder =
4356 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(reader)?;
4357 Ok(builder.build()?)
4358 }
4359
4360 pub(crate) async fn scan_edge_type(
4361 &self,
4362 edge_type: &str,
4363 ctx: Option<&QueryContext>,
4364 ) -> Result<Vec<(uni_common::core::id::Eid, Vid, Vid)>> {
4365 let mut edges: HashMap<uni_common::core::id::Eid, (Vid, Vid)> = HashMap::new();
4366
4367 self.scan_edge_type_l2(edge_type, &mut edges).await?;
4369
4370 self.scan_edge_type_l1(edge_type, &mut edges).await?;
4372
4373 if let Some(ctx) = ctx {
4375 self.scan_edge_type_l0(edge_type, ctx, &mut edges);
4376 self.filter_tombstoned_vertex_edges(ctx, &mut edges);
4377 }
4378
4379 Ok(edges
4380 .into_iter()
4381 .map(|(eid, (src, dst))| (eid, src, dst))
4382 .collect())
4383 }
4384
    /// L2 (compacted) layer scan for `edge_type`.
    ///
    /// Currently a no-op: nothing is read and `_edges` is untouched.
    /// Presumably a placeholder so the layer-merge order in
    /// `scan_edge_type` already accounts for a future L2 scan — confirm.
    pub(crate) async fn scan_edge_type_l2(
        &self,
        _edge_type: &str,
        _edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
    ) -> Result<()> {
        Ok(())
    }
4398
    /// Merges the L1 (LanceDB delta) layer for `edge_type` into `edges`.
    ///
    /// Reads `(eid, src_vid, dst_vid, op, _version)` from the forward delta
    /// table, keeps only the highest-versioned op per edge id, then applies
    /// it: op 0 inserts/overwrites the edge, op 1 removes it.
    ///
    /// NOTE(review): failures to open the dataset/table, execute the query,
    /// or collect batches are silently treated as "no delta data"
    /// (`if let Ok` / `unwrap_or_default`) — presumably deliberate
    /// best-effort behavior; confirm errors shouldn't surface instead.
    pub(crate) async fn scan_edge_type_l1(
        &self,
        edge_type: &str,
        edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
    ) -> Result<()> {
        use futures::TryStreamExt;
        use lancedb::query::{ExecutableQuery, QueryBase, Select};

        if let Ok(ds) = self.storage.delta_dataset(edge_type, "fwd") {
            let lancedb_store = self.storage.lancedb_store();
            if let Ok(table) = ds.open_lancedb(lancedb_store).await {
                // Only the columns needed to reconstruct edge topology.
                let query = table.query().select(Select::Columns(vec![
                    "eid".into(),
                    "src_vid".into(),
                    "dst_vid".into(),
                    "op".into(),
                    "_version".into(),
                ]));

                if let Ok(stream) = query.execute().await {
                    let batches: Vec<arrow_array::RecordBatch> =
                        stream.try_collect().await.unwrap_or_default();

                    // eid -> (version, op, src, dst); last-writer-wins on version.
                    let mut versioned_ops: HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)> =
                        HashMap::new();

                    for batch in batches {
                        self.process_delta_batch(&batch, &mut versioned_ops)?;
                    }

                    // Apply the winning op per edge: 0 = upsert, 1 = delete.
                    for (eid, (_, op, src, dst)) in versioned_ops {
                        if op == 0 {
                            edges.insert(eid, (src, dst));
                        } else if op == 1 {
                            edges.remove(&eid);
                        }
                    }
                }
            }
        }
        Ok(())
    }
4444
4445 pub(crate) fn process_delta_batch(
4447 &self,
4448 batch: &arrow_array::RecordBatch,
4449 versioned_ops: &mut HashMap<uni_common::core::id::Eid, (u64, u8, Vid, Vid)>,
4450 ) -> Result<()> {
4451 use arrow_array::UInt64Array;
4452 let eid_col = batch
4453 .column_by_name("eid")
4454 .ok_or(anyhow!("Missing eid"))?
4455 .as_any()
4456 .downcast_ref::<UInt64Array>()
4457 .ok_or(anyhow!("Invalid eid"))?;
4458 let src_col = batch
4459 .column_by_name("src_vid")
4460 .ok_or(anyhow!("Missing src_vid"))?
4461 .as_any()
4462 .downcast_ref::<UInt64Array>()
4463 .ok_or(anyhow!("Invalid src_vid"))?;
4464 let dst_col = batch
4465 .column_by_name("dst_vid")
4466 .ok_or(anyhow!("Missing dst_vid"))?
4467 .as_any()
4468 .downcast_ref::<UInt64Array>()
4469 .ok_or(anyhow!("Invalid dst_vid"))?;
4470 let op_col = batch
4471 .column_by_name("op")
4472 .ok_or(anyhow!("Missing op"))?
4473 .as_any()
4474 .downcast_ref::<arrow_array::UInt8Array>()
4475 .ok_or(anyhow!("Invalid op"))?;
4476 let version_col = batch
4477 .column_by_name("_version")
4478 .ok_or(anyhow!("Missing _version"))?
4479 .as_any()
4480 .downcast_ref::<UInt64Array>()
4481 .ok_or(anyhow!("Invalid _version"))?;
4482
4483 for i in 0..batch.num_rows() {
4484 let eid = uni_common::core::id::Eid::from(eid_col.value(i));
4485 let version = version_col.value(i);
4486 let op = op_col.value(i);
4487 let src = Vid::from(src_col.value(i));
4488 let dst = Vid::from(dst_col.value(i));
4489
4490 match versioned_ops.entry(eid) {
4491 std::collections::hash_map::Entry::Vacant(e) => {
4492 e.insert((version, op, src, dst));
4493 }
4494 std::collections::hash_map::Entry::Occupied(mut e) => {
4495 if version > e.get().0 {
4496 e.insert((version, op, src, dst));
4497 }
4498 }
4499 }
4500 }
4501 Ok(())
4502 }
4503
4504 pub(crate) fn scan_edge_type_l0(
4506 &self,
4507 edge_type: &str,
4508 ctx: &QueryContext,
4509 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4510 ) {
4511 let schema = self.storage.schema_manager().schema();
4512 let type_id = schema.edge_types.get(edge_type).map(|m| m.id);
4513
4514 if let Some(type_id) = type_id {
4515 self.scan_single_l0(&ctx.l0.read(), type_id, edges);
4517
4518 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4520 self.scan_single_l0(&tx_l0_arc.read(), type_id, edges);
4521 }
4522
4523 for pending_l0_arc in &ctx.pending_flush_l0s {
4525 self.scan_single_l0(&pending_l0_arc.read(), type_id, edges);
4526 }
4527 }
4528 }
4529
4530 pub(crate) fn scan_single_l0(
4532 &self,
4533 l0: &uni_store::runtime::L0Buffer,
4534 type_id: u32,
4535 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4536 ) {
4537 for edge_entry in l0.graph.edges() {
4538 if edge_entry.edge_type == type_id {
4539 edges.insert(edge_entry.eid, (edge_entry.src_vid, edge_entry.dst_vid));
4540 }
4541 }
4542 let eids_to_check: Vec<_> = edges.keys().cloned().collect();
4544 for eid in eids_to_check {
4545 if l0.is_tombstoned(eid) {
4546 edges.remove(&eid);
4547 }
4548 }
4549 }
4550
4551 pub(crate) fn filter_tombstoned_vertex_edges(
4553 &self,
4554 ctx: &QueryContext,
4555 edges: &mut HashMap<uni_common::core::id::Eid, (Vid, Vid)>,
4556 ) {
4557 let l0 = ctx.l0.read();
4558 let mut all_vertex_tombstones = l0.vertex_tombstones.clone();
4559
4560 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
4562 let tx_l0 = tx_l0_arc.read();
4563 all_vertex_tombstones.extend(tx_l0.vertex_tombstones.iter().cloned());
4564 }
4565
4566 for pending_l0_arc in &ctx.pending_flush_l0s {
4568 let pending_l0 = pending_l0_arc.read();
4569 all_vertex_tombstones.extend(pending_l0.vertex_tombstones.iter().cloned());
4570 }
4571
4572 edges.retain(|_, (src, dst)| {
4573 !all_vertex_tombstones.contains(src) && !all_vertex_tombstones.contains(dst)
4574 });
4575 }
4576
4577 pub(crate) async fn execute_project(
4579 &self,
4580 input_rows: Vec<HashMap<String, Value>>,
4581 projections: &[(Expr, Option<String>)],
4582 prop_manager: &PropertyManager,
4583 params: &HashMap<String, Value>,
4584 ctx: Option<&QueryContext>,
4585 ) -> Result<Vec<HashMap<String, Value>>> {
4586 let mut results = Vec::new();
4587 for m in input_rows {
4588 let mut row = HashMap::new();
4589 for (expr, alias) in projections {
4590 let val = self
4591 .evaluate_expr(expr, &m, prop_manager, params, ctx)
4592 .await?;
4593 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4594 row.insert(name, val);
4595 }
4596 results.push(row);
4597 }
4598 Ok(results)
4599 }
4600
4601 pub(crate) async fn execute_unwind(
4603 &self,
4604 input_rows: Vec<HashMap<String, Value>>,
4605 expr: &Expr,
4606 variable: &str,
4607 prop_manager: &PropertyManager,
4608 params: &HashMap<String, Value>,
4609 ctx: Option<&QueryContext>,
4610 ) -> Result<Vec<HashMap<String, Value>>> {
4611 let mut results = Vec::new();
4612 for row in input_rows {
4613 let val = self
4614 .evaluate_expr(expr, &row, prop_manager, params, ctx)
4615 .await?;
4616 if let Value::List(items) = val {
4617 for item in items {
4618 let mut new_row = row.clone();
4619 new_row.insert(variable.to_string(), item);
4620 results.push(new_row);
4621 }
4622 }
4623 }
4624 Ok(results)
4625 }
4626
4627 pub(crate) async fn execute_apply(
4629 &self,
4630 input_rows: Vec<HashMap<String, Value>>,
4631 subquery: &LogicalPlan,
4632 input_filter: Option<&Expr>,
4633 prop_manager: &PropertyManager,
4634 params: &HashMap<String, Value>,
4635 ctx: Option<&QueryContext>,
4636 ) -> Result<Vec<HashMap<String, Value>>> {
4637 let mut filtered_rows = input_rows;
4638
4639 if let Some(filter) = input_filter {
4640 let mut filtered = Vec::new();
4641 for row in filtered_rows {
4642 let res = self
4643 .evaluate_expr(filter, &row, prop_manager, params, ctx)
4644 .await?;
4645 if res.as_bool().unwrap_or(false) {
4646 filtered.push(row);
4647 }
4648 }
4649 filtered_rows = filtered;
4650 }
4651
4652 if filtered_rows.is_empty() {
4655 let sub_rows = self
4656 .execute_subplan(subquery.clone(), prop_manager, params, ctx)
4657 .await?;
4658 return Ok(sub_rows);
4659 }
4660
4661 let mut results = Vec::new();
4662 for row in filtered_rows {
4663 let mut sub_params = params.clone();
4664 sub_params.extend(row.clone());
4665
4666 let sub_rows = self
4667 .execute_subplan(subquery.clone(), prop_manager, &sub_params, ctx)
4668 .await?;
4669
4670 for sub_row in sub_rows {
4671 let mut new_row = row.clone();
4672 new_row.extend(sub_row);
4673 results.push(new_row);
4674 }
4675 }
4676 Ok(results)
4677 }
4678
4679 pub(crate) fn execute_show_indexes(&self, filter: Option<&str>) -> Vec<HashMap<String, Value>> {
4681 let schema = self.storage.schema_manager().schema();
4682 let mut rows = Vec::new();
4683 for idx in &schema.indexes {
4684 let (name, type_str, details) = match idx {
4685 uni_common::core::schema::IndexDefinition::Vector(c) => (
4686 c.name.clone(),
4687 "VECTOR",
4688 format!("{:?} on {}.{}", c.index_type, c.label, c.property),
4689 ),
4690 uni_common::core::schema::IndexDefinition::FullText(c) => (
4691 c.name.clone(),
4692 "FULLTEXT",
4693 format!("on {}:{:?}", c.label, c.properties),
4694 ),
4695 uni_common::core::schema::IndexDefinition::Scalar(cfg) => (
4696 cfg.name.clone(),
4697 "SCALAR",
4698 format!(":{}({:?})", cfg.label, cfg.properties),
4699 ),
4700 _ => ("UNKNOWN".to_string(), "UNKNOWN", "".to_string()),
4701 };
4702
4703 if let Some(f) = filter
4704 && f != type_str
4705 {
4706 continue;
4707 }
4708
4709 let mut row = HashMap::new();
4710 row.insert("name".to_string(), Value::String(name));
4711 row.insert("type".to_string(), Value::String(type_str.to_string()));
4712 row.insert("details".to_string(), Value::String(details));
4713 rows.push(row);
4714 }
4715 rows
4716 }
4717
4718 pub(crate) fn execute_show_database(&self) -> Vec<HashMap<String, Value>> {
4719 let mut row = HashMap::new();
4720 row.insert("name".to_string(), Value::String("uni".to_string()));
4721 vec![row]
4723 }
4724
4725 pub(crate) fn execute_show_config(&self) -> Vec<HashMap<String, Value>> {
4726 vec![]
4728 }
4729
4730 pub(crate) async fn execute_show_statistics(&self) -> Result<Vec<HashMap<String, Value>>> {
4731 let snapshot = self
4732 .storage
4733 .snapshot_manager()
4734 .load_latest_snapshot()
4735 .await?;
4736 let mut results = Vec::new();
4737
4738 if let Some(snap) = snapshot {
4739 for (label, s) in &snap.vertices {
4740 let mut row = HashMap::new();
4741 row.insert("type".to_string(), Value::String("Label".to_string()));
4742 row.insert("name".to_string(), Value::String(label.clone()));
4743 row.insert("count".to_string(), Value::Int(s.count as i64));
4744 results.push(row);
4745 }
4746 for (edge, s) in &snap.edges {
4747 let mut row = HashMap::new();
4748 row.insert("type".to_string(), Value::String("Edge".to_string()));
4749 row.insert("name".to_string(), Value::String(edge.clone()));
4750 row.insert("count".to_string(), Value::Int(s.count as i64));
4751 results.push(row);
4752 }
4753 }
4754
4755 Ok(results)
4756 }
4757
4758 pub(crate) fn execute_show_constraints(
4759 &self,
4760 clause: ShowConstraints,
4761 ) -> Vec<HashMap<String, Value>> {
4762 let schema = self.storage.schema_manager().schema();
4763 let mut rows = Vec::new();
4764 for c in &schema.constraints {
4765 if let Some(target) = &clause.target {
4766 match (target, &c.target) {
4767 (AstConstraintTarget::Label(l1), ConstraintTarget::Label(l2)) if l1 == l2 => {}
4768 (AstConstraintTarget::EdgeType(e1), ConstraintTarget::EdgeType(e2))
4769 if e1 == e2 => {}
4770 _ => continue,
4771 }
4772 }
4773
4774 let mut row = HashMap::new();
4775 row.insert("name".to_string(), Value::String(c.name.clone()));
4776 let type_str = match c.constraint_type {
4777 ConstraintType::Unique { .. } => "UNIQUE",
4778 ConstraintType::Exists { .. } => "EXISTS",
4779 ConstraintType::Check { .. } => "CHECK",
4780 _ => "UNKNOWN",
4781 };
4782 row.insert("type".to_string(), Value::String(type_str.to_string()));
4783
4784 let target_str = match &c.target {
4785 ConstraintTarget::Label(l) => format!("(:{})", l),
4786 ConstraintTarget::EdgeType(e) => format!("[:{}]", e),
4787 _ => "UNKNOWN".to_string(),
4788 };
4789 row.insert("target".to_string(), Value::String(target_str));
4790
4791 rows.push(row);
4792 }
4793 rows
4794 }
4795
4796 pub(crate) async fn execute_cross_join(
4798 &self,
4799 left: Box<LogicalPlan>,
4800 right: Box<LogicalPlan>,
4801 prop_manager: &PropertyManager,
4802 params: &HashMap<String, Value>,
4803 ctx: Option<&QueryContext>,
4804 ) -> Result<Vec<HashMap<String, Value>>> {
4805 let left_rows = self
4806 .execute_subplan(*left, prop_manager, params, ctx)
4807 .await?;
4808 let right_rows = self
4809 .execute_subplan(*right, prop_manager, params, ctx)
4810 .await?;
4811
4812 let mut results = Vec::new();
4813 for l in &left_rows {
4814 for r in &right_rows {
4815 let mut combined = l.clone();
4816 combined.extend(r.clone());
4817 results.push(combined);
4818 }
4819 }
4820 Ok(results)
4821 }
4822
4823 pub(crate) async fn execute_union(
4825 &self,
4826 left: Box<LogicalPlan>,
4827 right: Box<LogicalPlan>,
4828 all: bool,
4829 prop_manager: &PropertyManager,
4830 params: &HashMap<String, Value>,
4831 ctx: Option<&QueryContext>,
4832 ) -> Result<Vec<HashMap<String, Value>>> {
4833 let mut left_rows = self
4834 .execute_subplan(*left, prop_manager, params, ctx)
4835 .await?;
4836 let mut right_rows = self
4837 .execute_subplan(*right, prop_manager, params, ctx)
4838 .await?;
4839
4840 left_rows.append(&mut right_rows);
4841
4842 if !all {
4843 let mut seen = HashSet::new();
4844 left_rows.retain(|row| {
4845 let sorted_row: std::collections::BTreeMap<_, _> = row.iter().collect();
4846 let key = format!("{:?}", sorted_row);
4847 seen.insert(key)
4848 });
4849 }
4850 Ok(left_rows)
4851 }
4852
4853 pub(crate) fn index_exists_by_name(&self, name: &str) -> bool {
4855 let schema = self.storage.schema_manager().schema();
4856 schema.indexes.iter().any(|idx| match idx {
4857 uni_common::core::schema::IndexDefinition::Vector(c) => c.name == name,
4858 uni_common::core::schema::IndexDefinition::FullText(c) => c.name == name,
4859 uni_common::core::schema::IndexDefinition::Scalar(c) => c.name == name,
4860 _ => false,
4861 })
4862 }
4863
4864 pub(crate) async fn execute_export(
4865 &self,
4866 target: &str,
4867 source: &str,
4868 options: &HashMap<String, Value>,
4869 prop_manager: &PropertyManager,
4870 ctx: Option<&QueryContext>,
4871 ) -> Result<Vec<HashMap<String, Value>>> {
4872 let format = options
4873 .get("format")
4874 .and_then(|v| v.as_str())
4875 .unwrap_or("csv")
4876 .to_lowercase();
4877
4878 match format.as_str() {
4879 "csv" => {
4880 self.execute_csv_export(target, source, options, prop_manager, ctx)
4881 .await
4882 }
4883 "parquet" => {
4884 self.execute_parquet_export(target, source, options, prop_manager, ctx)
4885 .await
4886 }
4887 _ => Err(anyhow!("Unsupported export format: {}", format)),
4888 }
4889 }
4890
    /// EXPORT ... TO '<file>.csv': writes all rows of a vertex label or edge
    /// type named `target` to a local CSV file.
    ///
    /// Options: `delimiter` (first byte only, default `,`) and `header`
    /// (write a header row, default true). Vertex exports emit `_vid` plus
    /// sorted property columns; edge exports emit `_eid`, `_src`, `_dst`,
    /// `_type` plus sorted property columns. Returns `{count: <rows>}`.
    pub(crate) async fn execute_csv_export(
        &self,
        target: &str,
        source: &str,
        options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        // `source` is actually the destination path here; validated against
        // the allowed roots before anything is written.
        let validated_dest = self.validate_path(source)?;

        let schema = self.storage.schema_manager().schema();
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        let delimiter_str = options
            .get("delimiter")
            .and_then(|v| v.as_str())
            .unwrap_or(",");
        // An empty delimiter option falls back to ','.
        let delimiter = if delimiter_str.is_empty() {
            b','
        } else {
            delimiter_str.as_bytes()[0]
        };
        let has_header = options
            .get("header")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let mut wtr = csv::WriterBuilder::new()
            .delimiter(delimiter)
            .from_path(&validated_dest)?;

        let mut count = 0;
        // Shared fallback when the target has no declared properties.
        let empty_props = HashMap::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            // Sort property names so column order is deterministic.
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            let mut headers = vec!["_vid".to_string()];
            headers.extend(prop_names.clone());

            // The header row is written manually (not by the csv crate).
            if has_header {
                wtr.write_record(&headers)?;
            }

            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(vid.to_string());
                for p_name in &prop_names {
                    // Missing properties are serialized as NULL.
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        } else if let Some(meta) = edge_meta {
            let props_meta = schema.properties.get(target).unwrap_or(&empty_props);
            // Sort property names so column order is deterministic.
            let mut prop_names: Vec<_> = props_meta.keys().cloned().collect();
            prop_names.sort();

            let mut headers = vec![
                "_eid".to_string(),
                "_src".to_string(),
                "_dst".to_string(),
                "_type".to_string(),
            ];
            headers.extend(prop_names.clone());

            if has_header {
                wtr.write_record(&headers)?;
            }

            let edges = self.scan_edge_type(target, ctx).await?;

            for (eid, src, dst) in edges {
                let props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                let mut row = Vec::with_capacity(headers.len());
                row.push(eid.to_string());
                row.push(src.to_string());
                row.push(dst.to_string());
                // `_type` holds the numeric edge-type id, not its name.
                row.push(meta.id.to_string());

                for p_name in &prop_names {
                    let val = props.get(p_name).cloned().unwrap_or(Value::Null);
                    row.push(self.format_csv_value(val));
                }
                wtr.write_record(&row)?;
                count += 1;
            }
        }

        wtr.flush()?;
        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(count as i64));
        Ok(vec![res])
    }
5010
    /// Export all vertices of a label, or all edges of an edge type, to a
    /// single Parquet file at `destination` (local path or cloud URL).
    ///
    /// `target` must name either a vertex label or an edge type in the
    /// current schema; otherwise an error is returned. `_options` is
    /// currently unused. Returns a single result row `{"count": <n>}`
    /// with the number of exported records.
    ///
    /// NOTE(review): the entire dataset is materialized in `rows` before
    /// any bytes are written, so very large exports are bounded by memory.
    pub(crate) async fn execute_parquet_export(
        &self,
        target: &str,
        destination: &str,
        _options: &HashMap<String, Value>,
        prop_manager: &PropertyManager,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<HashMap<String, Value>>> {
        let schema_manager = self.storage.schema_manager();
        let schema = schema_manager.schema();
        // `target` may resolve to a vertex label or an edge type; exactly
        // one of the two branches below runs based on which lookup hit.
        let label_meta = schema.labels.get(target);
        let edge_meta = schema.edge_types.get(target);

        if label_meta.is_none() && edge_meta.is_none() {
            return Err(anyhow!("Target '{}' not found in schema", target));
        }

        // Reuse the dataset's own Arrow schema so the exported file has the
        // same column layout as the underlying storage.
        let arrow_schema = if label_meta.is_some() {
            let dataset = self.storage.vertex_dataset(target)?;
            dataset.get_arrow_schema(&schema)?
        } else {
            let dataset = self.storage.edge_dataset(target, "", "")?;
            dataset.get_arrow_schema(&schema)?
        };

        let mut rows: Vec<HashMap<String, uni_common::Value>> = Vec::new();

        if let Some(meta) = label_meta {
            let label_id = meta.id;
            // No filter: scan every vertex carrying this label.
            let vids = self
                .scan_label_with_filter(label_id, "n", None, ctx, prop_manager, &HashMap::new())
                .await?;

            for vid in vids {
                let mut props = prop_manager
                    .get_all_vertex_props_with_ctx(vid, ctx)
                    .await?
                    .unwrap_or_default();

                // Synthesize the storage-internal columns the Arrow schema
                // expects (presumably required by the dataset layout —
                // TODO confirm against vertex_dataset's schema).
                props.insert(
                    "_vid".to_string(),
                    uni_common::Value::Int(vid.as_u64() as i64),
                );
                if !props.contains_key("_uid") {
                    // Placeholder 32-element zero list when no _uid exists.
                    props.insert(
                        "_uid".to_string(),
                        uni_common::Value::List(vec![uni_common::Value::Int(0); 32]),
                    );
                }
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        } else if edge_meta.is_some() {
            let edges = self.scan_edge_type(target, ctx).await?;
            for (eid, src, dst) in edges {
                let mut props = prop_manager
                    .get_all_edge_props_with_ctx(eid, ctx)
                    .await?
                    .unwrap_or_default();

                // NOTE(review): edge id columns use "eid"/"src_vid"/"dst_vid"
                // without the leading underscore used for vertex "_vid" —
                // presumably this matches the edge dataset's Arrow schema;
                // verify for round-trip consistency with the CSV export.
                props.insert(
                    "eid".to_string(),
                    uni_common::Value::Int(eid.as_u64() as i64),
                );
                props.insert(
                    "src_vid".to_string(),
                    uni_common::Value::Int(src.as_u64() as i64),
                );
                props.insert(
                    "dst_vid".to_string(),
                    uni_common::Value::Int(dst.as_u64() as i64),
                );
                props.insert("_deleted".to_string(), uni_common::Value::Bool(false));
                props.insert("_version".to_string(), uni_common::Value::Int(1));
                rows.push(props);
            }
        }

        // Cloud URLs go through the in-memory object-store path; local
        // destinations are validated then streamed via ArrowWriter.
        if is_cloud_url(destination) {
            self.write_parquet_to_cloud(destination, &rows, &arrow_schema)
                .await?;
        } else {
            let validated_dest = self.validate_path(destination)?;
            let file = std::fs::File::create(&validated_dest)?;
            let mut writer =
                parquet::arrow::ArrowWriter::try_new(file, arrow_schema.clone(), None)?;

            if !rows.is_empty() {
                let batch = self.rows_to_batch(&rows, &arrow_schema)?;
                writer.write(&batch)?;
            }

            // close() finalizes the Parquet footer; the file is invalid
            // without it.
            writer.close()?;
        }

        let mut res = HashMap::new();
        res.insert("count".to_string(), Value::Int(rows.len() as i64));
        Ok(vec![res])
    }
5118
5119 async fn write_parquet_to_cloud(
5121 &self,
5122 dest_url: &str,
5123 rows: &[HashMap<String, uni_common::Value>],
5124 arrow_schema: &arrow_schema::Schema,
5125 ) -> Result<()> {
5126 use object_store::ObjectStore;
5127
5128 let (store, path) = build_store_from_url(dest_url)?;
5129
5130 let mut buffer = Vec::new();
5132 {
5133 let mut writer = parquet::arrow::ArrowWriter::try_new(
5134 &mut buffer,
5135 Arc::new(arrow_schema.clone()),
5136 None,
5137 )?;
5138
5139 if !rows.is_empty() {
5140 let batch = self.rows_to_batch(rows, arrow_schema)?;
5141 writer.write(&batch)?;
5142 }
5143
5144 writer.close()?;
5145 }
5146
5147 store.put(&path, bytes::Bytes::from(buffer).into()).await?;
5149
5150 Ok(())
5151 }
5152
5153 pub(crate) fn rows_to_batch(
5154 &self,
5155 rows: &[HashMap<String, uni_common::Value>],
5156 schema: &arrow_schema::Schema,
5157 ) -> Result<RecordBatch> {
5158 let mut columns: Vec<Arc<dyn Array>> = Vec::new();
5159
5160 for field in schema.fields() {
5161 let name = field.name();
5162 let dt = field.data_type();
5163
5164 let values: Vec<uni_common::Value> = rows
5165 .iter()
5166 .map(|row| row.get(name).cloned().unwrap_or(uni_common::Value::Null))
5167 .collect();
5168 let array = self.values_to_array(&values, dt)?;
5169 columns.push(array);
5170 }
5171
5172 Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
5173 }
5174
    /// Convert a homogeneous slice of `Value`s into an Arrow array of type
    /// `dt`. Thin method-shaped wrapper over the shared `arrow_convert`
    /// helper so executor call sites don't depend on the module directly.
    pub(crate) fn values_to_array(
        &self,
        values: &[uni_common::Value],
        dt: &arrow_schema::DataType,
    ) -> Result<Arc<dyn Array>> {
        arrow_convert::values_to_array(values, dt)
    }
5184
5185 pub(crate) fn format_csv_value(&self, val: Value) -> String {
5186 match val {
5187 Value::Null => "".to_string(),
5188 Value::String(s) => s,
5189 Value::Int(i) => i.to_string(),
5190 Value::Float(f) => f.to_string(),
5191 Value::Bool(b) => b.to_string(),
5192 _ => format!("{}", val),
5193 }
5194 }
5195
5196 pub(crate) fn parse_csv_value(
5197 &self,
5198 s: &str,
5199 data_type: &uni_common::core::schema::DataType,
5200 prop_name: &str,
5201 ) -> Result<Value> {
5202 if s.is_empty() || s.to_lowercase() == "null" {
5203 return Ok(Value::Null);
5204 }
5205
5206 use uni_common::core::schema::DataType;
5207 match data_type {
5208 DataType::String => Ok(Value::String(s.to_string())),
5209 DataType::Int32 | DataType::Int64 => {
5210 let i = s.parse::<i64>().map_err(|_| {
5211 anyhow!(
5212 "Failed to parse integer for property '{}': {}",
5213 prop_name,
5214 s
5215 )
5216 })?;
5217 Ok(Value::Int(i))
5218 }
5219 DataType::Float32 | DataType::Float64 => {
5220 let f = s.parse::<f64>().map_err(|_| {
5221 anyhow!("Failed to parse float for property '{}': {}", prop_name, s)
5222 })?;
5223 Ok(Value::Float(f))
5224 }
5225 DataType::Bool => {
5226 let b = s.to_lowercase().parse::<bool>().map_err(|_| {
5227 anyhow!(
5228 "Failed to parse boolean for property '{}': {}",
5229 prop_name,
5230 s
5231 )
5232 })?;
5233 Ok(Value::Bool(b))
5234 }
5235 DataType::CypherValue => {
5236 let json_val: serde_json::Value = serde_json::from_str(s).map_err(|_| {
5237 anyhow!("Failed to parse JSON for property '{}': {}", prop_name, s)
5238 })?;
5239 Ok(Value::from(json_val))
5240 }
5241 DataType::Vector { .. } => {
5242 let v: Vec<f32> = serde_json::from_str(s).map_err(|_| {
5243 anyhow!("Failed to parse Vector for property '{}': {}", prop_name, s)
5244 })?;
5245 Ok(Value::Vector(v))
5246 }
5247 _ => Ok(Value::String(s.to_string())),
5248 }
5249 }
5250
5251 pub(crate) async fn detach_delete_vertex(&self, vid: Vid, writer: &mut Writer) -> Result<()> {
5252 let schema = self.storage.schema_manager().schema();
5253 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5254
5255 let out_graph = self
5257 .storage
5258 .load_subgraph_cached(
5259 &[vid],
5260 &edge_type_ids,
5261 1,
5262 uni_store::runtime::Direction::Outgoing,
5263 Some(writer.l0_manager.get_current()),
5264 )
5265 .await?;
5266
5267 for edge in out_graph.edges() {
5268 writer
5269 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5270 .await?;
5271 }
5272
5273 let in_graph = self
5275 .storage
5276 .load_subgraph_cached(
5277 &[vid],
5278 &edge_type_ids,
5279 1,
5280 uni_store::runtime::Direction::Incoming,
5281 Some(writer.l0_manager.get_current()),
5282 )
5283 .await?;
5284
5285 for edge in in_graph.edges() {
5286 writer
5287 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5288 .await?;
5289 }
5290
5291 Ok(())
5292 }
5293
5294 pub(crate) async fn batch_detach_delete_vertices(
5296 &self,
5297 vids: &[Vid],
5298 labels_per_vid: Vec<Option<Vec<String>>>,
5299 writer: &mut Writer,
5300 ) -> Result<()> {
5301 let schema = self.storage.schema_manager().schema();
5302 let edge_type_ids: Vec<u32> = schema.all_edge_type_ids();
5303
5304 let out_graph = self
5306 .storage
5307 .load_subgraph_cached(
5308 vids,
5309 &edge_type_ids,
5310 1,
5311 uni_store::runtime::Direction::Outgoing,
5312 Some(writer.l0_manager.get_current()),
5313 )
5314 .await?;
5315
5316 for edge in out_graph.edges() {
5317 writer
5318 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5319 .await?;
5320 }
5321
5322 let in_graph = self
5324 .storage
5325 .load_subgraph_cached(
5326 vids,
5327 &edge_type_ids,
5328 1,
5329 uni_store::runtime::Direction::Incoming,
5330 Some(writer.l0_manager.get_current()),
5331 )
5332 .await?;
5333
5334 for edge in in_graph.edges() {
5335 writer
5336 .delete_edge(edge.eid, edge.src_vid, edge.dst_vid, edge.edge_type)
5337 .await?;
5338 }
5339
5340 for (vid, labels) in vids.iter().zip(labels_per_vid) {
5342 writer.delete_vertex(*vid, labels).await?;
5343 }
5344
5345 Ok(())
5346 }
5347}