1use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
36use crate::query::df_graph::ReadSetRecordingExec;
37use crate::query::df_graph::bind_fixed_path::BindFixedPathExec;
38use crate::query::df_graph::bind_zero_length_path::BindZeroLengthPathExec;
39use crate::query::df_graph::mutation_common::{
40 MutationKind, extended_schema_for_new_vars, new_create_exec, new_merge_exec,
41};
42use crate::query::df_graph::mutation_delete::new_delete_exec;
43use crate::query::df_graph::mutation_remove::new_remove_exec;
44use crate::query::df_graph::mutation_set::new_set_exec;
45use crate::query::df_graph::recursive_cte::RecursiveCTEExec;
46use crate::query::df_graph::traverse::{
47 GraphVariableLengthTraverseExec, GraphVariableLengthTraverseMainExec,
48};
49use crate::query::df_graph::{
50 GraphApplyExec, GraphExecutionContext, GraphExtIdLookupExec, GraphProcedureCallExec,
51 GraphScanExec, GraphShortestPathExec, GraphTraverseExec, GraphTraverseMainExec,
52 GraphUnwindExec, GraphVectorKnnExec, L0Context, MutationContext, MutationExec,
53 OptionalFilterExec,
54};
55use crate::query::planner::{
56 LogicalPlan, STRUCT_ONLY_SENTINEL, aggregate_column_name, collect_properties_from_plan,
57};
58use anyhow::{Result, anyhow};
59use arrow_schema::{DataType, Schema, SchemaRef};
60use datafusion::common::JoinType;
61use datafusion::execution::SessionState;
62use datafusion::logical_expr::{Expr as DfExpr, ExprSchemable, SortExpr as DfSortExpr};
63use datafusion::physical_expr::{create_physical_expr, create_physical_sort_exprs};
64use datafusion::physical_plan::ExecutionPlan;
65use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
66use datafusion::physical_plan::filter::FilterExec;
67use datafusion::physical_plan::joins::NestedLoopJoinExec;
68use datafusion::physical_plan::limit::LocalLimitExec;
69use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
70use datafusion::physical_plan::projection::ProjectionExec;
71use datafusion::physical_plan::sorts::sort::SortExec;
72use datafusion::physical_plan::udaf::AggregateFunctionExpr;
73use datafusion::physical_plan::union::UnionExec;
74use datafusion::prelude::SessionContext;
75use parking_lot::RwLock;
76use std::collections::{HashMap, HashSet};
77use std::sync::Arc;
78use std::sync::atomic::{AtomicU64, Ordering};
79use uni_algo::algo::AlgorithmRegistry;
80use uni_common::core::schema::{PropertyMeta, Schema as UniSchema};
81use uni_cypher::ast::{
82 CypherLiteral, Direction as AstDirection, Expr, Pattern, PatternElement, SortItem,
83};
84use uni_store::runtime::l0::L0Buffer;
85use uni_store::runtime::property_manager::PropertyManager;
86use uni_store::storage::direction::Direction;
87use uni_store::storage::manager::StorageManager;
88use uni_xervo::runtime::ModelRuntime;
89
/// A planned aggregate: a DataFusion aggregate function expression paired
/// with an optional physical expression.
///
/// NOTE(review): the optional second element is presumably the aggregate's
/// FILTER predicate (the shape `AggregateExec` consumes) — confirm at the
/// sites that build and consume this tuple.
type PhysicalAggregate = (
    Arc<AggregateFunctionExpr>,
    Option<Arc<dyn datafusion::physical_expr::PhysicalExpr>>,
);
95
/// Translates a [`LogicalPlan`] into a DataFusion [`ExecutionPlan`] tree,
/// mixing stock DataFusion operators with the graph-specific operators from
/// `df_graph`.
pub struct HybridPhysicalPlanner {
    /// Shared DataFusion session context (exposed via `session_ctx()`).
    session_ctx: Arc<RwLock<SessionContext>>,

    /// Storage manager handle (exposed via `storage()`).
    storage: Arc<StorageManager>,

    /// Graph execution context handed to graph operators; rebuilt by the
    /// `with_*` builder methods when a registry/runtime/writer is attached.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Graph schema used to resolve label / edge-type names and properties.
    schema: Arc<UniSchema>,

    /// Flush version counter; initialised to 0 by the constructors and
    /// reported in the `Debug` output.
    last_flush_version: AtomicU64,

    /// Query parameters; copied into every `TranslationContext` and used by
    /// the UNWIND/filter rewrite performed in `plan()`.
    params: HashMap<String, uni_common::Value>,

    /// Values supplied by an enclosing scope; copied into every
    /// `TranslationContext` as `outer_values`.
    outer_values: HashMap<String, uni_common::Value>,

    /// Context required by write operators; `None` until
    /// `with_mutation_context` is called.
    mutation_ctx: Option<Arc<MutationContext>>,

    /// Variable names installed through `set_outer_entity_vars`.
    /// NOTE(review): presumably entity variables bound by an outer query —
    /// the consumers are not visible in this part of the file.
    outer_entity_vars: HashSet<String>,

    /// Plugin registry forwarded to the Locy fold binding resolution and to
    /// `LocyProgramExec`.
    plugin_registry: Arc<uni_plugin::PluginRegistry>,
}
154
155impl std::fmt::Debug for HybridPhysicalPlanner {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 f.debug_struct("HybridPhysicalPlanner")
158 .field(
159 "last_flush_version",
160 &self.last_flush_version.load(Ordering::Relaxed),
161 )
162 .finish_non_exhaustive()
163 }
164}
165
166impl HybridPhysicalPlanner {
167 pub fn new(
177 session_ctx: Arc<RwLock<SessionContext>>,
178 storage: Arc<StorageManager>,
179 l0: Arc<RwLock<L0Buffer>>,
180 property_manager: Arc<PropertyManager>,
181 schema: Arc<UniSchema>,
182 params: HashMap<String, uni_common::Value>,
183 ) -> Self {
184 let graph_ctx = Arc::new(GraphExecutionContext::new(
185 storage.clone(),
186 l0,
187 property_manager,
188 ));
189
190 Self {
191 session_ctx,
192 storage,
193 graph_ctx,
194 schema,
195 last_flush_version: AtomicU64::new(0),
196 params,
197 outer_values: HashMap::new(),
198 mutation_ctx: None,
199 outer_entity_vars: HashSet::new(),
200 plugin_registry: super::df_graph::locy_fold::default_locy_plugin_registry(),
201 }
202 }
203
204 #[must_use]
211 pub fn with_plugin_registry(
212 mut self,
213 plugin_registry: Arc<uni_plugin::PluginRegistry>,
214 ) -> Self {
215 let mut ctx = self.take_graph_ctx();
220 ctx = ctx.with_plugin_registry(Arc::clone(&plugin_registry));
221 self.graph_ctx = Arc::new(ctx);
222 self.plugin_registry = plugin_registry;
223 self
224 }
225
226 fn resolve_properties(
232 &self,
233 variable: &str,
234 schema_name: &str,
235 all_properties: &HashMap<String, HashSet<String>>,
236 ) -> Vec<String> {
237 const SYSTEM_COLUMNS: &[&str] =
239 &["_vid", "_labels", "_eid", "_src_vid", "_dst_vid", "_type"];
240
241 all_properties
242 .get(variable)
243 .map(|props| {
244 if props.contains("*") {
245 let schema_props: Vec<String> = self
246 .schema
247 .properties
248 .get(schema_name)
249 .map(|p| p.keys().cloned().collect())
250 .unwrap_or_default();
251
252 let explicit: Vec<String> = props
258 .iter()
259 .filter(|p| {
260 *p != "*"
261 && *p != STRUCT_ONLY_SENTINEL
262 && (!p.starts_with('_')
263 || matches!(p.as_str(), "_created_at" | "_updated_at"))
264 })
265 .cloned()
266 .collect();
267
268 if schema_props.is_empty() && explicit.is_empty() {
269 return vec!["*".to_string()];
271 }
272
273 let mut combined: Vec<String> = schema_props;
275 for p in explicit {
276 if !combined.contains(&p) {
277 combined.push(p);
278 }
279 }
280 combined.retain(|p| !SYSTEM_COLUMNS.contains(&p.as_str()));
281 combined.sort();
282 combined
283 } else {
284 let mut explicit_props: Vec<String> = props
290 .iter()
291 .filter(|p| {
292 *p != "*"
293 && *p != STRUCT_ONLY_SENTINEL
294 && !SYSTEM_COLUMNS.contains(&p.as_str())
295 })
296 .cloned()
297 .collect();
298 explicit_props.sort();
299 explicit_props
300 }
301 })
302 .unwrap_or_default()
303 }
304
305 pub fn with_l0_context(
307 session_ctx: Arc<RwLock<SessionContext>>,
308 storage: Arc<StorageManager>,
309 l0_context: L0Context,
310 property_manager: Arc<PropertyManager>,
311 schema: Arc<UniSchema>,
312 params: HashMap<String, uni_common::Value>,
313 outer_values: HashMap<String, uni_common::Value>,
314 ) -> Self {
315 let graph_ctx = Arc::new(GraphExecutionContext::with_l0_context(
316 storage.clone(),
317 l0_context,
318 property_manager,
319 ));
320
321 Self {
322 session_ctx,
323 storage,
324 graph_ctx,
325 schema,
326 last_flush_version: AtomicU64::new(0),
327 params,
328 outer_values,
329 mutation_ctx: None,
330 outer_entity_vars: HashSet::new(),
331 plugin_registry: super::df_graph::locy_fold::default_locy_plugin_registry(),
332 }
333 }
334
335 fn take_graph_ctx(&mut self) -> GraphExecutionContext {
339 let algo_registry = self.graph_ctx.algo_registry().cloned();
340 let procedure_registry = self.graph_ctx.procedure_registry().cloned();
341 let xervo_runtime = self.graph_ctx.xervo_runtime().cloned();
342 let plugin_registry = self.graph_ctx.plugin_registry().cloned();
343 let writer = self.graph_ctx.writer().cloned();
344
345 let new_base = |ctx: &Arc<GraphExecutionContext>| {
346 GraphExecutionContext::with_l0_context(
347 ctx.storage().clone(),
348 ctx.l0_context().clone(),
349 ctx.property_manager().clone(),
350 )
351 };
352 let placeholder = Arc::new(new_base(&self.graph_ctx));
353 let arc = std::mem::replace(&mut self.graph_ctx, placeholder);
354 let mut ctx = Arc::try_unwrap(arc).unwrap_or_else(|arc| new_base(&arc));
355
356 if let Some(registry) = algo_registry {
357 ctx = ctx.with_algo_registry(registry);
358 }
359 if let Some(registry) = procedure_registry {
360 ctx = ctx.with_procedure_registry(registry);
361 }
362 if let Some(runtime) = xervo_runtime {
363 ctx = ctx.with_xervo_runtime(runtime);
364 }
365 if let Some(registry) = plugin_registry {
366 ctx = ctx.with_plugin_registry(registry);
367 }
368 if let Some(w) = writer {
369 ctx = ctx.with_writer(w);
370 }
371 ctx
372 }
373
374 #[must_use]
379 pub fn with_writer(mut self, writer: Arc<uni_store::Writer>) -> Self {
380 let ctx = self.take_graph_ctx().with_writer(writer);
381 self.graph_ctx = Arc::new(ctx);
382 self
383 }
384
/// Replaces the planner's set of outer entity variable names with `vars`.
pub fn set_outer_entity_vars(&mut self, vars: HashSet<String>) {
    self.outer_entity_vars = vars;
}
392
393 pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
394 let ctx = self.take_graph_ctx().with_algo_registry(registry);
395 self.graph_ctx = Arc::new(ctx);
396 self
397 }
398
399 pub fn with_procedure_registry(
403 mut self,
404 registry: Arc<crate::query::executor::procedure::ProcedureRegistry>,
405 ) -> Self {
406 let ctx = self.take_graph_ctx().with_procedure_registry(registry);
407 self.graph_ctx = Arc::new(ctx);
408 self
409 }
410
411 pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
413 let ctx = self.take_graph_ctx().with_xervo_runtime(runtime);
414 self.graph_ctx = Arc::new(ctx);
415 self
416 }
417
418 pub fn with_mutation_context(mut self, ctx: Arc<MutationContext>) -> Self {
420 self.mutation_ctx = Some(ctx);
421 self
422 }
423
/// Returns the planner's graph execution context.
pub fn graph_ctx(&self) -> &Arc<GraphExecutionContext> {
    &self.graph_ctx
}
428
/// Returns the shared, lock-guarded DataFusion session context.
pub fn session_ctx(&self) -> &Arc<RwLock<SessionContext>> {
    &self.session_ctx
}
433
/// Returns the storage manager this planner was constructed with.
pub fn storage(&self) -> &Arc<StorageManager> {
    &self.storage
}
438
/// Returns the graph schema this planner resolves labels, edge types and
/// properties against.
pub fn schema_info(&self) -> &Arc<UniSchema> {
    &self.schema
}
443
444 fn require_mutation_ctx(&self) -> Result<Arc<MutationContext>> {
446 self.mutation_ctx.clone().ok_or_else(|| {
447 tracing::error!(
448 "Mutation context not set — this indicates a routing bug where a write \
449 operation was sent to the DataFusion engine without a MutationContext"
450 );
451 anyhow!("Mutation context not set — write operations require a MutationContext")
452 })
453 }
454
455 fn translation_context_for_plan(&self, plan: &LogicalPlan) -> TranslationContext {
460 let mut variable_kinds = HashMap::new();
461 let mut variable_labels = HashMap::new();
462 let mut node_variable_hints = Vec::new();
463 let mut mutation_edge_hints = Vec::new();
464 collect_variable_kinds(plan, &mut variable_kinds);
465 collect_mutation_node_hints(plan, &mut node_variable_hints);
466 collect_mutation_edge_hints(plan, &mut mutation_edge_hints);
467 self.collect_variable_labels(plan, &mut variable_labels);
468 TranslationContext {
469 parameters: self.params.clone(),
470 outer_values: self.outer_values.clone(),
471 variable_labels,
472 variable_kinds,
473 node_variable_hints,
474 mutation_edge_hints,
475 ..Default::default()
476 }
477 }
478
479 fn collect_variable_labels(&self, plan: &LogicalPlan, labels: &mut HashMap<String, String>) {
485 match plan {
486 LogicalPlan::Scan {
487 variable,
488 labels: scan_labels,
489 ..
490 }
491 | LogicalPlan::ScanMainByLabels {
492 variable,
493 labels: scan_labels,
494 ..
495 } => {
496 if let Some(first) = scan_labels.first() {
497 labels.insert(variable.clone(), first.clone());
498 }
499 }
500 LogicalPlan::Traverse {
501 input,
502 step_variable,
503 edge_type_ids,
504 target_variable,
505 target_label_id,
506 ..
507 } => {
508 self.collect_variable_labels(input, labels);
509 if let Some(sv) = step_variable
510 && edge_type_ids.len() == 1
511 && let Some(name) = self.schema.edge_type_name_by_id(edge_type_ids[0])
512 {
513 labels.insert(sv.clone(), name.to_string());
514 }
515 if *target_label_id != 0
516 && let Some(name) = self.schema.label_name_by_id(*target_label_id)
517 {
518 labels.insert(target_variable.clone(), name.to_string());
519 }
520 }
521 LogicalPlan::TraverseMainByType {
522 input,
523 step_variable,
524 type_names,
525 ..
526 } => {
527 self.collect_variable_labels(input, labels);
528 if let Some(sv) = step_variable
529 && type_names.len() == 1
530 {
531 labels.insert(sv.clone(), type_names[0].clone());
532 }
533 }
534 LogicalPlan::Filter { input, .. }
536 | LogicalPlan::Project { input, .. }
537 | LogicalPlan::Sort { input, .. }
538 | LogicalPlan::Limit { input, .. }
539 | LogicalPlan::Aggregate { input, .. }
540 | LogicalPlan::Distinct { input, .. }
541 | LogicalPlan::Window { input, .. }
542 | LogicalPlan::Unwind { input, .. }
543 | LogicalPlan::Create { input, .. }
544 | LogicalPlan::CreateBatch { input, .. }
545 | LogicalPlan::Merge { input, .. }
546 | LogicalPlan::Set { input, .. }
547 | LogicalPlan::Remove { input, .. }
548 | LogicalPlan::Delete { input, .. }
549 | LogicalPlan::Foreach { input, .. }
550 | LogicalPlan::SubqueryCall { input, .. } => {
551 self.collect_variable_labels(input, labels);
552 }
553 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
554 self.collect_variable_labels(left, labels);
555 self.collect_variable_labels(right, labels);
556 }
557 LogicalPlan::Apply {
558 input, subquery, ..
559 } => {
560 self.collect_variable_labels(input, labels);
561 self.collect_variable_labels(subquery, labels);
562 }
563 LogicalPlan::Explain { plan } => {
564 self.collect_variable_labels(plan, labels);
565 }
566 _ => {}
567 }
568 }
569
/// Returns the property metadata for the given edge types, as computed by
/// `df_graph::common::merged_edge_schema_props` against this planner's
/// schema.
fn merged_edge_type_properties(&self, edge_type_ids: &[u32]) -> HashMap<String, PropertyMeta> {
    crate::query::df_graph::common::merged_edge_schema_props(&self.schema, edge_type_ids)
}
573
574 pub fn plan(&self, logical: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
588 let logical_rewritten = merge_unwind_in_filters(logical, &self.params);
595
596 let all_properties = collect_properties_from_plan(&logical_rewritten);
598
599 self.plan_internal(&logical_rewritten, &all_properties)
601 }
602
603 pub fn plan_with_properties(
609 &self,
610 logical: &LogicalPlan,
611 extra_properties: HashMap<String, HashSet<String>>,
612 ) -> Result<Arc<dyn ExecutionPlan>> {
613 let logical_rewritten = merge_unwind_in_filters(logical, &self.params);
615 let mut all_properties = collect_properties_from_plan(&logical_rewritten);
616 for (var, props) in extra_properties {
617 all_properties.entry(var).or_default().extend(props);
618 }
619 self.plan_internal(&logical_rewritten, &all_properties)
620 }
621
622 fn wrap_optional(
629 &self,
630 plan: Arc<dyn ExecutionPlan>,
631 optional: bool,
632 ) -> Result<Arc<dyn ExecutionPlan>> {
633 if !optional {
634 return Ok(plan);
635 }
636
637 let empty_schema = Arc::new(Schema::empty());
639 let placeholder = Arc::new(PlaceholderRowExec::new(empty_schema));
640
641 Ok(Arc::new(NestedLoopJoinExec::try_new(
644 placeholder,
645 plan,
646 None, &JoinType::Left,
648 None, )?))
650 }
651
/// Recursively lowers one `LogicalPlan` node (and its inputs) into a
/// physical `ExecutionPlan`.
///
/// `all_properties` maps a variable name to the property names requested
/// for it and is threaded unchanged through every recursive call and
/// helper.
///
/// # Errors
///
/// * Write operators (CREATE, SET, REMOVE, DELETE, MERGE, FOREACH) fail
///   when no mutation context has been installed.
/// * `InvertedIndexLookup` and `QuantifiedPattern` are rejected as not yet
///   supported.
/// * DDL / admin nodes (including `Explain`) are rejected; they are handled
///   outside this planner.
/// * Any error from a helper or from a DataFusion constructor is
///   propagated.
fn plan_internal(
    &self,
    logical: &LogicalPlan,
    all_properties: &HashMap<String, HashSet<String>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    match logical {
        // The wrapper is transparent here: only its inner plan is planned.
        LogicalPlan::FusedIndexScanWrapped { inner, kind: _ } => {
            self.plan_internal(inner, all_properties)
        }
        // Label scans (the fused-index variant is planned identically; its
        // `kind` is ignored).
        LogicalPlan::Scan {
            label_id,
            labels,
            variable,
            filter,
            optional,
        }
        | LogicalPlan::FusedIndexScan {
            label_id,
            labels,
            variable,
            filter,
            optional,
            kind: _,
        } => {
            if labels.len() > 1 {
                // More than one label: dedicated multi-label scan.
                self.plan_multi_label_scan(
                    labels,
                    variable,
                    filter.as_ref(),
                    *optional,
                    all_properties,
                )
            } else {
                self.plan_scan(
                    *label_id,
                    variable,
                    filter.as_ref(),
                    *optional,
                    all_properties,
                )
            }
        }

        // Scan addressed by label names: several labels → multi-label scan,
        // exactly one → schemaless scan, none → scan of everything.
        LogicalPlan::ScanMainByLabels {
            labels,
            variable,
            filter,
            optional,
        } => {
            if labels.len() > 1 {
                self.plan_multi_label_scan(
                    labels,
                    variable,
                    filter.as_ref(),
                    *optional,
                    all_properties,
                )
            } else if let Some(label_name) = labels.first() {
                self.plan_schemaless_scan(
                    label_name,
                    variable,
                    filter.as_ref(),
                    *optional,
                    all_properties,
                )
            } else {
                self.plan_scan_all(variable, filter.as_ref(), *optional, all_properties)
            }
        }

        LogicalPlan::ScanAll {
            variable,
            filter,
            optional,
        } => self.plan_scan_all(variable, filter.as_ref(), *optional, all_properties),

        // Traversal addressed by edge-type names.
        LogicalPlan::TraverseMainByType {
            type_names,
            input,
            direction,
            source_variable,
            target_variable,
            step_variable,
            min_hops,
            max_hops,
            optional,
            target_filter,
            path_variable,
            is_variable_length,
            scope_match_variables,
            optional_pattern_vars,
            edge_filter_expr,
            path_mode,
            ..
        } => {
            if *is_variable_length {
                // Variable-length: the edge filter is handed to the VLP
                // planner; only the target filter is applied afterwards.
                let vlp_plan = self.plan_traverse_main_by_type_vlp(
                    input,
                    type_names,
                    direction.clone(),
                    source_variable,
                    target_variable,
                    step_variable.as_deref(),
                    *min_hops,
                    *max_hops,
                    path_variable.as_deref(),
                    *optional,
                    all_properties,
                    edge_filter_expr.as_ref(),
                    path_mode,
                    scope_match_variables,
                )?;
                self.apply_schemaless_traverse_filter(
                    vlp_plan,
                    target_filter.as_ref(),
                    source_variable,
                    target_variable,
                    step_variable.as_deref(),
                    path_variable.as_deref(),
                    true,
                    *optional,
                    optional_pattern_vars,
                )
            } else {
                // Single hop: plan the traversal, then apply the edge
                // filter followed by the target filter.
                let base_plan = self.plan_traverse_main_by_type(
                    input,
                    type_names,
                    direction.clone(),
                    source_variable,
                    target_variable,
                    step_variable.as_deref(),
                    *optional,
                    optional_pattern_vars,
                    all_properties,
                    scope_match_variables,
                )?;
                let edge_filtered = self.apply_schemaless_traverse_filter(
                    base_plan,
                    edge_filter_expr.as_ref(),
                    source_variable,
                    target_variable,
                    step_variable.as_deref(),
                    path_variable.as_deref(),
                    false,
                    *optional,
                    optional_pattern_vars,
                )?;
                self.apply_schemaless_traverse_filter(
                    edge_filtered,
                    target_filter.as_ref(),
                    source_variable,
                    target_variable,
                    step_variable.as_deref(),
                    path_variable.as_deref(),
                    false,
                    *optional,
                    optional_pattern_vars,
                )
            }
        }

        // Traversal addressed by edge-type ids; fully delegated.
        LogicalPlan::Traverse {
            input,
            edge_type_ids,
            direction,
            source_variable,
            target_variable,
            target_label_id,
            step_variable,
            min_hops,
            max_hops,
            optional,
            target_filter,
            path_variable,
            is_variable_length,
            optional_pattern_vars,
            scope_match_variables,
            edge_filter_expr,
            path_mode,
            qpp_steps,
            ..
        } => self.plan_traverse(
            input,
            edge_type_ids,
            direction.clone(),
            source_variable,
            target_variable,
            *target_label_id,
            step_variable.as_deref(),
            *min_hops,
            *max_hops,
            path_variable.as_deref(),
            *optional,
            target_filter.as_ref(),
            *is_variable_length,
            optional_pattern_vars,
            all_properties,
            scope_match_variables,
            edge_filter_expr.as_ref(),
            path_mode,
            qpp_steps.as_deref(),
        ),

        // Shares `plan_shortest_path` with `AllShortestPaths`; the boolean
        // is `false` here and `true` there (presumably an "all paths" flag
        // — confirm against `plan_shortest_path`). The target label and hop
        // bounds are not forwarded.
        LogicalPlan::ShortestPath {
            input,
            edge_type_ids,
            direction,
            source_variable,
            target_variable,
            target_label_id: _,
            path_variable,
            min_hops: _,
            max_hops: _,
        } => self.plan_shortest_path(
            input,
            edge_type_ids,
            direction.clone(),
            source_variable,
            target_variable,
            path_variable,
            false,
            all_properties,
        ),

        LogicalPlan::Filter {
            input,
            predicate,
            optional_variables,
        } => self.plan_filter(input, predicate, optional_variables, all_properties),

        LogicalPlan::Project { input, projections } => {
            // alias → projected expression, for the projections that carry
            // an alias.
            let alias_map: HashMap<String, Expr> = projections
                .iter()
                .filter_map(|(expr, alias)| alias.as_ref().map(|a| (a.clone(), expr.clone())))
                .collect();

            self.plan_project_with_aliases(input, projections, all_properties, &alias_map)
        }

        LogicalPlan::Aggregate {
            input,
            group_by,
            aggregates,
        } => self.plan_aggregate(input, group_by, aggregates, all_properties),

        // DISTINCT is planned as a single-mode aggregate that groups by
        // every input column and computes no aggregate functions.
        LogicalPlan::Distinct { input } => {
            let input_plan = self.plan_internal(input, all_properties)?;
            let schema = input_plan.schema();
            let group_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
                schema
                    .fields()
                    .iter()
                    .enumerate()
                    .map(|(i, f)| {
                        (
                            Arc::new(datafusion::physical_expr::expressions::Column::new(
                                f.name(),
                                i,
                            ))
                                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
                            f.name().clone(),
                        )
                    })
                    .collect();
            let group_by = PhysicalGroupBy::new_single(group_exprs);
            Ok(Arc::new(AggregateExec::try_new(
                AggregateMode::Single,
                group_by,
                vec![],
                vec![],
                input_plan.clone(),
                input_plan.schema(),
            )?))
        }

        // Sort reached through this entry point gets an empty alias map.
        LogicalPlan::Sort { input, order_by } => {
            self.plan_sort(input, order_by, all_properties, &HashMap::new())
        }

        LogicalPlan::Limit { input, skip, fetch } => {
            self.plan_limit(input, *skip, *fetch, all_properties)
        }

        LogicalPlan::Union { left, right, all } => {
            self.plan_union(left, right, *all, all_properties)
        }

        LogicalPlan::Empty => self.plan_empty(),

        LogicalPlan::BindZeroLengthPath {
            input,
            node_variable,
            path_variable,
        } => {
            self.plan_bind_zero_length_path(input, node_variable, path_variable, all_properties)
        }

        LogicalPlan::BindPath {
            input,
            node_variables,
            edge_variables,
            path_variable,
        } => self.plan_bind_path(
            input,
            node_variables,
            edge_variables,
            path_variable,
            all_properties,
        ),

        // ---- Write operators: each plans its input first, then requires
        // ---- the mutation context (erroring when it is missing).
        LogicalPlan::Create { input, pattern } => {
            tracing::debug!("Planning MutationCreateExec");
            let child = self.plan_internal(input, all_properties)?;
            let mutation_ctx = self.require_mutation_ctx()?;
            Ok(Arc::new(new_create_exec(
                child,
                pattern.clone(),
                mutation_ctx,
            )))
        }
        LogicalPlan::CreateBatch { input, patterns } => {
            tracing::debug!(
                patterns = patterns.len(),
                "Planning MutationCreateExec (batch)"
            );
            let child = self.plan_internal(input, all_properties)?;
            let mutation_ctx = self.require_mutation_ctx()?;
            // Output schema is derived from the child's schema and the
            // patterns via `extended_schema_for_new_vars`.
            let output_schema = extended_schema_for_new_vars(&child.schema(), patterns);
            Ok(Arc::new(MutationExec::new_with_schema(
                child,
                MutationKind::CreateBatch {
                    patterns: patterns.clone(),
                },
                "MutationCreateExec",
                mutation_ctx,
                output_schema,
            )))
        }
        LogicalPlan::Set { input, items } => {
            tracing::debug!(items = items.len(), "Planning MutationSetExec");
            let child = self.plan_internal(input, all_properties)?;
            let mutation_ctx = self.require_mutation_ctx()?;
            Ok(Arc::new(new_set_exec(child, items.clone(), mutation_ctx)))
        }
        LogicalPlan::Remove { input, items } => {
            tracing::debug!(items = items.len(), "Planning MutationRemoveExec");
            let child = self.plan_internal(input, all_properties)?;
            let mutation_ctx = self.require_mutation_ctx()?;
            Ok(Arc::new(new_remove_exec(
                child,
                items.clone(),
                mutation_ctx,
            )))
        }
        LogicalPlan::Delete {
            input,
            items,
            detach,
        } => {
            tracing::debug!(
                items = items.len(),
                detach = detach,
                "Planning MutationDeleteExec"
            );
            let child = self.plan_internal(input, all_properties)?;
            let mutation_ctx = self.require_mutation_ctx()?;
            Ok(Arc::new(new_delete_exec(
                child,
                items.clone(),
                *detach,
                mutation_ctx,
            )))
        }
        LogicalPlan::Merge {
            input,
            pattern,
            on_match,
            on_create,
        } => {
            tracing::debug!("Planning MutationMergeExec");
            let child = self.plan_internal(input, all_properties)?;
            let mutation_ctx = self.require_mutation_ctx()?;
            Ok(Arc::new(new_merge_exec(
                child,
                pattern.clone(),
                on_match.clone(),
                on_create.clone(),
                mutation_ctx,
            )))
        }

        // A window node without window expressions is a pass-through.
        LogicalPlan::Window {
            input,
            window_exprs,
        } => {
            let input_plan = self.plan_internal(input, all_properties)?;
            if !window_exprs.is_empty() {
                self.plan_window_functions(input_plan, window_exprs, Some(input.as_ref()))
            } else {
                Ok(input_plan)
            }
        }

        LogicalPlan::CrossJoin { left, right } => {
            let left_plan = self.plan_internal(left, all_properties)?;
            let right_plan = self.plan_internal(right, all_properties)?;

            // When the right side is a Locy derived scan, the left plan is
            // passed through `strip_conflicting_structural_columns` together
            // with the derived scan's column names before joining.
            let left_plan = if matches!(right.as_ref(), LogicalPlan::LocyDerivedScan { .. }) {
                let derived_schema = right_plan.schema();
                let derived_names: HashSet<&str> = derived_schema
                    .fields()
                    .iter()
                    .map(|f| f.name().as_str())
                    .collect();
                strip_conflicting_structural_columns(left_plan, &derived_names)?
            } else {
                left_plan
            };

            Ok(Arc::new(
                datafusion::physical_plan::joins::CrossJoinExec::new(left_plan, right_plan),
            ))
        }

        LogicalPlan::Apply {
            input,
            subquery,
            input_filter,
        } => self.plan_apply(input, subquery, input_filter.as_ref(), all_properties),

        // `plan_unwind` receives owned copies of the input plan, the
        // expression and the variable name.
        LogicalPlan::Unwind {
            input,
            expr,
            variable,
        } => self.plan_unwind(
            input.as_ref().clone(),
            expr.clone(),
            variable.clone(),
            all_properties,
        ),

        LogicalPlan::VectorKnn {
            label_id,
            variable,
            property,
            query,
            k,
            threshold,
        } => self.plan_vector_knn(
            *label_id,
            variable,
            property,
            query.clone(),
            *k,
            *threshold,
            all_properties,
        ),

        LogicalPlan::InvertedIndexLookup { .. } => Err(anyhow!(
            "Full-text search not yet supported in DataFusion engine"
        )),

        // Same helper as `ShortestPath`, with the boolean set to `true`.
        LogicalPlan::AllShortestPaths {
            input,
            edge_type_ids,
            direction,
            source_variable,
            target_variable,
            target_label_id: _,
            path_variable,
            min_hops: _,
            max_hops: _,
        } => self.plan_shortest_path(
            input,
            edge_type_ids,
            direction.clone(),
            source_variable,
            target_variable,
            path_variable,
            true,
            all_properties,
        ),

        LogicalPlan::QuantifiedPattern { .. } => Err(anyhow!(
            "Quantified patterns not yet supported in DataFusion engine"
        )),

        LogicalPlan::RecursiveCTE {
            cte_name,
            initial,
            recursive,
        } => self.plan_recursive_cte(cte_name, initial, recursive, all_properties),

        LogicalPlan::ProcedureCall {
            procedure_name,
            arguments,
            yield_items,
        } => self.plan_procedure_call(procedure_name, arguments, yield_items, all_properties),

        // A subquery call is planned as an Apply without an input filter.
        LogicalPlan::SubqueryCall { input, subquery } => {
            self.plan_apply(input, subquery, None, all_properties)
        }

        LogicalPlan::ExtIdLookup {
            variable,
            ext_id,
            filter,
            optional,
        } => self.plan_ext_id_lookup(variable, ext_id, filter.as_ref(), *optional),

        // FOREACH is a write operator and needs the mutation context too.
        LogicalPlan::Foreach {
            input,
            variable,
            list,
            body,
        } => {
            tracing::debug!(variable = variable.as_str(), "Planning ForeachExec");
            let child = self.plan_internal(input, all_properties)?;
            let mutation_ctx = self.require_mutation_ctx()?;
            Ok(Arc::new(
                super::df_graph::mutation_foreach::ForeachExec::new(
                    child,
                    variable.clone(),
                    list.clone(),
                    body.clone(),
                    mutation_ctx,
                ),
            ))
        }

        // ---- Locy operators ----

        // Key columns are resolved to indices in the child's schema; the
        // child must expose a `__priority` column.
        LogicalPlan::LocyPriority { input, key_columns } => {
            let child = self.plan_internal(input, all_properties)?;
            let key_indices = resolve_column_indices(&child.schema(), key_columns)?;
            let priority_col_index = child.schema().index_of("__priority").map_err(|_| {
                anyhow::anyhow!("LocyPriority input must contain __priority column")
            })?;
            Ok(Arc::new(super::df_graph::locy_priority::PriorityExec::new(
                child,
                key_indices,
                priority_col_index,
            )))
        }

        LogicalPlan::LocyBestBy {
            input,
            key_columns,
            criteria,
        } => {
            let child = self.plan_internal(input, all_properties)?;
            let key_indices = resolve_column_indices(&child.schema(), key_columns)?;
            let sort_criteria = resolve_best_by_criteria(&child.schema(), criteria)?;
            // NOTE(review): the trailing `true` is an unnamed flag of
            // `BestByExec::new` — its meaning is not visible here.
            Ok(Arc::new(super::df_graph::locy_best_by::BestByExec::new(
                child,
                key_indices,
                sort_criteria,
                true,
            )))
        }

        LogicalPlan::LocyFold {
            input,
            key_columns,
            fold_bindings,
            strict_probability_domain,
            probability_epsilon,
        } => {
            let child = self.plan_internal(input, all_properties)?;
            let key_indices = resolve_column_indices(&child.schema(), key_columns)?;
            // Fold bindings are resolved against the planner's plugin
            // registry.
            let bindings =
                resolve_fold_bindings(&child.schema(), fold_bindings, &self.plugin_registry)?;
            Ok(Arc::new(super::df_graph::locy_fold::FoldExec::new(
                child,
                key_indices,
                bindings,
                *strict_probability_domain,
                *probability_epsilon,
            )))
        }

        // The derived scan shares the node's data and schema handles.
        LogicalPlan::LocyDerivedScan {
            scan_index: _,
            data,
            schema,
        } => Ok(Arc::new(
            super::df_graph::locy_fixpoint::DerivedScanExec::new(
                Arc::clone(data),
                Arc::clone(schema),
            ),
        )),

        LogicalPlan::LocyProject {
            input,
            projections,
            target_types,
        } => self.plan_locy_project(input, projections, target_types, all_properties),

        LogicalPlan::LocyModelInvoke {
            input,
            invocations,
            classifier_registry,
            classifier_cache,
            classifier_provenance_store,
            path_context_handles,
        } => {
            let input_plan = self.plan_internal(input, all_properties)?;
            // Optional Xervo runtime, taken from the graph context.
            let xervo_runtime =
                super::df_graph::locy_model_invoke::XervoRuntimeHandle(
                    self.graph_ctx.xervo_runtime().cloned(),
                );
            let graph_algo = {
                let l0_ctx = self.graph_ctx.l0_context();
                // Only built when a current L0 exists: the pending-flush
                // buffers, plus the transaction L0 when present, are handed
                // to `L0Manager::from_snapshot` alongside the current L0.
                let l0_mgr = l0_ctx.current_l0.as_ref().map(|current| {
                    let mut pending = l0_ctx.pending_flush_l0s.clone();
                    if let Some(tx_l0) = &l0_ctx.transaction_l0 {
                        pending.push(tx_l0.clone());
                    }
                    Arc::new(uni_store::runtime::l0_manager::L0Manager::from_snapshot(
                        current.clone(),
                        pending,
                    ))
                });
                // The same three L0 pieces, kept separate instead of merged.
                let l0_buffers = self.graph_ctx.l0_context().current_l0.as_ref().map(
                    |current| super::df_graph::locy_model_invoke::L0Buffers {
                        current: current.clone(),
                        transaction: self.graph_ctx.l0_context().transaction_l0.clone(),
                        pending_flush: self.graph_ctx.l0_context().pending_flush_l0s.clone(),
                    },
                );
                super::df_graph::locy_model_invoke::GraphAlgoHandle {
                    registry: self.graph_ctx.algo_registry().cloned(),
                    storage: Some(self.graph_ctx.storage().clone()),
                    l0_manager: l0_mgr,
                    property_manager: Some(self.graph_ctx.property_manager().clone()),
                    l0_buffers,
                }
            };
            Ok(Arc::new(
                super::df_graph::locy_model_invoke::LocyModelInvokeExec::new(
                    input_plan,
                    invocations.clone(),
                    Arc::clone(classifier_registry),
                    classifier_cache.as_ref().map(Arc::clone),
                    classifier_provenance_store.as_ref().map(Arc::clone),
                    path_context_handles.clone(),
                    xervo_runtime,
                    graph_algo,
                ),
            ))
        }

        // The program operator has no planned input: it receives the
        // planner's shared handles and the node's settings, and its output
        // schema comes from `locy_program::stats_schema()`.
        LogicalPlan::LocyProgram {
            strata,
            commands,
            derived_scan_registry,
            max_iterations,
            timeout,
            max_derived_bytes,
            deterministic_best_by,
            strict_probability_domain,
            probability_epsilon,
            exact_probability,
            max_bdd_variables,
            top_k_proofs,
            semiring_kind,
            classifier_registry,
            classifier_cache,
            classifier_provenance_store,
        } => {
            let output_schema = super::df_graph::locy_program::stats_schema();

            Ok(Arc::new(
                super::df_graph::locy_program::LocyProgramExec::new_with_semiring_classifiers_and_cache(
                    strata.clone(),
                    commands.clone(),
                    Arc::clone(derived_scan_registry),
                    Arc::clone(&self.plugin_registry),
                    Arc::clone(&self.graph_ctx),
                    Arc::clone(&self.session_ctx),
                    Arc::clone(&self.storage),
                    Arc::clone(&self.schema),
                    self.params.clone(),
                    output_schema,
                    *max_iterations,
                    *timeout,
                    *max_derived_bytes,
                    *deterministic_best_by,
                    *strict_probability_domain,
                    *probability_epsilon,
                    *exact_probability,
                    *max_bdd_variables,
                    *top_k_proofs,
                    *semiring_kind,
                    Arc::clone(classifier_registry),
                    classifier_cache.as_ref().map(Arc::clone),
                    classifier_provenance_store.as_ref().map(Arc::clone),
                ),
            ))
        }

        // DDL / admin nodes are never planned here. The list is spelled out
        // (no `_` arm), so the match stays exhaustive over `LogicalPlan`.
        LogicalPlan::CreateVectorIndex { .. }
        | LogicalPlan::CreateFullTextIndex { .. }
        | LogicalPlan::CreateScalarIndex { .. }
        | LogicalPlan::CreateJsonFtsIndex { .. }
        | LogicalPlan::DropIndex { .. }
        | LogicalPlan::ShowIndexes { .. }
        | LogicalPlan::Copy { .. }
        | LogicalPlan::Backup { .. }
        | LogicalPlan::ShowDatabase
        | LogicalPlan::ShowConfig
        | LogicalPlan::ShowStatistics
        | LogicalPlan::Vacuum
        | LogicalPlan::Checkpoint
        | LogicalPlan::CopyTo { .. }
        | LogicalPlan::CopyFrom { .. }
        | LogicalPlan::CreateLabel(_)
        | LogicalPlan::CreateEdgeType(_)
        | LogicalPlan::AlterLabel(_)
        | LogicalPlan::AlterEdgeType(_)
        | LogicalPlan::DropLabel(_)
        | LogicalPlan::DropEdgeType(_)
        | LogicalPlan::CreateConstraint(_)
        | LogicalPlan::DropConstraint(_)
        | LogicalPlan::ShowConstraints(_)
        | LogicalPlan::Explain { .. } => {
            Err(anyhow!("DDL/Admin operations should be handled separately"))
        }
    }
}
1427
1428 fn plan_internal_with_aliases(
1432 &self,
1433 logical: &LogicalPlan,
1434 all_properties: &HashMap<String, HashSet<String>>,
1435 alias_map: &HashMap<String, Expr>,
1436 ) -> Result<Arc<dyn ExecutionPlan>> {
1437 match logical {
1438 LogicalPlan::Sort { input, order_by } => {
1439 self.plan_sort(input, order_by, all_properties, alias_map)
1440 }
1441 LogicalPlan::Limit { input, skip, fetch } => {
1442 let input_plan =
1444 self.plan_internal_with_aliases(input, all_properties, alias_map)?;
1445 if let Some(offset) = skip.filter(|&s| s > 0) {
1446 use datafusion::physical_plan::limit::GlobalLimitExec;
1447 Ok(Arc::new(GlobalLimitExec::new(input_plan, offset, *fetch)))
1448 } else {
1449 Ok(Arc::new(LocalLimitExec::new(
1450 input_plan,
1451 fetch.unwrap_or(usize::MAX),
1452 )))
1453 }
1454 }
1455 _ => self.plan_internal(logical, all_properties),
1457 }
1458 }
1459
1460 fn extract_vid_from_cypher_filter(
1476 filter: Option<&Expr>,
1477 variable: &str,
1478 params: &HashMap<String, uni_common::Value>,
1479 ) -> Option<Vec<u64>> {
1480 use uni_cypher::ast::BinaryOp;
1481 let filter = filter?;
1482 match filter {
1483 Expr::BinaryOp {
1484 left,
1485 op: BinaryOp::Eq,
1486 right,
1487 } => {
1488 if let Expr::Property(var_expr, prop) = left.as_ref()
1490 && let Expr::Variable(v) = var_expr.as_ref()
1491 && v == variable
1492 && prop == "_vid"
1493 {
1494 return Self::resolve_vid_value(right, params).map(|v| vec![v]);
1495 }
1496 if let Expr::Property(var_expr, prop) = right.as_ref()
1498 && let Expr::Variable(v) = var_expr.as_ref()
1499 && v == variable
1500 && prop == "_vid"
1501 {
1502 return Self::resolve_vid_value(left, params).map(|v| vec![v]);
1503 }
1504 None
1505 }
1506 Expr::In { expr, list } => {
1507 let Expr::Property(var_expr, prop) = expr.as_ref() else {
1509 return None;
1510 };
1511 let Expr::Variable(v) = var_expr.as_ref() else {
1512 return None;
1513 };
1514 if v != variable || prop != "_vid" {
1515 return None;
1516 }
1517 let Expr::List(items) = list.as_ref() else {
1518 return None;
1519 };
1520 let mut out = Vec::with_capacity(items.len());
1521 for item in items {
1522 out.push(Self::resolve_vid_value(item, params)?);
1523 }
1524 if out.is_empty() { None } else { Some(out) }
1525 }
1526 Expr::BinaryOp {
1527 left,
1528 op: BinaryOp::And,
1529 right,
1530 } => Self::extract_vid_from_cypher_filter(Some(left), variable, params)
1531 .or_else(|| Self::extract_vid_from_cypher_filter(Some(right), variable, params)),
1532 _ => None,
1533 }
1534 }
1535
1536 fn build_vid_physical_filter(
1541 col_name: &str,
1542 vid: u64,
1543 ) -> Arc<dyn datafusion::physical_expr::PhysicalExpr> {
1544 use datafusion::physical_expr::expressions::{BinaryExpr, Column, Literal};
1545 Arc::new(BinaryExpr::new(
1546 Arc::new(Column::new(col_name, 0)),
1547 datafusion::logical_expr::Operator::Eq,
1548 Arc::new(Literal::new(datafusion::common::ScalarValue::UInt64(Some(
1549 vid,
1550 )))),
1551 ))
1552 }
1553
1554 fn resolve_vid_value(expr: &Expr, params: &HashMap<String, uni_common::Value>) -> Option<u64> {
1555 match expr {
1556 Expr::Literal(CypherLiteral::Integer(v)) if *v >= 0 => Some(*v as u64),
1557 Expr::Parameter(name) => match params.get(name) {
1558 Some(uni_common::Value::Int(v)) if *v >= 0 => Some(*v as u64),
1559 _ => None,
1560 },
1561 _ => None,
1562 }
1563 }
1564
1565 fn and_join_predicates(mut preds: Vec<Expr>) -> Expr {
1569 if preds.is_empty() {
1570 return uni_cypher::ast::Expr::TRUE;
1571 }
1572 let mut acc = preds.remove(0);
1573 for p in preds {
1574 acc = Expr::BinaryOp {
1575 left: Box::new(acc),
1576 op: uni_cypher::ast::BinaryOp::And,
1577 right: Box::new(p),
1578 };
1579 }
1580 acc
1581 }
1582
1583 fn build_indexed_property_pushdown(
1595 &self,
1596 filter: Option<&Expr>,
1597 variable: &str,
1598 label_id: u16,
1599 scan_schema: &SchemaRef,
1600 ) -> Option<(String, Arc<dyn datafusion::physical_expr::PhysicalExpr>)> {
1601 let filter = filter?;
1602 let analyzer = crate::query::pushdown::IndexAwareAnalyzer::new(&self.schema);
1603 let strategy = analyzer.analyze(filter, variable, label_id);
1604 if strategy.hash_index_columns.is_empty() {
1605 return None;
1606 }
1607
1608 let label_name = self.schema.label_name_by_id(label_id)?;
1616 let label_props = self.schema.properties.get(label_name);
1617 let mut indexed_preds: Vec<Expr> = Vec::new();
1618 for pred in &strategy.lance_predicates {
1619 if let Some(col) = crate::query::pushdown::predicate_target_column(pred, variable)
1620 && strategy.hash_index_columns.iter().any(|c| c == &col)
1621 {
1622 let resolved = crate::query::pushdown::substitute_params(pred, &self.params)?;
1623 indexed_preds.push(resolved);
1624 }
1625 }
1626 if indexed_preds.is_empty() {
1627 return None;
1628 }
1629
1630 let lance_str = crate::query::pushdown::LanceFilterGenerator::generate(
1632 &indexed_preds,
1633 variable,
1634 label_props,
1635 )?;
1636
1637 let combined = Self::and_join_predicates(indexed_preds.clone());
1641 let mut variable_kinds = HashMap::new();
1642 variable_kinds.insert(variable.to_string(), VariableKind::Node);
1643 let mut variable_labels = HashMap::new();
1644 variable_labels.insert(variable.to_string(), label_name.to_string());
1645 let ctx = TranslationContext {
1646 parameters: self.params.clone(),
1647 variable_labels,
1648 variable_kinds,
1649 ..Default::default()
1650 };
1651 let df_filter = cypher_expr_to_df(&combined, Some(&ctx)).ok()?;
1652 let session = self.session_ctx.read();
1653 let physical = self
1654 .create_physical_filter_expr(&df_filter, scan_schema, &session)
1655 .ok()?;
1656 Some((lance_str, physical))
1657 }
1658
1659 fn wrap_read_set_recording(
1668 &self,
1669 plan: Arc<dyn ExecutionPlan>,
1670 variable: &str,
1671 ) -> Arc<dyn ExecutionPlan> {
1672 let has_read_set = self
1673 .graph_ctx
1674 .l0_context()
1675 .transaction_l0
1676 .as_ref()
1677 .is_some_and(|l0| l0.read().occ_read_set.is_some());
1678 if !has_read_set {
1679 return plan;
1680 }
1681 Arc::new(ReadSetRecordingExec::new(
1682 plan,
1683 self.graph_ctx.clone(),
1684 variable,
1685 ))
1686 }
1687
1688 fn apply_scan_filter(
1689 &self,
1690 plan: Arc<dyn ExecutionPlan>,
1691 variable: &str,
1692 filter: Option<&Expr>,
1693 label_name: Option<&str>,
1694 ) -> Result<Arc<dyn ExecutionPlan>> {
1695 let Some(filter_expr) = filter else {
1696 return Ok(plan);
1697 };
1698
1699 let mut variable_kinds = HashMap::new();
1700 variable_kinds.insert(variable.to_string(), VariableKind::Node);
1701 let mut variable_labels = HashMap::new();
1702 if let Some(label) = label_name {
1703 variable_labels.insert(variable.to_string(), label.to_string());
1704 }
1705 let ctx = TranslationContext {
1706 parameters: self.params.clone(),
1707 variable_labels,
1708 variable_kinds,
1709 ..Default::default()
1710 };
1711 let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
1712
1713 let schema = plan.schema();
1714
1715 let session = self.session_ctx.read();
1716 let physical_filter = self.create_physical_filter_expr(&df_filter, &schema, &session)?;
1717
1718 Ok(Arc::new(FilterExec::try_new(physical_filter, plan)?))
1719 }
1720
1721 #[expect(clippy::too_many_arguments)]
1728 fn apply_schemaless_traverse_filter(
1729 &self,
1730 plan: Arc<dyn ExecutionPlan>,
1731 filter_expr: Option<&Expr>,
1732 source_variable: &str,
1733 target_variable: &str,
1734 step_variable: Option<&str>,
1735 path_variable: Option<&str>,
1736 is_variable_length: bool,
1737 optional: bool,
1738 optional_pattern_vars: &HashSet<String>,
1739 ) -> Result<Arc<dyn ExecutionPlan>> {
1740 let Some(filter_expr) = filter_expr else {
1741 return Ok(plan);
1742 };
1743
1744 let mut variable_kinds = HashMap::new();
1745 variable_kinds.insert(source_variable.to_string(), VariableKind::Node);
1746 variable_kinds.insert(target_variable.to_string(), VariableKind::Node);
1747 if let Some(sv) = step_variable {
1748 variable_kinds.insert(sv.to_string(), VariableKind::edge_for(is_variable_length));
1749 }
1750 if let Some(pv) = path_variable {
1751 variable_kinds.insert(pv.to_string(), VariableKind::Path);
1752 }
1753 let ctx = TranslationContext {
1754 parameters: self.params.clone(),
1755 variable_kinds,
1756 ..Default::default()
1757 };
1758 let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
1759 let schema = plan.schema();
1760 let session = self.session_ctx.read();
1761 let physical_filter = self.create_physical_filter_expr(&df_filter, &schema, &session)?;
1762
1763 if optional {
1764 Ok(Arc::new(OptionalFilterExec::new(
1765 plan,
1766 physical_filter,
1767 optional_pattern_vars.clone(),
1768 )))
1769 } else {
1770 Ok(Arc::new(FilterExec::try_new(physical_filter, plan)?))
1771 }
1772 }
1773
1774 fn plan_ext_id_lookup(
1776 &self,
1777 variable: &str,
1778 ext_id: &str,
1779 filter: Option<&Expr>,
1780 optional: bool,
1781 ) -> Result<Arc<dyn ExecutionPlan>> {
1782 let properties = if let Some(filter_expr) = filter {
1784 crate::query::df_expr::collect_properties(filter_expr)
1785 .into_iter()
1786 .filter(|(var, _)| var == variable)
1787 .map(|(_, prop)| prop)
1788 .collect()
1789 } else {
1790 vec![]
1791 };
1792
1793 let lookup_plan: Arc<dyn ExecutionPlan> = Arc::new(GraphExtIdLookupExec::new(
1794 self.graph_ctx.clone(),
1795 variable.to_string(),
1796 ext_id.to_string(),
1797 properties,
1798 optional,
1799 ));
1800
1801 self.apply_scan_filter(lookup_plan, variable, filter, None)
1802 }
1803
1804 fn plan_unwind(
1808 &self,
1809 input: LogicalPlan,
1810 expr: Expr,
1811 variable: String,
1812 all_properties: &HashMap<String, HashSet<String>>,
1813 ) -> Result<Arc<dyn ExecutionPlan>> {
1814 let input_plan = self.plan_internal(&input, all_properties)?;
1816
1817 let unwind = GraphUnwindExec::new(input_plan, expr, variable, self.params.clone());
1818
1819 Ok(Arc::new(unwind))
1820 }
1821
1822 fn plan_recursive_cte(
1827 &self,
1828 cte_name: &str,
1829 initial: &LogicalPlan,
1830 recursive: &LogicalPlan,
1831 _all_properties: &HashMap<String, HashSet<String>>,
1832 ) -> Result<Arc<dyn ExecutionPlan>> {
1833 Ok(Arc::new(RecursiveCTEExec::new(
1834 cte_name.to_string(),
1835 initial.clone(),
1836 recursive.clone(),
1837 self.graph_ctx.clone(),
1838 self.session_ctx.clone(),
1839 self.storage.clone(),
1840 self.schema.clone(),
1841 self.params.clone(),
1842 self.mutation_ctx.clone(),
1843 )))
1844 }
1845
1846 fn plan_apply(
1848 &self,
1849 input: &LogicalPlan,
1850 subquery: &LogicalPlan,
1851 input_filter: Option<&Expr>,
1852 all_properties: &HashMap<String, HashSet<String>>,
1853 ) -> Result<Arc<dyn ExecutionPlan>> {
1854 use crate::query::df_graph::common::infer_logical_plan_schema;
1855
1856 let input_exec = self.plan_internal(input, all_properties)?;
1858 let input_schema = input_exec.schema();
1859
1860 let subquery_effective = match subquery {
1869 LogicalPlan::Limit {
1870 input: inner,
1871 skip: None,
1872 fetch: Some(0),
1873 } => inner.as_ref(),
1874 _ => subquery,
1875 };
1876
1877 let sub_schema = infer_logical_plan_schema(subquery, &self.schema);
1881
1882 let sub_field_names: HashSet<&str> = sub_schema
1891 .fields()
1892 .iter()
1893 .map(|f| f.name().as_str())
1894 .collect();
1895 let kept_input_indices: Vec<usize> = input_schema
1903 .fields()
1904 .iter()
1905 .enumerate()
1906 .filter(|(_, f)| !sub_field_names.contains(f.name().as_str()))
1907 .map(|(i, _)| i)
1908 .collect();
1909 let kept_input_overrides: Vec<Option<(String, String)>> = kept_input_indices
1914 .iter()
1915 .map(|&i| {
1916 let name = input_schema.field(i).name();
1917 if let Some(dot) = name.find('.') {
1918 let base = &name[..dot];
1919 if sub_field_names.contains(base) {
1920 return Some((base.to_string(), name[dot + 1..].to_string()));
1921 }
1922 }
1923 None
1924 })
1925 .collect();
1926 let mut fields: Vec<Arc<arrow_schema::Field>> = kept_input_indices
1927 .iter()
1928 .map(|&i| input_schema.fields()[i].clone())
1929 .collect();
1930 fields.extend(sub_schema.fields().iter().cloned());
1931 let output_schema: SchemaRef = Arc::new(Schema::new(fields));
1932
1933 Ok(Arc::new(GraphApplyExec::new(
1934 input_exec,
1935 subquery_effective.clone(),
1936 input_filter.cloned(),
1937 self.graph_ctx.clone(),
1938 self.session_ctx.clone(),
1939 self.storage.clone(),
1940 self.schema.clone(),
1941 self.params.clone(),
1942 output_schema,
1943 kept_input_indices,
1944 kept_input_overrides,
1945 self.mutation_ctx.clone(),
1946 )))
1947 }
1948
1949 #[expect(clippy::too_many_arguments)]
1951 fn plan_vector_knn(
1952 &self,
1953 label_id: u16,
1954 variable: &str,
1955 property: &str,
1956 query_expr: Expr,
1957 k: usize,
1958 threshold: Option<f32>,
1959 all_properties: &HashMap<String, HashSet<String>>,
1960 ) -> Result<Arc<dyn ExecutionPlan>> {
1961 let label_name = self
1962 .schema
1963 .label_name_by_id(label_id)
1964 .ok_or_else(|| anyhow!("Unknown label ID: {}", label_id))?;
1965
1966 let target_properties = self.resolve_properties(variable, label_name, all_properties);
1967
1968 let plugin_source = self
1977 .schema
1978 .vector_index_for_property(label_name, property)
1979 .and_then(|cfg| {
1980 self.plugin_registry
1981 .index_handle(&cfg.name)
1982 .map(|entry| (cfg.name.clone(), entry))
1983 });
1984
1985 let knn = if let Some((index_name, entry)) = plugin_source {
1986 tracing::debug!(
1987 target: "uni.plugin.registry",
1988 index_kind = %entry.kind.0,
1989 index_name = %index_name,
1990 "plan_vector_knn: dispatching via plugin IndexHandle"
1991 );
1992 GraphVectorKnnExec::with_plugin_source(
1993 self.graph_ctx.clone(),
1994 label_id,
1995 label_name,
1996 variable.to_string(),
1997 property.to_string(),
1998 query_expr,
1999 k,
2000 threshold,
2001 self.params.clone(),
2002 target_properties,
2003 entry.kind,
2004 entry.handle,
2005 )
2006 } else {
2007 GraphVectorKnnExec::new(
2008 self.graph_ctx.clone(),
2009 label_id,
2010 label_name,
2011 variable.to_string(),
2012 property.to_string(),
2013 query_expr,
2014 k,
2015 threshold,
2016 self.params.clone(),
2017 target_properties,
2018 )
2019 };
2020
2021 Ok(self.wrap_read_set_recording(Arc::new(knn), variable))
2027 }
2028
2029 fn plan_procedure_call(
2031 &self,
2032 procedure_name: &str,
2033 arguments: &[Expr],
2034 yield_items: &[(String, Option<String>)],
2035 all_properties: &HashMap<String, HashSet<String>>,
2036 ) -> Result<Arc<dyn ExecutionPlan>> {
2037 use crate::query::df_graph::procedure_call::map_yield_to_canonical;
2038
2039 let mut target_properties: HashMap<String, Vec<String>> = HashMap::new();
2041
2042 if crate::query::df_graph::procedure_call::is_node_yield_procedure_static(procedure_name) {
2043 for (name, alias) in yield_items {
2044 let output_name = alias.as_ref().unwrap_or(name);
2045 let canonical = map_yield_to_canonical(name);
2046 if canonical == "node" {
2047 if let Some(props) = all_properties.get(output_name.as_str()) {
2049 let prop_list: Vec<String> = props
2050 .iter()
2051 .filter(|p| *p != "*" && !p.starts_with('_'))
2052 .cloned()
2053 .collect();
2054 target_properties.insert(output_name.clone(), prop_list);
2055 }
2056 }
2057 }
2058 }
2059
2060 let exec = GraphProcedureCallExec::new(
2061 self.graph_ctx.clone(),
2062 procedure_name.to_string(),
2063 arguments.to_vec(),
2064 yield_items.to_vec(),
2065 self.params.clone(),
2066 self.outer_values.clone(),
2067 target_properties,
2068 );
2069
2070 Ok(Arc::new(exec))
2071 }
2072
2073 fn plan_scan(
2075 &self,
2076 label_id: u16,
2077 variable: &str,
2078 filter: Option<&Expr>,
2079 optional: bool,
2080 all_properties: &HashMap<String, HashSet<String>>,
2081 ) -> Result<Arc<dyn ExecutionPlan>> {
2082 if uni_common::core::schema::is_virtual_label_id(label_id) {
2088 let entry = self
2089 .plugin_registry
2090 .virtual_label_by_id(label_id)
2091 .ok_or_else(|| {
2092 anyhow!(
2093 "Virtual label id {label_id:#x} has no registered CatalogTable; \
2094 the originating CatalogProvider may have been deregistered \
2095 after the plan was cached"
2096 )
2097 })?;
2098 let label_name = entry.name.as_str();
2099 let properties = self.resolve_properties(variable, label_name, all_properties);
2100 let pushdown_filters: Vec<datafusion::logical_expr::Expr> = filter
2101 .map(|f| -> Result<Vec<_>> {
2102 let ctx = crate::query::df_expr::TranslationContext {
2103 parameters: self.params.clone(),
2104 outer_values: self.outer_values.clone(),
2105 ..Default::default()
2106 };
2107 let df = crate::query::df_expr::cypher_expr_to_df(f, Some(&ctx))?;
2108 Ok(vec![df])
2109 })
2110 .transpose()?
2111 .unwrap_or_default();
2112 let exec = crate::query::df_graph::catalog_scan::CatalogVertexScanExec::try_new(
2113 entry.table,
2114 label_id,
2115 label_name.to_string(),
2116 variable.to_string(),
2117 properties,
2118 pushdown_filters,
2119 None, )?;
2121 let mut plan: Arc<dyn ExecutionPlan> = Arc::new(exec);
2122 plan = self.apply_scan_filter(plan, variable, filter, Some(label_name))?;
2125 return self.wrap_optional(plan, optional);
2134 }
2135
2136 let label_name = self
2137 .schema
2138 .label_name_by_id(label_id)
2139 .ok_or_else(|| anyhow!("Unknown label ID: {}", label_id))?;
2140
2141 let mut properties = self.resolve_properties(variable, label_name, all_properties);
2143
2144 let label_props = self.schema.properties.get(label_name);
2146 let has_projection_overflow = properties.iter().any(|p| {
2147 p != "overflow_json"
2148 && !p.starts_with('_')
2149 && !label_props.is_some_and(|lp| lp.contains_key(p.as_str()))
2150 });
2151 if has_projection_overflow && !properties.iter().any(|p| p == "overflow_json") {
2152 properties.push("overflow_json".to_string());
2153 }
2154
2155 if let Some(filter_expr) = filter {
2158 let filter_props = crate::query::df_expr::collect_properties(filter_expr);
2159 let has_overflow = filter_props.iter().any(|(var, prop)| {
2160 var == variable
2161 && !prop.starts_with('_')
2162 && label_props.is_none_or(|props| !props.contains_key(prop.as_str()))
2163 });
2164 if has_overflow && !properties.iter().any(|p| p == "overflow_json") {
2165 properties.push("overflow_json".to_string());
2166 }
2167 }
2168
2169 let var_props = all_properties.get(variable);
2178 let need_full =
2179 var_props.is_some_and(|p| p.contains("*") || p.contains(STRUCT_ONLY_SENTINEL));
2180 let need_full_record = var_props.is_some_and(|p| p.contains("*"));
2181 if need_full_record {
2182 if !properties.contains(&"_all_props".to_string()) {
2183 properties.push("_all_props".to_string());
2184 }
2185 if !properties.contains(&"overflow_json".to_string()) {
2186 properties.push("overflow_json".to_string());
2187 }
2188 }
2189
2190 let extracted_vids = Self::extract_vid_from_cypher_filter(filter, variable, &self.params);
2198 let scan_filter = extracted_vids
2199 .as_deref()
2200 .filter(|v| v.len() == 1)
2201 .map(|v| Self::build_vid_physical_filter(&format!("{variable}._vid"), v[0]));
2202 let mut scan_exec = GraphScanExec::new_vertex_scan(
2203 self.graph_ctx.clone(),
2204 label_name.to_string(),
2205 variable.to_string(),
2206 properties.clone(),
2207 scan_filter,
2208 );
2209 if let Some(vids) = extracted_vids
2210 && vids.len() > 1
2211 {
2212 scan_exec = scan_exec.with_vid_list_filter(vids);
2213 }
2214
2215 let scan_schema_for_idx = scan_exec.schema();
2223 if let Some((lance_str, runtime_filter)) =
2224 self.build_indexed_property_pushdown(filter, variable, label_id, &scan_schema_for_idx)
2225 {
2226 scan_exec = scan_exec
2227 .with_extra_lance_filter(lance_str)
2228 .with_extra_runtime_filter(runtime_filter);
2229 }
2230 let mut scan_plan: Arc<dyn ExecutionPlan> = Arc::new(scan_exec);
2231
2232 scan_plan = self.apply_scan_filter(scan_plan, variable, filter, Some(label_name))?;
2237
2238 scan_plan = self.wrap_read_set_recording(scan_plan, variable);
2241
2242 if need_full {
2243 let struct_props: Vec<String> = properties
2246 .iter()
2247 .filter(|p| *p != "overflow_json" && *p != "*" && *p != STRUCT_ONLY_SENTINEL)
2248 .cloned()
2249 .collect();
2250 scan_plan = self.add_structural_projection(scan_plan, variable, &struct_props)?;
2251 }
2252
2253 self.wrap_optional(scan_plan, optional)
2254 }
2255
2256 fn add_wildcard_structural_projection(
2265 &self,
2266 plan: Arc<dyn ExecutionPlan>,
2267 variable: &str,
2268 all_properties: &HashMap<String, HashSet<String>>,
2269 ) -> Result<Arc<dyn ExecutionPlan>> {
2270 if !all_properties
2271 .get(variable)
2272 .is_some_and(|p| p.contains("*") || p.contains(STRUCT_ONLY_SENTINEL))
2273 {
2274 return Ok(plan);
2275 }
2276 let prefix = format!("{}.", variable);
2277 let struct_props: Vec<String> = plan
2278 .schema()
2279 .fields()
2280 .iter()
2281 .filter_map(|f| {
2282 f.name()
2283 .strip_prefix(&prefix)
2284 .filter(|prop| !prop.starts_with('_') || *prop == "_all_props")
2285 .map(|prop| prop.to_string())
2286 })
2287 .collect();
2288 self.add_structural_projection(plan, variable, &struct_props)
2289 }
2290
2291 fn detect_bound_target(input_schema: &SchemaRef, target_variable: &str) -> Option<String> {
2295 let col = format!("{}._vid", target_variable);
2297 if input_schema.column_with_name(&col).is_some() {
2298 return Some(col);
2299 }
2300 if let Ok(field) = input_schema.field_with_name(target_variable)
2306 && matches!(
2307 field.data_type(),
2308 datafusion::arrow::datatypes::DataType::UInt64
2309 | datafusion::arrow::datatypes::DataType::Int64
2310 )
2311 {
2312 return Some(target_variable.to_string());
2313 }
2314 None
2315 }
2316
2317 fn resolve_schemaless_properties(
2324 variable: &str,
2325 all_properties: &HashMap<String, HashSet<String>>,
2326 ) -> (Vec<String>, bool) {
2327 let mut properties: Vec<String> = all_properties
2328 .get(variable)
2329 .map(|s| {
2330 s.iter()
2331 .filter(|p| *p != "*" && *p != STRUCT_ONLY_SENTINEL)
2332 .cloned()
2333 .collect()
2334 })
2335 .unwrap_or_default();
2336 let need_full = all_properties
2337 .get(variable)
2338 .is_some_and(|p| p.contains("*") || p.contains(STRUCT_ONLY_SENTINEL));
2339 if !properties.iter().any(|p| p == "_all_props") {
2340 properties.push("_all_props".to_string());
2341 }
2342 (properties, need_full)
2343 }
2344
2345 fn collect_used_edge_columns(
2348 schema: &SchemaRef,
2349 scope_match_variables: &HashSet<String>,
2350 exclude_col: Option<&str>,
2351 ) -> Vec<String> {
2352 schema
2353 .fields()
2354 .iter()
2355 .filter_map(|f| {
2356 let name = f.name();
2357 if exclude_col.is_some_and(|exc| name == exc) {
2358 None
2359 } else if name.ends_with("._eid") {
2360 let var_name = name.trim_end_matches("._eid");
2361 scope_match_variables
2362 .contains(var_name)
2363 .then(|| name.clone())
2364 } else if name.starts_with("__eid_to_") {
2365 let var_name = name.trim_start_matches("__eid_to_");
2366 scope_match_variables
2367 .contains(var_name)
2368 .then(|| name.clone())
2369 } else {
2370 None
2371 }
2372 })
2373 .collect()
2374 }
2375
2376 fn maybe_add_edge_structural_projection(
2379 &self,
2380 plan: Arc<dyn ExecutionPlan>,
2381 step_variable: Option<&str>,
2382 source_variable: &str,
2383 target_variable: &str,
2384 all_properties: &HashMap<String, HashSet<String>>,
2385 skip_if_vlp: bool,
2386 ) -> Result<Arc<dyn ExecutionPlan>> {
2387 if skip_if_vlp {
2388 return Ok(plan);
2389 }
2390 let Some(edge_var) = step_variable else {
2391 return Ok(plan);
2392 };
2393 if !all_properties
2394 .get(edge_var)
2395 .is_some_and(|p| p.contains("*") || p.contains(STRUCT_ONLY_SENTINEL))
2396 {
2397 return Ok(plan);
2398 }
2399 let prefix = format!("{}.", edge_var);
2401 let edge_props: Vec<String> = plan
2402 .schema()
2403 .fields()
2404 .iter()
2405 .filter_map(|f| {
2406 f.name()
2407 .strip_prefix(&prefix)
2408 .filter(|prop| !prop.starts_with('_') && *prop != "overflow_json")
2409 .map(|prop| prop.to_string())
2410 })
2411 .collect();
2412 self.add_edge_structural_projection(
2413 plan,
2414 edge_var,
2415 &edge_props,
2416 source_variable,
2417 target_variable,
2418 )
2419 }
2420
2421 fn finalize_schemaless_scan(
2423 &self,
2424 scan_plan: Arc<dyn ExecutionPlan>,
2425 variable: &str,
2426 filter: Option<&Expr>,
2427 optional: bool,
2428 properties: &[String],
2429 need_full: bool,
2430 ) -> Result<Arc<dyn ExecutionPlan>> {
2431 let mut plan = self.apply_scan_filter(scan_plan, variable, filter, None)?;
2434
2435 plan = self.wrap_read_set_recording(plan, variable);
2438
2439 if need_full {
2442 let struct_props: Vec<String> = properties
2447 .iter()
2448 .filter(|p| *p != "*" && *p != STRUCT_ONLY_SENTINEL)
2449 .cloned()
2450 .collect();
2451 plan = self.add_structural_projection(plan, variable, &struct_props)?;
2452 }
2453
2454 self.wrap_optional(plan, optional)
2455 }
2456
2457 fn plan_schemaless_scan(
2458 &self,
2459 label_name: &str,
2460 variable: &str,
2461 filter: Option<&Expr>,
2462 optional: bool,
2463 all_properties: &HashMap<String, HashSet<String>>,
2464 ) -> Result<Arc<dyn ExecutionPlan>> {
2465 let (properties, need_full) = Self::resolve_schemaless_properties(variable, all_properties);
2466 let scan_plan: Arc<dyn ExecutionPlan> =
2467 Arc::new(GraphScanExec::new_schemaless_vertex_scan(
2468 self.graph_ctx.clone(),
2469 label_name.to_string(),
2470 variable.to_string(),
2471 properties.clone(),
2472 None,
2473 ));
2474 self.finalize_schemaless_scan(
2475 scan_plan,
2476 variable,
2477 filter,
2478 optional,
2479 &properties,
2480 need_full,
2481 )
2482 }
2483
2484 fn classify_labels(
2491 registry: &uni_plugin::PluginRegistry,
2492 labels: &[String],
2493 ) -> (Vec<(String, u16)>, Vec<String>) {
2494 let mut virtual_labels: Vec<(String, u16)> = Vec::new();
2495 let mut native_labels: Vec<String> = Vec::new();
2496 for label in labels {
2497 if let Some(id) = registry.virtual_label_by_name(label) {
2498 virtual_labels.push((label.clone(), id));
2499 } else {
2500 native_labels.push(label.clone());
2501 }
2502 }
2503 (virtual_labels, native_labels)
2504 }
2505
2506 fn plan_multi_label_scan(
2522 &self,
2523 labels: &[String],
2524 variable: &str,
2525 filter: Option<&Expr>,
2526 optional: bool,
2527 all_properties: &HashMap<String, HashSet<String>>,
2528 ) -> Result<Arc<dyn ExecutionPlan>> {
2529 let (virtual_labels, native_labels) = Self::classify_labels(&self.plugin_registry, labels);
2530
2531 if virtual_labels.is_empty() {
2533 let (properties, need_full) =
2534 Self::resolve_schemaless_properties(variable, all_properties);
2535 let scan_plan: Arc<dyn ExecutionPlan> =
2536 Arc::new(GraphScanExec::new_multi_label_vertex_scan(
2537 self.graph_ctx.clone(),
2538 labels.to_vec(),
2539 variable.to_string(),
2540 properties.clone(),
2541 None,
2542 ));
2543 return self.finalize_schemaless_scan(
2544 scan_plan,
2545 variable,
2546 filter,
2547 optional,
2548 &properties,
2549 need_full,
2550 );
2551 }
2552
2553 let virtual_side =
2559 self.build_virtual_union_scan(&virtual_labels, variable, filter, all_properties)?;
2560
2561 if native_labels.is_empty() {
2563 let plan = self.apply_scan_filter(virtual_side, variable, filter, None)?;
2568 return self.wrap_optional(plan, optional);
2569 }
2570
2571 let native_properties: Vec<String> = vec!["_all_props".to_string()];
2577 let native_scan: Arc<dyn ExecutionPlan> =
2578 Arc::new(GraphScanExec::new_multi_label_vertex_scan(
2579 self.graph_ctx.clone(),
2580 native_labels,
2581 variable.to_string(),
2582 native_properties,
2583 None,
2584 ));
2585
2586 let joined = self.semi_join_on_vid(virtual_side, native_scan, variable)?;
2587 let plan = self.apply_scan_filter(joined, variable, filter, None)?;
2588 self.wrap_optional(plan, optional)
2589 }
2590
2591 fn build_virtual_union_scan(
2598 &self,
2599 virtual_labels: &[(String, u16)],
2600 variable: &str,
2601 filter: Option<&Expr>,
2602 all_properties: &HashMap<String, HashSet<String>>,
2603 ) -> Result<Arc<dyn ExecutionPlan>> {
2604 let pushdown_filters: Vec<DfExpr> = filter
2605 .map(|f| -> Result<Vec<_>> {
2606 let ctx = crate::query::df_expr::TranslationContext {
2607 parameters: self.params.clone(),
2608 outer_values: self.outer_values.clone(),
2609 ..Default::default()
2610 };
2611 let df = crate::query::df_expr::cypher_expr_to_df(f, Some(&ctx))?;
2612 Ok(vec![df])
2613 })
2614 .transpose()?
2615 .unwrap_or_default();
2616
2617 let mut scans: Vec<Arc<dyn ExecutionPlan>> = Vec::with_capacity(virtual_labels.len());
2618 for (label_name, label_id) in virtual_labels {
2619 let entry = self
2620 .plugin_registry
2621 .virtual_label_by_id(*label_id)
2622 .ok_or_else(|| {
2623 anyhow!(
2624 "Virtual label `{label_name}` (id {label_id:#x}) has no \
2625 registered CatalogTable; the originating CatalogProvider \
2626 may have been deregistered after the plan was cached"
2627 )
2628 })?;
2629 let properties = self.resolve_properties(variable, label_name, all_properties);
2630 let exec = crate::query::df_graph::catalog_scan::CatalogVertexScanExec::try_new(
2631 entry.table,
2632 *label_id,
2633 label_name.clone(),
2634 variable.to_string(),
2635 properties,
2636 pushdown_filters.clone(),
2637 None,
2638 )?;
2639 scans.push(Arc::new(exec));
2640 }
2641
2642 if scans.len() == 1 {
2643 Ok(scans.pop().expect("len == 1 implies non-empty"))
2644 } else {
2645 UnionExec::try_new(scans).map_err(|e| anyhow!("UnionExec construction failed: {e}"))
2646 }
2647 }
2648
2649 fn semi_join_on_vid(
2653 &self,
2654 left: Arc<dyn ExecutionPlan>,
2655 right: Arc<dyn ExecutionPlan>,
2656 variable: &str,
2657 ) -> Result<Arc<dyn ExecutionPlan>> {
2658 use datafusion::common::NullEquality;
2659 use datafusion::physical_plan::expressions::Column;
2660 use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
2661
2662 let vid_col = format!("{variable}._vid");
2663 let left_idx = left
2664 .schema()
2665 .index_of(&vid_col)
2666 .map_err(|e| anyhow!("virtual scan output missing `{vid_col}`: {e}"))?;
2667 let right_idx = right
2668 .schema()
2669 .index_of(&vid_col)
2670 .map_err(|e| anyhow!("native scan output missing `{vid_col}`: {e}"))?;
2671 let on: Vec<(
2672 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
2673 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
2674 )> = vec![(
2675 Arc::new(Column::new(&vid_col, left_idx)),
2676 Arc::new(Column::new(&vid_col, right_idx)),
2677 )];
2678 let join = HashJoinExec::try_new(
2679 left,
2680 right,
2681 on,
2682 None,
2683 &JoinType::LeftSemi,
2684 None,
2685 PartitionMode::CollectLeft,
2686 NullEquality::NullEqualsNothing,
2687 false,
2688 )?;
2689 Ok(Arc::new(join))
2690 }
2691
2692 fn hydrate_virtual_target_from_catalog(
2709 &self,
2710 traverse_plan: Arc<dyn ExecutionPlan>,
2711 target_label_id: u16,
2712 target_variable: &str,
2713 all_properties: &HashMap<String, HashSet<String>>,
2714 ) -> Result<Arc<dyn ExecutionPlan>> {
2715 use datafusion::common::NullEquality;
2716 use datafusion::physical_expr::expressions::{Column, col as col_expr};
2717 use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
2718
2719 let entry = self
2720 .plugin_registry
2721 .virtual_label_by_id(target_label_id)
2722 .ok_or_else(|| {
2723 anyhow!(
2724 "Virtual label id {target_label_id:#x} for target `{target_variable}` has no \
2725 registered CatalogTable; the originating CatalogProvider may have been \
2726 deregistered after the plan was cached"
2727 )
2728 })?;
2729 let label_name = entry.name.as_str();
2730 let properties = self.resolve_properties(target_variable, label_name, all_properties);
2731 let catalog_exec = crate::query::df_graph::catalog_scan::CatalogVertexScanExec::try_new(
2737 entry.table,
2738 target_label_id,
2739 label_name.to_string(),
2740 target_variable.to_string(),
2741 properties,
2742 Vec::new(),
2743 None,
2744 )?;
2745 let catalog_plan: Arc<dyn ExecutionPlan> = Arc::new(catalog_exec);
2746
2747 let vid_col_name = format!("{target_variable}._vid");
2748 let left_idx = traverse_plan
2749 .schema()
2750 .index_of(&vid_col_name)
2751 .map_err(|e| anyhow!("traverse plan missing `{vid_col_name}` for hydration: {e}"))?;
2752 let right_idx = catalog_plan
2753 .schema()
2754 .index_of(&vid_col_name)
2755 .map_err(|e| anyhow!("catalog scan missing `{vid_col_name}`: {e}"))?;
2756 let on: Vec<(
2757 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
2758 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
2759 )> = vec![(
2760 Arc::new(Column::new(&vid_col_name, left_idx)),
2761 Arc::new(Column::new(&vid_col_name, right_idx)),
2762 )];
2763 let join = HashJoinExec::try_new(
2764 traverse_plan,
2765 catalog_plan,
2766 on,
2767 None,
2768 &JoinType::Inner,
2769 None,
2770 PartitionMode::CollectLeft,
2771 NullEquality::NullEqualsNothing,
2772 false,
2773 )?;
2774 let join_plan: Arc<dyn ExecutionPlan> = Arc::new(join);
2775
2776 let join_schema = join_plan.schema();
2784 let mut projection_exprs: Vec<(Arc<dyn datafusion::physical_plan::PhysicalExpr>, String)> =
2785 Vec::with_capacity(join_schema.fields().len() - 1);
2786 let mut seen_vid = false;
2787 for field in join_schema.fields().iter() {
2788 if field.name() == &vid_col_name {
2789 if seen_vid {
2790 continue;
2791 }
2792 seen_vid = true;
2793 }
2794 let expr = col_expr(field.name(), &join_schema)
2795 .map_err(|e| anyhow!("hydrate_virtual_target_from_catalog projection: {e}"))?;
2796 projection_exprs.push((expr, field.name().clone()));
2797 }
2798 let projected = ProjectionExec::try_new(projection_exprs, join_plan)
2799 .map_err(|e| anyhow!("hydrate_virtual_target_from_catalog projection: {e}"))?;
2800 Ok(Arc::new(projected))
2801 }
2802
2803 #[expect(
2819 clippy::too_many_arguments,
2820 reason = "mirrors plan_traverse's argument set"
2821 )]
2822 fn plan_traverse_virtual_edge(
2823 &self,
2824 input_plan: Arc<dyn ExecutionPlan>,
2825 source_col: String,
2826 source_variable: &str,
2827 virtual_edge_type_id: u32,
2828 direction: AstDirection,
2829 target_variable: &str,
2830 target_label_id: u16,
2831 step_variable: Option<&str>,
2832 all_properties: &HashMap<String, HashSet<String>>,
2833 target_filter: Option<&Expr>,
2834 optional: bool,
2835 optional_pattern_vars: &HashSet<String>,
2836 ) -> Result<Arc<dyn ExecutionPlan>> {
2837 use datafusion::common::NullEquality;
2838 use datafusion::physical_expr::expressions::{Column, col as col_expr};
2839 use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
2840
2841 let entry = self
2842 .plugin_registry
2843 .virtual_edge_type_by_id(virtual_edge_type_id)
2844 .ok_or_else(|| {
2845 anyhow!(
2846 "Virtual edge-type id {virtual_edge_type_id:#x} for `{target_variable}` has \
2847 no registered CatalogTable; the originating CatalogProvider may have been \
2848 deregistered after the plan was cached"
2849 )
2850 })?;
2851 let type_name = entry.name.as_str();
2852 let edge_var = step_variable
2853 .map(str::to_string)
2854 .unwrap_or_else(|| format!("__anon_edge_{target_variable}"));
2855
2856 let edge_properties: Vec<String> = step_variable
2857 .and_then(|sv| all_properties.get(sv))
2858 .map(|props| {
2859 props
2860 .iter()
2861 .filter(|p| !p.starts_with('_') && *p != "*")
2862 .cloned()
2863 .collect()
2864 })
2865 .unwrap_or_default();
2866
2867 let catalog_exec = crate::query::df_graph::catalog_scan::CatalogEdgeScanExec::try_new(
2868 entry.table,
2869 virtual_edge_type_id,
2870 type_name.to_string(),
2871 edge_var.clone(),
2872 edge_properties,
2873 Vec::new(),
2874 None,
2875 )?;
2876 let catalog_plan: Arc<dyn ExecutionPlan> = Arc::new(catalog_exec);
2877
2878 let edge_src_col = format!("{edge_var}._src_vid");
2879 let edge_dst_col = format!("{edge_var}._dst_vid");
2880 let (right_key, target_src_col) = match direction {
2881 AstDirection::Outgoing => (edge_src_col.clone(), edge_dst_col.clone()),
2882 AstDirection::Incoming => (edge_dst_col.clone(), edge_src_col.clone()),
2883 AstDirection::Both => (edge_src_col.clone(), edge_dst_col.clone()),
2884 };
2885
2886 let left_idx = input_plan
2887 .schema()
2888 .index_of(&source_col)
2889 .map_err(|e| anyhow!("input plan missing source vid column `{source_col}`: {e}"))?;
2890 let right_idx = catalog_plan
2891 .schema()
2892 .index_of(&right_key)
2893 .map_err(|e| anyhow!("CatalogEdgeScanExec missing `{right_key}`: {e}"))?;
2894 let on: Vec<(
2895 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
2896 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
2897 )> = vec![(
2898 Arc::new(Column::new(&source_col, left_idx)),
2899 Arc::new(Column::new(&right_key, right_idx)),
2900 )];
2901 let join = HashJoinExec::try_new(
2902 input_plan,
2903 catalog_plan,
2904 on,
2905 None,
2906 &JoinType::Inner,
2907 None,
2908 PartitionMode::CollectLeft,
2909 NullEquality::NullEqualsNothing,
2910 false,
2911 )?;
2912 let join_plan: Arc<dyn ExecutionPlan> = Arc::new(join);
2913
2914 let join_schema = join_plan.schema();
2915 let target_vid_name = format!("{target_variable}._vid");
2916 let mut projection_exprs: Vec<(Arc<dyn datafusion::physical_plan::PhysicalExpr>, String)> =
2917 Vec::with_capacity(join_schema.fields().len());
2918 for field in join_schema.fields() {
2919 let name = field.name();
2920 if name == &right_key {
2921 continue;
2922 }
2923 let expr = col_expr(name, &join_schema)
2924 .map_err(|e| anyhow!("plan_traverse_virtual_edge projection: {e}"))?;
2925 let out_name = if name == &target_src_col {
2926 target_vid_name.clone()
2927 } else {
2928 name.clone()
2929 };
2930 projection_exprs.push((expr, out_name));
2931 }
2932 let projected: Arc<dyn ExecutionPlan> = Arc::new(
2933 ProjectionExec::try_new(projection_exprs, join_plan)
2934 .map_err(|e| anyhow!("plan_traverse_virtual_edge projection: {e}"))?,
2935 );
2936
2937 let mut plan = if uni_common::core::schema::is_virtual_label_id(target_label_id) {
2938 self.hydrate_virtual_target_from_catalog(
2939 projected,
2940 target_label_id,
2941 target_variable,
2942 all_properties,
2943 )?
2944 } else {
2945 projected
2946 };
2947
2948 plan = self.add_wildcard_structural_projection(plan, target_variable, all_properties)?;
2949 plan = self.maybe_add_edge_structural_projection(
2950 plan,
2951 step_variable,
2952 source_variable,
2953 target_variable,
2954 all_properties,
2955 false,
2956 )?;
2957
2958 if let Some(filter_expr) = target_filter {
2959 let mut variable_kinds = HashMap::new();
2960 variable_kinds.insert(source_variable.to_string(), VariableKind::Node);
2961 variable_kinds.insert(target_variable.to_string(), VariableKind::Node);
2962 if let Some(sv) = step_variable {
2963 variable_kinds.insert(sv.to_string(), VariableKind::edge_for(false));
2964 }
2965 let ctx = TranslationContext {
2966 parameters: self.params.clone(),
2967 variable_kinds,
2968 ..Default::default()
2969 };
2970 let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
2971 let schema = plan.schema();
2972 let session = self.session_ctx.read();
2973 let physical_filter =
2974 self.create_physical_filter_expr(&df_filter, &schema, &session)?;
2975 plan = if optional {
2976 Arc::new(OptionalFilterExec::new(
2977 plan,
2978 physical_filter,
2979 optional_pattern_vars.clone(),
2980 ))
2981 } else {
2982 Arc::new(FilterExec::try_new(physical_filter, plan)?)
2983 };
2984 } else {
2985 let _ = optional_pattern_vars;
2986 }
2987 Ok(plan)
2988 }
2989
2990 fn plan_scan_all(
2994 &self,
2995 variable: &str,
2996 filter: Option<&Expr>,
2997 optional: bool,
2998 all_properties: &HashMap<String, HashSet<String>>,
2999 ) -> Result<Arc<dyn ExecutionPlan>> {
3000 let (properties, need_full) = Self::resolve_schemaless_properties(variable, all_properties);
3001 let extracted_vids = Self::extract_vid_from_cypher_filter(filter, variable, &self.params);
3004 let scan_filter = extracted_vids
3005 .as_deref()
3006 .filter(|v| v.len() == 1)
3007 .map(|v| Self::build_vid_physical_filter(&format!("{variable}._vid"), v[0]));
3008 let mut scan_exec = GraphScanExec::new_schemaless_all_scan(
3009 self.graph_ctx.clone(),
3010 variable.to_string(),
3011 properties.clone(),
3012 scan_filter,
3013 );
3014 if let Some(vids) = extracted_vids
3015 && vids.len() > 1
3016 {
3017 scan_exec = scan_exec.with_vid_list_filter(vids);
3018 }
3019 let scan_plan: Arc<dyn ExecutionPlan> = Arc::new(scan_exec);
3020 self.finalize_schemaless_scan(
3021 scan_plan,
3022 variable,
3023 filter,
3024 optional,
3025 &properties,
3026 need_full,
3027 )
3028 }
3029
3030 #[expect(
3032 clippy::too_many_arguments,
3033 reason = "Graph traversal requires many parameters"
3034 )]
3035 fn plan_traverse(
3036 &self,
3037 input: &LogicalPlan,
3038 edge_type_ids: &[u32],
3039 direction: AstDirection,
3040 source_variable: &str,
3041 target_variable: &str,
3042 target_label_id: u16,
3043 step_variable: Option<&str>,
3044 min_hops: usize,
3045 max_hops: usize,
3046 path_variable: Option<&str>,
3047 optional: bool,
3048 target_filter: Option<&Expr>,
3049 is_variable_length: bool,
3050 optional_pattern_vars: &HashSet<String>,
3051 all_properties: &HashMap<String, HashSet<String>>,
3052 scope_match_variables: &HashSet<String>,
3053 edge_filter_expr: Option<&Expr>,
3054 path_mode: &crate::query::df_graph::nfa::PathMode,
3055 qpp_steps: Option<&[crate::query::planner::QppStepInfo]>,
3056 ) -> Result<Arc<dyn ExecutionPlan>> {
3057 let input_plan = self.plan_internal(input, all_properties)?;
3058
3059 let adj_direction = convert_direction(direction.clone());
3060 let (input_plan, source_col) = Self::resolve_source_vid_col(input_plan, source_variable)?;
3061
3062 if !is_variable_length
3071 && !edge_type_ids.is_empty()
3072 && edge_type_ids.len() == 1
3073 && edge_type_ids
3074 .iter()
3075 .all(|eid| uni_common::core::edge_type::is_virtual_edge_type(*eid))
3076 {
3077 return self.plan_traverse_virtual_edge(
3078 input_plan,
3079 source_col,
3080 source_variable,
3081 edge_type_ids[0],
3082 direction,
3083 target_variable,
3084 target_label_id,
3085 step_variable,
3086 all_properties,
3087 target_filter,
3088 optional,
3089 optional_pattern_vars,
3090 );
3091 }
3092
3093 let traverse_plan: Arc<dyn ExecutionPlan> = if !is_variable_length {
3094 let mut edge_properties: Vec<String> = if let Some(edge_var) = step_variable {
3096 let has_wildcard = all_properties
3097 .get(edge_var)
3098 .is_some_and(|props| props.contains("*"));
3099 if has_wildcard {
3100 let mut schema_props: Vec<String> = edge_type_ids
3102 .iter()
3103 .filter_map(|eid| self.schema.edge_type_name_by_id(*eid))
3104 .flat_map(|name| {
3105 self.schema
3106 .properties
3107 .get(name)
3108 .map(|p| p.keys().cloned().collect::<Vec<_>>())
3109 .unwrap_or_default()
3110 })
3111 .collect();
3112
3113 if let Some(props) = all_properties.get(edge_var) {
3118 for p in props {
3119 let passthrough = !p.starts_with('_')
3120 || matches!(p.as_str(), "_created_at" | "_updated_at");
3121 if p != "*" && passthrough && !schema_props.contains(p) {
3122 schema_props.push(p.clone());
3123 }
3124 }
3125 }
3126 schema_props
3127 } else {
3128 all_properties
3129 .get(edge_var)
3130 .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
3131 .unwrap_or_default()
3132 }
3133 } else {
3134 Vec::new()
3135 };
3136
3137 if let Some(edge_var) = step_variable {
3139 let has_wildcard = all_properties
3140 .get(edge_var)
3141 .is_some_and(|props| props.contains("*"));
3142 let edge_type_props = self.merged_edge_type_properties(edge_type_ids);
3143 let has_overflow_edge_props = edge_properties.iter().any(|p| {
3144 p != "overflow_json"
3145 && !p.starts_with('_')
3146 && !edge_type_props.contains_key(p.as_str())
3147 });
3148 let needs_overflow =
3152 (has_wildcard && edge_properties.is_empty()) || has_overflow_edge_props;
3153 if needs_overflow && !edge_properties.contains(&"overflow_json".to_string()) {
3154 edge_properties.push("overflow_json".to_string());
3155 }
3156
3157 if has_wildcard && !edge_properties.contains(&"_all_props".to_string()) {
3161 edge_properties.push("_all_props".to_string());
3162 }
3163 }
3164
3165 let target_label_name_str = self.schema.label_name_by_id(target_label_id).unwrap_or("");
3167 let mut target_properties =
3168 self.resolve_properties(target_variable, target_label_name_str, all_properties);
3169
3170 target_properties.retain(|p| p != "*" && p != STRUCT_ONLY_SENTINEL);
3176
3177 let target_has_wildcard = all_properties
3180 .get(target_variable)
3181 .is_some_and(|p| p.contains("*"));
3182 if target_has_wildcard && target_properties.is_empty() {
3183 target_properties.push("_all_props".to_string());
3184 }
3185
3186 let target_label_props = if !target_label_name_str.is_empty() {
3190 self.schema.properties.get(target_label_name_str)
3191 } else {
3192 None
3193 };
3194 let has_non_schema_props = target_properties.iter().any(|p| {
3195 p != "overflow_json"
3196 && p != "_all_props"
3197 && !p.starts_with('_')
3198 && !target_label_props.is_some_and(|lp| lp.contains_key(p.as_str()))
3199 });
3200 if has_non_schema_props && !target_properties.iter().any(|p| p == "_all_props") {
3201 target_properties.push("_all_props".to_string());
3202 }
3203 if let Some(filter_expr) = target_filter {
3205 let filter_props = crate::query::df_expr::collect_properties(filter_expr);
3206 let has_overflow_filter = filter_props.iter().any(|(var, prop)| {
3207 var == target_variable
3208 && !prop.starts_with('_')
3209 && !target_label_props
3210 .is_some_and(|props| props.contains_key(prop.as_str()))
3211 });
3212 if has_overflow_filter && !target_properties.iter().any(|p| p == "_all_props") {
3213 target_properties.push("_all_props".to_string());
3214 }
3215 }
3216 if !target_label_name_str.is_empty()
3219 && has_non_schema_props
3220 && !target_properties.iter().any(|p| p == "overflow_json")
3221 {
3222 target_properties.push("overflow_json".to_string());
3223 }
3224
3225 let target_label_name = if target_label_name_str.is_empty() {
3227 None
3228 } else {
3229 Some(target_label_name_str.to_string())
3230 };
3231
3232 let bound_target_column =
3239 Self::detect_bound_target(&input_plan.schema(), target_variable);
3240
3241 let mut input_plan = input_plan;
3257 for rebound_var in [
3258 step_variable.and_then(|sv| sv.strip_prefix("__rebound_")),
3259 target_variable.strip_prefix("__rebound_"),
3260 ]
3261 .into_iter()
3262 .flatten()
3263 {
3264 if input_plan
3265 .schema()
3266 .field_with_name(rebound_var)
3267 .ok()
3268 .is_some_and(|f| {
3269 matches!(
3270 f.data_type(),
3271 datafusion::arrow::datatypes::DataType::Struct(_)
3272 )
3273 })
3274 {
3275 input_plan = Self::extract_all_struct_fields(input_plan, rebound_var)?;
3276 }
3277 }
3278
3279 let rebound_bound_edge_col = step_variable
3280 .and_then(|sv| sv.strip_prefix("__rebound_"))
3281 .map(|bound| format!("{}._eid", bound));
3282
3283 let used_edge_columns = Self::collect_used_edge_columns(
3284 &input_plan.schema(),
3285 scope_match_variables,
3286 rebound_bound_edge_col.as_deref(),
3287 );
3288
3289 Arc::new(GraphTraverseExec::new(
3290 input_plan,
3291 source_col,
3292 edge_type_ids.to_vec(),
3293 adj_direction,
3294 target_variable.to_string(),
3295 step_variable.map(|s| s.to_string()),
3296 edge_properties,
3297 target_properties,
3298 target_label_name,
3299 None, self.graph_ctx.clone(),
3301 optional,
3302 optional_pattern_vars.clone(),
3303 bound_target_column,
3304 used_edge_columns,
3305 ))
3306 } else {
3307 if edge_type_ids.is_empty() {
3309 if let (0, Some(path_var)) = (min_hops, path_variable) {
3312 return Ok(Arc::new(BindZeroLengthPathExec::new(
3313 input_plan,
3314 source_variable.to_string(),
3315 path_var.to_string(),
3316 self.graph_ctx.clone(),
3317 )));
3318 } else if min_hops == 0 && step_variable.is_none() {
3319 return Ok(input_plan);
3322 }
3323 }
3324 {
3325 let vlp_target_label_name_str =
3327 self.schema.label_name_by_id(target_label_id).unwrap_or("");
3328 let vlp_target_properties_raw = self.resolve_properties(
3329 target_variable,
3330 vlp_target_label_name_str,
3331 all_properties,
3332 );
3333 let target_has_wildcard = all_properties
3334 .get(target_variable)
3335 .is_some_and(|p| p.contains("*"));
3336 let vlp_target_label_props: Option<HashSet<String>> =
3337 if vlp_target_label_name_str.is_empty() {
3338 None
3339 } else {
3340 self.schema
3341 .properties
3342 .get(vlp_target_label_name_str)
3343 .map(|props| props.keys().cloned().collect())
3344 };
3345 let mut vlp_target_properties = sanitize_vlp_target_properties(
3346 vlp_target_properties_raw,
3347 target_has_wildcard,
3348 vlp_target_label_props.as_ref(),
3349 );
3350 let vlp_target_label_name = if vlp_target_label_name_str.is_empty() {
3351 None
3352 } else {
3353 Some(vlp_target_label_name_str.to_string())
3354 };
3355
3356 let bound_target_column =
3358 Self::detect_bound_target(&input_plan.schema(), target_variable);
3359 if bound_target_column.is_some() {
3360 vlp_target_properties.clear();
3363 }
3364
3365 let edge_lance_filter: Option<String> = edge_filter_expr.and_then(|expr| {
3367 let edge_var_name = step_variable.unwrap_or("__anon_edge");
3368 crate::query::pushdown::LanceFilterGenerator::generate(
3369 std::slice::from_ref(expr),
3370 edge_var_name,
3371 None,
3372 )
3373 });
3374
3375 let edge_property_conditions = edge_filter_expr
3377 .map(Self::extract_edge_property_conditions)
3378 .unwrap_or_default();
3379
3380 let used_edge_columns = Self::collect_used_edge_columns(
3382 &input_plan.schema(),
3383 scope_match_variables,
3384 None,
3385 );
3386
3387 let output_mode = if step_variable.is_some() {
3389 crate::query::df_graph::nfa::VlpOutputMode::StepVariable
3390 } else if path_variable.is_some() {
3391 crate::query::df_graph::nfa::VlpOutputMode::FullPath
3392 } else {
3393 crate::query::df_graph::nfa::VlpOutputMode::EndpointsOnly
3394 };
3395
3396 let qpp_nfa = qpp_steps.map(|steps| {
3398 use crate::query::df_graph::nfa::{QppStep, VertexConstraint};
3399 let hops_per_iter = steps.len();
3400 let min_iter = min_hops / hops_per_iter;
3401 let max_iter = max_hops / hops_per_iter;
3402 let nfa_steps: Vec<QppStep> = steps
3403 .iter()
3404 .map(|s| QppStep {
3405 edge_type_ids: s.edge_type_ids.clone(),
3406 direction: convert_direction(s.direction.clone()),
3407 target_constraint: s
3408 .target_label
3409 .as_ref()
3410 .map(|l| VertexConstraint::Label(l.clone())),
3411 })
3412 .collect();
3413 crate::query::df_graph::nfa::PathNfa::from_qpp(nfa_steps, min_iter, max_iter)
3414 });
3415
3416 Arc::new(GraphVariableLengthTraverseExec::new(
3417 input_plan,
3418 source_col,
3419 edge_type_ids.to_vec(),
3420 adj_direction,
3421 min_hops,
3422 max_hops,
3423 target_variable.to_string(),
3424 step_variable.map(|s| s.to_string()),
3425 path_variable.map(|s| s.to_string()),
3426 vlp_target_properties,
3427 vlp_target_label_name,
3428 self.graph_ctx.clone(),
3429 optional,
3430 bound_target_column,
3431 edge_lance_filter,
3432 edge_property_conditions,
3433 used_edge_columns,
3434 path_mode.clone(),
3435 output_mode,
3436 qpp_nfa,
3437 ))
3438 }
3439 };
3440
3441 let mut traverse_plan = traverse_plan;
3443
3444 if uni_common::core::schema::is_virtual_label_id(target_label_id) {
3455 traverse_plan = self.hydrate_virtual_target_from_catalog(
3456 traverse_plan,
3457 target_label_id,
3458 target_variable,
3459 all_properties,
3460 )?;
3461 }
3462
3463 traverse_plan = self.add_wildcard_structural_projection(
3465 traverse_plan,
3466 target_variable,
3467 all_properties,
3468 )?;
3469
3470 traverse_plan = self.maybe_add_edge_structural_projection(
3473 traverse_plan,
3474 step_variable,
3475 source_variable,
3476 target_variable,
3477 all_properties,
3478 is_variable_length,
3479 )?;
3480
3481 if let Some(filter_expr) = target_filter {
3483 let mut variable_kinds = HashMap::new();
3485 variable_kinds.insert(source_variable.to_string(), VariableKind::Node);
3486 variable_kinds.insert(target_variable.to_string(), VariableKind::Node);
3487 if let Some(sv) = step_variable {
3488 variable_kinds.insert(sv.to_string(), VariableKind::edge_for(is_variable_length));
3489 }
3490 if let Some(pv) = path_variable {
3491 variable_kinds.insert(pv.to_string(), VariableKind::Path);
3492 }
3493 let mut variable_labels = HashMap::new();
3494 if let Some(sv) = step_variable
3495 && edge_type_ids.len() == 1
3496 && let Some(name) = self.schema.edge_type_name_by_id(edge_type_ids[0])
3497 {
3498 variable_labels.insert(sv.to_string(), name.to_string());
3499 }
3500 let target_label_name_str = self.schema.label_name_by_id(target_label_id).unwrap_or("");
3501 if !target_label_name_str.is_empty() {
3502 variable_labels.insert(
3503 target_variable.to_string(),
3504 target_label_name_str.to_string(),
3505 );
3506 }
3507 let ctx = TranslationContext {
3508 parameters: self.params.clone(),
3509 variable_labels,
3510 variable_kinds,
3511 ..Default::default()
3512 };
3513 let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
3514 let schema = traverse_plan.schema();
3515 let session = self.session_ctx.read();
3516 let physical_filter =
3517 self.create_physical_filter_expr(&df_filter, &schema, &session)?;
3518
3519 if optional {
3520 Ok(Arc::new(OptionalFilterExec::new(
3521 traverse_plan,
3522 physical_filter,
3523 optional_pattern_vars.clone(),
3524 )))
3525 } else {
3526 Ok(Arc::new(FilterExec::try_new(
3527 physical_filter,
3528 traverse_plan,
3529 )?))
3530 }
3531 } else {
3532 Ok(traverse_plan)
3533 }
3534 }
3535
3536 #[expect(clippy::too_many_arguments)]
3541 fn plan_traverse_main_by_type(
3542 &self,
3543 input: &LogicalPlan,
3544 type_names: &[String],
3545 direction: AstDirection,
3546 source_variable: &str,
3547 target_variable: &str,
3548 step_variable: Option<&str>,
3549 optional: bool,
3550 optional_pattern_vars: &HashSet<String>,
3551 all_properties: &HashMap<String, HashSet<String>>,
3552 scope_match_variables: &HashSet<String>,
3553 ) -> Result<Arc<dyn ExecutionPlan>> {
3554 let input_plan = self.plan_internal(input, all_properties)?;
3555
3556 let adj_direction = convert_direction(direction);
3557 let (input_plan, source_col) = Self::resolve_source_vid_col(input_plan, source_variable)?;
3558
3559 let bound_target_column = Self::detect_bound_target(&input_plan.schema(), target_variable);
3561
3562 let mut edge_properties: Vec<String> = if let Some(edge_var) = step_variable {
3564 all_properties
3565 .get(edge_var)
3566 .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
3567 .unwrap_or_default()
3568 } else {
3569 Vec::new()
3570 };
3571
3572 if let Some(edge_var) = step_variable
3574 && all_properties
3575 .get(edge_var)
3576 .is_some_and(|props| props.contains("*"))
3577 && !edge_properties.iter().any(|p| p == "_all_props")
3578 {
3579 edge_properties.push("_all_props".to_string());
3580 }
3581
3582 let mut target_properties: Vec<String> = all_properties
3584 .get(target_variable)
3585 .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
3586 .unwrap_or_default();
3587
3588 let target_has_wildcard = all_properties
3592 .get(target_variable)
3593 .is_some_and(|p| p.contains("*"));
3594 if (target_has_wildcard || !target_properties.is_empty())
3595 && !target_properties.iter().any(|p| p == "_all_props")
3596 {
3597 target_properties.push("_all_props".to_string());
3598 }
3599 if bound_target_column.is_some() {
3600 target_properties.clear();
3602 }
3603
3604 let rebound_bound_edge_col = step_variable
3607 .and_then(|sv| sv.strip_prefix("__rebound_"))
3608 .map(|bound| format!("{}._eid", bound));
3609 let used_edge_columns = Self::collect_used_edge_columns(
3610 &input_plan.schema(),
3611 scope_match_variables,
3612 rebound_bound_edge_col.as_deref(),
3613 );
3614
3615 let traverse_plan: Arc<dyn ExecutionPlan> = Arc::new(GraphTraverseMainExec::new(
3617 input_plan,
3618 source_col,
3619 type_names.to_vec(),
3620 adj_direction,
3621 target_variable.to_string(),
3622 step_variable.map(|s| s.to_string()),
3623 edge_properties.clone(),
3624 target_properties,
3625 self.graph_ctx.clone(),
3626 optional,
3627 optional_pattern_vars.clone(),
3628 bound_target_column,
3629 used_edge_columns,
3630 ));
3631
3632 let mut result_plan = traverse_plan;
3633
3634 result_plan =
3636 self.add_wildcard_structural_projection(result_plan, target_variable, all_properties)?;
3637
3638 result_plan = self.maybe_add_edge_structural_projection(
3640 result_plan,
3641 step_variable,
3642 source_variable,
3643 target_variable,
3644 all_properties,
3645 false, )?;
3647
3648 Ok(result_plan)
3649 }
3650
3651 #[expect(clippy::too_many_arguments)]
3656 fn plan_traverse_main_by_type_vlp(
3657 &self,
3658 input: &LogicalPlan,
3659 type_names: &[String],
3660 direction: AstDirection,
3661 source_variable: &str,
3662 target_variable: &str,
3663 step_variable: Option<&str>,
3664 min_hops: usize,
3665 max_hops: usize,
3666 path_variable: Option<&str>,
3667 optional: bool,
3668 all_properties: &HashMap<String, HashSet<String>>,
3669 edge_filter_expr: Option<&Expr>,
3670 path_mode: &crate::query::df_graph::nfa::PathMode,
3671 scope_match_variables: &HashSet<String>,
3672 ) -> Result<Arc<dyn ExecutionPlan>> {
3673 let input_plan = self.plan_internal(input, all_properties)?;
3674
3675 let adj_direction = convert_direction(direction);
3676 let (input_plan, source_col) = Self::resolve_source_vid_col(input_plan, source_variable)?;
3677
3678 let bound_target_column = Self::detect_bound_target(&input_plan.schema(), target_variable);
3680
3681 let mut target_properties: Vec<String> = all_properties
3683 .get(target_variable)
3684 .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
3685 .unwrap_or_default();
3686
3687 let target_has_wildcard = all_properties
3691 .get(target_variable)
3692 .is_some_and(|p| p.contains("*"));
3693 if (target_has_wildcard || !target_properties.is_empty())
3694 && !target_properties.iter().any(|p| p == "_all_props")
3695 {
3696 target_properties.push("_all_props".to_string());
3697 }
3698 if bound_target_column.is_some() {
3699 target_properties.clear();
3701 }
3702
3703 let edge_lance_filter: Option<String> = edge_filter_expr.and_then(|expr| {
3705 let edge_var_name = step_variable.unwrap_or("__anon_edge");
3706 crate::query::pushdown::LanceFilterGenerator::generate(
3707 std::slice::from_ref(expr),
3708 edge_var_name,
3709 None,
3710 )
3711 });
3712
3713 let edge_property_conditions = edge_filter_expr
3715 .map(Self::extract_edge_property_conditions)
3716 .unwrap_or_default();
3717
3718 let used_edge_columns =
3720 Self::collect_used_edge_columns(&input_plan.schema(), scope_match_variables, None);
3721
3722 let output_mode = if step_variable.is_some() {
3724 crate::query::df_graph::nfa::VlpOutputMode::StepVariable
3725 } else if path_variable.is_some() {
3726 crate::query::df_graph::nfa::VlpOutputMode::FullPath
3727 } else {
3728 crate::query::df_graph::nfa::VlpOutputMode::EndpointsOnly
3729 };
3730
3731 let traverse_plan = Arc::new(GraphVariableLengthTraverseMainExec::new(
3732 input_plan,
3733 source_col,
3734 type_names.to_vec(),
3735 adj_direction,
3736 min_hops,
3737 max_hops,
3738 target_variable.to_string(),
3739 step_variable.map(|s| s.to_string()),
3740 path_variable.map(|s| s.to_string()),
3741 target_properties,
3742 self.graph_ctx.clone(),
3743 optional,
3744 bound_target_column,
3745 edge_lance_filter,
3746 edge_property_conditions,
3747 used_edge_columns,
3748 path_mode.clone(),
3749 output_mode,
3750 ));
3751
3752 Ok(traverse_plan)
3753 }
3754
3755 #[expect(clippy::too_many_arguments)]
3757 fn plan_shortest_path(
3758 &self,
3759 input: &LogicalPlan,
3760 edge_type_ids: &[u32],
3761 direction: AstDirection,
3762 source_variable: &str,
3763 target_variable: &str,
3764 path_variable: &str,
3765 all_shortest: bool,
3766 all_properties: &HashMap<String, HashSet<String>>,
3767 ) -> Result<Arc<dyn ExecutionPlan>> {
3768 let input_plan = self.plan_internal(input, all_properties)?;
3769
3770 let adj_direction = convert_direction(direction);
3771 let source_col = format!("{}._vid", source_variable);
3772 let target_col = format!("{}._vid", target_variable);
3773
3774 Ok(Arc::new(GraphShortestPathExec::new(
3775 input_plan,
3776 source_col,
3777 target_col,
3778 edge_type_ids.to_vec(),
3779 adj_direction,
3780 path_variable.to_string(),
3781 self.graph_ctx.clone(),
3782 all_shortest,
3783 )))
3784 }
3785
3786 fn plan_filter(
3791 &self,
3792 input: &LogicalPlan,
3793 predicate: &Expr,
3794 optional_variables: &HashSet<String>,
3795 all_properties: &HashMap<String, HashSet<String>>,
3796 ) -> Result<Arc<dyn ExecutionPlan>> {
3797 if let LogicalPlan::CrossJoin { left, right } = input
3804 && let Some(plan) = self.try_plan_cross_join_as_hash_join(
3805 left,
3806 right,
3807 predicate,
3808 optional_variables,
3809 all_properties,
3810 )?
3811 {
3812 return Ok(plan);
3813 }
3814
3815 let input_plan = self.plan_internal(input, all_properties)?;
3816 let schema = input_plan.schema();
3817
3818 let ctx = self.translation_context_for_plan(input);
3821 let session = self.session_ctx.read();
3822 let state = session.state();
3823 let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
3824 &state,
3825 Some(&ctx),
3826 )
3827 .with_subquery_ctx(
3828 self.graph_ctx.clone(),
3829 self.schema.clone(),
3830 self.session_ctx.clone(),
3831 self.storage.clone(),
3832 self.params.clone(),
3833 self.outer_entity_vars.clone(),
3834 );
3835 let physical_predicate = compiler.compile(predicate, &schema)?;
3836
3837 if !optional_variables.is_empty() {
3839 return Ok(Arc::new(OptionalFilterExec::new(
3840 input_plan,
3841 physical_predicate,
3842 optional_variables.clone(),
3843 )));
3844 }
3845
3846 Ok(Arc::new(FilterExec::try_new(
3847 physical_predicate,
3848 input_plan,
3849 )?))
3850 }
3851
3852 fn try_plan_cross_join_as_hash_join(
3861 &self,
3862 left: &LogicalPlan,
3863 right: &LogicalPlan,
3864 predicate: &Expr,
3865 optional_variables: &HashSet<String>,
3866 all_properties: &HashMap<String, HashSet<String>>,
3867 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
3868 use datafusion::common::NullEquality;
3869 use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
3870
3871 let left_vars = collect_plan_variables(left);
3872 let right_vars = collect_plan_variables(right);
3873 let cls = classify_join_predicate(predicate, &left_vars, &right_vars);
3874
3875 if cls.equi_pairs.is_empty() {
3876 return Ok(None);
3877 }
3878
3879 let left_optional: HashSet<&String> = optional_variables
3891 .iter()
3892 .filter(|v| left_vars.contains(*v))
3893 .collect();
3894 let right_optional: HashSet<&String> = optional_variables
3895 .iter()
3896 .filter(|v| right_vars.contains(*v))
3897 .collect();
3898
3899 let join_type = match (left_optional.is_empty(), right_optional.is_empty()) {
3900 (true, true) => JoinType::Inner,
3901 (true, false) => JoinType::Left,
3902 (false, true) => JoinType::Right,
3903 (false, false) => return Ok(None), };
3905
3906 if !matches!(join_type, JoinType::Inner)
3909 && (!cls.left_only.is_empty() || !cls.right_only.is_empty() || cls.residual.is_some())
3910 {
3911 return Ok(None);
3912 }
3913
3914 tracing::debug!(
3927 target: "uni_query::cross_join_in_pushdown",
3928 equi_pairs = cls.equi_pairs.len(),
3929 left_only = cls.left_only.len(),
3930 right_only = cls.right_only.len(),
3931 has_residual = cls.residual.is_some(),
3932 "try_plan_cross_join_as_hash_join: classified predicate"
3933 );
3934
3935 let left_filters: Vec<Expr> = cls.left_only.clone();
3936 let right_filters: Vec<Expr> = cls.right_only.clone();
3937 let left_with_filter = wrap_with_filter(left.clone(), &left_filters);
3938 let right_with_filter = wrap_with_filter(right.clone(), &right_filters);
3939 let left_plan = self.plan_internal(&left_with_filter, all_properties)?;
3940 let right_plan = self.plan_internal(&right_with_filter, all_properties)?;
3941
3942 let left_schema = left_plan.schema();
3946 let right_schema = right_plan.schema();
3947 let left_ctx = self.translation_context_for_plan(&left_with_filter);
3948 let right_ctx = self.translation_context_for_plan(&right_with_filter);
3949
3950 let on: Vec<(
3954 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
3955 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
3956 )> = {
3957 let session = self.session_ctx.read();
3958 let state = session.state();
3959
3960 let left_compiler =
3961 crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
3962 &state,
3963 Some(&left_ctx),
3964 )
3965 .with_subquery_ctx(
3966 self.graph_ctx.clone(),
3967 self.schema.clone(),
3968 self.session_ctx.clone(),
3969 self.storage.clone(),
3970 self.params.clone(),
3971 self.outer_entity_vars.clone(),
3972 );
3973 let right_compiler =
3974 crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
3975 &state,
3976 Some(&right_ctx),
3977 )
3978 .with_subquery_ctx(
3979 self.graph_ctx.clone(),
3980 self.schema.clone(),
3981 self.session_ctx.clone(),
3982 self.storage.clone(),
3983 self.params.clone(),
3984 self.outer_entity_vars.clone(),
3985 );
3986
3987 let mut pairs: Vec<(
3988 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
3989 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
3990 )> = Vec::with_capacity(cls.equi_pairs.len());
3991
3992 for (l_expr, r_expr) in &cls.equi_pairs {
3993 let l_phys = left_compiler.compile(l_expr, &left_schema)?;
3994 let r_phys = right_compiler.compile(r_expr, &right_schema)?;
3995 let Some((l_key, r_key)) =
3996 unify_join_key_types(l_phys, r_phys, &left_schema, &right_schema, &state)
3997 else {
3998 return Ok(None);
3999 };
4000 pairs.push((l_key, r_key));
4001 }
4002 pairs
4003 };
4004
4005 if matches!(join_type, JoinType::Inner | JoinType::Left)
4013 && cls.residual.is_none()
4014 && let Some(plan) = self.try_emit_vid_lookup_join(
4015 &cls.equi_pairs,
4016 join_type,
4017 &left_plan,
4018 &right_plan,
4019 &left_with_filter,
4020 &right_with_filter,
4021 )?
4022 {
4023 return Ok(Some(plan));
4024 }
4025
4026 let join: Arc<dyn ExecutionPlan> = Arc::new(HashJoinExec::try_new(
4027 left_plan,
4028 right_plan,
4029 on,
4030 None,
4031 &join_type,
4032 None,
4033 PartitionMode::CollectLeft,
4034 NullEquality::NullEqualsNothing,
4035 false,
4036 )?);
4037
4038 if let Some(residual) = cls.residual {
4041 let join_schema = join.schema();
4042 let crossjoin_for_ctx = LogicalPlan::CrossJoin {
4043 left: Box::new(left_with_filter.clone()),
4044 right: Box::new(right_with_filter.clone()),
4045 };
4046 let merged_ctx = self.translation_context_for_plan(&crossjoin_for_ctx);
4047 let session = self.session_ctx.read();
4048 let state = session.state();
4049 let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
4050 &state,
4051 Some(&merged_ctx),
4052 )
4053 .with_subquery_ctx(
4054 self.graph_ctx.clone(),
4055 self.schema.clone(),
4056 self.session_ctx.clone(),
4057 self.storage.clone(),
4058 self.params.clone(),
4059 self.outer_entity_vars.clone(),
4060 );
4061 let physical_residual = compiler.compile(&residual, &join_schema)?;
4062 return Ok(Some(Arc::new(FilterExec::try_new(
4063 physical_residual,
4064 join,
4065 )?)));
4066 }
4067
4068 Ok(Some(join))
4069 }
4070
4071 fn try_emit_vid_lookup_join(
4087 &self,
4088 equi_pairs: &[(Expr, Expr)],
4089 join_type: JoinType,
4090 left_plan: &Arc<dyn ExecutionPlan>,
4091 right_plan: &Arc<dyn ExecutionPlan>,
4092 left_logical: &LogicalPlan,
4093 right_logical: &LogicalPlan,
4094 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
4095 use crate::query::df_graph::scan::GraphScanExec;
4096 use crate::query::df_graph::vid_lookup_join::{
4097 EquiPair, ProbeSide, VidJoinKind, VidLookupJoinExec,
4098 };
4099 use datafusion::physical_expr::expressions::Column;
4100
4101 if equi_pairs.is_empty() {
4102 return Ok(None);
4103 }
4104
4105 let mut anchor_idx: Option<(usize, ProbeSide)> = None;
4111 for (i, (l_expr, r_expr)) in equi_pairs.iter().enumerate() {
4112 if expr_is_vid_property(l_expr) {
4113 anchor_idx = Some((i, ProbeSide::Left));
4114 break;
4115 }
4116 if expr_is_vid_property(r_expr) {
4117 anchor_idx = Some((i, ProbeSide::Right));
4118 break;
4119 }
4120 }
4121 let Some((anchor_pair_idx, probe_side)) = anchor_idx else {
4122 return Ok(None);
4123 };
4124
4125 let probe_plan = match probe_side {
4126 ProbeSide::Left => left_plan,
4127 ProbeSide::Right => right_plan,
4128 };
4129 let build_plan = match probe_side {
4130 ProbeSide::Left => right_plan,
4131 ProbeSide::Right => left_plan,
4132 };
4133 let build_logical = match probe_side {
4134 ProbeSide::Left => right_logical,
4135 ProbeSide::Right => left_logical,
4136 };
4137
4138 if probe_plan
4149 .as_any()
4150 .downcast_ref::<GraphScanExec>()
4151 .is_none()
4152 {
4153 return Ok(None);
4154 }
4155
4156 let left_schema = left_plan.schema();
4160 let right_schema = right_plan.schema();
4161 let left_ctx = self.translation_context_for_plan(left_logical);
4162 let right_ctx = self.translation_context_for_plan(right_logical);
4163 let _ = build_logical; let session = self.session_ctx.read();
4166 let state = session.state();
4167 let left_compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
4168 &state,
4169 Some(&left_ctx),
4170 )
4171 .with_subquery_ctx(
4172 self.graph_ctx.clone(),
4173 self.schema.clone(),
4174 self.session_ctx.clone(),
4175 self.storage.clone(),
4176 self.params.clone(),
4177 self.outer_entity_vars.clone(),
4178 );
4179 let right_compiler =
4180 crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
4181 &state,
4182 Some(&right_ctx),
4183 )
4184 .with_subquery_ctx(
4185 self.graph_ctx.clone(),
4186 self.schema.clone(),
4187 self.session_ctx.clone(),
4188 self.storage.clone(),
4189 self.params.clone(),
4190 self.outer_entity_vars.clone(),
4191 );
4192
4193 let mut compiled: Vec<EquiPair> = Vec::with_capacity(equi_pairs.len());
4194 for (l_expr, r_expr) in equi_pairs {
4195 let l_phys = left_compiler.compile(l_expr, &left_schema)?;
4196 let r_phys = right_compiler.compile(r_expr, &right_schema)?;
4197 let (Some(l_col), Some(r_col)) = (
4198 l_phys.as_any().downcast_ref::<Column>(),
4199 r_phys.as_any().downcast_ref::<Column>(),
4200 ) else {
4201 return Ok(None);
4203 };
4204 compiled.push(EquiPair {
4205 left_col_idx: l_col.index(),
4206 right_col_idx: r_col.index(),
4207 });
4208 }
4209
4210 let anchor = compiled[anchor_pair_idx];
4212 let anchor_build_idx = match probe_side {
4213 ProbeSide::Left => anchor.right_col_idx,
4214 ProbeSide::Right => anchor.left_col_idx,
4215 };
4216 let build_schema = build_plan.schema();
4217 if !matches!(
4218 build_schema.field(anchor_build_idx).data_type(),
4219 datafusion::arrow::datatypes::DataType::UInt64
4220 ) {
4221 return Ok(None);
4222 }
4223
4224 if anchor_pair_idx != 0 {
4226 compiled.swap(0, anchor_pair_idx);
4227 }
4228
4229 let join_kind = match join_type {
4233 JoinType::Inner => VidJoinKind::Inner,
4234 JoinType::Left => VidJoinKind::Left,
4235 _ => return Ok(None),
4236 };
4237
4238 drop(session);
4239
4240 Ok(Some(Arc::new(VidLookupJoinExec::try_new(
4241 left_plan.clone(),
4242 right_plan.clone(),
4243 probe_side,
4244 compiled,
4245 join_kind,
4246 )?)))
4247 }
4248
4249 fn plan_project_with_aliases(
4251 &self,
4252 input: &LogicalPlan,
4253 projections: &[(Expr, Option<String>)],
4254 all_properties: &HashMap<String, HashSet<String>>,
4255 alias_map: &HashMap<String, Expr>,
4256 ) -> Result<Arc<dyn ExecutionPlan>> {
4257 let input_plan = self.plan_internal_with_aliases(input, all_properties, alias_map)?;
4259 self.plan_project_from_input(input_plan, projections, Some(input))
4260 }
4261
4262 fn plan_project_from_input(
4264 &self,
4265 input_plan: Arc<dyn ExecutionPlan>,
4266 projections: &[(Expr, Option<String>)],
4267 context_plan: Option<&LogicalPlan>,
4268 ) -> Result<Arc<dyn ExecutionPlan>> {
4269 let schema = input_plan.schema();
4270
4271 let session = self.session_ctx.read();
4272 let state = session.state();
4273
4274 let ctx = context_plan.map(|p| self.translation_context_for_plan(p));
4276
4277 let mut exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> = Vec::new();
4278
4279 for (expr, alias) in projections {
4280 if let Expr::Variable(var_name) = expr {
4286 if schema.column_with_name(var_name).is_some() {
4287 let (col_idx, _) = schema.column_with_name(var_name).unwrap();
4288 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4289 datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
4290 );
4291 let name = alias.clone().unwrap_or_else(|| var_name.clone());
4292 exprs.push((col_expr, name));
4293
4294 let vid_col = format!("{}._vid", var_name);
4296 let labels_col = format!("{}._labels", var_name);
4297 if let Some((vi, _)) = schema.column_with_name(&vid_col) {
4298 let ve: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4299 datafusion::physical_expr::expressions::Column::new(&vid_col, vi),
4300 );
4301 exprs.push((ve, vid_col.clone()));
4302 }
4303 if let Some((li, _)) = schema.column_with_name(&labels_col) {
4304 let le: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4305 datafusion::physical_expr::expressions::Column::new(&labels_col, li),
4306 );
4307 exprs.push((le, labels_col.clone()));
4308 }
4309
4310 let prefix = format!("{}.", var_name);
4313 for (idx, field) in schema.fields().iter().enumerate() {
4314 let fname = field.name();
4315 if fname.starts_with(&prefix)
4316 && fname != &vid_col
4317 && fname != &labels_col
4318 && !exprs.iter().any(|(_, n)| n == fname)
4319 {
4320 let prop_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4321 Arc::new(datafusion::physical_expr::expressions::Column::new(
4322 fname, idx,
4323 ));
4324 exprs.push((prop_expr, fname.clone()));
4325 }
4326 }
4327 continue;
4328 }
4329
4330 let prefix = format!("{}.", var_name);
4333 let expanded_fields: Vec<(usize, String)> = schema
4334 .fields()
4335 .iter()
4336 .enumerate()
4337 .filter(|(_, f)| f.name().starts_with(&prefix))
4338 .map(|(i, f)| (i, f.name().clone()))
4339 .collect();
4340
4341 if !expanded_fields.is_empty() {
4342 use datafusion::functions::expr_fn::named_struct;
4343 use datafusion::logical_expr::lit;
4344
4345 let mut struct_args = Vec::new();
4347 for (_, field_name) in &expanded_fields {
4348 let prop_name = &field_name[prefix.len()..];
4349 struct_args.push(lit(prop_name.to_string()));
4350 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
4352 field_name.as_str(),
4353 )));
4354 }
4355
4356 let struct_expr = named_struct(struct_args);
4357 let df_schema =
4358 datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
4359 let session = self.session_ctx.read();
4360 let state_ref = session.state();
4361 let resolved_expr = Self::resolve_udfs(&struct_expr, &state_ref)?;
4362
4363 use datafusion::physical_planner::PhysicalPlanner;
4364 let phys_planner =
4365 datafusion::physical_planner::DefaultPhysicalPlanner::default();
4366 let physical_struct_expr = phys_planner.create_physical_expr(
4367 &resolved_expr,
4368 &df_schema,
4369 &state_ref,
4370 )?;
4371
4372 let name = alias.clone().unwrap_or_else(|| var_name.clone());
4373 exprs.push((physical_struct_expr, name));
4374
4375 let vid_col = format!("{}._vid", var_name);
4377 let labels_col = format!("{}._labels", var_name);
4378 if let Some((vi, _)) = schema.column_with_name(&vid_col) {
4379 let ve: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4380 datafusion::physical_expr::expressions::Column::new(&vid_col, vi),
4381 );
4382 exprs.push((ve, vid_col.clone()));
4383 }
4384 if let Some((li, _)) = schema.column_with_name(&labels_col) {
4385 let le: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4386 datafusion::physical_expr::expressions::Column::new(&labels_col, li),
4387 );
4388 exprs.push((le, labels_col.clone()));
4389 }
4390
4391 for (idx, field) in schema.fields().iter().enumerate() {
4394 let fname = field.name();
4395 if fname.starts_with(&prefix)
4396 && fname != &vid_col
4397 && fname != &labels_col
4398 && !exprs.iter().any(|(_, n)| n == fname)
4399 {
4400 let prop_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4401 Arc::new(datafusion::physical_expr::expressions::Column::new(
4402 fname, idx,
4403 ));
4404 exprs.push((prop_expr, fname.clone()));
4405 }
4406 }
4407 continue;
4408 }
4409 }
4411
4412 if matches!(expr, Expr::Wildcard) {
4414 for (col_idx, field) in schema.fields().iter().enumerate() {
4415 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4416 datafusion::physical_expr::expressions::Column::new(field.name(), col_idx),
4417 );
4418 exprs.push((col_expr, field.name().clone()));
4419 }
4420 continue;
4421 }
4422
4423 let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
4424 &state,
4425 ctx.as_ref(),
4426 )
4427 .with_subquery_ctx(
4428 self.graph_ctx.clone(),
4429 self.schema.clone(),
4430 self.session_ctx.clone(),
4431 self.storage.clone(),
4432 self.params.clone(),
4433 self.outer_entity_vars.clone(),
4434 );
4435 let physical_expr = compiler.compile(expr, &schema)?;
4436
4437 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4438 exprs.push((physical_expr, name));
4439 }
4440
4441 Ok(Arc::new(ProjectionExec::try_new(exprs, input_plan)?))
4442 }
4443
4444 fn plan_locy_project(
4450 &self,
4451 input: &LogicalPlan,
4452 projections: &[(Expr, Option<String>)],
4453 target_types: &[DataType],
4454 all_properties: &HashMap<String, HashSet<String>>,
4455 ) -> Result<Arc<dyn ExecutionPlan>> {
4456 use datafusion::physical_expr::expressions::Column;
4457
4458 let input_plan = self.plan_internal(input, all_properties)?;
4459 let schema = input_plan.schema();
4460
4461 let session = self.session_ctx.read();
4462 let state = session.state();
4463
4464 let ctx = self.translation_context_for_plan(input);
4465
4466 let mut exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> = Vec::new();
4467
4468 for (i, (expr, alias)) in projections.iter().enumerate() {
4469 let target_type = target_types.get(i);
4470
4471 if let Expr::Variable(var_name) = expr {
4473 let vid_col_name = format!("{}._vid", var_name);
4475 let vid_col_match = schema
4476 .fields()
4477 .iter()
4478 .enumerate()
4479 .find(|(_, f)| f.name() == &vid_col_name);
4480
4481 if let Some((vid_idx, _)) = vid_col_match {
4482 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4484 Arc::new(Column::new(&vid_col_name, vid_idx));
4485 let name = alias.clone().unwrap_or_else(|| var_name.clone());
4486 exprs.push((col_expr, name));
4487 continue;
4488 }
4489
4490 if let Some((col_idx, _)) = schema.column_with_name(var_name) {
4492 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4493 Arc::new(Column::new(var_name, col_idx));
4494 let name = alias.clone().unwrap_or_else(|| var_name.clone());
4495 exprs.push((col_expr, name));
4496 continue;
4497 }
4498 }
4500
4501 let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
4503 &state,
4504 Some(&ctx),
4505 )
4506 .with_subquery_ctx(
4507 self.graph_ctx.clone(),
4508 self.schema.clone(),
4509 self.session_ctx.clone(),
4510 self.storage.clone(),
4511 self.params.clone(),
4512 self.outer_entity_vars.clone(),
4513 );
4514 let physical_expr = compiler.compile(expr, &schema)?;
4515
4516 let physical_expr = if let Some(target_dt) = target_type {
4521 let actual_dt = physical_expr
4522 .data_type(schema.as_ref())
4523 .unwrap_or(DataType::LargeUtf8);
4524 let is_string = |dt: &DataType| matches!(dt, DataType::Utf8 | DataType::LargeUtf8);
4525 let is_numeric = |dt: &DataType| {
4526 matches!(dt, DataType::Int64 | DataType::Float64 | DataType::UInt64)
4527 };
4528 let cross_domain = (is_string(&actual_dt) && is_numeric(target_dt))
4529 || (is_numeric(&actual_dt) && is_string(target_dt));
4530 if actual_dt != *target_dt && !cross_domain {
4531 coerce_physical_expr(physical_expr, &actual_dt, target_dt, schema.as_ref())
4532 } else {
4533 physical_expr
4534 }
4535 } else {
4536 physical_expr
4537 };
4538
4539 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4540 exprs.push((physical_expr, name));
4541 }
4542
4543 Ok(Arc::new(ProjectionExec::try_new(exprs, input_plan)?))
4544 }
4545
4546 fn plan_aggregate(
4548 &self,
4549 input: &LogicalPlan,
4550 group_by: &[Expr],
4551 aggregates: &[Expr],
4552 all_properties: &HashMap<String, HashSet<String>>,
4553 ) -> Result<Arc<dyn ExecutionPlan>> {
4554 let input_plan = self.plan_internal(input, all_properties)?;
4555 let schema = input_plan.schema();
4556
4557 let session = self.session_ctx.read();
4558 let state = session.state();
4559
4560 let ctx = self.translation_context_for_plan(input);
4562
4563 use crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler;
4565 let mut group_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
4566 Vec::new();
4567 for expr in group_by {
4568 let name = expr.to_string_repr();
4569
4570 if let Expr::Variable(var_name) = expr
4575 && schema.column_with_name(var_name).is_none()
4576 {
4577 let prefix = format!("{}.", var_name);
4578 let has_expanded = schema
4579 .fields()
4580 .iter()
4581 .any(|f| f.name().starts_with(&prefix));
4582 if has_expanded {
4583 continue;
4584 }
4585 }
4586
4587 let physical_expr = if CypherPhysicalExprCompiler::contains_custom_expr(expr) {
4588 let compiler = CypherPhysicalExprCompiler::new(&state, Some(&ctx))
4591 .with_subquery_ctx(
4592 self.graph_ctx.clone(),
4593 self.schema.clone(),
4594 self.session_ctx.clone(),
4595 self.storage.clone(),
4596 self.params.clone(),
4597 self.outer_entity_vars.clone(),
4598 );
4599 compiler.compile(expr, &schema)?
4600 } else {
4601 let df_schema_ref =
4604 datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
4605 let df_expr = cypher_expr_to_df(expr, Some(&ctx))?;
4606 let df_expr = Self::resolve_udfs(&df_expr, &state)?;
4607 let df_expr = crate::query::df_expr::apply_type_coercion(&df_expr, &df_schema_ref)?;
4608 let mut df_expr = Self::resolve_udfs(&df_expr, &state)?;
4609 if let Ok(expr_type) = df_expr.get_type(&df_schema_ref) {
4610 if uni_common::core::schema::is_datetime_struct(&expr_type) {
4611 df_expr = crate::query::df_expr::extract_datetime_nanos(df_expr);
4613 } else if uni_common::core::schema::is_time_struct(&expr_type) {
4614 df_expr = crate::query::df_expr::extract_time_nanos(df_expr);
4617 }
4618 }
4619
4620 create_physical_expr(&df_expr, &df_schema_ref, state.execution_props())?
4622 };
4623 group_exprs.push((physical_expr, name));
4624 }
4625
4626 for expr in group_by {
4632 if let Expr::Variable(var_name) = expr
4633 && matches!(
4634 ctx.variable_kinds.get(var_name),
4635 Some(VariableKind::Node) | Some(VariableKind::Edge)
4636 )
4637 {
4638 let prefix = format!("{}.", var_name);
4639 for (idx, field) in schema.fields().iter().enumerate() {
4640 if field.name().starts_with(&prefix) {
4641 let prop_col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4642 datafusion::physical_expr::expressions::Column::new(field.name(), idx),
4643 );
4644 group_exprs.push((prop_col, field.name().clone()));
4645 }
4646 }
4647 }
4648 }
4649
4650 let physical_group_by = PhysicalGroupBy::new_single(group_exprs);
4651
4652 let (input_plan, schema, rewritten_aggregates) =
4654 self.precompute_custom_aggregate_args(input_plan, &schema, aggregates, &state, &ctx)?;
4655
4656 let (aggr_exprs, filter_exprs): (Vec<_>, Vec<_>) = self
4659 .translate_aggregates(&rewritten_aggregates, &schema, &state, &ctx)?
4660 .into_iter()
4661 .unzip();
4662 let num_aggregates = aggr_exprs.len();
4663
4664 let agg_exec = Arc::new(AggregateExec::try_new(
4665 AggregateMode::Single,
4666 physical_group_by,
4667 aggr_exprs,
4668 filter_exprs,
4669 input_plan,
4670 schema,
4671 )?);
4672
4673 let agg_schema = agg_exec.schema();
4677 let num_group_by = agg_schema.fields().len() - num_aggregates;
4680 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
4681 Vec::new();
4682
4683 for (i, field) in agg_schema.fields().iter().enumerate() {
4684 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4685 datafusion::physical_expr::expressions::Column::new(field.name(), i),
4686 );
4687 let name = if i >= num_group_by {
4688 aggregate_column_name(&aggregates[i - num_group_by])
4690 } else {
4691 field.name().clone()
4692 };
4693 proj_exprs.push((col_expr, name));
4694 }
4695
4696 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, agg_exec)?))
4697 }
4698
4699 fn wrap_temporal_sort_key(
4704 arg: datafusion::logical_expr::Expr,
4705 schema: &SchemaRef,
4706 ) -> Result<datafusion::logical_expr::Expr> {
4707 use datafusion::logical_expr::ScalarUDF;
4708 if let Ok(arg_type) = arg.get_type(&datafusion::common::DFSchema::try_from(
4709 schema.as_ref().clone(),
4710 )?) {
4711 if uni_common::core::schema::is_datetime_struct(&arg_type) {
4712 return Ok(datafusion::logical_expr::Expr::ScalarFunction(
4713 datafusion::logical_expr::expr::ScalarFunction::new_udf(
4714 Arc::new(ScalarUDF::from(
4715 datafusion::functions::core::getfield::GetFieldFunc::new(),
4716 )),
4717 vec![arg, datafusion::logical_expr::lit("nanos_since_epoch")],
4718 ),
4719 ));
4720 } else if uni_common::core::schema::is_time_struct(&arg_type) {
4721 return Ok(datafusion::logical_expr::Expr::ScalarFunction(
4722 datafusion::logical_expr::expr::ScalarFunction::new_udf(
4723 Arc::new(ScalarUDF::from(
4724 datafusion::functions::core::getfield::GetFieldFunc::new(),
4725 )),
4726 vec![arg, datafusion::logical_expr::lit("nanos_since_midnight")],
4727 ),
4728 ));
4729 }
4730 }
4731 Ok(arg)
4732 }
4733
4734 fn translate_aggregates(
4736 &self,
4737 aggregates: &[Expr],
4738 schema: &SchemaRef,
4739 state: &SessionState,
4740 ctx: &TranslationContext,
4741 ) -> Result<Vec<PhysicalAggregate>> {
4742 use datafusion::functions_aggregate::expr_fn::{
4743 avg, count, max, min, stddev, stddev_pop, sum, var_pop, var_sample,
4744 };
4745
4746 let mut result: Vec<PhysicalAggregate> = Vec::new();
4747
4748 for agg_expr in aggregates {
4749 let Expr::FunctionCall {
4750 name,
4751 args,
4752 distinct,
4753 ..
4754 } = agg_expr
4755 else {
4756 return Err(anyhow!("Expected aggregate function, got: {:?}", agg_expr));
4757 };
4758
4759 let name_lower = name.to_lowercase();
4760
4761 let get_arg = || -> Result<DfExpr> {
4763 if args.is_empty() {
4764 return Err(anyhow!("{}() requires an argument", name_lower));
4765 }
4766 cypher_expr_to_df(&args[0], Some(ctx))
4767 };
4768
4769 let df_agg = match name_lower.as_str() {
4770 "count" if args.is_empty() => count(datafusion::logical_expr::lit(1)),
4771 "count" => {
4772 if matches!(args.first(), Some(uni_cypher::ast::Expr::Wildcard)) {
4778 count(datafusion::logical_expr::lit(1))
4779 } else if matches!(args.first(), Some(uni_cypher::ast::Expr::Variable(_))) {
4780 if *distinct {
4781 count(get_arg()?)
4782 } else {
4783 count(datafusion::logical_expr::lit(1))
4784 }
4785 } else {
4786 count(get_arg()?)
4787 }
4788 }
4789 "sum" => {
4790 let arg = get_arg()?;
4791 if self.is_large_binary_col(&arg, schema) {
4792 let udaf = Arc::new(crate::query::df_udfs::create_cypher_sum_udaf());
4793 udaf.call(vec![arg])
4794 } else {
4795 use datafusion::logical_expr::Cast;
4798 let is_float = if let DfExpr::Column(col) = &arg
4799 && let Ok(field) = schema.field_with_name(&col.name)
4800 {
4801 matches!(
4802 field.data_type(),
4803 datafusion::arrow::datatypes::DataType::Float32
4804 | datafusion::arrow::datatypes::DataType::Float64
4805 )
4806 } else {
4807 false
4808 };
4809 if is_float {
4810 sum(DfExpr::Cast(Cast::new(
4811 Box::new(arg),
4812 datafusion::arrow::datatypes::DataType::Float64,
4813 )))
4814 } else {
4815 sum(DfExpr::Cast(Cast::new(
4816 Box::new(arg),
4817 datafusion::arrow::datatypes::DataType::Int64,
4818 )))
4819 }
4820 }
4821 }
4822 "avg" => {
4823 let arg = get_arg()?;
4824 if self.is_large_binary_col(&arg, schema) {
4825 let coerced = crate::query::df_udfs::cypher_to_float64_expr(arg);
4826 avg(coerced)
4827 } else {
4828 use datafusion::logical_expr::Cast;
4829 avg(DfExpr::Cast(Cast::new(
4830 Box::new(arg),
4831 datafusion::arrow::datatypes::DataType::Float64,
4832 )))
4833 }
4834 }
4835 "stdev" | "stddev" => {
4843 let arg = get_arg()?;
4844 stddev(Self::coerce_numeric_for_stat(arg, schema, self))
4845 }
4846 "stdevp" | "stddevp" => {
4847 let arg = get_arg()?;
4848 stddev_pop(Self::coerce_numeric_for_stat(arg, schema, self))
4849 }
4850 "variance" => {
4851 let arg = get_arg()?;
4852 var_sample(Self::coerce_numeric_for_stat(arg, schema, self))
4853 }
4854 "variancep" => {
4855 let arg = get_arg()?;
4856 var_pop(Self::coerce_numeric_for_stat(arg, schema, self))
4857 }
4858 "min" => {
4859 let arg = Self::wrap_temporal_sort_key(get_arg()?, schema)?;
4861
4862 if self.is_large_binary_col(&arg, schema) {
4863 let udaf = Arc::new(crate::query::df_udfs::create_cypher_min_udaf());
4864 udaf.call(vec![arg])
4865 } else {
4866 min(arg)
4867 }
4868 }
4869 "max" => {
4870 let arg = Self::wrap_temporal_sort_key(get_arg()?, schema)?;
4872
4873 if self.is_large_binary_col(&arg, schema) {
4874 let udaf = Arc::new(crate::query::df_udfs::create_cypher_max_udaf());
4875 udaf.call(vec![arg])
4876 } else {
4877 max(arg)
4878 }
4879 }
4880 "percentiledisc" => {
4881 if args.len() != 2 {
4882 return Err(anyhow!("percentileDisc() requires exactly 2 arguments"));
4883 }
4884 let expr_arg = cypher_expr_to_df(&args[0], Some(ctx))?;
4885 let pct_arg = cypher_expr_to_df(&args[1], Some(ctx))?;
4886 let coerced = crate::query::df_udfs::cypher_to_float64_expr(expr_arg);
4887 let udaf =
4888 Arc::new(crate::query::df_udfs::create_cypher_percentile_disc_udaf());
4889 udaf.call(vec![coerced, pct_arg])
4890 }
4891 "percentilecont" => {
4892 if args.len() != 2 {
4893 return Err(anyhow!("percentileCont() requires exactly 2 arguments"));
4894 }
4895 let expr_arg = cypher_expr_to_df(&args[0], Some(ctx))?;
4896 let pct_arg = cypher_expr_to_df(&args[1], Some(ctx))?;
4897 let coerced = crate::query::df_udfs::cypher_to_float64_expr(expr_arg);
4898 let udaf =
4899 Arc::new(crate::query::df_udfs::create_cypher_percentile_cont_udaf());
4900 udaf.call(vec![coerced, pct_arg])
4901 }
4902 "collect" => {
4903 let arg = get_arg()?;
4906 crate::query::df_udfs::create_cypher_collect_expr(arg, *distinct)
4907 }
4908 "btic_min" => {
4909 let arg = get_arg()?;
4910 let udaf = Arc::new(crate::query::df_udfs::create_btic_min_udaf());
4911 udaf.call(vec![arg])
4912 }
4913 "btic_max" => {
4914 let arg = get_arg()?;
4915 let udaf = Arc::new(crate::query::df_udfs::create_btic_max_udaf());
4916 udaf.call(vec![arg])
4917 }
4918 "btic_span_agg" => {
4919 let arg = get_arg()?;
4920 let udaf = Arc::new(crate::query::df_udfs::create_btic_span_agg_udaf());
4921 udaf.call(vec![arg])
4922 }
4923 "btic_count_at" => {
4924 if args.len() != 2 {
4925 return Err(anyhow!("btic_count_at() requires exactly 2 arguments"));
4926 }
4927 let btic_arg = cypher_expr_to_df(&args[0], Some(ctx))?;
4928 let point_arg = cypher_expr_to_df(&args[1], Some(ctx))?;
4929 let udaf = Arc::new(crate::query::df_udfs::create_btic_count_at_udaf());
4930 udaf.call(vec![btic_arg, point_arg])
4931 }
4932 _ => {
4933 let resolved = uni_plugin::QName::candidate_splits(&name_lower)
4945 .find_map(|q| self.plugin_registry.aggregate(&q).map(|e| (q, e)));
4946 if let Some((qname, entry)) = resolved {
4947 let arg_exprs: Vec<DfExpr> = args
4948 .iter()
4949 .map(|a| cypher_expr_to_df(a, Some(ctx)))
4950 .collect::<Result<Vec<_>>>()?;
4951 let udaf = Arc::new(datafusion::logical_expr::AggregateUDF::from(
4952 crate::query::df_udaf_plugin::PluginAggregateUdaf::new(
4953 qname,
4954 Arc::clone(&self.plugin_registry),
4955 entry.signature.clone(),
4956 ),
4957 ));
4958 udaf.call(arg_exprs)
4959 } else {
4960 return Err(anyhow!("Unsupported aggregate function: {}", name));
4961 }
4962 }
4963 };
4964
4965 let df_agg = if *distinct
4967 && !matches!(
4968 name_lower.as_str(),
4969 "collect" | "percentiledisc" | "percentilecont"
4970 ) {
4971 use datafusion::prelude::ExprFunctionExt;
4972 df_agg.distinct().build().map_err(|e| anyhow!("{}", e))?
4973 } else {
4974 df_agg
4975 };
4976
4977 let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
4979 let df_agg = Self::resolve_udfs(&df_agg, state)?;
4980 let df_agg = crate::query::df_expr::apply_type_coercion(&df_agg, &df_schema)?;
4981 let df_agg = Self::resolve_udfs(&df_agg, state)?;
4982
4983 let agg_and_filter = self.create_physical_aggregate(&df_agg, schema, state)?;
4985 result.push(agg_and_filter);
4986 }
4987
4988 Ok(result)
4989 }
4990
4991 fn precompute_custom_aggregate_args(
4997 &self,
4998 input_plan: Arc<dyn ExecutionPlan>,
4999 schema: &SchemaRef,
5000 aggregates: &[Expr],
5001 state: &SessionState,
5002 ctx: &TranslationContext,
5003 ) -> Result<(Arc<dyn ExecutionPlan>, SchemaRef, Vec<Expr>)> {
5004 use crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler;
5005
5006 let mut needs_projection = false;
5007 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
5008 Vec::new();
5009 let mut rewritten_aggregates = Vec::new();
5010 let mut col_counter = 0;
5011
5012 for (i, field) in schema.fields().iter().enumerate() {
5014 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
5015 datafusion::physical_expr::expressions::Column::new(field.name(), i),
5016 );
5017 proj_exprs.push((col_expr, field.name().clone()));
5018 }
5019
5020 for agg_expr in aggregates {
5022 let Expr::FunctionCall {
5023 name,
5024 args,
5025 distinct,
5026 window_spec,
5027 } = agg_expr
5028 else {
5029 rewritten_aggregates.push(agg_expr.clone());
5030 continue;
5031 };
5032
5033 let mut rewritten_args = Vec::new();
5034 let mut agg_needs_rewrite = false;
5035
5036 for arg in args {
5037 if CypherPhysicalExprCompiler::contains_custom_expr(arg) {
5038 let compiler = CypherPhysicalExprCompiler::new(state, Some(ctx))
5040 .with_subquery_ctx(
5041 self.graph_ctx.clone(),
5042 self.schema.clone(),
5043 self.session_ctx.clone(),
5044 self.storage.clone(),
5045 self.params.clone(),
5046 self.outer_entity_vars.clone(),
5047 );
5048 let physical_expr = compiler.compile(arg, schema)?;
5049
5050 let col_name = format!("__pc_{}", col_counter);
5052 col_counter += 1;
5053 proj_exprs.push((physical_expr, col_name.clone()));
5054
5055 rewritten_args.push(Expr::Variable(col_name));
5057 agg_needs_rewrite = true;
5058 needs_projection = true;
5059 } else {
5060 rewritten_args.push(arg.clone());
5061 }
5062 }
5063
5064 if agg_needs_rewrite {
5065 rewritten_aggregates.push(Expr::FunctionCall {
5066 name: name.clone(),
5067 args: rewritten_args,
5068 distinct: *distinct,
5069 window_spec: window_spec.clone(),
5070 });
5071 } else {
5072 rewritten_aggregates.push(agg_expr.clone());
5073 }
5074 }
5075
5076 if needs_projection {
5077 let projection_exec = Arc::new(
5078 datafusion::physical_plan::projection::ProjectionExec::try_new(
5079 proj_exprs, input_plan,
5080 )?,
5081 );
5082 let new_schema = projection_exec.schema();
5083 Ok((projection_exec, new_schema, rewritten_aggregates))
5084 } else {
5085 Ok((input_plan, schema.clone(), aggregates.to_vec()))
5086 }
5087 }
5088
5089 fn plan_sort(
5096 &self,
5097 input: &LogicalPlan,
5098 order_by: &[SortItem],
5099 all_properties: &HashMap<String, HashSet<String>>,
5100 alias_map: &HashMap<String, Expr>,
5101 ) -> Result<Arc<dyn ExecutionPlan>> {
5102 let input_plan = self.plan_internal(input, all_properties)?;
5103 let schema = input_plan.schema();
5104
5105 let session = self.session_ctx.read();
5106
5107 let ctx = self.translation_context_for_plan(input);
5109
5110 let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
5112
5113 use crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler;
5117
5118 let mut df_sort_exprs = Vec::new();
5119 let mut custom_physical_overrides: Vec<(
5120 usize,
5121 Arc<dyn datafusion::physical_expr::PhysicalExpr>,
5122 )> = Vec::new();
5123 for item in order_by {
5124 let mut sort_expr = item.expr.clone();
5125
5126 if let Expr::Variable(ref name) = sort_expr {
5129 let col_name = name.as_str();
5131 let exists_in_schema = schema.fields().iter().any(|f| f.name() == col_name);
5132
5133 if !exists_in_schema && let Some(aliased_expr) = alias_map.get(col_name) {
5134 sort_expr = aliased_expr.clone();
5135 }
5136 }
5137
5138 let asc = item.ascending;
5139 let nulls_first = !asc; if CypherPhysicalExprCompiler::contains_custom_expr(&sort_expr) {
5145 let sort_state = session.state();
5146 let compiler = CypherPhysicalExprCompiler::new(&sort_state, Some(&ctx))
5147 .with_subquery_ctx(
5148 self.graph_ctx.clone(),
5149 self.schema.clone(),
5150 self.session_ctx.clone(),
5151 self.storage.clone(),
5152 self.params.clone(),
5153 self.outer_entity_vars.clone(),
5154 );
5155 let inner_physical = compiler.compile(&sort_expr, &schema)?;
5156
5157 let first_col = schema
5160 .fields()
5161 .first()
5162 .map(|f| f.name().clone())
5163 .unwrap_or_else(|| "_dummy_".to_string());
5164 let dummy_expr = DfExpr::Column(datafusion::common::Column::from_name(&first_col));
5165 let sort_key_udf = crate::query::df_udfs::create_cypher_sort_key_udf();
5166 let sort_key_expr = sort_key_udf.call(vec![dummy_expr]);
5167 custom_physical_overrides.push((df_sort_exprs.len(), inner_physical));
5168 df_sort_exprs.push(DfSortExpr::new(sort_key_expr, asc, nulls_first));
5169 continue;
5170 }
5171
5172 let df_expr = cypher_expr_to_df(&sort_expr, Some(&ctx))?;
5173 let df_expr = Self::resolve_udfs(&df_expr, &session.state())?;
5174 let df_expr = crate::query::df_expr::apply_type_coercion(&df_expr, &df_schema)?;
5175 let df_expr = Self::resolve_udfs(&df_expr, &session.state())?;
5178
5179 let sort_key_udf = crate::query::df_udfs::create_cypher_sort_key_udf();
5184 let sort_key_expr = sort_key_udf.call(vec![df_expr]);
5185 df_sort_exprs.push(DfSortExpr::new(sort_key_expr, asc, nulls_first));
5186 }
5187
5188 let mut physical_sort_exprs = create_physical_sort_exprs(
5189 &df_sort_exprs,
5190 &df_schema,
5191 session.state().execution_props(),
5192 )?;
5193
5194 for (idx, custom_inner) in custom_physical_overrides {
5198 if idx < physical_sort_exprs.len() {
5199 let phys = &physical_sort_exprs[idx];
5200 let sort_key_udf = Arc::new(crate::query::df_udfs::create_cypher_sort_key_udf());
5204 let config_options = Arc::new(datafusion::config::ConfigOptions::default());
5205 let udf_name = sort_key_udf.name().to_string();
5206 let new_sort_key = datafusion::physical_expr::ScalarFunctionExpr::new(
5207 &udf_name,
5208 sort_key_udf,
5209 vec![custom_inner],
5210 Arc::new(arrow_schema::Field::new(
5211 "_cypher_sort_key",
5212 DataType::LargeBinary,
5213 true,
5214 )),
5215 config_options,
5216 );
5217 physical_sort_exprs[idx] = datafusion::physical_expr::PhysicalSortExpr {
5218 expr: Arc::new(new_sort_key),
5219 options: phys.options,
5220 };
5221 }
5222 }
5223
5224 let lex_ordering = datafusion::physical_expr::LexOrdering::new(physical_sort_exprs)
5227 .ok_or_else(|| anyhow!("ORDER BY must have at least one sort expression"))?;
5228
5229 Ok(Arc::new(SortExec::new(lex_ordering, input_plan)))
5230 }
5231
5232 fn plan_limit(
5234 &self,
5235 input: &LogicalPlan,
5236 skip: Option<usize>,
5237 fetch: Option<usize>,
5238 all_properties: &HashMap<String, HashSet<String>>,
5239 ) -> Result<Arc<dyn ExecutionPlan>> {
5240 let input_plan = self.plan_internal(input, all_properties)?;
5241
5242 if let Some(offset) = skip.filter(|&s| s > 0) {
5244 use datafusion::physical_plan::limit::GlobalLimitExec;
5245 return Ok(Arc::new(GlobalLimitExec::new(input_plan, offset, fetch)));
5246 }
5247
5248 if let Some(limit) = fetch {
5249 Ok(Arc::new(LocalLimitExec::new(input_plan, limit)))
5250 } else {
5251 Ok(input_plan)
5253 }
5254 }
5255
5256 fn plan_union(
5258 &self,
5259 left: &LogicalPlan,
5260 right: &LogicalPlan,
5261 all: bool,
5262 all_properties: &HashMap<String, HashSet<String>>,
5263 ) -> Result<Arc<dyn ExecutionPlan>> {
5264 let left_plan = self.plan_internal(left, all_properties)?;
5265 let right_plan = self.plan_internal(right, all_properties)?;
5266
5267 let left_schema = left_plan.schema();
5284 let right_schema = right_plan.schema();
5285 if left_schema.fields().len() != right_schema.fields().len()
5286 || left_schema
5287 .fields()
5288 .iter()
5289 .zip(right_schema.fields().iter())
5290 .any(|(l, r)| l.data_type() != r.data_type())
5291 {
5292 let fmt = |s: &Schema| {
5293 s.fields()
5294 .iter()
5295 .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
5296 .collect::<Vec<_>>()
5297 .join(", ")
5298 };
5299 return Err(anyhow!(
5300 "Plan: cannot UNION branches with mismatched schemas — \
5301 left=[{}], right=[{}]. This is a planner bug; please file \
5302 an issue.",
5303 fmt(left_schema.as_ref()),
5304 fmt(right_schema.as_ref()),
5305 ));
5306 }
5307
5308 let union_plan = UnionExec::try_new(vec![left_plan, right_plan])?;
5309
5310 if !all {
5312 use datafusion::physical_plan::aggregates::{
5313 AggregateExec, AggregateMode, PhysicalGroupBy,
5314 };
5315 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
5316
5317 let coalesced = Arc::new(CoalescePartitionsExec::new(union_plan));
5319
5320 let schema = coalesced.schema();
5322 let group_by_exprs: Vec<_> = (0..schema.fields().len())
5323 .map(|i| {
5324 (
5325 Arc::new(datafusion::physical_plan::expressions::Column::new(
5326 schema.field(i).name(),
5327 i,
5328 ))
5329 as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
5330 schema.field(i).name().clone(),
5331 )
5332 })
5333 .collect();
5334
5335 let group_by = PhysicalGroupBy::new_single(group_by_exprs);
5336
5337 Ok(Arc::new(AggregateExec::try_new(
5338 AggregateMode::Single,
5339 group_by,
5340 vec![], vec![], coalesced,
5343 schema,
5344 )?))
5345 } else {
5346 Ok(union_plan)
5348 }
5349 }
5350
5351 fn plan_window_functions(
5357 &self,
5358 input: Arc<dyn ExecutionPlan>,
5359 window_exprs: &[Expr],
5360 context_plan: Option<&LogicalPlan>,
5361 ) -> Result<Arc<dyn ExecutionPlan>> {
5362 use datafusion::functions_aggregate::average::avg_udaf;
5363 use datafusion::functions_aggregate::count::count_udaf;
5364 use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
5365 use datafusion::functions_aggregate::sum::sum_udaf;
5366 use datafusion::functions_window::lead_lag::{lag_udwf, lead_udwf};
5367 use datafusion::functions_window::nth_value::{
5368 first_value_udwf, last_value_udwf, nth_value_udwf,
5369 };
5370 use datafusion::functions_window::ntile::ntile_udwf;
5371 use datafusion::functions_window::rank::{dense_rank_udwf, rank_udwf};
5372 use datafusion::functions_window::row_number::row_number_udwf;
5373 use datafusion::logical_expr::{WindowFrame, WindowFunctionDefinition};
5374 use datafusion::physical_expr::LexOrdering;
5375 use datafusion::physical_plan::sorts::sort::SortExec;
5376 use datafusion::physical_plan::windows::{WindowAggExec, create_window_expr};
5377
5378 let input_schema = input.schema();
5379 let df_schema = datafusion::common::DFSchema::try_from(input_schema.as_ref().clone())?;
5380
5381 let session = self.session_ctx.read();
5382 let state = session.state();
5383
5384 let tx_ctx = context_plan.map(|p| self.translation_context_for_plan(p));
5386 let mut window_expr_list = Vec::new();
5387
5388 for expr in window_exprs {
5389 let Expr::FunctionCall {
5390 name,
5391 args,
5392 distinct,
5393 window_spec: Some(window_spec),
5394 } = expr
5395 else {
5396 return Err(anyhow!("Expected window function call with OVER clause"));
5397 };
5398
5399 let name_lower = name.to_lowercase();
5400
5401 let (window_fn_def, is_aggregate) = match name_lower.as_str() {
5403 "count" => (WindowFunctionDefinition::AggregateUDF(count_udaf()), true),
5405 "sum" => (WindowFunctionDefinition::AggregateUDF(sum_udaf()), true),
5406 "avg" => (WindowFunctionDefinition::AggregateUDF(avg_udaf()), true),
5407 "min" => (WindowFunctionDefinition::AggregateUDF(min_udaf()), true),
5408 "max" => (WindowFunctionDefinition::AggregateUDF(max_udaf()), true),
5409 "row_number" => (
5411 WindowFunctionDefinition::WindowUDF(row_number_udwf()),
5412 false,
5413 ),
5414 "rank" => (WindowFunctionDefinition::WindowUDF(rank_udwf()), false),
5415 "dense_rank" => (
5416 WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
5417 false,
5418 ),
5419 "lag" => (WindowFunctionDefinition::WindowUDF(lag_udwf()), false),
5420 "lead" => (WindowFunctionDefinition::WindowUDF(lead_udwf()), false),
5421 "ntile" => {
5422 if let Some(Expr::Literal(CypherLiteral::Integer(n))) = args.first()
5424 && *n <= 0
5425 {
5426 return Err(anyhow!("NTILE bucket count must be positive, got: {}", n));
5427 }
5428 (WindowFunctionDefinition::WindowUDF(ntile_udwf()), false)
5429 }
5430 "first_value" => (
5431 WindowFunctionDefinition::WindowUDF(first_value_udwf()),
5432 false,
5433 ),
5434 "last_value" => (
5435 WindowFunctionDefinition::WindowUDF(last_value_udwf()),
5436 false,
5437 ),
5438 "nth_value" => (WindowFunctionDefinition::WindowUDF(nth_value_udwf()), false),
5439 other => return Err(anyhow!("Unsupported window function: {}", other)),
5440 };
5441
5442 let physical_args: Vec<Arc<dyn datafusion::physical_expr::PhysicalExpr>> =
5444 if args.is_empty() || matches!(args.as_slice(), [Expr::Wildcard]) {
5445 if is_aggregate {
5447 vec![create_physical_expr(
5448 &datafusion::logical_expr::lit(1),
5449 &df_schema,
5450 state.execution_props(),
5451 )?]
5452 } else {
5453 vec![]
5455 }
5456 } else {
5457 args.iter()
5458 .map(|arg| {
5459 let mut df_expr = cypher_expr_to_df(arg, tx_ctx.as_ref())?;
5460
5461 if is_aggregate {
5464 let cast_type = match name_lower.as_str() {
5465 "sum" => Some(datafusion::arrow::datatypes::DataType::Int64),
5466 "avg" => Some(datafusion::arrow::datatypes::DataType::Float64),
5467 _ => None,
5468 };
5469 if let Some(target_type) = cast_type {
5470 df_expr = DfExpr::Cast(datafusion::logical_expr::Cast::new(
5471 Box::new(df_expr),
5472 target_type,
5473 ));
5474 }
5475 }
5476
5477 create_physical_expr(&df_expr, &df_schema, state.execution_props())
5478 .map_err(|e| anyhow!("Failed to create physical expr: {}", e))
5479 })
5480 .collect::<Result<Vec<_>>>()?
5481 };
5482
5483 let partition_by_physical: Vec<Arc<dyn datafusion::physical_expr::PhysicalExpr>> =
5485 window_spec
5486 .partition_by
5487 .iter()
5488 .map(|e| {
5489 let df_expr = cypher_expr_to_df(e, tx_ctx.as_ref())?;
5490 create_physical_expr(&df_expr, &df_schema, state.execution_props())
5491 .map_err(|e| anyhow!("Failed to create physical expr: {}", e))
5492 })
5493 .collect::<Result<Vec<_>>>()?;
5494
5495 let mut order_by_physical: Vec<datafusion::physical_expr::PhysicalSortExpr> =
5497 window_spec
5498 .order_by
5499 .iter()
5500 .map(|sort_item| {
5501 let df_expr = cypher_expr_to_df(&sort_item.expr, tx_ctx.as_ref())?;
5502 let physical_expr =
5503 create_physical_expr(&df_expr, &df_schema, state.execution_props())
5504 .map_err(|e| anyhow!("Failed to create physical expr: {}", e))?;
5505 Ok(datafusion::physical_expr::PhysicalSortExpr {
5506 expr: physical_expr,
5507 options: datafusion::arrow::compute::SortOptions {
5508 descending: !sort_item.ascending,
5509 nulls_first: !sort_item.ascending, },
5511 })
5512 })
5513 .collect::<Result<Vec<_>>>()?;
5514
5515 if order_by_physical.is_empty() && !partition_by_physical.is_empty() {
5518 for partition_expr in &partition_by_physical {
5519 order_by_physical.push(datafusion::physical_expr::PhysicalSortExpr {
5520 expr: Arc::clone(partition_expr),
5521 options: datafusion::arrow::compute::SortOptions {
5522 descending: false,
5523 nulls_first: false,
5524 },
5525 });
5526 }
5527 }
5528
5529 let window_frame = if is_aggregate {
5534 if window_spec.order_by.is_empty() {
5535 use datafusion::logical_expr::{WindowFrameBound, WindowFrameUnits};
5537 Arc::new(WindowFrame::new_bounds(
5538 WindowFrameUnits::Rows,
5539 WindowFrameBound::Preceding(datafusion::common::ScalarValue::UInt64(None)),
5540 WindowFrameBound::Following(datafusion::common::ScalarValue::UInt64(None)),
5541 ))
5542 } else {
5543 Arc::new(WindowFrame::new(Some(false)))
5545 }
5546 } else {
5547 use datafusion::logical_expr::{WindowFrameBound, WindowFrameUnits};
5549 Arc::new(WindowFrame::new_bounds(
5550 WindowFrameUnits::Rows,
5551 WindowFrameBound::Preceding(datafusion::common::ScalarValue::UInt64(None)),
5552 WindowFrameBound::Following(datafusion::common::ScalarValue::UInt64(None)),
5553 ))
5554 };
5555
5556 let alias = expr.to_string_repr();
5558
5559 let window_expr = create_window_expr(
5561 &window_fn_def,
5562 alias,
5563 &physical_args,
5564 &partition_by_physical,
5565 &order_by_physical,
5566 window_frame,
5567 input_schema.clone(),
5568 false, *distinct,
5570 None, )?;
5572
5573 window_expr_list.push(window_expr);
5574 }
5575
5576 let mut sort_exprs = Vec::new();
5579
5580 for expr in window_exprs {
5582 if let Expr::FunctionCall {
5583 window_spec: Some(window_spec),
5584 ..
5585 } = expr
5586 {
5587 for partition_expr in &window_spec.partition_by {
5588 let df_expr = cypher_expr_to_df(partition_expr, tx_ctx.as_ref())?;
5589 let physical_expr =
5590 create_physical_expr(&df_expr, &df_schema, state.execution_props())?;
5591
5592 if !sort_exprs
5595 .iter()
5596 .any(|s: &datafusion::physical_expr::PhysicalSortExpr| {
5597 s.expr.to_string() == physical_expr.to_string()
5598 })
5599 {
5600 sort_exprs.push(datafusion::physical_expr::PhysicalSortExpr {
5601 expr: physical_expr,
5602 options: datafusion::arrow::compute::SortOptions {
5603 descending: false,
5604 nulls_first: false,
5605 },
5606 });
5607 }
5608 }
5609
5610 for sort_item in &window_spec.order_by {
5612 let df_expr = cypher_expr_to_df(&sort_item.expr, tx_ctx.as_ref())?;
5613 let physical_expr =
5614 create_physical_expr(&df_expr, &df_schema, state.execution_props())?;
5615
5616 sort_exprs.push(datafusion::physical_expr::PhysicalSortExpr {
5617 expr: physical_expr,
5618 options: datafusion::arrow::compute::SortOptions {
5619 descending: !sort_item.ascending,
5620 nulls_first: !sort_item.ascending,
5621 },
5622 });
5623 }
5624 }
5625 }
5626
5627 let sorted_input = if !sort_exprs.is_empty() {
5629 let lex_ordering = LexOrdering::new(sort_exprs)
5630 .ok_or_else(|| anyhow!("Failed to create LexOrdering for window function"))?;
5631 Arc::new(SortExec::new(lex_ordering, input)) as Arc<dyn ExecutionPlan>
5632 } else {
5633 input
5634 };
5635
5636 let window_agg_exec = WindowAggExec::try_new(
5638 window_expr_list,
5639 sorted_input,
5640 false, )?;
5642
5643 Ok(Arc::new(window_agg_exec))
5644 }
5645
5646 fn plan_empty(&self) -> Result<Arc<dyn ExecutionPlan>> {
5651 let schema = Arc::new(Schema::empty());
5652 Ok(Arc::new(PlaceholderRowExec::new(schema)))
5655 }
5656
5657 fn plan_bind_zero_length_path(
5660 &self,
5661 input: &LogicalPlan,
5662 node_variable: &str,
5663 path_variable: &str,
5664 all_properties: &HashMap<String, HashSet<String>>,
5665 ) -> Result<Arc<dyn ExecutionPlan>> {
5666 let input_plan = self.plan_internal(input, all_properties)?;
5667 Ok(Arc::new(BindZeroLengthPathExec::new(
5668 input_plan,
5669 node_variable.to_string(),
5670 path_variable.to_string(),
5671 self.graph_ctx.clone(),
5672 )))
5673 }
5674
5675 fn plan_bind_path(
5678 &self,
5679 input: &LogicalPlan,
5680 node_variables: &[String],
5681 edge_variables: &[String],
5682 path_variable: &str,
5683 all_properties: &HashMap<String, HashSet<String>>,
5684 ) -> Result<Arc<dyn ExecutionPlan>> {
5685 let input_plan = self.plan_internal(input, all_properties)?;
5686 Ok(Arc::new(BindFixedPathExec::new(
5687 input_plan,
5688 node_variables.to_vec(),
5689 edge_variables.to_vec(),
5690 path_variable.to_string(),
5691 self.graph_ctx.clone(),
5692 )))
5693 }
5694
5695 fn extract_edge_property_conditions(expr: &Expr) -> Vec<(String, uni_common::Value)> {
5704 match expr {
5705 Expr::BinaryOp {
5706 left,
5707 op: uni_cypher::ast::BinaryOp::Eq,
5708 right,
5709 } => {
5710 if let Expr::Property(inner, prop_name) = left.as_ref()
5712 && matches!(inner.as_ref(), Expr::Variable(_))
5713 && let Expr::Literal(lit) = right.as_ref()
5714 {
5715 return vec![(prop_name.clone(), lit.to_value())];
5716 }
5717 if let Expr::Literal(lit) = left.as_ref()
5719 && let Expr::Property(inner, prop_name) = right.as_ref()
5720 && matches!(inner.as_ref(), Expr::Variable(_))
5721 {
5722 return vec![(prop_name.clone(), lit.to_value())];
5723 }
5724 vec![]
5725 }
5726 Expr::BinaryOp {
5727 left,
5728 op: uni_cypher::ast::BinaryOp::And,
5729 right,
5730 } => {
5731 let mut result = Self::extract_edge_property_conditions(left);
5732 result.extend(Self::extract_edge_property_conditions(right));
5733 result
5734 }
5735 _ => vec![],
5736 }
5737 }
5738
5739 fn create_physical_filter_expr(
5744 &self,
5745 expr: &DfExpr,
5746 schema: &SchemaRef,
5747 session: &SessionContext,
5748 ) -> Result<Arc<dyn datafusion::physical_expr::PhysicalExpr>> {
5749 let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
5750 let state = session.state();
5751
5752 let resolved_expr = Self::resolve_udfs(expr, &state)?;
5754
5755 let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;
5757
5758 let coerced_expr = Self::resolve_udfs(&coerced_expr, &state)?;
5760
5761 use datafusion::physical_planner::PhysicalPlanner;
5763 let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
5764 let physical = planner.create_physical_expr(&coerced_expr, &df_schema, &state)?;
5765
5766 Ok(physical)
5767 }
5768
5769 fn resolve_udfs(expr: &DfExpr, state: &datafusion::execution::SessionState) -> Result<DfExpr> {
5774 use datafusion::common::tree_node::{Transformed, TreeNode};
5775 use datafusion::logical_expr::Expr as DfExpr;
5776
5777 let result = expr
5778 .clone()
5779 .transform_up(|node| {
5780 if let DfExpr::ScalarFunction(ref func) = node {
5781 let udf_name = func.func.name();
5782 if let Some(registered_udf) = state.scalar_functions().get(udf_name) {
5783 return Ok(Transformed::yes(DfExpr::ScalarFunction(
5784 datafusion::logical_expr::expr::ScalarFunction {
5785 func: registered_udf.clone(),
5786 args: func.args.clone(),
5787 },
5788 )));
5789 }
5790 }
5791 Ok(Transformed::no(node))
5792 })
5793 .map_err(|e| anyhow::anyhow!("Failed to resolve UDFs: {}", e))?;
5794
5795 Ok(result.data)
5796 }
5797
5798 fn add_structural_projection(
5801 &self,
5802 input: Arc<dyn ExecutionPlan>,
5803 variable: &str,
5804 properties: &[String],
5805 ) -> Result<Arc<dyn ExecutionPlan>> {
5806 use datafusion::functions::expr_fn::named_struct;
5807 use datafusion::logical_expr::lit;
5808 use datafusion::physical_plan::projection::ProjectionExec;
5809
5810 let input_schema = input.schema();
5811 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
5812 Vec::new();
5813
5814 for (i, field) in input_schema.fields().iter().enumerate() {
5816 let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
5817 field.name(),
5818 i,
5819 ));
5820 proj_exprs.push((col_expr, field.name().clone()));
5821 }
5822
5823 let mut struct_args = Vec::with_capacity(properties.len() * 2 + 4);
5825
5826 struct_args.push(lit("_vid"));
5828 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5829 format!("{}._vid", variable),
5830 )));
5831
5832 struct_args.push(lit("_labels"));
5834 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5835 format!("{}._labels", variable),
5836 )));
5837
5838 for prop in properties {
5839 struct_args.push(lit(prop.clone()));
5840 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5841 format!("{}.{}", variable, prop),
5842 )));
5843 }
5844
5845 let struct_expr = named_struct(struct_args);
5847
5848 let df_schema = datafusion::common::DFSchema::try_from(input_schema.as_ref().clone())?;
5849 let session = self.session_ctx.read();
5850 let state = session.state();
5851
5852 let resolved_expr = Self::resolve_udfs(&struct_expr, &state)?;
5854
5855 use datafusion::physical_planner::PhysicalPlanner;
5856 let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
5857 let physical_struct_expr =
5858 planner.create_physical_expr(&resolved_expr, &df_schema, &state)?;
5859
5860 proj_exprs.push((physical_struct_expr, variable.to_string()));
5861
5862 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
5863 }
5864
5865 fn add_edge_structural_projection(
5867 &self,
5868 input: Arc<dyn ExecutionPlan>,
5869 variable: &str,
5870 properties: &[String],
5871 source_variable: &str,
5872 target_variable: &str,
5873 ) -> Result<Arc<dyn ExecutionPlan>> {
5874 use datafusion::functions::expr_fn::named_struct;
5875 use datafusion::logical_expr::lit;
5876 use datafusion::physical_plan::projection::ProjectionExec;
5877
5878 let input_schema = input.schema();
5879 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
5880 Vec::new();
5881
5882 for (i, field) in input_schema.fields().iter().enumerate() {
5884 let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
5885 field.name(),
5886 i,
5887 ));
5888 proj_exprs.push((col_expr, field.name().clone()));
5889 }
5890
5891 let mut struct_args = Vec::with_capacity(properties.len() * 2 + 10);
5893
5894 struct_args.push(lit("_eid"));
5896 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5897 format!("{}._eid", variable),
5898 )));
5899
5900 struct_args.push(lit("_type"));
5901 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5902 format!("{}._type", variable),
5903 )));
5904
5905 let resolve_vid_col = |var: &str| -> String {
5910 let vid_col = format!("{}._vid", var);
5911 if input_schema.column_with_name(&vid_col).is_some() {
5912 vid_col
5913 } else {
5914 var.to_string()
5915 }
5916 };
5917 let src_col_name = resolve_vid_col(source_variable);
5918 let dst_col_name = resolve_vid_col(target_variable);
5919 struct_args.push(lit("_src"));
5920 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5921 src_col_name,
5922 )));
5923
5924 struct_args.push(lit("_dst"));
5925 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5926 dst_col_name,
5927 )));
5928
5929 let all_props_col = format!("{}._all_props", variable);
5931 if input_schema.column_with_name(&all_props_col).is_some() {
5932 struct_args.push(lit("_all_props"));
5933 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5934 all_props_col,
5935 )));
5936 }
5937
5938 for prop in properties {
5939 struct_args.push(lit(prop.clone()));
5940 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5941 format!("{}.{}", variable, prop),
5942 )));
5943 }
5944
5945 let struct_expr = named_struct(struct_args);
5946
5947 let df_schema = datafusion::common::DFSchema::try_from(input_schema.as_ref().clone())?;
5948 let session = self.session_ctx.read();
5949 let state = session.state();
5950
5951 let resolved_expr = Self::resolve_udfs(&struct_expr, &state)?;
5952
5953 use datafusion::physical_planner::PhysicalPlanner;
5954 let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
5955 let physical_struct_expr =
5956 planner.create_physical_expr(&resolved_expr, &df_schema, &state)?;
5957
5958 proj_exprs.push((physical_struct_expr, variable.to_string()));
5959
5960 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
5961 }
5962
5963 fn create_physical_aggregate(
5965 &self,
5966 expr: &DfExpr,
5967 schema: &SchemaRef,
5968 state: &SessionState,
5969 ) -> Result<PhysicalAggregate> {
5970 use datafusion::physical_planner::create_aggregate_expr_and_maybe_filter;
5971
5972 let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
5974
5975 let (agg_expr, filter, _ordering) = create_aggregate_expr_and_maybe_filter(
5977 expr,
5978 &df_schema,
5979 schema.as_ref(),
5980 state.execution_props(),
5981 )?;
5982 Ok((agg_expr, filter))
5983 }
5984
5985 fn resolve_source_vid_col(
5990 input_plan: Arc<dyn ExecutionPlan>,
5991 source_variable: &str,
5992 ) -> Result<(Arc<dyn ExecutionPlan>, String)> {
5993 let source_vid_col = format!("{}._vid", source_variable);
5994 if input_plan
5995 .schema()
5996 .column_with_name(&source_vid_col)
5997 .is_some()
5998 {
5999 return Ok((input_plan, source_vid_col));
6000 }
6001 if let Ok(field) = input_plan.schema().field_with_name(source_variable)
6004 && matches!(
6005 field.data_type(),
6006 datafusion::arrow::datatypes::DataType::Struct(_)
6007 )
6008 {
6009 let enriched = Self::extract_struct_identity_columns(input_plan, source_variable)?;
6010 return Ok((enriched, format!("{}._vid", source_variable)));
6011 }
6012 Ok((input_plan, source_variable.to_string()))
6013 }
6014
6015 fn extract_struct_identity_columns(
6020 input: Arc<dyn ExecutionPlan>,
6021 variable: &str,
6022 ) -> Result<Arc<dyn ExecutionPlan>> {
6023 use datafusion::common::ScalarValue;
6024 use datafusion::physical_plan::projection::ProjectionExec;
6025
6026 let schema = input.schema();
6027 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
6028 Vec::new();
6029
6030 for (i, field) in schema.fields().iter().enumerate() {
6032 let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
6033 field.name(),
6034 i,
6035 ));
6036 proj_exprs.push((col_expr, field.name().clone()));
6037 }
6038
6039 if let Some((struct_idx, struct_field)) = schema
6041 .fields()
6042 .iter()
6043 .enumerate()
6044 .find(|(_, f)| f.name() == variable)
6045 && let datafusion::arrow::datatypes::DataType::Struct(fields) = struct_field.data_type()
6046 {
6047 let struct_col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
6048 datafusion::physical_expr::expressions::Column::new(variable, struct_idx),
6049 );
6050 let get_field_udf: Arc<datafusion::logical_expr::ScalarUDF> =
6051 Arc::new(datafusion::logical_expr::ScalarUDF::from(
6052 datafusion::functions::core::getfield::GetFieldFunc::new(),
6053 ));
6054
6055 if fields.iter().any(|f| f.name() == "_vid") {
6057 let field_name: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
6058 Arc::new(datafusion::physical_expr::expressions::Literal::new(
6059 ScalarValue::Utf8(Some("_vid".to_string())),
6060 ));
6061 let vid_expr = Arc::new(datafusion::physical_expr::ScalarFunctionExpr::try_new(
6062 get_field_udf.clone(),
6063 vec![struct_col.clone(), field_name],
6064 schema.as_ref(),
6065 Arc::new(datafusion::common::config::ConfigOptions::default()),
6066 )?);
6067 proj_exprs.push((vid_expr, format!("{}._vid", variable)));
6068 }
6069
6070 if fields.iter().any(|f| f.name() == "_labels") {
6072 let field_name: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
6073 Arc::new(datafusion::physical_expr::expressions::Literal::new(
6074 ScalarValue::Utf8(Some("_labels".to_string())),
6075 ));
6076 let labels_expr = Arc::new(datafusion::physical_expr::ScalarFunctionExpr::try_new(
6077 get_field_udf,
6078 vec![struct_col, field_name],
6079 schema.as_ref(),
6080 Arc::new(datafusion::common::config::ConfigOptions::default()),
6081 )?);
6082 proj_exprs.push((labels_expr, format!("{}._labels", variable)));
6083 }
6084 }
6085
6086 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
6087 }
6088
6089 fn extract_all_struct_fields(
6093 input: Arc<dyn ExecutionPlan>,
6094 variable: &str,
6095 ) -> Result<Arc<dyn ExecutionPlan>> {
6096 use datafusion::common::ScalarValue;
6097 use datafusion::physical_plan::projection::ProjectionExec;
6098
6099 let schema = input.schema();
6100 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
6101 Vec::new();
6102
6103 for (i, field) in schema.fields().iter().enumerate() {
6105 let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
6106 field.name(),
6107 i,
6108 ));
6109 proj_exprs.push((col_expr, field.name().clone()));
6110 }
6111
6112 if let Some((struct_idx, struct_field)) = schema
6114 .fields()
6115 .iter()
6116 .enumerate()
6117 .find(|(_, f)| f.name() == variable)
6118 && let datafusion::arrow::datatypes::DataType::Struct(fields) = struct_field.data_type()
6119 {
6120 let struct_col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
6121 datafusion::physical_expr::expressions::Column::new(variable, struct_idx),
6122 );
6123 let get_field_udf: Arc<datafusion::logical_expr::ScalarUDF> =
6124 Arc::new(datafusion::logical_expr::ScalarUDF::from(
6125 datafusion::functions::core::getfield::GetFieldFunc::new(),
6126 ));
6127
6128 for field in fields.iter() {
6129 let flat_name = format!("{}.{}", variable, field.name());
6130 if schema.column_with_name(&flat_name).is_some() {
6132 continue;
6133 }
6134 let field_lit: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
6135 Arc::new(datafusion::physical_expr::expressions::Literal::new(
6136 ScalarValue::Utf8(Some(field.name().to_string())),
6137 ));
6138 let extract_expr =
6139 Arc::new(datafusion::physical_expr::ScalarFunctionExpr::try_new(
6140 get_field_udf.clone(),
6141 vec![struct_col.clone(), field_lit],
6142 schema.as_ref(),
6143 Arc::new(datafusion::common::config::ConfigOptions::default()),
6144 )?);
6145 proj_exprs.push((extract_expr, flat_name));
6146 }
6147 }
6148
6149 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
6150 }
6151
6152 fn is_large_binary_col(&self, expr: &DfExpr, schema: &SchemaRef) -> bool {
6154 if let DfExpr::Column(col) = expr
6155 && let Ok(field) = schema.field_with_name(&col.name)
6156 {
6157 return matches!(
6158 field.data_type(),
6159 datafusion::arrow::datatypes::DataType::LargeBinary
6160 );
6161 }
6162 true
6165 }
6166
6167 fn coerce_numeric_for_stat(arg: DfExpr, schema: &SchemaRef, this: &Self) -> DfExpr {
6173 if this.is_large_binary_col(&arg, schema) {
6174 crate::query::df_udfs::cypher_to_float64_expr(arg)
6175 } else {
6176 use datafusion::logical_expr::Cast;
6177 DfExpr::Cast(Cast::new(
6178 Box::new(arg),
6179 datafusion::arrow::datatypes::DataType::Float64,
6180 ))
6181 }
6182 }
6183}
6184
6185fn coerce_physical_expr(
6201 expr: Arc<dyn datafusion::physical_expr::PhysicalExpr>,
6202 actual_dt: &DataType,
6203 target_dt: &DataType,
6204 schema: &arrow_schema::Schema,
6205) -> Arc<dyn datafusion::physical_expr::PhysicalExpr> {
6206 use datafusion::physical_expr::expressions::CastExpr;
6207
6208 match (actual_dt, target_dt) {
6209 (DataType::LargeBinary, DataType::Float64) => wrap_cypher_to_float64(expr, schema),
6211 (DataType::LargeBinary, DataType::Int64) => {
6213 let float_expr = wrap_cypher_to_float64(expr, schema);
6214 Arc::new(CastExpr::new(float_expr, DataType::Int64, None))
6215 }
6216 _ => Arc::new(CastExpr::new(expr, target_dt.clone(), None)),
6218 }
6219}
6220
6221fn wrap_cypher_to_float64(
6223 expr: Arc<dyn datafusion::physical_expr::PhysicalExpr>,
6224 schema: &arrow_schema::Schema,
6225) -> Arc<dyn datafusion::physical_expr::PhysicalExpr> {
6226 let udf = Arc::new(super::df_udfs::cypher_to_float64_udf());
6227 let config = Arc::new(datafusion::common::config::ConfigOptions::default());
6228 Arc::new(
6229 datafusion::physical_expr::ScalarFunctionExpr::try_new(udf, vec![expr], schema, config)
6230 .expect("CypherToFloat64Udf accepts Any(1) signature"),
6231 )
6232}
6233
6234fn strip_conflicting_structural_columns(
6244 input: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
6245 derived_col_names: &HashSet<&str>,
6246) -> anyhow::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
6247 use datafusion::physical_plan::projection::ProjectionExec;
6248
6249 let schema = input.schema();
6250 let proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> = schema
6251 .fields()
6252 .iter()
6253 .enumerate()
6254 .filter(|(_, f)| {
6255 !(matches!(f.data_type(), arrow_schema::DataType::Struct(_))
6257 && derived_col_names.contains(f.name().as_str()))
6258 })
6259 .map(|(i, f)| {
6260 let col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
6261 datafusion::physical_expr::expressions::Column::new(f.name(), i),
6262 );
6263 (col, f.name().clone())
6264 })
6265 .collect();
6266
6267 if proj_exprs.len() == schema.fields().len() {
6268 return Ok(input);
6270 }
6271
6272 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
6273}
6274
6275fn resolve_column_indices(
6276 schema: &arrow_schema::SchemaRef,
6277 column_names: &[String],
6278) -> anyhow::Result<Vec<usize>> {
6279 column_names
6280 .iter()
6281 .map(|name| {
6282 schema
6283 .index_of(name)
6284 .map_err(|_| anyhow::anyhow!("Column '{}' not found in schema", name))
6285 })
6286 .collect()
6287}
6288
6289fn resolve_best_by_criteria(
6291 schema: &arrow_schema::SchemaRef,
6292 criteria: &[(Expr, bool)],
6293) -> anyhow::Result<Vec<super::df_graph::locy_best_by::SortCriterion>> {
6294 criteria
6295 .iter()
6296 .map(|(expr, ascending)| {
6297 let candidates: Vec<String> = match expr {
6300 Expr::Property(base, prop) => {
6301 if let Expr::Variable(var) = base.as_ref() {
6302 vec![prop.clone(), format!("{}.{}", var, prop)]
6303 } else {
6304 vec![prop.clone()]
6305 }
6306 }
6307 Expr::Variable(name) => {
6308 let short = name.rsplit('.').next().unwrap_or(name).to_string();
6309 if short != *name {
6310 vec![short, name.clone()]
6311 } else {
6312 vec![name.clone()]
6313 }
6314 }
6315 _ => {
6316 return Err(anyhow::anyhow!(
6317 "BEST BY criteria must be variable or property access"
6318 ));
6319 }
6320 };
6321 let col_index = candidates
6322 .iter()
6323 .find_map(|name| schema.index_of(name).ok())
6324 .ok_or_else(|| {
6325 anyhow::anyhow!(
6326 "BEST BY column '{}' not found",
6327 candidates.first().unwrap_or(&String::new())
6328 )
6329 })?;
6330 Ok(super::df_graph::locy_best_by::SortCriterion {
6331 col_index,
6332 ascending: *ascending,
6333 nulls_first: false, })
6335 })
6336 .collect()
6337}
6338
6339fn resolve_fold_bindings(
6345 schema: &arrow_schema::SchemaRef,
6346 fold_bindings: &[(String, Expr)],
6347 plugin_registry: &uni_plugin::PluginRegistry,
6348) -> anyhow::Result<Vec<super::df_graph::locy_fold::FoldBinding>> {
6349 use super::df_graph::locy_fold::resolve_locy_aggregate;
6350 fold_bindings
6351 .iter()
6352 .map(|(output_name, expr)| {
6353 match expr {
6355 Expr::FunctionCall { name, args, .. } => {
6356 let upper = name.to_uppercase();
6357 let is_count = matches!(upper.as_str(), "COUNT" | "MCOUNT");
6358
6359 let canonical: smol_str::SmolStr = if is_count && args.is_empty() {
6360 smol_str::SmolStr::new_static("COUNTALL")
6361 } else {
6362 match upper.as_str() {
6363 "SUM" | "MSUM" => smol_str::SmolStr::new_static("SUM"),
6364 "COUNT" | "MCOUNT" => smol_str::SmolStr::new_static("COUNT"),
6365 "MAX" | "MMAX" => smol_str::SmolStr::new_static("MAX"),
6366 "MIN" | "MMIN" => smol_str::SmolStr::new_static("MIN"),
6367 "AVG" => smol_str::SmolStr::new_static("AVG"),
6368 "COLLECT" => smol_str::SmolStr::new_static("COLLECT"),
6369 "MNOR" => smol_str::SmolStr::new_static("MNOR"),
6370 "MPROD" => smol_str::SmolStr::new_static("MPROD"),
6371 other => {
6372 return Err(anyhow::anyhow!(
6373 "Unsupported FOLD aggregate function: {}",
6374 other
6375 ));
6376 }
6377 }
6378 };
6379
6380 let entry = resolve_locy_aggregate(plugin_registry, canonical.as_str())
6381 .ok_or_else(|| {
6382 anyhow::anyhow!(
6383 "Locy aggregate '{canonical}' is not registered in the plugin registry"
6384 )
6385 })?;
6386 let aggregate = Arc::clone(&entry.aggregate);
6387
6388 if canonical.as_str() == "COUNTALL" {
6390 return Ok(super::df_graph::locy_fold::FoldBinding {
6391 output_name: output_name.clone(),
6392 name: canonical,
6393 aggregate,
6394 input_col_index: 0,
6395 input_col_name: None,
6396 });
6397 }
6398
6399 let input_col_index = schema
6402 .index_of(output_name)
6403 .or_else(|_| {
6404 let col_name = match args.first() {
6406 Some(Expr::Variable(name)) => Some(name.clone()),
6407 Some(Expr::Property(base, prop)) => {
6408 if let Expr::Variable(var) = base.as_ref() {
6409 Some(format!("{}.{}", var, prop))
6410 } else {
6411 None
6412 }
6413 }
6414 _ => None,
6415 };
6416 col_name
6417 .and_then(|n| schema.index_of(&n).ok())
6418 .ok_or_else(|| {
6419 arrow_schema::ArrowError::SchemaError(format!(
6420 "FOLD column '{}' not found",
6421 output_name
6422 ))
6423 })
6424 })
6425 .map_err(|_| anyhow::anyhow!("FOLD column '{}' not found", output_name))?;
6426 Ok(super::df_graph::locy_fold::FoldBinding {
6427 output_name: output_name.clone(),
6428 name: canonical,
6429 aggregate,
6430 input_col_index,
6431 input_col_name: Some(output_name.clone()),
6432 })
6433 }
6434 _ => Err(anyhow::anyhow!(
6435 "FOLD binding must be an aggregate function call"
6436 )),
6437 }
6438 })
6439 .collect()
6440}
6441
6442fn collect_variable_kinds(plan: &LogicalPlan, kinds: &mut HashMap<String, VariableKind>) {
6447 match plan {
6448 LogicalPlan::FusedIndexScanWrapped { inner, .. } => {
6451 collect_variable_kinds(inner, kinds);
6452 }
6453 LogicalPlan::Scan { variable, .. }
6454 | LogicalPlan::FusedIndexScan { variable, .. }
6455 | LogicalPlan::ExtIdLookup { variable, .. }
6456 | LogicalPlan::ScanAll { variable, .. }
6457 | LogicalPlan::ScanMainByLabels { variable, .. }
6458 | LogicalPlan::VectorKnn { variable, .. }
6459 | LogicalPlan::InvertedIndexLookup { variable, .. } => {
6460 kinds.insert(variable.clone(), VariableKind::Node);
6461 }
6462 LogicalPlan::Traverse {
6463 input,
6464 source_variable,
6465 target_variable,
6466 step_variable,
6467 path_variable,
6468 is_variable_length,
6469 ..
6470 }
6471 | LogicalPlan::TraverseMainByType {
6472 input,
6473 source_variable,
6474 target_variable,
6475 step_variable,
6476 path_variable,
6477 is_variable_length,
6478 ..
6479 } => {
6480 collect_variable_kinds(input, kinds);
6481 kinds.insert(source_variable.clone(), VariableKind::Node);
6482 kinds.insert(target_variable.clone(), VariableKind::Node);
6483 if let Some(sv) = step_variable {
6484 kinds.insert(sv.clone(), VariableKind::edge_for(*is_variable_length));
6485 }
6486 if let Some(pv) = path_variable {
6487 kinds.insert(pv.clone(), VariableKind::Path);
6488 }
6489 }
6490 LogicalPlan::ShortestPath {
6491 input,
6492 source_variable,
6493 target_variable,
6494 path_variable,
6495 ..
6496 }
6497 | LogicalPlan::AllShortestPaths {
6498 input,
6499 source_variable,
6500 target_variable,
6501 path_variable,
6502 ..
6503 } => {
6504 collect_variable_kinds(input, kinds);
6505 kinds.insert(source_variable.clone(), VariableKind::Node);
6506 kinds.insert(target_variable.clone(), VariableKind::Node);
6507 kinds.insert(path_variable.clone(), VariableKind::Path);
6508 }
6509 LogicalPlan::QuantifiedPattern {
6510 input,
6511 pattern_plan,
6512 path_variable,
6513 start_variable,
6514 binding_variable,
6515 ..
6516 } => {
6517 collect_variable_kinds(input, kinds);
6518 collect_variable_kinds(pattern_plan, kinds);
6519 kinds.insert(start_variable.clone(), VariableKind::Node);
6520 kinds.insert(binding_variable.clone(), VariableKind::Node);
6521 if let Some(pv) = path_variable {
6522 kinds.insert(pv.clone(), VariableKind::Path);
6523 }
6524 }
6525 LogicalPlan::BindZeroLengthPath {
6526 input,
6527 node_variable,
6528 path_variable,
6529 } => {
6530 collect_variable_kinds(input, kinds);
6531 kinds.insert(node_variable.clone(), VariableKind::Node);
6532 kinds.insert(path_variable.clone(), VariableKind::Path);
6533 }
6534 LogicalPlan::BindPath {
6535 input,
6536 node_variables,
6537 edge_variables,
6538 path_variable,
6539 } => {
6540 collect_variable_kinds(input, kinds);
6541 for nv in node_variables {
6542 kinds.insert(nv.clone(), VariableKind::Node);
6543 }
6544 for ev in edge_variables {
6545 kinds.insert(ev.clone(), VariableKind::Edge);
6546 }
6547 kinds.insert(path_variable.clone(), VariableKind::Path);
6548 }
6549 LogicalPlan::Filter { input, .. }
6551 | LogicalPlan::Project { input, .. }
6552 | LogicalPlan::Sort { input, .. }
6553 | LogicalPlan::Limit { input, .. }
6554 | LogicalPlan::Aggregate { input, .. }
6555 | LogicalPlan::Distinct { input, .. }
6556 | LogicalPlan::Window { input, .. }
6557 | LogicalPlan::Unwind { input, .. }
6558 | LogicalPlan::Create { input, .. }
6559 | LogicalPlan::CreateBatch { input, .. }
6560 | LogicalPlan::Merge { input, .. }
6561 | LogicalPlan::Set { input, .. }
6562 | LogicalPlan::Remove { input, .. }
6563 | LogicalPlan::Delete { input, .. }
6564 | LogicalPlan::Foreach { input, .. }
6565 | LogicalPlan::SubqueryCall { input, .. } => {
6566 collect_variable_kinds(input, kinds);
6567 }
6568 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
6569 collect_variable_kinds(left, kinds);
6570 collect_variable_kinds(right, kinds);
6571 }
6572 LogicalPlan::Apply {
6573 input, subquery, ..
6574 } => {
6575 collect_variable_kinds(input, kinds);
6576 collect_variable_kinds(subquery, kinds);
6577 }
6578 LogicalPlan::RecursiveCTE {
6579 initial, recursive, ..
6580 } => {
6581 collect_variable_kinds(initial, kinds);
6582 collect_variable_kinds(recursive, kinds);
6583 }
6584 LogicalPlan::Explain { plan } => {
6585 collect_variable_kinds(plan, kinds);
6586 }
6587 LogicalPlan::ProcedureCall {
6588 procedure_name,
6589 yield_items,
6590 ..
6591 } => {
6592 use crate::query::df_graph::procedure_call::{
6593 is_node_yield_procedure_static, map_yield_to_canonical,
6594 };
6595 for (name, alias) in yield_items {
6596 let var = alias.as_ref().unwrap_or(name);
6597 if is_node_yield_procedure_static(procedure_name.as_str()) {
6598 let canonical = map_yield_to_canonical(name);
6599 if canonical == "node" {
6600 kinds.insert(var.clone(), VariableKind::Node);
6601 }
6602 }
6604 }
6606 }
6607 LogicalPlan::LocyProgram { .. }
6609 | LogicalPlan::LocyFold { .. }
6610 | LogicalPlan::LocyBestBy { .. }
6611 | LogicalPlan::LocyPriority { .. }
6612 | LogicalPlan::LocyDerivedScan { .. }
6613 | LogicalPlan::LocyProject { .. }
6614 | LogicalPlan::LocyModelInvoke { .. } => {}
6615 LogicalPlan::Empty
6617 | LogicalPlan::CreateVectorIndex { .. }
6618 | LogicalPlan::CreateFullTextIndex { .. }
6619 | LogicalPlan::CreateScalarIndex { .. }
6620 | LogicalPlan::CreateJsonFtsIndex { .. }
6621 | LogicalPlan::DropIndex { .. }
6622 | LogicalPlan::ShowIndexes { .. }
6623 | LogicalPlan::Copy { .. }
6624 | LogicalPlan::Backup { .. }
6625 | LogicalPlan::ShowDatabase
6626 | LogicalPlan::ShowConfig
6627 | LogicalPlan::ShowStatistics
6628 | LogicalPlan::Vacuum
6629 | LogicalPlan::Checkpoint
6630 | LogicalPlan::CopyTo { .. }
6631 | LogicalPlan::CopyFrom { .. }
6632 | LogicalPlan::CreateLabel(_)
6633 | LogicalPlan::CreateEdgeType(_)
6634 | LogicalPlan::AlterLabel(_)
6635 | LogicalPlan::AlterEdgeType(_)
6636 | LogicalPlan::DropLabel(_)
6637 | LogicalPlan::DropEdgeType(_)
6638 | LogicalPlan::CreateConstraint(_)
6639 | LogicalPlan::DropConstraint(_)
6640 | LogicalPlan::ShowConstraints(_) => {}
6641 }
6642}
6643
6644fn collect_mutation_node_hints(plan: &LogicalPlan, hints: &mut Vec<String>) {
6649 match plan {
6650 LogicalPlan::Create { input, pattern } => {
6651 collect_node_names_from_pattern(pattern, hints);
6652 collect_mutation_node_hints(input, hints);
6653 }
6654 LogicalPlan::CreateBatch { input, patterns } => {
6655 for pattern in patterns {
6656 collect_node_names_from_pattern(pattern, hints);
6657 }
6658 collect_mutation_node_hints(input, hints);
6659 }
6660 LogicalPlan::Merge { input, pattern, .. } => {
6661 collect_node_names_from_pattern(pattern, hints);
6662 collect_mutation_node_hints(input, hints);
6663 }
6664 LogicalPlan::Traverse { input, .. }
6666 | LogicalPlan::TraverseMainByType { input, .. }
6667 | LogicalPlan::Filter { input, .. }
6668 | LogicalPlan::Project { input, .. }
6669 | LogicalPlan::Sort { input, .. }
6670 | LogicalPlan::Limit { input, .. }
6671 | LogicalPlan::Aggregate { input, .. }
6672 | LogicalPlan::Distinct { input, .. }
6673 | LogicalPlan::Window { input, .. }
6674 | LogicalPlan::Unwind { input, .. }
6675 | LogicalPlan::Set { input, .. }
6676 | LogicalPlan::Remove { input, .. }
6677 | LogicalPlan::Delete { input, .. }
6678 | LogicalPlan::Foreach { input, .. }
6679 | LogicalPlan::SubqueryCall { input, .. }
6680 | LogicalPlan::ShortestPath { input, .. }
6681 | LogicalPlan::AllShortestPaths { input, .. }
6682 | LogicalPlan::QuantifiedPattern { input, .. }
6683 | LogicalPlan::BindZeroLengthPath { input, .. }
6684 | LogicalPlan::BindPath { input, .. } => {
6685 collect_mutation_node_hints(input, hints);
6686 }
6687 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
6688 collect_mutation_node_hints(left, hints);
6689 collect_mutation_node_hints(right, hints);
6690 }
6691 LogicalPlan::Apply {
6692 input, subquery, ..
6693 } => {
6694 collect_mutation_node_hints(input, hints);
6695 collect_mutation_node_hints(subquery, hints);
6696 }
6697 LogicalPlan::RecursiveCTE {
6698 initial, recursive, ..
6699 } => {
6700 collect_mutation_node_hints(initial, hints);
6701 collect_mutation_node_hints(recursive, hints);
6702 }
6703 LogicalPlan::Explain { plan } => {
6704 collect_mutation_node_hints(plan, hints);
6705 }
6706 _ => {}
6708 }
6709}
6710
6711fn collect_node_names_from_pattern(pattern: &Pattern, hints: &mut Vec<String>) {
6713 for path in &pattern.paths {
6714 for element in &path.elements {
6715 match element {
6716 PatternElement::Node(n) => {
6717 if let Some(ref v) = n.variable
6718 && !hints.contains(v)
6719 {
6720 hints.push(v.clone());
6721 }
6722 }
6723 PatternElement::Parenthesized { pattern, .. } => {
6724 let sub = Pattern {
6725 paths: vec![pattern.as_ref().clone()],
6726 };
6727 collect_node_names_from_pattern(&sub, hints);
6728 }
6729 _ => {}
6730 }
6731 }
6732 }
6733}
6734
6735fn collect_mutation_edge_hints(plan: &LogicalPlan, hints: &mut Vec<String>) {
6739 match plan {
6740 LogicalPlan::Create { input, pattern } | LogicalPlan::Merge { input, pattern, .. } => {
6741 collect_edge_names_from_pattern(pattern, hints);
6742 collect_mutation_edge_hints(input, hints);
6743 }
6744 LogicalPlan::CreateBatch { input, patterns } => {
6745 for pattern in patterns {
6746 collect_edge_names_from_pattern(pattern, hints);
6747 }
6748 collect_mutation_edge_hints(input, hints);
6749 }
6750 LogicalPlan::Traverse { input, .. }
6752 | LogicalPlan::TraverseMainByType { input, .. }
6753 | LogicalPlan::Filter { input, .. }
6754 | LogicalPlan::Project { input, .. }
6755 | LogicalPlan::Sort { input, .. }
6756 | LogicalPlan::Limit { input, .. }
6757 | LogicalPlan::Aggregate { input, .. }
6758 | LogicalPlan::Distinct { input, .. }
6759 | LogicalPlan::Window { input, .. }
6760 | LogicalPlan::Unwind { input, .. }
6761 | LogicalPlan::Set { input, .. }
6762 | LogicalPlan::Remove { input, .. }
6763 | LogicalPlan::Delete { input, .. }
6764 | LogicalPlan::Foreach { input, .. }
6765 | LogicalPlan::SubqueryCall { input, .. }
6766 | LogicalPlan::ShortestPath { input, .. }
6767 | LogicalPlan::AllShortestPaths { input, .. }
6768 | LogicalPlan::QuantifiedPattern { input, .. }
6769 | LogicalPlan::BindZeroLengthPath { input, .. }
6770 | LogicalPlan::BindPath { input, .. } => {
6771 collect_mutation_edge_hints(input, hints);
6772 }
6773 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
6774 collect_mutation_edge_hints(left, hints);
6775 collect_mutation_edge_hints(right, hints);
6776 }
6777 LogicalPlan::Apply {
6778 input, subquery, ..
6779 } => {
6780 collect_mutation_edge_hints(input, hints);
6781 collect_mutation_edge_hints(subquery, hints);
6782 }
6783 LogicalPlan::RecursiveCTE {
6784 initial, recursive, ..
6785 } => {
6786 collect_mutation_edge_hints(initial, hints);
6787 collect_mutation_edge_hints(recursive, hints);
6788 }
6789 LogicalPlan::Explain { plan } => {
6790 collect_mutation_edge_hints(plan, hints);
6791 }
6792 _ => {}
6793 }
6794}
6795
6796fn collect_edge_names_from_pattern(pattern: &Pattern, hints: &mut Vec<String>) {
6798 for path in &pattern.paths {
6799 for element in &path.elements {
6800 match element {
6801 PatternElement::Relationship(r) => {
6802 if let Some(ref v) = r.variable
6803 && !hints.contains(v)
6804 {
6805 hints.push(v.clone());
6806 }
6807 }
6808 PatternElement::Parenthesized { pattern, .. } => {
6809 let sub = Pattern {
6810 paths: vec![pattern.as_ref().clone()],
6811 };
6812 collect_edge_names_from_pattern(&sub, hints);
6813 }
6814 _ => {}
6815 }
6816 }
6817 }
6818}
6819
6820fn convert_direction(ast_dir: AstDirection) -> Direction {
6822 match ast_dir {
6823 AstDirection::Outgoing => Direction::Outgoing,
6824 AstDirection::Incoming => Direction::Incoming,
6825 AstDirection::Both => Direction::Both,
6826 }
6827}
6828
/// Normalizes the property list requested for a variable-length-path target.
///
/// 1. Every literal `"*"` entry is removed.
/// 2. If the target had a wildcard and nothing else remains, `"_all_props"`
///    is requested instead.
/// 3. If any remaining property is neither `"overflow_json"`, nor prefixed
///    with `_`, nor listed in `target_label_props`, `"_all_props"` is appended
///    (unless it is already present).
///
/// The relative order of the surviving properties is preserved.
fn sanitize_vlp_target_properties(
    mut properties: Vec<String>,
    target_has_wildcard: bool,
    target_label_props: Option<&HashSet<String>>,
) -> Vec<String> {
    const ALL_PROPS: &str = "_all_props";

    properties.retain(|name| name != "*");

    if properties.is_empty() && target_has_wildcard {
        properties.push(ALL_PROPS.to_string());
    }

    // A property is "covered" when it is internal, the overflow column, or
    // declared on the target label.
    let is_covered = |name: &String| {
        name == ALL_PROPS
            || name == "overflow_json"
            || name.starts_with('_')
            || target_label_props.is_some_and(|declared| declared.contains(name))
    };

    let every_prop_covered = properties.iter().all(is_covered);
    let already_requests_all = properties.iter().any(|name| name == ALL_PROPS);
    if !every_prop_covered && !already_requests_all {
        properties.push(ALL_PROPS.to_string());
    }

    properties
}
6856
/// Result of splitting a join predicate into its AND-ed conjuncts and
/// sorting each conjunct by which join side(s) it references.
struct JoinPredicateClassification {
    /// Equality conjuncts with one operand referencing only left-side
    /// variables and the other only right-side variables, stored as
    /// `(left-side operand, right-side operand)`.
    equi_pairs: Vec<(Expr, Expr)>,
    /// Conjuncts that reference left-side variables and no right-side ones.
    left_only: Vec<Expr>,
    /// Conjuncts that reference right-side variables and no left-side ones.
    right_only: Vec<Expr>,
    /// All remaining conjuncts, AND-combined; `None` when there are none.
    residual: Option<Expr>,
}
6878
6879pub(crate) fn collect_plan_variables(plan: &LogicalPlan) -> HashSet<String> {
6884 let mut out = HashSet::new();
6885 collect_plan_variables_into(plan, &mut out);
6886 out
6887}
6888
6889fn collect_plan_variables_into(plan: &LogicalPlan, out: &mut HashSet<String>) {
6890 match plan {
6891 LogicalPlan::Scan { variable, .. }
6892 | LogicalPlan::ExtIdLookup { variable, .. }
6893 | LogicalPlan::ScanAll { variable, .. }
6894 | LogicalPlan::ScanMainByLabels { variable, .. } => {
6895 out.insert(variable.clone());
6896 }
6897 LogicalPlan::Unwind {
6898 input, variable, ..
6899 } => {
6900 out.insert(variable.clone());
6901 collect_plan_variables_into(input, out);
6902 }
6903 LogicalPlan::Traverse {
6904 input,
6905 source_variable,
6906 target_variable,
6907 step_variable,
6908 path_variable,
6909 ..
6910 } => {
6911 collect_plan_variables_into(input, out);
6912 out.insert(source_variable.clone());
6913 out.insert(target_variable.clone());
6914 if let Some(s) = step_variable {
6915 out.insert(s.clone());
6916 }
6917 if let Some(p) = path_variable {
6918 out.insert(p.clone());
6919 }
6920 }
6921 LogicalPlan::TraverseMainByType {
6922 input,
6923 source_variable,
6924 target_variable,
6925 step_variable,
6926 path_variable,
6927 ..
6928 } => {
6929 collect_plan_variables_into(input, out);
6930 out.insert(source_variable.clone());
6931 out.insert(target_variable.clone());
6932 if let Some(s) = step_variable {
6933 out.insert(s.clone());
6934 }
6935 if let Some(p) = path_variable {
6936 out.insert(p.clone());
6937 }
6938 }
6939 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
6940 collect_plan_variables_into(left, out);
6941 collect_plan_variables_into(right, out);
6942 }
6943 LogicalPlan::Apply {
6944 input, subquery, ..
6945 } => {
6946 collect_plan_variables_into(input, out);
6947 collect_plan_variables_into(subquery, out);
6948 }
6949 LogicalPlan::Filter { input, .. }
6950 | LogicalPlan::Project { input, .. }
6951 | LogicalPlan::Sort { input, .. }
6952 | LogicalPlan::Limit { input, .. }
6953 | LogicalPlan::Aggregate { input, .. }
6954 | LogicalPlan::Distinct { input }
6955 | LogicalPlan::Window { input, .. }
6956 | LogicalPlan::Create { input, .. }
6957 | LogicalPlan::CreateBatch { input, .. }
6958 | LogicalPlan::Merge { input, .. }
6959 | LogicalPlan::Set { input, .. }
6960 | LogicalPlan::Remove { input, .. }
6961 | LogicalPlan::Delete { input, .. }
6962 | LogicalPlan::Foreach { input, .. }
6963 | LogicalPlan::SubqueryCall { input, .. } => {
6964 collect_plan_variables_into(input, out);
6965 }
6966 _ => {}
6968 }
6969}
6970
6971fn collect_expr_variables_set(expr: &Expr) -> HashSet<String> {
6973 let mut out = HashSet::new();
6974 collect_expr_variables_into(expr, &mut out);
6975 out
6976}
6977
6978fn collect_expr_variables_into(expr: &Expr, out: &mut HashSet<String>) {
6979 use uni_cypher::ast::Expr as E;
6980 match expr {
6981 E::Variable(v) => {
6982 out.insert(v.clone());
6983 }
6984 E::Property(base, _) => collect_expr_variables_into(base, out),
6985 E::BinaryOp { left, right, .. } => {
6986 collect_expr_variables_into(left, out);
6987 collect_expr_variables_into(right, out);
6988 }
6989 E::UnaryOp { expr, .. } | E::IsNull(expr) | E::IsNotNull(expr) | E::IsUnique(expr) => {
6990 collect_expr_variables_into(expr, out)
6991 }
6992 E::FunctionCall { args, .. } => {
6993 for a in args {
6994 collect_expr_variables_into(a, out);
6995 }
6996 }
6997 E::List(items) => {
6998 for it in items {
6999 collect_expr_variables_into(it, out);
7000 }
7001 }
7002 E::In { expr, list } => {
7003 collect_expr_variables_into(expr, out);
7004 collect_expr_variables_into(list, out);
7005 }
7006 E::Case {
7007 expr,
7008 when_then,
7009 else_expr,
7010 } => {
7011 if let Some(e) = expr {
7012 collect_expr_variables_into(e, out);
7013 }
7014 for (w, t) in when_then {
7015 collect_expr_variables_into(w, out);
7016 collect_expr_variables_into(t, out);
7017 }
7018 if let Some(e) = else_expr {
7019 collect_expr_variables_into(e, out);
7020 }
7021 }
7022 E::Map(entries) => {
7023 for (_, v) in entries {
7024 collect_expr_variables_into(v, out);
7025 }
7026 }
7027 E::LabelCheck { expr, .. } => collect_expr_variables_into(expr, out),
7028 E::ArrayIndex { array, index } => {
7029 collect_expr_variables_into(array, out);
7030 collect_expr_variables_into(index, out);
7031 }
7032 E::ArraySlice { array, start, end } => {
7033 collect_expr_variables_into(array, out);
7034 if let Some(s) = start {
7035 collect_expr_variables_into(s, out);
7036 }
7037 if let Some(e) = end {
7038 collect_expr_variables_into(e, out);
7039 }
7040 }
7041 _ => {}
7044 }
7045}
7046
7047fn split_and_conjuncts(predicate: &Expr) -> Vec<Expr> {
7049 use uni_cypher::ast::BinaryOp;
7050 let mut out = Vec::new();
7051 fn walk(e: &Expr, out: &mut Vec<Expr>) {
7052 if let Expr::BinaryOp {
7053 left,
7054 op: BinaryOp::And,
7055 right,
7056 } = e
7057 {
7058 walk(left, out);
7059 walk(right, out);
7060 } else {
7061 out.push(e.clone());
7062 }
7063 }
7064 walk(predicate, &mut out);
7065 out
7066}
7067
7068fn and_combine(exprs: Vec<Expr>) -> Option<Expr> {
7070 use uni_cypher::ast::BinaryOp;
7071 let mut iter = exprs.into_iter();
7072 let first = iter.next()?;
7073 Some(iter.fold(first, |acc, e| Expr::BinaryOp {
7074 left: Box::new(acc),
7075 op: BinaryOp::And,
7076 right: Box::new(e),
7077 }))
7078}
7079
7080fn classify_join_predicate(
7083 predicate: &Expr,
7084 left_vars: &HashSet<String>,
7085 right_vars: &HashSet<String>,
7086) -> JoinPredicateClassification {
7087 use uni_cypher::ast::BinaryOp;
7088
7089 let mut equi_pairs = Vec::new();
7090 let mut left_only = Vec::new();
7091 let mut right_only = Vec::new();
7092 let mut residual_parts: Vec<Expr> = Vec::new();
7093
7094 for conjunct in split_and_conjuncts(predicate) {
7095 if let Expr::BinaryOp {
7098 left,
7099 op: BinaryOp::Eq,
7100 right,
7101 } = &conjunct
7102 {
7103 let lv = collect_expr_variables_set(left);
7104 let rv = collect_expr_variables_set(right);
7105 let l_in_left = !lv.is_empty() && lv.is_subset(left_vars);
7106 let r_in_right = !rv.is_empty() && rv.is_subset(right_vars);
7107 let l_in_right = !lv.is_empty() && lv.is_subset(right_vars);
7108 let r_in_left = !rv.is_empty() && rv.is_subset(left_vars);
7109 if l_in_left && r_in_right {
7110 equi_pairs.push(((**left).clone(), (**right).clone()));
7111 continue;
7112 }
7113 if l_in_right && r_in_left {
7114 equi_pairs.push(((**right).clone(), (**left).clone()));
7115 continue;
7116 }
7117 }
7118
7119 let vars = collect_expr_variables_set(&conjunct);
7121 let touches_left = vars.iter().any(|v| left_vars.contains(v));
7122 let touches_right = vars.iter().any(|v| right_vars.contains(v));
7123 match (touches_left, touches_right) {
7124 (true, false) => left_only.push(conjunct),
7125 (false, true) => right_only.push(conjunct),
7126 _ => residual_parts.push(conjunct),
7128 }
7129 }
7130
7131 JoinPredicateClassification {
7132 equi_pairs,
7133 left_only,
7134 right_only,
7135 residual: and_combine(residual_parts),
7136 }
7137}
7138
/// Largest UNWIND source list (literal list or list-valued parameter) that
/// `materialize_unwind_source` and `materialize_unwind_source_field` will
/// expand into literal values; longer lists make them return `None`.
const MAX_UNWIND_IN_PUSHDOWN_VALUES: usize = 10_000;
7143
7144fn warn_unpushable_unwind_once(reason: &'static str) {
7157 use std::sync::atomic::{AtomicBool, Ordering};
7158 static WARNED: AtomicBool = AtomicBool::new(false);
7159 if WARNED.swap(true, Ordering::Relaxed) {
7160 return;
7161 }
7162 tracing::warn!(
7163 target: "uni_query::cross_join_in_pushdown",
7164 reason,
7165 "Inlined UNWIND of map literals failed pushdown — falling back \
7166 to FilterExec over a full scan. Rewrite as `UNWIND $param AS u` \
7167 with the param bound as a List<Map<...>> to guarantee pushdown."
7168 );
7169}
7170
7171fn value_to_cypher_literal(v: &uni_common::Value) -> Option<CypherLiteral> {
7172 use uni_common::Value;
7173 match v {
7174 Value::Null => Some(CypherLiteral::Null),
7175 Value::Bool(b) => Some(CypherLiteral::Bool(*b)),
7176 Value::Int(n) => Some(CypherLiteral::Integer(*n)),
7177 Value::Float(f) => Some(CypherLiteral::Float(*f)),
7178 Value::String(s) => Some(CypherLiteral::String(s.clone())),
7179 _ => None,
7180 }
7181}
7182
7183fn walk_static_unwind_chain<F, T>(
7203 plan: &LogicalPlan,
7204 target_var: &str,
7205 extract: &mut F,
7206) -> Option<T>
7207where
7208 F: FnMut(&Expr) -> Option<T>,
7209{
7210 match plan {
7211 LogicalPlan::Unwind {
7212 input,
7213 expr,
7214 variable,
7215 } if variable == target_var => {
7216 extract(expr).or_else(|| walk_static_unwind_chain(input, target_var, extract))
7217 }
7218 LogicalPlan::Filter { input, .. }
7220 | LogicalPlan::Project { input, .. }
7221 | LogicalPlan::Unwind { input, .. } => walk_static_unwind_chain(input, target_var, extract),
7222 LogicalPlan::CrossJoin { left, right } => {
7225 walk_static_unwind_chain(left, target_var, extract)
7226 .or_else(|| walk_static_unwind_chain(right, target_var, extract))
7227 }
7228 _ => None,
7229 }
7230}
7231
7232fn extract_static_unwind_values(
7233 plan: &LogicalPlan,
7234 target_var: &str,
7235 params: &HashMap<String, uni_common::Value>,
7236) -> Option<Vec<Expr>> {
7237 walk_static_unwind_chain(plan, target_var, &mut |expr| {
7238 materialize_unwind_source(expr, params)
7239 })
7240}
7241
7242fn extract_static_unwind_field_values(
7245 plan: &LogicalPlan,
7246 target_var: &str,
7247 field: &str,
7248 params: &HashMap<String, uni_common::Value>,
7249) -> Option<Vec<Expr>> {
7250 walk_static_unwind_chain(plan, target_var, &mut |expr| {
7251 materialize_unwind_source_field(expr, params, field)
7252 })
7253}
7254
7255fn materialize_unwind_source(
7257 expr: &Expr,
7258 params: &HashMap<String, uni_common::Value>,
7259) -> Option<Vec<Expr>> {
7260 match expr {
7261 Expr::List(items) => {
7262 if items.len() > MAX_UNWIND_IN_PUSHDOWN_VALUES {
7263 return None;
7264 }
7265 let mut out = Vec::with_capacity(items.len());
7266 for item in items {
7267 match item {
7268 Expr::Literal(_) => out.push(item.clone()),
7269 _ => return None,
7270 }
7271 }
7272 Some(out)
7273 }
7274 Expr::Parameter(name) => match params.get(name)? {
7275 uni_common::Value::List(values) => {
7276 if values.len() > MAX_UNWIND_IN_PUSHDOWN_VALUES {
7277 return None;
7278 }
7279 let mut out = Vec::with_capacity(values.len());
7280 for v in values {
7281 out.push(Expr::Literal(value_to_cypher_literal(v)?));
7282 }
7283 Some(out)
7284 }
7285 _ => None,
7286 },
7287 _ => None,
7288 }
7289}
7290
7291fn materialize_unwind_source_field(
7301 expr: &Expr,
7302 params: &HashMap<String, uni_common::Value>,
7303 field: &str,
7304) -> Option<Vec<Expr>> {
7305 match expr {
7306 Expr::List(items) => {
7307 if items.len() > MAX_UNWIND_IN_PUSHDOWN_VALUES {
7308 warn_unpushable_unwind_once("UNWIND list exceeds MAX_UNWIND_IN_PUSHDOWN_VALUES");
7309 return None;
7310 }
7311 let mut out = Vec::with_capacity(items.len());
7324 for item in items {
7325 let entries = match item {
7326 Expr::Map(entries) => entries,
7327 _ => return None,
7328 };
7329 let Some((_, value_expr)) = entries.iter().find(|(k, _)| k == field) else {
7330 warn_unpushable_unwind_once(
7331 "UNWIND map literal is missing the field referenced by the join predicate",
7332 );
7333 return None;
7334 };
7335 let Expr::Literal(_) = value_expr else {
7336 warn_unpushable_unwind_once(
7337 "UNWIND map literal has a non-literal value at the joined field \
7338 (e.g., a parameter or function call) — substitute with a literal \
7339 or rewrite as `UNWIND $param AS u` with the param bound at runtime",
7340 );
7341 return None;
7342 };
7343 out.push(value_expr.clone());
7344 }
7345 Some(out)
7346 }
7347 Expr::Parameter(name) => match params.get(name)? {
7348 uni_common::Value::List(values) => {
7349 if values.len() > MAX_UNWIND_IN_PUSHDOWN_VALUES {
7350 return None;
7351 }
7352 let mut out = Vec::with_capacity(values.len());
7353 for v in values {
7354 let map = match v {
7355 uni_common::Value::Map(m) => m,
7356 _ => return None,
7357 };
7358 let inner = map.get(field)?;
7359 out.push(Expr::Literal(value_to_cypher_literal(inner)?));
7360 }
7361 Some(out)
7362 }
7363 _ => None,
7364 },
7365 _ => None,
7366 }
7367}
7368
7369fn build_in_pushdown(
7381 unwind_side_expr: &Expr,
7382 scan_side_expr: &Expr,
7383 unwind_subplan: &LogicalPlan,
7384 params: &HashMap<String, uni_common::Value>,
7385) -> Option<Expr> {
7386 let (unwind_var, field) = match unwind_side_expr {
7388 Expr::Variable(v) => (v.as_str(), None),
7389 Expr::Property(box_var, f) => match box_var.as_ref() {
7390 Expr::Variable(v) => (v.as_str(), Some(f.as_str())),
7391 _ => {
7392 tracing::debug!(
7393 target: "uni_query::cross_join_in_pushdown",
7394 reason = "unwind side Property inner is not Variable",
7395 "build_in_pushdown rejected"
7396 );
7397 return None;
7398 }
7399 },
7400 _ => {
7401 tracing::debug!(
7402 target: "uni_query::cross_join_in_pushdown",
7403 reason = "unwind side is not Variable or Property",
7404 unwind_kind = std::any::type_name_of_val(&unwind_side_expr),
7405 "build_in_pushdown rejected"
7406 );
7407 return None;
7408 }
7409 };
7410
7411 let Expr::Property(scan_box_var, _scan_field) = scan_side_expr else {
7415 tracing::debug!(
7416 target: "uni_query::cross_join_in_pushdown",
7417 reason = "scan side is not Property",
7418 "build_in_pushdown rejected"
7419 );
7420 return None;
7421 };
7422 if !matches!(scan_box_var.as_ref(), Expr::Variable(_)) {
7423 tracing::debug!(
7424 target: "uni_query::cross_join_in_pushdown",
7425 reason = "scan side Property inner is not Variable",
7426 "build_in_pushdown rejected"
7427 );
7428 return None;
7429 }
7430
7431 let values = match field {
7436 None => match extract_static_unwind_values(unwind_subplan, unwind_var, params) {
7437 Some(v) => v,
7438 None => {
7439 tracing::debug!(
7440 target: "uni_query::cross_join_in_pushdown",
7441 reason = "extract_static_unwind_values returned None",
7442 unwind_var,
7443 "build_in_pushdown rejected"
7444 );
7445 return None;
7446 }
7447 },
7448 Some(f) => {
7449 match extract_static_unwind_field_values(unwind_subplan, unwind_var, f, params) {
7450 Some(v) => v,
7451 None => {
7452 tracing::debug!(
7453 target: "uni_query::cross_join_in_pushdown",
7454 reason = "extract_static_unwind_field_values returned None \
7455 (UNWIND source is not Expr::Parameter, or param is not \
7456 Value::List<Value::Map>, or a map element lacks field, \
7457 or list size exceeded MAX_UNWIND_IN_PUSHDOWN_VALUES)",
7458 unwind_var,
7459 field = f,
7460 "build_in_pushdown rejected"
7461 );
7462 return None;
7463 }
7464 }
7465 }
7466 };
7467 if values.is_empty() {
7468 tracing::debug!(
7469 target: "uni_query::cross_join_in_pushdown",
7470 reason = "extracted value list is empty",
7471 unwind_var,
7472 ?field,
7473 "build_in_pushdown rejected"
7474 );
7475 return None;
7476 }
7477
7478 tracing::debug!(
7479 target: "uni_query::cross_join_in_pushdown",
7480 unwind_var,
7481 ?field,
7482 values_count = values.len(),
7483 "build_in_pushdown extracted IN-list"
7484 );
7485 Some(Expr::In {
7486 expr: Box::new(scan_side_expr.clone()),
7487 list: Box::new(Expr::List(values)),
7488 })
7489}
7490
7491fn expr_is_vid_property(expr: &Expr) -> bool {
7498 matches!(
7499 expr,
7500 Expr::Property(inner, prop)
7501 if prop == "_vid" && matches!(inner.as_ref(), Expr::Variable(_))
7502 )
7503}
7504
7505fn wrap_with_filter(plan: LogicalPlan, filters: &[Expr]) -> LogicalPlan {
7506 if filters.is_empty() {
7507 return plan;
7508 }
7509 let predicate = and_combine(filters.to_vec()).expect("non-empty filters");
7510 match plan {
7518 LogicalPlan::Scan {
7519 label_id,
7520 labels,
7521 variable,
7522 filter: existing,
7523 optional,
7524 } => LogicalPlan::Scan {
7525 label_id,
7526 labels,
7527 variable,
7528 filter: merge_filter(existing, predicate),
7529 optional,
7530 },
7531 LogicalPlan::ScanMainByLabels {
7532 labels,
7533 variable,
7534 filter: existing,
7535 optional,
7536 } => LogicalPlan::ScanMainByLabels {
7537 labels,
7538 variable,
7539 filter: merge_filter(existing, predicate),
7540 optional,
7541 },
7542 LogicalPlan::ScanAll {
7543 variable,
7544 filter: existing,
7545 optional,
7546 } => LogicalPlan::ScanAll {
7547 variable,
7548 filter: merge_filter(existing, predicate),
7549 optional,
7550 },
7551 other => LogicalPlan::Filter {
7555 input: Box::new(other),
7556 predicate,
7557 optional_variables: HashSet::new(),
7558 },
7559 }
7560}
7561
7562fn merge_filter(existing: Option<Expr>, predicate: Expr) -> Option<Expr> {
7570 match existing {
7571 Some(prev) if prev == predicate => Some(prev),
7572 Some(prev) => and_combine(vec![prev, predicate]),
7573 None => Some(predicate),
7574 }
7575}
7576
7577fn merge_unwind_in_filters(
7615 plan: &LogicalPlan,
7616 params: &HashMap<String, uni_common::Value>,
7617) -> LogicalPlan {
7618 match plan {
7619 LogicalPlan::Filter {
7621 input,
7622 predicate,
7623 optional_variables,
7624 } if matches!(input.as_ref(), LogicalPlan::CrossJoin { .. }) => {
7625 let LogicalPlan::CrossJoin { left, right } = input.as_ref() else {
7627 unreachable!("matches! above guarantees CrossJoin")
7628 };
7629
7630 let left_rewritten = merge_unwind_in_filters(left, params);
7632 let right_rewritten = merge_unwind_in_filters(right, params);
7633
7634 let left_vars = collect_plan_variables(&left_rewritten);
7635 let right_vars = collect_plan_variables(&right_rewritten);
7636 let cls = classify_join_predicate(predicate, &left_vars, &right_vars);
7637
7638 let rebuild_unmodified = |l: LogicalPlan, r: LogicalPlan| LogicalPlan::Filter {
7639 input: Box::new(LogicalPlan::CrossJoin {
7640 left: Box::new(l),
7641 right: Box::new(r),
7642 }),
7643 predicate: predicate.clone(),
7644 optional_variables: optional_variables.clone(),
7645 };
7646
7647 if cls.equi_pairs.is_empty() {
7648 return rebuild_unmodified(left_rewritten, right_rewritten);
7649 }
7650
7651 let mut left_extra_in: Vec<Expr> = Vec::new();
7655 let mut right_extra_in: Vec<Expr> = Vec::new();
7656 for (l_expr, r_expr) in &cls.equi_pairs {
7657 if let Some(in_filter) = build_in_pushdown(l_expr, r_expr, &left_rewritten, params)
7658 {
7659 right_extra_in.push(in_filter);
7660 continue;
7661 }
7662 if let Some(in_filter) = build_in_pushdown(r_expr, l_expr, &left_rewritten, params)
7663 {
7664 right_extra_in.push(in_filter);
7665 continue;
7666 }
7667 if let Some(in_filter) = build_in_pushdown(l_expr, r_expr, &right_rewritten, params)
7668 {
7669 left_extra_in.push(in_filter);
7670 continue;
7671 }
7672 if let Some(in_filter) = build_in_pushdown(r_expr, l_expr, &right_rewritten, params)
7673 {
7674 left_extra_in.push(in_filter);
7675 }
7676 }
7677
7678 tracing::debug!(
7679 target: "uni_query::cross_join_in_pushdown",
7680 left_in_filters = left_extra_in.len(),
7681 right_in_filters = right_extra_in.len(),
7682 "merge_unwind_in_filters: IN-pushdown result"
7683 );
7684
7685 if left_extra_in.is_empty() && right_extra_in.is_empty() {
7686 return rebuild_unmodified(left_rewritten, right_rewritten);
7687 }
7688
7689 let left_merged = wrap_with_filter(left_rewritten, &left_extra_in);
7690 let right_merged = wrap_with_filter(right_rewritten, &right_extra_in);
7691 rebuild_unmodified(left_merged, right_merged)
7692 }
7693 LogicalPlan::Filter {
7695 input,
7696 predicate,
7697 optional_variables,
7698 } => LogicalPlan::Filter {
7699 input: Box::new(merge_unwind_in_filters(input, params)),
7700 predicate: predicate.clone(),
7701 optional_variables: optional_variables.clone(),
7702 },
7703 LogicalPlan::Project { input, projections } => LogicalPlan::Project {
7705 input: Box::new(merge_unwind_in_filters(input, params)),
7706 projections: projections.clone(),
7707 },
7708 LogicalPlan::Sort { input, order_by } => LogicalPlan::Sort {
7709 input: Box::new(merge_unwind_in_filters(input, params)),
7710 order_by: order_by.clone(),
7711 },
7712 LogicalPlan::Limit { input, skip, fetch } => LogicalPlan::Limit {
7713 input: Box::new(merge_unwind_in_filters(input, params)),
7714 skip: *skip,
7715 fetch: *fetch,
7716 },
7717 LogicalPlan::Distinct { input } => LogicalPlan::Distinct {
7718 input: Box::new(merge_unwind_in_filters(input, params)),
7719 },
7720 LogicalPlan::Unwind {
7721 input,
7722 expr,
7723 variable,
7724 } => LogicalPlan::Unwind {
7725 input: Box::new(merge_unwind_in_filters(input, params)),
7726 expr: expr.clone(),
7727 variable: variable.clone(),
7728 },
7729 LogicalPlan::Set { input, items } => LogicalPlan::Set {
7734 input: Box::new(merge_unwind_in_filters(input, params)),
7735 items: items.clone(),
7736 },
7737 LogicalPlan::Remove { input, items } => LogicalPlan::Remove {
7738 input: Box::new(merge_unwind_in_filters(input, params)),
7739 items: items.clone(),
7740 },
7741 LogicalPlan::Delete {
7742 input,
7743 items,
7744 detach,
7745 } => LogicalPlan::Delete {
7746 input: Box::new(merge_unwind_in_filters(input, params)),
7747 items: items.clone(),
7748 detach: *detach,
7749 },
7750 LogicalPlan::Create { input, pattern } => LogicalPlan::Create {
7751 input: Box::new(merge_unwind_in_filters(input, params)),
7752 pattern: pattern.clone(),
7753 },
7754 LogicalPlan::CreateBatch { input, patterns } => LogicalPlan::CreateBatch {
7755 input: Box::new(merge_unwind_in_filters(input, params)),
7756 patterns: patterns.clone(),
7757 },
7758 LogicalPlan::Merge {
7759 input,
7760 pattern,
7761 on_match,
7762 on_create,
7763 } => LogicalPlan::Merge {
7764 input: Box::new(merge_unwind_in_filters(input, params)),
7765 pattern: pattern.clone(),
7766 on_match: on_match.clone(),
7767 on_create: on_create.clone(),
7768 },
7769 LogicalPlan::Foreach {
7770 input,
7771 variable,
7772 list,
7773 body,
7774 } => LogicalPlan::Foreach {
7775 input: Box::new(merge_unwind_in_filters(input, params)),
7776 variable: variable.clone(),
7777 list: list.clone(),
7778 body: body
7779 .iter()
7780 .map(|b| merge_unwind_in_filters(b, params))
7781 .collect(),
7782 },
7783 LogicalPlan::Aggregate {
7785 input,
7786 group_by,
7787 aggregates,
7788 } => LogicalPlan::Aggregate {
7789 input: Box::new(merge_unwind_in_filters(input, params)),
7790 group_by: group_by.clone(),
7791 aggregates: aggregates.clone(),
7792 },
7793 LogicalPlan::Window {
7794 input,
7795 window_exprs,
7796 } => LogicalPlan::Window {
7797 input: Box::new(merge_unwind_in_filters(input, params)),
7798 window_exprs: window_exprs.clone(),
7799 },
7800 LogicalPlan::SubqueryCall { input, subquery } => LogicalPlan::SubqueryCall {
7801 input: Box::new(merge_unwind_in_filters(input, params)),
7802 subquery: Box::new(merge_unwind_in_filters(subquery, params)),
7803 },
7804 LogicalPlan::CrossJoin { left, right } => LogicalPlan::CrossJoin {
7806 left: Box::new(merge_unwind_in_filters(left, params)),
7807 right: Box::new(merge_unwind_in_filters(right, params)),
7808 },
7809 LogicalPlan::Union { left, right, all } => LogicalPlan::Union {
7810 left: Box::new(merge_unwind_in_filters(left, params)),
7811 right: Box::new(merge_unwind_in_filters(right, params)),
7812 all: *all,
7813 },
7814 LogicalPlan::Apply {
7816 input,
7817 subquery,
7818 input_filter,
7819 } => LogicalPlan::Apply {
7820 input: Box::new(merge_unwind_in_filters(input, params)),
7821 subquery: Box::new(merge_unwind_in_filters(subquery, params)),
7822 input_filter: input_filter.clone(),
7823 },
7824 _ => plan.clone(),
7831 }
7832}
7833
7834fn is_hashable_native_dtype(dt: &DataType) -> bool {
7838 matches!(
7839 dt,
7840 DataType::Boolean
7841 | DataType::Int8
7842 | DataType::Int16
7843 | DataType::Int32
7844 | DataType::Int64
7845 | DataType::UInt8
7846 | DataType::UInt16
7847 | DataType::UInt32
7848 | DataType::UInt64
7849 | DataType::Float32
7850 | DataType::Float64
7851 | DataType::Utf8
7852 | DataType::LargeUtf8
7853 | DataType::Binary
7854 | DataType::LargeBinary
7855 | DataType::Date32
7856 | DataType::Date64
7857 )
7858}
7859
7860fn tointeger_accepts_dtype(dt: &DataType) -> bool {
7863 matches!(
7864 dt,
7865 DataType::Int8
7866 | DataType::Int16
7867 | DataType::Int32
7868 | DataType::Int64
7869 | DataType::UInt8
7870 | DataType::UInt16
7871 | DataType::UInt32
7872 | DataType::UInt64
7873 | DataType::Float32
7874 | DataType::Float64
7875 | DataType::LargeBinary
7876 )
7877}
7878
7879fn wrap_with_unary_udf(
7881 expr: Arc<dyn datafusion::physical_plan::PhysicalExpr>,
7882 udf: Arc<datafusion::logical_expr::ScalarUDF>,
7883 return_dt: DataType,
7884) -> Arc<dyn datafusion::physical_plan::PhysicalExpr> {
7885 let config_options = Arc::new(datafusion::config::ConfigOptions::default());
7886 let udf_name = udf.name().to_string();
7887 let return_field = Arc::new(arrow_schema::Field::new(&udf_name, return_dt, true));
7888 Arc::new(datafusion::physical_expr::ScalarFunctionExpr::new(
7889 &udf_name,
7890 udf,
7891 vec![expr],
7892 return_field,
7893 config_options,
7894 ))
7895}
7896
7897fn unify_join_key_types(
7912 left: Arc<dyn datafusion::physical_plan::PhysicalExpr>,
7913 right: Arc<dyn datafusion::physical_plan::PhysicalExpr>,
7914 left_schema: &Schema,
7915 right_schema: &Schema,
7916 state: &SessionState,
7917) -> Option<(
7918 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
7919 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
7920)> {
7921 let l_dt = left.data_type(left_schema).ok()?;
7922 let r_dt = right.data_type(right_schema).ok()?;
7923
7924 if l_dt == r_dt && is_hashable_native_dtype(&l_dt) {
7925 return Some((left, right));
7926 }
7927
7928 if tointeger_accepts_dtype(&l_dt) && tointeger_accepts_dtype(&r_dt) {
7929 let udf = state.scalar_functions().get("tointeger")?.clone();
7930 return Some((
7931 wrap_with_unary_udf(left, udf.clone(), DataType::Int64),
7932 wrap_with_unary_udf(right, udf, DataType::Int64),
7933 ));
7934 }
7935
7936 None
7940}
7941
#[cfg(test)]
mod tests {
    use super::*;
    use uni_cypher::ast::CypherLiteral;

    /// Each AST direction maps onto the same-named runtime direction.
    #[test]
    fn test_convert_direction() {
        assert!(matches!(
            convert_direction(AstDirection::Outgoing),
            Direction::Outgoing
        ));
        assert!(matches!(
            convert_direction(AstDirection::Incoming),
            Direction::Incoming
        ));
        assert!(matches!(
            convert_direction(AstDirection::Both),
            Direction::Both
        ));
    }

    /// The `*` wildcard is dropped when a concrete property is also requested.
    #[test]
    fn test_sanitize_vlp_target_properties_removes_wildcard() {
        let props = vec!["*".to_string(), "name".to_string()];
        let label_props = HashSet::from(["name".to_string()]);
        let sanitized = sanitize_vlp_target_properties(props, true, Some(&label_props));

        assert_eq!(sanitized, vec!["name".to_string()]);
    }

    /// A lone `*` with no label properties is replaced by `_all_props`.
    #[test]
    fn test_sanitize_vlp_target_properties_adds_all_props_for_wildcard_empty() {
        let props = vec!["*".to_string()];
        let sanitized = sanitize_vlp_target_properties(props, true, None);

        assert_eq!(sanitized, vec!["_all_props".to_string()]);
    }

    /// A property outside the label's property set keeps its name and gets
    /// `_all_props` appended.
    #[test]
    fn test_sanitize_vlp_target_properties_adds_all_props_for_non_schema() {
        let props = vec!["custom_prop".to_string()];
        let label_props = HashSet::from(["name".to_string()]);
        let sanitized = sanitize_vlp_target_properties(props, false, Some(&label_props));

        assert_eq!(
            sanitized,
            vec!["custom_prop".to_string(), "_all_props".to_string()]
        );
    }

    /// Integer literal expression.
    fn int_lit(n: i64) -> Expr {
        Expr::Literal(CypherLiteral::Integer(n))
    }

    /// String literal expression.
    fn str_lit(s: &str) -> Expr {
        Expr::Literal(CypherLiteral::String(s.to_string()))
    }

    /// One `(key, value)` entry of an `Expr::Map`.
    fn map_entry(k: &str, v: Expr) -> (String, Expr) {
        (k.to_string(), v)
    }

    /// A list of map literals yields the requested field of each map, in order.
    #[test]
    fn materialize_unwind_field_accepts_inlined_map_literals() {
        let unwind_expr = Expr::List(vec![
            Expr::Map(vec![
                map_entry("nid", int_lit(64)),
                map_entry("x", int_lit(1)),
            ]),
            Expr::Map(vec![
                map_entry("nid", int_lit(65)),
                map_entry("x", int_lit(2)),
            ]),
        ]);
        let params = HashMap::new();
        let result = materialize_unwind_source_field(&unwind_expr, &params, "nid");
        let values = result.expect("literal-map UNWIND should produce an IN-list");
        assert_eq!(values.len(), 2);
        assert!(matches!(
            &values[0],
            Expr::Literal(CypherLiteral::Integer(64))
        ));
        assert!(matches!(
            &values[1],
            Expr::Literal(CypherLiteral::Integer(65))
        ));
    }

    /// String-valued fields are extracted like integer-valued ones.
    ///
    /// NOTE(review): despite the test name, both maps carry string values, so
    /// a genuinely mixed-type list is not exercised here.
    #[test]
    fn materialize_unwind_field_handles_mixed_primitive_field_types() {
        let unwind_expr = Expr::List(vec![
            Expr::Map(vec![map_entry("k", str_lit("a"))]),
            Expr::Map(vec![map_entry("k", str_lit("b"))]),
        ]);
        let params = HashMap::new();
        let values = materialize_unwind_source_field(&unwind_expr, &params, "k")
            .expect("literal-map UNWIND should produce an IN-list");
        assert_eq!(values.len(), 2);
    }

    /// A parameter reference as the field value is not a static literal.
    #[test]
    fn materialize_unwind_field_rejects_non_literal_value_at_target_field() {
        let unwind_expr = Expr::List(vec![Expr::Map(vec![map_entry(
            "nid",
            Expr::Parameter("p".to_string()),
        )])]);
        let params = HashMap::new();
        let result = materialize_unwind_source_field(&unwind_expr, &params, "nid");
        assert!(result.is_none(), "non-literal value at field should bail");
    }

    /// A map without the requested key makes the whole extraction fail.
    #[test]
    fn materialize_unwind_field_rejects_when_target_field_missing() {
        let unwind_expr = Expr::List(vec![Expr::Map(vec![map_entry("other", int_lit(64))])]);
        let params = HashMap::new();
        let result = materialize_unwind_source_field(&unwind_expr, &params, "nid");
        assert!(
            result.is_none(),
            "map missing the requested field should bail"
        );
    }

    /// Field extraction needs map items; a list of plain literals is rejected.
    #[test]
    fn materialize_unwind_field_rejects_non_map_list_item() {
        let unwind_expr = Expr::List(vec![int_lit(64), int_lit(65)]);
        let params = HashMap::new();
        let result = materialize_unwind_source_field(&unwind_expr, &params, "nid");
        assert!(
            result.is_none(),
            "non-map list items can't be field-projected"
        );
    }

    /// One item more than `MAX_UNWIND_IN_PUSHDOWN_VALUES` is rejected.
    #[test]
    fn materialize_unwind_field_rejects_oversized_list() {
        let oversized = MAX_UNWIND_IN_PUSHDOWN_VALUES + 1;
        let items: Vec<Expr> = (0..oversized)
            .map(|i| Expr::Map(vec![map_entry("nid", int_lit(i as i64))]))
            .collect();
        let unwind_expr = Expr::List(items);
        let params = HashMap::new();
        let result = materialize_unwind_source_field(&unwind_expr, &params, "nid");
        assert!(result.is_none(), "oversized list should bail");
    }

    /// A `$updates` parameter holding a list of maps is resolved via `params`.
    #[test]
    fn materialize_unwind_field_param_form_still_works() {
        let mut params = HashMap::new();
        params.insert(
            "updates".to_string(),
            uni_common::Value::List(vec![
                uni_common::Value::Map({
                    let mut m = HashMap::new();
                    m.insert("nid".to_string(), uni_common::Value::Int(64));
                    m
                }),
                uni_common::Value::Map({
                    let mut m = HashMap::new();
                    m.insert("nid".to_string(), uni_common::Value::Int(65));
                    m
                }),
            ]),
        );
        let unwind_expr = Expr::Parameter("updates".to_string());
        let values = materialize_unwind_source_field(&unwind_expr, &params, "nid")
            .expect("parameter form should produce IN-list");
        assert_eq!(values.len(), 2);
    }

    /// Builds `Filter(predicate, CrossJoin(Unwind(Project(Scan)), Scan))`.
    ///
    /// The left input unwinds `unwind_source` into `unwind_var` on top of an
    /// empty projection over a dummy scan; the right input is an unfiltered
    /// scan of `scan_label` bound to `scan_var`.
    fn make_filter_crossjoin_scan(
        unwind_source: Expr,
        unwind_var: &str,
        scan_label_id: u16,
        scan_label: &str,
        scan_var: &str,
        predicate: Expr,
    ) -> LogicalPlan {
        let unwind = LogicalPlan::Unwind {
            input: Box::new(LogicalPlan::Project {
                input: Box::new(LogicalPlan::Scan {
                    label_id: scan_label_id,
                    labels: vec![scan_label.to_string()],
                    variable: "__dummy__".to_string(),
                    filter: None,
                    optional: false,
                }),
                projections: vec![],
            }),
            expr: unwind_source,
            variable: unwind_var.to_string(),
        };
        let scan = LogicalPlan::Scan {
            label_id: scan_label_id,
            labels: vec![scan_label.to_string()],
            variable: scan_var.to_string(),
            filter: None,
            optional: false,
        };
        LogicalPlan::Filter {
            input: Box::new(LogicalPlan::CrossJoin {
                left: Box::new(unwind),
                right: Box::new(scan),
            }),
            predicate,
            optional_variables: HashSet::new(),
        }
    }

    /// Builds the predicate `scan_var.prop = unwind_var`.
    fn eq_property_predicate(scan_var: &str, prop: &str, unwind_var: &str) -> Expr {
        Expr::BinaryOp {
            left: Box::new(Expr::Property(
                Box::new(Expr::Variable(scan_var.to_string())),
                prop.to_string(),
            )),
            op: uni_cypher::ast::BinaryOp::Eq,
            right: Box::new(Expr::Variable(unwind_var.to_string())),
        }
    }

    /// Asserts that `plan` is `Filter(CrossJoin(_, Scan))` whose right-hand
    /// scan has the expected label and an `Expr::In` filter.
    fn assert_scan_filter_is_in_list(plan: &LogicalPlan, expected_label: &str) {
        let LogicalPlan::Filter { input, .. } = plan else {
            panic!("expected top-level Filter, got {plan:?}");
        };
        let LogicalPlan::CrossJoin { right, .. } = input.as_ref() else {
            panic!("expected CrossJoin under Filter, got {input:?}");
        };
        let LogicalPlan::Scan { labels, filter, .. } = right.as_ref() else {
            panic!("expected Scan as right subtree, got {right:?}");
        };
        assert_eq!(labels, &vec![expected_label.to_string()]);
        let filter_expr = filter
            .as_ref()
            .expect("Scan.filter must be Some after pass");
        assert!(
            matches!(filter_expr, Expr::In { .. }),
            "Scan.filter should be Expr::In, got {filter_expr:?}"
        );
    }

    /// `n.name = u` over a literal UNWIND list lands in the scan's filter.
    #[test]
    fn merge_pass_pushes_in_list_into_scan_filter() {
        let unwind_source = Expr::List(vec![str_lit("a"), str_lit("b")]);
        let plan = make_filter_crossjoin_scan(
            unwind_source,
            "u",
            1,
            "Item",
            "n",
            eq_property_predicate("n", "name", "u"),
        );
        let params = HashMap::new();
        let rewritten = merge_unwind_in_filters(&plan, &params);
        assert_scan_filter_is_in_list(&rewritten, "Item");
    }

    /// Running the pass twice must not AND a duplicate `IN` onto the scan.
    #[test]
    fn merge_pass_idempotent() {
        let unwind_source = Expr::List(vec![str_lit("a"), str_lit("b")]);
        let plan = make_filter_crossjoin_scan(
            unwind_source,
            "u",
            1,
            "Item",
            "n",
            eq_property_predicate("n", "name", "u"),
        );
        let params = HashMap::new();
        let pass1 = merge_unwind_in_filters(&plan, &params);
        let pass2 = merge_unwind_in_filters(&pass1, &params);

        let LogicalPlan::Filter { input, .. } = &pass2 else {
            panic!("expected Filter");
        };
        let LogicalPlan::CrossJoin { right, .. } = input.as_ref() else {
            panic!("expected CrossJoin");
        };
        let LogicalPlan::Scan { filter, .. } = right.as_ref() else {
            panic!("expected Scan");
        };
        let filter_expr = filter.as_ref().expect("Scan.filter must be Some");
        assert!(
            matches!(filter_expr, Expr::In { .. }),
            "After 2 passes the filter should still be a single Expr::In, \
             not ANDed with a duplicate; got {filter_expr:?}"
        );
    }

    /// A `STARTS WITH` predicate is not an equi-pair, so nothing is pushed.
    #[test]
    fn merge_pass_leaves_non_pushable_predicates_alone() {
        let unwind_source = Expr::List(vec![str_lit("a")]);
        let starts_with = Expr::BinaryOp {
            left: Box::new(Expr::Property(
                Box::new(Expr::Variable("n".to_string())),
                "name".to_string(),
            )),
            op: uni_cypher::ast::BinaryOp::StartsWith,
            right: Box::new(str_lit("x")),
        };
        let plan = make_filter_crossjoin_scan(unwind_source, "u", 1, "Item", "n", starts_with);
        let params = HashMap::new();
        let rewritten = merge_unwind_in_filters(&plan, &params);

        let LogicalPlan::Filter { input, .. } = &rewritten else {
            panic!("expected Filter");
        };
        let LogicalPlan::CrossJoin { right, .. } = input.as_ref() else {
            panic!("expected CrossJoin");
        };
        let LogicalPlan::Scan { filter, .. } = right.as_ref() else {
            panic!("expected Scan");
        };
        assert!(
            filter.is_none(),
            "no equi-pair → no pushdown; Scan.filter should remain None, got {filter:?}"
        );
    }

    /// When the right input is itself a `CrossJoin`, the `IN` predicate is
    /// expected as a `Filter` wrapped around that inner join.
    #[test]
    fn merge_pass_handles_nested_crossjoin() {
        let unwind_source = Expr::List(vec![str_lit("a")]);
        let unwind = LogicalPlan::Unwind {
            input: Box::new(LogicalPlan::Project {
                input: Box::new(LogicalPlan::Scan {
                    label_id: 0,
                    labels: vec!["__".to_string()],
                    variable: "__".to_string(),
                    filter: None,
                    optional: false,
                }),
                projections: vec![],
            }),
            expr: unwind_source,
            variable: "u".to_string(),
        };
        let inner_cross = LogicalPlan::CrossJoin {
            left: Box::new(LogicalPlan::Scan {
                label_id: 1,
                labels: vec!["Item".to_string()],
                variable: "n".to_string(),
                filter: None,
                optional: false,
            }),
            right: Box::new(LogicalPlan::Scan {
                label_id: 2,
                labels: vec!["Other".to_string()],
                variable: "m".to_string(),
                filter: None,
                optional: false,
            }),
        };
        let plan = LogicalPlan::Filter {
            input: Box::new(LogicalPlan::CrossJoin {
                left: Box::new(unwind),
                right: Box::new(inner_cross),
            }),
            predicate: eq_property_predicate("n", "name", "u"),
            optional_variables: HashSet::new(),
        };
        let params = HashMap::new();
        let rewritten = merge_unwind_in_filters(&plan, &params);

        let LogicalPlan::Filter { input, .. } = &rewritten else {
            panic!("expected outer Filter");
        };
        let LogicalPlan::CrossJoin { right, .. } = input.as_ref() else {
            panic!("expected outer CrossJoin");
        };
        match right.as_ref() {
            LogicalPlan::Filter { predicate, .. } => {
                assert!(
                    matches!(predicate, Expr::In { .. }),
                    "expected Expr::In wrapping inner CrossJoin, got {predicate:?}"
                );
            }
            other => panic!(
                "expected Filter wrapping inner CrossJoin, got {other:?}. \
                 This is acceptable behaviour — the IN-list is preserved \
                 above the inner join — but the test should be updated if \
                 wrap_with_filter changes to descend through CrossJoins."
            ),
        }
    }
}