1use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
36use crate::query::df_graph::bind_fixed_path::BindFixedPathExec;
37use crate::query::df_graph::bind_zero_length_path::BindZeroLengthPathExec;
38use crate::query::df_graph::mutation_common::{MutationKind, extended_schema_for_new_vars};
39use crate::query::df_graph::mutation_create::new_create_exec;
40use crate::query::df_graph::mutation_delete::new_delete_exec;
41use crate::query::df_graph::mutation_merge::new_merge_exec;
42use crate::query::df_graph::mutation_remove::new_remove_exec;
43use crate::query::df_graph::mutation_set::new_set_exec;
44use crate::query::df_graph::recursive_cte::RecursiveCTEExec;
45use crate::query::df_graph::traverse::{
46 GraphVariableLengthTraverseExec, GraphVariableLengthTraverseMainExec,
47};
48use crate::query::df_graph::{
49 GraphApplyExec, GraphExecutionContext, GraphExtIdLookupExec, GraphProcedureCallExec,
50 GraphScanExec, GraphShortestPathExec, GraphTraverseExec, GraphTraverseMainExec,
51 GraphUnwindExec, GraphVectorKnnExec, L0Context, MutationContext, MutationExec,
52 OptionalFilterExec,
53};
54use crate::query::planner::{LogicalPlan, aggregate_column_name, collect_properties_from_plan};
55use anyhow::{Result, anyhow};
56use arrow_schema::{DataType, Schema, SchemaRef};
57use datafusion::common::JoinType;
58use datafusion::execution::SessionState;
59use datafusion::logical_expr::{Expr as DfExpr, ExprSchemable, SortExpr as DfSortExpr};
60use datafusion::physical_expr::{create_physical_expr, create_physical_sort_exprs};
61use datafusion::physical_plan::ExecutionPlan;
62use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
63use datafusion::physical_plan::filter::FilterExec;
64use datafusion::physical_plan::joins::NestedLoopJoinExec;
65use datafusion::physical_plan::limit::LocalLimitExec;
66use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
67use datafusion::physical_plan::projection::ProjectionExec;
68use datafusion::physical_plan::sorts::sort::SortExec;
69use datafusion::physical_plan::udaf::AggregateFunctionExpr;
70use datafusion::physical_plan::union::UnionExec;
71use datafusion::prelude::SessionContext;
72use parking_lot::RwLock;
73use std::collections::{HashMap, HashSet};
74use std::sync::Arc;
75use std::sync::atomic::{AtomicU64, Ordering};
76use uni_algo::algo::AlgorithmRegistry;
77use uni_common::core::schema::{PropertyMeta, Schema as UniSchema};
78use uni_cypher::ast::{
79 CypherLiteral, Direction as AstDirection, Expr, Pattern, PatternElement, SortItem,
80};
81use uni_store::runtime::l0::L0Buffer;
82use uni_store::runtime::property_manager::PropertyManager;
83use uni_store::storage::direction::Direction;
84use uni_store::storage::manager::StorageManager;
85use uni_xervo::runtime::ModelRuntime;
86
// An aggregate function expression paired with its optional filter predicate
// (the `FILTER (WHERE …)` part), as used when building physical aggregations.
type PhysicalAggregate = (
    Arc<AggregateFunctionExpr>,
    Option<Arc<dyn datafusion::physical_expr::PhysicalExpr>>,
);
92
/// Plans logical query plans into DataFusion physical plans, mixing standard
/// DataFusion operators (filter, sort, join, aggregate, …) with custom graph
/// operators (scans, traversals, mutations, Locy fixpoint operators).
pub struct HybridPhysicalPlanner {
    // DataFusion session used to create physical (filter/sort) expressions.
    session_ctx: Arc<RwLock<SessionContext>>,

    // Underlying graph storage manager.
    storage: Arc<StorageManager>,

    // Shared execution context handed to every custom graph operator.
    graph_ctx: Arc<GraphExecutionContext>,

    // Graph schema: labels, edge types, and their declared properties.
    schema: Arc<UniSchema>,

    // Last observed flush version; within this file it is only read by the
    // Debug impl — presumably updated elsewhere (TODO confirm).
    last_flush_version: AtomicU64,

    // Query parameter bindings ($param values).
    params: HashMap<String, uni_common::Value>,

    // Values inherited from an enclosing scope; forwarded into the expression
    // TranslationContext (NOTE(review): looks like subquery correlation state
    // — confirm against callers of `with_l0_context`).
    outer_values: HashMap<String, uni_common::Value>,

    // Set only when planning write operations; absence on a write path is a
    // routing bug (see `require_mutation_ctx`).
    mutation_ctx: Option<Arc<MutationContext>>,
}
140
141impl std::fmt::Debug for HybridPhysicalPlanner {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 f.debug_struct("HybridPhysicalPlanner")
144 .field(
145 "last_flush_version",
146 &self.last_flush_version.load(Ordering::Relaxed),
147 )
148 .finish_non_exhaustive()
149 }
150}
151
152impl HybridPhysicalPlanner {
153 pub fn new(
163 session_ctx: Arc<RwLock<SessionContext>>,
164 storage: Arc<StorageManager>,
165 l0: Arc<RwLock<L0Buffer>>,
166 property_manager: Arc<PropertyManager>,
167 schema: Arc<UniSchema>,
168 params: HashMap<String, uni_common::Value>,
169 ) -> Self {
170 let graph_ctx = Arc::new(GraphExecutionContext::new(
171 storage.clone(),
172 l0,
173 property_manager,
174 ));
175
176 Self {
177 session_ctx,
178 storage,
179 graph_ctx,
180 schema,
181 last_flush_version: AtomicU64::new(0),
182 params,
183 outer_values: HashMap::new(),
184 mutation_ctx: None,
185 }
186 }
187
188 fn resolve_properties(
194 &self,
195 variable: &str,
196 schema_name: &str,
197 all_properties: &HashMap<String, HashSet<String>>,
198 ) -> Vec<String> {
199 const SYSTEM_COLUMNS: &[&str] =
201 &["_vid", "_labels", "_eid", "_src_vid", "_dst_vid", "_type"];
202
203 all_properties
204 .get(variable)
205 .map(|props| {
206 if props.contains("*") {
207 let schema_props: Vec<String> = self
208 .schema
209 .properties
210 .get(schema_name)
211 .map(|p| p.keys().cloned().collect())
212 .unwrap_or_default();
213
214 let explicit: Vec<String> = props
216 .iter()
217 .filter(|p| *p != "*" && !p.starts_with('_'))
218 .cloned()
219 .collect();
220
221 if schema_props.is_empty() && explicit.is_empty() {
222 return vec!["*".to_string()];
224 }
225
226 let mut combined: Vec<String> = schema_props;
228 for p in explicit {
229 if !combined.contains(&p) {
230 combined.push(p);
231 }
232 }
233 combined.retain(|p| !SYSTEM_COLUMNS.contains(&p.as_str()));
234 combined.sort();
235 combined
236 } else {
237 let mut explicit_props: Vec<String> = props
238 .iter()
239 .filter(|p| *p != "*" && !SYSTEM_COLUMNS.contains(&p.as_str()))
240 .cloned()
241 .collect();
242 explicit_props.sort();
243 explicit_props
244 }
245 })
246 .unwrap_or_default()
247 }
248
249 pub fn with_l0_context(
251 session_ctx: Arc<RwLock<SessionContext>>,
252 storage: Arc<StorageManager>,
253 l0_context: L0Context,
254 property_manager: Arc<PropertyManager>,
255 schema: Arc<UniSchema>,
256 params: HashMap<String, uni_common::Value>,
257 outer_values: HashMap<String, uni_common::Value>,
258 ) -> Self {
259 let graph_ctx = Arc::new(GraphExecutionContext::with_l0_context(
260 storage.clone(),
261 l0_context,
262 property_manager,
263 ));
264
265 Self {
266 session_ctx,
267 storage,
268 graph_ctx,
269 schema,
270 last_flush_version: AtomicU64::new(0),
271 params,
272 outer_values,
273 mutation_ctx: None,
274 }
275 }
276
    /// Extract an owned `GraphExecutionContext` out of `self.graph_ctx`,
    /// preserving any registries/runtime already attached.
    ///
    /// The `with_*` builder methods need ownership to call the context's
    /// consuming builder API before re-wrapping it in an `Arc`.
    fn take_graph_ctx(&mut self) -> GraphExecutionContext {
        // Snapshot the optional attachments so they survive a rebuild below.
        let algo_registry = self.graph_ctx.algo_registry().cloned();
        let procedure_registry = self.graph_ctx.procedure_registry().cloned();
        let xervo_runtime = self.graph_ctx.xervo_runtime().cloned();

        // Builds a bare context sharing the same storage / L0 / property
        // manager as an existing one.
        let new_base = |ctx: &Arc<GraphExecutionContext>| {
            GraphExecutionContext::with_l0_context(
                ctx.storage().clone(),
                ctx.l0_context().clone(),
                ctx.property_manager().clone(),
            )
        };
        // Swap a placeholder in so the Arc can be moved out of `self`.
        let placeholder = Arc::new(new_base(&self.graph_ctx));
        let arc = std::mem::replace(&mut self.graph_ctx, placeholder);
        // If other Arc clones are still alive, `try_unwrap` fails and we fall
        // back to a freshly built base context instead.
        let mut ctx = Arc::try_unwrap(arc).unwrap_or_else(|arc| new_base(&arc));

        // Re-attach the snapshotted extras onto whichever context we ended
        // up with.
        if let Some(registry) = algo_registry {
            ctx = ctx.with_algo_registry(registry);
        }
        if let Some(registry) = procedure_registry {
            ctx = ctx.with_procedure_registry(registry);
        }
        if let Some(runtime) = xervo_runtime {
            ctx = ctx.with_xervo_runtime(runtime);
        }
        ctx
    }
307
308 pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
312 let ctx = self.take_graph_ctx().with_algo_registry(registry);
313 self.graph_ctx = Arc::new(ctx);
314 self
315 }
316
317 pub fn with_procedure_registry(
321 mut self,
322 registry: Arc<crate::query::executor::procedure::ProcedureRegistry>,
323 ) -> Self {
324 let ctx = self.take_graph_ctx().with_procedure_registry(registry);
325 self.graph_ctx = Arc::new(ctx);
326 self
327 }
328
329 pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
331 let ctx = self.take_graph_ctx().with_xervo_runtime(runtime);
332 self.graph_ctx = Arc::new(ctx);
333 self
334 }
335
336 pub fn with_mutation_context(mut self, ctx: Arc<MutationContext>) -> Self {
338 self.mutation_ctx = Some(ctx);
339 self
340 }
341
    /// Shared graph execution context handed to custom graph operators.
    pub fn graph_ctx(&self) -> &Arc<GraphExecutionContext> {
        &self.graph_ctx
    }
346
    /// DataFusion session used for creating physical expressions.
    pub fn session_ctx(&self) -> &Arc<RwLock<SessionContext>> {
        &self.session_ctx
    }
351
    /// Underlying graph storage manager.
    pub fn storage(&self) -> &Arc<StorageManager> {
        &self.storage
    }
356
    /// Graph schema (labels, edge types, declared properties).
    pub fn schema_info(&self) -> &Arc<UniSchema> {
        &self.schema
    }
361
362 fn require_mutation_ctx(&self) -> Result<Arc<MutationContext>> {
364 self.mutation_ctx.clone().ok_or_else(|| {
365 tracing::error!(
366 "Mutation context not set — this indicates a routing bug where a write \
367 operation was sent to the DataFusion engine without a MutationContext"
368 );
369 anyhow!("Mutation context not set — write operations require a MutationContext")
370 })
371 }
372
373 fn translation_context_for_plan(&self, plan: &LogicalPlan) -> TranslationContext {
378 let mut variable_kinds = HashMap::new();
379 let mut variable_labels = HashMap::new();
380 let mut node_variable_hints = Vec::new();
381 let mut mutation_edge_hints = Vec::new();
382 collect_variable_kinds(plan, &mut variable_kinds);
383 collect_mutation_node_hints(plan, &mut node_variable_hints);
384 collect_mutation_edge_hints(plan, &mut mutation_edge_hints);
385 self.collect_variable_labels(plan, &mut variable_labels);
386 TranslationContext {
387 parameters: self.params.clone(),
388 outer_values: self.outer_values.clone(),
389 variable_labels,
390 variable_kinds,
391 node_variable_hints,
392 mutation_edge_hints,
393 ..Default::default()
394 }
395 }
396
    /// Recursively collect `variable -> label/edge-type name` bindings from
    /// the plan, used to resolve property references during expression
    /// translation. Only unambiguous bindings are recorded (single label,
    /// single edge type, known target label id).
    fn collect_variable_labels(&self, plan: &LogicalPlan, labels: &mut HashMap<String, String>) {
        match plan {
            LogicalPlan::Scan {
                variable,
                labels: scan_labels,
                ..
            }
            | LogicalPlan::ScanMainByLabels {
                variable,
                labels: scan_labels,
                ..
            } => {
                // Multiple labels would be ambiguous; only the first is used.
                if let Some(first) = scan_labels.first() {
                    labels.insert(variable.clone(), first.clone());
                }
            }
            LogicalPlan::Traverse {
                input,
                step_variable,
                edge_type_ids,
                target_variable,
                target_label_id,
                ..
            } => {
                self.collect_variable_labels(input, labels);
                // Edge variable: only when exactly one edge type is traversed.
                if let Some(sv) = step_variable
                    && edge_type_ids.len() == 1
                    && let Some(name) = self.schema.edge_type_name_by_id(edge_type_ids[0])
                {
                    labels.insert(sv.clone(), name.to_string());
                }
                // Target node: 0 means "no specific label".
                if *target_label_id != 0
                    && let Some(name) = self.schema.label_name_by_id(*target_label_id)
                {
                    labels.insert(target_variable.clone(), name.to_string());
                }
            }
            LogicalPlan::TraverseMainByType {
                input,
                step_variable,
                type_names,
                ..
            } => {
                self.collect_variable_labels(input, labels);
                if let Some(sv) = step_variable
                    && type_names.len() == 1
                {
                    labels.insert(sv.clone(), type_names[0].clone());
                }
            }
            // Single-input operators: recurse into the child.
            LogicalPlan::Filter { input, .. }
            | LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Aggregate { input, .. }
            | LogicalPlan::Distinct { input, .. }
            | LogicalPlan::Window { input, .. }
            | LogicalPlan::Unwind { input, .. }
            | LogicalPlan::Create { input, .. }
            | LogicalPlan::CreateBatch { input, .. }
            | LogicalPlan::Merge { input, .. }
            | LogicalPlan::Set { input, .. }
            | LogicalPlan::Remove { input, .. }
            | LogicalPlan::Delete { input, .. }
            | LogicalPlan::Foreach { input, .. }
            | LogicalPlan::SubqueryCall { input, .. } => {
                self.collect_variable_labels(input, labels);
            }
            // Two-input operators: recurse into both sides.
            LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
                self.collect_variable_labels(left, labels);
                self.collect_variable_labels(right, labels);
            }
            LogicalPlan::Apply {
                input, subquery, ..
            } => {
                self.collect_variable_labels(input, labels);
                self.collect_variable_labels(subquery, labels);
            }
            LogicalPlan::Explain { plan } => {
                self.collect_variable_labels(plan, labels);
            }
            // Leaf / unhandled variants contribute no bindings.
            _ => {}
        }
    }
487
    /// Union of schema-declared properties across all of `edge_type_ids`
    /// (delegates to the shared helper in `df_graph::common`).
    fn merged_edge_type_properties(&self, edge_type_ids: &[u32]) -> HashMap<String, PropertyMeta> {
        crate::query::df_graph::common::merged_edge_schema_props(&self.schema, edge_type_ids)
    }
491
492 pub fn plan(&self, logical: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
506 let all_properties = collect_properties_from_plan(logical);
508
509 self.plan_internal(logical, &all_properties)
511 }
512
513 pub fn plan_with_properties(
519 &self,
520 logical: &LogicalPlan,
521 extra_properties: HashMap<String, HashSet<String>>,
522 ) -> Result<Arc<dyn ExecutionPlan>> {
523 let mut all_properties = collect_properties_from_plan(logical);
524 for (var, props) in extra_properties {
525 all_properties.entry(var).or_default().extend(props);
526 }
527 self.plan_internal(logical, &all_properties)
528 }
529
    /// Wrap `plan` so an OPTIONAL pattern that produces no rows still yields
    /// output: a single-row, zero-column placeholder is LEFT-joined against
    /// `plan`, so an empty right side is padded with NULLs while a non-empty
    /// right side passes through once per row. Non-optional plans are
    /// returned unchanged.
    fn wrap_optional(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        optional: bool,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if !optional {
            return Ok(plan);
        }

        // One row, no columns: the driver row that always survives the join.
        let empty_schema = Arc::new(Schema::empty());
        let placeholder = Arc::new(PlaceholderRowExec::new(empty_schema));

        Ok(Arc::new(NestedLoopJoinExec::try_new(
            placeholder,
            plan,
            None, // no join filter: every combination is kept
            &JoinType::Left,
            None, // no projection: emit all columns
        )?))
    }
559
    /// Core planning dispatch: translate one `LogicalPlan` node — and,
    /// recursively, its inputs — into a physical `ExecutionPlan`.
    ///
    /// `all_properties` maps each pattern variable to the property names the
    /// query references, so scans and traversals know which columns to
    /// materialize. DDL/admin variants are rejected here: they are expected
    /// to be routed elsewhere before reaching this planner.
    fn plan_internal(
        &self,
        logical: &LogicalPlan,
        all_properties: &HashMap<String, HashSet<String>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        match logical {
            // ---- Scans --------------------------------------------------
            LogicalPlan::Scan {
                label_id,
                labels,
                variable,
                filter,
                optional,
            } => {
                if labels.len() > 1 {
                    self.plan_multi_label_scan(
                        labels,
                        variable,
                        filter.as_ref(),
                        *optional,
                        all_properties,
                    )
                } else {
                    self.plan_scan(
                        *label_id,
                        variable,
                        filter.as_ref(),
                        *optional,
                        all_properties,
                    )
                }
            }

            LogicalPlan::ScanMainByLabels {
                labels,
                variable,
                filter,
                optional,
            } => {
                if labels.len() > 1 {
                    self.plan_multi_label_scan(
                        labels,
                        variable,
                        filter.as_ref(),
                        *optional,
                        all_properties,
                    )
                } else if let Some(label_name) = labels.first() {
                    self.plan_schemaless_scan(
                        label_name,
                        variable,
                        filter.as_ref(),
                        *optional,
                        all_properties,
                    )
                } else {
                    // No label at all: full scan.
                    self.plan_scan_all(variable, filter.as_ref(), *optional, all_properties)
                }
            }

            LogicalPlan::ScanAll {
                variable,
                filter,
                optional,
            } => self.plan_scan_all(variable, filter.as_ref(), *optional, all_properties),

            // ---- Traversals ---------------------------------------------
            LogicalPlan::TraverseMainByType {
                type_names,
                input,
                direction,
                source_variable,
                target_variable,
                step_variable,
                min_hops,
                max_hops,
                optional,
                target_filter,
                path_variable,
                is_variable_length,
                scope_match_variables,
                optional_pattern_vars,
                edge_filter_expr,
                path_mode,
                ..
            } => {
                if *is_variable_length {
                    // Variable-length path: edge filter is handled inside the
                    // VLP operator; only the target filter is applied after.
                    let vlp_plan = self.plan_traverse_main_by_type_vlp(
                        input,
                        type_names,
                        direction.clone(),
                        source_variable,
                        target_variable,
                        step_variable.as_deref(),
                        *min_hops,
                        *max_hops,
                        path_variable.as_deref(),
                        *optional,
                        all_properties,
                        edge_filter_expr.as_ref(),
                        path_mode,
                        scope_match_variables,
                    )?;
                    self.apply_schemaless_traverse_filter(
                        vlp_plan,
                        target_filter.as_ref(),
                        source_variable,
                        target_variable,
                        step_variable.as_deref(),
                        path_variable.as_deref(),
                        true, // variable-length
                        *optional,
                        optional_pattern_vars,
                    )
                } else {
                    // Single-hop: apply the edge filter, then the target
                    // filter, each as its own post-filter stage.
                    let base_plan = self.plan_traverse_main_by_type(
                        input,
                        type_names,
                        direction.clone(),
                        source_variable,
                        target_variable,
                        step_variable.as_deref(),
                        *optional,
                        optional_pattern_vars,
                        all_properties,
                        scope_match_variables,
                    )?;
                    let edge_filtered = self.apply_schemaless_traverse_filter(
                        base_plan,
                        edge_filter_expr.as_ref(),
                        source_variable,
                        target_variable,
                        step_variable.as_deref(),
                        path_variable.as_deref(),
                        false,
                        *optional,
                        optional_pattern_vars,
                    )?;
                    self.apply_schemaless_traverse_filter(
                        edge_filtered,
                        target_filter.as_ref(),
                        source_variable,
                        target_variable,
                        step_variable.as_deref(),
                        path_variable.as_deref(),
                        false,
                        *optional,
                        optional_pattern_vars,
                    )
                }
            }

            LogicalPlan::Traverse {
                input,
                edge_type_ids,
                direction,
                source_variable,
                target_variable,
                target_label_id,
                step_variable,
                min_hops,
                max_hops,
                optional,
                target_filter,
                path_variable,
                is_variable_length,
                optional_pattern_vars,
                scope_match_variables,
                edge_filter_expr,
                path_mode,
                qpp_steps,
                ..
            } => self.plan_traverse(
                input,
                edge_type_ids,
                direction.clone(),
                source_variable,
                target_variable,
                *target_label_id,
                step_variable.as_deref(),
                *min_hops,
                *max_hops,
                path_variable.as_deref(),
                *optional,
                target_filter.as_ref(),
                *is_variable_length,
                optional_pattern_vars,
                all_properties,
                scope_match_variables,
                edge_filter_expr.as_ref(),
                path_mode,
                qpp_steps.as_deref(),
            ),

            // ---- Paths --------------------------------------------------
            LogicalPlan::ShortestPath {
                input,
                edge_type_ids,
                direction,
                source_variable,
                target_variable,
                target_label_id: _,
                path_variable,
                min_hops: _,
                max_hops: _,
            } => self.plan_shortest_path(
                input,
                edge_type_ids,
                direction.clone(),
                source_variable,
                target_variable,
                path_variable,
                false, // single shortest path
                all_properties,
            ),

            // ---- Row operators ------------------------------------------
            LogicalPlan::Filter {
                input,
                predicate,
                optional_variables,
            } => self.plan_filter(input, predicate, optional_variables, all_properties),

            LogicalPlan::Project { input, projections } => {
                // alias -> expression map, so downstream Sort/Limit can refer
                // to projected aliases.
                let alias_map: HashMap<String, Expr> = projections
                    .iter()
                    .filter_map(|(expr, alias)| alias.as_ref().map(|a| (a.clone(), expr.clone())))
                    .collect();

                self.plan_project_with_aliases(input, projections, all_properties, &alias_map)
            }

            LogicalPlan::Aggregate {
                input,
                group_by,
                aggregates,
            } => self.plan_aggregate(input, group_by, aggregates, all_properties),

            LogicalPlan::Distinct { input } => {
                // DISTINCT is planned as a group-by over every column with no
                // aggregate expressions.
                let input_plan = self.plan_internal(input, all_properties)?;
                let schema = input_plan.schema();
                let group_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
                    schema
                        .fields()
                        .iter()
                        .enumerate()
                        .map(|(i, f)| {
                            (
                                Arc::new(datafusion::physical_expr::expressions::Column::new(
                                    f.name(),
                                    i,
                                ))
                                    as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
                                f.name().clone(),
                            )
                        })
                        .collect();
                let group_by = PhysicalGroupBy::new_single(group_exprs);
                Ok(Arc::new(AggregateExec::try_new(
                    AggregateMode::Single,
                    group_by,
                    vec![],
                    vec![],
                    input_plan.clone(),
                    input_plan.schema(),
                )?))
            }

            LogicalPlan::Sort { input, order_by } => {
                self.plan_sort(input, order_by, all_properties, &HashMap::new())
            }

            LogicalPlan::Limit { input, skip, fetch } => {
                self.plan_limit(input, *skip, *fetch, all_properties)
            }

            LogicalPlan::Union { left, right, all } => {
                self.plan_union(left, right, *all, all_properties)
            }

            LogicalPlan::Empty => self.plan_empty(),

            // ---- Path binding -------------------------------------------
            LogicalPlan::BindZeroLengthPath {
                input,
                node_variable,
                path_variable,
            } => {
                self.plan_bind_zero_length_path(input, node_variable, path_variable, all_properties)
            }

            LogicalPlan::BindPath {
                input,
                node_variables,
                edge_variables,
                path_variable,
            } => self.plan_bind_path(
                input,
                node_variables,
                edge_variables,
                path_variable,
                all_properties,
            ),

            // ---- Mutations (all require a MutationContext) ---------------
            LogicalPlan::Create { input, pattern } => {
                tracing::debug!("Planning MutationCreateExec");
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                Ok(Arc::new(new_create_exec(
                    child,
                    pattern.clone(),
                    mutation_ctx,
                )))
            }
            LogicalPlan::CreateBatch { input, patterns } => {
                tracing::debug!(
                    patterns = patterns.len(),
                    "Planning MutationCreateExec (batch)"
                );
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                // Output schema is the child's extended with columns for the
                // newly created variables.
                let output_schema = extended_schema_for_new_vars(&child.schema(), patterns);
                Ok(Arc::new(MutationExec::new_with_schema(
                    child,
                    MutationKind::CreateBatch {
                        patterns: patterns.clone(),
                    },
                    "MutationCreateExec",
                    mutation_ctx,
                    output_schema,
                )))
            }
            LogicalPlan::Set { input, items } => {
                tracing::debug!(items = items.len(), "Planning MutationSetExec");
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                Ok(Arc::new(new_set_exec(child, items.clone(), mutation_ctx)))
            }
            LogicalPlan::Remove { input, items } => {
                tracing::debug!(items = items.len(), "Planning MutationRemoveExec");
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                Ok(Arc::new(new_remove_exec(
                    child,
                    items.clone(),
                    mutation_ctx,
                )))
            }
            LogicalPlan::Delete {
                input,
                items,
                detach,
            } => {
                tracing::debug!(
                    items = items.len(),
                    detach = detach,
                    "Planning MutationDeleteExec"
                );
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                Ok(Arc::new(new_delete_exec(
                    child,
                    items.clone(),
                    *detach,
                    mutation_ctx,
                )))
            }
            LogicalPlan::Merge {
                input,
                pattern,
                on_match,
                on_create,
            } => {
                tracing::debug!("Planning MutationMergeExec");
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                Ok(Arc::new(new_merge_exec(
                    child,
                    pattern.clone(),
                    on_match.clone(),
                    on_create.clone(),
                    mutation_ctx,
                )))
            }

            // ---- Windows / joins / subqueries ----------------------------
            LogicalPlan::Window {
                input,
                window_exprs,
            } => {
                let input_plan = self.plan_internal(input, all_properties)?;
                if !window_exprs.is_empty() {
                    self.plan_window_functions(input_plan, window_exprs, Some(input.as_ref()))
                } else {
                    // Empty window list: pass through unchanged.
                    Ok(input_plan)
                }
            }

            LogicalPlan::CrossJoin { left, right } => {
                let left_plan = self.plan_internal(left, all_properties)?;
                let right_plan = self.plan_internal(right, all_properties)?;

                // When the right side is a Locy derived scan, drop structural
                // columns on the left that would collide with the derived
                // scan's field names.
                let left_plan = if matches!(right.as_ref(), LogicalPlan::LocyDerivedScan { .. }) {
                    let derived_schema = right_plan.schema();
                    let derived_names: HashSet<&str> = derived_schema
                        .fields()
                        .iter()
                        .map(|f| f.name().as_str())
                        .collect();
                    strip_conflicting_structural_columns(left_plan, &derived_names)?
                } else {
                    left_plan
                };

                Ok(Arc::new(
                    datafusion::physical_plan::joins::CrossJoinExec::new(left_plan, right_plan),
                ))
            }

            LogicalPlan::Apply {
                input,
                subquery,
                input_filter,
            } => self.plan_apply(input, subquery, input_filter.as_ref(), all_properties),

            LogicalPlan::Unwind {
                input,
                expr,
                variable,
            } => self.plan_unwind(
                input.as_ref().clone(),
                expr.clone(),
                variable.clone(),
                all_properties,
            ),

            // ---- Index-backed access -------------------------------------
            LogicalPlan::VectorKnn {
                label_id,
                variable,
                property,
                query,
                k,
                threshold,
            } => self.plan_vector_knn(
                *label_id,
                variable,
                property,
                query.clone(),
                *k,
                *threshold,
                all_properties,
            ),

            LogicalPlan::InvertedIndexLookup { .. } => Err(anyhow!(
                "Full-text search not yet supported in DataFusion engine"
            )),

            LogicalPlan::AllShortestPaths {
                input,
                edge_type_ids,
                direction,
                source_variable,
                target_variable,
                target_label_id: _,
                path_variable,
                min_hops: _,
                max_hops: _,
            } => self.plan_shortest_path(
                input,
                edge_type_ids,
                direction.clone(),
                source_variable,
                target_variable,
                path_variable,
                true, // all shortest paths
                all_properties,
            ),

            LogicalPlan::QuantifiedPattern { .. } => Err(anyhow!(
                "Quantified patterns not yet supported in DataFusion engine"
            )),

            LogicalPlan::RecursiveCTE {
                cte_name,
                initial,
                recursive,
            } => self.plan_recursive_cte(cte_name, initial, recursive, all_properties),

            LogicalPlan::ProcedureCall {
                procedure_name,
                arguments,
                yield_items,
            } => self.plan_procedure_call(procedure_name, arguments, yield_items, all_properties),

            LogicalPlan::SubqueryCall { input, subquery } => {
                self.plan_apply(input, subquery, None, all_properties)
            }

            LogicalPlan::ExtIdLookup {
                variable,
                ext_id,
                filter,
                optional,
            } => self.plan_ext_id_lookup(variable, ext_id, filter.as_ref(), *optional),

            LogicalPlan::Foreach {
                input,
                variable,
                list,
                body,
            } => {
                tracing::debug!(variable = variable.as_str(), "Planning ForeachExec");
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                Ok(Arc::new(
                    super::df_graph::mutation_foreach::ForeachExec::new(
                        child,
                        variable.clone(),
                        list.clone(),
                        body.clone(),
                        mutation_ctx,
                    ),
                ))
            }

            // ---- Locy operators ------------------------------------------
            LogicalPlan::LocyPriority { input, key_columns } => {
                let child = self.plan_internal(input, all_properties)?;
                let key_indices = resolve_column_indices(&child.schema(), key_columns)?;
                let priority_col_index = child.schema().index_of("__priority").map_err(|_| {
                    anyhow::anyhow!("LocyPriority input must contain __priority column")
                })?;
                Ok(Arc::new(super::df_graph::locy_priority::PriorityExec::new(
                    child,
                    key_indices,
                    priority_col_index,
                )))
            }

            LogicalPlan::LocyBestBy {
                input,
                key_columns,
                criteria,
            } => {
                let child = self.plan_internal(input, all_properties)?;
                let key_indices = resolve_column_indices(&child.schema(), key_columns)?;
                let sort_criteria = resolve_best_by_criteria(&child.schema(), criteria)?;
                Ok(Arc::new(super::df_graph::locy_best_by::BestByExec::new(
                    child,
                    key_indices,
                    sort_criteria,
                    true, // deterministic tie-breaking
                )))
            }

            LogicalPlan::LocyFold {
                input,
                key_columns,
                fold_bindings,
                strict_probability_domain,
                probability_epsilon,
            } => {
                let child = self.plan_internal(input, all_properties)?;
                let key_indices = resolve_column_indices(&child.schema(), key_columns)?;
                let bindings = resolve_fold_bindings(&child.schema(), fold_bindings)?;
                Ok(Arc::new(super::df_graph::locy_fold::FoldExec::new(
                    child,
                    key_indices,
                    bindings,
                    *strict_probability_domain,
                    *probability_epsilon,
                )))
            }

            LogicalPlan::LocyDerivedScan {
                scan_index: _,
                data,
                schema,
            } => Ok(Arc::new(
                super::df_graph::locy_fixpoint::DerivedScanExec::new(
                    Arc::clone(data),
                    Arc::clone(schema),
                ),
            )),

            LogicalPlan::LocyProject {
                input,
                projections,
                target_types,
            } => self.plan_locy_project(input, projections, target_types, all_properties),

            LogicalPlan::LocyProgram {
                strata,
                commands,
                derived_scan_registry,
                max_iterations,
                timeout,
                max_derived_bytes,
                deterministic_best_by,
                strict_probability_domain,
                probability_epsilon,
                exact_probability,
                max_bdd_variables,
                top_k_proofs,
            } => {
                // The program exec reports execution statistics as its output.
                let output_schema = super::df_graph::locy_program::stats_schema();

                Ok(Arc::new(
                    super::df_graph::locy_program::LocyProgramExec::new(
                        strata.clone(),
                        commands.clone(),
                        Arc::clone(derived_scan_registry),
                        Arc::clone(&self.graph_ctx),
                        Arc::clone(&self.session_ctx),
                        Arc::clone(&self.storage),
                        Arc::clone(&self.schema),
                        self.params.clone(),
                        output_schema,
                        *max_iterations,
                        *timeout,
                        *max_derived_bytes,
                        *deterministic_best_by,
                        *strict_probability_domain,
                        *probability_epsilon,
                        *exact_probability,
                        *max_bdd_variables,
                        *top_k_proofs,
                    ),
                ))
            }

            // ---- DDL / admin: must be routed elsewhere --------------------
            LogicalPlan::CreateVectorIndex { .. }
            | LogicalPlan::CreateFullTextIndex { .. }
            | LogicalPlan::CreateScalarIndex { .. }
            | LogicalPlan::CreateJsonFtsIndex { .. }
            | LogicalPlan::DropIndex { .. }
            | LogicalPlan::ShowIndexes { .. }
            | LogicalPlan::Copy { .. }
            | LogicalPlan::Backup { .. }
            | LogicalPlan::ShowDatabase
            | LogicalPlan::ShowConfig
            | LogicalPlan::ShowStatistics
            | LogicalPlan::Vacuum
            | LogicalPlan::Checkpoint
            | LogicalPlan::CopyTo { .. }
            | LogicalPlan::CopyFrom { .. }
            | LogicalPlan::CreateLabel(_)
            | LogicalPlan::CreateEdgeType(_)
            | LogicalPlan::AlterLabel(_)
            | LogicalPlan::AlterEdgeType(_)
            | LogicalPlan::DropLabel(_)
            | LogicalPlan::DropEdgeType(_)
            | LogicalPlan::CreateConstraint(_)
            | LogicalPlan::DropConstraint(_)
            | LogicalPlan::ShowConstraints(_)
            | LogicalPlan::Explain { .. } => {
                Err(anyhow!("DDL/Admin operations should be handled separately"))
            }
        }
    }
1240
1241 fn plan_internal_with_aliases(
1245 &self,
1246 logical: &LogicalPlan,
1247 all_properties: &HashMap<String, HashSet<String>>,
1248 alias_map: &HashMap<String, Expr>,
1249 ) -> Result<Arc<dyn ExecutionPlan>> {
1250 match logical {
1251 LogicalPlan::Sort { input, order_by } => {
1252 self.plan_sort(input, order_by, all_properties, alias_map)
1253 }
1254 LogicalPlan::Limit { input, skip, fetch } => {
1255 let input_plan =
1257 self.plan_internal_with_aliases(input, all_properties, alias_map)?;
1258 if let Some(offset) = skip.filter(|&s| s > 0) {
1259 use datafusion::physical_plan::limit::GlobalLimitExec;
1260 Ok(Arc::new(GlobalLimitExec::new(input_plan, offset, *fetch)))
1261 } else {
1262 Ok(Arc::new(LocalLimitExec::new(
1263 input_plan,
1264 fetch.unwrap_or(usize::MAX),
1265 )))
1266 }
1267 }
1268 _ => self.plan_internal(logical, all_properties),
1270 }
1271 }
1272
1273 fn apply_scan_filter(
1279 &self,
1280 plan: Arc<dyn ExecutionPlan>,
1281 variable: &str,
1282 filter: Option<&Expr>,
1283 label_name: Option<&str>,
1284 ) -> Result<Arc<dyn ExecutionPlan>> {
1285 let Some(filter_expr) = filter else {
1286 return Ok(plan);
1287 };
1288
1289 let mut variable_kinds = HashMap::new();
1290 variable_kinds.insert(variable.to_string(), VariableKind::Node);
1291 let mut variable_labels = HashMap::new();
1292 if let Some(label) = label_name {
1293 variable_labels.insert(variable.to_string(), label.to_string());
1294 }
1295 let ctx = TranslationContext {
1296 parameters: self.params.clone(),
1297 variable_labels,
1298 variable_kinds,
1299 ..Default::default()
1300 };
1301 let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
1302
1303 let schema = plan.schema();
1304
1305 let session = self.session_ctx.read();
1306 let physical_filter = self.create_physical_filter_expr(&df_filter, &schema, &session)?;
1307
1308 Ok(Arc::new(FilterExec::try_new(physical_filter, plan)?))
1309 }
1310
1311 #[expect(clippy::too_many_arguments)]
1318 fn apply_schemaless_traverse_filter(
1319 &self,
1320 plan: Arc<dyn ExecutionPlan>,
1321 filter_expr: Option<&Expr>,
1322 source_variable: &str,
1323 target_variable: &str,
1324 step_variable: Option<&str>,
1325 path_variable: Option<&str>,
1326 is_variable_length: bool,
1327 optional: bool,
1328 optional_pattern_vars: &HashSet<String>,
1329 ) -> Result<Arc<dyn ExecutionPlan>> {
1330 let Some(filter_expr) = filter_expr else {
1331 return Ok(plan);
1332 };
1333
1334 let mut variable_kinds = HashMap::new();
1335 variable_kinds.insert(source_variable.to_string(), VariableKind::Node);
1336 variable_kinds.insert(target_variable.to_string(), VariableKind::Node);
1337 if let Some(sv) = step_variable {
1338 variable_kinds.insert(sv.to_string(), VariableKind::edge_for(is_variable_length));
1339 }
1340 if let Some(pv) = path_variable {
1341 variable_kinds.insert(pv.to_string(), VariableKind::Path);
1342 }
1343 let ctx = TranslationContext {
1344 parameters: self.params.clone(),
1345 variable_kinds,
1346 ..Default::default()
1347 };
1348 let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
1349 let schema = plan.schema();
1350 let session = self.session_ctx.read();
1351 let physical_filter = self.create_physical_filter_expr(&df_filter, &schema, &session)?;
1352
1353 if optional {
1354 Ok(Arc::new(OptionalFilterExec::new(
1355 plan,
1356 physical_filter,
1357 optional_pattern_vars.clone(),
1358 )))
1359 } else {
1360 Ok(Arc::new(FilterExec::try_new(physical_filter, plan)?))
1361 }
1362 }
1363
1364 fn plan_ext_id_lookup(
1366 &self,
1367 variable: &str,
1368 ext_id: &str,
1369 filter: Option<&Expr>,
1370 optional: bool,
1371 ) -> Result<Arc<dyn ExecutionPlan>> {
1372 let properties = if let Some(filter_expr) = filter {
1374 crate::query::df_expr::collect_properties(filter_expr)
1375 .into_iter()
1376 .filter(|(var, _)| var == variable)
1377 .map(|(_, prop)| prop)
1378 .collect()
1379 } else {
1380 vec![]
1381 };
1382
1383 let lookup_plan: Arc<dyn ExecutionPlan> = Arc::new(GraphExtIdLookupExec::new(
1384 self.graph_ctx.clone(),
1385 variable.to_string(),
1386 ext_id.to_string(),
1387 properties,
1388 optional,
1389 ));
1390
1391 self.apply_scan_filter(lookup_plan, variable, filter, None)
1392 }
1393
1394 fn plan_unwind(
1398 &self,
1399 input: LogicalPlan,
1400 expr: Expr,
1401 variable: String,
1402 all_properties: &HashMap<String, HashSet<String>>,
1403 ) -> Result<Arc<dyn ExecutionPlan>> {
1404 let input_plan = self.plan_internal(&input, all_properties)?;
1406
1407 let unwind = GraphUnwindExec::new(input_plan, expr, variable, self.params.clone());
1408
1409 Ok(Arc::new(unwind))
1410 }
1411
1412 fn plan_recursive_cte(
1417 &self,
1418 cte_name: &str,
1419 initial: &LogicalPlan,
1420 recursive: &LogicalPlan,
1421 _all_properties: &HashMap<String, HashSet<String>>,
1422 ) -> Result<Arc<dyn ExecutionPlan>> {
1423 Ok(Arc::new(RecursiveCTEExec::new(
1424 cte_name.to_string(),
1425 initial.clone(),
1426 recursive.clone(),
1427 self.graph_ctx.clone(),
1428 self.session_ctx.clone(),
1429 self.storage.clone(),
1430 self.schema.clone(),
1431 self.params.clone(),
1432 )))
1433 }
1434
1435 fn plan_apply(
1437 &self,
1438 input: &LogicalPlan,
1439 subquery: &LogicalPlan,
1440 input_filter: Option<&Expr>,
1441 all_properties: &HashMap<String, HashSet<String>>,
1442 ) -> Result<Arc<dyn ExecutionPlan>> {
1443 use crate::query::df_graph::common::infer_logical_plan_schema;
1444
1445 let input_exec = self.plan_internal(input, all_properties)?;
1447 let input_schema = input_exec.schema();
1448
1449 let sub_schema = infer_logical_plan_schema(subquery, &self.schema);
1451
1452 let mut fields: Vec<Arc<arrow_schema::Field>> = input_schema.fields().to_vec();
1454 let input_field_names: HashSet<&str> = input_schema
1455 .fields()
1456 .iter()
1457 .map(|f| f.name().as_str())
1458 .collect();
1459 for field in sub_schema.fields() {
1460 if !input_field_names.contains(field.name().as_str()) {
1461 fields.push(field.clone());
1462 }
1463 }
1464 let output_schema: SchemaRef = Arc::new(Schema::new(fields));
1465
1466 Ok(Arc::new(GraphApplyExec::new(
1467 input_exec,
1468 subquery.clone(),
1469 input_filter.cloned(),
1470 self.graph_ctx.clone(),
1471 self.session_ctx.clone(),
1472 self.storage.clone(),
1473 self.schema.clone(),
1474 self.params.clone(),
1475 output_schema,
1476 )))
1477 }
1478
1479 #[expect(clippy::too_many_arguments)]
1481 fn plan_vector_knn(
1482 &self,
1483 label_id: u16,
1484 variable: &str,
1485 property: &str,
1486 query_expr: Expr,
1487 k: usize,
1488 threshold: Option<f32>,
1489 all_properties: &HashMap<String, HashSet<String>>,
1490 ) -> Result<Arc<dyn ExecutionPlan>> {
1491 let label_name = self
1492 .schema
1493 .label_name_by_id(label_id)
1494 .ok_or_else(|| anyhow!("Unknown label ID: {}", label_id))?;
1495
1496 let target_properties = self.resolve_properties(variable, label_name, all_properties);
1497
1498 let knn = GraphVectorKnnExec::new(
1499 self.graph_ctx.clone(),
1500 label_id,
1501 label_name,
1502 variable.to_string(),
1503 property.to_string(),
1504 query_expr,
1505 k,
1506 threshold,
1507 self.params.clone(),
1508 target_properties,
1509 );
1510
1511 Ok(Arc::new(knn))
1512 }
1513
1514 fn plan_procedure_call(
1516 &self,
1517 procedure_name: &str,
1518 arguments: &[Expr],
1519 yield_items: &[(String, Option<String>)],
1520 all_properties: &HashMap<String, HashSet<String>>,
1521 ) -> Result<Arc<dyn ExecutionPlan>> {
1522 use crate::query::df_graph::procedure_call::map_yield_to_canonical;
1523
1524 let mut target_properties: HashMap<String, Vec<String>> = HashMap::new();
1526
1527 if matches!(
1528 procedure_name,
1529 "uni.vector.query" | "uni.fts.query" | "uni.search"
1530 ) {
1531 for (name, alias) in yield_items {
1532 let output_name = alias.as_ref().unwrap_or(name);
1533 let canonical = map_yield_to_canonical(name);
1534 if canonical == "node" {
1535 if let Some(props) = all_properties.get(output_name.as_str()) {
1537 let prop_list: Vec<String> = props
1538 .iter()
1539 .filter(|p| *p != "*" && !p.starts_with('_'))
1540 .cloned()
1541 .collect();
1542 target_properties.insert(output_name.clone(), prop_list);
1543 }
1544 }
1545 }
1546 }
1547
1548 let exec = GraphProcedureCallExec::new(
1549 self.graph_ctx.clone(),
1550 procedure_name.to_string(),
1551 arguments.to_vec(),
1552 yield_items.to_vec(),
1553 self.params.clone(),
1554 self.outer_values.clone(),
1555 target_properties,
1556 );
1557
1558 Ok(Arc::new(exec))
1559 }
1560
    /// Plans a label-based vertex scan for `variable`.
    ///
    /// Determines the property projection (adding `overflow_json` /
    /// `_all_props` fallbacks where needed), applies the optional filter on
    /// top of the scan, adds a structural projection for whole-entity ("*")
    /// access, and finally wraps the plan for OPTIONAL MATCH.
    fn plan_scan(
        &self,
        label_id: u16,
        variable: &str,
        filter: Option<&Expr>,
        optional: bool,
        all_properties: &HashMap<String, HashSet<String>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let label_name = self
            .schema
            .label_name_by_id(label_id)
            .ok_or_else(|| anyhow!("Unknown label ID: {}", label_id))?;

        let mut properties = self.resolve_properties(variable, label_name, all_properties);

        // Projected properties that are not part of the label's declared
        // schema must come from the overflow JSON column instead.
        let label_props = self.schema.properties.get(label_name);
        let has_projection_overflow = properties.iter().any(|p| {
            p != "overflow_json"
                && !p.starts_with('_')
                && !label_props.is_some_and(|lp| lp.contains_key(p.as_str()))
        });
        if has_projection_overflow && !properties.iter().any(|p| p == "overflow_json") {
            properties.push("overflow_json".to_string());
        }

        // Same overflow fallback for properties referenced only by the filter.
        if let Some(filter_expr) = filter {
            let filter_props = crate::query::df_expr::collect_properties(filter_expr);
            let has_overflow = filter_props.iter().any(|(var, prop)| {
                var == variable
                    && !prop.starts_with('_')
                    && label_props.is_none_or(|props| !props.contains_key(prop.as_str()))
            });
            if has_overflow && !properties.iter().any(|p| p == "overflow_json") {
                properties.push("overflow_json".to_string());
            }
        }

        // A "*" entry means the whole entity is needed (e.g. `RETURN n`):
        // request the packed-properties marker plus the overflow column.
        let var_props = all_properties.get(variable);
        let need_full = var_props.is_some_and(|p| p.contains("*"));
        if need_full {
            if !properties.contains(&"_all_props".to_string()) {
                properties.push("_all_props".to_string());
            }
            if !properties.contains(&"overflow_json".to_string()) {
                properties.push("overflow_json".to_string());
            }
        }

        let mut scan_plan: Arc<dyn ExecutionPlan> = Arc::new(GraphScanExec::new_vertex_scan(
            self.graph_ctx.clone(),
            label_name.to_string(),
            variable.to_string(),
            properties.clone(),
            None,
        ));

        // Filter is evaluated on top of the scan (label known here, so
        // property translation can use the schema).
        scan_plan = self.apply_scan_filter(scan_plan, variable, filter, Some(label_name))?;

        // Whole-entity access: add a structural projection over the scanned
        // columns (overflow and wildcard markers excluded from the struct).
        if need_full {
            let struct_props: Vec<String> = properties
                .iter()
                .filter(|p| *p != "overflow_json" && *p != "*")
                .cloned()
                .collect();
            scan_plan = self.add_structural_projection(scan_plan, variable, &struct_props)?;
        }

        self.wrap_optional(scan_plan, optional)
    }
1643
1644 fn add_wildcard_structural_projection(
1653 &self,
1654 plan: Arc<dyn ExecutionPlan>,
1655 variable: &str,
1656 all_properties: &HashMap<String, HashSet<String>>,
1657 ) -> Result<Arc<dyn ExecutionPlan>> {
1658 if !all_properties
1659 .get(variable)
1660 .is_some_and(|p| p.contains("*"))
1661 {
1662 return Ok(plan);
1663 }
1664 let prefix = format!("{}.", variable);
1665 let struct_props: Vec<String> = plan
1666 .schema()
1667 .fields()
1668 .iter()
1669 .filter_map(|f| {
1670 f.name()
1671 .strip_prefix(&prefix)
1672 .filter(|prop| !prop.starts_with('_') || *prop == "_all_props")
1673 .map(|prop| prop.to_string())
1674 })
1675 .collect();
1676 self.add_structural_projection(plan, variable, &struct_props)
1677 }
1678
1679 fn detect_bound_target(input_schema: &SchemaRef, target_variable: &str) -> Option<String> {
1683 let col = format!("{}._vid", target_variable);
1685 if input_schema.column_with_name(&col).is_some() {
1686 return Some(col);
1687 }
1688 if let Ok(field) = input_schema.field_with_name(target_variable)
1694 && matches!(
1695 field.data_type(),
1696 datafusion::arrow::datatypes::DataType::UInt64
1697 | datafusion::arrow::datatypes::DataType::Int64
1698 )
1699 {
1700 return Some(target_variable.to_string());
1701 }
1702 None
1703 }
1704
1705 fn resolve_schemaless_properties(
1710 variable: &str,
1711 all_properties: &HashMap<String, HashSet<String>>,
1712 ) -> (Vec<String>, bool) {
1713 let mut properties: Vec<String> = all_properties
1714 .get(variable)
1715 .map(|s| s.iter().filter(|p| *p != "*").cloned().collect())
1716 .unwrap_or_default();
1717 let need_full = all_properties
1718 .get(variable)
1719 .is_some_and(|p| p.contains("*"));
1720 if !properties.iter().any(|p| p == "_all_props") {
1721 properties.push("_all_props".to_string());
1722 }
1723 (properties, need_full)
1724 }
1725
1726 fn collect_used_edge_columns(
1729 schema: &SchemaRef,
1730 scope_match_variables: &HashSet<String>,
1731 exclude_col: Option<&str>,
1732 ) -> Vec<String> {
1733 schema
1734 .fields()
1735 .iter()
1736 .filter_map(|f| {
1737 let name = f.name();
1738 if exclude_col.is_some_and(|exc| name == exc) {
1739 None
1740 } else if name.ends_with("._eid") {
1741 let var_name = name.trim_end_matches("._eid");
1742 scope_match_variables
1743 .contains(var_name)
1744 .then(|| name.clone())
1745 } else if name.starts_with("__eid_to_") {
1746 let var_name = name.trim_start_matches("__eid_to_");
1747 scope_match_variables
1748 .contains(var_name)
1749 .then(|| name.clone())
1750 } else {
1751 None
1752 }
1753 })
1754 .collect()
1755 }
1756
1757 fn maybe_add_edge_structural_projection(
1760 &self,
1761 plan: Arc<dyn ExecutionPlan>,
1762 step_variable: Option<&str>,
1763 source_variable: &str,
1764 target_variable: &str,
1765 all_properties: &HashMap<String, HashSet<String>>,
1766 skip_if_vlp: bool,
1767 ) -> Result<Arc<dyn ExecutionPlan>> {
1768 if skip_if_vlp {
1769 return Ok(plan);
1770 }
1771 let Some(edge_var) = step_variable else {
1772 return Ok(plan);
1773 };
1774 if !all_properties
1775 .get(edge_var)
1776 .is_some_and(|p| p.contains("*"))
1777 {
1778 return Ok(plan);
1779 }
1780 let prefix = format!("{}.", edge_var);
1782 let edge_props: Vec<String> = plan
1783 .schema()
1784 .fields()
1785 .iter()
1786 .filter_map(|f| {
1787 f.name()
1788 .strip_prefix(&prefix)
1789 .filter(|prop| !prop.starts_with('_') && *prop != "overflow_json")
1790 .map(|prop| prop.to_string())
1791 })
1792 .collect();
1793 self.add_edge_structural_projection(
1794 plan,
1795 edge_var,
1796 &edge_props,
1797 source_variable,
1798 target_variable,
1799 )
1800 }
1801
1802 fn finalize_schemaless_scan(
1804 &self,
1805 scan_plan: Arc<dyn ExecutionPlan>,
1806 variable: &str,
1807 filter: Option<&Expr>,
1808 optional: bool,
1809 properties: &[String],
1810 need_full: bool,
1811 ) -> Result<Arc<dyn ExecutionPlan>> {
1812 let mut plan = self.apply_scan_filter(scan_plan, variable, filter, None)?;
1815
1816 if need_full {
1819 let struct_props: Vec<String> =
1823 properties.iter().filter(|p| *p != "*").cloned().collect();
1824 plan = self.add_structural_projection(plan, variable, &struct_props)?;
1825 }
1826
1827 self.wrap_optional(plan, optional)
1828 }
1829
1830 fn plan_schemaless_scan(
1831 &self,
1832 label_name: &str,
1833 variable: &str,
1834 filter: Option<&Expr>,
1835 optional: bool,
1836 all_properties: &HashMap<String, HashSet<String>>,
1837 ) -> Result<Arc<dyn ExecutionPlan>> {
1838 let (properties, need_full) = Self::resolve_schemaless_properties(variable, all_properties);
1839 let scan_plan: Arc<dyn ExecutionPlan> =
1840 Arc::new(GraphScanExec::new_schemaless_vertex_scan(
1841 self.graph_ctx.clone(),
1842 label_name.to_string(),
1843 variable.to_string(),
1844 properties.clone(),
1845 None,
1846 ));
1847 self.finalize_schemaless_scan(
1848 scan_plan,
1849 variable,
1850 filter,
1851 optional,
1852 &properties,
1853 need_full,
1854 )
1855 }
1856
1857 fn plan_multi_label_scan(
1861 &self,
1862 labels: &[String],
1863 variable: &str,
1864 filter: Option<&Expr>,
1865 optional: bool,
1866 all_properties: &HashMap<String, HashSet<String>>,
1867 ) -> Result<Arc<dyn ExecutionPlan>> {
1868 let (properties, need_full) = Self::resolve_schemaless_properties(variable, all_properties);
1869 let scan_plan: Arc<dyn ExecutionPlan> =
1870 Arc::new(GraphScanExec::new_multi_label_vertex_scan(
1871 self.graph_ctx.clone(),
1872 labels.to_vec(),
1873 variable.to_string(),
1874 properties.clone(),
1875 None,
1876 ));
1877 self.finalize_schemaless_scan(
1878 scan_plan,
1879 variable,
1880 filter,
1881 optional,
1882 &properties,
1883 need_full,
1884 )
1885 }
1886
1887 fn plan_scan_all(
1891 &self,
1892 variable: &str,
1893 filter: Option<&Expr>,
1894 optional: bool,
1895 all_properties: &HashMap<String, HashSet<String>>,
1896 ) -> Result<Arc<dyn ExecutionPlan>> {
1897 let (properties, need_full) = Self::resolve_schemaless_properties(variable, all_properties);
1898 let scan_plan: Arc<dyn ExecutionPlan> = Arc::new(GraphScanExec::new_schemaless_all_scan(
1899 self.graph_ctx.clone(),
1900 variable.to_string(),
1901 properties.clone(),
1902 None,
1903 ));
1904 self.finalize_schemaless_scan(
1905 scan_plan,
1906 variable,
1907 filter,
1908 optional,
1909 &properties,
1910 need_full,
1911 )
1912 }
1913
    /// Plans a relationship traversal step.
    ///
    /// Fixed-length hops become a `GraphTraverseExec`; variable-length hops
    /// become a `GraphVariableLengthTraverseExec` (with zero-length and
    /// empty-edge-type special cases handled up front, and optional QPP
    /// support compiled into a path NFA). Afterwards, wildcard structural
    /// projections and the optional target filter are layered on top.
    #[expect(
        clippy::too_many_arguments,
        reason = "Graph traversal requires many parameters"
    )]
    fn plan_traverse(
        &self,
        input: &LogicalPlan,
        edge_type_ids: &[u32],
        direction: AstDirection,
        source_variable: &str,
        target_variable: &str,
        target_label_id: u16,
        step_variable: Option<&str>,
        min_hops: usize,
        max_hops: usize,
        path_variable: Option<&str>,
        optional: bool,
        target_filter: Option<&Expr>,
        is_variable_length: bool,
        optional_pattern_vars: &HashSet<String>,
        all_properties: &HashMap<String, HashSet<String>>,
        scope_match_variables: &HashSet<String>,
        edge_filter_expr: Option<&Expr>,
        path_mode: &crate::query::df_graph::nfa::PathMode,
        qpp_steps: Option<&[crate::query::planner::QppStepInfo]>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input_plan = self.plan_internal(input, all_properties)?;

        let adj_direction = convert_direction(direction);
        let (input_plan, source_col) = Self::resolve_source_vid_col(input_plan, source_variable)?;

        let traverse_plan: Arc<dyn ExecutionPlan> = if !is_variable_length {
            // --- Fixed-length (single-hop) traversal ---

            // Edge-property projection for the step variable. With a "*"
            // request, start from every schema-declared property of the
            // traversed edge types, then add any explicitly referenced
            // non-internal properties not already present.
            let mut edge_properties: Vec<String> = if let Some(edge_var) = step_variable {
                let has_wildcard = all_properties
                    .get(edge_var)
                    .is_some_and(|props| props.contains("*"));
                if has_wildcard {
                    let mut schema_props: Vec<String> = edge_type_ids
                        .iter()
                        .filter_map(|eid| self.schema.edge_type_name_by_id(*eid))
                        .flat_map(|name| {
                            self.schema
                                .properties
                                .get(name)
                                .map(|p| p.keys().cloned().collect::<Vec<_>>())
                                .unwrap_or_default()
                        })
                        .collect();

                    if let Some(props) = all_properties.get(edge_var) {
                        for p in props {
                            if p != "*" && !p.starts_with('_') && !schema_props.contains(p) {
                                schema_props.push(p.clone());
                            }
                        }
                    }
                    schema_props
                } else {
                    // No wildcard: only the explicitly requested properties.
                    all_properties
                        .get(edge_var)
                        .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
                        .unwrap_or_default()
                }
            } else {
                Vec::new()
            };

            if let Some(edge_var) = step_variable {
                let has_wildcard = all_properties
                    .get(edge_var)
                    .is_some_and(|props| props.contains("*"));
                let edge_type_props = self.merged_edge_type_properties(edge_type_ids);
                // Requested edge properties outside the merged edge-type
                // schema must be served from the overflow JSON column.
                let has_overflow_edge_props = edge_properties.iter().any(|p| {
                    p != "overflow_json"
                        && !p.starts_with('_')
                        && !edge_type_props.contains_key(p.as_str())
                });
                let needs_overflow =
                    (has_wildcard && edge_properties.is_empty()) || has_overflow_edge_props;
                if needs_overflow && !edge_properties.contains(&"overflow_json".to_string()) {
                    edge_properties.push("overflow_json".to_string());
                }

                // Wildcard access also needs the packed-properties marker.
                if has_wildcard && !edge_properties.contains(&"_all_props".to_string()) {
                    edge_properties.push("_all_props".to_string());
                }
            }

            let target_label_name_str = self.schema.label_name_by_id(target_label_id).unwrap_or("");
            let mut target_properties =
                self.resolve_properties(target_variable, target_label_name_str, all_properties);

            // "*" is a request marker, not a fetchable property.
            target_properties.retain(|p| p != "*");

            let target_has_wildcard = all_properties
                .get(target_variable)
                .is_some_and(|p| p.contains("*"));
            if target_has_wildcard && target_properties.is_empty() {
                target_properties.push("_all_props".to_string());
            }

            // Target properties outside the label schema fall back to
            // _all_props / overflow_json, mirroring the edge handling above.
            let target_label_props = if !target_label_name_str.is_empty() {
                self.schema.properties.get(target_label_name_str)
            } else {
                None
            };
            let has_non_schema_props = target_properties.iter().any(|p| {
                p != "overflow_json"
                    && p != "_all_props"
                    && !p.starts_with('_')
                    && !target_label_props.is_some_and(|lp| lp.contains_key(p.as_str()))
            });
            if has_non_schema_props && !target_properties.iter().any(|p| p == "_all_props") {
                target_properties.push("_all_props".to_string());
            }
            // Same fallback for properties referenced only by the target filter.
            if let Some(filter_expr) = target_filter {
                let filter_props = crate::query::df_expr::collect_properties(filter_expr);
                let has_overflow_filter = filter_props.iter().any(|(var, prop)| {
                    var == target_variable
                        && !prop.starts_with('_')
                        && !target_label_props
                            .is_some_and(|props| props.contains_key(prop.as_str()))
                });
                if has_overflow_filter && !target_properties.iter().any(|p| p == "_all_props") {
                    target_properties.push("_all_props".to_string());
                }
            }
            if !target_label_name_str.is_empty()
                && has_non_schema_props
                && !target_properties.iter().any(|p| p == "overflow_json")
            {
                target_properties.push("overflow_json".to_string());
            }

            let target_label_name = if target_label_name_str.is_empty() {
                None
            } else {
                Some(target_label_name_str.to_string())
            };

            // If the target is already bound upstream, traversal matches
            // against that column instead of producing fresh bindings.
            let bound_target_column =
                Self::detect_bound_target(&input_plan.schema(), target_variable);

            // "__rebound_<x>" variables re-reference an earlier binding <x>;
            // if <x> arrives as a packed struct column, flatten it first so
            // its fields are addressable. (Naming convention — see planner.)
            let mut input_plan = input_plan;
            for rebound_var in [
                step_variable.and_then(|sv| sv.strip_prefix("__rebound_")),
                target_variable.strip_prefix("__rebound_"),
            ]
            .into_iter()
            .flatten()
            {
                if input_plan
                    .schema()
                    .field_with_name(rebound_var)
                    .ok()
                    .is_some_and(|f| {
                        matches!(
                            f.data_type(),
                            datafusion::arrow::datatypes::DataType::Struct(_)
                        )
                    })
                {
                    input_plan = Self::extract_all_struct_fields(input_plan, rebound_var)?;
                }
            }

            // Exclude the rebound edge's own _eid column from the used-edge
            // bookkeeping so it is not treated as an independent edge.
            let rebound_bound_edge_col = step_variable
                .and_then(|sv| sv.strip_prefix("__rebound_"))
                .map(|bound| format!("{}._eid", bound));

            let used_edge_columns = Self::collect_used_edge_columns(
                &input_plan.schema(),
                scope_match_variables,
                rebound_bound_edge_col.as_deref(),
            );

            Arc::new(GraphTraverseExec::new(
                input_plan,
                source_col,
                edge_type_ids.to_vec(),
                adj_direction,
                target_variable.to_string(),
                step_variable.map(|s| s.to_string()),
                edge_properties,
                target_properties,
                target_label_name,
                None,
                self.graph_ctx.clone(),
                optional,
                optional_pattern_vars.clone(),
                bound_target_column,
                used_edge_columns,
            ))
        } else {
            // --- Variable-length traversal ---

            // Degenerate zero-hop cases when no edge types constrain the path.
            if edge_type_ids.is_empty() {
                if let (0, Some(path_var)) = (min_hops, path_variable) {
                    // *0.. with a path variable: bind a zero-length path.
                    return Ok(Arc::new(BindZeroLengthPathExec::new(
                        input_plan,
                        source_variable.to_string(),
                        path_var.to_string(),
                        self.graph_ctx.clone(),
                    )));
                } else if min_hops == 0 && step_variable.is_none() {
                    // *0.. without bindings: the input already satisfies it.
                    return Ok(input_plan);
                }
            }
            {
                let vlp_target_label_name_str =
                    self.schema.label_name_by_id(target_label_id).unwrap_or("");
                let vlp_target_properties_raw = self.resolve_properties(
                    target_variable,
                    vlp_target_label_name_str,
                    all_properties,
                );
                let target_has_wildcard = all_properties
                    .get(target_variable)
                    .is_some_and(|p| p.contains("*"));
                let vlp_target_label_props: Option<HashSet<String>> =
                    if vlp_target_label_name_str.is_empty() {
                        None
                    } else {
                        self.schema
                            .properties
                            .get(vlp_target_label_name_str)
                            .map(|props| props.keys().cloned().collect())
                    };
                let mut vlp_target_properties = sanitize_vlp_target_properties(
                    vlp_target_properties_raw,
                    target_has_wildcard,
                    vlp_target_label_props.as_ref(),
                );
                let vlp_target_label_name = if vlp_target_label_name_str.is_empty() {
                    None
                } else {
                    Some(vlp_target_label_name_str.to_string())
                };

                let bound_target_column =
                    Self::detect_bound_target(&input_plan.schema(), target_variable);
                if bound_target_column.is_some() {
                    // Bound target already carries its columns; skip fetching.
                    vlp_target_properties.clear();
                }

                // Best-effort pushdown of the edge filter into storage; None
                // when the expression cannot be expressed as a Lance filter.
                let edge_lance_filter: Option<String> = edge_filter_expr.and_then(|expr| {
                    let edge_var_name = step_variable.unwrap_or("__anon_edge");
                    crate::query::pushdown::LanceFilterGenerator::generate(
                        std::slice::from_ref(expr),
                        edge_var_name,
                        None,
                    )
                });

                let edge_property_conditions = edge_filter_expr
                    .map(Self::extract_edge_property_conditions)
                    .unwrap_or_default();

                let used_edge_columns = Self::collect_used_edge_columns(
                    &input_plan.schema(),
                    scope_match_variables,
                    None,
                );

                // Output shape priority: step variable > path variable >
                // bare endpoints.
                let output_mode = if step_variable.is_some() {
                    crate::query::df_graph::nfa::VlpOutputMode::StepVariable
                } else if path_variable.is_some() {
                    crate::query::df_graph::nfa::VlpOutputMode::FullPath
                } else {
                    crate::query::df_graph::nfa::VlpOutputMode::EndpointsOnly
                };

                // Quantified path pattern: compile the per-iteration steps
                // into an NFA; hop bounds are converted to iteration bounds.
                let qpp_nfa = qpp_steps.map(|steps| {
                    use crate::query::df_graph::nfa::{QppStep, VertexConstraint};
                    let hops_per_iter = steps.len();
                    let min_iter = min_hops / hops_per_iter;
                    let max_iter = max_hops / hops_per_iter;
                    let nfa_steps: Vec<QppStep> = steps
                        .iter()
                        .map(|s| QppStep {
                            edge_type_ids: s.edge_type_ids.clone(),
                            direction: convert_direction(s.direction.clone()),
                            target_constraint: s
                                .target_label
                                .as_ref()
                                .map(|l| VertexConstraint::Label(l.clone())),
                        })
                        .collect();
                    crate::query::df_graph::nfa::PathNfa::from_qpp(nfa_steps, min_iter, max_iter)
                });

                Arc::new(GraphVariableLengthTraverseExec::new(
                    input_plan,
                    source_col,
                    edge_type_ids.to_vec(),
                    adj_direction,
                    min_hops,
                    max_hops,
                    target_variable.to_string(),
                    step_variable.map(|s| s.to_string()),
                    path_variable.map(|s| s.to_string()),
                    vlp_target_properties,
                    vlp_target_label_name,
                    self.graph_ctx.clone(),
                    optional,
                    bound_target_column,
                    edge_lance_filter,
                    edge_property_conditions,
                    used_edge_columns,
                    path_mode.clone(),
                    output_mode,
                    qpp_nfa,
                ))
            }
        };

        let mut traverse_plan = traverse_plan;

        // Pack wildcard-requested target/edge variables into struct columns
        // (edge packing skipped for variable-length traversals).
        traverse_plan = self.add_wildcard_structural_projection(
            traverse_plan,
            target_variable,
            all_properties,
        )?;

        traverse_plan = self.maybe_add_edge_structural_projection(
            traverse_plan,
            step_variable,
            source_variable,
            target_variable,
            all_properties,
            is_variable_length,
        )?;

        // Target filter is evaluated after the traversal, with full variable
        // kind/label context for expression translation.
        if let Some(filter_expr) = target_filter {
            let mut variable_kinds = HashMap::new();
            variable_kinds.insert(source_variable.to_string(), VariableKind::Node);
            variable_kinds.insert(target_variable.to_string(), VariableKind::Node);
            if let Some(sv) = step_variable {
                variable_kinds.insert(sv.to_string(), VariableKind::edge_for(is_variable_length));
            }
            if let Some(pv) = path_variable {
                variable_kinds.insert(pv.to_string(), VariableKind::Path);
            }
            let mut variable_labels = HashMap::new();
            // The edge label is only unambiguous with exactly one edge type.
            if let Some(sv) = step_variable
                && edge_type_ids.len() == 1
                && let Some(name) = self.schema.edge_type_name_by_id(edge_type_ids[0])
            {
                variable_labels.insert(sv.to_string(), name.to_string());
            }
            let target_label_name_str = self.schema.label_name_by_id(target_label_id).unwrap_or("");
            if !target_label_name_str.is_empty() {
                variable_labels.insert(
                    target_variable.to_string(),
                    target_label_name_str.to_string(),
                );
            }
            let ctx = TranslationContext {
                parameters: self.params.clone(),
                variable_labels,
                variable_kinds,
                ..Default::default()
            };
            let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
            let schema = traverse_plan.schema();
            let session = self.session_ctx.read();
            let physical_filter =
                self.create_physical_filter_expr(&df_filter, &schema, &session)?;

            // OPTIONAL MATCH keeps non-matching rows with NULL pattern vars.
            if optional {
                Ok(Arc::new(OptionalFilterExec::new(
                    traverse_plan,
                    physical_filter,
                    optional_pattern_vars.clone(),
                )))
            } else {
                Ok(Arc::new(FilterExec::try_new(
                    physical_filter,
                    traverse_plan,
                )?))
            }
        } else {
            Ok(traverse_plan)
        }
    }
2363
    /// Plans a single-hop traversal addressed by edge-type *names* rather
    /// than numeric edge-type ids, producing a `GraphTraverseMainExec`.
    #[expect(clippy::too_many_arguments)]
    fn plan_traverse_main_by_type(
        &self,
        input: &LogicalPlan,
        type_names: &[String],
        direction: AstDirection,
        source_variable: &str,
        target_variable: &str,
        step_variable: Option<&str>,
        optional: bool,
        optional_pattern_vars: &HashSet<String>,
        all_properties: &HashMap<String, HashSet<String>>,
        scope_match_variables: &HashSet<String>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input_plan = self.plan_internal(input, all_properties)?;

        let adj_direction = convert_direction(direction);
        let (input_plan, source_col) = Self::resolve_source_vid_col(input_plan, source_variable)?;

        // If the target is already bound upstream, the traversal matches
        // against that column instead of producing fresh bindings.
        let bound_target_column = Self::detect_bound_target(&input_plan.schema(), target_variable);

        // Edge properties explicitly requested on the step variable; the
        // wildcard marker itself is handled separately below.
        let mut edge_properties: Vec<String> = if let Some(edge_var) = step_variable {
            all_properties
                .get(edge_var)
                .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
                .unwrap_or_default()
        } else {
            Vec::new()
        };

        // Wildcard access needs the packed-properties marker column.
        if let Some(edge_var) = step_variable
            && all_properties
                .get(edge_var)
                .is_some_and(|props| props.contains("*"))
            && !edge_properties.iter().any(|p| p == "_all_props")
        {
            edge_properties.push("_all_props".to_string());
        }

        let mut target_properties: Vec<String> = all_properties
            .get(target_variable)
            .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
            .unwrap_or_default();

        // Any target property access goes through _all_props here (no label
        // schema is available on this name-addressed path).
        let target_has_wildcard = all_properties
            .get(target_variable)
            .is_some_and(|p| p.contains("*"));
        if (target_has_wildcard || !target_properties.is_empty())
            && !target_properties.iter().any(|p| p == "_all_props")
        {
            target_properties.push("_all_props".to_string());
        }
        if bound_target_column.is_some() {
            // Bound targets already carry their columns; avoid re-fetching.
            target_properties.clear();
        }

        // A "__rebound_<x>" step variable re-references edge <x>; exclude
        // <x>'s own _eid column from the used-edge bookkeeping.
        let rebound_bound_edge_col = step_variable
            .and_then(|sv| sv.strip_prefix("__rebound_"))
            .map(|bound| format!("{}._eid", bound));
        let used_edge_columns = Self::collect_used_edge_columns(
            &input_plan.schema(),
            scope_match_variables,
            rebound_bound_edge_col.as_deref(),
        );

        let traverse_plan: Arc<dyn ExecutionPlan> = Arc::new(GraphTraverseMainExec::new(
            input_plan,
            source_col,
            type_names.to_vec(),
            adj_direction,
            target_variable.to_string(),
            step_variable.map(|s| s.to_string()),
            edge_properties.clone(),
            target_properties,
            self.graph_ctx.clone(),
            optional,
            optional_pattern_vars.clone(),
            bound_target_column,
            used_edge_columns,
        ));

        let mut result_plan = traverse_plan;

        // Pack wildcard-requested target/edge variables into struct columns.
        result_plan =
            self.add_wildcard_structural_projection(result_plan, target_variable, all_properties)?;

        result_plan = self.maybe_add_edge_structural_projection(
            result_plan,
            step_variable,
            source_variable,
            target_variable,
            all_properties,
            false,
        )?;

        Ok(result_plan)
    }
2478
    /// Plans a variable-length traversal addressed by edge-type *names*
    /// rather than numeric ids, producing a
    /// `GraphVariableLengthTraverseMainExec`.
    #[expect(clippy::too_many_arguments)]
    fn plan_traverse_main_by_type_vlp(
        &self,
        input: &LogicalPlan,
        type_names: &[String],
        direction: AstDirection,
        source_variable: &str,
        target_variable: &str,
        step_variable: Option<&str>,
        min_hops: usize,
        max_hops: usize,
        path_variable: Option<&str>,
        optional: bool,
        all_properties: &HashMap<String, HashSet<String>>,
        edge_filter_expr: Option<&Expr>,
        path_mode: &crate::query::df_graph::nfa::PathMode,
        scope_match_variables: &HashSet<String>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input_plan = self.plan_internal(input, all_properties)?;

        let adj_direction = convert_direction(direction);
        let (input_plan, source_col) = Self::resolve_source_vid_col(input_plan, source_variable)?;

        // If the target is already bound upstream, the traversal matches
        // against that column instead of producing fresh bindings.
        let bound_target_column = Self::detect_bound_target(&input_plan.schema(), target_variable);

        let mut target_properties: Vec<String> = all_properties
            .get(target_variable)
            .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
            .unwrap_or_default();

        // Any target property access goes through _all_props here (no label
        // schema is available on this name-addressed path).
        let target_has_wildcard = all_properties
            .get(target_variable)
            .is_some_and(|p| p.contains("*"));
        if (target_has_wildcard || !target_properties.is_empty())
            && !target_properties.iter().any(|p| p == "_all_props")
        {
            target_properties.push("_all_props".to_string());
        }
        if bound_target_column.is_some() {
            // Bound targets already carry their columns; avoid re-fetching.
            target_properties.clear();
        }

        // Best-effort pushdown of the edge filter into storage; None when
        // the expression cannot be expressed as a Lance filter string.
        let edge_lance_filter: Option<String> = edge_filter_expr.and_then(|expr| {
            let edge_var_name = step_variable.unwrap_or("__anon_edge");
            crate::query::pushdown::LanceFilterGenerator::generate(
                std::slice::from_ref(expr),
                edge_var_name,
                None,
            )
        });

        let edge_property_conditions = edge_filter_expr
            .map(Self::extract_edge_property_conditions)
            .unwrap_or_default();

        let used_edge_columns =
            Self::collect_used_edge_columns(&input_plan.schema(), scope_match_variables, None);

        // Output shape priority: step variable > path variable > endpoints.
        let output_mode = if step_variable.is_some() {
            crate::query::df_graph::nfa::VlpOutputMode::StepVariable
        } else if path_variable.is_some() {
            crate::query::df_graph::nfa::VlpOutputMode::FullPath
        } else {
            crate::query::df_graph::nfa::VlpOutputMode::EndpointsOnly
        };

        let traverse_plan = Arc::new(GraphVariableLengthTraverseMainExec::new(
            input_plan,
            source_col,
            type_names.to_vec(),
            adj_direction,
            min_hops,
            max_hops,
            target_variable.to_string(),
            step_variable.map(|s| s.to_string()),
            path_variable.map(|s| s.to_string()),
            target_properties,
            self.graph_ctx.clone(),
            optional,
            bound_target_column,
            edge_lance_filter,
            edge_property_conditions,
            used_edge_columns,
            path_mode.clone(),
            output_mode,
        ));

        Ok(traverse_plan)
    }
2582
2583 #[expect(clippy::too_many_arguments)]
2585 fn plan_shortest_path(
2586 &self,
2587 input: &LogicalPlan,
2588 edge_type_ids: &[u32],
2589 direction: AstDirection,
2590 source_variable: &str,
2591 target_variable: &str,
2592 path_variable: &str,
2593 all_shortest: bool,
2594 all_properties: &HashMap<String, HashSet<String>>,
2595 ) -> Result<Arc<dyn ExecutionPlan>> {
2596 let input_plan = self.plan_internal(input, all_properties)?;
2597
2598 let adj_direction = convert_direction(direction);
2599 let source_col = format!("{}._vid", source_variable);
2600 let target_col = format!("{}._vid", target_variable);
2601
2602 Ok(Arc::new(GraphShortestPathExec::new(
2603 input_plan,
2604 source_col,
2605 target_col,
2606 edge_type_ids.to_vec(),
2607 adj_direction,
2608 path_variable.to_string(),
2609 self.graph_ctx.clone(),
2610 all_shortest,
2611 )))
2612 }
2613
2614 fn plan_filter(
2619 &self,
2620 input: &LogicalPlan,
2621 predicate: &Expr,
2622 optional_variables: &HashSet<String>,
2623 all_properties: &HashMap<String, HashSet<String>>,
2624 ) -> Result<Arc<dyn ExecutionPlan>> {
2625 let input_plan = self.plan_internal(input, all_properties)?;
2626 let schema = input_plan.schema();
2627
2628 let ctx = self.translation_context_for_plan(input);
2631 let session = self.session_ctx.read();
2632 let state = session.state();
2633 let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
2634 &state,
2635 Some(&ctx),
2636 )
2637 .with_subquery_ctx(
2638 self.graph_ctx.clone(),
2639 self.schema.clone(),
2640 self.session_ctx.clone(),
2641 self.storage.clone(),
2642 self.params.clone(),
2643 );
2644 let physical_predicate = compiler.compile(predicate, &schema)?;
2645
2646 if !optional_variables.is_empty() {
2648 return Ok(Arc::new(OptionalFilterExec::new(
2649 input_plan,
2650 physical_predicate,
2651 optional_variables.clone(),
2652 )));
2653 }
2654
2655 Ok(Arc::new(FilterExec::try_new(
2656 physical_predicate,
2657 input_plan,
2658 )?))
2659 }
2660
2661 fn plan_project_with_aliases(
2663 &self,
2664 input: &LogicalPlan,
2665 projections: &[(Expr, Option<String>)],
2666 all_properties: &HashMap<String, HashSet<String>>,
2667 alias_map: &HashMap<String, Expr>,
2668 ) -> Result<Arc<dyn ExecutionPlan>> {
2669 let input_plan = self.plan_internal_with_aliases(input, all_properties, alias_map)?;
2671 self.plan_project_from_input(input_plan, projections, Some(input))
2672 }
2673
2674 fn plan_project_from_input(
2676 &self,
2677 input_plan: Arc<dyn ExecutionPlan>,
2678 projections: &[(Expr, Option<String>)],
2679 context_plan: Option<&LogicalPlan>,
2680 ) -> Result<Arc<dyn ExecutionPlan>> {
2681 let schema = input_plan.schema();
2682
2683 let session = self.session_ctx.read();
2684 let state = session.state();
2685
2686 let ctx = context_plan.map(|p| self.translation_context_for_plan(p));
2688
2689 let mut exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> = Vec::new();
2690
2691 for (expr, alias) in projections {
2692 if let Expr::Variable(var_name) = expr {
2698 if schema.column_with_name(var_name).is_some() {
2699 let (col_idx, _) = schema.column_with_name(var_name).unwrap();
2700 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
2701 datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
2702 );
2703 let name = alias.clone().unwrap_or_else(|| var_name.clone());
2704 exprs.push((col_expr, name));
2705
2706 let vid_col = format!("{}._vid", var_name);
2708 let labels_col = format!("{}._labels", var_name);
2709 if let Some((vi, _)) = schema.column_with_name(&vid_col) {
2710 let ve: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
2711 datafusion::physical_expr::expressions::Column::new(&vid_col, vi),
2712 );
2713 exprs.push((ve, vid_col.clone()));
2714 }
2715 if let Some((li, _)) = schema.column_with_name(&labels_col) {
2716 let le: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
2717 datafusion::physical_expr::expressions::Column::new(&labels_col, li),
2718 );
2719 exprs.push((le, labels_col.clone()));
2720 }
2721
2722 let prefix = format!("{}.", var_name);
2725 for (idx, field) in schema.fields().iter().enumerate() {
2726 let fname = field.name();
2727 if fname.starts_with(&prefix)
2728 && fname != &vid_col
2729 && fname != &labels_col
2730 && !exprs.iter().any(|(_, n)| n == fname)
2731 {
2732 let prop_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
2733 Arc::new(datafusion::physical_expr::expressions::Column::new(
2734 fname, idx,
2735 ));
2736 exprs.push((prop_expr, fname.clone()));
2737 }
2738 }
2739 continue;
2740 }
2741
2742 let prefix = format!("{}.", var_name);
2745 let expanded_fields: Vec<(usize, String)> = schema
2746 .fields()
2747 .iter()
2748 .enumerate()
2749 .filter(|(_, f)| f.name().starts_with(&prefix))
2750 .map(|(i, f)| (i, f.name().clone()))
2751 .collect();
2752
2753 if !expanded_fields.is_empty() {
2754 use datafusion::functions::expr_fn::named_struct;
2755 use datafusion::logical_expr::lit;
2756
2757 let mut struct_args = Vec::new();
2759 for (_, field_name) in &expanded_fields {
2760 let prop_name = &field_name[prefix.len()..];
2761 struct_args.push(lit(prop_name.to_string()));
2762 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
2764 field_name.as_str(),
2765 )));
2766 }
2767
2768 let struct_expr = named_struct(struct_args);
2769 let df_schema =
2770 datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
2771 let session = self.session_ctx.read();
2772 let state_ref = session.state();
2773 let resolved_expr = Self::resolve_udfs(&struct_expr, &state_ref)?;
2774
2775 use datafusion::physical_planner::PhysicalPlanner;
2776 let phys_planner =
2777 datafusion::physical_planner::DefaultPhysicalPlanner::default();
2778 let physical_struct_expr = phys_planner.create_physical_expr(
2779 &resolved_expr,
2780 &df_schema,
2781 &state_ref,
2782 )?;
2783
2784 let name = alias.clone().unwrap_or_else(|| var_name.clone());
2785 exprs.push((physical_struct_expr, name));
2786
2787 let vid_col = format!("{}._vid", var_name);
2789 let labels_col = format!("{}._labels", var_name);
2790 if let Some((vi, _)) = schema.column_with_name(&vid_col) {
2791 let ve: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
2792 datafusion::physical_expr::expressions::Column::new(&vid_col, vi),
2793 );
2794 exprs.push((ve, vid_col.clone()));
2795 }
2796 if let Some((li, _)) = schema.column_with_name(&labels_col) {
2797 let le: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
2798 datafusion::physical_expr::expressions::Column::new(&labels_col, li),
2799 );
2800 exprs.push((le, labels_col.clone()));
2801 }
2802
2803 for (idx, field) in schema.fields().iter().enumerate() {
2806 let fname = field.name();
2807 if fname.starts_with(&prefix)
2808 && fname != &vid_col
2809 && fname != &labels_col
2810 && !exprs.iter().any(|(_, n)| n == fname)
2811 {
2812 let prop_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
2813 Arc::new(datafusion::physical_expr::expressions::Column::new(
2814 fname, idx,
2815 ));
2816 exprs.push((prop_expr, fname.clone()));
2817 }
2818 }
2819 continue;
2820 }
2821 }
2823
2824 if matches!(expr, Expr::Wildcard) {
2826 for (col_idx, field) in schema.fields().iter().enumerate() {
2827 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
2828 datafusion::physical_expr::expressions::Column::new(field.name(), col_idx),
2829 );
2830 exprs.push((col_expr, field.name().clone()));
2831 }
2832 continue;
2833 }
2834
2835 let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
2836 &state,
2837 ctx.as_ref(),
2838 )
2839 .with_subquery_ctx(
2840 self.graph_ctx.clone(),
2841 self.schema.clone(),
2842 self.session_ctx.clone(),
2843 self.storage.clone(),
2844 self.params.clone(),
2845 );
2846 let physical_expr = compiler.compile(expr, &schema)?;
2847
2848 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
2849 exprs.push((physical_expr, name));
2850 }
2851
2852 Ok(Arc::new(ProjectionExec::try_new(exprs, input_plan)?))
2853 }
2854
2855 fn plan_locy_project(
2861 &self,
2862 input: &LogicalPlan,
2863 projections: &[(Expr, Option<String>)],
2864 target_types: &[DataType],
2865 all_properties: &HashMap<String, HashSet<String>>,
2866 ) -> Result<Arc<dyn ExecutionPlan>> {
2867 use datafusion::physical_expr::expressions::Column;
2868
2869 let input_plan = self.plan_internal(input, all_properties)?;
2870 let schema = input_plan.schema();
2871
2872 let session = self.session_ctx.read();
2873 let state = session.state();
2874
2875 let ctx = self.translation_context_for_plan(input);
2876
2877 let mut exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> = Vec::new();
2878
2879 for (i, (expr, alias)) in projections.iter().enumerate() {
2880 let target_type = target_types.get(i);
2881
2882 if let Expr::Variable(var_name) = expr {
2884 let vid_col_name = format!("{}._vid", var_name);
2886 let vid_col_match = schema
2887 .fields()
2888 .iter()
2889 .enumerate()
2890 .find(|(_, f)| f.name() == &vid_col_name);
2891
2892 if let Some((vid_idx, _)) = vid_col_match {
2893 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
2895 Arc::new(Column::new(&vid_col_name, vid_idx));
2896 let name = alias.clone().unwrap_or_else(|| var_name.clone());
2897 exprs.push((col_expr, name));
2898 continue;
2899 }
2900
2901 if let Some((col_idx, _)) = schema.column_with_name(var_name) {
2903 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
2904 Arc::new(Column::new(var_name, col_idx));
2905 let name = alias.clone().unwrap_or_else(|| var_name.clone());
2906 exprs.push((col_expr, name));
2907 continue;
2908 }
2909 }
2911
2912 let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
2914 &state,
2915 Some(&ctx),
2916 )
2917 .with_subquery_ctx(
2918 self.graph_ctx.clone(),
2919 self.schema.clone(),
2920 self.session_ctx.clone(),
2921 self.storage.clone(),
2922 self.params.clone(),
2923 );
2924 let physical_expr = compiler.compile(expr, &schema)?;
2925
2926 let physical_expr = if let Some(target_dt) = target_type {
2931 let actual_dt = physical_expr
2932 .data_type(schema.as_ref())
2933 .unwrap_or(DataType::LargeUtf8);
2934 let is_string = |dt: &DataType| matches!(dt, DataType::Utf8 | DataType::LargeUtf8);
2935 let is_numeric = |dt: &DataType| {
2936 matches!(dt, DataType::Int64 | DataType::Float64 | DataType::UInt64)
2937 };
2938 let cross_domain = (is_string(&actual_dt) && is_numeric(target_dt))
2939 || (is_numeric(&actual_dt) && is_string(target_dt));
2940 if actual_dt != *target_dt && !cross_domain {
2941 coerce_physical_expr(physical_expr, &actual_dt, target_dt, schema.as_ref())
2942 } else {
2943 physical_expr
2944 }
2945 } else {
2946 physical_expr
2947 };
2948
2949 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
2950 exprs.push((physical_expr, name));
2951 }
2952
2953 Ok(Arc::new(ProjectionExec::try_new(exprs, input_plan)?))
2954 }
2955
    /// Plans a GROUP BY + aggregation stage for a RETURN/WITH clause.
    ///
    /// Pipeline: build physical group-by keys, materialize any custom
    /// aggregate arguments as projection columns, translate the aggregates,
    /// run a single-mode `AggregateExec`, and finally re-project so aggregate
    /// output columns carry their Cypher display names.
    fn plan_aggregate(
        &self,
        input: &LogicalPlan,
        group_by: &[Expr],
        aggregates: &[Expr],
        all_properties: &HashMap<String, HashSet<String>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input_plan = self.plan_internal(input, all_properties)?;
        let schema = input_plan.schema();

        let session = self.session_ctx.read();
        let state = session.state();

        let ctx = self.translation_context_for_plan(input);

        use crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler;
        let mut group_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
            Vec::new();
        for expr in group_by {
            let name = expr.to_string_repr();

            // A variable with no column of its own but with expanded
            // `<var>.<prop>` columns is grouped via those columns instead
            // (added by the second loop below), so skip it here.
            if let Expr::Variable(var_name) = expr
                && schema.column_with_name(var_name).is_none()
            {
                let prefix = format!("{}.", var_name);
                let has_expanded = schema
                    .fields()
                    .iter()
                    .any(|f| f.name().starts_with(&prefix));
                if has_expanded {
                    continue;
                }
            }

            // Two compile paths: Cypher-specific expressions need the custom
            // compiler; everything else goes through the DataFusion logical
            // expression pipeline (UDF resolution + type coercion).
            let physical_expr = if CypherPhysicalExprCompiler::contains_custom_expr(expr) {
                let compiler = CypherPhysicalExprCompiler::new(&state, Some(&ctx))
                    .with_subquery_ctx(
                        self.graph_ctx.clone(),
                        self.schema.clone(),
                        self.session_ctx.clone(),
                        self.storage.clone(),
                        self.params.clone(),
                    );
                compiler.compile(expr, &schema)?
            } else {
                let df_schema_ref =
                    datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
                let df_expr = cypher_expr_to_df(expr, Some(&ctx))?;
                let df_expr = Self::resolve_udfs(&df_expr, &state)?;
                let df_expr = crate::query::df_expr::apply_type_coercion(&df_expr, &df_schema_ref)?;
                let mut df_expr = Self::resolve_udfs(&df_expr, &state)?;
                // Temporal struct values cannot be hashed for grouping
                // directly; group on their nanosecond component instead.
                if let Ok(expr_type) = df_expr.get_type(&df_schema_ref) {
                    if uni_common::core::schema::is_datetime_struct(&expr_type) {
                        df_expr = crate::query::df_expr::extract_datetime_nanos(df_expr);
                    } else if uni_common::core::schema::is_time_struct(&expr_type) {
                        df_expr = crate::query::df_expr::extract_time_nanos(df_expr);
                    }
                }

                create_physical_expr(&df_expr, &df_schema_ref, state.execution_props())?
            };
            group_exprs.push((physical_expr, name));
        }

        // Grouped node/edge variables keep all of their `<var>.<prop>`
        // columns as additional group keys so the properties survive the
        // aggregation and stay addressable downstream.
        for expr in group_by {
            if let Expr::Variable(var_name) = expr
                && matches!(
                    ctx.variable_kinds.get(var_name),
                    Some(VariableKind::Node) | Some(VariableKind::Edge)
                )
            {
                let prefix = format!("{}.", var_name);
                for (idx, field) in schema.fields().iter().enumerate() {
                    if field.name().starts_with(&prefix) {
                        let prop_col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
                            datafusion::physical_expr::expressions::Column::new(field.name(), idx),
                        );
                        group_exprs.push((prop_col, field.name().clone()));
                    }
                }
            }
        }

        let physical_group_by = PhysicalGroupBy::new_single(group_exprs);

        // Custom aggregate arguments are precomputed into projection columns;
        // this may replace the input plan, schema, and aggregate expressions.
        let (input_plan, schema, rewritten_aggregates) =
            self.precompute_custom_aggregate_args(input_plan, &schema, aggregates, &state, &ctx)?;

        let (aggr_exprs, filter_exprs): (Vec<_>, Vec<_>) = self
            .translate_aggregates(&rewritten_aggregates, &schema, &state, &ctx)?
            .into_iter()
            .unzip();
        let num_aggregates = aggr_exprs.len();

        let agg_exec = Arc::new(AggregateExec::try_new(
            AggregateMode::Single,
            physical_group_by,
            aggr_exprs,
            filter_exprs,
            input_plan,
            schema,
        )?);

        // The aggregate output schema is [group keys..., aggregates...];
        // rename only the aggregate columns to their Cypher display names.
        let agg_schema = agg_exec.schema();
        let num_group_by = agg_schema.fields().len() - num_aggregates;
        let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
            Vec::new();

        for (i, field) in agg_schema.fields().iter().enumerate() {
            let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
                datafusion::physical_expr::expressions::Column::new(field.name(), i),
            );
            let name = if i >= num_group_by {
                // Display names come from the ORIGINAL aggregates, not the
                // rewritten ones, so `__pc_N` rewrites stay invisible.
                aggregate_column_name(&aggregates[i - num_group_by])
            } else {
                field.name().clone()
            };
            proj_exprs.push((col_expr, name));
        }

        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, agg_exec)?))
    }
3107
3108 fn wrap_temporal_sort_key(
3113 arg: datafusion::logical_expr::Expr,
3114 schema: &SchemaRef,
3115 ) -> Result<datafusion::logical_expr::Expr> {
3116 use datafusion::logical_expr::ScalarUDF;
3117 if let Ok(arg_type) = arg.get_type(&datafusion::common::DFSchema::try_from(
3118 schema.as_ref().clone(),
3119 )?) {
3120 if uni_common::core::schema::is_datetime_struct(&arg_type) {
3121 return Ok(datafusion::logical_expr::Expr::ScalarFunction(
3122 datafusion::logical_expr::expr::ScalarFunction::new_udf(
3123 Arc::new(ScalarUDF::from(
3124 datafusion::functions::core::getfield::GetFieldFunc::new(),
3125 )),
3126 vec![arg, datafusion::logical_expr::lit("nanos_since_epoch")],
3127 ),
3128 ));
3129 } else if uni_common::core::schema::is_time_struct(&arg_type) {
3130 return Ok(datafusion::logical_expr::Expr::ScalarFunction(
3131 datafusion::logical_expr::expr::ScalarFunction::new_udf(
3132 Arc::new(ScalarUDF::from(
3133 datafusion::functions::core::getfield::GetFieldFunc::new(),
3134 )),
3135 vec![arg, datafusion::logical_expr::lit("nanos_since_midnight")],
3136 ),
3137 ));
3138 }
3139 }
3140 Ok(arg)
3141 }
3142
    /// Translates Cypher aggregate calls into DataFusion physical aggregates.
    ///
    /// Returns one `PhysicalAggregate` (aggregate expression plus optional
    /// filter) per input expression, in order. Every input must be an
    /// `Expr::FunctionCall`; anything else is an error.
    fn translate_aggregates(
        &self,
        aggregates: &[Expr],
        schema: &SchemaRef,
        state: &SessionState,
        ctx: &TranslationContext,
    ) -> Result<Vec<PhysicalAggregate>> {
        use datafusion::functions_aggregate::expr_fn::{avg, count, max, min, sum};

        let mut result: Vec<PhysicalAggregate> = Vec::new();

        for agg_expr in aggregates {
            let Expr::FunctionCall {
                name,
                args,
                distinct,
                ..
            } = agg_expr
            else {
                return Err(anyhow!("Expected aggregate function, got: {:?}", agg_expr));
            };

            let name_lower = name.to_lowercase();

            // Lazily translates the first argument; most aggregates are unary.
            let get_arg = || -> Result<DfExpr> {
                if args.is_empty() {
                    return Err(anyhow!("{}() requires an argument", name_lower));
                }
                cypher_expr_to_df(&args[0], Some(ctx))
            };

            let df_agg = match name_lower.as_str() {
                "count" if args.is_empty() => count(datafusion::logical_expr::lit(1)),
                "count" => {
                    // count(*) and non-DISTINCT count(var) are translated as
                    // count(1), i.e. a plain row count.
                    // NOTE(review): count(var) as count(1) also counts rows
                    // where var is null — confirm this is intended.
                    if matches!(args.first(), Some(uni_cypher::ast::Expr::Wildcard)) {
                        count(datafusion::logical_expr::lit(1))
                    } else if matches!(args.first(), Some(uni_cypher::ast::Expr::Variable(_))) {
                        if *distinct {
                            count(get_arg()?)
                        } else {
                            count(datafusion::logical_expr::lit(1))
                        }
                    } else {
                        count(get_arg()?)
                    }
                }
                "sum" => {
                    let arg = get_arg()?;
                    // LargeBinary columns hold Cypher-encoded values and need
                    // the custom sum UDAF.
                    if self.is_large_binary_col(&arg, schema) {
                        let udaf = Arc::new(crate::query::df_udfs::create_cypher_sum_udaf());
                        udaf.call(vec![arg])
                    } else {
                        use datafusion::logical_expr::Cast;
                        // Float columns sum as Float64, everything else as
                        // Int64 (non-column args default to the Int64 path).
                        let is_float = if let DfExpr::Column(col) = &arg
                            && let Ok(field) = schema.field_with_name(&col.name)
                        {
                            matches!(
                                field.data_type(),
                                datafusion::arrow::datatypes::DataType::Float32
                                    | datafusion::arrow::datatypes::DataType::Float64
                            )
                        } else {
                            false
                        };
                        if is_float {
                            sum(DfExpr::Cast(Cast::new(
                                Box::new(arg),
                                datafusion::arrow::datatypes::DataType::Float64,
                            )))
                        } else {
                            sum(DfExpr::Cast(Cast::new(
                                Box::new(arg),
                                datafusion::arrow::datatypes::DataType::Int64,
                            )))
                        }
                    }
                }
                "avg" => {
                    let arg = get_arg()?;
                    if self.is_large_binary_col(&arg, schema) {
                        let coerced = crate::query::df_udfs::cypher_to_float64_expr(arg);
                        avg(coerced)
                    } else {
                        use datafusion::logical_expr::Cast;
                        // avg always computes in Float64.
                        avg(DfExpr::Cast(Cast::new(
                            Box::new(arg),
                            datafusion::arrow::datatypes::DataType::Float64,
                        )))
                    }
                }
                "min" => {
                    // Temporal structs compare by their nanosecond component.
                    let arg = Self::wrap_temporal_sort_key(get_arg()?, schema)?;

                    if self.is_large_binary_col(&arg, schema) {
                        let udaf = Arc::new(crate::query::df_udfs::create_cypher_min_udaf());
                        udaf.call(vec![arg])
                    } else {
                        min(arg)
                    }
                }
                "max" => {
                    // Temporal structs compare by their nanosecond component.
                    let arg = Self::wrap_temporal_sort_key(get_arg()?, schema)?;

                    if self.is_large_binary_col(&arg, schema) {
                        let udaf = Arc::new(crate::query::df_udfs::create_cypher_max_udaf());
                        udaf.call(vec![arg])
                    } else {
                        max(arg)
                    }
                }
                "percentiledisc" => {
                    if args.len() != 2 {
                        return Err(anyhow!("percentileDisc() requires exactly 2 arguments"));
                    }
                    let expr_arg = cypher_expr_to_df(&args[0], Some(ctx))?;
                    let pct_arg = cypher_expr_to_df(&args[1], Some(ctx))?;
                    let coerced = crate::query::df_udfs::cypher_to_float64_expr(expr_arg);
                    let udaf =
                        Arc::new(crate::query::df_udfs::create_cypher_percentile_disc_udaf());
                    udaf.call(vec![coerced, pct_arg])
                }
                "percentilecont" => {
                    if args.len() != 2 {
                        return Err(anyhow!("percentileCont() requires exactly 2 arguments"));
                    }
                    let expr_arg = cypher_expr_to_df(&args[0], Some(ctx))?;
                    let pct_arg = cypher_expr_to_df(&args[1], Some(ctx))?;
                    let coerced = crate::query::df_udfs::cypher_to_float64_expr(expr_arg);
                    let udaf =
                        Arc::new(crate::query::df_udfs::create_cypher_percentile_cont_udaf());
                    udaf.call(vec![coerced, pct_arg])
                }
                "collect" => {
                    // collect() handles DISTINCT itself, so it is excluded
                    // from the generic distinct wrapper below.
                    let arg = get_arg()?;
                    crate::query::df_udfs::create_cypher_collect_expr(arg, *distinct)
                }
                "btic_min" => {
                    let arg = get_arg()?;
                    let udaf = Arc::new(crate::query::df_udfs::create_btic_min_udaf());
                    udaf.call(vec![arg])
                }
                "btic_max" => {
                    let arg = get_arg()?;
                    let udaf = Arc::new(crate::query::df_udfs::create_btic_max_udaf());
                    udaf.call(vec![arg])
                }
                "btic_span_agg" => {
                    let arg = get_arg()?;
                    let udaf = Arc::new(crate::query::df_udfs::create_btic_span_agg_udaf());
                    udaf.call(vec![arg])
                }
                "btic_count_at" => {
                    if args.len() != 2 {
                        return Err(anyhow!("btic_count_at() requires exactly 2 arguments"));
                    }
                    let btic_arg = cypher_expr_to_df(&args[0], Some(ctx))?;
                    let point_arg = cypher_expr_to_df(&args[1], Some(ctx))?;
                    let udaf = Arc::new(crate::query::df_udfs::create_btic_count_at_udaf());
                    udaf.call(vec![btic_arg, point_arg])
                }
                _ => return Err(anyhow!("Unsupported aggregate function: {}", name)),
            };

            // Apply DISTINCT generically, except for functions that already
            // consumed the flag (collect) or don't support it (percentiles).
            let df_agg = if *distinct
                && !matches!(
                    name_lower.as_str(),
                    "collect" | "percentiledisc" | "percentilecont"
                ) {
                use datafusion::prelude::ExprFunctionExt;
                df_agg.distinct().build().map_err(|e| anyhow!("{}", e))?
            } else {
                df_agg
            };

            // Resolve UDFs both before and after coercion: coercion may
            // introduce new function calls that need resolving.
            let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
            let df_agg = Self::resolve_udfs(&df_agg, state)?;
            let df_agg = crate::query::df_expr::apply_type_coercion(&df_agg, &df_schema)?;
            let df_agg = Self::resolve_udfs(&df_agg, state)?;

            let agg_and_filter = self.create_physical_aggregate(&df_agg, schema, state)?;
            result.push(agg_and_filter);
        }

        Ok(result)
    }
3344
3345 fn precompute_custom_aggregate_args(
3351 &self,
3352 input_plan: Arc<dyn ExecutionPlan>,
3353 schema: &SchemaRef,
3354 aggregates: &[Expr],
3355 state: &SessionState,
3356 ctx: &TranslationContext,
3357 ) -> Result<(Arc<dyn ExecutionPlan>, SchemaRef, Vec<Expr>)> {
3358 use crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler;
3359
3360 let mut needs_projection = false;
3361 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
3362 Vec::new();
3363 let mut rewritten_aggregates = Vec::new();
3364 let mut col_counter = 0;
3365
3366 for (i, field) in schema.fields().iter().enumerate() {
3368 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
3369 datafusion::physical_expr::expressions::Column::new(field.name(), i),
3370 );
3371 proj_exprs.push((col_expr, field.name().clone()));
3372 }
3373
3374 for agg_expr in aggregates {
3376 let Expr::FunctionCall {
3377 name,
3378 args,
3379 distinct,
3380 window_spec,
3381 } = agg_expr
3382 else {
3383 rewritten_aggregates.push(agg_expr.clone());
3384 continue;
3385 };
3386
3387 let mut rewritten_args = Vec::new();
3388 let mut agg_needs_rewrite = false;
3389
3390 for arg in args {
3391 if CypherPhysicalExprCompiler::contains_custom_expr(arg) {
3392 let compiler = CypherPhysicalExprCompiler::new(state, Some(ctx))
3394 .with_subquery_ctx(
3395 self.graph_ctx.clone(),
3396 self.schema.clone(),
3397 self.session_ctx.clone(),
3398 self.storage.clone(),
3399 self.params.clone(),
3400 );
3401 let physical_expr = compiler.compile(arg, schema)?;
3402
3403 let col_name = format!("__pc_{}", col_counter);
3405 col_counter += 1;
3406 proj_exprs.push((physical_expr, col_name.clone()));
3407
3408 rewritten_args.push(Expr::Variable(col_name));
3410 agg_needs_rewrite = true;
3411 needs_projection = true;
3412 } else {
3413 rewritten_args.push(arg.clone());
3414 }
3415 }
3416
3417 if agg_needs_rewrite {
3418 rewritten_aggregates.push(Expr::FunctionCall {
3419 name: name.clone(),
3420 args: rewritten_args,
3421 distinct: *distinct,
3422 window_spec: window_spec.clone(),
3423 });
3424 } else {
3425 rewritten_aggregates.push(agg_expr.clone());
3426 }
3427 }
3428
3429 if needs_projection {
3430 let projection_exec = Arc::new(
3431 datafusion::physical_plan::projection::ProjectionExec::try_new(
3432 proj_exprs, input_plan,
3433 )?,
3434 );
3435 let new_schema = projection_exec.schema();
3436 Ok((projection_exec, new_schema, rewritten_aggregates))
3437 } else {
3438 Ok((input_plan, schema.clone(), aggregates.to_vec()))
3439 }
3440 }
3441
    /// Plans an ORDER BY stage on top of the planned input.
    ///
    /// Every sort key is wrapped in the `cypher_sort_key` UDF so values of
    /// mixed Cypher types sort under one total order. Keys containing
    /// Cypher-specific expressions cannot go through the logical-expression
    /// pipeline; they are compiled straight to a physical expression and
    /// spliced into a placeholder sort key afterwards.
    fn plan_sort(
        &self,
        input: &LogicalPlan,
        order_by: &[SortItem],
        all_properties: &HashMap<String, HashSet<String>>,
        alias_map: &HashMap<String, Expr>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input_plan = self.plan_internal(input, all_properties)?;
        let schema = input_plan.schema();

        let session = self.session_ctx.read();

        let ctx = self.translation_context_for_plan(input);

        let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;

        use crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler;

        let mut df_sort_exprs = Vec::new();
        // (index into df_sort_exprs, compiled physical expr) pairs for keys
        // that bypassed the logical planner; patched in after the fact.
        let mut custom_physical_overrides: Vec<(
            usize,
            Arc<dyn datafusion::physical_expr::PhysicalExpr>,
        )> = Vec::new();
        for item in order_by {
            let mut sort_expr = item.expr.clone();

            // ORDER BY may reference a WITH/RETURN alias that is not an
            // actual column; substitute the aliased expression in that case.
            if let Expr::Variable(ref name) = sort_expr {
                let col_name = name.as_str();
                let exists_in_schema = schema.fields().iter().any(|f| f.name() == col_name);

                if !exists_in_schema && let Some(aliased_expr) = alias_map.get(col_name) {
                    sort_expr = aliased_expr.clone();
                }
            }

            let asc = item.ascending;
            // Nulls go last for ASC and first for DESC.
            let nulls_first = !asc;
            if CypherPhysicalExprCompiler::contains_custom_expr(&sort_expr) {
                let sort_state = session.state();
                let compiler = CypherPhysicalExprCompiler::new(&sort_state, Some(&ctx))
                    .with_subquery_ctx(
                        self.graph_ctx.clone(),
                        self.schema.clone(),
                        self.session_ctx.clone(),
                        self.storage.clone(),
                        self.params.clone(),
                    );
                let inner_physical = compiler.compile(&sort_expr, &schema)?;

                // Build a placeholder sort key over an arbitrary existing
                // column; its argument is replaced by `inner_physical` in the
                // override pass below.
                let first_col = schema
                    .fields()
                    .first()
                    .map(|f| f.name().clone())
                    .unwrap_or_else(|| "_dummy_".to_string());
                let dummy_expr = DfExpr::Column(datafusion::common::Column::from_name(&first_col));
                let sort_key_udf = crate::query::df_udfs::create_cypher_sort_key_udf();
                let sort_key_expr = sort_key_udf.call(vec![dummy_expr]);
                custom_physical_overrides.push((df_sort_exprs.len(), inner_physical));
                df_sort_exprs.push(DfSortExpr::new(sort_key_expr, asc, nulls_first));
                continue;
            }

            // Standard path: translate, resolve UDFs, coerce types, and
            // resolve again (coercion can introduce new function calls).
            let df_expr = cypher_expr_to_df(&sort_expr, Some(&ctx))?;
            let df_expr = Self::resolve_udfs(&df_expr, &session.state())?;
            let df_expr = crate::query::df_expr::apply_type_coercion(&df_expr, &df_schema)?;
            let df_expr = Self::resolve_udfs(&df_expr, &session.state())?;

            let sort_key_udf = crate::query::df_udfs::create_cypher_sort_key_udf();
            let sort_key_expr = sort_key_udf.call(vec![df_expr]);
            df_sort_exprs.push(DfSortExpr::new(sort_key_expr, asc, nulls_first));
        }

        let mut physical_sort_exprs = create_physical_sort_exprs(
            &df_sort_exprs,
            &df_schema,
            session.state().execution_props(),
        )?;

        // Splice the directly-compiled physical expressions into their
        // placeholder sort keys, preserving each key's sort options.
        for (idx, custom_inner) in custom_physical_overrides {
            if idx < physical_sort_exprs.len() {
                let phys = &physical_sort_exprs[idx];
                let sort_key_udf = Arc::new(crate::query::df_udfs::create_cypher_sort_key_udf());
                let config_options = Arc::new(datafusion::config::ConfigOptions::default());
                let udf_name = sort_key_udf.name().to_string();
                let new_sort_key = datafusion::physical_expr::ScalarFunctionExpr::new(
                    &udf_name,
                    sort_key_udf,
                    vec![custom_inner],
                    Arc::new(arrow_schema::Field::new(
                        "_cypher_sort_key",
                        DataType::LargeBinary,
                        true,
                    )),
                    config_options,
                );
                physical_sort_exprs[idx] = datafusion::physical_expr::PhysicalSortExpr {
                    expr: Arc::new(new_sort_key),
                    options: phys.options,
                };
            }
        }

        let lex_ordering = datafusion::physical_expr::LexOrdering::new(physical_sort_exprs)
            .ok_or_else(|| anyhow!("ORDER BY must have at least one sort expression"))?;

        Ok(Arc::new(SortExec::new(lex_ordering, input_plan)))
    }
3583
3584 fn plan_limit(
3586 &self,
3587 input: &LogicalPlan,
3588 skip: Option<usize>,
3589 fetch: Option<usize>,
3590 all_properties: &HashMap<String, HashSet<String>>,
3591 ) -> Result<Arc<dyn ExecutionPlan>> {
3592 let input_plan = self.plan_internal(input, all_properties)?;
3593
3594 if let Some(offset) = skip.filter(|&s| s > 0) {
3596 use datafusion::physical_plan::limit::GlobalLimitExec;
3597 return Ok(Arc::new(GlobalLimitExec::new(input_plan, offset, fetch)));
3598 }
3599
3600 if let Some(limit) = fetch {
3601 Ok(Arc::new(LocalLimitExec::new(input_plan, limit)))
3602 } else {
3603 Ok(input_plan)
3605 }
3606 }
3607
3608 fn plan_union(
3610 &self,
3611 left: &LogicalPlan,
3612 right: &LogicalPlan,
3613 all: bool,
3614 all_properties: &HashMap<String, HashSet<String>>,
3615 ) -> Result<Arc<dyn ExecutionPlan>> {
3616 let left_plan = self.plan_internal(left, all_properties)?;
3617 let right_plan = self.plan_internal(right, all_properties)?;
3618
3619 let union_plan = UnionExec::try_new(vec![left_plan, right_plan])?;
3620
3621 if !all {
3623 use datafusion::physical_plan::aggregates::{
3624 AggregateExec, AggregateMode, PhysicalGroupBy,
3625 };
3626 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
3627
3628 let coalesced = Arc::new(CoalescePartitionsExec::new(union_plan));
3630
3631 let schema = coalesced.schema();
3633 let group_by_exprs: Vec<_> = (0..schema.fields().len())
3634 .map(|i| {
3635 (
3636 Arc::new(datafusion::physical_plan::expressions::Column::new(
3637 schema.field(i).name(),
3638 i,
3639 ))
3640 as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
3641 schema.field(i).name().clone(),
3642 )
3643 })
3644 .collect();
3645
3646 let group_by = PhysicalGroupBy::new_single(group_by_exprs);
3647
3648 Ok(Arc::new(AggregateExec::try_new(
3649 AggregateMode::Single,
3650 group_by,
3651 vec![], vec![], coalesced,
3654 schema,
3655 )?))
3656 } else {
3657 Ok(union_plan)
3659 }
3660 }
3661
    /// Plan Cypher window functions (calls carrying an OVER clause) into a
    /// DataFusion `WindowAggExec` over a globally sorted input.
    ///
    /// Every entry in `window_exprs` must be an `Expr::FunctionCall` with a
    /// `window_spec`; anything else is an error. `context_plan`, when given,
    /// supplies the variable-translation context used when lowering Cypher
    /// expressions (arguments, PARTITION BY, ORDER BY keys) to DataFusion.
    fn plan_window_functions(
        &self,
        input: Arc<dyn ExecutionPlan>,
        window_exprs: &[Expr],
        context_plan: Option<&LogicalPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        use datafusion::functions_aggregate::average::avg_udaf;
        use datafusion::functions_aggregate::count::count_udaf;
        use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
        use datafusion::functions_aggregate::sum::sum_udaf;
        use datafusion::functions_window::lead_lag::{lag_udwf, lead_udwf};
        use datafusion::functions_window::nth_value::{
            first_value_udwf, last_value_udwf, nth_value_udwf,
        };
        use datafusion::functions_window::ntile::ntile_udwf;
        use datafusion::functions_window::rank::{dense_rank_udwf, rank_udwf};
        use datafusion::functions_window::row_number::row_number_udwf;
        use datafusion::logical_expr::{WindowFrame, WindowFunctionDefinition};
        use datafusion::physical_expr::LexOrdering;
        use datafusion::physical_plan::sorts::sort::SortExec;
        use datafusion::physical_plan::windows::{WindowAggExec, create_window_expr};

        let input_schema = input.schema();
        let df_schema = datafusion::common::DFSchema::try_from(input_schema.as_ref().clone())?;

        let session = self.session_ctx.read();
        let state = session.state();

        // Variable-kind context for Cypher -> DataFusion expression lowering.
        let tx_ctx = context_plan.map(|p| self.translation_context_for_plan(p));
        let mut window_expr_list = Vec::new();

        for expr in window_exprs {
            let Expr::FunctionCall {
                name,
                args,
                distinct,
                window_spec: Some(window_spec),
            } = expr
            else {
                return Err(anyhow!("Expected window function call with OVER clause"));
            };

            let name_lower = name.to_lowercase();

            // Map the (case-insensitive) function name to a DataFusion window
            // function definition. The bool marks aggregates used as window
            // functions, which affects default arguments and frame selection
            // below.
            let (window_fn_def, is_aggregate) = match name_lower.as_str() {
                "count" => (WindowFunctionDefinition::AggregateUDF(count_udaf()), true),
                "sum" => (WindowFunctionDefinition::AggregateUDF(sum_udaf()), true),
                "avg" => (WindowFunctionDefinition::AggregateUDF(avg_udaf()), true),
                "min" => (WindowFunctionDefinition::AggregateUDF(min_udaf()), true),
                "max" => (WindowFunctionDefinition::AggregateUDF(max_udaf()), true),
                "row_number" => (
                    WindowFunctionDefinition::WindowUDF(row_number_udwf()),
                    false,
                ),
                "rank" => (WindowFunctionDefinition::WindowUDF(rank_udwf()), false),
                "dense_rank" => (
                    WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
                    false,
                ),
                "lag" => (WindowFunctionDefinition::WindowUDF(lag_udwf()), false),
                "lead" => (WindowFunctionDefinition::WindowUDF(lead_udwf()), false),
                "ntile" => {
                    // Validate a literal bucket count eagerly; non-literal
                    // arguments are left for runtime.
                    if let Some(Expr::Literal(CypherLiteral::Integer(n))) = args.first()
                        && *n <= 0
                    {
                        return Err(anyhow!("NTILE bucket count must be positive, got: {}", n));
                    }
                    (WindowFunctionDefinition::WindowUDF(ntile_udwf()), false)
                }
                "first_value" => (
                    WindowFunctionDefinition::WindowUDF(first_value_udwf()),
                    false,
                ),
                "last_value" => (
                    WindowFunctionDefinition::WindowUDF(last_value_udwf()),
                    false,
                ),
                "nth_value" => (WindowFunctionDefinition::WindowUDF(nth_value_udwf()), false),
                other => return Err(anyhow!("Unsupported window function: {}", other)),
            };

            // Lower argument expressions. `count()` / `count(*)` style calls
            // get a constant 1 argument so the aggregate UDAF has an input;
            // non-aggregates with no args genuinely take none.
            let physical_args: Vec<Arc<dyn datafusion::physical_expr::PhysicalExpr>> =
                if args.is_empty() || matches!(args.as_slice(), [Expr::Wildcard]) {
                    if is_aggregate {
                        vec![create_physical_expr(
                            &datafusion::logical_expr::lit(1),
                            &df_schema,
                            state.execution_props(),
                        )?]
                    } else {
                        vec![]
                    }
                } else {
                    args.iter()
                        .map(|arg| {
                            let mut df_expr = cypher_expr_to_df(arg, tx_ctx.as_ref())?;

                            if is_aggregate {
                                // Normalize numeric input types: SUM over
                                // Int64, AVG over Float64.
                                let cast_type = match name_lower.as_str() {
                                    "sum" => Some(datafusion::arrow::datatypes::DataType::Int64),
                                    "avg" => Some(datafusion::arrow::datatypes::DataType::Float64),
                                    _ => None,
                                };
                                if let Some(target_type) = cast_type {
                                    df_expr = DfExpr::Cast(datafusion::logical_expr::Cast::new(
                                        Box::new(df_expr),
                                        target_type,
                                    ));
                                }
                            }

                            create_physical_expr(&df_expr, &df_schema, state.execution_props())
                                .map_err(|e| anyhow!("Failed to create physical expr: {}", e))
                        })
                        .collect::<Result<Vec<_>>>()?
                };

            let partition_by_physical: Vec<Arc<dyn datafusion::physical_expr::PhysicalExpr>> =
                window_spec
                    .partition_by
                    .iter()
                    .map(|e| {
                        let df_expr = cypher_expr_to_df(e, tx_ctx.as_ref())?;
                        create_physical_expr(&df_expr, &df_schema, state.execution_props())
                            .map_err(|e| anyhow!("Failed to create physical expr: {}", e))
                    })
                    .collect::<Result<Vec<_>>>()?;

            // DESC implies nulls-first here (ascending => nulls last).
            let mut order_by_physical: Vec<datafusion::physical_expr::PhysicalSortExpr> =
                window_spec
                    .order_by
                    .iter()
                    .map(|sort_item| {
                        let df_expr = cypher_expr_to_df(&sort_item.expr, tx_ctx.as_ref())?;
                        let physical_expr =
                            create_physical_expr(&df_expr, &df_schema, state.execution_props())
                                .map_err(|e| anyhow!("Failed to create physical expr: {}", e))?;
                        Ok(datafusion::physical_expr::PhysicalSortExpr {
                            expr: physical_expr,
                            options: datafusion::arrow::compute::SortOptions {
                                descending: !sort_item.ascending,
                                nulls_first: !sort_item.ascending,
                            },
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

            // No explicit ORDER BY: order by the partition keys so the
            // window operator still sees deterministically grouped input.
            if order_by_physical.is_empty() && !partition_by_physical.is_empty() {
                for partition_expr in &partition_by_physical {
                    order_by_physical.push(datafusion::physical_expr::PhysicalSortExpr {
                        expr: Arc::clone(partition_expr),
                        options: datafusion::arrow::compute::SortOptions {
                            descending: false,
                            nulls_first: false,
                        },
                    });
                }
            }

            // Frame selection: aggregates without ORDER BY span the whole
            // partition (UNBOUNDED PRECEDING..UNBOUNDED FOLLOWING); with
            // ORDER BY they use DataFusion's default ordered-window frame.
            // Ranking/navigation functions always get the unbounded frame.
            let window_frame = if is_aggregate {
                if window_spec.order_by.is_empty() {
                    use datafusion::logical_expr::{WindowFrameBound, WindowFrameUnits};
                    Arc::new(WindowFrame::new_bounds(
                        WindowFrameUnits::Rows,
                        WindowFrameBound::Preceding(datafusion::common::ScalarValue::UInt64(None)),
                        WindowFrameBound::Following(datafusion::common::ScalarValue::UInt64(None)),
                    ))
                } else {
                    // NOTE(review): `WindowFrame::new(Some(false))` is the
                    // default frame for a window with ORDER BY — confirm the
                    // bool's meaning against the pinned DataFusion version.
                    Arc::new(WindowFrame::new(Some(false)))
                }
            } else {
                use datafusion::logical_expr::{WindowFrameBound, WindowFrameUnits};
                Arc::new(WindowFrame::new_bounds(
                    WindowFrameUnits::Rows,
                    WindowFrameBound::Preceding(datafusion::common::ScalarValue::UInt64(None)),
                    WindowFrameBound::Following(datafusion::common::ScalarValue::UInt64(None)),
                ))
            };

            // Output column is named after the full textual form of the call.
            let alias = expr.to_string_repr();

            let window_expr = create_window_expr(
                &window_fn_def,
                alias,
                &physical_args,
                &partition_by_physical,
                &order_by_physical,
                window_frame,
                input_schema.clone(),
                false, // NOTE(review): presumably ignore_nulls — confirm
                *distinct,
                None, // no FILTER clause
            )?;

            window_expr_list.push(window_expr);
        }

        // Build ONE global sort covering all windows' partition and order
        // keys so WindowAggExec receives correctly ordered input. Partition
        // keys are deduplicated by their textual form; order keys are
        // appended as-is.
        let mut sort_exprs = Vec::new();

        for expr in window_exprs {
            if let Expr::FunctionCall {
                window_spec: Some(window_spec),
                ..
            } = expr
            {
                for partition_expr in &window_spec.partition_by {
                    let df_expr = cypher_expr_to_df(partition_expr, tx_ctx.as_ref())?;
                    let physical_expr =
                        create_physical_expr(&df_expr, &df_schema, state.execution_props())?;

                    if !sort_exprs
                        .iter()
                        .any(|s: &datafusion::physical_expr::PhysicalSortExpr| {
                            s.expr.to_string() == physical_expr.to_string()
                        })
                    {
                        sort_exprs.push(datafusion::physical_expr::PhysicalSortExpr {
                            expr: physical_expr,
                            options: datafusion::arrow::compute::SortOptions {
                                descending: false,
                                nulls_first: false,
                            },
                        });
                    }
                }

                for sort_item in &window_spec.order_by {
                    let df_expr = cypher_expr_to_df(&sort_item.expr, tx_ctx.as_ref())?;
                    let physical_expr =
                        create_physical_expr(&df_expr, &df_schema, state.execution_props())?;

                    sort_exprs.push(datafusion::physical_expr::PhysicalSortExpr {
                        expr: physical_expr,
                        options: datafusion::arrow::compute::SortOptions {
                            descending: !sort_item.ascending,
                            nulls_first: !sort_item.ascending,
                        },
                    });
                }
            }
        }

        let sorted_input = if !sort_exprs.is_empty() {
            let lex_ordering = LexOrdering::new(sort_exprs)
                .ok_or_else(|| anyhow!("Failed to create LexOrdering for window function"))?;
            Arc::new(SortExec::new(lex_ordering, input)) as Arc<dyn ExecutionPlan>
        } else {
            input
        };

        let window_agg_exec = WindowAggExec::try_new(
            window_expr_list,
            sorted_input,
            false, // NOTE(review): third arg likely can_repartition — confirm
        )?;

        Ok(Arc::new(window_agg_exec))
    }
3956
3957 fn plan_empty(&self) -> Result<Arc<dyn ExecutionPlan>> {
3962 let schema = Arc::new(Schema::empty());
3963 Ok(Arc::new(PlaceholderRowExec::new(schema)))
3966 }
3967
3968 fn plan_bind_zero_length_path(
3971 &self,
3972 input: &LogicalPlan,
3973 node_variable: &str,
3974 path_variable: &str,
3975 all_properties: &HashMap<String, HashSet<String>>,
3976 ) -> Result<Arc<dyn ExecutionPlan>> {
3977 let input_plan = self.plan_internal(input, all_properties)?;
3978 Ok(Arc::new(BindZeroLengthPathExec::new(
3979 input_plan,
3980 node_variable.to_string(),
3981 path_variable.to_string(),
3982 self.graph_ctx.clone(),
3983 )))
3984 }
3985
3986 fn plan_bind_path(
3989 &self,
3990 input: &LogicalPlan,
3991 node_variables: &[String],
3992 edge_variables: &[String],
3993 path_variable: &str,
3994 all_properties: &HashMap<String, HashSet<String>>,
3995 ) -> Result<Arc<dyn ExecutionPlan>> {
3996 let input_plan = self.plan_internal(input, all_properties)?;
3997 Ok(Arc::new(BindFixedPathExec::new(
3998 input_plan,
3999 node_variables.to_vec(),
4000 edge_variables.to_vec(),
4001 path_variable.to_string(),
4002 self.graph_ctx.clone(),
4003 )))
4004 }
4005
4006 fn extract_edge_property_conditions(expr: &Expr) -> Vec<(String, uni_common::Value)> {
4015 match expr {
4016 Expr::BinaryOp {
4017 left,
4018 op: uni_cypher::ast::BinaryOp::Eq,
4019 right,
4020 } => {
4021 if let Expr::Property(inner, prop_name) = left.as_ref()
4023 && matches!(inner.as_ref(), Expr::Variable(_))
4024 && let Expr::Literal(lit) = right.as_ref()
4025 {
4026 return vec![(prop_name.clone(), lit.to_value())];
4027 }
4028 if let Expr::Literal(lit) = left.as_ref()
4030 && let Expr::Property(inner, prop_name) = right.as_ref()
4031 && matches!(inner.as_ref(), Expr::Variable(_))
4032 {
4033 return vec![(prop_name.clone(), lit.to_value())];
4034 }
4035 vec![]
4036 }
4037 Expr::BinaryOp {
4038 left,
4039 op: uni_cypher::ast::BinaryOp::And,
4040 right,
4041 } => {
4042 let mut result = Self::extract_edge_property_conditions(left);
4043 result.extend(Self::extract_edge_property_conditions(right));
4044 result
4045 }
4046 _ => vec![],
4047 }
4048 }
4049
    /// Lower a logical filter expression to a physical expression against
    /// `schema`, resolving session-registered UDFs and applying the
    /// project's type-coercion pass.
    fn create_physical_filter_expr(
        &self,
        expr: &DfExpr,
        schema: &SchemaRef,
        session: &SessionContext,
    ) -> Result<Arc<dyn datafusion::physical_expr::PhysicalExpr>> {
        let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
        let state = session.state();

        // Point scalar-function nodes at the UDF instances registered in the
        // session before coercion.
        let resolved_expr = Self::resolve_udfs(expr, &state)?;

        let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;

        // Resolve again after coercion — presumably because the coercion
        // pass can introduce new scalar-function nodes (e.g. casts/wrappers)
        // that also need session UDF instances. TODO(review): confirm.
        let coerced_expr = Self::resolve_udfs(&coerced_expr, &state)?;

        use datafusion::physical_planner::PhysicalPlanner;
        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
        let physical = planner.create_physical_expr(&coerced_expr, &df_schema, &state)?;

        Ok(physical)
    }
4079
4080 fn resolve_udfs(expr: &DfExpr, state: &datafusion::execution::SessionState) -> Result<DfExpr> {
4085 use datafusion::common::tree_node::{Transformed, TreeNode};
4086 use datafusion::logical_expr::Expr as DfExpr;
4087
4088 let result = expr
4089 .clone()
4090 .transform_up(|node| {
4091 if let DfExpr::ScalarFunction(ref func) = node {
4092 let udf_name = func.func.name();
4093 if let Some(registered_udf) = state.scalar_functions().get(udf_name) {
4094 return Ok(Transformed::yes(DfExpr::ScalarFunction(
4095 datafusion::logical_expr::expr::ScalarFunction {
4096 func: registered_udf.clone(),
4097 args: func.args.clone(),
4098 },
4099 )));
4100 }
4101 }
4102 Ok(Transformed::no(node))
4103 })
4104 .map_err(|e| anyhow::anyhow!("Failed to resolve UDFs: {}", e))?;
4105
4106 Ok(result.data)
4107 }
4108
    /// Append a struct-valued column named `variable`, assembled from the
    /// flattened columns `{variable}._vid`, `{variable}._labels`, and
    /// `{variable}.{prop}` for each requested property. All existing columns
    /// are passed through unchanged.
    fn add_structural_projection(
        &self,
        input: Arc<dyn ExecutionPlan>,
        variable: &str,
        properties: &[String],
    ) -> Result<Arc<dyn ExecutionPlan>> {
        use datafusion::functions::expr_fn::named_struct;
        use datafusion::logical_expr::lit;
        use datafusion::physical_plan::projection::ProjectionExec;

        let input_schema = input.schema();
        let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
            Vec::new();

        // Pass every existing column through unchanged.
        for (i, field) in input_schema.fields().iter().enumerate() {
            let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
                field.name(),
                i,
            ));
            proj_exprs.push((col_expr, field.name().clone()));
        }

        // named_struct takes alternating (name literal, value expr) args:
        // 2 per property plus 4 for the _vid/_labels identity pair.
        let mut struct_args = Vec::with_capacity(properties.len() * 2 + 4);

        struct_args.push(lit("_vid"));
        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
            format!("{}._vid", variable),
        )));

        struct_args.push(lit("_labels"));
        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
            format!("{}._labels", variable),
        )));

        for prop in properties {
            struct_args.push(lit(prop.clone()));
            struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
                format!("{}.{}", variable, prop),
            )));
        }

        let struct_expr = named_struct(struct_args);

        let df_schema = datafusion::common::DFSchema::try_from(input_schema.as_ref().clone())?;
        let session = self.session_ctx.read();
        let state = session.state();

        // Re-point scalar functions (named_struct itself) at the
        // session-registered UDF instances before physical planning.
        let resolved_expr = Self::resolve_udfs(&struct_expr, &state)?;

        use datafusion::physical_planner::PhysicalPlanner;
        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
        let physical_struct_expr =
            planner.create_physical_expr(&resolved_expr, &df_schema, &state)?;

        proj_exprs.push((physical_struct_expr, variable.to_string()));

        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
    }
4175
    /// Append a struct-valued column named `variable` for an edge, built
    /// from `{variable}._eid`, `{variable}._type`, the source/target vid
    /// columns, an optional `{variable}._all_props` column, and one entry
    /// per requested property. Existing columns pass through unchanged.
    fn add_edge_structural_projection(
        &self,
        input: Arc<dyn ExecutionPlan>,
        variable: &str,
        properties: &[String],
        source_variable: &str,
        target_variable: &str,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        use datafusion::functions::expr_fn::named_struct;
        use datafusion::logical_expr::lit;
        use datafusion::physical_plan::projection::ProjectionExec;

        let input_schema = input.schema();
        let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
            Vec::new();

        // Pass every existing column through unchanged.
        for (i, field) in input_schema.fields().iter().enumerate() {
            let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
                field.name(),
                i,
            ));
            proj_exprs.push((col_expr, field.name().clone()));
        }

        // Alternating (name literal, value expr) pairs for named_struct.
        let mut struct_args = Vec::with_capacity(properties.len() * 2 + 10);

        struct_args.push(lit("_eid"));
        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
            format!("{}._eid", variable),
        )));

        struct_args.push(lit("_type"));
        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
            format!("{}._type", variable),
        )));

        // Endpoint columns may exist flattened as "<var>._vid" or only as
        // the bare variable column; prefer the flattened form when present.
        let resolve_vid_col = |var: &str| -> String {
            let vid_col = format!("{}._vid", var);
            if input_schema.column_with_name(&vid_col).is_some() {
                vid_col
            } else {
                var.to_string()
            }
        };
        let src_col_name = resolve_vid_col(source_variable);
        let dst_col_name = resolve_vid_col(target_variable);
        struct_args.push(lit("_src"));
        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
            src_col_name,
        )));

        struct_args.push(lit("_dst"));
        struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
            dst_col_name,
        )));

        // Include the packed all-properties column only when the input
        // actually carries one.
        let all_props_col = format!("{}._all_props", variable);
        if input_schema.column_with_name(&all_props_col).is_some() {
            struct_args.push(lit("_all_props"));
            struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
                all_props_col,
            )));
        }

        for prop in properties {
            struct_args.push(lit(prop.clone()));
            struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
                format!("{}.{}", variable, prop),
            )));
        }

        let struct_expr = named_struct(struct_args);

        let df_schema = datafusion::common::DFSchema::try_from(input_schema.as_ref().clone())?;
        let session = self.session_ctx.read();
        let state = session.state();

        // Re-point scalar functions at session-registered UDF instances
        // before physical planning.
        let resolved_expr = Self::resolve_udfs(&struct_expr, &state)?;

        use datafusion::physical_planner::PhysicalPlanner;
        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
        let physical_struct_expr =
            planner.create_physical_expr(&resolved_expr, &df_schema, &state)?;

        proj_exprs.push((physical_struct_expr, variable.to_string()));

        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
    }
4273
4274 fn create_physical_aggregate(
4276 &self,
4277 expr: &DfExpr,
4278 schema: &SchemaRef,
4279 state: &SessionState,
4280 ) -> Result<PhysicalAggregate> {
4281 use datafusion::physical_planner::create_aggregate_expr_and_maybe_filter;
4282
4283 let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
4285
4286 let (agg_expr, filter, _ordering) = create_aggregate_expr_and_maybe_filter(
4288 expr,
4289 &df_schema,
4290 schema.as_ref(),
4291 state.execution_props(),
4292 )?;
4293 Ok((agg_expr, filter))
4294 }
4295
4296 fn resolve_source_vid_col(
4301 input_plan: Arc<dyn ExecutionPlan>,
4302 source_variable: &str,
4303 ) -> Result<(Arc<dyn ExecutionPlan>, String)> {
4304 let source_vid_col = format!("{}._vid", source_variable);
4305 if input_plan
4306 .schema()
4307 .column_with_name(&source_vid_col)
4308 .is_some()
4309 {
4310 return Ok((input_plan, source_vid_col));
4311 }
4312 if let Ok(field) = input_plan.schema().field_with_name(source_variable)
4315 && matches!(
4316 field.data_type(),
4317 datafusion::arrow::datatypes::DataType::Struct(_)
4318 )
4319 {
4320 let enriched = Self::extract_struct_identity_columns(input_plan, source_variable)?;
4321 return Ok((enriched, format!("{}._vid", source_variable)));
4322 }
4323 Ok((input_plan, source_variable.to_string()))
4324 }
4325
4326 fn extract_struct_identity_columns(
4331 input: Arc<dyn ExecutionPlan>,
4332 variable: &str,
4333 ) -> Result<Arc<dyn ExecutionPlan>> {
4334 use datafusion::common::ScalarValue;
4335 use datafusion::physical_plan::projection::ProjectionExec;
4336
4337 let schema = input.schema();
4338 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
4339 Vec::new();
4340
4341 for (i, field) in schema.fields().iter().enumerate() {
4343 let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
4344 field.name(),
4345 i,
4346 ));
4347 proj_exprs.push((col_expr, field.name().clone()));
4348 }
4349
4350 if let Some((struct_idx, struct_field)) = schema
4352 .fields()
4353 .iter()
4354 .enumerate()
4355 .find(|(_, f)| f.name() == variable)
4356 && let datafusion::arrow::datatypes::DataType::Struct(fields) = struct_field.data_type()
4357 {
4358 let struct_col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4359 datafusion::physical_expr::expressions::Column::new(variable, struct_idx),
4360 );
4361 let get_field_udf: Arc<datafusion::logical_expr::ScalarUDF> =
4362 Arc::new(datafusion::logical_expr::ScalarUDF::from(
4363 datafusion::functions::core::getfield::GetFieldFunc::new(),
4364 ));
4365
4366 if fields.iter().any(|f| f.name() == "_vid") {
4368 let field_name: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4369 Arc::new(datafusion::physical_expr::expressions::Literal::new(
4370 ScalarValue::Utf8(Some("_vid".to_string())),
4371 ));
4372 let vid_expr = Arc::new(datafusion::physical_expr::ScalarFunctionExpr::try_new(
4373 get_field_udf.clone(),
4374 vec![struct_col.clone(), field_name],
4375 schema.as_ref(),
4376 Arc::new(datafusion::common::config::ConfigOptions::default()),
4377 )?);
4378 proj_exprs.push((vid_expr, format!("{}._vid", variable)));
4379 }
4380
4381 if fields.iter().any(|f| f.name() == "_labels") {
4383 let field_name: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4384 Arc::new(datafusion::physical_expr::expressions::Literal::new(
4385 ScalarValue::Utf8(Some("_labels".to_string())),
4386 ));
4387 let labels_expr = Arc::new(datafusion::physical_expr::ScalarFunctionExpr::try_new(
4388 get_field_udf,
4389 vec![struct_col, field_name],
4390 schema.as_ref(),
4391 Arc::new(datafusion::common::config::ConfigOptions::default()),
4392 )?);
4393 proj_exprs.push((labels_expr, format!("{}._labels", variable)));
4394 }
4395 }
4396
4397 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
4398 }
4399
    /// Flatten every field of the struct-typed column named `variable` into
    /// top-level `{variable}.{field}` columns, skipping any flattened column
    /// that already exists. Existing columns pass through unchanged; if
    /// `variable` is not a struct column the projection is a passthrough.
    fn extract_all_struct_fields(
        input: Arc<dyn ExecutionPlan>,
        variable: &str,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        use datafusion::common::ScalarValue;
        use datafusion::physical_plan::projection::ProjectionExec;

        let schema = input.schema();
        let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
            Vec::new();

        // Pass every existing column through unchanged.
        for (i, field) in schema.fields().iter().enumerate() {
            let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
                field.name(),
                i,
            ));
            proj_exprs.push((col_expr, field.name().clone()));
        }

        if let Some((struct_idx, struct_field)) = schema
            .fields()
            .iter()
            .enumerate()
            .find(|(_, f)| f.name() == variable)
            && let datafusion::arrow::datatypes::DataType::Struct(fields) = struct_field.data_type()
        {
            let struct_col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
                datafusion::physical_expr::expressions::Column::new(variable, struct_idx),
            );
            let get_field_udf: Arc<datafusion::logical_expr::ScalarUDF> =
                Arc::new(datafusion::logical_expr::ScalarUDF::from(
                    datafusion::functions::core::getfield::GetFieldFunc::new(),
                ));

            for field in fields.iter() {
                let flat_name = format!("{}.{}", variable, field.name());
                // Don't shadow a column that is already flattened.
                if schema.column_with_name(&flat_name).is_some() {
                    continue;
                }
                let field_lit: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
                    Arc::new(datafusion::physical_expr::expressions::Literal::new(
                        ScalarValue::Utf8(Some(field.name().to_string())),
                    ));
                let extract_expr =
                    Arc::new(datafusion::physical_expr::ScalarFunctionExpr::try_new(
                        get_field_udf.clone(),
                        vec![struct_col.clone(), field_lit],
                        schema.as_ref(),
                        Arc::new(datafusion::common::config::ConfigOptions::default()),
                    )?);
                proj_exprs.push((extract_expr, flat_name));
            }
        }

        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
    }
4462
4463 fn is_large_binary_col(&self, expr: &DfExpr, schema: &SchemaRef) -> bool {
4465 if let DfExpr::Column(col) = expr
4466 && let Ok(field) = schema.field_with_name(&col.name)
4467 {
4468 return matches!(
4469 field.data_type(),
4470 datafusion::arrow::datatypes::DataType::LargeBinary
4471 );
4472 }
4473 true
4476 }
4477}
4478
4479fn coerce_physical_expr(
4495 expr: Arc<dyn datafusion::physical_expr::PhysicalExpr>,
4496 actual_dt: &DataType,
4497 target_dt: &DataType,
4498 schema: &arrow_schema::Schema,
4499) -> Arc<dyn datafusion::physical_expr::PhysicalExpr> {
4500 use datafusion::physical_expr::expressions::CastExpr;
4501
4502 match (actual_dt, target_dt) {
4503 (DataType::LargeBinary, DataType::Float64) => wrap_cypher_to_float64(expr, schema),
4505 (DataType::LargeBinary, DataType::Int64) => {
4507 let float_expr = wrap_cypher_to_float64(expr, schema);
4508 Arc::new(CastExpr::new(float_expr, DataType::Int64, None))
4509 }
4510 _ => Arc::new(CastExpr::new(expr, target_dt.clone(), None)),
4512 }
4513}
4514
4515fn wrap_cypher_to_float64(
4517 expr: Arc<dyn datafusion::physical_expr::PhysicalExpr>,
4518 schema: &arrow_schema::Schema,
4519) -> Arc<dyn datafusion::physical_expr::PhysicalExpr> {
4520 let udf = Arc::new(super::df_udfs::cypher_to_float64_udf());
4521 let config = Arc::new(datafusion::common::config::ConfigOptions::default());
4522 Arc::new(
4523 datafusion::physical_expr::ScalarFunctionExpr::try_new(udf, vec![expr], schema, config)
4524 .expect("CypherToFloat64Udf accepts Any(1) signature"),
4525 )
4526}
4527
4528fn strip_conflicting_structural_columns(
4538 input: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
4539 derived_col_names: &HashSet<&str>,
4540) -> anyhow::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
4541 use datafusion::physical_plan::projection::ProjectionExec;
4542
4543 let schema = input.schema();
4544 let proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> = schema
4545 .fields()
4546 .iter()
4547 .enumerate()
4548 .filter(|(_, f)| {
4549 !(matches!(f.data_type(), arrow_schema::DataType::Struct(_))
4551 && derived_col_names.contains(f.name().as_str()))
4552 })
4553 .map(|(i, f)| {
4554 let col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4555 datafusion::physical_expr::expressions::Column::new(f.name(), i),
4556 );
4557 (col, f.name().clone())
4558 })
4559 .collect();
4560
4561 if proj_exprs.len() == schema.fields().len() {
4562 return Ok(input);
4564 }
4565
4566 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
4567}
4568
4569fn resolve_column_indices(
4570 schema: &arrow_schema::SchemaRef,
4571 column_names: &[String],
4572) -> anyhow::Result<Vec<usize>> {
4573 column_names
4574 .iter()
4575 .map(|name| {
4576 schema
4577 .index_of(name)
4578 .map_err(|_| anyhow::anyhow!("Column '{}' not found in schema", name))
4579 })
4580 .collect()
4581}
4582
/// Resolve BEST BY sort criteria against a physical schema.
///
/// Each criterion must be a variable or a property access. For `var.prop`
/// the bare property name is tried first, then the qualified `var.prop`
/// column; a dotted variable name tries its last segment first. Errors on
/// unsupported criterion shapes or when no candidate column exists.
fn resolve_best_by_criteria(
    schema: &arrow_schema::SchemaRef,
    criteria: &[(Expr, bool)],
) -> anyhow::Result<Vec<super::df_graph::locy_best_by::SortCriterion>> {
    criteria
        .iter()
        .map(|(expr, ascending)| {
            // Candidate column names, in lookup-priority order.
            let candidates: Vec<String> = match expr {
                Expr::Property(base, prop) => {
                    if let Expr::Variable(var) = base.as_ref() {
                        vec![prop.clone(), format!("{}.{}", var, prop)]
                    } else {
                        vec![prop.clone()]
                    }
                }
                Expr::Variable(name) => {
                    // "a.b.c" also tries the short form "c" first.
                    let short = name.rsplit('.').next().unwrap_or(name).to_string();
                    if short != *name {
                        vec![short, name.clone()]
                    } else {
                        vec![name.clone()]
                    }
                }
                _ => {
                    return Err(anyhow::anyhow!(
                        "BEST BY criteria must be variable or property access"
                    ));
                }
            };
            // First candidate present in the schema wins.
            let col_index = candidates
                .iter()
                .find_map(|name| schema.index_of(name).ok())
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "BEST BY column '{}' not found",
                        candidates.first().unwrap_or(&String::new())
                    )
                })?;
            Ok(super::df_graph::locy_best_by::SortCriterion {
                col_index,
                ascending: *ascending,
                nulls_first: false,
            })
        })
        .collect()
}
4632
/// Resolve FOLD aggregate bindings against a physical schema.
///
/// Each binding pairs an output column name with an aggregate function
/// call. Argument-less COUNT/MCOUNT becomes a CountAll binding (no input
/// column). Otherwise the input column is resolved by trying the output
/// name itself first, then the first argument interpreted as a column name
/// (a variable, or `var.prop` for a property access).
fn resolve_fold_bindings(
    schema: &arrow_schema::SchemaRef,
    fold_bindings: &[(String, Expr)],
) -> anyhow::Result<Vec<super::df_graph::locy_fold::FoldBinding>> {
    fold_bindings
        .iter()
        .map(|(output_name, expr)| {
            match expr {
                Expr::FunctionCall { name, args, .. } => {
                    let upper = name.to_uppercase();
                    let is_count = matches!(upper.as_str(), "COUNT" | "MCOUNT");

                    // count() with no argument counts rows — no input column.
                    if is_count && args.is_empty() {
                        return Ok(super::df_graph::locy_fold::FoldBinding {
                            output_name: output_name.clone(),
                            kind: super::df_graph::locy_fold::FoldAggKind::CountAll,
                            input_col_index: 0, // unused for CountAll
                            input_col_name: None,
                        });
                    }

                    // Map the (upper-cased) function name to a fold kind.
                    let kind = match upper.as_str() {
                        "SUM" | "MSUM" => super::df_graph::locy_fold::FoldAggKind::Sum,
                        "COUNT" | "MCOUNT" => super::df_graph::locy_fold::FoldAggKind::Count,
                        "MAX" | "MMAX" => super::df_graph::locy_fold::FoldAggKind::Max,
                        "MIN" | "MMIN" => super::df_graph::locy_fold::FoldAggKind::Min,
                        "AVG" => super::df_graph::locy_fold::FoldAggKind::Avg,
                        "COLLECT" => super::df_graph::locy_fold::FoldAggKind::Collect,
                        "MNOR" => super::df_graph::locy_fold::FoldAggKind::Nor,
                        "MPROD" => super::df_graph::locy_fold::FoldAggKind::Prod,
                        other => {
                            return Err(anyhow::anyhow!(
                                "Unsupported FOLD aggregate function: {}",
                                other
                            ));
                        }
                    };
                    // Column lookup: output name first, then the first
                    // argument's variable / qualified property name.
                    let input_col_index = schema
                        .index_of(output_name)
                        .or_else(|_| {
                            let col_name = match args.first() {
                                Some(Expr::Variable(name)) => Some(name.clone()),
                                Some(Expr::Property(base, prop)) => {
                                    if let Expr::Variable(var) = base.as_ref() {
                                        Some(format!("{}.{}", var, prop))
                                    } else {
                                        None
                                    }
                                }
                                _ => None,
                            };
                            col_name
                                .and_then(|n| schema.index_of(&n).ok())
                                .ok_or_else(|| {
                                    // ArrowError here only to satisfy
                                    // or_else's error type; mapped below.
                                    arrow_schema::ArrowError::SchemaError(format!(
                                        "FOLD column '{}' not found",
                                        output_name
                                    ))
                                })
                        })
                        .map_err(|_| anyhow::anyhow!("FOLD column '{}' not found", output_name))?;
                    Ok(super::df_graph::locy_fold::FoldBinding {
                        output_name: output_name.clone(),
                        kind,
                        input_col_index,
                        input_col_name: Some(output_name.clone()),
                    })
                }
                _ => Err(anyhow::anyhow!(
                    "FOLD binding must be an aggregate function call"
                )),
            }
        })
        .collect()
}
4714
4715fn collect_variable_kinds(plan: &LogicalPlan, kinds: &mut HashMap<String, VariableKind>) {
4720 match plan {
4721 LogicalPlan::Scan { variable, .. }
4722 | LogicalPlan::ExtIdLookup { variable, .. }
4723 | LogicalPlan::ScanAll { variable, .. }
4724 | LogicalPlan::ScanMainByLabels { variable, .. }
4725 | LogicalPlan::VectorKnn { variable, .. }
4726 | LogicalPlan::InvertedIndexLookup { variable, .. } => {
4727 kinds.insert(variable.clone(), VariableKind::Node);
4728 }
4729 LogicalPlan::Traverse {
4730 input,
4731 source_variable,
4732 target_variable,
4733 step_variable,
4734 path_variable,
4735 is_variable_length,
4736 ..
4737 }
4738 | LogicalPlan::TraverseMainByType {
4739 input,
4740 source_variable,
4741 target_variable,
4742 step_variable,
4743 path_variable,
4744 is_variable_length,
4745 ..
4746 } => {
4747 collect_variable_kinds(input, kinds);
4748 kinds.insert(source_variable.clone(), VariableKind::Node);
4749 kinds.insert(target_variable.clone(), VariableKind::Node);
4750 if let Some(sv) = step_variable {
4751 kinds.insert(sv.clone(), VariableKind::edge_for(*is_variable_length));
4752 }
4753 if let Some(pv) = path_variable {
4754 kinds.insert(pv.clone(), VariableKind::Path);
4755 }
4756 }
4757 LogicalPlan::ShortestPath {
4758 input,
4759 source_variable,
4760 target_variable,
4761 path_variable,
4762 ..
4763 }
4764 | LogicalPlan::AllShortestPaths {
4765 input,
4766 source_variable,
4767 target_variable,
4768 path_variable,
4769 ..
4770 } => {
4771 collect_variable_kinds(input, kinds);
4772 kinds.insert(source_variable.clone(), VariableKind::Node);
4773 kinds.insert(target_variable.clone(), VariableKind::Node);
4774 kinds.insert(path_variable.clone(), VariableKind::Path);
4775 }
4776 LogicalPlan::QuantifiedPattern {
4777 input,
4778 pattern_plan,
4779 path_variable,
4780 start_variable,
4781 binding_variable,
4782 ..
4783 } => {
4784 collect_variable_kinds(input, kinds);
4785 collect_variable_kinds(pattern_plan, kinds);
4786 kinds.insert(start_variable.clone(), VariableKind::Node);
4787 kinds.insert(binding_variable.clone(), VariableKind::Node);
4788 if let Some(pv) = path_variable {
4789 kinds.insert(pv.clone(), VariableKind::Path);
4790 }
4791 }
4792 LogicalPlan::BindZeroLengthPath {
4793 input,
4794 node_variable,
4795 path_variable,
4796 } => {
4797 collect_variable_kinds(input, kinds);
4798 kinds.insert(node_variable.clone(), VariableKind::Node);
4799 kinds.insert(path_variable.clone(), VariableKind::Path);
4800 }
4801 LogicalPlan::BindPath {
4802 input,
4803 node_variables,
4804 edge_variables,
4805 path_variable,
4806 } => {
4807 collect_variable_kinds(input, kinds);
4808 for nv in node_variables {
4809 kinds.insert(nv.clone(), VariableKind::Node);
4810 }
4811 for ev in edge_variables {
4812 kinds.insert(ev.clone(), VariableKind::Edge);
4813 }
4814 kinds.insert(path_variable.clone(), VariableKind::Path);
4815 }
4816 LogicalPlan::Filter { input, .. }
4818 | LogicalPlan::Project { input, .. }
4819 | LogicalPlan::Sort { input, .. }
4820 | LogicalPlan::Limit { input, .. }
4821 | LogicalPlan::Aggregate { input, .. }
4822 | LogicalPlan::Distinct { input, .. }
4823 | LogicalPlan::Window { input, .. }
4824 | LogicalPlan::Unwind { input, .. }
4825 | LogicalPlan::Create { input, .. }
4826 | LogicalPlan::CreateBatch { input, .. }
4827 | LogicalPlan::Merge { input, .. }
4828 | LogicalPlan::Set { input, .. }
4829 | LogicalPlan::Remove { input, .. }
4830 | LogicalPlan::Delete { input, .. }
4831 | LogicalPlan::Foreach { input, .. }
4832 | LogicalPlan::SubqueryCall { input, .. } => {
4833 collect_variable_kinds(input, kinds);
4834 }
4835 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
4836 collect_variable_kinds(left, kinds);
4837 collect_variable_kinds(right, kinds);
4838 }
4839 LogicalPlan::Apply {
4840 input, subquery, ..
4841 } => {
4842 collect_variable_kinds(input, kinds);
4843 collect_variable_kinds(subquery, kinds);
4844 }
4845 LogicalPlan::RecursiveCTE {
4846 initial, recursive, ..
4847 } => {
4848 collect_variable_kinds(initial, kinds);
4849 collect_variable_kinds(recursive, kinds);
4850 }
4851 LogicalPlan::Explain { plan } => {
4852 collect_variable_kinds(plan, kinds);
4853 }
4854 LogicalPlan::ProcedureCall {
4855 procedure_name,
4856 yield_items,
4857 ..
4858 } => {
4859 use crate::query::df_graph::procedure_call::map_yield_to_canonical;
4860 for (name, alias) in yield_items {
4861 let var = alias.as_ref().unwrap_or(name);
4862 if matches!(
4863 procedure_name.as_str(),
4864 "uni.vector.query" | "uni.fts.query" | "uni.search"
4865 ) {
4866 let canonical = map_yield_to_canonical(name);
4867 if canonical == "node" {
4868 kinds.insert(var.clone(), VariableKind::Node);
4869 }
4870 }
4872 }
4874 }
4875 LogicalPlan::LocyProgram { .. }
4877 | LogicalPlan::LocyFold { .. }
4878 | LogicalPlan::LocyBestBy { .. }
4879 | LogicalPlan::LocyPriority { .. }
4880 | LogicalPlan::LocyDerivedScan { .. }
4881 | LogicalPlan::LocyProject { .. } => {}
4882 LogicalPlan::Empty
4884 | LogicalPlan::CreateVectorIndex { .. }
4885 | LogicalPlan::CreateFullTextIndex { .. }
4886 | LogicalPlan::CreateScalarIndex { .. }
4887 | LogicalPlan::CreateJsonFtsIndex { .. }
4888 | LogicalPlan::DropIndex { .. }
4889 | LogicalPlan::ShowIndexes { .. }
4890 | LogicalPlan::Copy { .. }
4891 | LogicalPlan::Backup { .. }
4892 | LogicalPlan::ShowDatabase
4893 | LogicalPlan::ShowConfig
4894 | LogicalPlan::ShowStatistics
4895 | LogicalPlan::Vacuum
4896 | LogicalPlan::Checkpoint
4897 | LogicalPlan::CopyTo { .. }
4898 | LogicalPlan::CopyFrom { .. }
4899 | LogicalPlan::CreateLabel(_)
4900 | LogicalPlan::CreateEdgeType(_)
4901 | LogicalPlan::AlterLabel(_)
4902 | LogicalPlan::AlterEdgeType(_)
4903 | LogicalPlan::DropLabel(_)
4904 | LogicalPlan::DropEdgeType(_)
4905 | LogicalPlan::CreateConstraint(_)
4906 | LogicalPlan::DropConstraint(_)
4907 | LogicalPlan::ShowConstraints(_) => {}
4908 }
4909}
4910
/// Walks `plan` recursively and appends to `hints` the variable names of all
/// nodes that appear in mutating patterns (CREATE, CREATE batch, MERGE)
/// anywhere in the plan tree. Deduplication happens inside
/// `collect_node_names_from_pattern`, so `hints` ends up duplicate-free in
/// first-seen order.
fn collect_mutation_node_hints(plan: &LogicalPlan, hints: &mut Vec<String>) {
    match plan {
        // Mutating operators: harvest node variables from the pattern(s),
        // then continue walking the input side of the plan.
        LogicalPlan::Create { input, pattern } => {
            collect_node_names_from_pattern(pattern, hints);
            collect_mutation_node_hints(input, hints);
        }
        LogicalPlan::CreateBatch { input, patterns } => {
            for pattern in patterns {
                collect_node_names_from_pattern(pattern, hints);
            }
            collect_mutation_node_hints(input, hints);
        }
        LogicalPlan::Merge { input, pattern, .. } => {
            collect_node_names_from_pattern(pattern, hints);
            collect_mutation_node_hints(input, hints);
        }
        // Single-input pass-through operators: nothing to harvest here,
        // just recurse into the child plan.
        LogicalPlan::Traverse { input, .. }
        | LogicalPlan::TraverseMainByType { input, .. }
        | LogicalPlan::Filter { input, .. }
        | LogicalPlan::Project { input, .. }
        | LogicalPlan::Sort { input, .. }
        | LogicalPlan::Limit { input, .. }
        | LogicalPlan::Aggregate { input, .. }
        | LogicalPlan::Distinct { input, .. }
        | LogicalPlan::Window { input, .. }
        | LogicalPlan::Unwind { input, .. }
        | LogicalPlan::Set { input, .. }
        | LogicalPlan::Remove { input, .. }
        | LogicalPlan::Delete { input, .. }
        | LogicalPlan::Foreach { input, .. }
        | LogicalPlan::SubqueryCall { input, .. }
        | LogicalPlan::ShortestPath { input, .. }
        | LogicalPlan::AllShortestPaths { input, .. }
        | LogicalPlan::QuantifiedPattern { input, .. }
        | LogicalPlan::BindZeroLengthPath { input, .. }
        | LogicalPlan::BindPath { input, .. } => {
            collect_mutation_node_hints(input, hints);
        }
        // Two-input operators: visit both sides.
        LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
            collect_mutation_node_hints(left, hints);
            collect_mutation_node_hints(right, hints);
        }
        LogicalPlan::Apply {
            input, subquery, ..
        } => {
            collect_mutation_node_hints(input, hints);
            collect_mutation_node_hints(subquery, hints);
        }
        LogicalPlan::RecursiveCTE {
            initial, recursive, ..
        } => {
            collect_mutation_node_hints(initial, hints);
            collect_mutation_node_hints(recursive, hints);
        }
        LogicalPlan::Explain { plan } => {
            collect_mutation_node_hints(plan, hints);
        }
        // NOTE(review): remaining variants are assumed to be leaves with no
        // child plans. Unlike `collect_variable_kinds`, this catch-all will
        // silently skip any future variant that does carry a child plan —
        // consider listing variants explicitly to get a compile error instead.
        _ => {}
    }
}
4977
4978fn collect_node_names_from_pattern(pattern: &Pattern, hints: &mut Vec<String>) {
4980 for path in &pattern.paths {
4981 for element in &path.elements {
4982 match element {
4983 PatternElement::Node(n) => {
4984 if let Some(ref v) = n.variable
4985 && !hints.contains(v)
4986 {
4987 hints.push(v.clone());
4988 }
4989 }
4990 PatternElement::Parenthesized { pattern, .. } => {
4991 let sub = Pattern {
4992 paths: vec![pattern.as_ref().clone()],
4993 };
4994 collect_node_names_from_pattern(&sub, hints);
4995 }
4996 _ => {}
4997 }
4998 }
4999 }
5000}
5001
/// Walks `plan` recursively and appends to `hints` the variable names of all
/// relationships (edges) that appear in mutating patterns (CREATE, MERGE,
/// CREATE batch) anywhere in the plan tree. Deduplication happens inside
/// `collect_edge_names_from_pattern`, so `hints` ends up duplicate-free in
/// first-seen order.
fn collect_mutation_edge_hints(plan: &LogicalPlan, hints: &mut Vec<String>) {
    match plan {
        // Mutating operators: harvest edge variables from the pattern(s),
        // then continue walking the input side of the plan.
        LogicalPlan::Create { input, pattern } | LogicalPlan::Merge { input, pattern, .. } => {
            collect_edge_names_from_pattern(pattern, hints);
            collect_mutation_edge_hints(input, hints);
        }
        LogicalPlan::CreateBatch { input, patterns } => {
            for pattern in patterns {
                collect_edge_names_from_pattern(pattern, hints);
            }
            collect_mutation_edge_hints(input, hints);
        }
        // Single-input pass-through operators: nothing to harvest here,
        // just recurse into the child plan.
        LogicalPlan::Traverse { input, .. }
        | LogicalPlan::TraverseMainByType { input, .. }
        | LogicalPlan::Filter { input, .. }
        | LogicalPlan::Project { input, .. }
        | LogicalPlan::Sort { input, .. }
        | LogicalPlan::Limit { input, .. }
        | LogicalPlan::Aggregate { input, .. }
        | LogicalPlan::Distinct { input, .. }
        | LogicalPlan::Window { input, .. }
        | LogicalPlan::Unwind { input, .. }
        | LogicalPlan::Set { input, .. }
        | LogicalPlan::Remove { input, .. }
        | LogicalPlan::Delete { input, .. }
        | LogicalPlan::Foreach { input, .. }
        | LogicalPlan::SubqueryCall { input, .. }
        | LogicalPlan::ShortestPath { input, .. }
        | LogicalPlan::AllShortestPaths { input, .. }
        | LogicalPlan::QuantifiedPattern { input, .. }
        | LogicalPlan::BindZeroLengthPath { input, .. }
        | LogicalPlan::BindPath { input, .. } => {
            collect_mutation_edge_hints(input, hints);
        }
        // Two-input operators: visit both sides.
        LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
            collect_mutation_edge_hints(left, hints);
            collect_mutation_edge_hints(right, hints);
        }
        LogicalPlan::Apply {
            input, subquery, ..
        } => {
            collect_mutation_edge_hints(input, hints);
            collect_mutation_edge_hints(subquery, hints);
        }
        LogicalPlan::RecursiveCTE {
            initial, recursive, ..
        } => {
            collect_mutation_edge_hints(initial, hints);
            collect_mutation_edge_hints(recursive, hints);
        }
        LogicalPlan::Explain { plan } => {
            collect_mutation_edge_hints(plan, hints);
        }
        // NOTE(review): remaining variants are assumed to be leaves with no
        // child plans; a future variant carrying a child plan would be
        // silently skipped by this catch-all.
        _ => {}
    }
}
5062
5063fn collect_edge_names_from_pattern(pattern: &Pattern, hints: &mut Vec<String>) {
5065 for path in &pattern.paths {
5066 for element in &path.elements {
5067 match element {
5068 PatternElement::Relationship(r) => {
5069 if let Some(ref v) = r.variable
5070 && !hints.contains(v)
5071 {
5072 hints.push(v.clone());
5073 }
5074 }
5075 PatternElement::Parenthesized { pattern, .. } => {
5076 let sub = Pattern {
5077 paths: vec![pattern.as_ref().clone()],
5078 };
5079 collect_edge_names_from_pattern(&sub, hints);
5080 }
5081 _ => {}
5082 }
5083 }
5084 }
5085}
5086
5087fn convert_direction(ast_dir: AstDirection) -> Direction {
5089 match ast_dir {
5090 AstDirection::Outgoing => Direction::Outgoing,
5091 AstDirection::Incoming => Direction::Incoming,
5092 AstDirection::Both => Direction::Both,
5093 }
5094}
5095
/// Normalizes the property list requested for a variable-length-path (VLP)
/// target node:
///
/// * literal `"*"` placeholder entries are removed;
/// * if the target had a wildcard and nothing else remains, the catch-all
///   `_all_props` column is requested instead;
/// * if any remaining property is unknown to the target label's schema
///   (and is not `_all_props`, `overflow_json`, or an internal `_`-prefixed
///   column), `_all_props` is appended so the value can still be resolved.
///
/// Returns the sanitized property list; `_all_props` is never duplicated.
fn sanitize_vlp_target_properties(
    mut properties: Vec<String>,
    target_has_wildcard: bool,
    target_label_props: Option<&HashSet<String>>,
) -> Vec<String> {
    const ALL_PROPS: &str = "_all_props";

    // "*" is a placeholder for "everything", not a real property name.
    properties.retain(|p| p != "*");

    // Decide once whether the catch-all column must be added.
    let needs_all_props = if target_has_wildcard && properties.is_empty() {
        true
    } else {
        // A property is "schema-resolvable" if it is the catch-all itself,
        // the overflow column, an internal column, or known to the label.
        let is_schema_prop = |p: &String| {
            p == ALL_PROPS
                || p == "overflow_json"
                || p.starts_with('_')
                || target_label_props.is_some_and(|props| props.contains(p))
        };
        !properties.iter().all(is_schema_prop) && !properties.iter().any(|p| p == ALL_PROPS)
    };

    if needs_all_props {
        properties.push(ALL_PROPS.to_string());
    }
    properties
}
5123
#[cfg(test)]
mod tests {
    use super::*;

    /// Each AST direction variant must map onto the engine-side variant of
    /// the same name.
    #[test]
    fn test_convert_direction() {
        let out = convert_direction(AstDirection::Outgoing);
        assert!(matches!(out, Direction::Outgoing));

        let inc = convert_direction(AstDirection::Incoming);
        assert!(matches!(inc, Direction::Incoming));

        let both = convert_direction(AstDirection::Both);
        assert!(matches!(both, Direction::Both));
    }

    /// A literal "*" entry is stripped while schema-known properties survive.
    #[test]
    fn test_sanitize_vlp_target_properties_removes_wildcard() {
        let schema: HashSet<String> = ["name".to_string()].into_iter().collect();
        let input = vec!["*".to_string(), "name".to_string()];

        let out = sanitize_vlp_target_properties(input, true, Some(&schema));

        assert_eq!(out, vec!["name".to_string()]);
    }

    /// A wildcard target with nothing else requested falls back to the
    /// catch-all `_all_props` column.
    #[test]
    fn test_sanitize_vlp_target_properties_adds_all_props_for_wildcard_empty() {
        let out = sanitize_vlp_target_properties(vec!["*".to_string()], true, None);

        assert_eq!(out, vec!["_all_props".to_string()]);
    }

    /// Requesting a property the label schema does not know forces
    /// `_all_props` to be appended so the value can still be resolved.
    #[test]
    fn test_sanitize_vlp_target_properties_adds_all_props_for_non_schema() {
        let schema: HashSet<String> = ["name".to_string()].into_iter().collect();

        let out =
            sanitize_vlp_target_properties(vec!["custom_prop".to_string()], false, Some(&schema));

        assert_eq!(
            out,
            vec!["custom_prop".to_string(), "_all_props".to_string()]
        );
    }
}