1use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
36use crate::query::df_graph::ReadSetRecordingExec;
37use crate::query::df_graph::bind_fixed_path::BindFixedPathExec;
38use crate::query::df_graph::bind_zero_length_path::BindZeroLengthPathExec;
39use crate::query::df_graph::mutation_common::{
40 MutationKind, extended_schema_for_new_vars, new_create_exec, new_merge_exec,
41};
42use crate::query::df_graph::mutation_delete::new_delete_exec;
43use crate::query::df_graph::mutation_remove::new_remove_exec;
44use crate::query::df_graph::mutation_set::new_set_exec;
45use crate::query::df_graph::recursive_cte::RecursiveCTEExec;
46use crate::query::df_graph::traverse::{
47 GraphVariableLengthTraverseExec, GraphVariableLengthTraverseMainExec,
48};
49use crate::query::df_graph::{
50 GraphApplyExec, GraphExecutionContext, GraphExtIdLookupExec, GraphProcedureCallExec,
51 GraphScanExec, GraphShortestPathExec, GraphTraverseExec, GraphTraverseMainExec,
52 GraphUnwindExec, GraphVectorKnnExec, L0Context, MutationContext, MutationExec,
53 OptionalFilterExec,
54};
55use crate::query::planner::{
56 LogicalPlan, STRUCT_ONLY_SENTINEL, aggregate_column_name, collect_properties_from_plan,
57};
58use anyhow::{Result, anyhow};
59use arrow_schema::{DataType, Schema, SchemaRef};
60use datafusion::common::JoinType;
61use datafusion::execution::SessionState;
62use datafusion::logical_expr::{Expr as DfExpr, ExprSchemable, SortExpr as DfSortExpr};
63use datafusion::physical_expr::{create_physical_expr, create_physical_sort_exprs};
64use datafusion::physical_plan::ExecutionPlan;
65use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
66use datafusion::physical_plan::filter::FilterExec;
67use datafusion::physical_plan::joins::NestedLoopJoinExec;
68use datafusion::physical_plan::limit::LocalLimitExec;
69use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
70use datafusion::physical_plan::projection::ProjectionExec;
71use datafusion::physical_plan::sorts::sort::SortExec;
72use datafusion::physical_plan::udaf::AggregateFunctionExpr;
73use datafusion::physical_plan::union::UnionExec;
74use datafusion::prelude::SessionContext;
75use parking_lot::RwLock;
76use std::collections::{HashMap, HashSet};
77use std::sync::Arc;
78use std::sync::atomic::{AtomicU64, Ordering};
79use uni_algo::algo::AlgorithmRegistry;
80use uni_common::core::schema::{PropertyMeta, Schema as UniSchema};
81use uni_cypher::ast::{
82 CypherLiteral, Direction as AstDirection, Expr, Pattern, PatternElement, SortItem,
83};
84use uni_store::runtime::l0::L0Buffer;
85use uni_store::runtime::property_manager::PropertyManager;
86use uni_store::storage::direction::Direction;
87use uni_store::storage::manager::StorageManager;
88use uni_xervo::runtime::ModelRuntime;
89
/// A physical aggregate function paired with an optional physical expression.
///
/// NOTE(review): the second element is presumably the aggregate's
/// `FILTER (WHERE ...)` predicate, mirroring DataFusion `AggregateExec`'s
/// parallel `aggr_expr` / `filter_expr` vectors — confirm at the use sites.
type PhysicalAggregate = (
    Arc<AggregateFunctionExpr>,
    Option<Arc<dyn datafusion::physical_expr::PhysicalExpr>>,
);
95
/// Physical planner that lowers a [`LogicalPlan`] into a DataFusion
/// [`ExecutionPlan`] tree, combining stock DataFusion operators (filter, sort,
/// aggregate, joins, limits, unions) with the custom graph operators from
/// `df_graph` (scans, traversals, shortest paths, mutations, Locy operators).
pub struct HybridPhysicalPlanner {
    /// DataFusion session, shared behind a read/write lock.
    session_ctx: Arc<RwLock<SessionContext>>,

    /// Storage manager; also used to build `graph_ctx`.
    storage: Arc<StorageManager>,

    /// Shared graph execution context (storage, L0 state, property manager and
    /// any attached registries / runtime / writer) handed to graph operators.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Graph schema used to resolve label / edge-type names and properties.
    schema: Arc<UniSchema>,

    /// Flush-version counter, initialized to 0.
    /// NOTE(review): only read by the `Debug` impl in the visible code —
    /// confirm where it is updated.
    last_flush_version: AtomicU64,

    /// Query parameters, exposed to expression translation.
    params: HashMap<String, uni_common::Value>,

    /// Values bound by an enclosing scope, exposed to expression translation.
    outer_values: HashMap<String, uni_common::Value>,

    /// Context required to plan write operators; when `None`, planning a
    /// mutation fails with an error.
    mutation_ctx: Option<Arc<MutationContext>>,

    /// Variable names installed via `set_outer_entity_vars`.
    /// NOTE(review): presumably entity variables bound by an outer query —
    /// confirm at the usage sites.
    outer_entity_vars: HashSet<String>,

    /// Plugin registry used to resolve Locy fold bindings and passed to Locy
    /// program execution.
    plugin_registry: Arc<uni_plugin::PluginRegistry>,
}
154
155impl std::fmt::Debug for HybridPhysicalPlanner {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 f.debug_struct("HybridPhysicalPlanner")
158 .field(
159 "last_flush_version",
160 &self.last_flush_version.load(Ordering::Relaxed),
161 )
162 .finish_non_exhaustive()
163 }
164}
165
166impl HybridPhysicalPlanner {
167 pub fn new(
177 session_ctx: Arc<RwLock<SessionContext>>,
178 storage: Arc<StorageManager>,
179 l0: Arc<RwLock<L0Buffer>>,
180 property_manager: Arc<PropertyManager>,
181 schema: Arc<UniSchema>,
182 params: HashMap<String, uni_common::Value>,
183 ) -> Self {
184 let graph_ctx = Arc::new(GraphExecutionContext::new(
185 storage.clone(),
186 l0,
187 property_manager,
188 ));
189
190 Self {
191 session_ctx,
192 storage,
193 graph_ctx,
194 schema,
195 last_flush_version: AtomicU64::new(0),
196 params,
197 outer_values: HashMap::new(),
198 mutation_ctx: None,
199 outer_entity_vars: HashSet::new(),
200 plugin_registry: super::df_graph::locy_fold::default_locy_plugin_registry(),
201 }
202 }
203
204 #[must_use]
211 pub fn with_plugin_registry(
212 mut self,
213 plugin_registry: Arc<uni_plugin::PluginRegistry>,
214 ) -> Self {
215 let mut ctx = self.take_graph_ctx();
220 ctx = ctx.with_plugin_registry(Arc::clone(&plugin_registry));
221 self.graph_ctx = Arc::new(ctx);
222 self.plugin_registry = plugin_registry;
223 self
224 }
225
226 fn resolve_properties(
232 &self,
233 variable: &str,
234 schema_name: &str,
235 all_properties: &HashMap<String, HashSet<String>>,
236 ) -> Vec<String> {
237 const SYSTEM_COLUMNS: &[&str] =
239 &["_vid", "_labels", "_eid", "_src_vid", "_dst_vid", "_type"];
240
241 all_properties
242 .get(variable)
243 .map(|props| {
244 if props.contains("*") {
245 let schema_props: Vec<String> = self
246 .schema
247 .properties
248 .get(schema_name)
249 .map(|p| p.keys().cloned().collect())
250 .unwrap_or_default();
251
252 let explicit: Vec<String> = props
258 .iter()
259 .filter(|p| {
260 *p != "*"
261 && *p != STRUCT_ONLY_SENTINEL
262 && (!p.starts_with('_')
263 || matches!(p.as_str(), "_created_at" | "_updated_at"))
264 })
265 .cloned()
266 .collect();
267
268 if schema_props.is_empty() && explicit.is_empty() {
269 return vec!["*".to_string()];
271 }
272
273 let mut combined: Vec<String> = schema_props;
275 for p in explicit {
276 if !combined.contains(&p) {
277 combined.push(p);
278 }
279 }
280 combined.retain(|p| !SYSTEM_COLUMNS.contains(&p.as_str()));
281 combined.sort();
282 combined
283 } else {
284 let mut explicit_props: Vec<String> = props
290 .iter()
291 .filter(|p| {
292 *p != "*"
293 && *p != STRUCT_ONLY_SENTINEL
294 && !SYSTEM_COLUMNS.contains(&p.as_str())
295 })
296 .cloned()
297 .collect();
298 explicit_props.sort();
299 explicit_props
300 }
301 })
302 .unwrap_or_default()
303 }
304
305 pub fn with_l0_context(
307 session_ctx: Arc<RwLock<SessionContext>>,
308 storage: Arc<StorageManager>,
309 l0_context: L0Context,
310 property_manager: Arc<PropertyManager>,
311 schema: Arc<UniSchema>,
312 params: HashMap<String, uni_common::Value>,
313 outer_values: HashMap<String, uni_common::Value>,
314 ) -> Self {
315 let graph_ctx = Arc::new(GraphExecutionContext::with_l0_context(
316 storage.clone(),
317 l0_context,
318 property_manager,
319 ));
320
321 Self {
322 session_ctx,
323 storage,
324 graph_ctx,
325 schema,
326 last_flush_version: AtomicU64::new(0),
327 params,
328 outer_values,
329 mutation_ctx: None,
330 outer_entity_vars: HashSet::new(),
331 plugin_registry: super::df_graph::locy_fold::default_locy_plugin_registry(),
332 }
333 }
334
    /// Moves the current [`GraphExecutionContext`] out of `self` so that a
    /// consuming `with_*` builder can be applied to it.
    ///
    /// `self.graph_ctx` is temporarily replaced by a bare placeholder built
    /// from the same storage, L0 context and property manager. Callers are
    /// expected to store the rebuilt context back into `self.graph_ctx`.
    ///
    /// If the `Arc` is uniquely owned, the existing context is unwrapped and
    /// reused. Otherwise a fresh base context is built from the shared one. In
    /// both cases the algorithm registry, procedure registry, Xervo runtime,
    /// plugin registry and writer captured up front are re-applied, so those
    /// settings survive either path. Any other state a shared context may hold
    /// is not carried over by the rebuild path.
    fn take_graph_ctx(&mut self) -> GraphExecutionContext {
        // Capture the optional components before the context is swapped out.
        let algo_registry = self.graph_ctx.algo_registry().cloned();
        let procedure_registry = self.graph_ctx.procedure_registry().cloned();
        let xervo_runtime = self.graph_ctx.xervo_runtime().cloned();
        let plugin_registry = self.graph_ctx.plugin_registry().cloned();
        let writer = self.graph_ctx.writer().cloned();

        // Builds a context carrying only storage, L0 context and property manager.
        let new_base = |ctx: &Arc<GraphExecutionContext>| {
            GraphExecutionContext::with_l0_context(
                ctx.storage().clone(),
                ctx.l0_context().clone(),
                ctx.property_manager().clone(),
            )
        };
        let placeholder = Arc::new(new_base(&self.graph_ctx));
        let arc = std::mem::replace(&mut self.graph_ctx, placeholder);
        // Reuse the context when we hold the only reference; otherwise rebuild.
        let mut ctx = Arc::try_unwrap(arc).unwrap_or_else(|arc| new_base(&arc));

        if let Some(registry) = algo_registry {
            ctx = ctx.with_algo_registry(registry);
        }
        if let Some(registry) = procedure_registry {
            ctx = ctx.with_procedure_registry(registry);
        }
        if let Some(runtime) = xervo_runtime {
            ctx = ctx.with_xervo_runtime(runtime);
        }
        if let Some(registry) = plugin_registry {
            ctx = ctx.with_plugin_registry(registry);
        }
        if let Some(w) = writer {
            ctx = ctx.with_writer(w);
        }
        ctx
    }
373
374 #[must_use]
379 pub fn with_writer(mut self, writer: Arc<uni_store::Writer>) -> Self {
380 let ctx = self.take_graph_ctx().with_writer(writer);
381 self.graph_ctx = Arc::new(ctx);
382 self
383 }
384
    /// Replaces the set of outer entity variable names stored on the planner.
    pub fn set_outer_entity_vars(&mut self, vars: HashSet<String>) {
        self.outer_entity_vars = vars;
    }
392
393 pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
394 let ctx = self.take_graph_ctx().with_algo_registry(registry);
395 self.graph_ctx = Arc::new(ctx);
396 self
397 }
398
399 pub fn with_procedure_registry(
403 mut self,
404 registry: Arc<crate::query::executor::procedure::ProcedureRegistry>,
405 ) -> Self {
406 let ctx = self.take_graph_ctx().with_procedure_registry(registry);
407 self.graph_ctx = Arc::new(ctx);
408 self
409 }
410
411 pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
413 let ctx = self.take_graph_ctx().with_xervo_runtime(runtime);
414 self.graph_ctx = Arc::new(ctx);
415 self
416 }
417
418 pub fn with_mutation_context(mut self, ctx: Arc<MutationContext>) -> Self {
420 self.mutation_ctx = Some(ctx);
421 self
422 }
423
    /// Returns the shared graph execution context handed to graph operators.
    pub fn graph_ctx(&self) -> &Arc<GraphExecutionContext> {
        &self.graph_ctx
    }
428
    /// Returns the lock-protected DataFusion session context.
    pub fn session_ctx(&self) -> &Arc<RwLock<SessionContext>> {
        &self.session_ctx
    }
433
    /// Returns the storage manager this planner was built with.
    pub fn storage(&self) -> &Arc<StorageManager> {
        &self.storage
    }
438
    /// Returns the graph schema (labels, edge types, properties) used for planning.
    pub fn schema_info(&self) -> &Arc<UniSchema> {
        &self.schema
    }
443
444 fn require_mutation_ctx(&self) -> Result<Arc<MutationContext>> {
446 self.mutation_ctx.clone().ok_or_else(|| {
447 tracing::error!(
448 "Mutation context not set — this indicates a routing bug where a write \
449 operation was sent to the DataFusion engine without a MutationContext"
450 );
451 anyhow!("Mutation context not set — write operations require a MutationContext")
452 })
453 }
454
455 fn translation_context_for_plan(&self, plan: &LogicalPlan) -> TranslationContext {
460 let mut variable_kinds = HashMap::new();
461 let mut variable_labels = HashMap::new();
462 let mut node_variable_hints = Vec::new();
463 let mut mutation_edge_hints = Vec::new();
464 collect_variable_kinds(plan, &mut variable_kinds);
465 collect_mutation_node_hints(plan, &mut node_variable_hints);
466 collect_mutation_edge_hints(plan, &mut mutation_edge_hints);
467 self.collect_variable_labels(plan, &mut variable_labels);
468 TranslationContext {
469 parameters: self.params.clone(),
470 outer_values: self.outer_values.clone(),
471 variable_labels,
472 variable_kinds,
473 node_variable_hints,
474 mutation_edge_hints,
475 ..Default::default()
476 }
477 }
478
479 fn collect_variable_labels(&self, plan: &LogicalPlan, labels: &mut HashMap<String, String>) {
485 match plan {
486 LogicalPlan::Scan {
487 variable,
488 labels: scan_labels,
489 ..
490 }
491 | LogicalPlan::ScanMainByLabels {
492 variable,
493 labels: scan_labels,
494 ..
495 } => {
496 if let Some(first) = scan_labels.first() {
497 labels.insert(variable.clone(), first.clone());
498 }
499 }
500 LogicalPlan::Traverse {
501 input,
502 step_variable,
503 edge_type_ids,
504 target_variable,
505 target_label_id,
506 ..
507 } => {
508 self.collect_variable_labels(input, labels);
509 if let Some(sv) = step_variable
510 && edge_type_ids.len() == 1
511 && let Some(name) = self.schema.edge_type_name_by_id(edge_type_ids[0])
512 {
513 labels.insert(sv.clone(), name.to_string());
514 }
515 if *target_label_id != 0
516 && let Some(name) = self.schema.label_name_by_id(*target_label_id)
517 {
518 labels.insert(target_variable.clone(), name.to_string());
519 }
520 }
521 LogicalPlan::TraverseMainByType {
522 input,
523 step_variable,
524 type_names,
525 ..
526 } => {
527 self.collect_variable_labels(input, labels);
528 if let Some(sv) = step_variable
529 && type_names.len() == 1
530 {
531 labels.insert(sv.clone(), type_names[0].clone());
532 }
533 }
534 LogicalPlan::Filter { input, .. }
536 | LogicalPlan::Project { input, .. }
537 | LogicalPlan::Sort { input, .. }
538 | LogicalPlan::Limit { input, .. }
539 | LogicalPlan::Aggregate { input, .. }
540 | LogicalPlan::Distinct { input, .. }
541 | LogicalPlan::Window { input, .. }
542 | LogicalPlan::Unwind { input, .. }
543 | LogicalPlan::Create { input, .. }
544 | LogicalPlan::CreateBatch { input, .. }
545 | LogicalPlan::Merge { input, .. }
546 | LogicalPlan::Set { input, .. }
547 | LogicalPlan::Remove { input, .. }
548 | LogicalPlan::Delete { input, .. }
549 | LogicalPlan::Foreach { input, .. }
550 | LogicalPlan::SubqueryCall { input, .. } => {
551 self.collect_variable_labels(input, labels);
552 }
553 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
554 self.collect_variable_labels(left, labels);
555 self.collect_variable_labels(right, labels);
556 }
557 LogicalPlan::Apply {
558 input, subquery, ..
559 } => {
560 self.collect_variable_labels(input, labels);
561 self.collect_variable_labels(subquery, labels);
562 }
563 LogicalPlan::Explain { plan } => {
564 self.collect_variable_labels(plan, labels);
565 }
566 _ => {}
567 }
568 }
569
570 fn merged_edge_type_properties(&self, edge_type_ids: &[u32]) -> HashMap<String, PropertyMeta> {
571 crate::query::df_graph::common::merged_edge_schema_props(&self.schema, edge_type_ids)
572 }
573
574 pub fn plan(&self, logical: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
588 let logical_rewritten = merge_unwind_in_filters(logical, &self.params);
595
596 let all_properties = collect_properties_from_plan(&logical_rewritten);
598
599 self.plan_internal(&logical_rewritten, &all_properties)
601 }
602
603 pub fn plan_with_properties(
609 &self,
610 logical: &LogicalPlan,
611 extra_properties: HashMap<String, HashSet<String>>,
612 ) -> Result<Arc<dyn ExecutionPlan>> {
613 let logical_rewritten = merge_unwind_in_filters(logical, &self.params);
615 let mut all_properties = collect_properties_from_plan(&logical_rewritten);
616 for (var, props) in extra_properties {
617 all_properties.entry(var).or_default().extend(props);
618 }
619 self.plan_internal(&logical_rewritten, &all_properties)
620 }
621
622 fn wrap_optional(
629 &self,
630 plan: Arc<dyn ExecutionPlan>,
631 optional: bool,
632 ) -> Result<Arc<dyn ExecutionPlan>> {
633 if !optional {
634 return Ok(plan);
635 }
636
637 let empty_schema = Arc::new(Schema::empty());
639 let placeholder = Arc::new(PlaceholderRowExec::new(empty_schema));
640
641 Ok(Arc::new(NestedLoopJoinExec::try_new(
644 placeholder,
645 plan,
646 None, &JoinType::Left,
648 None, )?))
650 }
651
    /// Recursively lowers one [`LogicalPlan`] node (and its inputs) to a
    /// physical [`ExecutionPlan`].
    ///
    /// `all_properties` maps each variable to the property names the query
    /// needs. It is threaded unchanged into every recursive call and into the
    /// scan / traversal planners, which use it to decide which columns to
    /// materialize.
    ///
    /// # Errors
    ///
    /// Returns an error in these cases:
    /// * a sub-planner fails;
    /// * a write operator is planned without a mutation context;
    /// * the node is unsupported here (`InvertedIndexLookup`, `QuantifiedPattern`);
    /// * the node is a DDL/admin node, which must be handled outside this planner.
    fn plan_internal(
        &self,
        logical: &LogicalPlan,
        all_properties: &HashMap<String, HashSet<String>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        match logical {
            // The wrapper only tags its inner plan with a `kind`; plan the inner plan.
            LogicalPlan::FusedIndexScanWrapped { inner, kind: _ } => {
                self.plan_internal(inner, all_properties)
            }
            // `FusedIndexScan` is planned exactly like a plain label scan.
            LogicalPlan::Scan {
                label_id,
                labels,
                variable,
                filter,
                optional,
            }
            | LogicalPlan::FusedIndexScan {
                label_id,
                labels,
                variable,
                filter,
                optional,
                kind: _,
            } => {
                if labels.len() > 1 {
                    self.plan_multi_label_scan(
                        labels,
                        variable,
                        filter.as_ref(),
                        *optional,
                        all_properties,
                    )
                } else {
                    self.plan_scan(
                        *label_id,
                        variable,
                        filter.as_ref(),
                        *optional,
                        all_properties,
                    )
                }
            }

            // Several labels -> multi-label scan; exactly one -> schemaless
            // scan by label name; none -> scan of all nodes.
            LogicalPlan::ScanMainByLabels {
                labels,
                variable,
                filter,
                optional,
            } => {
                if labels.len() > 1 {
                    self.plan_multi_label_scan(
                        labels,
                        variable,
                        filter.as_ref(),
                        *optional,
                        all_properties,
                    )
                } else if let Some(label_name) = labels.first() {
                    self.plan_schemaless_scan(
                        label_name,
                        variable,
                        filter.as_ref(),
                        *optional,
                        all_properties,
                    )
                } else {
                    self.plan_scan_all(variable, filter.as_ref(), *optional, all_properties)
                }
            }

            LogicalPlan::ScanAll {
                variable,
                filter,
                optional,
            } => self.plan_scan_all(variable, filter.as_ref(), *optional, all_properties),

            LogicalPlan::TraverseMainByType {
                type_names,
                input,
                direction,
                source_variable,
                target_variable,
                step_variable,
                min_hops,
                max_hops,
                optional,
                target_filter,
                path_variable,
                is_variable_length,
                scope_match_variables,
                optional_pattern_vars,
                edge_filter_expr,
                path_mode,
                ..
            } => {
                if *is_variable_length {
                    // Variable-length: the edge filter is handed to the VLP
                    // planner; only the target filter is applied afterwards.
                    let vlp_plan = self.plan_traverse_main_by_type_vlp(
                        input,
                        type_names,
                        direction.clone(),
                        source_variable,
                        target_variable,
                        step_variable.as_deref(),
                        *min_hops,
                        *max_hops,
                        path_variable.as_deref(),
                        *optional,
                        all_properties,
                        edge_filter_expr.as_ref(),
                        path_mode,
                        scope_match_variables,
                    )?;
                    self.apply_schemaless_traverse_filter(
                        vlp_plan,
                        target_filter.as_ref(),
                        source_variable,
                        target_variable,
                        step_variable.as_deref(),
                        path_variable.as_deref(),
                        // NOTE(review): `true` here vs `false` on the single-hop
                        // path — presumably flags the VLP variant; confirm against
                        // apply_schemaless_traverse_filter.
                        true,
                        *optional,
                        optional_pattern_vars,
                    )
                } else {
                    // Single hop: plan the traversal, then layer the edge
                    // filter and the target filter on top, in that order.
                    let base_plan = self.plan_traverse_main_by_type(
                        input,
                        type_names,
                        direction.clone(),
                        source_variable,
                        target_variable,
                        step_variable.as_deref(),
                        *optional,
                        optional_pattern_vars,
                        all_properties,
                        scope_match_variables,
                    )?;
                    let edge_filtered = self.apply_schemaless_traverse_filter(
                        base_plan,
                        edge_filter_expr.as_ref(),
                        source_variable,
                        target_variable,
                        step_variable.as_deref(),
                        path_variable.as_deref(),
                        false,
                        *optional,
                        optional_pattern_vars,
                    )?;
                    self.apply_schemaless_traverse_filter(
                        edge_filtered,
                        target_filter.as_ref(),
                        source_variable,
                        target_variable,
                        step_variable.as_deref(),
                        path_variable.as_deref(),
                        false,
                        *optional,
                        optional_pattern_vars,
                    )
                }
            }

            LogicalPlan::Traverse {
                input,
                edge_type_ids,
                direction,
                source_variable,
                target_variable,
                target_label_id,
                step_variable,
                min_hops,
                max_hops,
                optional,
                target_filter,
                path_variable,
                is_variable_length,
                optional_pattern_vars,
                scope_match_variables,
                edge_filter_expr,
                path_mode,
                qpp_steps,
                ..
            } => self.plan_traverse(
                input,
                edge_type_ids,
                direction.clone(),
                source_variable,
                target_variable,
                *target_label_id,
                step_variable.as_deref(),
                *min_hops,
                *max_hops,
                path_variable.as_deref(),
                *optional,
                target_filter.as_ref(),
                *is_variable_length,
                optional_pattern_vars,
                all_properties,
                scope_match_variables,
                edge_filter_expr.as_ref(),
                path_mode,
                qpp_steps.as_deref(),
            ),

            // Note: target_label_id, min_hops and max_hops are ignored here.
            // The `false` argument distinguishes this from AllShortestPaths
            // below, which passes `true`.
            LogicalPlan::ShortestPath {
                input,
                edge_type_ids,
                direction,
                source_variable,
                target_variable,
                target_label_id: _,
                path_variable,
                min_hops: _,
                max_hops: _,
            } => self.plan_shortest_path(
                input,
                edge_type_ids,
                direction.clone(),
                source_variable,
                target_variable,
                path_variable,
                false,
                all_properties,
            ),

            LogicalPlan::Filter {
                input,
                predicate,
                optional_variables,
            } => self.plan_filter(input, predicate, optional_variables, all_properties),

            LogicalPlan::Project { input, projections } => {
                // Map each alias to its expression so downstream planning can
                // resolve references to projected names.
                let alias_map: HashMap<String, Expr> = projections
                    .iter()
                    .filter_map(|(expr, alias)| alias.as_ref().map(|a| (a.clone(), expr.clone())))
                    .collect();

                self.plan_project_with_aliases(input, projections, all_properties, &alias_map)
            }

            LogicalPlan::Aggregate {
                input,
                group_by,
                aggregates,
            } => self.plan_aggregate(input, group_by, aggregates, all_properties),

            // DISTINCT is implemented as a single-phase aggregate that groups
            // by every input column and computes no aggregate functions.
            LogicalPlan::Distinct { input } => {
                let input_plan = self.plan_internal(input, all_properties)?;
                let schema = input_plan.schema();
                let group_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
                    schema
                        .fields()
                        .iter()
                        .enumerate()
                        .map(|(i, f)| {
                            (
                                Arc::new(datafusion::physical_expr::expressions::Column::new(
                                    f.name(),
                                    i,
                                ))
                                    as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
                                f.name().clone(),
                            )
                        })
                        .collect();
                let group_by = PhysicalGroupBy::new_single(group_exprs);
                Ok(Arc::new(AggregateExec::try_new(
                    AggregateMode::Single,
                    group_by,
                    vec![],
                    vec![],
                    input_plan.clone(),
                    input_plan.schema(),
                )?))
            }

            LogicalPlan::Sort { input, order_by } => {
                self.plan_sort(input, order_by, all_properties, &HashMap::new())
            }

            LogicalPlan::Limit { input, skip, fetch } => {
                self.plan_limit(input, *skip, *fetch, all_properties)
            }

            LogicalPlan::Union { left, right, all } => {
                self.plan_union(left, right, *all, all_properties)
            }

            LogicalPlan::Empty => self.plan_empty(),

            LogicalPlan::BindZeroLengthPath {
                input,
                node_variable,
                path_variable,
            } => {
                self.plan_bind_zero_length_path(input, node_variable, path_variable, all_properties)
            }

            LogicalPlan::BindPath {
                input,
                node_variables,
                edge_variables,
                path_variable,
            } => self.plan_bind_path(
                input,
                node_variables,
                edge_variables,
                path_variable,
                all_properties,
            ),

            // --- Write operators: each plans its input first and then requires
            // a mutation context, failing if none is configured. ---
            LogicalPlan::Create { input, pattern } => {
                tracing::debug!("Planning MutationCreateExec");
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                Ok(Arc::new(new_create_exec(
                    child,
                    pattern.clone(),
                    mutation_ctx,
                )))
            }
            LogicalPlan::CreateBatch { input, patterns } => {
                tracing::debug!(
                    patterns = patterns.len(),
                    "Planning MutationCreateExec (batch)"
                );
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                // The output schema is the child's schema extended with the
                // variables the batch patterns introduce.
                let output_schema = extended_schema_for_new_vars(&child.schema(), patterns);
                Ok(Arc::new(MutationExec::new_with_schema(
                    child,
                    MutationKind::CreateBatch {
                        patterns: patterns.clone(),
                    },
                    "MutationCreateExec",
                    mutation_ctx,
                    output_schema,
                )))
            }
            LogicalPlan::Set { input, items } => {
                tracing::debug!(items = items.len(), "Planning MutationSetExec");
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                Ok(Arc::new(new_set_exec(child, items.clone(), mutation_ctx)))
            }
            LogicalPlan::Remove { input, items } => {
                tracing::debug!(items = items.len(), "Planning MutationRemoveExec");
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                Ok(Arc::new(new_remove_exec(
                    child,
                    items.clone(),
                    mutation_ctx,
                )))
            }
            LogicalPlan::Delete {
                input,
                items,
                detach,
            } => {
                tracing::debug!(
                    items = items.len(),
                    detach = detach,
                    "Planning MutationDeleteExec"
                );
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                Ok(Arc::new(new_delete_exec(
                    child,
                    items.clone(),
                    *detach,
                    mutation_ctx,
                )))
            }
            LogicalPlan::Merge {
                input,
                pattern,
                on_match,
                on_create,
            } => {
                tracing::debug!("Planning MutationMergeExec");
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                Ok(Arc::new(new_merge_exec(
                    child,
                    pattern.clone(),
                    on_match.clone(),
                    on_create.clone(),
                    mutation_ctx,
                )))
            }

            // A window node with no window expressions is a pass-through.
            LogicalPlan::Window {
                input,
                window_exprs,
            } => {
                let input_plan = self.plan_internal(input, all_properties)?;
                if !window_exprs.is_empty() {
                    self.plan_window_functions(input_plan, window_exprs, Some(input.as_ref()))
                } else {
                    Ok(input_plan)
                }
            }

            LogicalPlan::CrossJoin { left, right } => {
                let left_plan = self.plan_internal(left, all_properties)?;
                let right_plan = self.plan_internal(right, all_properties)?;

                // When the right side is a Locy derived scan, structural
                // columns on the left whose names also appear in the derived
                // schema are stripped before joining.
                let left_plan = if matches!(right.as_ref(), LogicalPlan::LocyDerivedScan { .. }) {
                    let derived_schema = right_plan.schema();
                    let derived_names: HashSet<&str> = derived_schema
                        .fields()
                        .iter()
                        .map(|f| f.name().as_str())
                        .collect();
                    strip_conflicting_structural_columns(left_plan, &derived_names)?
                } else {
                    left_plan
                };

                Ok(Arc::new(
                    datafusion::physical_plan::joins::CrossJoinExec::new(left_plan, right_plan),
                ))
            }

            LogicalPlan::Apply {
                input,
                subquery,
                input_filter,
            } => self.plan_apply(input, subquery, input_filter.as_ref(), all_properties),

            LogicalPlan::Unwind {
                input,
                expr,
                variable,
            } => self.plan_unwind(
                input.as_ref().clone(),
                expr.clone(),
                variable.clone(),
                all_properties,
            ),

            LogicalPlan::VectorKnn {
                label_id,
                variable,
                property,
                query,
                k,
                threshold,
            } => self.plan_vector_knn(
                *label_id,
                variable,
                property,
                query.clone(),
                *k,
                *threshold,
                all_properties,
            ),

            LogicalPlan::InvertedIndexLookup { .. } => Err(anyhow!(
                "Full-text search not yet supported in DataFusion engine"
            )),

            // Same as ShortestPath above, but with `true` to request all paths.
            LogicalPlan::AllShortestPaths {
                input,
                edge_type_ids,
                direction,
                source_variable,
                target_variable,
                target_label_id: _,
                path_variable,
                min_hops: _,
                max_hops: _,
            } => self.plan_shortest_path(
                input,
                edge_type_ids,
                direction.clone(),
                source_variable,
                target_variable,
                path_variable,
                true,
                all_properties,
            ),

            LogicalPlan::QuantifiedPattern { .. } => Err(anyhow!(
                "Quantified patterns not yet supported in DataFusion engine"
            )),

            LogicalPlan::RecursiveCTE {
                cte_name,
                initial,
                recursive,
            } => self.plan_recursive_cte(cte_name, initial, recursive, all_properties),

            LogicalPlan::ProcedureCall {
                procedure_name,
                arguments,
                yield_items,
            } => self.plan_procedure_call(procedure_name, arguments, yield_items, all_properties),

            // A subquery call is planned as an Apply with no input filter.
            LogicalPlan::SubqueryCall { input, subquery } => {
                self.plan_apply(input, subquery, None, all_properties)
            }

            LogicalPlan::ExtIdLookup {
                variable,
                ext_id,
                filter,
                optional,
            } => self.plan_ext_id_lookup(variable, ext_id, filter.as_ref(), *optional),

            LogicalPlan::Foreach {
                input,
                variable,
                list,
                body,
            } => {
                tracing::debug!(variable = variable.as_str(), "Planning ForeachExec");
                let child = self.plan_internal(input, all_properties)?;
                let mutation_ctx = self.require_mutation_ctx()?;
                Ok(Arc::new(
                    super::df_graph::mutation_foreach::ForeachExec::new(
                        child,
                        variable.clone(),
                        list.clone(),
                        body.clone(),
                        mutation_ctx,
                    ),
                ))
            }

            // --- Locy operators ---
            LogicalPlan::LocyPriority { input, key_columns } => {
                // The input must expose a `__priority` column.
                let child = self.plan_internal(input, all_properties)?;
                let key_indices = resolve_column_indices(&child.schema(), key_columns)?;
                let priority_col_index = child.schema().index_of("__priority").map_err(|_| {
                    anyhow::anyhow!("LocyPriority input must contain __priority column")
                })?;
                Ok(Arc::new(super::df_graph::locy_priority::PriorityExec::new(
                    child,
                    key_indices,
                    priority_col_index,
                )))
            }

            LogicalPlan::LocyBestBy {
                input,
                key_columns,
                criteria,
            } => {
                let child = self.plan_internal(input, all_properties)?;
                let key_indices = resolve_column_indices(&child.schema(), key_columns)?;
                let sort_criteria = resolve_best_by_criteria(&child.schema(), criteria)?;
                Ok(Arc::new(super::df_graph::locy_best_by::BestByExec::new(
                    child,
                    key_indices,
                    sort_criteria,
                    // NOTE(review): meaning of this flag is not visible here —
                    // confirm against BestByExec::new.
                    true,
                )))
            }

            LogicalPlan::LocyFold {
                input,
                key_columns,
                fold_bindings,
                strict_probability_domain,
                probability_epsilon,
            } => {
                let child = self.plan_internal(input, all_properties)?;
                let key_indices = resolve_column_indices(&child.schema(), key_columns)?;
                // Fold bindings are resolved against the planner's plugin registry.
                let bindings =
                    resolve_fold_bindings(&child.schema(), fold_bindings, &self.plugin_registry)?;
                Ok(Arc::new(super::df_graph::locy_fold::FoldExec::new(
                    child,
                    key_indices,
                    bindings,
                    *strict_probability_domain,
                    *probability_epsilon,
                )))
            }

            LogicalPlan::LocyDerivedScan {
                scan_index: _,
                data,
                schema,
            } => Ok(Arc::new(
                super::df_graph::locy_fixpoint::DerivedScanExec::new(
                    Arc::clone(data),
                    Arc::clone(schema),
                ),
            )),

            LogicalPlan::LocyProject {
                input,
                projections,
                target_types,
            } => self.plan_locy_project(input, projections, target_types, all_properties),

            LogicalPlan::LocyModelInvoke {
                input,
                invocations,
                classifier_registry,
                classifier_cache,
                classifier_provenance_store,
                path_context_handles,
            } => {
                let input_plan = self.plan_internal(input, all_properties)?;
                let xervo_runtime =
                    super::df_graph::locy_model_invoke::XervoRuntimeHandle(
                        self.graph_ctx.xervo_runtime().cloned(),
                    );
                let graph_algo = {
                    // When a current L0 buffer exists, snapshot it into an
                    // L0Manager. The transaction L0 (if any) is appended to the
                    // pending-flush list for that snapshot.
                    let l0_ctx = self.graph_ctx.l0_context();
                    let l0_mgr = l0_ctx.current_l0.as_ref().map(|current| {
                        let mut pending = l0_ctx.pending_flush_l0s.clone();
                        if let Some(tx_l0) = &l0_ctx.transaction_l0 {
                            pending.push(tx_l0.clone());
                        }
                        Arc::new(uni_store::runtime::l0_manager::L0Manager::from_snapshot(
                            current.clone(),
                            pending,
                        ))
                    });
                    // The individual L0 buffers are also passed along separately.
                    let l0_buffers = self.graph_ctx.l0_context().current_l0.as_ref().map(
                        |current| super::df_graph::locy_model_invoke::L0Buffers {
                            current: current.clone(),
                            transaction: self.graph_ctx.l0_context().transaction_l0.clone(),
                            pending_flush: self.graph_ctx.l0_context().pending_flush_l0s.clone(),
                        },
                    );
                    super::df_graph::locy_model_invoke::GraphAlgoHandle {
                        registry: self.graph_ctx.algo_registry().cloned(),
                        storage: Some(self.graph_ctx.storage().clone()),
                        l0_manager: l0_mgr,
                        property_manager: Some(self.graph_ctx.property_manager().clone()),
                        l0_buffers,
                    }
                };
                Ok(Arc::new(
                    super::df_graph::locy_model_invoke::LocyModelInvokeExec::new(
                        input_plan,
                        invocations.clone(),
                        Arc::clone(classifier_registry),
                        classifier_cache.as_ref().map(Arc::clone),
                        classifier_provenance_store.as_ref().map(Arc::clone),
                        path_context_handles.clone(),
                        xervo_runtime,
                        graph_algo,
                    ),
                ))
            }

            // A whole Locy program runs as one operator that receives the
            // planner's shared handles and emits the stats schema.
            LogicalPlan::LocyProgram {
                strata,
                commands,
                derived_scan_registry,
                max_iterations,
                timeout,
                max_derived_bytes,
                deterministic_best_by,
                strict_probability_domain,
                probability_epsilon,
                exact_probability,
                max_bdd_variables,
                top_k_proofs,
                semiring_kind,
                classifier_registry,
                classifier_cache,
                classifier_provenance_store,
            } => {
                let output_schema = super::df_graph::locy_program::stats_schema();

                Ok(Arc::new(
                    super::df_graph::locy_program::LocyProgramExec::new_with_semiring_classifiers_and_cache(
                        strata.clone(),
                        commands.clone(),
                        Arc::clone(derived_scan_registry),
                        Arc::clone(&self.plugin_registry),
                        Arc::clone(&self.graph_ctx),
                        Arc::clone(&self.session_ctx),
                        Arc::clone(&self.storage),
                        Arc::clone(&self.schema),
                        self.params.clone(),
                        output_schema,
                        *max_iterations,
                        *timeout,
                        *max_derived_bytes,
                        *deterministic_best_by,
                        *strict_probability_domain,
                        *probability_epsilon,
                        *exact_probability,
                        *max_bdd_variables,
                        *top_k_proofs,
                        *semiring_kind,
                        Arc::clone(classifier_registry),
                        classifier_cache.as_ref().map(Arc::clone),
                        classifier_provenance_store.as_ref().map(Arc::clone),
                    ),
                ))
            }

            // DDL, admin and EXPLAIN nodes are not physical query plans.
            LogicalPlan::CreateVectorIndex { .. }
            | LogicalPlan::CreateFullTextIndex { .. }
            | LogicalPlan::CreateScalarIndex { .. }
            | LogicalPlan::CreateJsonFtsIndex { .. }
            | LogicalPlan::DropIndex { .. }
            | LogicalPlan::ShowIndexes { .. }
            | LogicalPlan::Copy { .. }
            | LogicalPlan::Backup { .. }
            | LogicalPlan::ShowDatabase
            | LogicalPlan::ShowConfig
            | LogicalPlan::ShowStatistics
            | LogicalPlan::Vacuum
            | LogicalPlan::Checkpoint
            | LogicalPlan::CopyTo { .. }
            | LogicalPlan::CopyFrom { .. }
            | LogicalPlan::CreateLabel(_)
            | LogicalPlan::CreateEdgeType(_)
            | LogicalPlan::AlterLabel(_)
            | LogicalPlan::AlterEdgeType(_)
            | LogicalPlan::DropLabel(_)
            | LogicalPlan::DropEdgeType(_)
            | LogicalPlan::CreateConstraint(_)
            | LogicalPlan::DropConstraint(_)
            | LogicalPlan::ShowConstraints(_)
            | LogicalPlan::Explain { .. } => {
                Err(anyhow!("DDL/Admin operations should be handled separately"))
            }
        }
    }
1427
1428 fn plan_internal_with_aliases(
1432 &self,
1433 logical: &LogicalPlan,
1434 all_properties: &HashMap<String, HashSet<String>>,
1435 alias_map: &HashMap<String, Expr>,
1436 ) -> Result<Arc<dyn ExecutionPlan>> {
1437 match logical {
1438 LogicalPlan::Sort { input, order_by } => {
1439 self.plan_sort(input, order_by, all_properties, alias_map)
1440 }
1441 LogicalPlan::Limit { input, skip, fetch } => {
1442 let input_plan =
1444 self.plan_internal_with_aliases(input, all_properties, alias_map)?;
1445 if let Some(offset) = skip.filter(|&s| s > 0) {
1446 use datafusion::physical_plan::limit::GlobalLimitExec;
1447 Ok(Arc::new(GlobalLimitExec::new(input_plan, offset, *fetch)))
1448 } else {
1449 Ok(Arc::new(LocalLimitExec::new(
1450 input_plan,
1451 fetch.unwrap_or(usize::MAX),
1452 )))
1453 }
1454 }
1455 _ => self.plan_internal(logical, all_properties),
1457 }
1458 }
1459
1460 fn extract_vid_from_cypher_filter(
1476 filter: Option<&Expr>,
1477 variable: &str,
1478 params: &HashMap<String, uni_common::Value>,
1479 ) -> Option<Vec<u64>> {
1480 use uni_cypher::ast::BinaryOp;
1481 let filter = filter?;
1482 match filter {
1483 Expr::BinaryOp {
1484 left,
1485 op: BinaryOp::Eq,
1486 right,
1487 } => {
1488 if let Expr::Property(var_expr, prop) = left.as_ref()
1490 && let Expr::Variable(v) = var_expr.as_ref()
1491 && v == variable
1492 && prop == "_vid"
1493 {
1494 return Self::resolve_vid_value(right, params).map(|v| vec![v]);
1495 }
1496 if let Expr::Property(var_expr, prop) = right.as_ref()
1498 && let Expr::Variable(v) = var_expr.as_ref()
1499 && v == variable
1500 && prop == "_vid"
1501 {
1502 return Self::resolve_vid_value(left, params).map(|v| vec![v]);
1503 }
1504 None
1505 }
1506 Expr::In { expr, list } => {
1507 let Expr::Property(var_expr, prop) = expr.as_ref() else {
1509 return None;
1510 };
1511 let Expr::Variable(v) = var_expr.as_ref() else {
1512 return None;
1513 };
1514 if v != variable || prop != "_vid" {
1515 return None;
1516 }
1517 let Expr::List(items) = list.as_ref() else {
1518 return None;
1519 };
1520 let mut out = Vec::with_capacity(items.len());
1521 for item in items {
1522 out.push(Self::resolve_vid_value(item, params)?);
1523 }
1524 if out.is_empty() { None } else { Some(out) }
1525 }
1526 Expr::BinaryOp {
1527 left,
1528 op: BinaryOp::And,
1529 right,
1530 } => Self::extract_vid_from_cypher_filter(Some(left), variable, params)
1531 .or_else(|| Self::extract_vid_from_cypher_filter(Some(right), variable, params)),
1532 _ => None,
1533 }
1534 }
1535
1536 fn build_vid_physical_filter(
1541 col_name: &str,
1542 vid: u64,
1543 ) -> Arc<dyn datafusion::physical_expr::PhysicalExpr> {
1544 use datafusion::physical_expr::expressions::{BinaryExpr, Column, Literal};
1545 Arc::new(BinaryExpr::new(
1546 Arc::new(Column::new(col_name, 0)),
1547 datafusion::logical_expr::Operator::Eq,
1548 Arc::new(Literal::new(datafusion::common::ScalarValue::UInt64(Some(
1549 vid,
1550 )))),
1551 ))
1552 }
1553
1554 fn resolve_vid_value(expr: &Expr, params: &HashMap<String, uni_common::Value>) -> Option<u64> {
1555 match expr {
1556 Expr::Literal(CypherLiteral::Integer(v)) if *v >= 0 => Some(*v as u64),
1557 Expr::Parameter(name) => match params.get(name) {
1558 Some(uni_common::Value::Int(v)) if *v >= 0 => Some(*v as u64),
1559 _ => None,
1560 },
1561 _ => None,
1562 }
1563 }
1564
1565 fn and_join_predicates(mut preds: Vec<Expr>) -> Expr {
1569 if preds.is_empty() {
1570 return uni_cypher::ast::Expr::TRUE;
1571 }
1572 let mut acc = preds.remove(0);
1573 for p in preds {
1574 acc = Expr::BinaryOp {
1575 left: Box::new(acc),
1576 op: uni_cypher::ast::BinaryOp::And,
1577 right: Box::new(p),
1578 };
1579 }
1580 acc
1581 }
1582
1583 fn build_indexed_property_pushdown(
1595 &self,
1596 filter: Option<&Expr>,
1597 variable: &str,
1598 label_id: u16,
1599 scan_schema: &SchemaRef,
1600 ) -> Option<(String, Arc<dyn datafusion::physical_expr::PhysicalExpr>)> {
1601 let filter = filter?;
1602 let analyzer = crate::query::pushdown::IndexAwareAnalyzer::new(&self.schema);
1603 let strategy = analyzer.analyze(filter, variable, label_id);
1604 if strategy.hash_index_columns.is_empty() {
1605 return None;
1606 }
1607
1608 let label_name = self.schema.label_name_by_id(label_id)?;
1616 let label_props = self.schema.properties.get(label_name);
1617 let mut indexed_preds: Vec<Expr> = Vec::new();
1618 for pred in &strategy.lance_predicates {
1619 if let Some(col) = crate::query::pushdown::predicate_target_column(pred, variable)
1620 && strategy.hash_index_columns.iter().any(|c| c == &col)
1621 {
1622 let resolved = crate::query::pushdown::substitute_params(pred, &self.params)?;
1623 indexed_preds.push(resolved);
1624 }
1625 }
1626 if indexed_preds.is_empty() {
1627 return None;
1628 }
1629
1630 let lance_str = crate::query::pushdown::LanceFilterGenerator::generate(
1632 &indexed_preds,
1633 variable,
1634 label_props,
1635 )?;
1636
1637 let combined = Self::and_join_predicates(indexed_preds.clone());
1641 let mut variable_kinds = HashMap::new();
1642 variable_kinds.insert(variable.to_string(), VariableKind::Node);
1643 let mut variable_labels = HashMap::new();
1644 variable_labels.insert(variable.to_string(), label_name.to_string());
1645 let ctx = TranslationContext {
1646 parameters: self.params.clone(),
1647 variable_labels,
1648 variable_kinds,
1649 ..Default::default()
1650 };
1651 let df_filter = cypher_expr_to_df(&combined, Some(&ctx)).ok()?;
1652 let session = self.session_ctx.read();
1653 let physical = self
1654 .create_physical_filter_expr(&df_filter, scan_schema, &session)
1655 .ok()?;
1656 Some((lance_str, physical))
1657 }
1658
1659 fn wrap_read_set_recording(
1668 &self,
1669 plan: Arc<dyn ExecutionPlan>,
1670 variable: &str,
1671 ) -> Arc<dyn ExecutionPlan> {
1672 let has_read_set = self
1673 .graph_ctx
1674 .l0_context()
1675 .transaction_l0
1676 .as_ref()
1677 .is_some_and(|l0| l0.read().occ_read_set.is_some());
1678 if !has_read_set {
1679 return plan;
1680 }
1681 Arc::new(ReadSetRecordingExec::new(
1682 plan,
1683 self.graph_ctx.clone(),
1684 variable,
1685 ))
1686 }
1687
1688 fn apply_scan_filter(
1689 &self,
1690 plan: Arc<dyn ExecutionPlan>,
1691 variable: &str,
1692 filter: Option<&Expr>,
1693 label_name: Option<&str>,
1694 ) -> Result<Arc<dyn ExecutionPlan>> {
1695 let Some(filter_expr) = filter else {
1696 return Ok(plan);
1697 };
1698
1699 let mut variable_kinds = HashMap::new();
1700 variable_kinds.insert(variable.to_string(), VariableKind::Node);
1701 let mut variable_labels = HashMap::new();
1702 if let Some(label) = label_name {
1703 variable_labels.insert(variable.to_string(), label.to_string());
1704 }
1705 let ctx = TranslationContext {
1706 parameters: self.params.clone(),
1707 variable_labels,
1708 variable_kinds,
1709 ..Default::default()
1710 };
1711 let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
1712
1713 let schema = plan.schema();
1714
1715 let session = self.session_ctx.read();
1716 let physical_filter = self.create_physical_filter_expr(&df_filter, &schema, &session)?;
1717
1718 Ok(Arc::new(FilterExec::try_new(physical_filter, plan)?))
1719 }
1720
1721 #[expect(clippy::too_many_arguments)]
1728 fn apply_schemaless_traverse_filter(
1729 &self,
1730 plan: Arc<dyn ExecutionPlan>,
1731 filter_expr: Option<&Expr>,
1732 source_variable: &str,
1733 target_variable: &str,
1734 step_variable: Option<&str>,
1735 path_variable: Option<&str>,
1736 is_variable_length: bool,
1737 optional: bool,
1738 optional_pattern_vars: &HashSet<String>,
1739 ) -> Result<Arc<dyn ExecutionPlan>> {
1740 let Some(filter_expr) = filter_expr else {
1741 return Ok(plan);
1742 };
1743
1744 let mut variable_kinds = HashMap::new();
1745 variable_kinds.insert(source_variable.to_string(), VariableKind::Node);
1746 variable_kinds.insert(target_variable.to_string(), VariableKind::Node);
1747 if let Some(sv) = step_variable {
1748 variable_kinds.insert(sv.to_string(), VariableKind::edge_for(is_variable_length));
1749 }
1750 if let Some(pv) = path_variable {
1751 variable_kinds.insert(pv.to_string(), VariableKind::Path);
1752 }
1753 let ctx = TranslationContext {
1754 parameters: self.params.clone(),
1755 variable_kinds,
1756 ..Default::default()
1757 };
1758 let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
1759 let schema = plan.schema();
1760 let session = self.session_ctx.read();
1761 let physical_filter = self.create_physical_filter_expr(&df_filter, &schema, &session)?;
1762
1763 if optional {
1764 Ok(Arc::new(OptionalFilterExec::new(
1765 plan,
1766 physical_filter,
1767 optional_pattern_vars.clone(),
1768 )))
1769 } else {
1770 Ok(Arc::new(FilterExec::try_new(physical_filter, plan)?))
1771 }
1772 }
1773
1774 fn plan_ext_id_lookup(
1776 &self,
1777 variable: &str,
1778 ext_id: &str,
1779 filter: Option<&Expr>,
1780 optional: bool,
1781 ) -> Result<Arc<dyn ExecutionPlan>> {
1782 let properties = if let Some(filter_expr) = filter {
1784 crate::query::df_expr::collect_properties(filter_expr)
1785 .into_iter()
1786 .filter(|(var, _)| var == variable)
1787 .map(|(_, prop)| prop)
1788 .collect()
1789 } else {
1790 vec![]
1791 };
1792
1793 let lookup_plan: Arc<dyn ExecutionPlan> = Arc::new(GraphExtIdLookupExec::new(
1794 self.graph_ctx.clone(),
1795 variable.to_string(),
1796 ext_id.to_string(),
1797 properties,
1798 optional,
1799 ));
1800
1801 self.apply_scan_filter(lookup_plan, variable, filter, None)
1802 }
1803
1804 fn plan_unwind(
1808 &self,
1809 input: LogicalPlan,
1810 expr: Expr,
1811 variable: String,
1812 all_properties: &HashMap<String, HashSet<String>>,
1813 ) -> Result<Arc<dyn ExecutionPlan>> {
1814 let input_plan = self.plan_internal(&input, all_properties)?;
1816
1817 let unwind = GraphUnwindExec::new(input_plan, expr, variable, self.params.clone());
1818
1819 Ok(Arc::new(unwind))
1820 }
1821
1822 fn plan_recursive_cte(
1827 &self,
1828 cte_name: &str,
1829 initial: &LogicalPlan,
1830 recursive: &LogicalPlan,
1831 _all_properties: &HashMap<String, HashSet<String>>,
1832 ) -> Result<Arc<dyn ExecutionPlan>> {
1833 Ok(Arc::new(RecursiveCTEExec::new(
1834 cte_name.to_string(),
1835 initial.clone(),
1836 recursive.clone(),
1837 self.graph_ctx.clone(),
1838 self.session_ctx.clone(),
1839 self.storage.clone(),
1840 self.schema.clone(),
1841 self.params.clone(),
1842 self.mutation_ctx.clone(),
1843 )))
1844 }
1845
1846 fn plan_apply(
1848 &self,
1849 input: &LogicalPlan,
1850 subquery: &LogicalPlan,
1851 input_filter: Option<&Expr>,
1852 all_properties: &HashMap<String, HashSet<String>>,
1853 ) -> Result<Arc<dyn ExecutionPlan>> {
1854 use crate::query::df_graph::common::infer_logical_plan_schema;
1855
1856 let input_exec = self.plan_internal(input, all_properties)?;
1858 let input_schema = input_exec.schema();
1859
1860 let subquery_effective = match subquery {
1869 LogicalPlan::Limit {
1870 input: inner,
1871 skip: None,
1872 fetch: Some(0),
1873 } => inner.as_ref(),
1874 _ => subquery,
1875 };
1876
1877 let sub_schema = infer_logical_plan_schema(subquery, &self.schema);
1881
1882 let sub_field_names: HashSet<&str> = sub_schema
1891 .fields()
1892 .iter()
1893 .map(|f| f.name().as_str())
1894 .collect();
1895 let kept_input_indices: Vec<usize> = input_schema
1903 .fields()
1904 .iter()
1905 .enumerate()
1906 .filter(|(_, f)| !sub_field_names.contains(f.name().as_str()))
1907 .map(|(i, _)| i)
1908 .collect();
1909 let kept_input_overrides: Vec<Option<(String, String)>> = kept_input_indices
1914 .iter()
1915 .map(|&i| {
1916 let name = input_schema.field(i).name();
1917 if let Some(dot) = name.find('.') {
1918 let base = &name[..dot];
1919 if sub_field_names.contains(base) {
1920 return Some((base.to_string(), name[dot + 1..].to_string()));
1921 }
1922 }
1923 None
1924 })
1925 .collect();
1926 let mut fields: Vec<Arc<arrow_schema::Field>> = kept_input_indices
1927 .iter()
1928 .map(|&i| input_schema.fields()[i].clone())
1929 .collect();
1930 fields.extend(sub_schema.fields().iter().cloned());
1931 let output_schema: SchemaRef = Arc::new(Schema::new(fields));
1932
1933 Ok(Arc::new(GraphApplyExec::new(
1934 input_exec,
1935 subquery_effective.clone(),
1936 input_filter.cloned(),
1937 self.graph_ctx.clone(),
1938 self.session_ctx.clone(),
1939 self.storage.clone(),
1940 self.schema.clone(),
1941 self.params.clone(),
1942 output_schema,
1943 kept_input_indices,
1944 kept_input_overrides,
1945 self.mutation_ctx.clone(),
1946 )))
1947 }
1948
1949 #[expect(clippy::too_many_arguments)]
1951 fn plan_vector_knn(
1952 &self,
1953 label_id: u16,
1954 variable: &str,
1955 property: &str,
1956 query_expr: Expr,
1957 k: usize,
1958 threshold: Option<f32>,
1959 all_properties: &HashMap<String, HashSet<String>>,
1960 ) -> Result<Arc<dyn ExecutionPlan>> {
1961 let label_name = self
1962 .schema
1963 .label_name_by_id(label_id)
1964 .ok_or_else(|| anyhow!("Unknown label ID: {}", label_id))?;
1965
1966 let target_properties = self.resolve_properties(variable, label_name, all_properties);
1967
1968 let plugin_source = self
1977 .schema
1978 .vector_index_for_property(label_name, property)
1979 .and_then(|cfg| {
1980 self.plugin_registry
1981 .index_handle(&cfg.name)
1982 .map(|entry| (cfg.name.clone(), entry))
1983 });
1984
1985 let knn = if let Some((index_name, entry)) = plugin_source {
1986 tracing::debug!(
1987 target: "uni.plugin.registry",
1988 index_kind = %entry.kind.0,
1989 index_name = %index_name,
1990 "plan_vector_knn: dispatching via plugin IndexHandle"
1991 );
1992 GraphVectorKnnExec::with_plugin_source(
1993 self.graph_ctx.clone(),
1994 label_id,
1995 label_name,
1996 variable.to_string(),
1997 property.to_string(),
1998 query_expr,
1999 k,
2000 threshold,
2001 self.params.clone(),
2002 target_properties,
2003 entry.kind,
2004 entry.handle,
2005 )
2006 } else {
2007 GraphVectorKnnExec::new(
2008 self.graph_ctx.clone(),
2009 label_id,
2010 label_name,
2011 variable.to_string(),
2012 property.to_string(),
2013 query_expr,
2014 k,
2015 threshold,
2016 self.params.clone(),
2017 target_properties,
2018 )
2019 };
2020
2021 Ok(self.wrap_read_set_recording(Arc::new(knn), variable))
2027 }
2028
2029 fn plan_procedure_call(
2031 &self,
2032 procedure_name: &str,
2033 arguments: &[Expr],
2034 yield_items: &[(String, Option<String>)],
2035 all_properties: &HashMap<String, HashSet<String>>,
2036 ) -> Result<Arc<dyn ExecutionPlan>> {
2037 use crate::query::df_graph::procedure_call::map_yield_to_canonical;
2038
2039 let mut target_properties: HashMap<String, Vec<String>> = HashMap::new();
2041
2042 if crate::query::df_graph::procedure_call::is_node_yield_procedure_static(procedure_name) {
2043 for (name, alias) in yield_items {
2044 let output_name = alias.as_ref().unwrap_or(name);
2045 let canonical = map_yield_to_canonical(name);
2046 if canonical == "node" {
2047 if let Some(props) = all_properties.get(output_name.as_str()) {
2049 let prop_list: Vec<String> = props
2050 .iter()
2051 .filter(|p| *p != "*" && !p.starts_with('_'))
2052 .cloned()
2053 .collect();
2054 target_properties.insert(output_name.clone(), prop_list);
2055 }
2056 }
2057 }
2058 }
2059
2060 let exec = GraphProcedureCallExec::new(
2061 self.graph_ctx.clone(),
2062 procedure_name.to_string(),
2063 arguments.to_vec(),
2064 yield_items.to_vec(),
2065 self.params.clone(),
2066 self.outer_values.clone(),
2067 target_properties,
2068 );
2069
2070 Ok(Arc::new(exec))
2071 }
2072
2073 fn plan_scan(
2075 &self,
2076 label_id: u16,
2077 variable: &str,
2078 filter: Option<&Expr>,
2079 optional: bool,
2080 all_properties: &HashMap<String, HashSet<String>>,
2081 ) -> Result<Arc<dyn ExecutionPlan>> {
2082 if uni_common::core::schema::is_virtual_label_id(label_id) {
2088 let entry = self
2089 .plugin_registry
2090 .virtual_label_by_id(label_id)
2091 .ok_or_else(|| {
2092 anyhow!(
2093 "Virtual label id {label_id:#x} has no registered CatalogTable; \
2094 the originating CatalogProvider may have been deregistered \
2095 after the plan was cached"
2096 )
2097 })?;
2098 let label_name = entry.name.as_str();
2099 let properties = self.resolve_properties(variable, label_name, all_properties);
2100 let pushdown_filters: Vec<datafusion::logical_expr::Expr> = filter
2101 .map(|f| -> Result<Vec<_>> {
2102 let ctx = crate::query::df_expr::TranslationContext {
2103 parameters: self.params.clone(),
2104 outer_values: self.outer_values.clone(),
2105 ..Default::default()
2106 };
2107 let df = crate::query::df_expr::cypher_expr_to_df(f, Some(&ctx))?;
2108 Ok(vec![df])
2109 })
2110 .transpose()?
2111 .unwrap_or_default();
2112 let exec = crate::query::df_graph::catalog_scan::CatalogVertexScanExec::try_new(
2113 entry.table,
2114 label_id,
2115 label_name.to_string(),
2116 variable.to_string(),
2117 properties,
2118 pushdown_filters,
2119 None, )?;
2121 let mut plan: Arc<dyn ExecutionPlan> = Arc::new(exec);
2122 plan = self.apply_scan_filter(plan, variable, filter, Some(label_name))?;
2125 return self.wrap_optional(plan, optional);
2134 }
2135
2136 let label_name = self
2137 .schema
2138 .label_name_by_id(label_id)
2139 .ok_or_else(|| anyhow!("Unknown label ID: {}", label_id))?;
2140
2141 let mut properties = self.resolve_properties(variable, label_name, all_properties);
2143
2144 let label_props = self.schema.properties.get(label_name);
2146 let has_projection_overflow = properties.iter().any(|p| {
2147 p != "overflow_json"
2148 && !p.starts_with('_')
2149 && !label_props.is_some_and(|lp| lp.contains_key(p.as_str()))
2150 });
2151 if has_projection_overflow && !properties.iter().any(|p| p == "overflow_json") {
2152 properties.push("overflow_json".to_string());
2153 }
2154
2155 if let Some(filter_expr) = filter {
2158 let filter_props = crate::query::df_expr::collect_properties(filter_expr);
2159 let has_overflow = filter_props.iter().any(|(var, prop)| {
2160 var == variable
2161 && !prop.starts_with('_')
2162 && label_props.is_none_or(|props| !props.contains_key(prop.as_str()))
2163 });
2164 if has_overflow && !properties.iter().any(|p| p == "overflow_json") {
2165 properties.push("overflow_json".to_string());
2166 }
2167 }
2168
2169 let var_props = all_properties.get(variable);
2178 let need_full =
2179 var_props.is_some_and(|p| p.contains("*") || p.contains(STRUCT_ONLY_SENTINEL));
2180 let need_full_record = var_props.is_some_and(|p| p.contains("*"));
2181 if need_full_record {
2182 if !properties.contains(&"_all_props".to_string()) {
2183 properties.push("_all_props".to_string());
2184 }
2185 if !properties.contains(&"overflow_json".to_string()) {
2186 properties.push("overflow_json".to_string());
2187 }
2188 }
2189
2190 let extracted_vids = Self::extract_vid_from_cypher_filter(filter, variable, &self.params);
2198 let scan_filter = extracted_vids
2199 .as_deref()
2200 .filter(|v| v.len() == 1)
2201 .map(|v| Self::build_vid_physical_filter(&format!("{variable}._vid"), v[0]));
2202 let mut scan_exec = GraphScanExec::new_vertex_scan(
2203 self.graph_ctx.clone(),
2204 label_name.to_string(),
2205 variable.to_string(),
2206 properties.clone(),
2207 scan_filter,
2208 );
2209 if let Some(vids) = extracted_vids
2210 && vids.len() > 1
2211 {
2212 scan_exec = scan_exec.with_vid_list_filter(vids);
2213 }
2214
2215 let scan_schema_for_idx = scan_exec.schema();
2223 if let Some((lance_str, runtime_filter)) =
2224 self.build_indexed_property_pushdown(filter, variable, label_id, &scan_schema_for_idx)
2225 {
2226 scan_exec = scan_exec
2227 .with_extra_lance_filter(lance_str)
2228 .with_extra_runtime_filter(runtime_filter);
2229 }
2230 let mut scan_plan: Arc<dyn ExecutionPlan> = Arc::new(scan_exec);
2231
2232 scan_plan = self.apply_scan_filter(scan_plan, variable, filter, Some(label_name))?;
2237
2238 scan_plan = self.wrap_read_set_recording(scan_plan, variable);
2241
2242 if need_full {
2243 let struct_props: Vec<String> = properties
2246 .iter()
2247 .filter(|p| *p != "overflow_json" && *p != "*" && *p != STRUCT_ONLY_SENTINEL)
2248 .cloned()
2249 .collect();
2250 scan_plan = self.add_structural_projection(scan_plan, variable, &struct_props)?;
2251 }
2252
2253 self.wrap_optional(scan_plan, optional)
2254 }
2255
2256 fn add_wildcard_structural_projection(
2265 &self,
2266 plan: Arc<dyn ExecutionPlan>,
2267 variable: &str,
2268 all_properties: &HashMap<String, HashSet<String>>,
2269 ) -> Result<Arc<dyn ExecutionPlan>> {
2270 if !all_properties
2271 .get(variable)
2272 .is_some_and(|p| p.contains("*") || p.contains(STRUCT_ONLY_SENTINEL))
2273 {
2274 return Ok(plan);
2275 }
2276 let prefix = format!("{}.", variable);
2277 let struct_props: Vec<String> = plan
2278 .schema()
2279 .fields()
2280 .iter()
2281 .filter_map(|f| {
2282 f.name()
2283 .strip_prefix(&prefix)
2284 .filter(|prop| !prop.starts_with('_') || *prop == "_all_props")
2285 .map(|prop| prop.to_string())
2286 })
2287 .collect();
2288 self.add_structural_projection(plan, variable, &struct_props)
2289 }
2290
2291 fn detect_bound_target(input_schema: &SchemaRef, target_variable: &str) -> Option<String> {
2295 let col = format!("{}._vid", target_variable);
2297 if input_schema.column_with_name(&col).is_some() {
2298 return Some(col);
2299 }
2300 if let Ok(field) = input_schema.field_with_name(target_variable)
2306 && matches!(
2307 field.data_type(),
2308 datafusion::arrow::datatypes::DataType::UInt64
2309 | datafusion::arrow::datatypes::DataType::Int64
2310 )
2311 {
2312 return Some(target_variable.to_string());
2313 }
2314 None
2315 }
2316
2317 fn resolve_schemaless_properties(
2324 variable: &str,
2325 all_properties: &HashMap<String, HashSet<String>>,
2326 ) -> (Vec<String>, bool) {
2327 let mut properties: Vec<String> = all_properties
2328 .get(variable)
2329 .map(|s| {
2330 s.iter()
2331 .filter(|p| *p != "*" && *p != STRUCT_ONLY_SENTINEL)
2332 .cloned()
2333 .collect()
2334 })
2335 .unwrap_or_default();
2336 let need_full = all_properties
2337 .get(variable)
2338 .is_some_and(|p| p.contains("*") || p.contains(STRUCT_ONLY_SENTINEL));
2339 if !properties.iter().any(|p| p == "_all_props") {
2340 properties.push("_all_props".to_string());
2341 }
2342 (properties, need_full)
2343 }
2344
2345 fn collect_used_edge_columns(
2348 schema: &SchemaRef,
2349 scope_match_variables: &HashSet<String>,
2350 exclude_col: Option<&str>,
2351 ) -> Vec<String> {
2352 schema
2353 .fields()
2354 .iter()
2355 .filter_map(|f| {
2356 let name = f.name();
2357 if exclude_col.is_some_and(|exc| name == exc) {
2358 None
2359 } else if name.ends_with("._eid") {
2360 let var_name = name.trim_end_matches("._eid");
2361 scope_match_variables
2362 .contains(var_name)
2363 .then(|| name.clone())
2364 } else if name.starts_with("__eid_to_") {
2365 let var_name = name.trim_start_matches("__eid_to_");
2366 scope_match_variables
2367 .contains(var_name)
2368 .then(|| name.clone())
2369 } else {
2370 None
2371 }
2372 })
2373 .collect()
2374 }
2375
2376 fn maybe_add_edge_structural_projection(
2379 &self,
2380 plan: Arc<dyn ExecutionPlan>,
2381 step_variable: Option<&str>,
2382 source_variable: &str,
2383 target_variable: &str,
2384 all_properties: &HashMap<String, HashSet<String>>,
2385 skip_if_vlp: bool,
2386 ) -> Result<Arc<dyn ExecutionPlan>> {
2387 if skip_if_vlp {
2388 return Ok(plan);
2389 }
2390 let Some(edge_var) = step_variable else {
2391 return Ok(plan);
2392 };
2393 if !all_properties
2394 .get(edge_var)
2395 .is_some_and(|p| p.contains("*") || p.contains(STRUCT_ONLY_SENTINEL))
2396 {
2397 return Ok(plan);
2398 }
2399 let prefix = format!("{}.", edge_var);
2401 let edge_props: Vec<String> = plan
2402 .schema()
2403 .fields()
2404 .iter()
2405 .filter_map(|f| {
2406 f.name()
2407 .strip_prefix(&prefix)
2408 .filter(|prop| !prop.starts_with('_') && *prop != "overflow_json")
2409 .map(|prop| prop.to_string())
2410 })
2411 .collect();
2412 self.add_edge_structural_projection(
2413 plan,
2414 edge_var,
2415 &edge_props,
2416 source_variable,
2417 target_variable,
2418 )
2419 }
2420
2421 fn finalize_schemaless_scan(
2423 &self,
2424 scan_plan: Arc<dyn ExecutionPlan>,
2425 variable: &str,
2426 filter: Option<&Expr>,
2427 optional: bool,
2428 properties: &[String],
2429 need_full: bool,
2430 ) -> Result<Arc<dyn ExecutionPlan>> {
2431 let mut plan = self.apply_scan_filter(scan_plan, variable, filter, None)?;
2434
2435 plan = self.wrap_read_set_recording(plan, variable);
2438
2439 if need_full {
2442 let struct_props: Vec<String> = properties
2447 .iter()
2448 .filter(|p| *p != "*" && *p != STRUCT_ONLY_SENTINEL)
2449 .cloned()
2450 .collect();
2451 plan = self.add_structural_projection(plan, variable, &struct_props)?;
2452 }
2453
2454 self.wrap_optional(plan, optional)
2455 }
2456
2457 fn plan_schemaless_scan(
2458 &self,
2459 label_name: &str,
2460 variable: &str,
2461 filter: Option<&Expr>,
2462 optional: bool,
2463 all_properties: &HashMap<String, HashSet<String>>,
2464 ) -> Result<Arc<dyn ExecutionPlan>> {
2465 let (properties, need_full) = Self::resolve_schemaless_properties(variable, all_properties);
2466 let scan_plan: Arc<dyn ExecutionPlan> =
2467 Arc::new(GraphScanExec::new_schemaless_vertex_scan(
2468 self.graph_ctx.clone(),
2469 label_name.to_string(),
2470 variable.to_string(),
2471 properties.clone(),
2472 None,
2473 ));
2474 self.finalize_schemaless_scan(
2475 scan_plan,
2476 variable,
2477 filter,
2478 optional,
2479 &properties,
2480 need_full,
2481 )
2482 }
2483
2484 fn classify_labels(
2491 registry: &uni_plugin::PluginRegistry,
2492 labels: &[String],
2493 ) -> (Vec<(String, u16)>, Vec<String>) {
2494 let mut virtual_labels: Vec<(String, u16)> = Vec::new();
2495 let mut native_labels: Vec<String> = Vec::new();
2496 for label in labels {
2497 if let Some(id) = registry.virtual_label_by_name(label) {
2498 virtual_labels.push((label.clone(), id));
2499 } else {
2500 native_labels.push(label.clone());
2501 }
2502 }
2503 (virtual_labels, native_labels)
2504 }
2505
2506 fn plan_multi_label_scan(
2522 &self,
2523 labels: &[String],
2524 variable: &str,
2525 filter: Option<&Expr>,
2526 optional: bool,
2527 all_properties: &HashMap<String, HashSet<String>>,
2528 ) -> Result<Arc<dyn ExecutionPlan>> {
2529 let (virtual_labels, native_labels) = Self::classify_labels(&self.plugin_registry, labels);
2530
2531 if virtual_labels.is_empty() {
2533 let (properties, need_full) =
2534 Self::resolve_schemaless_properties(variable, all_properties);
2535 let scan_plan: Arc<dyn ExecutionPlan> =
2536 Arc::new(GraphScanExec::new_multi_label_vertex_scan(
2537 self.graph_ctx.clone(),
2538 labels.to_vec(),
2539 variable.to_string(),
2540 properties.clone(),
2541 None,
2542 ));
2543 return self.finalize_schemaless_scan(
2544 scan_plan,
2545 variable,
2546 filter,
2547 optional,
2548 &properties,
2549 need_full,
2550 );
2551 }
2552
2553 let virtual_side =
2559 self.build_virtual_union_scan(&virtual_labels, variable, filter, all_properties)?;
2560
2561 if native_labels.is_empty() {
2563 let plan = self.apply_scan_filter(virtual_side, variable, filter, None)?;
2568 return self.wrap_optional(plan, optional);
2569 }
2570
2571 let native_properties: Vec<String> = vec!["_all_props".to_string()];
2577 let native_scan: Arc<dyn ExecutionPlan> =
2578 Arc::new(GraphScanExec::new_multi_label_vertex_scan(
2579 self.graph_ctx.clone(),
2580 native_labels,
2581 variable.to_string(),
2582 native_properties,
2583 None,
2584 ));
2585
2586 let joined = self.semi_join_on_vid(virtual_side, native_scan, variable)?;
2587 let plan = self.apply_scan_filter(joined, variable, filter, None)?;
2588 self.wrap_optional(plan, optional)
2589 }
2590
2591 fn build_virtual_union_scan(
2598 &self,
2599 virtual_labels: &[(String, u16)],
2600 variable: &str,
2601 filter: Option<&Expr>,
2602 all_properties: &HashMap<String, HashSet<String>>,
2603 ) -> Result<Arc<dyn ExecutionPlan>> {
2604 let pushdown_filters: Vec<DfExpr> = filter
2605 .map(|f| -> Result<Vec<_>> {
2606 let ctx = crate::query::df_expr::TranslationContext {
2607 parameters: self.params.clone(),
2608 outer_values: self.outer_values.clone(),
2609 ..Default::default()
2610 };
2611 let df = crate::query::df_expr::cypher_expr_to_df(f, Some(&ctx))?;
2612 Ok(vec![df])
2613 })
2614 .transpose()?
2615 .unwrap_or_default();
2616
2617 let mut scans: Vec<Arc<dyn ExecutionPlan>> = Vec::with_capacity(virtual_labels.len());
2618 for (label_name, label_id) in virtual_labels {
2619 let entry = self
2620 .plugin_registry
2621 .virtual_label_by_id(*label_id)
2622 .ok_or_else(|| {
2623 anyhow!(
2624 "Virtual label `{label_name}` (id {label_id:#x}) has no \
2625 registered CatalogTable; the originating CatalogProvider \
2626 may have been deregistered after the plan was cached"
2627 )
2628 })?;
2629 let properties = self.resolve_properties(variable, label_name, all_properties);
2630 let exec = crate::query::df_graph::catalog_scan::CatalogVertexScanExec::try_new(
2631 entry.table,
2632 *label_id,
2633 label_name.clone(),
2634 variable.to_string(),
2635 properties,
2636 pushdown_filters.clone(),
2637 None,
2638 )?;
2639 scans.push(Arc::new(exec));
2640 }
2641
2642 if scans.len() == 1 {
2643 Ok(scans.pop().expect("len == 1 implies non-empty"))
2644 } else {
2645 UnionExec::try_new(scans).map_err(|e| anyhow!("UnionExec construction failed: {e}"))
2646 }
2647 }
2648
2649 fn semi_join_on_vid(
2653 &self,
2654 left: Arc<dyn ExecutionPlan>,
2655 right: Arc<dyn ExecutionPlan>,
2656 variable: &str,
2657 ) -> Result<Arc<dyn ExecutionPlan>> {
2658 use datafusion::common::NullEquality;
2659 use datafusion::physical_plan::expressions::Column;
2660 use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
2661
2662 let vid_col = format!("{variable}._vid");
2663 let left_idx = left
2664 .schema()
2665 .index_of(&vid_col)
2666 .map_err(|e| anyhow!("virtual scan output missing `{vid_col}`: {e}"))?;
2667 let right_idx = right
2668 .schema()
2669 .index_of(&vid_col)
2670 .map_err(|e| anyhow!("native scan output missing `{vid_col}`: {e}"))?;
2671 let on: Vec<(
2672 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
2673 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
2674 )> = vec![(
2675 Arc::new(Column::new(&vid_col, left_idx)),
2676 Arc::new(Column::new(&vid_col, right_idx)),
2677 )];
2678 let join = HashJoinExec::try_new(
2679 left,
2680 right,
2681 on,
2682 None,
2683 &JoinType::LeftSemi,
2684 None,
2685 PartitionMode::CollectLeft,
2686 NullEquality::NullEqualsNothing,
2687 false,
2688 )?;
2689 Ok(Arc::new(join))
2690 }
2691
2692 fn hydrate_virtual_target_from_catalog(
2709 &self,
2710 traverse_plan: Arc<dyn ExecutionPlan>,
2711 target_label_id: u16,
2712 target_variable: &str,
2713 all_properties: &HashMap<String, HashSet<String>>,
2714 ) -> Result<Arc<dyn ExecutionPlan>> {
2715 use datafusion::common::NullEquality;
2716 use datafusion::physical_expr::expressions::{Column, col as col_expr};
2717 use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
2718
2719 let entry = self
2720 .plugin_registry
2721 .virtual_label_by_id(target_label_id)
2722 .ok_or_else(|| {
2723 anyhow!(
2724 "Virtual label id {target_label_id:#x} for target `{target_variable}` has no \
2725 registered CatalogTable; the originating CatalogProvider may have been \
2726 deregistered after the plan was cached"
2727 )
2728 })?;
2729 let label_name = entry.name.as_str();
2730 let properties = self.resolve_properties(target_variable, label_name, all_properties);
2731 let catalog_exec = crate::query::df_graph::catalog_scan::CatalogVertexScanExec::try_new(
2737 entry.table,
2738 target_label_id,
2739 label_name.to_string(),
2740 target_variable.to_string(),
2741 properties,
2742 Vec::new(),
2743 None,
2744 )?;
2745 let catalog_plan: Arc<dyn ExecutionPlan> = Arc::new(catalog_exec);
2746
2747 let vid_col_name = format!("{target_variable}._vid");
2748 let left_idx = traverse_plan
2749 .schema()
2750 .index_of(&vid_col_name)
2751 .map_err(|e| anyhow!("traverse plan missing `{vid_col_name}` for hydration: {e}"))?;
2752 let right_idx = catalog_plan
2753 .schema()
2754 .index_of(&vid_col_name)
2755 .map_err(|e| anyhow!("catalog scan missing `{vid_col_name}`: {e}"))?;
2756 let on: Vec<(
2757 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
2758 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
2759 )> = vec![(
2760 Arc::new(Column::new(&vid_col_name, left_idx)),
2761 Arc::new(Column::new(&vid_col_name, right_idx)),
2762 )];
2763 let join = HashJoinExec::try_new(
2764 traverse_plan,
2765 catalog_plan,
2766 on,
2767 None,
2768 &JoinType::Inner,
2769 None,
2770 PartitionMode::CollectLeft,
2771 NullEquality::NullEqualsNothing,
2772 false,
2773 )?;
2774 let join_plan: Arc<dyn ExecutionPlan> = Arc::new(join);
2775
2776 let join_schema = join_plan.schema();
2784 let mut projection_exprs: Vec<(Arc<dyn datafusion::physical_plan::PhysicalExpr>, String)> =
2785 Vec::with_capacity(join_schema.fields().len() - 1);
2786 let mut seen_vid = false;
2787 for field in join_schema.fields().iter() {
2788 if field.name() == &vid_col_name {
2789 if seen_vid {
2790 continue;
2791 }
2792 seen_vid = true;
2793 }
2794 let expr = col_expr(field.name(), &join_schema)
2795 .map_err(|e| anyhow!("hydrate_virtual_target_from_catalog projection: {e}"))?;
2796 projection_exprs.push((expr, field.name().clone()));
2797 }
2798 let projected = ProjectionExec::try_new(projection_exprs, join_plan)
2799 .map_err(|e| anyhow!("hydrate_virtual_target_from_catalog projection: {e}"))?;
2800 Ok(Arc::new(projected))
2801 }
2802
2803 #[expect(
2819 clippy::too_many_arguments,
2820 reason = "mirrors plan_traverse's argument set"
2821 )]
2822 fn plan_traverse_virtual_edge(
2823 &self,
2824 input_plan: Arc<dyn ExecutionPlan>,
2825 source_col: String,
2826 source_variable: &str,
2827 virtual_edge_type_id: u32,
2828 direction: AstDirection,
2829 target_variable: &str,
2830 target_label_id: u16,
2831 step_variable: Option<&str>,
2832 all_properties: &HashMap<String, HashSet<String>>,
2833 target_filter: Option<&Expr>,
2834 optional: bool,
2835 optional_pattern_vars: &HashSet<String>,
2836 ) -> Result<Arc<dyn ExecutionPlan>> {
2837 use datafusion::common::NullEquality;
2838 use datafusion::physical_expr::expressions::{Column, col as col_expr};
2839 use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
2840
2841 let entry = self
2842 .plugin_registry
2843 .virtual_edge_type_by_id(virtual_edge_type_id)
2844 .ok_or_else(|| {
2845 anyhow!(
2846 "Virtual edge-type id {virtual_edge_type_id:#x} for `{target_variable}` has \
2847 no registered CatalogTable; the originating CatalogProvider may have been \
2848 deregistered after the plan was cached"
2849 )
2850 })?;
2851 let type_name = entry.name.as_str();
2852 let edge_var = step_variable
2853 .map(str::to_string)
2854 .unwrap_or_else(|| format!("__anon_edge_{target_variable}"));
2855
2856 let edge_properties: Vec<String> = step_variable
2857 .and_then(|sv| all_properties.get(sv))
2858 .map(|props| {
2859 props
2860 .iter()
2861 .filter(|p| !p.starts_with('_') && *p != "*")
2862 .cloned()
2863 .collect()
2864 })
2865 .unwrap_or_default();
2866
2867 let catalog_exec = crate::query::df_graph::catalog_scan::CatalogEdgeScanExec::try_new(
2868 entry.table,
2869 virtual_edge_type_id,
2870 type_name.to_string(),
2871 edge_var.clone(),
2872 edge_properties,
2873 Vec::new(),
2874 None,
2875 )?;
2876 let catalog_plan: Arc<dyn ExecutionPlan> = Arc::new(catalog_exec);
2877
2878 let edge_src_col = format!("{edge_var}._src_vid");
2879 let edge_dst_col = format!("{edge_var}._dst_vid");
2880 let (right_key, target_src_col) = match direction {
2881 AstDirection::Outgoing => (edge_src_col.clone(), edge_dst_col.clone()),
2882 AstDirection::Incoming => (edge_dst_col.clone(), edge_src_col.clone()),
2883 AstDirection::Both => (edge_src_col.clone(), edge_dst_col.clone()),
2884 };
2885
2886 let left_idx = input_plan
2887 .schema()
2888 .index_of(&source_col)
2889 .map_err(|e| anyhow!("input plan missing source vid column `{source_col}`: {e}"))?;
2890 let right_idx = catalog_plan
2891 .schema()
2892 .index_of(&right_key)
2893 .map_err(|e| anyhow!("CatalogEdgeScanExec missing `{right_key}`: {e}"))?;
2894 let on: Vec<(
2895 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
2896 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
2897 )> = vec![(
2898 Arc::new(Column::new(&source_col, left_idx)),
2899 Arc::new(Column::new(&right_key, right_idx)),
2900 )];
2901 let join = HashJoinExec::try_new(
2902 input_plan,
2903 catalog_plan,
2904 on,
2905 None,
2906 &JoinType::Inner,
2907 None,
2908 PartitionMode::CollectLeft,
2909 NullEquality::NullEqualsNothing,
2910 false,
2911 )?;
2912 let join_plan: Arc<dyn ExecutionPlan> = Arc::new(join);
2913
2914 let join_schema = join_plan.schema();
2915 let target_vid_name = format!("{target_variable}._vid");
2916 let mut projection_exprs: Vec<(Arc<dyn datafusion::physical_plan::PhysicalExpr>, String)> =
2917 Vec::with_capacity(join_schema.fields().len());
2918 for field in join_schema.fields() {
2919 let name = field.name();
2920 if name == &right_key {
2921 continue;
2922 }
2923 let expr = col_expr(name, &join_schema)
2924 .map_err(|e| anyhow!("plan_traverse_virtual_edge projection: {e}"))?;
2925 let out_name = if name == &target_src_col {
2926 target_vid_name.clone()
2927 } else {
2928 name.clone()
2929 };
2930 projection_exprs.push((expr, out_name));
2931 }
2932 let projected: Arc<dyn ExecutionPlan> = Arc::new(
2933 ProjectionExec::try_new(projection_exprs, join_plan)
2934 .map_err(|e| anyhow!("plan_traverse_virtual_edge projection: {e}"))?,
2935 );
2936
2937 let mut plan = if uni_common::core::schema::is_virtual_label_id(target_label_id) {
2938 self.hydrate_virtual_target_from_catalog(
2939 projected,
2940 target_label_id,
2941 target_variable,
2942 all_properties,
2943 )?
2944 } else {
2945 projected
2946 };
2947
2948 plan = self.add_wildcard_structural_projection(plan, target_variable, all_properties)?;
2949 plan = self.maybe_add_edge_structural_projection(
2950 plan,
2951 step_variable,
2952 source_variable,
2953 target_variable,
2954 all_properties,
2955 false,
2956 )?;
2957
2958 if let Some(filter_expr) = target_filter {
2959 let mut variable_kinds = HashMap::new();
2960 variable_kinds.insert(source_variable.to_string(), VariableKind::Node);
2961 variable_kinds.insert(target_variable.to_string(), VariableKind::Node);
2962 if let Some(sv) = step_variable {
2963 variable_kinds.insert(sv.to_string(), VariableKind::edge_for(false));
2964 }
2965 let ctx = TranslationContext {
2966 parameters: self.params.clone(),
2967 variable_kinds,
2968 ..Default::default()
2969 };
2970 let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
2971 let schema = plan.schema();
2972 let session = self.session_ctx.read();
2973 let physical_filter =
2974 self.create_physical_filter_expr(&df_filter, &schema, &session)?;
2975 plan = if optional {
2976 Arc::new(OptionalFilterExec::new(
2977 plan,
2978 physical_filter,
2979 optional_pattern_vars.clone(),
2980 ))
2981 } else {
2982 Arc::new(FilterExec::try_new(physical_filter, plan)?)
2983 };
2984 } else {
2985 let _ = optional_pattern_vars;
2986 }
2987 Ok(plan)
2988 }
2989
2990 fn plan_scan_all(
2994 &self,
2995 variable: &str,
2996 filter: Option<&Expr>,
2997 optional: bool,
2998 all_properties: &HashMap<String, HashSet<String>>,
2999 ) -> Result<Arc<dyn ExecutionPlan>> {
3000 let (properties, need_full) = Self::resolve_schemaless_properties(variable, all_properties);
3001 let extracted_vids = Self::extract_vid_from_cypher_filter(filter, variable, &self.params);
3004 let scan_filter = extracted_vids
3005 .as_deref()
3006 .filter(|v| v.len() == 1)
3007 .map(|v| Self::build_vid_physical_filter(&format!("{variable}._vid"), v[0]));
3008 let mut scan_exec = GraphScanExec::new_schemaless_all_scan(
3009 self.graph_ctx.clone(),
3010 variable.to_string(),
3011 properties.clone(),
3012 scan_filter,
3013 );
3014 if let Some(vids) = extracted_vids
3015 && vids.len() > 1
3016 {
3017 scan_exec = scan_exec.with_vid_list_filter(vids);
3018 }
3019 let scan_plan: Arc<dyn ExecutionPlan> = Arc::new(scan_exec);
3020 self.finalize_schemaless_scan(
3021 scan_plan,
3022 variable,
3023 filter,
3024 optional,
3025 &properties,
3026 need_full,
3027 )
3028 }
3029
/// Plans a single `Traverse` step, either a fixed single hop or a
/// variable-length step, over schema edge type ids.
///
/// The body proceeds in four stages:
/// 1. Plans `input` and resolves the source vid column via
///    `resolve_source_vid_col`.
/// 2. Delegates to `plan_traverse_virtual_edge` when the step is not
///    variable-length and its single edge type id is a virtual edge type.
/// 3. Otherwise builds the traversal operator:
///    - `GraphTraverseExec` for a fixed hop;
///    - for a variable-length step, `BindZeroLengthPathExec`, the unchanged
///      input, or `GraphVariableLengthTraverseExec`.
/// 4. Finishes the plan:
///    - joins in catalog properties when `target_label_id` is a virtual label;
///    - adds the wildcard and edge structural projections;
///    - applies `target_filter` (`OptionalFilterExec` when `optional`,
///      otherwise `FilterExec`).
///
/// The early returns in the variable-length, no-edge-type case skip stage 4
/// entirely.
#[expect(
    clippy::too_many_arguments,
    reason = "Graph traversal requires many parameters"
)]
fn plan_traverse(
    &self,
    input: &LogicalPlan,
    edge_type_ids: &[u32],
    direction: AstDirection,
    source_variable: &str,
    target_variable: &str,
    target_label_id: u16,
    step_variable: Option<&str>,
    min_hops: usize,
    max_hops: usize,
    path_variable: Option<&str>,
    optional: bool,
    target_filter: Option<&Expr>,
    is_variable_length: bool,
    optional_pattern_vars: &HashSet<String>,
    all_properties: &HashMap<String, HashSet<String>>,
    scope_match_variables: &HashSet<String>,
    edge_filter_expr: Option<&Expr>,
    path_mode: &crate::query::df_graph::nfa::PathMode,
    qpp_steps: Option<&[crate::query::planner::QppStepInfo]>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let input_plan = self.plan_internal(input, all_properties)?;

    let adj_direction = convert_direction(direction.clone());
    let (input_plan, source_col) = Self::resolve_source_vid_col(input_plan, source_variable)?;

    // Single-hop steps whose only edge type id is a virtual (catalog-backed)
    // edge type are planned as a catalog scan + hash join instead.
    if !is_variable_length
        && !edge_type_ids.is_empty()
        && edge_type_ids.len() == 1
        && edge_type_ids
            .iter()
            .all(|eid| uni_common::core::edge_type::is_virtual_edge_type(*eid))
    {
        return self.plan_traverse_virtual_edge(
            input_plan,
            source_col,
            source_variable,
            edge_type_ids[0],
            direction,
            target_variable,
            target_label_id,
            step_variable,
            all_properties,
            target_filter,
            optional,
            optional_pattern_vars,
        );
    }

    let traverse_plan: Arc<dyn ExecutionPlan> = if !is_variable_length {
        // ---- Fixed single-hop traversal ----
        // Edge properties to load for a named step variable. With a `*`
        // wildcard, start from every schema property of the step's edge
        // types. Then append the other requested names that either do not
        // start with `_` or are `_created_at` / `_updated_at`.
        let mut edge_properties: Vec<String> = if let Some(edge_var) = step_variable {
            let has_wildcard = all_properties
                .get(edge_var)
                .is_some_and(|props| props.contains("*"));
            if has_wildcard {
                let mut schema_props: Vec<String> = edge_type_ids
                    .iter()
                    .filter_map(|eid| self.schema.edge_type_name_by_id(*eid))
                    .flat_map(|name| {
                        self.schema
                            .properties
                            .get(name)
                            .map(|p| p.keys().cloned().collect::<Vec<_>>())
                            .unwrap_or_default()
                    })
                    .collect();

                if let Some(props) = all_properties.get(edge_var) {
                    for p in props {
                        let passthrough = !p.starts_with('_')
                            || matches!(p.as_str(), "_created_at" | "_updated_at");
                        if p != "*" && passthrough && !schema_props.contains(p) {
                            schema_props.push(p.clone());
                        }
                    }
                }
                schema_props
            } else {
                all_properties
                    .get(edge_var)
                    .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
                    .unwrap_or_default()
            }
        } else {
            Vec::new()
        };

        if let Some(edge_var) = step_variable {
            let has_wildcard = all_properties
                .get(edge_var)
                .is_some_and(|props| props.contains("*"));
            let edge_type_props = self.merged_edge_type_properties(edge_type_ids);
            // A requested non-`_` property that the merged edge-type schema
            // does not declare needs the `overflow_json` column.
            let has_overflow_edge_props = edge_properties.iter().any(|p| {
                p != "overflow_json"
                    && !p.starts_with('_')
                    && !edge_type_props.contains_key(p.as_str())
            });
            let needs_overflow =
                (has_wildcard && edge_properties.is_empty()) || has_overflow_edge_props;
            if needs_overflow && !edge_properties.contains(&"overflow_json".to_string()) {
                edge_properties.push("overflow_json".to_string());
            }

            if has_wildcard && !edge_properties.contains(&"_all_props".to_string()) {
                edge_properties.push("_all_props".to_string());
            }
        }

        // Target properties are resolved against the target's schema label
        // name ("" when the id has no name), minus `*` and the struct-only
        // sentinel.
        let target_label_name_str = self.schema.label_name_by_id(target_label_id).unwrap_or("");
        let mut target_properties =
            self.resolve_properties(target_variable, target_label_name_str, all_properties);

        target_properties.retain(|p| p != "*" && p != STRUCT_ONLY_SENTINEL);

        let target_has_wildcard = all_properties
            .get(target_variable)
            .is_some_and(|p| p.contains("*"));
        if target_has_wildcard && target_properties.is_empty() {
            target_properties.push("_all_props".to_string());
        }

        let target_label_props = if !target_label_name_str.is_empty() {
            self.schema.properties.get(target_label_name_str)
        } else {
            None
        };
        // Requested target properties that the label schema does not declare
        // (always the case when the label is unknown) add `_all_props`.
        let has_non_schema_props = target_properties.iter().any(|p| {
            p != "overflow_json"
                && p != "_all_props"
                && !p.starts_with('_')
                && !target_label_props.is_some_and(|lp| lp.contains_key(p.as_str()))
        });
        if has_non_schema_props && !target_properties.iter().any(|p| p == "_all_props") {
            target_properties.push("_all_props".to_string());
        }
        if let Some(filter_expr) = target_filter {
            // The same applies to non-schema target properties that are
            // referenced only by the target filter.
            let filter_props = crate::query::df_expr::collect_properties(filter_expr);
            let has_overflow_filter = filter_props.iter().any(|(var, prop)| {
                var == target_variable
                    && !prop.starts_with('_')
                    && !target_label_props
                        .is_some_and(|props| props.contains_key(prop.as_str()))
            });
            if has_overflow_filter && !target_properties.iter().any(|p| p == "_all_props") {
                target_properties.push("_all_props".to_string());
            }
        }
        if !target_label_name_str.is_empty()
            && has_non_schema_props
            && !target_properties.iter().any(|p| p == "overflow_json")
        {
            target_properties.push("overflow_json".to_string());
        }

        let target_label_name = if target_label_name_str.is_empty() {
            None
        } else {
            Some(target_label_name_str.to_string())
        };

        let bound_target_column =
            Self::detect_bound_target(&input_plan.schema(), target_variable);

        // For `__rebound_`-prefixed step/target variables: if the input holds
        // the un-prefixed variable as a Struct column, expand its fields with
        // `extract_all_struct_fields` first.
        let mut input_plan = input_plan;
        for rebound_var in [
            step_variable.and_then(|sv| sv.strip_prefix("__rebound_")),
            target_variable.strip_prefix("__rebound_"),
        ]
        .into_iter()
        .flatten()
        {
            if input_plan
                .schema()
                .field_with_name(rebound_var)
                .ok()
                .is_some_and(|f| {
                    matches!(
                        f.data_type(),
                        datafusion::arrow::datatypes::DataType::Struct(_)
                    )
                })
            {
                input_plan = Self::extract_all_struct_fields(input_plan, rebound_var)?;
            }
        }

        let rebound_bound_edge_col = step_variable
            .and_then(|sv| sv.strip_prefix("__rebound_"))
            .map(|bound| format!("{}._eid", bound));

        let used_edge_columns = Self::collect_used_edge_columns(
            &input_plan.schema(),
            scope_match_variables,
            rebound_bound_edge_col.as_deref(),
        );

        Arc::new(GraphTraverseExec::new(
            input_plan,
            source_col,
            edge_type_ids.to_vec(),
            adj_direction,
            target_variable.to_string(),
            step_variable.map(|s| s.to_string()),
            edge_properties,
            target_properties,
            target_label_name,
            // NOTE(review): the meaning of this `None` argument is not
            // visible here; see `GraphTraverseExec::new`.
            None,
            self.graph_ctx.clone(),
            optional,
            optional_pattern_vars.clone(),
            bound_target_column,
            used_edge_columns,
        ))
    } else {
        // ---- Variable-length traversal ----
        if edge_type_ids.is_empty() {
            // With no edge type ids and `min_hops == 0`, the step is handled
            // in one of two ways:
            // - bind a zero-length path when a path variable is requested;
            // - return the input unchanged when there is no step variable.
            // These early returns bypass the hydration, structural
            // projections and target filter below.
            if let (0, Some(path_var)) = (min_hops, path_variable) {
                return Ok(Arc::new(BindZeroLengthPathExec::new(
                    input_plan,
                    source_variable.to_string(),
                    path_var.to_string(),
                    self.graph_ctx.clone(),
                )));
            } else if min_hops == 0 && step_variable.is_none() {
                return Ok(input_plan);
            }
        }
        {
            let vlp_target_label_name_str =
                self.schema.label_name_by_id(target_label_id).unwrap_or("");
            let vlp_target_properties_raw = self.resolve_properties(
                target_variable,
                vlp_target_label_name_str,
                all_properties,
            );
            let target_has_wildcard = all_properties
                .get(target_variable)
                .is_some_and(|p| p.contains("*"));
            let vlp_target_label_props: Option<HashSet<String>> =
                if vlp_target_label_name_str.is_empty() {
                    None
                } else {
                    self.schema
                        .properties
                        .get(vlp_target_label_name_str)
                        .map(|props| props.keys().cloned().collect())
                };
            let mut vlp_target_properties = sanitize_vlp_target_properties(
                vlp_target_properties_raw,
                target_has_wildcard,
                vlp_target_label_props.as_ref(),
            );
            let vlp_target_label_name = if vlp_target_label_name_str.is_empty() {
                None
            } else {
                Some(vlp_target_label_name_str.to_string())
            };

            // A target that is already bound in the input loads no properties.
            let bound_target_column =
                Self::detect_bound_target(&input_plan.schema(), target_variable);
            if bound_target_column.is_some() {
                vlp_target_properties.clear();
            }

            // The edge predicate is passed down twice: as a Lance filter
            // string (when one can be generated) and as extracted property
            // conditions.
            let edge_lance_filter: Option<String> = edge_filter_expr.and_then(|expr| {
                let edge_var_name = step_variable.unwrap_or("__anon_edge");
                crate::query::pushdown::LanceFilterGenerator::generate(
                    std::slice::from_ref(expr),
                    edge_var_name,
                    None,
                )
            });

            let edge_property_conditions = edge_filter_expr
                .map(Self::extract_edge_property_conditions)
                .unwrap_or_default();

            let used_edge_columns = Self::collect_used_edge_columns(
                &input_plan.schema(),
                scope_match_variables,
                None,
            );

            // A step variable takes precedence over a path variable.
            let output_mode = if step_variable.is_some() {
                crate::query::df_graph::nfa::VlpOutputMode::StepVariable
            } else if path_variable.is_some() {
                crate::query::df_graph::nfa::VlpOutputMode::FullPath
            } else {
                crate::query::df_graph::nfa::VlpOutputMode::EndpointsOnly
            };

            // Quantified path patterns: hop bounds become iteration bounds by
            // integer division by the number of steps per iteration.
            // NOTE(review): `Some(&[])` would divide by zero here; confirm the
            // planner never emits an empty QPP step list.
            let qpp_nfa = qpp_steps.map(|steps| {
                use crate::query::df_graph::nfa::{QppStep, VertexConstraint};
                let hops_per_iter = steps.len();
                let min_iter = min_hops / hops_per_iter;
                let max_iter = max_hops / hops_per_iter;
                let nfa_steps: Vec<QppStep> = steps
                    .iter()
                    .map(|s| QppStep {
                        edge_type_ids: s.edge_type_ids.clone(),
                        direction: convert_direction(s.direction.clone()),
                        target_constraint: s
                            .target_label
                            .as_ref()
                            .map(|l| VertexConstraint::Label(l.clone())),
                    })
                    .collect();
                crate::query::df_graph::nfa::PathNfa::from_qpp(nfa_steps, min_iter, max_iter)
            });

            Arc::new(GraphVariableLengthTraverseExec::new(
                input_plan,
                source_col,
                edge_type_ids.to_vec(),
                adj_direction,
                min_hops,
                max_hops,
                target_variable.to_string(),
                step_variable.map(|s| s.to_string()),
                path_variable.map(|s| s.to_string()),
                vlp_target_properties,
                vlp_target_label_name,
                self.graph_ctx.clone(),
                optional,
                bound_target_column,
                edge_lance_filter,
                edge_property_conditions,
                used_edge_columns,
                path_mode.clone(),
                output_mode,
                qpp_nfa,
            ))
        }
    };

    let mut traverse_plan = traverse_plan;

    // Virtual (catalog-backed) target labels get their properties joined in
    // from the label's catalog table.
    if uni_common::core::schema::is_virtual_label_id(target_label_id) {
        traverse_plan = self.hydrate_virtual_target_from_catalog(
            traverse_plan,
            target_label_id,
            target_variable,
            all_properties,
        )?;
    }

    traverse_plan = self.add_wildcard_structural_projection(
        traverse_plan,
        target_variable,
        all_properties,
    )?;

    traverse_plan = self.maybe_add_edge_structural_projection(
        traverse_plan,
        step_variable,
        source_variable,
        target_variable,
        all_properties,
        is_variable_length,
    )?;

    if let Some(filter_expr) = target_filter {
        // Translate the Cypher target filter using the variable kinds and
        // labels known for this step.
        let mut variable_kinds = HashMap::new();
        variable_kinds.insert(source_variable.to_string(), VariableKind::Node);
        variable_kinds.insert(target_variable.to_string(), VariableKind::Node);
        if let Some(sv) = step_variable {
            variable_kinds.insert(sv.to_string(), VariableKind::edge_for(is_variable_length));
        }
        if let Some(pv) = path_variable {
            variable_kinds.insert(pv.to_string(), VariableKind::Path);
        }
        let mut variable_labels = HashMap::new();
        if let Some(sv) = step_variable
            && edge_type_ids.len() == 1
            && let Some(name) = self.schema.edge_type_name_by_id(edge_type_ids[0])
        {
            variable_labels.insert(sv.to_string(), name.to_string());
        }
        let target_label_name_str = self.schema.label_name_by_id(target_label_id).unwrap_or("");
        if !target_label_name_str.is_empty() {
            variable_labels.insert(
                target_variable.to_string(),
                target_label_name_str.to_string(),
            );
        }
        let ctx = TranslationContext {
            parameters: self.params.clone(),
            variable_labels,
            variable_kinds,
            ..Default::default()
        };
        let df_filter = cypher_expr_to_df(filter_expr, Some(&ctx))?;
        let schema = traverse_plan.schema();
        let session = self.session_ctx.read();
        let physical_filter =
            self.create_physical_filter_expr(&df_filter, &schema, &session)?;

        // Optional traversals filter through `OptionalFilterExec` with the
        // optional pattern variables; others through a plain `FilterExec`.
        if optional {
            Ok(Arc::new(OptionalFilterExec::new(
                traverse_plan,
                physical_filter,
                optional_pattern_vars.clone(),
            )))
        } else {
            Ok(Arc::new(FilterExec::try_new(
                physical_filter,
                traverse_plan,
            )?))
        }
    } else {
        Ok(traverse_plan)
    }
}
3535
3536 #[expect(clippy::too_many_arguments)]
3541 fn plan_traverse_main_by_type(
3542 &self,
3543 input: &LogicalPlan,
3544 type_names: &[String],
3545 direction: AstDirection,
3546 source_variable: &str,
3547 target_variable: &str,
3548 step_variable: Option<&str>,
3549 optional: bool,
3550 optional_pattern_vars: &HashSet<String>,
3551 all_properties: &HashMap<String, HashSet<String>>,
3552 scope_match_variables: &HashSet<String>,
3553 ) -> Result<Arc<dyn ExecutionPlan>> {
3554 let input_plan = self.plan_internal(input, all_properties)?;
3555
3556 let adj_direction = convert_direction(direction);
3557 let (input_plan, source_col) = Self::resolve_source_vid_col(input_plan, source_variable)?;
3558
3559 let bound_target_column = Self::detect_bound_target(&input_plan.schema(), target_variable);
3561
3562 let mut edge_properties: Vec<String> = if let Some(edge_var) = step_variable {
3564 all_properties
3565 .get(edge_var)
3566 .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
3567 .unwrap_or_default()
3568 } else {
3569 Vec::new()
3570 };
3571
3572 if let Some(edge_var) = step_variable
3574 && all_properties
3575 .get(edge_var)
3576 .is_some_and(|props| props.contains("*"))
3577 && !edge_properties.iter().any(|p| p == "_all_props")
3578 {
3579 edge_properties.push("_all_props".to_string());
3580 }
3581
3582 let mut target_properties: Vec<String> = all_properties
3584 .get(target_variable)
3585 .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
3586 .unwrap_or_default();
3587
3588 let target_has_wildcard = all_properties
3592 .get(target_variable)
3593 .is_some_and(|p| p.contains("*"));
3594 if (target_has_wildcard || !target_properties.is_empty())
3595 && !target_properties.iter().any(|p| p == "_all_props")
3596 {
3597 target_properties.push("_all_props".to_string());
3598 }
3599 if bound_target_column.is_some() {
3600 target_properties.clear();
3602 }
3603
3604 let rebound_bound_edge_col = step_variable
3607 .and_then(|sv| sv.strip_prefix("__rebound_"))
3608 .map(|bound| format!("{}._eid", bound));
3609 let used_edge_columns = Self::collect_used_edge_columns(
3610 &input_plan.schema(),
3611 scope_match_variables,
3612 rebound_bound_edge_col.as_deref(),
3613 );
3614
3615 let traverse_plan: Arc<dyn ExecutionPlan> = Arc::new(GraphTraverseMainExec::new(
3617 input_plan,
3618 source_col,
3619 type_names.to_vec(),
3620 adj_direction,
3621 target_variable.to_string(),
3622 step_variable.map(|s| s.to_string()),
3623 edge_properties.clone(),
3624 target_properties,
3625 self.graph_ctx.clone(),
3626 optional,
3627 optional_pattern_vars.clone(),
3628 bound_target_column,
3629 used_edge_columns,
3630 ));
3631
3632 let mut result_plan = traverse_plan;
3633
3634 result_plan =
3636 self.add_wildcard_structural_projection(result_plan, target_variable, all_properties)?;
3637
3638 result_plan = self.maybe_add_edge_structural_projection(
3640 result_plan,
3641 step_variable,
3642 source_variable,
3643 target_variable,
3644 all_properties,
3645 false, )?;
3647
3648 Ok(result_plan)
3649 }
3650
3651 #[expect(clippy::too_many_arguments)]
3656 fn plan_traverse_main_by_type_vlp(
3657 &self,
3658 input: &LogicalPlan,
3659 type_names: &[String],
3660 direction: AstDirection,
3661 source_variable: &str,
3662 target_variable: &str,
3663 step_variable: Option<&str>,
3664 min_hops: usize,
3665 max_hops: usize,
3666 path_variable: Option<&str>,
3667 optional: bool,
3668 all_properties: &HashMap<String, HashSet<String>>,
3669 edge_filter_expr: Option<&Expr>,
3670 path_mode: &crate::query::df_graph::nfa::PathMode,
3671 scope_match_variables: &HashSet<String>,
3672 ) -> Result<Arc<dyn ExecutionPlan>> {
3673 let input_plan = self.plan_internal(input, all_properties)?;
3674
3675 let adj_direction = convert_direction(direction);
3676 let (input_plan, source_col) = Self::resolve_source_vid_col(input_plan, source_variable)?;
3677
3678 let bound_target_column = Self::detect_bound_target(&input_plan.schema(), target_variable);
3680
3681 let mut target_properties: Vec<String> = all_properties
3683 .get(target_variable)
3684 .map(|props| props.iter().filter(|p| *p != "*").cloned().collect())
3685 .unwrap_or_default();
3686
3687 let target_has_wildcard = all_properties
3691 .get(target_variable)
3692 .is_some_and(|p| p.contains("*"));
3693 if (target_has_wildcard || !target_properties.is_empty())
3694 && !target_properties.iter().any(|p| p == "_all_props")
3695 {
3696 target_properties.push("_all_props".to_string());
3697 }
3698 if bound_target_column.is_some() {
3699 target_properties.clear();
3701 }
3702
3703 let edge_lance_filter: Option<String> = edge_filter_expr.and_then(|expr| {
3705 let edge_var_name = step_variable.unwrap_or("__anon_edge");
3706 crate::query::pushdown::LanceFilterGenerator::generate(
3707 std::slice::from_ref(expr),
3708 edge_var_name,
3709 None,
3710 )
3711 });
3712
3713 let edge_property_conditions = edge_filter_expr
3715 .map(Self::extract_edge_property_conditions)
3716 .unwrap_or_default();
3717
3718 let used_edge_columns =
3720 Self::collect_used_edge_columns(&input_plan.schema(), scope_match_variables, None);
3721
3722 let output_mode = if step_variable.is_some() {
3724 crate::query::df_graph::nfa::VlpOutputMode::StepVariable
3725 } else if path_variable.is_some() {
3726 crate::query::df_graph::nfa::VlpOutputMode::FullPath
3727 } else {
3728 crate::query::df_graph::nfa::VlpOutputMode::EndpointsOnly
3729 };
3730
3731 let traverse_plan = Arc::new(GraphVariableLengthTraverseMainExec::new(
3732 input_plan,
3733 source_col,
3734 type_names.to_vec(),
3735 adj_direction,
3736 min_hops,
3737 max_hops,
3738 target_variable.to_string(),
3739 step_variable.map(|s| s.to_string()),
3740 path_variable.map(|s| s.to_string()),
3741 target_properties,
3742 self.graph_ctx.clone(),
3743 optional,
3744 bound_target_column,
3745 edge_lance_filter,
3746 edge_property_conditions,
3747 used_edge_columns,
3748 path_mode.clone(),
3749 output_mode,
3750 ));
3751
3752 Ok(traverse_plan)
3753 }
3754
3755 #[expect(clippy::too_many_arguments)]
3757 fn plan_shortest_path(
3758 &self,
3759 input: &LogicalPlan,
3760 edge_type_ids: &[u32],
3761 direction: AstDirection,
3762 source_variable: &str,
3763 target_variable: &str,
3764 path_variable: &str,
3765 all_shortest: bool,
3766 all_properties: &HashMap<String, HashSet<String>>,
3767 ) -> Result<Arc<dyn ExecutionPlan>> {
3768 let input_plan = self.plan_internal(input, all_properties)?;
3769
3770 let adj_direction = convert_direction(direction);
3771 let source_col = format!("{}._vid", source_variable);
3772 let target_col = format!("{}._vid", target_variable);
3773
3774 Ok(Arc::new(GraphShortestPathExec::new(
3775 input_plan,
3776 source_col,
3777 target_col,
3778 edge_type_ids.to_vec(),
3779 adj_direction,
3780 path_variable.to_string(),
3781 self.graph_ctx.clone(),
3782 all_shortest,
3783 )))
3784 }
3785
3786 fn plan_filter(
3791 &self,
3792 input: &LogicalPlan,
3793 predicate: &Expr,
3794 optional_variables: &HashSet<String>,
3795 all_properties: &HashMap<String, HashSet<String>>,
3796 ) -> Result<Arc<dyn ExecutionPlan>> {
3797 if let LogicalPlan::CrossJoin { left, right } = input
3804 && let Some(plan) = self.try_plan_cross_join_as_hash_join(
3805 left,
3806 right,
3807 predicate,
3808 optional_variables,
3809 all_properties,
3810 )?
3811 {
3812 return Ok(plan);
3813 }
3814
3815 let input_plan = self.plan_internal(input, all_properties)?;
3816 let schema = input_plan.schema();
3817
3818 let ctx = self.translation_context_for_plan(input);
3821 let session = self.session_ctx.read();
3822 let state = session.state();
3823 let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
3824 &state,
3825 Some(&ctx),
3826 )
3827 .with_subquery_ctx(
3828 self.graph_ctx.clone(),
3829 self.schema.clone(),
3830 self.session_ctx.clone(),
3831 self.storage.clone(),
3832 self.params.clone(),
3833 self.outer_entity_vars.clone(),
3834 );
3835 let physical_predicate = compiler.compile(predicate, &schema)?;
3836
3837 if !optional_variables.is_empty() {
3839 return Ok(Arc::new(OptionalFilterExec::new(
3840 input_plan,
3841 physical_predicate,
3842 optional_variables.clone(),
3843 )));
3844 }
3845
3846 Ok(Arc::new(FilterExec::try_new(
3847 physical_predicate,
3848 input_plan,
3849 )?))
3850 }
3851
    /// Tries to plan a filtered cross join of `left` and `right` as an equi-join.
    ///
    /// `predicate` is split by `classify_join_predicate` into three parts:
    /// * equi-join key pairs (left expression over `left` variables, right
    ///   expression over `right` variables);
    /// * conjuncts that reference only one side;
    /// * a residual.
    ///
    /// Returns `Ok(None)`, meaning this strategy does not apply, when any of these hold:
    /// * there are no equi pairs;
    /// * optional variables appear on both sides;
    /// * an outer join is required and there are side-only conjuncts or a residual;
    /// * some key pair cannot be reconciled by `unify_join_key_types`.
    ///
    /// Otherwise returns `Some(plan)`. `plan` is the `VidLookupJoinExec` from
    /// `try_emit_vid_lookup_join` when that applies (inner/left join, no residual).
    /// If not, it is a `HashJoinExec`, wrapped in a `FilterExec` when a residual
    /// exists.
    ///
    /// # Errors
    /// Propagates planning and expression-compilation errors.
    fn try_plan_cross_join_as_hash_join(
        &self,
        left: &LogicalPlan,
        right: &LogicalPlan,
        predicate: &Expr,
        optional_variables: &HashSet<String>,
        all_properties: &HashMap<String, HashSet<String>>,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        use datafusion::common::NullEquality;
        use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};

        let left_vars = collect_plan_variables(left);
        let right_vars = collect_plan_variables(right);
        let cls = classify_join_predicate(predicate, &left_vars, &right_vars);

        // Without at least one equality key there is nothing to hash on.
        if cls.equi_pairs.is_empty() {
            return Ok(None);
        }

        let left_optional: HashSet<&String> = optional_variables
            .iter()
            .filter(|v| left_vars.contains(*v))
            .collect();
        let right_optional: HashSet<&String> = optional_variables
            .iter()
            .filter(|v| right_vars.contains(*v))
            .collect();

        // A side holding optional variables may be null-extended, so the other side
        // is the preserved one. Both sides optional would need a full outer join,
        // which is not attempted here.
        let join_type = match (left_optional.is_empty(), right_optional.is_empty()) {
            (true, true) => JoinType::Inner,
            (true, false) => JoinType::Left,
            (false, true) => JoinType::Right,
            (false, false) => return Ok(None),
        };

        // Side-only conjuncts are pushed below the join, and the residual is applied
        // as a filter above it. Both are only safe for an inner join; under an outer
        // join they would change which rows are preserved or null-extended.
        if !matches!(join_type, JoinType::Inner)
            && (!cls.left_only.is_empty() || !cls.right_only.is_empty() || cls.residual.is_some())
        {
            return Ok(None);
        }

        tracing::debug!(
            target: "uni_query::cross_join_in_pushdown",
            equi_pairs = cls.equi_pairs.len(),
            left_only = cls.left_only.len(),
            right_only = cls.right_only.len(),
            has_residual = cls.residual.is_some(),
            "try_plan_cross_join_as_hash_join: classified predicate"
        );

        // Push each side-only conjunct into its own input before planning it.
        let left_filters: Vec<Expr> = cls.left_only.clone();
        let right_filters: Vec<Expr> = cls.right_only.clone();
        let left_with_filter = wrap_with_filter(left.clone(), &left_filters);
        let right_with_filter = wrap_with_filter(right.clone(), &right_filters);
        let left_plan = self.plan_internal(&left_with_filter, all_properties)?;
        let right_plan = self.plan_internal(&right_with_filter, all_properties)?;

        // Translation contexts come from the same (filtered) logical inputs that
        // produced the physical plans above.
        let left_schema = left_plan.schema();
        let right_schema = right_plan.schema();
        let left_ctx = self.translation_context_for_plan(&left_with_filter);
        let right_ctx = self.translation_context_for_plan(&right_with_filter);

        // Compile every key pair against its own side's schema. The session read
        // guard lives only inside this block, so it is released before
        // `try_emit_vid_lookup_join` acquires it again.
        let on: Vec<(
            Arc<dyn datafusion::physical_plan::PhysicalExpr>,
            Arc<dyn datafusion::physical_plan::PhysicalExpr>,
        )> = {
            let session = self.session_ctx.read();
            let state = session.state();

            let left_compiler =
                crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
                    &state,
                    Some(&left_ctx),
                )
                .with_subquery_ctx(
                    self.graph_ctx.clone(),
                    self.schema.clone(),
                    self.session_ctx.clone(),
                    self.storage.clone(),
                    self.params.clone(),
                    self.outer_entity_vars.clone(),
                );
            let right_compiler =
                crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
                    &state,
                    Some(&right_ctx),
                )
                .with_subquery_ctx(
                    self.graph_ctx.clone(),
                    self.schema.clone(),
                    self.session_ctx.clone(),
                    self.storage.clone(),
                    self.params.clone(),
                    self.outer_entity_vars.clone(),
                );

            let mut pairs: Vec<(
                Arc<dyn datafusion::physical_plan::PhysicalExpr>,
                Arc<dyn datafusion::physical_plan::PhysicalExpr>,
            )> = Vec::with_capacity(cls.equi_pairs.len());

            for (l_expr, r_expr) in &cls.equi_pairs {
                let l_phys = left_compiler.compile(l_expr, &left_schema)?;
                let r_phys = right_compiler.compile(r_expr, &right_schema)?;
                // Reconcile the two key types; `None` means this pair cannot be
                // used as a hash key, so the whole strategy is abandoned.
                let Some((l_key, r_key)) =
                    unify_join_key_types(l_phys, r_phys, &left_schema, &right_schema, &state)
                else {
                    return Ok(None);
                };
                pairs.push((l_key, r_key));
            }
            pairs
        };

        // Prefer a VID lookup join when the join is inner/left and no residual
        // filter has to be applied afterwards.
        if matches!(join_type, JoinType::Inner | JoinType::Left)
            && cls.residual.is_none()
            && let Some(plan) = self.try_emit_vid_lookup_join(
                &cls.equi_pairs,
                join_type,
                &left_plan,
                &right_plan,
                &left_with_filter,
                &right_with_filter,
            )?
        {
            return Ok(Some(plan));
        }

        // CollectLeft builds the hash table from the left input; NullEqualsNothing
        // makes NULL keys never match each other.
        let join: Arc<dyn ExecutionPlan> = Arc::new(HashJoinExec::try_new(
            left_plan,
            right_plan,
            on,
            None,
            &join_type,
            None,
            PartitionMode::CollectLeft,
            NullEquality::NullEqualsNothing,
            false,
        )?);

        if let Some(residual) = cls.residual {
            // The residual references both sides, so it is compiled with a context
            // built from a synthetic CrossJoin of the two filtered inputs, against
            // the joined schema.
            let join_schema = join.schema();
            let crossjoin_for_ctx = LogicalPlan::CrossJoin {
                left: Box::new(left_with_filter.clone()),
                right: Box::new(right_with_filter.clone()),
            };
            let merged_ctx = self.translation_context_for_plan(&crossjoin_for_ctx);
            let session = self.session_ctx.read();
            let state = session.state();
            let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
                &state,
                Some(&merged_ctx),
            )
            .with_subquery_ctx(
                self.graph_ctx.clone(),
                self.schema.clone(),
                self.session_ctx.clone(),
                self.storage.clone(),
                self.params.clone(),
                self.outer_entity_vars.clone(),
            );
            let physical_residual = compiler.compile(&residual, &join_schema)?;
            return Ok(Some(Arc::new(FilterExec::try_new(
                physical_residual,
                join,
            )?)));
        }

        Ok(Some(join))
    }
4070
4071 fn try_emit_vid_lookup_join(
4087 &self,
4088 equi_pairs: &[(Expr, Expr)],
4089 join_type: JoinType,
4090 left_plan: &Arc<dyn ExecutionPlan>,
4091 right_plan: &Arc<dyn ExecutionPlan>,
4092 left_logical: &LogicalPlan,
4093 right_logical: &LogicalPlan,
4094 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
4095 use crate::query::df_graph::scan::GraphScanExec;
4096 use crate::query::df_graph::vid_lookup_join::{
4097 EquiPair, ProbeSide, VidJoinKind, VidLookupJoinExec,
4098 };
4099 use datafusion::physical_expr::expressions::Column;
4100
4101 if equi_pairs.is_empty() {
4102 return Ok(None);
4103 }
4104
4105 let mut anchor_idx: Option<(usize, ProbeSide)> = None;
4111 for (i, (l_expr, r_expr)) in equi_pairs.iter().enumerate() {
4112 if expr_is_vid_property(l_expr) {
4113 anchor_idx = Some((i, ProbeSide::Left));
4114 break;
4115 }
4116 if expr_is_vid_property(r_expr) {
4117 anchor_idx = Some((i, ProbeSide::Right));
4118 break;
4119 }
4120 }
4121 let Some((anchor_pair_idx, probe_side)) = anchor_idx else {
4122 return Ok(None);
4123 };
4124
4125 let probe_plan = match probe_side {
4126 ProbeSide::Left => left_plan,
4127 ProbeSide::Right => right_plan,
4128 };
4129 let build_plan = match probe_side {
4130 ProbeSide::Left => right_plan,
4131 ProbeSide::Right => left_plan,
4132 };
4133 let build_logical = match probe_side {
4134 ProbeSide::Left => right_logical,
4135 ProbeSide::Right => left_logical,
4136 };
4137
4138 if probe_plan
4149 .as_any()
4150 .downcast_ref::<GraphScanExec>()
4151 .is_none()
4152 {
4153 return Ok(None);
4154 }
4155
4156 let left_schema = left_plan.schema();
4160 let right_schema = right_plan.schema();
4161 let left_ctx = self.translation_context_for_plan(left_logical);
4162 let right_ctx = self.translation_context_for_plan(right_logical);
4163 let _ = build_logical; let session = self.session_ctx.read();
4166 let state = session.state();
4167 let left_compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
4168 &state,
4169 Some(&left_ctx),
4170 )
4171 .with_subquery_ctx(
4172 self.graph_ctx.clone(),
4173 self.schema.clone(),
4174 self.session_ctx.clone(),
4175 self.storage.clone(),
4176 self.params.clone(),
4177 self.outer_entity_vars.clone(),
4178 );
4179 let right_compiler =
4180 crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
4181 &state,
4182 Some(&right_ctx),
4183 )
4184 .with_subquery_ctx(
4185 self.graph_ctx.clone(),
4186 self.schema.clone(),
4187 self.session_ctx.clone(),
4188 self.storage.clone(),
4189 self.params.clone(),
4190 self.outer_entity_vars.clone(),
4191 );
4192
4193 let mut compiled: Vec<EquiPair> = Vec::with_capacity(equi_pairs.len());
4194 for (l_expr, r_expr) in equi_pairs {
4195 let l_phys = left_compiler.compile(l_expr, &left_schema)?;
4196 let r_phys = right_compiler.compile(r_expr, &right_schema)?;
4197 let (Some(l_col), Some(r_col)) = (
4198 l_phys.as_any().downcast_ref::<Column>(),
4199 r_phys.as_any().downcast_ref::<Column>(),
4200 ) else {
4201 return Ok(None);
4203 };
4204 compiled.push(EquiPair {
4205 left_col_idx: l_col.index(),
4206 right_col_idx: r_col.index(),
4207 });
4208 }
4209
4210 let anchor = compiled[anchor_pair_idx];
4212 let anchor_build_idx = match probe_side {
4213 ProbeSide::Left => anchor.right_col_idx,
4214 ProbeSide::Right => anchor.left_col_idx,
4215 };
4216 let build_schema = build_plan.schema();
4217 if !matches!(
4218 build_schema.field(anchor_build_idx).data_type(),
4219 datafusion::arrow::datatypes::DataType::UInt64
4220 ) {
4221 return Ok(None);
4222 }
4223
4224 if anchor_pair_idx != 0 {
4226 compiled.swap(0, anchor_pair_idx);
4227 }
4228
4229 let join_kind = match join_type {
4233 JoinType::Inner => VidJoinKind::Inner,
4234 JoinType::Left => VidJoinKind::Left,
4235 _ => return Ok(None),
4236 };
4237
4238 drop(session);
4239
4240 Ok(Some(Arc::new(VidLookupJoinExec::try_new(
4241 left_plan.clone(),
4242 right_plan.clone(),
4243 probe_side,
4244 compiled,
4245 join_kind,
4246 )?)))
4247 }
4248
4249 fn plan_project_with_aliases(
4251 &self,
4252 input: &LogicalPlan,
4253 projections: &[(Expr, Option<String>)],
4254 all_properties: &HashMap<String, HashSet<String>>,
4255 alias_map: &HashMap<String, Expr>,
4256 ) -> Result<Arc<dyn ExecutionPlan>> {
4257 let input_plan = self.plan_internal_with_aliases(input, all_properties, alias_map)?;
4259 self.plan_project_from_input(input_plan, projections, Some(input))
4260 }
4261
4262 fn plan_project_from_input(
4264 &self,
4265 input_plan: Arc<dyn ExecutionPlan>,
4266 projections: &[(Expr, Option<String>)],
4267 context_plan: Option<&LogicalPlan>,
4268 ) -> Result<Arc<dyn ExecutionPlan>> {
4269 let schema = input_plan.schema();
4270
4271 let session = self.session_ctx.read();
4272 let state = session.state();
4273
4274 let ctx = context_plan.map(|p| self.translation_context_for_plan(p));
4276
4277 let mut exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> = Vec::new();
4278
4279 for (expr, alias) in projections {
4280 if let Expr::Variable(var_name) = expr {
4286 if schema.column_with_name(var_name).is_some() {
4287 let (col_idx, _) = schema.column_with_name(var_name).unwrap();
4288 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4289 datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
4290 );
4291 let name = alias.clone().unwrap_or_else(|| var_name.clone());
4292 exprs.push((col_expr, name));
4293
4294 let vid_col = format!("{}._vid", var_name);
4296 let labels_col = format!("{}._labels", var_name);
4297 if let Some((vi, _)) = schema.column_with_name(&vid_col) {
4298 let ve: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4299 datafusion::physical_expr::expressions::Column::new(&vid_col, vi),
4300 );
4301 exprs.push((ve, vid_col.clone()));
4302 }
4303 if let Some((li, _)) = schema.column_with_name(&labels_col) {
4304 let le: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4305 datafusion::physical_expr::expressions::Column::new(&labels_col, li),
4306 );
4307 exprs.push((le, labels_col.clone()));
4308 }
4309
4310 let prefix = format!("{}.", var_name);
4313 for (idx, field) in schema.fields().iter().enumerate() {
4314 let fname = field.name();
4315 if fname.starts_with(&prefix)
4316 && fname != &vid_col
4317 && fname != &labels_col
4318 && !exprs.iter().any(|(_, n)| n == fname)
4319 {
4320 let prop_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4321 Arc::new(datafusion::physical_expr::expressions::Column::new(
4322 fname, idx,
4323 ));
4324 exprs.push((prop_expr, fname.clone()));
4325 }
4326 }
4327 continue;
4328 }
4329
4330 let prefix = format!("{}.", var_name);
4333 let expanded_fields: Vec<(usize, String)> = schema
4334 .fields()
4335 .iter()
4336 .enumerate()
4337 .filter(|(_, f)| f.name().starts_with(&prefix))
4338 .map(|(i, f)| (i, f.name().clone()))
4339 .collect();
4340
4341 if !expanded_fields.is_empty() {
4342 use datafusion::functions::expr_fn::named_struct;
4343 use datafusion::logical_expr::lit;
4344
4345 let mut struct_args = Vec::new();
4347 for (_, field_name) in &expanded_fields {
4348 let prop_name = &field_name[prefix.len()..];
4349 struct_args.push(lit(prop_name.to_string()));
4350 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
4352 field_name.as_str(),
4353 )));
4354 }
4355
4356 let struct_expr = named_struct(struct_args);
4357 let df_schema =
4358 datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
4359 let session = self.session_ctx.read();
4360 let state_ref = session.state();
4361 let resolved_expr = Self::resolve_udfs(&struct_expr, &state_ref)?;
4362
4363 use datafusion::physical_planner::PhysicalPlanner;
4364 let phys_planner =
4365 datafusion::physical_planner::DefaultPhysicalPlanner::default();
4366 let physical_struct_expr = phys_planner.create_physical_expr(
4367 &resolved_expr,
4368 &df_schema,
4369 &state_ref,
4370 )?;
4371
4372 let name = alias.clone().unwrap_or_else(|| var_name.clone());
4373 exprs.push((physical_struct_expr, name));
4374
4375 let vid_col = format!("{}._vid", var_name);
4377 let labels_col = format!("{}._labels", var_name);
4378 if let Some((vi, _)) = schema.column_with_name(&vid_col) {
4379 let ve: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4380 datafusion::physical_expr::expressions::Column::new(&vid_col, vi),
4381 );
4382 exprs.push((ve, vid_col.clone()));
4383 }
4384 if let Some((li, _)) = schema.column_with_name(&labels_col) {
4385 let le: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4386 datafusion::physical_expr::expressions::Column::new(&labels_col, li),
4387 );
4388 exprs.push((le, labels_col.clone()));
4389 }
4390
4391 for (idx, field) in schema.fields().iter().enumerate() {
4394 let fname = field.name();
4395 if fname.starts_with(&prefix)
4396 && fname != &vid_col
4397 && fname != &labels_col
4398 && !exprs.iter().any(|(_, n)| n == fname)
4399 {
4400 let prop_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4401 Arc::new(datafusion::physical_expr::expressions::Column::new(
4402 fname, idx,
4403 ));
4404 exprs.push((prop_expr, fname.clone()));
4405 }
4406 }
4407 continue;
4408 }
4409 }
4411
4412 if matches!(expr, Expr::Wildcard) {
4414 for (col_idx, field) in schema.fields().iter().enumerate() {
4415 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4416 datafusion::physical_expr::expressions::Column::new(field.name(), col_idx),
4417 );
4418 exprs.push((col_expr, field.name().clone()));
4419 }
4420 continue;
4421 }
4422
4423 let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
4424 &state,
4425 ctx.as_ref(),
4426 )
4427 .with_subquery_ctx(
4428 self.graph_ctx.clone(),
4429 self.schema.clone(),
4430 self.session_ctx.clone(),
4431 self.storage.clone(),
4432 self.params.clone(),
4433 self.outer_entity_vars.clone(),
4434 );
4435 let physical_expr = compiler.compile(expr, &schema)?;
4436
4437 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4438 exprs.push((physical_expr, name));
4439 }
4440
4441 Ok(Arc::new(ProjectionExec::try_new(exprs, input_plan)?))
4442 }
4443
4444 fn plan_locy_project(
4450 &self,
4451 input: &LogicalPlan,
4452 projections: &[(Expr, Option<String>)],
4453 target_types: &[DataType],
4454 all_properties: &HashMap<String, HashSet<String>>,
4455 ) -> Result<Arc<dyn ExecutionPlan>> {
4456 use datafusion::physical_expr::expressions::Column;
4457
4458 let input_plan = self.plan_internal(input, all_properties)?;
4459 let schema = input_plan.schema();
4460
4461 let session = self.session_ctx.read();
4462 let state = session.state();
4463
4464 let ctx = self.translation_context_for_plan(input);
4465
4466 let mut exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> = Vec::new();
4467
4468 for (i, (expr, alias)) in projections.iter().enumerate() {
4469 let target_type = target_types.get(i);
4470
4471 if let Expr::Variable(var_name) = expr {
4473 let vid_col_name = format!("{}._vid", var_name);
4475 let vid_col_match = schema
4476 .fields()
4477 .iter()
4478 .enumerate()
4479 .find(|(_, f)| f.name() == &vid_col_name);
4480
4481 if let Some((vid_idx, _)) = vid_col_match {
4482 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4484 Arc::new(Column::new(&vid_col_name, vid_idx));
4485 let name = alias.clone().unwrap_or_else(|| var_name.clone());
4486 exprs.push((col_expr, name));
4487 continue;
4488 }
4489
4490 if let Some((col_idx, _)) = schema.column_with_name(var_name) {
4492 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
4493 Arc::new(Column::new(var_name, col_idx));
4494 let name = alias.clone().unwrap_or_else(|| var_name.clone());
4495 exprs.push((col_expr, name));
4496 continue;
4497 }
4498 }
4500
4501 let compiler = crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler::new(
4503 &state,
4504 Some(&ctx),
4505 )
4506 .with_subquery_ctx(
4507 self.graph_ctx.clone(),
4508 self.schema.clone(),
4509 self.session_ctx.clone(),
4510 self.storage.clone(),
4511 self.params.clone(),
4512 self.outer_entity_vars.clone(),
4513 );
4514 let physical_expr = compiler.compile(expr, &schema)?;
4515
4516 let physical_expr = if let Some(target_dt) = target_type {
4521 let actual_dt = physical_expr
4522 .data_type(schema.as_ref())
4523 .unwrap_or(DataType::LargeUtf8);
4524 let is_string = |dt: &DataType| matches!(dt, DataType::Utf8 | DataType::LargeUtf8);
4525 let is_numeric = |dt: &DataType| {
4526 matches!(dt, DataType::Int64 | DataType::Float64 | DataType::UInt64)
4527 };
4528 let cross_domain = (is_string(&actual_dt) && is_numeric(target_dt))
4529 || (is_numeric(&actual_dt) && is_string(target_dt));
4530 if actual_dt != *target_dt && !cross_domain {
4531 coerce_physical_expr(physical_expr, &actual_dt, target_dt, schema.as_ref())
4532 } else {
4533 physical_expr
4534 }
4535 } else {
4536 physical_expr
4537 };
4538
4539 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
4540 exprs.push((physical_expr, name));
4541 }
4542
4543 Ok(Arc::new(ProjectionExec::try_new(exprs, input_plan)?))
4544 }
4545
    /// Plans `input` grouped by `group_by` with the given `aggregates`.
    ///
    /// Produces an `AggregateExec` in `AggregateMode::Single`, followed by a
    /// `ProjectionExec`. The projection keeps group-by column names and renames
    /// each aggregate column to `aggregate_column_name` of the corresponding
    /// original (not rewritten) aggregate expression.
    ///
    /// Group keys are added in this order:
    /// 1. each `group_by` expression, named by `to_string_repr()`. Bare variables
    ///    that have no column of their own but do have expanded `var.*` columns
    ///    are skipped here;
    /// 2. for each `group_by` variable that the translation context marks as a
    ///    node or edge, every `var.*` input column, under its own name.
    ///
    /// # Errors
    /// Propagates planning, translation and physical-plan construction errors.
    fn plan_aggregate(
        &self,
        input: &LogicalPlan,
        group_by: &[Expr],
        aggregates: &[Expr],
        all_properties: &HashMap<String, HashSet<String>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input_plan = self.plan_internal(input, all_properties)?;
        let schema = input_plan.schema();

        let session = self.session_ctx.read();
        let state = session.state();

        let ctx = self.translation_context_for_plan(input);

        use crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler;
        let mut group_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
            Vec::new();
        for expr in group_by {
            let name = expr.to_string_repr();

            // A variable with no column of its own but with expanded `var.*`
            // columns is not compiled here. Its expanded columns are added by the
            // second loop below when the variable is a node or edge.
            // NOTE(review): a variable of any other kind that hits this branch is
            // not grouped at all — confirm that cannot happen.
            if let Expr::Variable(var_name) = expr
                && schema.column_with_name(var_name).is_none()
            {
                let prefix = format!("{}.", var_name);
                let has_expanded = schema
                    .fields()
                    .iter()
                    .any(|f| f.name().starts_with(&prefix));
                if has_expanded {
                    continue;
                }
            }

            let physical_expr = if CypherPhysicalExprCompiler::contains_custom_expr(expr) {
                // Expressions needing Cypher-specific compilation go through the
                // custom compiler.
                let compiler = CypherPhysicalExprCompiler::new(&state, Some(&ctx))
                    .with_subquery_ctx(
                        self.graph_ctx.clone(),
                        self.schema.clone(),
                        self.session_ctx.clone(),
                        self.storage.clone(),
                        self.params.clone(),
                        self.outer_entity_vars.clone(),
                    );
                compiler.compile(expr, &schema)?
            } else {
                // Standard path, in order:
                // 1. Cypher -> DataFusion logical expression;
                // 2. resolve UDFs;
                // 3. apply type coercion against the input schema;
                // 4. resolve UDFs again;
                // 5. build the physical expression.
                let df_schema_ref =
                    datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
                let df_expr = cypher_expr_to_df(expr, Some(&ctx))?;
                let df_expr = Self::resolve_udfs(&df_expr, &state)?;
                let df_expr = crate::query::df_expr::apply_type_coercion(&df_expr, &df_schema_ref)?;
                let mut df_expr = Self::resolve_udfs(&df_expr, &state)?;
                if let Ok(expr_type) = df_expr.get_type(&df_schema_ref) {
                    // Temporal struct keys are replaced by their extracted
                    // nanosecond value. NOTE(review): presumably so grouping
                    // compares a scalar rather than the whole struct — confirm.
                    if uni_common::core::schema::is_datetime_struct(&expr_type) {
                        df_expr = crate::query::df_expr::extract_datetime_nanos(df_expr);
                    } else if uni_common::core::schema::is_time_struct(&expr_type) {
                        df_expr = crate::query::df_expr::extract_time_nanos(df_expr);
                    }
                }

                create_physical_expr(&df_expr, &df_schema_ref, state.execution_props())?
            };
            group_exprs.push((physical_expr, name));
        }

        // Node/edge variables also group by every `var.*` column, so those
        // columns survive the aggregation under their original names.
        // NOTE(review): if `group_by` also holds a property expression whose
        // `to_string_repr()` equals one of these column names (e.g. `n.name`), the
        // same key appears twice — confirm duplicate output names are harmless.
        for expr in group_by {
            if let Expr::Variable(var_name) = expr
                && matches!(
                    ctx.variable_kinds.get(var_name),
                    Some(VariableKind::Node) | Some(VariableKind::Edge)
                )
            {
                let prefix = format!("{}.", var_name);
                for (idx, field) in schema.fields().iter().enumerate() {
                    if field.name().starts_with(&prefix) {
                        let prop_col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
                            datafusion::physical_expr::expressions::Column::new(field.name(), idx),
                        );
                        group_exprs.push((prop_col, field.name().clone()));
                    }
                }
            }
        }

        let physical_group_by = PhysicalGroupBy::new_single(group_exprs);

        // Arguments needing the custom compiler are materialised as extra input
        // columns; the input plan and schema are replaced accordingly.
        let (input_plan, schema, rewritten_aggregates) =
            self.precompute_custom_aggregate_args(input_plan, &schema, aggregates, &state, &ctx)?;

        let (aggr_exprs, filter_exprs): (Vec<_>, Vec<_>) = self
            .translate_aggregates(&rewritten_aggregates, &schema, &state, &ctx)?
            .into_iter()
            .unzip();
        let num_aggregates = aggr_exprs.len();

        let agg_exec = Arc::new(AggregateExec::try_new(
            AggregateMode::Single,
            physical_group_by,
            aggr_exprs,
            filter_exprs,
            input_plan,
            schema,
        )?);

        // AggregateExec emits the group-by columns first, then one column per
        // aggregate.
        let agg_schema = agg_exec.schema();
        let num_group_by = agg_schema.fields().len() - num_aggregates;
        let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
            Vec::new();

        for (i, field) in agg_schema.fields().iter().enumerate() {
            let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
                datafusion::physical_expr::expressions::Column::new(field.name(), i),
            );
            // Aggregate columns are named from the ORIGINAL aggregate expressions,
            // not from the rewritten ones.
            let name = if i >= num_group_by {
                aggregate_column_name(&aggregates[i - num_group_by])
            } else {
                field.name().clone()
            };
            proj_exprs.push((col_expr, name));
        }

        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, agg_exec)?))
    }
4698
4699 fn wrap_temporal_sort_key(
4704 arg: datafusion::logical_expr::Expr,
4705 schema: &SchemaRef,
4706 ) -> Result<datafusion::logical_expr::Expr> {
4707 use datafusion::logical_expr::ScalarUDF;
4708 if let Ok(arg_type) = arg.get_type(&datafusion::common::DFSchema::try_from(
4709 schema.as_ref().clone(),
4710 )?) {
4711 if uni_common::core::schema::is_datetime_struct(&arg_type) {
4712 return Ok(datafusion::logical_expr::Expr::ScalarFunction(
4713 datafusion::logical_expr::expr::ScalarFunction::new_udf(
4714 Arc::new(ScalarUDF::from(
4715 datafusion::functions::core::getfield::GetFieldFunc::new(),
4716 )),
4717 vec![arg, datafusion::logical_expr::lit("nanos_since_epoch")],
4718 ),
4719 ));
4720 } else if uni_common::core::schema::is_time_struct(&arg_type) {
4721 return Ok(datafusion::logical_expr::Expr::ScalarFunction(
4722 datafusion::logical_expr::expr::ScalarFunction::new_udf(
4723 Arc::new(ScalarUDF::from(
4724 datafusion::functions::core::getfield::GetFieldFunc::new(),
4725 )),
4726 vec![arg, datafusion::logical_expr::lit("nanos_since_midnight")],
4727 ),
4728 ));
4729 }
4730 }
4731 Ok(arg)
4732 }
4733
4734 fn translate_aggregates(
4736 &self,
4737 aggregates: &[Expr],
4738 schema: &SchemaRef,
4739 state: &SessionState,
4740 ctx: &TranslationContext,
4741 ) -> Result<Vec<PhysicalAggregate>> {
4742 use datafusion::functions_aggregate::expr_fn::{avg, count, max, min, sum};
4743
4744 let mut result: Vec<PhysicalAggregate> = Vec::new();
4745
4746 for agg_expr in aggregates {
4747 let Expr::FunctionCall {
4748 name,
4749 args,
4750 distinct,
4751 ..
4752 } = agg_expr
4753 else {
4754 return Err(anyhow!("Expected aggregate function, got: {:?}", agg_expr));
4755 };
4756
4757 let name_lower = name.to_lowercase();
4758
4759 let get_arg = || -> Result<DfExpr> {
4761 if args.is_empty() {
4762 return Err(anyhow!("{}() requires an argument", name_lower));
4763 }
4764 cypher_expr_to_df(&args[0], Some(ctx))
4765 };
4766
4767 let df_agg = match name_lower.as_str() {
4768 "count" if args.is_empty() => count(datafusion::logical_expr::lit(1)),
4769 "count" => {
4770 if matches!(args.first(), Some(uni_cypher::ast::Expr::Wildcard)) {
4776 count(datafusion::logical_expr::lit(1))
4777 } else if matches!(args.first(), Some(uni_cypher::ast::Expr::Variable(_))) {
4778 if *distinct {
4779 count(get_arg()?)
4780 } else {
4781 count(datafusion::logical_expr::lit(1))
4782 }
4783 } else {
4784 count(get_arg()?)
4785 }
4786 }
4787 "sum" => {
4788 let arg = get_arg()?;
4789 if self.is_large_binary_col(&arg, schema) {
4790 let udaf = Arc::new(crate::query::df_udfs::create_cypher_sum_udaf());
4791 udaf.call(vec![arg])
4792 } else {
4793 use datafusion::logical_expr::Cast;
4796 let is_float = if let DfExpr::Column(col) = &arg
4797 && let Ok(field) = schema.field_with_name(&col.name)
4798 {
4799 matches!(
4800 field.data_type(),
4801 datafusion::arrow::datatypes::DataType::Float32
4802 | datafusion::arrow::datatypes::DataType::Float64
4803 )
4804 } else {
4805 false
4806 };
4807 if is_float {
4808 sum(DfExpr::Cast(Cast::new(
4809 Box::new(arg),
4810 datafusion::arrow::datatypes::DataType::Float64,
4811 )))
4812 } else {
4813 sum(DfExpr::Cast(Cast::new(
4814 Box::new(arg),
4815 datafusion::arrow::datatypes::DataType::Int64,
4816 )))
4817 }
4818 }
4819 }
4820 "avg" => {
4821 let arg = get_arg()?;
4822 if self.is_large_binary_col(&arg, schema) {
4823 let coerced = crate::query::df_udfs::cypher_to_float64_expr(arg);
4824 avg(coerced)
4825 } else {
4826 use datafusion::logical_expr::Cast;
4827 avg(DfExpr::Cast(Cast::new(
4828 Box::new(arg),
4829 datafusion::arrow::datatypes::DataType::Float64,
4830 )))
4831 }
4832 }
4833 "min" => {
4834 let arg = Self::wrap_temporal_sort_key(get_arg()?, schema)?;
4836
4837 if self.is_large_binary_col(&arg, schema) {
4838 let udaf = Arc::new(crate::query::df_udfs::create_cypher_min_udaf());
4839 udaf.call(vec![arg])
4840 } else {
4841 min(arg)
4842 }
4843 }
4844 "max" => {
4845 let arg = Self::wrap_temporal_sort_key(get_arg()?, schema)?;
4847
4848 if self.is_large_binary_col(&arg, schema) {
4849 let udaf = Arc::new(crate::query::df_udfs::create_cypher_max_udaf());
4850 udaf.call(vec![arg])
4851 } else {
4852 max(arg)
4853 }
4854 }
4855 "percentiledisc" => {
4856 if args.len() != 2 {
4857 return Err(anyhow!("percentileDisc() requires exactly 2 arguments"));
4858 }
4859 let expr_arg = cypher_expr_to_df(&args[0], Some(ctx))?;
4860 let pct_arg = cypher_expr_to_df(&args[1], Some(ctx))?;
4861 let coerced = crate::query::df_udfs::cypher_to_float64_expr(expr_arg);
4862 let udaf =
4863 Arc::new(crate::query::df_udfs::create_cypher_percentile_disc_udaf());
4864 udaf.call(vec![coerced, pct_arg])
4865 }
4866 "percentilecont" => {
4867 if args.len() != 2 {
4868 return Err(anyhow!("percentileCont() requires exactly 2 arguments"));
4869 }
4870 let expr_arg = cypher_expr_to_df(&args[0], Some(ctx))?;
4871 let pct_arg = cypher_expr_to_df(&args[1], Some(ctx))?;
4872 let coerced = crate::query::df_udfs::cypher_to_float64_expr(expr_arg);
4873 let udaf =
4874 Arc::new(crate::query::df_udfs::create_cypher_percentile_cont_udaf());
4875 udaf.call(vec![coerced, pct_arg])
4876 }
4877 "collect" => {
4878 let arg = get_arg()?;
4881 crate::query::df_udfs::create_cypher_collect_expr(arg, *distinct)
4882 }
4883 "btic_min" => {
4884 let arg = get_arg()?;
4885 let udaf = Arc::new(crate::query::df_udfs::create_btic_min_udaf());
4886 udaf.call(vec![arg])
4887 }
4888 "btic_max" => {
4889 let arg = get_arg()?;
4890 let udaf = Arc::new(crate::query::df_udfs::create_btic_max_udaf());
4891 udaf.call(vec![arg])
4892 }
4893 "btic_span_agg" => {
4894 let arg = get_arg()?;
4895 let udaf = Arc::new(crate::query::df_udfs::create_btic_span_agg_udaf());
4896 udaf.call(vec![arg])
4897 }
4898 "btic_count_at" => {
4899 if args.len() != 2 {
4900 return Err(anyhow!("btic_count_at() requires exactly 2 arguments"));
4901 }
4902 let btic_arg = cypher_expr_to_df(&args[0], Some(ctx))?;
4903 let point_arg = cypher_expr_to_df(&args[1], Some(ctx))?;
4904 let udaf = Arc::new(crate::query::df_udfs::create_btic_count_at_udaf());
4905 udaf.call(vec![btic_arg, point_arg])
4906 }
4907 _ => {
4908 if let Some((ns, local)) = name_lower.split_once('.')
4915 && let Some(entry) = self
4916 .plugin_registry
4917 .aggregate(&uni_plugin::QName::new(ns, local))
4918 {
4919 let arg_exprs: Vec<DfExpr> = args
4920 .iter()
4921 .map(|a| cypher_expr_to_df(a, Some(ctx)))
4922 .collect::<Result<Vec<_>>>()?;
4923 let udaf = Arc::new(datafusion::logical_expr::AggregateUDF::from(
4924 crate::query::df_udaf_plugin::PluginAggregateUdaf::new(
4925 uni_plugin::QName::new(ns, local),
4926 Arc::clone(&self.plugin_registry),
4927 entry.signature.clone(),
4928 ),
4929 ));
4930 udaf.call(arg_exprs)
4931 } else {
4932 return Err(anyhow!("Unsupported aggregate function: {}", name));
4933 }
4934 }
4935 };
4936
4937 let df_agg = if *distinct
4939 && !matches!(
4940 name_lower.as_str(),
4941 "collect" | "percentiledisc" | "percentilecont"
4942 ) {
4943 use datafusion::prelude::ExprFunctionExt;
4944 df_agg.distinct().build().map_err(|e| anyhow!("{}", e))?
4945 } else {
4946 df_agg
4947 };
4948
4949 let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
4951 let df_agg = Self::resolve_udfs(&df_agg, state)?;
4952 let df_agg = crate::query::df_expr::apply_type_coercion(&df_agg, &df_schema)?;
4953 let df_agg = Self::resolve_udfs(&df_agg, state)?;
4954
4955 let agg_and_filter = self.create_physical_aggregate(&df_agg, schema, state)?;
4957 result.push(agg_and_filter);
4958 }
4959
4960 Ok(result)
4961 }
4962
4963 fn precompute_custom_aggregate_args(
4969 &self,
4970 input_plan: Arc<dyn ExecutionPlan>,
4971 schema: &SchemaRef,
4972 aggregates: &[Expr],
4973 state: &SessionState,
4974 ctx: &TranslationContext,
4975 ) -> Result<(Arc<dyn ExecutionPlan>, SchemaRef, Vec<Expr>)> {
4976 use crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler;
4977
4978 let mut needs_projection = false;
4979 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
4980 Vec::new();
4981 let mut rewritten_aggregates = Vec::new();
4982 let mut col_counter = 0;
4983
4984 for (i, field) in schema.fields().iter().enumerate() {
4986 let col_expr: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
4987 datafusion::physical_expr::expressions::Column::new(field.name(), i),
4988 );
4989 proj_exprs.push((col_expr, field.name().clone()));
4990 }
4991
4992 for agg_expr in aggregates {
4994 let Expr::FunctionCall {
4995 name,
4996 args,
4997 distinct,
4998 window_spec,
4999 } = agg_expr
5000 else {
5001 rewritten_aggregates.push(agg_expr.clone());
5002 continue;
5003 };
5004
5005 let mut rewritten_args = Vec::new();
5006 let mut agg_needs_rewrite = false;
5007
5008 for arg in args {
5009 if CypherPhysicalExprCompiler::contains_custom_expr(arg) {
5010 let compiler = CypherPhysicalExprCompiler::new(state, Some(ctx))
5012 .with_subquery_ctx(
5013 self.graph_ctx.clone(),
5014 self.schema.clone(),
5015 self.session_ctx.clone(),
5016 self.storage.clone(),
5017 self.params.clone(),
5018 self.outer_entity_vars.clone(),
5019 );
5020 let physical_expr = compiler.compile(arg, schema)?;
5021
5022 let col_name = format!("__pc_{}", col_counter);
5024 col_counter += 1;
5025 proj_exprs.push((physical_expr, col_name.clone()));
5026
5027 rewritten_args.push(Expr::Variable(col_name));
5029 agg_needs_rewrite = true;
5030 needs_projection = true;
5031 } else {
5032 rewritten_args.push(arg.clone());
5033 }
5034 }
5035
5036 if agg_needs_rewrite {
5037 rewritten_aggregates.push(Expr::FunctionCall {
5038 name: name.clone(),
5039 args: rewritten_args,
5040 distinct: *distinct,
5041 window_spec: window_spec.clone(),
5042 });
5043 } else {
5044 rewritten_aggregates.push(agg_expr.clone());
5045 }
5046 }
5047
5048 if needs_projection {
5049 let projection_exec = Arc::new(
5050 datafusion::physical_plan::projection::ProjectionExec::try_new(
5051 proj_exprs, input_plan,
5052 )?,
5053 );
5054 let new_schema = projection_exec.schema();
5055 Ok((projection_exec, new_schema, rewritten_aggregates))
5056 } else {
5057 Ok((input_plan, schema.clone(), aggregates.to_vec()))
5058 }
5059 }
5060
5061 fn plan_sort(
5068 &self,
5069 input: &LogicalPlan,
5070 order_by: &[SortItem],
5071 all_properties: &HashMap<String, HashSet<String>>,
5072 alias_map: &HashMap<String, Expr>,
5073 ) -> Result<Arc<dyn ExecutionPlan>> {
5074 let input_plan = self.plan_internal(input, all_properties)?;
5075 let schema = input_plan.schema();
5076
5077 let session = self.session_ctx.read();
5078
5079 let ctx = self.translation_context_for_plan(input);
5081
5082 let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
5084
5085 use crate::query::df_graph::expr_compiler::CypherPhysicalExprCompiler;
5089
5090 let mut df_sort_exprs = Vec::new();
5091 let mut custom_physical_overrides: Vec<(
5092 usize,
5093 Arc<dyn datafusion::physical_expr::PhysicalExpr>,
5094 )> = Vec::new();
5095 for item in order_by {
5096 let mut sort_expr = item.expr.clone();
5097
5098 if let Expr::Variable(ref name) = sort_expr {
5101 let col_name = name.as_str();
5103 let exists_in_schema = schema.fields().iter().any(|f| f.name() == col_name);
5104
5105 if !exists_in_schema && let Some(aliased_expr) = alias_map.get(col_name) {
5106 sort_expr = aliased_expr.clone();
5107 }
5108 }
5109
5110 let asc = item.ascending;
5111 let nulls_first = !asc; if CypherPhysicalExprCompiler::contains_custom_expr(&sort_expr) {
5117 let sort_state = session.state();
5118 let compiler = CypherPhysicalExprCompiler::new(&sort_state, Some(&ctx))
5119 .with_subquery_ctx(
5120 self.graph_ctx.clone(),
5121 self.schema.clone(),
5122 self.session_ctx.clone(),
5123 self.storage.clone(),
5124 self.params.clone(),
5125 self.outer_entity_vars.clone(),
5126 );
5127 let inner_physical = compiler.compile(&sort_expr, &schema)?;
5128
5129 let first_col = schema
5132 .fields()
5133 .first()
5134 .map(|f| f.name().clone())
5135 .unwrap_or_else(|| "_dummy_".to_string());
5136 let dummy_expr = DfExpr::Column(datafusion::common::Column::from_name(&first_col));
5137 let sort_key_udf = crate::query::df_udfs::create_cypher_sort_key_udf();
5138 let sort_key_expr = sort_key_udf.call(vec![dummy_expr]);
5139 custom_physical_overrides.push((df_sort_exprs.len(), inner_physical));
5140 df_sort_exprs.push(DfSortExpr::new(sort_key_expr, asc, nulls_first));
5141 continue;
5142 }
5143
5144 let df_expr = cypher_expr_to_df(&sort_expr, Some(&ctx))?;
5145 let df_expr = Self::resolve_udfs(&df_expr, &session.state())?;
5146 let df_expr = crate::query::df_expr::apply_type_coercion(&df_expr, &df_schema)?;
5147 let df_expr = Self::resolve_udfs(&df_expr, &session.state())?;
5150
5151 let sort_key_udf = crate::query::df_udfs::create_cypher_sort_key_udf();
5156 let sort_key_expr = sort_key_udf.call(vec![df_expr]);
5157 df_sort_exprs.push(DfSortExpr::new(sort_key_expr, asc, nulls_first));
5158 }
5159
5160 let mut physical_sort_exprs = create_physical_sort_exprs(
5161 &df_sort_exprs,
5162 &df_schema,
5163 session.state().execution_props(),
5164 )?;
5165
5166 for (idx, custom_inner) in custom_physical_overrides {
5170 if idx < physical_sort_exprs.len() {
5171 let phys = &physical_sort_exprs[idx];
5172 let sort_key_udf = Arc::new(crate::query::df_udfs::create_cypher_sort_key_udf());
5176 let config_options = Arc::new(datafusion::config::ConfigOptions::default());
5177 let udf_name = sort_key_udf.name().to_string();
5178 let new_sort_key = datafusion::physical_expr::ScalarFunctionExpr::new(
5179 &udf_name,
5180 sort_key_udf,
5181 vec![custom_inner],
5182 Arc::new(arrow_schema::Field::new(
5183 "_cypher_sort_key",
5184 DataType::LargeBinary,
5185 true,
5186 )),
5187 config_options,
5188 );
5189 physical_sort_exprs[idx] = datafusion::physical_expr::PhysicalSortExpr {
5190 expr: Arc::new(new_sort_key),
5191 options: phys.options,
5192 };
5193 }
5194 }
5195
5196 let lex_ordering = datafusion::physical_expr::LexOrdering::new(physical_sort_exprs)
5199 .ok_or_else(|| anyhow!("ORDER BY must have at least one sort expression"))?;
5200
5201 Ok(Arc::new(SortExec::new(lex_ordering, input_plan)))
5202 }
5203
5204 fn plan_limit(
5206 &self,
5207 input: &LogicalPlan,
5208 skip: Option<usize>,
5209 fetch: Option<usize>,
5210 all_properties: &HashMap<String, HashSet<String>>,
5211 ) -> Result<Arc<dyn ExecutionPlan>> {
5212 let input_plan = self.plan_internal(input, all_properties)?;
5213
5214 if let Some(offset) = skip.filter(|&s| s > 0) {
5216 use datafusion::physical_plan::limit::GlobalLimitExec;
5217 return Ok(Arc::new(GlobalLimitExec::new(input_plan, offset, fetch)));
5218 }
5219
5220 if let Some(limit) = fetch {
5221 Ok(Arc::new(LocalLimitExec::new(input_plan, limit)))
5222 } else {
5223 Ok(input_plan)
5225 }
5226 }
5227
5228 fn plan_union(
5230 &self,
5231 left: &LogicalPlan,
5232 right: &LogicalPlan,
5233 all: bool,
5234 all_properties: &HashMap<String, HashSet<String>>,
5235 ) -> Result<Arc<dyn ExecutionPlan>> {
5236 let left_plan = self.plan_internal(left, all_properties)?;
5237 let right_plan = self.plan_internal(right, all_properties)?;
5238
5239 let left_schema = left_plan.schema();
5256 let right_schema = right_plan.schema();
5257 if left_schema.fields().len() != right_schema.fields().len()
5258 || left_schema
5259 .fields()
5260 .iter()
5261 .zip(right_schema.fields().iter())
5262 .any(|(l, r)| l.data_type() != r.data_type())
5263 {
5264 let fmt = |s: &Schema| {
5265 s.fields()
5266 .iter()
5267 .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
5268 .collect::<Vec<_>>()
5269 .join(", ")
5270 };
5271 return Err(anyhow!(
5272 "Plan: cannot UNION branches with mismatched schemas — \
5273 left=[{}], right=[{}]. This is a planner bug; please file \
5274 an issue.",
5275 fmt(left_schema.as_ref()),
5276 fmt(right_schema.as_ref()),
5277 ));
5278 }
5279
5280 let union_plan = UnionExec::try_new(vec![left_plan, right_plan])?;
5281
5282 if !all {
5284 use datafusion::physical_plan::aggregates::{
5285 AggregateExec, AggregateMode, PhysicalGroupBy,
5286 };
5287 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
5288
5289 let coalesced = Arc::new(CoalescePartitionsExec::new(union_plan));
5291
5292 let schema = coalesced.schema();
5294 let group_by_exprs: Vec<_> = (0..schema.fields().len())
5295 .map(|i| {
5296 (
5297 Arc::new(datafusion::physical_plan::expressions::Column::new(
5298 schema.field(i).name(),
5299 i,
5300 ))
5301 as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
5302 schema.field(i).name().clone(),
5303 )
5304 })
5305 .collect();
5306
5307 let group_by = PhysicalGroupBy::new_single(group_by_exprs);
5308
5309 Ok(Arc::new(AggregateExec::try_new(
5310 AggregateMode::Single,
5311 group_by,
5312 vec![], vec![], coalesced,
5315 schema,
5316 )?))
5317 } else {
5318 Ok(union_plan)
5320 }
5321 }
5322
5323 fn plan_window_functions(
5329 &self,
5330 input: Arc<dyn ExecutionPlan>,
5331 window_exprs: &[Expr],
5332 context_plan: Option<&LogicalPlan>,
5333 ) -> Result<Arc<dyn ExecutionPlan>> {
5334 use datafusion::functions_aggregate::average::avg_udaf;
5335 use datafusion::functions_aggregate::count::count_udaf;
5336 use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
5337 use datafusion::functions_aggregate::sum::sum_udaf;
5338 use datafusion::functions_window::lead_lag::{lag_udwf, lead_udwf};
5339 use datafusion::functions_window::nth_value::{
5340 first_value_udwf, last_value_udwf, nth_value_udwf,
5341 };
5342 use datafusion::functions_window::ntile::ntile_udwf;
5343 use datafusion::functions_window::rank::{dense_rank_udwf, rank_udwf};
5344 use datafusion::functions_window::row_number::row_number_udwf;
5345 use datafusion::logical_expr::{WindowFrame, WindowFunctionDefinition};
5346 use datafusion::physical_expr::LexOrdering;
5347 use datafusion::physical_plan::sorts::sort::SortExec;
5348 use datafusion::physical_plan::windows::{WindowAggExec, create_window_expr};
5349
5350 let input_schema = input.schema();
5351 let df_schema = datafusion::common::DFSchema::try_from(input_schema.as_ref().clone())?;
5352
5353 let session = self.session_ctx.read();
5354 let state = session.state();
5355
5356 let tx_ctx = context_plan.map(|p| self.translation_context_for_plan(p));
5358 let mut window_expr_list = Vec::new();
5359
5360 for expr in window_exprs {
5361 let Expr::FunctionCall {
5362 name,
5363 args,
5364 distinct,
5365 window_spec: Some(window_spec),
5366 } = expr
5367 else {
5368 return Err(anyhow!("Expected window function call with OVER clause"));
5369 };
5370
5371 let name_lower = name.to_lowercase();
5372
5373 let (window_fn_def, is_aggregate) = match name_lower.as_str() {
5375 "count" => (WindowFunctionDefinition::AggregateUDF(count_udaf()), true),
5377 "sum" => (WindowFunctionDefinition::AggregateUDF(sum_udaf()), true),
5378 "avg" => (WindowFunctionDefinition::AggregateUDF(avg_udaf()), true),
5379 "min" => (WindowFunctionDefinition::AggregateUDF(min_udaf()), true),
5380 "max" => (WindowFunctionDefinition::AggregateUDF(max_udaf()), true),
5381 "row_number" => (
5383 WindowFunctionDefinition::WindowUDF(row_number_udwf()),
5384 false,
5385 ),
5386 "rank" => (WindowFunctionDefinition::WindowUDF(rank_udwf()), false),
5387 "dense_rank" => (
5388 WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
5389 false,
5390 ),
5391 "lag" => (WindowFunctionDefinition::WindowUDF(lag_udwf()), false),
5392 "lead" => (WindowFunctionDefinition::WindowUDF(lead_udwf()), false),
5393 "ntile" => {
5394 if let Some(Expr::Literal(CypherLiteral::Integer(n))) = args.first()
5396 && *n <= 0
5397 {
5398 return Err(anyhow!("NTILE bucket count must be positive, got: {}", n));
5399 }
5400 (WindowFunctionDefinition::WindowUDF(ntile_udwf()), false)
5401 }
5402 "first_value" => (
5403 WindowFunctionDefinition::WindowUDF(first_value_udwf()),
5404 false,
5405 ),
5406 "last_value" => (
5407 WindowFunctionDefinition::WindowUDF(last_value_udwf()),
5408 false,
5409 ),
5410 "nth_value" => (WindowFunctionDefinition::WindowUDF(nth_value_udwf()), false),
5411 other => return Err(anyhow!("Unsupported window function: {}", other)),
5412 };
5413
5414 let physical_args: Vec<Arc<dyn datafusion::physical_expr::PhysicalExpr>> =
5416 if args.is_empty() || matches!(args.as_slice(), [Expr::Wildcard]) {
5417 if is_aggregate {
5419 vec![create_physical_expr(
5420 &datafusion::logical_expr::lit(1),
5421 &df_schema,
5422 state.execution_props(),
5423 )?]
5424 } else {
5425 vec![]
5427 }
5428 } else {
5429 args.iter()
5430 .map(|arg| {
5431 let mut df_expr = cypher_expr_to_df(arg, tx_ctx.as_ref())?;
5432
5433 if is_aggregate {
5436 let cast_type = match name_lower.as_str() {
5437 "sum" => Some(datafusion::arrow::datatypes::DataType::Int64),
5438 "avg" => Some(datafusion::arrow::datatypes::DataType::Float64),
5439 _ => None,
5440 };
5441 if let Some(target_type) = cast_type {
5442 df_expr = DfExpr::Cast(datafusion::logical_expr::Cast::new(
5443 Box::new(df_expr),
5444 target_type,
5445 ));
5446 }
5447 }
5448
5449 create_physical_expr(&df_expr, &df_schema, state.execution_props())
5450 .map_err(|e| anyhow!("Failed to create physical expr: {}", e))
5451 })
5452 .collect::<Result<Vec<_>>>()?
5453 };
5454
5455 let partition_by_physical: Vec<Arc<dyn datafusion::physical_expr::PhysicalExpr>> =
5457 window_spec
5458 .partition_by
5459 .iter()
5460 .map(|e| {
5461 let df_expr = cypher_expr_to_df(e, tx_ctx.as_ref())?;
5462 create_physical_expr(&df_expr, &df_schema, state.execution_props())
5463 .map_err(|e| anyhow!("Failed to create physical expr: {}", e))
5464 })
5465 .collect::<Result<Vec<_>>>()?;
5466
5467 let mut order_by_physical: Vec<datafusion::physical_expr::PhysicalSortExpr> =
5469 window_spec
5470 .order_by
5471 .iter()
5472 .map(|sort_item| {
5473 let df_expr = cypher_expr_to_df(&sort_item.expr, tx_ctx.as_ref())?;
5474 let physical_expr =
5475 create_physical_expr(&df_expr, &df_schema, state.execution_props())
5476 .map_err(|e| anyhow!("Failed to create physical expr: {}", e))?;
5477 Ok(datafusion::physical_expr::PhysicalSortExpr {
5478 expr: physical_expr,
5479 options: datafusion::arrow::compute::SortOptions {
5480 descending: !sort_item.ascending,
5481 nulls_first: !sort_item.ascending, },
5483 })
5484 })
5485 .collect::<Result<Vec<_>>>()?;
5486
5487 if order_by_physical.is_empty() && !partition_by_physical.is_empty() {
5490 for partition_expr in &partition_by_physical {
5491 order_by_physical.push(datafusion::physical_expr::PhysicalSortExpr {
5492 expr: Arc::clone(partition_expr),
5493 options: datafusion::arrow::compute::SortOptions {
5494 descending: false,
5495 nulls_first: false,
5496 },
5497 });
5498 }
5499 }
5500
5501 let window_frame = if is_aggregate {
5506 if window_spec.order_by.is_empty() {
5507 use datafusion::logical_expr::{WindowFrameBound, WindowFrameUnits};
5509 Arc::new(WindowFrame::new_bounds(
5510 WindowFrameUnits::Rows,
5511 WindowFrameBound::Preceding(datafusion::common::ScalarValue::UInt64(None)),
5512 WindowFrameBound::Following(datafusion::common::ScalarValue::UInt64(None)),
5513 ))
5514 } else {
5515 Arc::new(WindowFrame::new(Some(false)))
5517 }
5518 } else {
5519 use datafusion::logical_expr::{WindowFrameBound, WindowFrameUnits};
5521 Arc::new(WindowFrame::new_bounds(
5522 WindowFrameUnits::Rows,
5523 WindowFrameBound::Preceding(datafusion::common::ScalarValue::UInt64(None)),
5524 WindowFrameBound::Following(datafusion::common::ScalarValue::UInt64(None)),
5525 ))
5526 };
5527
5528 let alias = expr.to_string_repr();
5530
5531 let window_expr = create_window_expr(
5533 &window_fn_def,
5534 alias,
5535 &physical_args,
5536 &partition_by_physical,
5537 &order_by_physical,
5538 window_frame,
5539 input_schema.clone(),
5540 false, *distinct,
5542 None, )?;
5544
5545 window_expr_list.push(window_expr);
5546 }
5547
5548 let mut sort_exprs = Vec::new();
5551
5552 for expr in window_exprs {
5554 if let Expr::FunctionCall {
5555 window_spec: Some(window_spec),
5556 ..
5557 } = expr
5558 {
5559 for partition_expr in &window_spec.partition_by {
5560 let df_expr = cypher_expr_to_df(partition_expr, tx_ctx.as_ref())?;
5561 let physical_expr =
5562 create_physical_expr(&df_expr, &df_schema, state.execution_props())?;
5563
5564 if !sort_exprs
5567 .iter()
5568 .any(|s: &datafusion::physical_expr::PhysicalSortExpr| {
5569 s.expr.to_string() == physical_expr.to_string()
5570 })
5571 {
5572 sort_exprs.push(datafusion::physical_expr::PhysicalSortExpr {
5573 expr: physical_expr,
5574 options: datafusion::arrow::compute::SortOptions {
5575 descending: false,
5576 nulls_first: false,
5577 },
5578 });
5579 }
5580 }
5581
5582 for sort_item in &window_spec.order_by {
5584 let df_expr = cypher_expr_to_df(&sort_item.expr, tx_ctx.as_ref())?;
5585 let physical_expr =
5586 create_physical_expr(&df_expr, &df_schema, state.execution_props())?;
5587
5588 sort_exprs.push(datafusion::physical_expr::PhysicalSortExpr {
5589 expr: physical_expr,
5590 options: datafusion::arrow::compute::SortOptions {
5591 descending: !sort_item.ascending,
5592 nulls_first: !sort_item.ascending,
5593 },
5594 });
5595 }
5596 }
5597 }
5598
5599 let sorted_input = if !sort_exprs.is_empty() {
5601 let lex_ordering = LexOrdering::new(sort_exprs)
5602 .ok_or_else(|| anyhow!("Failed to create LexOrdering for window function"))?;
5603 Arc::new(SortExec::new(lex_ordering, input)) as Arc<dyn ExecutionPlan>
5604 } else {
5605 input
5606 };
5607
5608 let window_agg_exec = WindowAggExec::try_new(
5610 window_expr_list,
5611 sorted_input,
5612 false, )?;
5614
5615 Ok(Arc::new(window_agg_exec))
5616 }
5617
5618 fn plan_empty(&self) -> Result<Arc<dyn ExecutionPlan>> {
5623 let schema = Arc::new(Schema::empty());
5624 Ok(Arc::new(PlaceholderRowExec::new(schema)))
5627 }
5628
5629 fn plan_bind_zero_length_path(
5632 &self,
5633 input: &LogicalPlan,
5634 node_variable: &str,
5635 path_variable: &str,
5636 all_properties: &HashMap<String, HashSet<String>>,
5637 ) -> Result<Arc<dyn ExecutionPlan>> {
5638 let input_plan = self.plan_internal(input, all_properties)?;
5639 Ok(Arc::new(BindZeroLengthPathExec::new(
5640 input_plan,
5641 node_variable.to_string(),
5642 path_variable.to_string(),
5643 self.graph_ctx.clone(),
5644 )))
5645 }
5646
5647 fn plan_bind_path(
5650 &self,
5651 input: &LogicalPlan,
5652 node_variables: &[String],
5653 edge_variables: &[String],
5654 path_variable: &str,
5655 all_properties: &HashMap<String, HashSet<String>>,
5656 ) -> Result<Arc<dyn ExecutionPlan>> {
5657 let input_plan = self.plan_internal(input, all_properties)?;
5658 Ok(Arc::new(BindFixedPathExec::new(
5659 input_plan,
5660 node_variables.to_vec(),
5661 edge_variables.to_vec(),
5662 path_variable.to_string(),
5663 self.graph_ctx.clone(),
5664 )))
5665 }
5666
5667 fn extract_edge_property_conditions(expr: &Expr) -> Vec<(String, uni_common::Value)> {
5676 match expr {
5677 Expr::BinaryOp {
5678 left,
5679 op: uni_cypher::ast::BinaryOp::Eq,
5680 right,
5681 } => {
5682 if let Expr::Property(inner, prop_name) = left.as_ref()
5684 && matches!(inner.as_ref(), Expr::Variable(_))
5685 && let Expr::Literal(lit) = right.as_ref()
5686 {
5687 return vec![(prop_name.clone(), lit.to_value())];
5688 }
5689 if let Expr::Literal(lit) = left.as_ref()
5691 && let Expr::Property(inner, prop_name) = right.as_ref()
5692 && matches!(inner.as_ref(), Expr::Variable(_))
5693 {
5694 return vec![(prop_name.clone(), lit.to_value())];
5695 }
5696 vec![]
5697 }
5698 Expr::BinaryOp {
5699 left,
5700 op: uni_cypher::ast::BinaryOp::And,
5701 right,
5702 } => {
5703 let mut result = Self::extract_edge_property_conditions(left);
5704 result.extend(Self::extract_edge_property_conditions(right));
5705 result
5706 }
5707 _ => vec![],
5708 }
5709 }
5710
5711 fn create_physical_filter_expr(
5716 &self,
5717 expr: &DfExpr,
5718 schema: &SchemaRef,
5719 session: &SessionContext,
5720 ) -> Result<Arc<dyn datafusion::physical_expr::PhysicalExpr>> {
5721 let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
5722 let state = session.state();
5723
5724 let resolved_expr = Self::resolve_udfs(expr, &state)?;
5726
5727 let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;
5729
5730 let coerced_expr = Self::resolve_udfs(&coerced_expr, &state)?;
5732
5733 use datafusion::physical_planner::PhysicalPlanner;
5735 let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
5736 let physical = planner.create_physical_expr(&coerced_expr, &df_schema, &state)?;
5737
5738 Ok(physical)
5739 }
5740
5741 fn resolve_udfs(expr: &DfExpr, state: &datafusion::execution::SessionState) -> Result<DfExpr> {
5746 use datafusion::common::tree_node::{Transformed, TreeNode};
5747 use datafusion::logical_expr::Expr as DfExpr;
5748
5749 let result = expr
5750 .clone()
5751 .transform_up(|node| {
5752 if let DfExpr::ScalarFunction(ref func) = node {
5753 let udf_name = func.func.name();
5754 if let Some(registered_udf) = state.scalar_functions().get(udf_name) {
5755 return Ok(Transformed::yes(DfExpr::ScalarFunction(
5756 datafusion::logical_expr::expr::ScalarFunction {
5757 func: registered_udf.clone(),
5758 args: func.args.clone(),
5759 },
5760 )));
5761 }
5762 }
5763 Ok(Transformed::no(node))
5764 })
5765 .map_err(|e| anyhow::anyhow!("Failed to resolve UDFs: {}", e))?;
5766
5767 Ok(result.data)
5768 }
5769
5770 fn add_structural_projection(
5773 &self,
5774 input: Arc<dyn ExecutionPlan>,
5775 variable: &str,
5776 properties: &[String],
5777 ) -> Result<Arc<dyn ExecutionPlan>> {
5778 use datafusion::functions::expr_fn::named_struct;
5779 use datafusion::logical_expr::lit;
5780 use datafusion::physical_plan::projection::ProjectionExec;
5781
5782 let input_schema = input.schema();
5783 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
5784 Vec::new();
5785
5786 for (i, field) in input_schema.fields().iter().enumerate() {
5788 let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
5789 field.name(),
5790 i,
5791 ));
5792 proj_exprs.push((col_expr, field.name().clone()));
5793 }
5794
5795 let mut struct_args = Vec::with_capacity(properties.len() * 2 + 4);
5797
5798 struct_args.push(lit("_vid"));
5800 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5801 format!("{}._vid", variable),
5802 )));
5803
5804 struct_args.push(lit("_labels"));
5806 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5807 format!("{}._labels", variable),
5808 )));
5809
5810 for prop in properties {
5811 struct_args.push(lit(prop.clone()));
5812 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5813 format!("{}.{}", variable, prop),
5814 )));
5815 }
5816
5817 let struct_expr = named_struct(struct_args);
5819
5820 let df_schema = datafusion::common::DFSchema::try_from(input_schema.as_ref().clone())?;
5821 let session = self.session_ctx.read();
5822 let state = session.state();
5823
5824 let resolved_expr = Self::resolve_udfs(&struct_expr, &state)?;
5826
5827 use datafusion::physical_planner::PhysicalPlanner;
5828 let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
5829 let physical_struct_expr =
5830 planner.create_physical_expr(&resolved_expr, &df_schema, &state)?;
5831
5832 proj_exprs.push((physical_struct_expr, variable.to_string()));
5833
5834 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
5835 }
5836
5837 fn add_edge_structural_projection(
5839 &self,
5840 input: Arc<dyn ExecutionPlan>,
5841 variable: &str,
5842 properties: &[String],
5843 source_variable: &str,
5844 target_variable: &str,
5845 ) -> Result<Arc<dyn ExecutionPlan>> {
5846 use datafusion::functions::expr_fn::named_struct;
5847 use datafusion::logical_expr::lit;
5848 use datafusion::physical_plan::projection::ProjectionExec;
5849
5850 let input_schema = input.schema();
5851 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
5852 Vec::new();
5853
5854 for (i, field) in input_schema.fields().iter().enumerate() {
5856 let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
5857 field.name(),
5858 i,
5859 ));
5860 proj_exprs.push((col_expr, field.name().clone()));
5861 }
5862
5863 let mut struct_args = Vec::with_capacity(properties.len() * 2 + 10);
5865
5866 struct_args.push(lit("_eid"));
5868 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5869 format!("{}._eid", variable),
5870 )));
5871
5872 struct_args.push(lit("_type"));
5873 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5874 format!("{}._type", variable),
5875 )));
5876
5877 let resolve_vid_col = |var: &str| -> String {
5882 let vid_col = format!("{}._vid", var);
5883 if input_schema.column_with_name(&vid_col).is_some() {
5884 vid_col
5885 } else {
5886 var.to_string()
5887 }
5888 };
5889 let src_col_name = resolve_vid_col(source_variable);
5890 let dst_col_name = resolve_vid_col(target_variable);
5891 struct_args.push(lit("_src"));
5892 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5893 src_col_name,
5894 )));
5895
5896 struct_args.push(lit("_dst"));
5897 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5898 dst_col_name,
5899 )));
5900
5901 let all_props_col = format!("{}._all_props", variable);
5903 if input_schema.column_with_name(&all_props_col).is_some() {
5904 struct_args.push(lit("_all_props"));
5905 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5906 all_props_col,
5907 )));
5908 }
5909
5910 for prop in properties {
5911 struct_args.push(lit(prop.clone()));
5912 struct_args.push(DfExpr::Column(datafusion::common::Column::from_name(
5913 format!("{}.{}", variable, prop),
5914 )));
5915 }
5916
5917 let struct_expr = named_struct(struct_args);
5918
5919 let df_schema = datafusion::common::DFSchema::try_from(input_schema.as_ref().clone())?;
5920 let session = self.session_ctx.read();
5921 let state = session.state();
5922
5923 let resolved_expr = Self::resolve_udfs(&struct_expr, &state)?;
5924
5925 use datafusion::physical_planner::PhysicalPlanner;
5926 let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
5927 let physical_struct_expr =
5928 planner.create_physical_expr(&resolved_expr, &df_schema, &state)?;
5929
5930 proj_exprs.push((physical_struct_expr, variable.to_string()));
5931
5932 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
5933 }
5934
5935 fn create_physical_aggregate(
5937 &self,
5938 expr: &DfExpr,
5939 schema: &SchemaRef,
5940 state: &SessionState,
5941 ) -> Result<PhysicalAggregate> {
5942 use datafusion::physical_planner::create_aggregate_expr_and_maybe_filter;
5943
5944 let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
5946
5947 let (agg_expr, filter, _ordering) = create_aggregate_expr_and_maybe_filter(
5949 expr,
5950 &df_schema,
5951 schema.as_ref(),
5952 state.execution_props(),
5953 )?;
5954 Ok((agg_expr, filter))
5955 }
5956
5957 fn resolve_source_vid_col(
5962 input_plan: Arc<dyn ExecutionPlan>,
5963 source_variable: &str,
5964 ) -> Result<(Arc<dyn ExecutionPlan>, String)> {
5965 let source_vid_col = format!("{}._vid", source_variable);
5966 if input_plan
5967 .schema()
5968 .column_with_name(&source_vid_col)
5969 .is_some()
5970 {
5971 return Ok((input_plan, source_vid_col));
5972 }
5973 if let Ok(field) = input_plan.schema().field_with_name(source_variable)
5976 && matches!(
5977 field.data_type(),
5978 datafusion::arrow::datatypes::DataType::Struct(_)
5979 )
5980 {
5981 let enriched = Self::extract_struct_identity_columns(input_plan, source_variable)?;
5982 return Ok((enriched, format!("{}._vid", source_variable)));
5983 }
5984 Ok((input_plan, source_variable.to_string()))
5985 }
5986
5987 fn extract_struct_identity_columns(
5992 input: Arc<dyn ExecutionPlan>,
5993 variable: &str,
5994 ) -> Result<Arc<dyn ExecutionPlan>> {
5995 use datafusion::common::ScalarValue;
5996 use datafusion::physical_plan::projection::ProjectionExec;
5997
5998 let schema = input.schema();
5999 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
6000 Vec::new();
6001
6002 for (i, field) in schema.fields().iter().enumerate() {
6004 let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
6005 field.name(),
6006 i,
6007 ));
6008 proj_exprs.push((col_expr, field.name().clone()));
6009 }
6010
6011 if let Some((struct_idx, struct_field)) = schema
6013 .fields()
6014 .iter()
6015 .enumerate()
6016 .find(|(_, f)| f.name() == variable)
6017 && let datafusion::arrow::datatypes::DataType::Struct(fields) = struct_field.data_type()
6018 {
6019 let struct_col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
6020 datafusion::physical_expr::expressions::Column::new(variable, struct_idx),
6021 );
6022 let get_field_udf: Arc<datafusion::logical_expr::ScalarUDF> =
6023 Arc::new(datafusion::logical_expr::ScalarUDF::from(
6024 datafusion::functions::core::getfield::GetFieldFunc::new(),
6025 ));
6026
6027 if fields.iter().any(|f| f.name() == "_vid") {
6029 let field_name: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
6030 Arc::new(datafusion::physical_expr::expressions::Literal::new(
6031 ScalarValue::Utf8(Some("_vid".to_string())),
6032 ));
6033 let vid_expr = Arc::new(datafusion::physical_expr::ScalarFunctionExpr::try_new(
6034 get_field_udf.clone(),
6035 vec![struct_col.clone(), field_name],
6036 schema.as_ref(),
6037 Arc::new(datafusion::common::config::ConfigOptions::default()),
6038 )?);
6039 proj_exprs.push((vid_expr, format!("{}._vid", variable)));
6040 }
6041
6042 if fields.iter().any(|f| f.name() == "_labels") {
6044 let field_name: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
6045 Arc::new(datafusion::physical_expr::expressions::Literal::new(
6046 ScalarValue::Utf8(Some("_labels".to_string())),
6047 ));
6048 let labels_expr = Arc::new(datafusion::physical_expr::ScalarFunctionExpr::try_new(
6049 get_field_udf,
6050 vec![struct_col, field_name],
6051 schema.as_ref(),
6052 Arc::new(datafusion::common::config::ConfigOptions::default()),
6053 )?);
6054 proj_exprs.push((labels_expr, format!("{}._labels", variable)));
6055 }
6056 }
6057
6058 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
6059 }
6060
6061 fn extract_all_struct_fields(
6065 input: Arc<dyn ExecutionPlan>,
6066 variable: &str,
6067 ) -> Result<Arc<dyn ExecutionPlan>> {
6068 use datafusion::common::ScalarValue;
6069 use datafusion::physical_plan::projection::ProjectionExec;
6070
6071 let schema = input.schema();
6072 let mut proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
6073 Vec::new();
6074
6075 for (i, field) in schema.fields().iter().enumerate() {
6077 let col_expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
6078 field.name(),
6079 i,
6080 ));
6081 proj_exprs.push((col_expr, field.name().clone()));
6082 }
6083
6084 if let Some((struct_idx, struct_field)) = schema
6086 .fields()
6087 .iter()
6088 .enumerate()
6089 .find(|(_, f)| f.name() == variable)
6090 && let datafusion::arrow::datatypes::DataType::Struct(fields) = struct_field.data_type()
6091 {
6092 let struct_col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
6093 datafusion::physical_expr::expressions::Column::new(variable, struct_idx),
6094 );
6095 let get_field_udf: Arc<datafusion::logical_expr::ScalarUDF> =
6096 Arc::new(datafusion::logical_expr::ScalarUDF::from(
6097 datafusion::functions::core::getfield::GetFieldFunc::new(),
6098 ));
6099
6100 for field in fields.iter() {
6101 let flat_name = format!("{}.{}", variable, field.name());
6102 if schema.column_with_name(&flat_name).is_some() {
6104 continue;
6105 }
6106 let field_lit: Arc<dyn datafusion::physical_expr::PhysicalExpr> =
6107 Arc::new(datafusion::physical_expr::expressions::Literal::new(
6108 ScalarValue::Utf8(Some(field.name().to_string())),
6109 ));
6110 let extract_expr =
6111 Arc::new(datafusion::physical_expr::ScalarFunctionExpr::try_new(
6112 get_field_udf.clone(),
6113 vec![struct_col.clone(), field_lit],
6114 schema.as_ref(),
6115 Arc::new(datafusion::common::config::ConfigOptions::default()),
6116 )?);
6117 proj_exprs.push((extract_expr, flat_name));
6118 }
6119 }
6120
6121 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
6122 }
6123
6124 fn is_large_binary_col(&self, expr: &DfExpr, schema: &SchemaRef) -> bool {
6126 if let DfExpr::Column(col) = expr
6127 && let Ok(field) = schema.field_with_name(&col.name)
6128 {
6129 return matches!(
6130 field.data_type(),
6131 datafusion::arrow::datatypes::DataType::LargeBinary
6132 );
6133 }
6134 true
6137 }
6138}
6139
6140fn coerce_physical_expr(
6156 expr: Arc<dyn datafusion::physical_expr::PhysicalExpr>,
6157 actual_dt: &DataType,
6158 target_dt: &DataType,
6159 schema: &arrow_schema::Schema,
6160) -> Arc<dyn datafusion::physical_expr::PhysicalExpr> {
6161 use datafusion::physical_expr::expressions::CastExpr;
6162
6163 match (actual_dt, target_dt) {
6164 (DataType::LargeBinary, DataType::Float64) => wrap_cypher_to_float64(expr, schema),
6166 (DataType::LargeBinary, DataType::Int64) => {
6168 let float_expr = wrap_cypher_to_float64(expr, schema);
6169 Arc::new(CastExpr::new(float_expr, DataType::Int64, None))
6170 }
6171 _ => Arc::new(CastExpr::new(expr, target_dt.clone(), None)),
6173 }
6174}
6175
6176fn wrap_cypher_to_float64(
6178 expr: Arc<dyn datafusion::physical_expr::PhysicalExpr>,
6179 schema: &arrow_schema::Schema,
6180) -> Arc<dyn datafusion::physical_expr::PhysicalExpr> {
6181 let udf = Arc::new(super::df_udfs::cypher_to_float64_udf());
6182 let config = Arc::new(datafusion::common::config::ConfigOptions::default());
6183 Arc::new(
6184 datafusion::physical_expr::ScalarFunctionExpr::try_new(udf, vec![expr], schema, config)
6185 .expect("CypherToFloat64Udf accepts Any(1) signature"),
6186 )
6187}
6188
6189fn strip_conflicting_structural_columns(
6199 input: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
6200 derived_col_names: &HashSet<&str>,
6201) -> anyhow::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
6202 use datafusion::physical_plan::projection::ProjectionExec;
6203
6204 let schema = input.schema();
6205 let proj_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> = schema
6206 .fields()
6207 .iter()
6208 .enumerate()
6209 .filter(|(_, f)| {
6210 !(matches!(f.data_type(), arrow_schema::DataType::Struct(_))
6212 && derived_col_names.contains(f.name().as_str()))
6213 })
6214 .map(|(i, f)| {
6215 let col: Arc<dyn datafusion::physical_expr::PhysicalExpr> = Arc::new(
6216 datafusion::physical_expr::expressions::Column::new(f.name(), i),
6217 );
6218 (col, f.name().clone())
6219 })
6220 .collect();
6221
6222 if proj_exprs.len() == schema.fields().len() {
6223 return Ok(input);
6225 }
6226
6227 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
6228}
6229
6230fn resolve_column_indices(
6231 schema: &arrow_schema::SchemaRef,
6232 column_names: &[String],
6233) -> anyhow::Result<Vec<usize>> {
6234 column_names
6235 .iter()
6236 .map(|name| {
6237 schema
6238 .index_of(name)
6239 .map_err(|_| anyhow::anyhow!("Column '{}' not found in schema", name))
6240 })
6241 .collect()
6242}
6243
6244fn resolve_best_by_criteria(
6246 schema: &arrow_schema::SchemaRef,
6247 criteria: &[(Expr, bool)],
6248) -> anyhow::Result<Vec<super::df_graph::locy_best_by::SortCriterion>> {
6249 criteria
6250 .iter()
6251 .map(|(expr, ascending)| {
6252 let candidates: Vec<String> = match expr {
6255 Expr::Property(base, prop) => {
6256 if let Expr::Variable(var) = base.as_ref() {
6257 vec![prop.clone(), format!("{}.{}", var, prop)]
6258 } else {
6259 vec![prop.clone()]
6260 }
6261 }
6262 Expr::Variable(name) => {
6263 let short = name.rsplit('.').next().unwrap_or(name).to_string();
6264 if short != *name {
6265 vec![short, name.clone()]
6266 } else {
6267 vec![name.clone()]
6268 }
6269 }
6270 _ => {
6271 return Err(anyhow::anyhow!(
6272 "BEST BY criteria must be variable or property access"
6273 ));
6274 }
6275 };
6276 let col_index = candidates
6277 .iter()
6278 .find_map(|name| schema.index_of(name).ok())
6279 .ok_or_else(|| {
6280 anyhow::anyhow!(
6281 "BEST BY column '{}' not found",
6282 candidates.first().unwrap_or(&String::new())
6283 )
6284 })?;
6285 Ok(super::df_graph::locy_best_by::SortCriterion {
6286 col_index,
6287 ascending: *ascending,
6288 nulls_first: false, })
6290 })
6291 .collect()
6292}
6293
6294fn resolve_fold_bindings(
6300 schema: &arrow_schema::SchemaRef,
6301 fold_bindings: &[(String, Expr)],
6302 plugin_registry: &uni_plugin::PluginRegistry,
6303) -> anyhow::Result<Vec<super::df_graph::locy_fold::FoldBinding>> {
6304 use super::df_graph::locy_fold::resolve_locy_aggregate;
6305 fold_bindings
6306 .iter()
6307 .map(|(output_name, expr)| {
6308 match expr {
6310 Expr::FunctionCall { name, args, .. } => {
6311 let upper = name.to_uppercase();
6312 let is_count = matches!(upper.as_str(), "COUNT" | "MCOUNT");
6313
6314 let canonical: smol_str::SmolStr = if is_count && args.is_empty() {
6315 smol_str::SmolStr::new_static("COUNTALL")
6316 } else {
6317 match upper.as_str() {
6318 "SUM" | "MSUM" => smol_str::SmolStr::new_static("SUM"),
6319 "COUNT" | "MCOUNT" => smol_str::SmolStr::new_static("COUNT"),
6320 "MAX" | "MMAX" => smol_str::SmolStr::new_static("MAX"),
6321 "MIN" | "MMIN" => smol_str::SmolStr::new_static("MIN"),
6322 "AVG" => smol_str::SmolStr::new_static("AVG"),
6323 "COLLECT" => smol_str::SmolStr::new_static("COLLECT"),
6324 "MNOR" => smol_str::SmolStr::new_static("MNOR"),
6325 "MPROD" => smol_str::SmolStr::new_static("MPROD"),
6326 other => {
6327 return Err(anyhow::anyhow!(
6328 "Unsupported FOLD aggregate function: {}",
6329 other
6330 ));
6331 }
6332 }
6333 };
6334
6335 let entry = resolve_locy_aggregate(plugin_registry, canonical.as_str())
6336 .ok_or_else(|| {
6337 anyhow::anyhow!(
6338 "Locy aggregate '{canonical}' is not registered in the plugin registry"
6339 )
6340 })?;
6341 let aggregate = Arc::clone(&entry.aggregate);
6342
6343 if canonical.as_str() == "COUNTALL" {
6345 return Ok(super::df_graph::locy_fold::FoldBinding {
6346 output_name: output_name.clone(),
6347 name: canonical,
6348 aggregate,
6349 input_col_index: 0,
6350 input_col_name: None,
6351 });
6352 }
6353
6354 let input_col_index = schema
6357 .index_of(output_name)
6358 .or_else(|_| {
6359 let col_name = match args.first() {
6361 Some(Expr::Variable(name)) => Some(name.clone()),
6362 Some(Expr::Property(base, prop)) => {
6363 if let Expr::Variable(var) = base.as_ref() {
6364 Some(format!("{}.{}", var, prop))
6365 } else {
6366 None
6367 }
6368 }
6369 _ => None,
6370 };
6371 col_name
6372 .and_then(|n| schema.index_of(&n).ok())
6373 .ok_or_else(|| {
6374 arrow_schema::ArrowError::SchemaError(format!(
6375 "FOLD column '{}' not found",
6376 output_name
6377 ))
6378 })
6379 })
6380 .map_err(|_| anyhow::anyhow!("FOLD column '{}' not found", output_name))?;
6381 Ok(super::df_graph::locy_fold::FoldBinding {
6382 output_name: output_name.clone(),
6383 name: canonical,
6384 aggregate,
6385 input_col_index,
6386 input_col_name: Some(output_name.clone()),
6387 })
6388 }
6389 _ => Err(anyhow::anyhow!(
6390 "FOLD binding must be an aggregate function call"
6391 )),
6392 }
6393 })
6394 .collect()
6395}
6396
/// Records the kind (node, edge, or path) of every variable bound by `plan`
/// into `kinds`, recursing through child plans.
///
/// Entries are written with plain `HashMap::insert`, so a later insertion
/// for the same name overwrites an earlier one. Locy plan nodes, DDL/admin
/// plans and `Empty` contribute nothing. The match is exhaustive, so adding a
/// new `LogicalPlan` variant forces this function to be revisited.
fn collect_variable_kinds(plan: &LogicalPlan, kinds: &mut HashMap<String, VariableKind>) {
    match plan {
        // Wrapper around a fused index scan: the bindings live in `inner`.
        LogicalPlan::FusedIndexScanWrapped { inner, .. } => {
            collect_variable_kinds(inner, kinds);
        }
        // Leaf scans and lookups bind exactly one node variable.
        LogicalPlan::Scan { variable, .. }
        | LogicalPlan::FusedIndexScan { variable, .. }
        | LogicalPlan::ExtIdLookup { variable, .. }
        | LogicalPlan::ScanAll { variable, .. }
        | LogicalPlan::ScanMainByLabels { variable, .. }
        | LogicalPlan::VectorKnn { variable, .. }
        | LogicalPlan::InvertedIndexLookup { variable, .. } => {
            kinds.insert(variable.clone(), VariableKind::Node);
        }
        // Traversals bind both endpoints as nodes, the optional step variable
        // as an edge (its exact kind depends on `is_variable_length`), and the
        // optional path variable as a path.
        LogicalPlan::Traverse {
            input,
            source_variable,
            target_variable,
            step_variable,
            path_variable,
            is_variable_length,
            ..
        }
        | LogicalPlan::TraverseMainByType {
            input,
            source_variable,
            target_variable,
            step_variable,
            path_variable,
            is_variable_length,
            ..
        } => {
            collect_variable_kinds(input, kinds);
            kinds.insert(source_variable.clone(), VariableKind::Node);
            kinds.insert(target_variable.clone(), VariableKind::Node);
            if let Some(sv) = step_variable {
                kinds.insert(sv.clone(), VariableKind::edge_for(*is_variable_length));
            }
            if let Some(pv) = path_variable {
                kinds.insert(pv.clone(), VariableKind::Path);
            }
        }
        // Shortest-path operators always bind a path variable.
        LogicalPlan::ShortestPath {
            input,
            source_variable,
            target_variable,
            path_variable,
            ..
        }
        | LogicalPlan::AllShortestPaths {
            input,
            source_variable,
            target_variable,
            path_variable,
            ..
        } => {
            collect_variable_kinds(input, kinds);
            kinds.insert(source_variable.clone(), VariableKind::Node);
            kinds.insert(target_variable.clone(), VariableKind::Node);
            kinds.insert(path_variable.clone(), VariableKind::Path);
        }
        // Quantified patterns: recurse into both the outer input and the
        // repeated inner pattern, then bind the start and binding nodes.
        LogicalPlan::QuantifiedPattern {
            input,
            pattern_plan,
            path_variable,
            start_variable,
            binding_variable,
            ..
        } => {
            collect_variable_kinds(input, kinds);
            collect_variable_kinds(pattern_plan, kinds);
            kinds.insert(start_variable.clone(), VariableKind::Node);
            kinds.insert(binding_variable.clone(), VariableKind::Node);
            if let Some(pv) = path_variable {
                kinds.insert(pv.clone(), VariableKind::Path);
            }
        }
        LogicalPlan::BindZeroLengthPath {
            input,
            node_variable,
            path_variable,
        } => {
            collect_variable_kinds(input, kinds);
            kinds.insert(node_variable.clone(), VariableKind::Node);
            kinds.insert(path_variable.clone(), VariableKind::Path);
        }
        // Fixed-length path binding: every listed edge is a single edge.
        LogicalPlan::BindPath {
            input,
            node_variables,
            edge_variables,
            path_variable,
        } => {
            collect_variable_kinds(input, kinds);
            for nv in node_variables {
                kinds.insert(nv.clone(), VariableKind::Node);
            }
            for ev in edge_variables {
                kinds.insert(ev.clone(), VariableKind::Edge);
            }
            kinds.insert(path_variable.clone(), VariableKind::Path);
        }
        // Single-input operators bind nothing themselves; just recurse.
        LogicalPlan::Filter { input, .. }
        | LogicalPlan::Project { input, .. }
        | LogicalPlan::Sort { input, .. }
        | LogicalPlan::Limit { input, .. }
        | LogicalPlan::Aggregate { input, .. }
        | LogicalPlan::Distinct { input, .. }
        | LogicalPlan::Window { input, .. }
        | LogicalPlan::Unwind { input, .. }
        | LogicalPlan::Create { input, .. }
        | LogicalPlan::CreateBatch { input, .. }
        | LogicalPlan::Merge { input, .. }
        | LogicalPlan::Set { input, .. }
        | LogicalPlan::Remove { input, .. }
        | LogicalPlan::Delete { input, .. }
        | LogicalPlan::Foreach { input, .. }
        | LogicalPlan::SubqueryCall { input, .. } => {
            collect_variable_kinds(input, kinds);
        }
        LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
            collect_variable_kinds(left, kinds);
            collect_variable_kinds(right, kinds);
        }
        LogicalPlan::Apply {
            input, subquery, ..
        } => {
            collect_variable_kinds(input, kinds);
            collect_variable_kinds(subquery, kinds);
        }
        LogicalPlan::RecursiveCTE {
            initial, recursive, ..
        } => {
            collect_variable_kinds(initial, kinds);
            collect_variable_kinds(recursive, kinds);
        }
        LogicalPlan::Explain { plan } => {
            collect_variable_kinds(plan, kinds);
        }
        // Only procedures recognised as node-yielding contribute kinds. For
        // those, only the yield whose canonical name is "node" is recorded
        // (under its alias if present); other yields are left untyped.
        LogicalPlan::ProcedureCall {
            procedure_name,
            yield_items,
            ..
        } => {
            use crate::query::df_graph::procedure_call::{
                is_node_yield_procedure_static, map_yield_to_canonical,
            };
            for (name, alias) in yield_items {
                let var = alias.as_ref().unwrap_or(name);
                if is_node_yield_procedure_static(procedure_name.as_str()) {
                    let canonical = map_yield_to_canonical(name);
                    if canonical == "node" {
                        kinds.insert(var.clone(), VariableKind::Node);
                    }
                }
            }
        }
        // Locy plan nodes are not walked here.
        LogicalPlan::LocyProgram { .. }
        | LogicalPlan::LocyFold { .. }
        | LogicalPlan::LocyBestBy { .. }
        | LogicalPlan::LocyPriority { .. }
        | LogicalPlan::LocyDerivedScan { .. }
        | LogicalPlan::LocyProject { .. }
        | LogicalPlan::LocyModelInvoke { .. } => {}
        // DDL / admin / utility plans bind no query variables.
        LogicalPlan::Empty
        | LogicalPlan::CreateVectorIndex { .. }
        | LogicalPlan::CreateFullTextIndex { .. }
        | LogicalPlan::CreateScalarIndex { .. }
        | LogicalPlan::CreateJsonFtsIndex { .. }
        | LogicalPlan::DropIndex { .. }
        | LogicalPlan::ShowIndexes { .. }
        | LogicalPlan::Copy { .. }
        | LogicalPlan::Backup { .. }
        | LogicalPlan::ShowDatabase
        | LogicalPlan::ShowConfig
        | LogicalPlan::ShowStatistics
        | LogicalPlan::Vacuum
        | LogicalPlan::Checkpoint
        | LogicalPlan::CopyTo { .. }
        | LogicalPlan::CopyFrom { .. }
        | LogicalPlan::CreateLabel(_)
        | LogicalPlan::CreateEdgeType(_)
        | LogicalPlan::AlterLabel(_)
        | LogicalPlan::AlterEdgeType(_)
        | LogicalPlan::DropLabel(_)
        | LogicalPlan::DropEdgeType(_)
        | LogicalPlan::CreateConstraint(_)
        | LogicalPlan::DropConstraint(_)
        | LogicalPlan::ShowConstraints(_) => {}
    }
}
6598
6599fn collect_mutation_node_hints(plan: &LogicalPlan, hints: &mut Vec<String>) {
6604 match plan {
6605 LogicalPlan::Create { input, pattern } => {
6606 collect_node_names_from_pattern(pattern, hints);
6607 collect_mutation_node_hints(input, hints);
6608 }
6609 LogicalPlan::CreateBatch { input, patterns } => {
6610 for pattern in patterns {
6611 collect_node_names_from_pattern(pattern, hints);
6612 }
6613 collect_mutation_node_hints(input, hints);
6614 }
6615 LogicalPlan::Merge { input, pattern, .. } => {
6616 collect_node_names_from_pattern(pattern, hints);
6617 collect_mutation_node_hints(input, hints);
6618 }
6619 LogicalPlan::Traverse { input, .. }
6621 | LogicalPlan::TraverseMainByType { input, .. }
6622 | LogicalPlan::Filter { input, .. }
6623 | LogicalPlan::Project { input, .. }
6624 | LogicalPlan::Sort { input, .. }
6625 | LogicalPlan::Limit { input, .. }
6626 | LogicalPlan::Aggregate { input, .. }
6627 | LogicalPlan::Distinct { input, .. }
6628 | LogicalPlan::Window { input, .. }
6629 | LogicalPlan::Unwind { input, .. }
6630 | LogicalPlan::Set { input, .. }
6631 | LogicalPlan::Remove { input, .. }
6632 | LogicalPlan::Delete { input, .. }
6633 | LogicalPlan::Foreach { input, .. }
6634 | LogicalPlan::SubqueryCall { input, .. }
6635 | LogicalPlan::ShortestPath { input, .. }
6636 | LogicalPlan::AllShortestPaths { input, .. }
6637 | LogicalPlan::QuantifiedPattern { input, .. }
6638 | LogicalPlan::BindZeroLengthPath { input, .. }
6639 | LogicalPlan::BindPath { input, .. } => {
6640 collect_mutation_node_hints(input, hints);
6641 }
6642 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
6643 collect_mutation_node_hints(left, hints);
6644 collect_mutation_node_hints(right, hints);
6645 }
6646 LogicalPlan::Apply {
6647 input, subquery, ..
6648 } => {
6649 collect_mutation_node_hints(input, hints);
6650 collect_mutation_node_hints(subquery, hints);
6651 }
6652 LogicalPlan::RecursiveCTE {
6653 initial, recursive, ..
6654 } => {
6655 collect_mutation_node_hints(initial, hints);
6656 collect_mutation_node_hints(recursive, hints);
6657 }
6658 LogicalPlan::Explain { plan } => {
6659 collect_mutation_node_hints(plan, hints);
6660 }
6661 _ => {}
6663 }
6664}
6665
6666fn collect_node_names_from_pattern(pattern: &Pattern, hints: &mut Vec<String>) {
6668 for path in &pattern.paths {
6669 for element in &path.elements {
6670 match element {
6671 PatternElement::Node(n) => {
6672 if let Some(ref v) = n.variable
6673 && !hints.contains(v)
6674 {
6675 hints.push(v.clone());
6676 }
6677 }
6678 PatternElement::Parenthesized { pattern, .. } => {
6679 let sub = Pattern {
6680 paths: vec![pattern.as_ref().clone()],
6681 };
6682 collect_node_names_from_pattern(&sub, hints);
6683 }
6684 _ => {}
6685 }
6686 }
6687 }
6688}
6689
6690fn collect_mutation_edge_hints(plan: &LogicalPlan, hints: &mut Vec<String>) {
6694 match plan {
6695 LogicalPlan::Create { input, pattern } | LogicalPlan::Merge { input, pattern, .. } => {
6696 collect_edge_names_from_pattern(pattern, hints);
6697 collect_mutation_edge_hints(input, hints);
6698 }
6699 LogicalPlan::CreateBatch { input, patterns } => {
6700 for pattern in patterns {
6701 collect_edge_names_from_pattern(pattern, hints);
6702 }
6703 collect_mutation_edge_hints(input, hints);
6704 }
6705 LogicalPlan::Traverse { input, .. }
6707 | LogicalPlan::TraverseMainByType { input, .. }
6708 | LogicalPlan::Filter { input, .. }
6709 | LogicalPlan::Project { input, .. }
6710 | LogicalPlan::Sort { input, .. }
6711 | LogicalPlan::Limit { input, .. }
6712 | LogicalPlan::Aggregate { input, .. }
6713 | LogicalPlan::Distinct { input, .. }
6714 | LogicalPlan::Window { input, .. }
6715 | LogicalPlan::Unwind { input, .. }
6716 | LogicalPlan::Set { input, .. }
6717 | LogicalPlan::Remove { input, .. }
6718 | LogicalPlan::Delete { input, .. }
6719 | LogicalPlan::Foreach { input, .. }
6720 | LogicalPlan::SubqueryCall { input, .. }
6721 | LogicalPlan::ShortestPath { input, .. }
6722 | LogicalPlan::AllShortestPaths { input, .. }
6723 | LogicalPlan::QuantifiedPattern { input, .. }
6724 | LogicalPlan::BindZeroLengthPath { input, .. }
6725 | LogicalPlan::BindPath { input, .. } => {
6726 collect_mutation_edge_hints(input, hints);
6727 }
6728 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right, .. } => {
6729 collect_mutation_edge_hints(left, hints);
6730 collect_mutation_edge_hints(right, hints);
6731 }
6732 LogicalPlan::Apply {
6733 input, subquery, ..
6734 } => {
6735 collect_mutation_edge_hints(input, hints);
6736 collect_mutation_edge_hints(subquery, hints);
6737 }
6738 LogicalPlan::RecursiveCTE {
6739 initial, recursive, ..
6740 } => {
6741 collect_mutation_edge_hints(initial, hints);
6742 collect_mutation_edge_hints(recursive, hints);
6743 }
6744 LogicalPlan::Explain { plan } => {
6745 collect_mutation_edge_hints(plan, hints);
6746 }
6747 _ => {}
6748 }
6749}
6750
6751fn collect_edge_names_from_pattern(pattern: &Pattern, hints: &mut Vec<String>) {
6753 for path in &pattern.paths {
6754 for element in &path.elements {
6755 match element {
6756 PatternElement::Relationship(r) => {
6757 if let Some(ref v) = r.variable
6758 && !hints.contains(v)
6759 {
6760 hints.push(v.clone());
6761 }
6762 }
6763 PatternElement::Parenthesized { pattern, .. } => {
6764 let sub = Pattern {
6765 paths: vec![pattern.as_ref().clone()],
6766 };
6767 collect_edge_names_from_pattern(&sub, hints);
6768 }
6769 _ => {}
6770 }
6771 }
6772 }
6773}
6774
6775fn convert_direction(ast_dir: AstDirection) -> Direction {
6777 match ast_dir {
6778 AstDirection::Outgoing => Direction::Outgoing,
6779 AstDirection::Incoming => Direction::Incoming,
6780 AstDirection::Both => Direction::Both,
6781 }
6782}
6783
/// Normalises the property list requested for a variable-length-path target.
///
/// * Literal `"*"` entries are removed.
/// * If the target had a wildcard and nothing else remains, `"_all_props"`
///   is requested.
/// * If any remaining property is not in the target label's schema
///   (`target_label_props`), `"_all_props"` is appended once. Internal
///   names starting with `_` and `"overflow_json"` never count as
///   non-schema properties.
fn sanitize_vlp_target_properties(
    properties: Vec<String>,
    target_has_wildcard: bool,
    target_label_props: Option<&HashSet<String>>,
) -> Vec<String> {
    let mut sanitized: Vec<String> = properties.into_iter().filter(|p| p != "*").collect();

    if target_has_wildcard && sanitized.is_empty() {
        sanitized.push("_all_props".to_string());
    }

    // A property is "covered" when it is internal, the overflow column, or
    // a declared schema property of the target label.
    let is_covered = |name: &str| {
        name == "_all_props"
            || name == "overflow_json"
            || name.starts_with('_')
            || target_label_props.is_some_and(|schema| schema.contains(name))
    };

    let already_requests_all = sanitized.iter().any(|p| p == "_all_props");
    let needs_overflow = sanitized.iter().any(|p| !is_covered(p.as_str()));
    if needs_overflow && !already_requests_all {
        sanitized.push("_all_props".to_string());
    }

    sanitized
}
6811
/// Result of splitting a join predicate into its AND-conjuncts and bucketing
/// each one by which join input(s) its variables come from (produced by
/// `classify_join_predicate`).
struct JoinPredicateClassification {
    /// Equality conjuncts oriented as `(left-side expr, right-side expr)`.
    /// Each side's variables are non-empty and all bound by that join input.
    equi_pairs: Vec<(Expr, Expr)>,
    /// Non-equi conjuncts that reference left-side variables and no
    /// right-side ones.
    left_only: Vec<Expr>,
    /// Non-equi conjuncts that reference right-side variables and no
    /// left-side ones.
    right_only: Vec<Expr>,
    /// All remaining conjuncts (touching both sides, or neither),
    /// AND-combined; `None` when there are none.
    residual: Option<Expr>,
}
6833
6834fn collect_plan_variables(plan: &LogicalPlan) -> HashSet<String> {
6838 let mut out = HashSet::new();
6839 collect_plan_variables_into(plan, &mut out);
6840 out
6841}
6842
6843fn collect_plan_variables_into(plan: &LogicalPlan, out: &mut HashSet<String>) {
6844 match plan {
6845 LogicalPlan::Scan { variable, .. }
6846 | LogicalPlan::ExtIdLookup { variable, .. }
6847 | LogicalPlan::ScanAll { variable, .. }
6848 | LogicalPlan::ScanMainByLabels { variable, .. } => {
6849 out.insert(variable.clone());
6850 }
6851 LogicalPlan::Unwind {
6852 input, variable, ..
6853 } => {
6854 out.insert(variable.clone());
6855 collect_plan_variables_into(input, out);
6856 }
6857 LogicalPlan::Traverse {
6858 input,
6859 source_variable,
6860 target_variable,
6861 step_variable,
6862 path_variable,
6863 ..
6864 } => {
6865 collect_plan_variables_into(input, out);
6866 out.insert(source_variable.clone());
6867 out.insert(target_variable.clone());
6868 if let Some(s) = step_variable {
6869 out.insert(s.clone());
6870 }
6871 if let Some(p) = path_variable {
6872 out.insert(p.clone());
6873 }
6874 }
6875 LogicalPlan::TraverseMainByType {
6876 input,
6877 source_variable,
6878 target_variable,
6879 step_variable,
6880 path_variable,
6881 ..
6882 } => {
6883 collect_plan_variables_into(input, out);
6884 out.insert(source_variable.clone());
6885 out.insert(target_variable.clone());
6886 if let Some(s) = step_variable {
6887 out.insert(s.clone());
6888 }
6889 if let Some(p) = path_variable {
6890 out.insert(p.clone());
6891 }
6892 }
6893 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
6894 collect_plan_variables_into(left, out);
6895 collect_plan_variables_into(right, out);
6896 }
6897 LogicalPlan::Apply {
6898 input, subquery, ..
6899 } => {
6900 collect_plan_variables_into(input, out);
6901 collect_plan_variables_into(subquery, out);
6902 }
6903 LogicalPlan::Filter { input, .. }
6904 | LogicalPlan::Project { input, .. }
6905 | LogicalPlan::Sort { input, .. }
6906 | LogicalPlan::Limit { input, .. }
6907 | LogicalPlan::Aggregate { input, .. }
6908 | LogicalPlan::Distinct { input }
6909 | LogicalPlan::Window { input, .. }
6910 | LogicalPlan::Create { input, .. }
6911 | LogicalPlan::CreateBatch { input, .. }
6912 | LogicalPlan::Merge { input, .. }
6913 | LogicalPlan::Set { input, .. }
6914 | LogicalPlan::Remove { input, .. }
6915 | LogicalPlan::Delete { input, .. }
6916 | LogicalPlan::Foreach { input, .. }
6917 | LogicalPlan::SubqueryCall { input, .. } => {
6918 collect_plan_variables_into(input, out);
6919 }
6920 _ => {}
6922 }
6923}
6924
6925fn collect_expr_variables_set(expr: &Expr) -> HashSet<String> {
6927 let mut out = HashSet::new();
6928 collect_expr_variables_into(expr, &mut out);
6929 out
6930}
6931
/// Inserts into `out` every variable name referenced by `expr`.
///
/// Recurses through property access, binary/unary operators, null and
/// uniqueness checks, function-call arguments, list and map literals, `IN`,
/// `CASE`, label checks, indexing and slicing. Any other expression variant
/// hits the `_` arm and contributes nothing.
/// NOTE(review): if the AST has other variants that nest sub-expressions
/// (e.g. comprehensions or subqueries), their variables are not collected
/// here — confirm this is acceptable for the callers.
fn collect_expr_variables_into(expr: &Expr, out: &mut HashSet<String>) {
    use uni_cypher::ast::Expr as E;
    match expr {
        E::Variable(v) => {
            out.insert(v.clone());
        }
        // Only the base can reference a variable; the key is a plain name.
        E::Property(base, _) => collect_expr_variables_into(base, out),
        E::BinaryOp { left, right, .. } => {
            collect_expr_variables_into(left, out);
            collect_expr_variables_into(right, out);
        }
        E::UnaryOp { expr, .. } | E::IsNull(expr) | E::IsNotNull(expr) | E::IsUnique(expr) => {
            collect_expr_variables_into(expr, out)
        }
        E::FunctionCall { args, .. } => {
            for a in args {
                collect_expr_variables_into(a, out);
            }
        }
        E::List(items) => {
            for it in items {
                collect_expr_variables_into(it, out);
            }
        }
        E::In { expr, list } => {
            collect_expr_variables_into(expr, out);
            collect_expr_variables_into(list, out);
        }
        E::Case {
            expr,
            when_then,
            else_expr,
        } => {
            if let Some(e) = expr {
                collect_expr_variables_into(e, out);
            }
            for (w, t) in when_then {
                collect_expr_variables_into(w, out);
                collect_expr_variables_into(t, out);
            }
            if let Some(e) = else_expr {
                collect_expr_variables_into(e, out);
            }
        }
        // Map keys are literal names; only the values can reference variables.
        E::Map(entries) => {
            for (_, v) in entries {
                collect_expr_variables_into(v, out);
            }
        }
        E::LabelCheck { expr, .. } => collect_expr_variables_into(expr, out),
        E::ArrayIndex { array, index } => {
            collect_expr_variables_into(array, out);
            collect_expr_variables_into(index, out);
        }
        E::ArraySlice { array, start, end } => {
            collect_expr_variables_into(array, out);
            if let Some(s) = start {
                collect_expr_variables_into(s, out);
            }
            if let Some(e) = end {
                collect_expr_variables_into(e, out);
            }
        }
        _ => {}
    }
}
7000
7001fn split_and_conjuncts(predicate: &Expr) -> Vec<Expr> {
7003 use uni_cypher::ast::BinaryOp;
7004 let mut out = Vec::new();
7005 fn walk(e: &Expr, out: &mut Vec<Expr>) {
7006 if let Expr::BinaryOp {
7007 left,
7008 op: BinaryOp::And,
7009 right,
7010 } = e
7011 {
7012 walk(left, out);
7013 walk(right, out);
7014 } else {
7015 out.push(e.clone());
7016 }
7017 }
7018 walk(predicate, &mut out);
7019 out
7020}
7021
7022fn and_combine(exprs: Vec<Expr>) -> Option<Expr> {
7024 use uni_cypher::ast::BinaryOp;
7025 let mut iter = exprs.into_iter();
7026 let first = iter.next()?;
7027 Some(iter.fold(first, |acc, e| Expr::BinaryOp {
7028 left: Box::new(acc),
7029 op: BinaryOp::And,
7030 right: Box::new(e),
7031 }))
7032}
7033
7034fn classify_join_predicate(
7037 predicate: &Expr,
7038 left_vars: &HashSet<String>,
7039 right_vars: &HashSet<String>,
7040) -> JoinPredicateClassification {
7041 use uni_cypher::ast::BinaryOp;
7042
7043 let mut equi_pairs = Vec::new();
7044 let mut left_only = Vec::new();
7045 let mut right_only = Vec::new();
7046 let mut residual_parts: Vec<Expr> = Vec::new();
7047
7048 for conjunct in split_and_conjuncts(predicate) {
7049 if let Expr::BinaryOp {
7052 left,
7053 op: BinaryOp::Eq,
7054 right,
7055 } = &conjunct
7056 {
7057 let lv = collect_expr_variables_set(left);
7058 let rv = collect_expr_variables_set(right);
7059 let l_in_left = !lv.is_empty() && lv.is_subset(left_vars);
7060 let r_in_right = !rv.is_empty() && rv.is_subset(right_vars);
7061 let l_in_right = !lv.is_empty() && lv.is_subset(right_vars);
7062 let r_in_left = !rv.is_empty() && rv.is_subset(left_vars);
7063 if l_in_left && r_in_right {
7064 equi_pairs.push(((**left).clone(), (**right).clone()));
7065 continue;
7066 }
7067 if l_in_right && r_in_left {
7068 equi_pairs.push(((**right).clone(), (**left).clone()));
7069 continue;
7070 }
7071 }
7072
7073 let vars = collect_expr_variables_set(&conjunct);
7075 let touches_left = vars.iter().any(|v| left_vars.contains(v));
7076 let touches_right = vars.iter().any(|v| right_vars.contains(v));
7077 match (touches_left, touches_right) {
7078 (true, false) => left_only.push(conjunct),
7079 (false, true) => right_only.push(conjunct),
7080 _ => residual_parts.push(conjunct),
7082 }
7083 }
7084
7085 JoinPredicateClassification {
7086 equi_pairs,
7087 left_only,
7088 right_only,
7089 residual: and_combine(residual_parts),
7090 }
7091}
7092
/// Maximum number of values an UNWIND source (inline list or list
/// parameter) may contribute to a pushed-down `IN` list. Larger sources are
/// not converted and fall back to the unpushed plan.
const MAX_UNWIND_IN_PUSHDOWN_VALUES: usize = 10_000;
7097
7098fn warn_unpushable_unwind_once(reason: &'static str) {
7111 use std::sync::atomic::{AtomicBool, Ordering};
7112 static WARNED: AtomicBool = AtomicBool::new(false);
7113 if WARNED.swap(true, Ordering::Relaxed) {
7114 return;
7115 }
7116 tracing::warn!(
7117 target: "uni_query::cross_join_in_pushdown",
7118 reason,
7119 "Inlined UNWIND of map literals failed pushdown — falling back \
7120 to FilterExec over a full scan. Rewrite as `UNWIND $param AS u` \
7121 with the param bound as a List<Map<...>> to guarantee pushdown."
7122 );
7123}
7124
7125fn value_to_cypher_literal(v: &uni_common::Value) -> Option<CypherLiteral> {
7126 use uni_common::Value;
7127 match v {
7128 Value::Null => Some(CypherLiteral::Null),
7129 Value::Bool(b) => Some(CypherLiteral::Bool(*b)),
7130 Value::Int(n) => Some(CypherLiteral::Integer(*n)),
7131 Value::Float(f) => Some(CypherLiteral::Float(*f)),
7132 Value::String(s) => Some(CypherLiteral::String(s.clone())),
7133 _ => None,
7134 }
7135}
7136
7137fn walk_static_unwind_chain<F, T>(
7157 plan: &LogicalPlan,
7158 target_var: &str,
7159 extract: &mut F,
7160) -> Option<T>
7161where
7162 F: FnMut(&Expr) -> Option<T>,
7163{
7164 match plan {
7165 LogicalPlan::Unwind {
7166 input,
7167 expr,
7168 variable,
7169 } if variable == target_var => {
7170 extract(expr).or_else(|| walk_static_unwind_chain(input, target_var, extract))
7171 }
7172 LogicalPlan::Filter { input, .. }
7174 | LogicalPlan::Project { input, .. }
7175 | LogicalPlan::Unwind { input, .. } => walk_static_unwind_chain(input, target_var, extract),
7176 LogicalPlan::CrossJoin { left, right } => {
7179 walk_static_unwind_chain(left, target_var, extract)
7180 .or_else(|| walk_static_unwind_chain(right, target_var, extract))
7181 }
7182 _ => None,
7183 }
7184}
7185
7186fn extract_static_unwind_values(
7187 plan: &LogicalPlan,
7188 target_var: &str,
7189 params: &HashMap<String, uni_common::Value>,
7190) -> Option<Vec<Expr>> {
7191 walk_static_unwind_chain(plan, target_var, &mut |expr| {
7192 materialize_unwind_source(expr, params)
7193 })
7194}
7195
7196fn extract_static_unwind_field_values(
7199 plan: &LogicalPlan,
7200 target_var: &str,
7201 field: &str,
7202 params: &HashMap<String, uni_common::Value>,
7203) -> Option<Vec<Expr>> {
7204 walk_static_unwind_chain(plan, target_var, &mut |expr| {
7205 materialize_unwind_source_field(expr, params, field)
7206 })
7207}
7208
7209fn materialize_unwind_source(
7211 expr: &Expr,
7212 params: &HashMap<String, uni_common::Value>,
7213) -> Option<Vec<Expr>> {
7214 match expr {
7215 Expr::List(items) => {
7216 if items.len() > MAX_UNWIND_IN_PUSHDOWN_VALUES {
7217 return None;
7218 }
7219 let mut out = Vec::with_capacity(items.len());
7220 for item in items {
7221 match item {
7222 Expr::Literal(_) => out.push(item.clone()),
7223 _ => return None,
7224 }
7225 }
7226 Some(out)
7227 }
7228 Expr::Parameter(name) => match params.get(name)? {
7229 uni_common::Value::List(values) => {
7230 if values.len() > MAX_UNWIND_IN_PUSHDOWN_VALUES {
7231 return None;
7232 }
7233 let mut out = Vec::with_capacity(values.len());
7234 for v in values {
7235 out.push(Expr::Literal(value_to_cypher_literal(v)?));
7236 }
7237 Some(out)
7238 }
7239 _ => None,
7240 },
7241 _ => None,
7242 }
7243}
7244
/// Materialises the values of `field` across the elements of an UNWIND
/// source, as literal expressions.
///
/// * Inline list: every item must be a map literal containing `field` with
///   a literal value.
/// * Parameter: must be bound to a list of maps, each containing `field`
///   with a scalar value convertible by `value_to_cypher_literal`.
///
/// Returns `None` (no pushdown) for any other shape, for a missing or
/// unsupported field value, and for sources longer than
/// `MAX_UNWIND_IN_PUSHDOWN_VALUES`. Some rejection paths for inline lists
/// emit a one-time warning through `warn_unpushable_unwind_once`. A non-map
/// item and every parameter-path rejection return `None` silently.
fn materialize_unwind_source_field(
    expr: &Expr,
    params: &HashMap<String, uni_common::Value>,
    field: &str,
) -> Option<Vec<Expr>> {
    match expr {
        Expr::List(items) => {
            if items.len() > MAX_UNWIND_IN_PUSHDOWN_VALUES {
                warn_unpushable_unwind_once("UNWIND list exceeds MAX_UNWIND_IN_PUSHDOWN_VALUES");
                return None;
            }
            let mut out = Vec::with_capacity(items.len());
            for item in items {
                // Every element must be a map literal.
                let entries = match item {
                    Expr::Map(entries) => entries,
                    _ => return None,
                };
                let Some((_, value_expr)) = entries.iter().find(|(k, _)| k == field) else {
                    warn_unpushable_unwind_once(
                        "UNWIND map literal is missing the field referenced by the join predicate",
                    );
                    return None;
                };
                // Only literal values are statically known at plan time.
                let Expr::Literal(_) = value_expr else {
                    warn_unpushable_unwind_once(
                        "UNWIND map literal has a non-literal value at the joined field \
                         (e.g., a parameter or function call) — substitute with a literal \
                         or rewrite as `UNWIND $param AS u` with the param bound at runtime",
                    );
                    return None;
                };
                out.push(value_expr.clone());
            }
            Some(out)
        }
        Expr::Parameter(name) => match params.get(name)? {
            uni_common::Value::List(values) => {
                if values.len() > MAX_UNWIND_IN_PUSHDOWN_VALUES {
                    return None;
                }
                let mut out = Vec::with_capacity(values.len());
                for v in values {
                    let map = match v {
                        uni_common::Value::Map(m) => m,
                        _ => return None,
                    };
                    // Missing field or a non-scalar value aborts the pushdown.
                    let inner = map.get(field)?;
                    out.push(Expr::Literal(value_to_cypher_literal(inner)?));
                }
                Some(out)
            }
            _ => None,
        },
        _ => None,
    }
}
7322
7323fn build_in_pushdown(
7335 unwind_side_expr: &Expr,
7336 scan_side_expr: &Expr,
7337 unwind_subplan: &LogicalPlan,
7338 params: &HashMap<String, uni_common::Value>,
7339) -> Option<Expr> {
7340 let (unwind_var, field) = match unwind_side_expr {
7342 Expr::Variable(v) => (v.as_str(), None),
7343 Expr::Property(box_var, f) => match box_var.as_ref() {
7344 Expr::Variable(v) => (v.as_str(), Some(f.as_str())),
7345 _ => {
7346 tracing::debug!(
7347 target: "uni_query::cross_join_in_pushdown",
7348 reason = "unwind side Property inner is not Variable",
7349 "build_in_pushdown rejected"
7350 );
7351 return None;
7352 }
7353 },
7354 _ => {
7355 tracing::debug!(
7356 target: "uni_query::cross_join_in_pushdown",
7357 reason = "unwind side is not Variable or Property",
7358 unwind_kind = std::any::type_name_of_val(&unwind_side_expr),
7359 "build_in_pushdown rejected"
7360 );
7361 return None;
7362 }
7363 };
7364
7365 let Expr::Property(scan_box_var, _scan_field) = scan_side_expr else {
7369 tracing::debug!(
7370 target: "uni_query::cross_join_in_pushdown",
7371 reason = "scan side is not Property",
7372 "build_in_pushdown rejected"
7373 );
7374 return None;
7375 };
7376 if !matches!(scan_box_var.as_ref(), Expr::Variable(_)) {
7377 tracing::debug!(
7378 target: "uni_query::cross_join_in_pushdown",
7379 reason = "scan side Property inner is not Variable",
7380 "build_in_pushdown rejected"
7381 );
7382 return None;
7383 }
7384
7385 let values = match field {
7390 None => match extract_static_unwind_values(unwind_subplan, unwind_var, params) {
7391 Some(v) => v,
7392 None => {
7393 tracing::debug!(
7394 target: "uni_query::cross_join_in_pushdown",
7395 reason = "extract_static_unwind_values returned None",
7396 unwind_var,
7397 "build_in_pushdown rejected"
7398 );
7399 return None;
7400 }
7401 },
7402 Some(f) => {
7403 match extract_static_unwind_field_values(unwind_subplan, unwind_var, f, params) {
7404 Some(v) => v,
7405 None => {
7406 tracing::debug!(
7407 target: "uni_query::cross_join_in_pushdown",
7408 reason = "extract_static_unwind_field_values returned None \
7409 (UNWIND source is not Expr::Parameter, or param is not \
7410 Value::List<Value::Map>, or a map element lacks field, \
7411 or list size exceeded MAX_UNWIND_IN_PUSHDOWN_VALUES)",
7412 unwind_var,
7413 field = f,
7414 "build_in_pushdown rejected"
7415 );
7416 return None;
7417 }
7418 }
7419 }
7420 };
7421 if values.is_empty() {
7422 tracing::debug!(
7423 target: "uni_query::cross_join_in_pushdown",
7424 reason = "extracted value list is empty",
7425 unwind_var,
7426 ?field,
7427 "build_in_pushdown rejected"
7428 );
7429 return None;
7430 }
7431
7432 tracing::debug!(
7433 target: "uni_query::cross_join_in_pushdown",
7434 unwind_var,
7435 ?field,
7436 values_count = values.len(),
7437 "build_in_pushdown extracted IN-list"
7438 );
7439 Some(Expr::In {
7440 expr: Box::new(scan_side_expr.clone()),
7441 list: Box::new(Expr::List(values)),
7442 })
7443}
7444
7445fn expr_is_vid_property(expr: &Expr) -> bool {
7452 matches!(
7453 expr,
7454 Expr::Property(inner, prop)
7455 if prop == "_vid" && matches!(inner.as_ref(), Expr::Variable(_))
7456 )
7457}
7458
7459fn wrap_with_filter(plan: LogicalPlan, filters: &[Expr]) -> LogicalPlan {
7460 if filters.is_empty() {
7461 return plan;
7462 }
7463 let predicate = and_combine(filters.to_vec()).expect("non-empty filters");
7464 match plan {
7472 LogicalPlan::Scan {
7473 label_id,
7474 labels,
7475 variable,
7476 filter: existing,
7477 optional,
7478 } => LogicalPlan::Scan {
7479 label_id,
7480 labels,
7481 variable,
7482 filter: merge_filter(existing, predicate),
7483 optional,
7484 },
7485 LogicalPlan::ScanMainByLabels {
7486 labels,
7487 variable,
7488 filter: existing,
7489 optional,
7490 } => LogicalPlan::ScanMainByLabels {
7491 labels,
7492 variable,
7493 filter: merge_filter(existing, predicate),
7494 optional,
7495 },
7496 LogicalPlan::ScanAll {
7497 variable,
7498 filter: existing,
7499 optional,
7500 } => LogicalPlan::ScanAll {
7501 variable,
7502 filter: merge_filter(existing, predicate),
7503 optional,
7504 },
7505 other => LogicalPlan::Filter {
7509 input: Box::new(other),
7510 predicate,
7511 optional_variables: HashSet::new(),
7512 },
7513 }
7514}
7515
7516fn merge_filter(existing: Option<Expr>, predicate: Expr) -> Option<Expr> {
7524 match existing {
7525 Some(prev) if prev == predicate => Some(prev),
7526 Some(prev) => and_combine(vec![prev, predicate]),
7527 None => Some(predicate),
7528 }
7529}
7530
7531fn merge_unwind_in_filters(
7569 plan: &LogicalPlan,
7570 params: &HashMap<String, uni_common::Value>,
7571) -> LogicalPlan {
7572 match plan {
7573 LogicalPlan::Filter {
7575 input,
7576 predicate,
7577 optional_variables,
7578 } if matches!(input.as_ref(), LogicalPlan::CrossJoin { .. }) => {
7579 let LogicalPlan::CrossJoin { left, right } = input.as_ref() else {
7581 unreachable!("matches! above guarantees CrossJoin")
7582 };
7583
7584 let left_rewritten = merge_unwind_in_filters(left, params);
7586 let right_rewritten = merge_unwind_in_filters(right, params);
7587
7588 let left_vars = collect_plan_variables(&left_rewritten);
7589 let right_vars = collect_plan_variables(&right_rewritten);
7590 let cls = classify_join_predicate(predicate, &left_vars, &right_vars);
7591
7592 let rebuild_unmodified = |l: LogicalPlan, r: LogicalPlan| LogicalPlan::Filter {
7593 input: Box::new(LogicalPlan::CrossJoin {
7594 left: Box::new(l),
7595 right: Box::new(r),
7596 }),
7597 predicate: predicate.clone(),
7598 optional_variables: optional_variables.clone(),
7599 };
7600
7601 if cls.equi_pairs.is_empty() {
7602 return rebuild_unmodified(left_rewritten, right_rewritten);
7603 }
7604
7605 let mut left_extra_in: Vec<Expr> = Vec::new();
7609 let mut right_extra_in: Vec<Expr> = Vec::new();
7610 for (l_expr, r_expr) in &cls.equi_pairs {
7611 if let Some(in_filter) = build_in_pushdown(l_expr, r_expr, &left_rewritten, params)
7612 {
7613 right_extra_in.push(in_filter);
7614 continue;
7615 }
7616 if let Some(in_filter) = build_in_pushdown(r_expr, l_expr, &left_rewritten, params)
7617 {
7618 right_extra_in.push(in_filter);
7619 continue;
7620 }
7621 if let Some(in_filter) = build_in_pushdown(l_expr, r_expr, &right_rewritten, params)
7622 {
7623 left_extra_in.push(in_filter);
7624 continue;
7625 }
7626 if let Some(in_filter) = build_in_pushdown(r_expr, l_expr, &right_rewritten, params)
7627 {
7628 left_extra_in.push(in_filter);
7629 }
7630 }
7631
7632 tracing::debug!(
7633 target: "uni_query::cross_join_in_pushdown",
7634 left_in_filters = left_extra_in.len(),
7635 right_in_filters = right_extra_in.len(),
7636 "merge_unwind_in_filters: IN-pushdown result"
7637 );
7638
7639 if left_extra_in.is_empty() && right_extra_in.is_empty() {
7640 return rebuild_unmodified(left_rewritten, right_rewritten);
7641 }
7642
7643 let left_merged = wrap_with_filter(left_rewritten, &left_extra_in);
7644 let right_merged = wrap_with_filter(right_rewritten, &right_extra_in);
7645 rebuild_unmodified(left_merged, right_merged)
7646 }
7647 LogicalPlan::Filter {
7649 input,
7650 predicate,
7651 optional_variables,
7652 } => LogicalPlan::Filter {
7653 input: Box::new(merge_unwind_in_filters(input, params)),
7654 predicate: predicate.clone(),
7655 optional_variables: optional_variables.clone(),
7656 },
7657 LogicalPlan::Project { input, projections } => LogicalPlan::Project {
7659 input: Box::new(merge_unwind_in_filters(input, params)),
7660 projections: projections.clone(),
7661 },
7662 LogicalPlan::Sort { input, order_by } => LogicalPlan::Sort {
7663 input: Box::new(merge_unwind_in_filters(input, params)),
7664 order_by: order_by.clone(),
7665 },
7666 LogicalPlan::Limit { input, skip, fetch } => LogicalPlan::Limit {
7667 input: Box::new(merge_unwind_in_filters(input, params)),
7668 skip: *skip,
7669 fetch: *fetch,
7670 },
7671 LogicalPlan::Distinct { input } => LogicalPlan::Distinct {
7672 input: Box::new(merge_unwind_in_filters(input, params)),
7673 },
7674 LogicalPlan::Unwind {
7675 input,
7676 expr,
7677 variable,
7678 } => LogicalPlan::Unwind {
7679 input: Box::new(merge_unwind_in_filters(input, params)),
7680 expr: expr.clone(),
7681 variable: variable.clone(),
7682 },
7683 LogicalPlan::Set { input, items } => LogicalPlan::Set {
7688 input: Box::new(merge_unwind_in_filters(input, params)),
7689 items: items.clone(),
7690 },
7691 LogicalPlan::Remove { input, items } => LogicalPlan::Remove {
7692 input: Box::new(merge_unwind_in_filters(input, params)),
7693 items: items.clone(),
7694 },
7695 LogicalPlan::Delete {
7696 input,
7697 items,
7698 detach,
7699 } => LogicalPlan::Delete {
7700 input: Box::new(merge_unwind_in_filters(input, params)),
7701 items: items.clone(),
7702 detach: *detach,
7703 },
7704 LogicalPlan::Create { input, pattern } => LogicalPlan::Create {
7705 input: Box::new(merge_unwind_in_filters(input, params)),
7706 pattern: pattern.clone(),
7707 },
7708 LogicalPlan::CreateBatch { input, patterns } => LogicalPlan::CreateBatch {
7709 input: Box::new(merge_unwind_in_filters(input, params)),
7710 patterns: patterns.clone(),
7711 },
7712 LogicalPlan::Merge {
7713 input,
7714 pattern,
7715 on_match,
7716 on_create,
7717 } => LogicalPlan::Merge {
7718 input: Box::new(merge_unwind_in_filters(input, params)),
7719 pattern: pattern.clone(),
7720 on_match: on_match.clone(),
7721 on_create: on_create.clone(),
7722 },
7723 LogicalPlan::Foreach {
7724 input,
7725 variable,
7726 list,
7727 body,
7728 } => LogicalPlan::Foreach {
7729 input: Box::new(merge_unwind_in_filters(input, params)),
7730 variable: variable.clone(),
7731 list: list.clone(),
7732 body: body
7733 .iter()
7734 .map(|b| merge_unwind_in_filters(b, params))
7735 .collect(),
7736 },
7737 LogicalPlan::Aggregate {
7739 input,
7740 group_by,
7741 aggregates,
7742 } => LogicalPlan::Aggregate {
7743 input: Box::new(merge_unwind_in_filters(input, params)),
7744 group_by: group_by.clone(),
7745 aggregates: aggregates.clone(),
7746 },
7747 LogicalPlan::Window {
7748 input,
7749 window_exprs,
7750 } => LogicalPlan::Window {
7751 input: Box::new(merge_unwind_in_filters(input, params)),
7752 window_exprs: window_exprs.clone(),
7753 },
7754 LogicalPlan::SubqueryCall { input, subquery } => LogicalPlan::SubqueryCall {
7755 input: Box::new(merge_unwind_in_filters(input, params)),
7756 subquery: Box::new(merge_unwind_in_filters(subquery, params)),
7757 },
7758 LogicalPlan::CrossJoin { left, right } => LogicalPlan::CrossJoin {
7760 left: Box::new(merge_unwind_in_filters(left, params)),
7761 right: Box::new(merge_unwind_in_filters(right, params)),
7762 },
7763 LogicalPlan::Union { left, right, all } => LogicalPlan::Union {
7764 left: Box::new(merge_unwind_in_filters(left, params)),
7765 right: Box::new(merge_unwind_in_filters(right, params)),
7766 all: *all,
7767 },
7768 LogicalPlan::Apply {
7770 input,
7771 subquery,
7772 input_filter,
7773 } => LogicalPlan::Apply {
7774 input: Box::new(merge_unwind_in_filters(input, params)),
7775 subquery: Box::new(merge_unwind_in_filters(subquery, params)),
7776 input_filter: input_filter.clone(),
7777 },
7778 _ => plan.clone(),
7785 }
7786}
7787
7788fn is_hashable_native_dtype(dt: &DataType) -> bool {
7792 matches!(
7793 dt,
7794 DataType::Boolean
7795 | DataType::Int8
7796 | DataType::Int16
7797 | DataType::Int32
7798 | DataType::Int64
7799 | DataType::UInt8
7800 | DataType::UInt16
7801 | DataType::UInt32
7802 | DataType::UInt64
7803 | DataType::Float32
7804 | DataType::Float64
7805 | DataType::Utf8
7806 | DataType::LargeUtf8
7807 | DataType::Binary
7808 | DataType::LargeBinary
7809 | DataType::Date32
7810 | DataType::Date64
7811 )
7812}
7813
7814fn tointeger_accepts_dtype(dt: &DataType) -> bool {
7817 matches!(
7818 dt,
7819 DataType::Int8
7820 | DataType::Int16
7821 | DataType::Int32
7822 | DataType::Int64
7823 | DataType::UInt8
7824 | DataType::UInt16
7825 | DataType::UInt32
7826 | DataType::UInt64
7827 | DataType::Float32
7828 | DataType::Float64
7829 | DataType::LargeBinary
7830 )
7831}
7832
7833fn wrap_with_unary_udf(
7835 expr: Arc<dyn datafusion::physical_plan::PhysicalExpr>,
7836 udf: Arc<datafusion::logical_expr::ScalarUDF>,
7837 return_dt: DataType,
7838) -> Arc<dyn datafusion::physical_plan::PhysicalExpr> {
7839 let config_options = Arc::new(datafusion::config::ConfigOptions::default());
7840 let udf_name = udf.name().to_string();
7841 let return_field = Arc::new(arrow_schema::Field::new(&udf_name, return_dt, true));
7842 Arc::new(datafusion::physical_expr::ScalarFunctionExpr::new(
7843 &udf_name,
7844 udf,
7845 vec![expr],
7846 return_field,
7847 config_options,
7848 ))
7849}
7850
7851fn unify_join_key_types(
7866 left: Arc<dyn datafusion::physical_plan::PhysicalExpr>,
7867 right: Arc<dyn datafusion::physical_plan::PhysicalExpr>,
7868 left_schema: &Schema,
7869 right_schema: &Schema,
7870 state: &SessionState,
7871) -> Option<(
7872 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
7873 Arc<dyn datafusion::physical_plan::PhysicalExpr>,
7874)> {
7875 let l_dt = left.data_type(left_schema).ok()?;
7876 let r_dt = right.data_type(right_schema).ok()?;
7877
7878 if l_dt == r_dt && is_hashable_native_dtype(&l_dt) {
7879 return Some((left, right));
7880 }
7881
7882 if tointeger_accepts_dtype(&l_dt) && tointeger_accepts_dtype(&r_dt) {
7883 let udf = state.scalar_functions().get("tointeger")?.clone();
7884 return Some((
7885 wrap_with_unary_udf(left, udf.clone(), DataType::Int64),
7886 wrap_with_unary_udf(right, udf, DataType::Int64),
7887 ));
7888 }
7889
7890 None
7894}
7895
#[cfg(test)]
mod tests {
    use super::*;
    use uni_cypher::ast::CypherLiteral;

    #[test]
    fn test_convert_direction() {
        assert!(matches!(
            convert_direction(AstDirection::Outgoing),
            Direction::Outgoing
        ));
        assert!(matches!(
            convert_direction(AstDirection::Incoming),
            Direction::Incoming
        ));
        assert!(matches!(
            convert_direction(AstDirection::Both),
            Direction::Both
        ));
    }

    #[test]
    fn test_sanitize_vlp_target_properties_removes_wildcard() {
        let props = vec!["*".to_string(), "name".to_string()];
        let label_props = HashSet::from(["name".to_string()]);
        let sanitized = sanitize_vlp_target_properties(props, true, Some(&label_props));

        assert_eq!(sanitized, vec!["name".to_string()]);
    }

    #[test]
    fn test_sanitize_vlp_target_properties_adds_all_props_for_wildcard_empty() {
        let props = vec!["*".to_string()];
        let sanitized = sanitize_vlp_target_properties(props, true, None);

        assert_eq!(sanitized, vec!["_all_props".to_string()]);
    }

    #[test]
    fn test_sanitize_vlp_target_properties_adds_all_props_for_non_schema() {
        let props = vec!["custom_prop".to_string()];
        let label_props = HashSet::from(["name".to_string()]);
        let sanitized = sanitize_vlp_target_properties(props, false, Some(&label_props));

        assert_eq!(
            sanitized,
            vec!["custom_prop".to_string(), "_all_props".to_string()]
        );
    }

    /// Integer literal expression.
    fn int_lit(n: i64) -> Expr {
        Expr::Literal(CypherLiteral::Integer(n))
    }

    /// String literal expression.
    fn str_lit(s: &str) -> Expr {
        Expr::Literal(CypherLiteral::String(s.to_string()))
    }

    /// One `key: value` entry of a map literal.
    fn map_entry(k: &str, v: Expr) -> (String, Expr) {
        (k.to_string(), v)
    }

    #[test]
    fn materialize_unwind_field_accepts_inlined_map_literals() {
        let unwind_expr = Expr::List(vec![
            Expr::Map(vec![
                map_entry("nid", int_lit(64)),
                map_entry("x", int_lit(1)),
            ]),
            Expr::Map(vec![
                map_entry("nid", int_lit(65)),
                map_entry("x", int_lit(2)),
            ]),
        ]);
        let params = HashMap::new();
        let result = materialize_unwind_source_field(&unwind_expr, &params, "nid");
        let values = result.expect("literal-map UNWIND should produce an IN-list");
        assert_eq!(values.len(), 2);
        assert!(matches!(
            &values[0],
            Expr::Literal(CypherLiteral::Integer(64))
        ));
        assert!(matches!(
            &values[1],
            Expr::Literal(CypherLiteral::Integer(65))
        ));
    }

    #[test]
    fn materialize_unwind_field_handles_mixed_primitive_field_types() {
        let unwind_expr = Expr::List(vec![
            Expr::Map(vec![map_entry("k", str_lit("a"))]),
            Expr::Map(vec![map_entry("k", str_lit("b"))]),
        ]);
        let params = HashMap::new();
        let values = materialize_unwind_source_field(&unwind_expr, &params, "k")
            .expect("literal-map UNWIND should produce an IN-list");
        assert_eq!(values.len(), 2);
    }

    #[test]
    fn materialize_unwind_field_rejects_non_literal_value_at_target_field() {
        let unwind_expr = Expr::List(vec![Expr::Map(vec![map_entry(
            "nid",
            Expr::Parameter("p".to_string()),
        )])]);
        let params = HashMap::new();
        let result = materialize_unwind_source_field(&unwind_expr, &params, "nid");
        assert!(result.is_none(), "non-literal value at field should bail");
    }

    #[test]
    fn materialize_unwind_field_rejects_when_target_field_missing() {
        let unwind_expr = Expr::List(vec![Expr::Map(vec![map_entry("other", int_lit(64))])]);
        let params = HashMap::new();
        let result = materialize_unwind_source_field(&unwind_expr, &params, "nid");
        assert!(
            result.is_none(),
            "map missing the requested field should bail"
        );
    }

    #[test]
    fn materialize_unwind_field_rejects_non_map_list_item() {
        let unwind_expr = Expr::List(vec![int_lit(64), int_lit(65)]);
        let params = HashMap::new();
        let result = materialize_unwind_source_field(&unwind_expr, &params, "nid");
        assert!(
            result.is_none(),
            "non-map list items can't be field-projected"
        );
    }

    #[test]
    fn materialize_unwind_field_rejects_oversized_list() {
        let oversized = MAX_UNWIND_IN_PUSHDOWN_VALUES + 1;
        let items: Vec<Expr> = (0..oversized)
            .map(|i| Expr::Map(vec![map_entry("nid", int_lit(i as i64))]))
            .collect();
        let unwind_expr = Expr::List(items);
        let params = HashMap::new();
        let result = materialize_unwind_source_field(&unwind_expr, &params, "nid");
        assert!(result.is_none(), "oversized list should bail");
    }

    #[test]
    fn materialize_unwind_field_param_form_still_works() {
        let mut params = HashMap::new();
        params.insert(
            "updates".to_string(),
            uni_common::Value::List(vec![
                uni_common::Value::Map({
                    let mut m = HashMap::new();
                    m.insert("nid".to_string(), uni_common::Value::Int(64));
                    m
                }),
                uni_common::Value::Map({
                    let mut m = HashMap::new();
                    m.insert("nid".to_string(), uni_common::Value::Int(65));
                    m
                }),
            ]),
        );
        let unwind_expr = Expr::Parameter("updates".to_string());
        let values = materialize_unwind_source_field(&unwind_expr, &params, "nid")
            .expect("parameter form should produce IN-list");
        assert_eq!(values.len(), 2);
    }

    /// Builds `Filter(CrossJoin(Unwind(..) AS unwind_var, Scan(scan_label) AS scan_var))`
    /// with the given join predicate.
    fn make_filter_crossjoin_scan(
        unwind_source: Expr,
        unwind_var: &str,
        scan_label_id: u16,
        scan_label: &str,
        scan_var: &str,
        predicate: Expr,
    ) -> LogicalPlan {
        let unwind = LogicalPlan::Unwind {
            input: Box::new(LogicalPlan::Project {
                input: Box::new(LogicalPlan::Scan {
                    label_id: scan_label_id,
                    labels: vec![scan_label.to_string()],
                    variable: "__dummy__".to_string(),
                    filter: None,
                    optional: false,
                }),
                projections: vec![],
            }),
            expr: unwind_source,
            variable: unwind_var.to_string(),
        };
        let scan = LogicalPlan::Scan {
            label_id: scan_label_id,
            labels: vec![scan_label.to_string()],
            variable: scan_var.to_string(),
            filter: None,
            optional: false,
        };
        LogicalPlan::Filter {
            input: Box::new(LogicalPlan::CrossJoin {
                left: Box::new(unwind),
                right: Box::new(scan),
            }),
            predicate,
            optional_variables: HashSet::new(),
        }
    }

    /// Builds the predicate `scan_var.prop = unwind_var`.
    fn eq_property_predicate(scan_var: &str, prop: &str, unwind_var: &str) -> Expr {
        Expr::BinaryOp {
            left: Box::new(Expr::Property(
                Box::new(Expr::Variable(scan_var.to_string())),
                prop.to_string(),
            )),
            op: uni_cypher::ast::BinaryOp::Eq,
            right: Box::new(Expr::Variable(unwind_var.to_string())),
        }
    }

    /// Asserts that `plan` is `Filter(CrossJoin(_, Scan))` whose Scan carries the expected
    /// label and an `Expr::In` filter.
    fn assert_scan_filter_is_in_list(plan: &LogicalPlan, expected_label: &str) {
        let LogicalPlan::Filter { input, .. } = plan else {
            panic!("expected top-level Filter, got {plan:?}");
        };
        let LogicalPlan::CrossJoin { right, .. } = input.as_ref() else {
            panic!("expected CrossJoin under Filter, got {input:?}");
        };
        let LogicalPlan::Scan { labels, filter, .. } = right.as_ref() else {
            panic!("expected Scan as right subtree, got {right:?}");
        };
        assert_eq!(labels, &vec![expected_label.to_string()]);
        let filter_expr = filter
            .as_ref()
            .expect("Scan.filter must be Some after pass");
        assert!(
            matches!(filter_expr, Expr::In { .. }),
            "Scan.filter should be Expr::In, got {filter_expr:?}"
        );
    }

    #[test]
    fn merge_pass_pushes_in_list_into_scan_filter() {
        let unwind_source = Expr::List(vec![str_lit("a"), str_lit("b")]);
        let plan = make_filter_crossjoin_scan(
            unwind_source,
            "u",
            1,
            "Item",
            "n",
            eq_property_predicate("n", "name", "u"),
        );
        let params = HashMap::new();
        let rewritten = merge_unwind_in_filters(&plan, &params);
        assert_scan_filter_is_in_list(&rewritten, "Item");
    }

    #[test]
    fn merge_pass_idempotent() {
        let unwind_source = Expr::List(vec![str_lit("a"), str_lit("b")]);
        let plan = make_filter_crossjoin_scan(
            unwind_source,
            "u",
            1,
            "Item",
            "n",
            eq_property_predicate("n", "name", "u"),
        );
        let params = HashMap::new();
        let pass1 = merge_unwind_in_filters(&plan, &params);
        let pass2 = merge_unwind_in_filters(&pass1, &params);

        // A second run must not change the plan at all.
        assert_eq!(
            format!("{pass1:?}"),
            format!("{pass2:?}"),
            "second pass must leave the plan unchanged"
        );

        let LogicalPlan::Filter { input, .. } = &pass2 else {
            panic!("expected Filter");
        };
        let LogicalPlan::CrossJoin { right, .. } = input.as_ref() else {
            panic!("expected CrossJoin");
        };
        let LogicalPlan::Scan { filter, .. } = right.as_ref() else {
            panic!("expected Scan");
        };
        let filter_expr = filter.as_ref().expect("Scan.filter must be Some");
        assert!(
            matches!(filter_expr, Expr::In { .. }),
            "After 2 passes the filter should still be a single Expr::In, \
             not ANDed with a duplicate; got {filter_expr:?}"
        );
    }

    #[test]
    fn merge_pass_leaves_non_pushable_predicates_alone() {
        let unwind_source = Expr::List(vec![str_lit("a")]);
        let starts_with = Expr::BinaryOp {
            left: Box::new(Expr::Property(
                Box::new(Expr::Variable("n".to_string())),
                "name".to_string(),
            )),
            op: uni_cypher::ast::BinaryOp::StartsWith,
            right: Box::new(str_lit("x")),
        };
        let plan = make_filter_crossjoin_scan(unwind_source, "u", 1, "Item", "n", starts_with);
        let params = HashMap::new();
        let rewritten = merge_unwind_in_filters(&plan, &params);

        let LogicalPlan::Filter { input, .. } = &rewritten else {
            panic!("expected Filter");
        };
        let LogicalPlan::CrossJoin { right, .. } = input.as_ref() else {
            panic!("expected CrossJoin");
        };
        let LogicalPlan::Scan { filter, .. } = right.as_ref() else {
            panic!("expected Scan");
        };
        assert!(
            filter.is_none(),
            "no equi-pair → no pushdown; Scan.filter should remain None, got {filter:?}"
        );
    }

    #[test]
    fn merge_pass_handles_nested_crossjoin() {
        let unwind_source = Expr::List(vec![str_lit("a")]);
        let unwind = LogicalPlan::Unwind {
            input: Box::new(LogicalPlan::Project {
                input: Box::new(LogicalPlan::Scan {
                    label_id: 0,
                    labels: vec!["__".to_string()],
                    variable: "__".to_string(),
                    filter: None,
                    optional: false,
                }),
                projections: vec![],
            }),
            expr: unwind_source,
            variable: "u".to_string(),
        };
        let inner_cross = LogicalPlan::CrossJoin {
            left: Box::new(LogicalPlan::Scan {
                label_id: 1,
                labels: vec!["Item".to_string()],
                variable: "n".to_string(),
                filter: None,
                optional: false,
            }),
            right: Box::new(LogicalPlan::Scan {
                label_id: 2,
                labels: vec!["Other".to_string()],
                variable: "m".to_string(),
                filter: None,
                optional: false,
            }),
        };
        let plan = LogicalPlan::Filter {
            input: Box::new(LogicalPlan::CrossJoin {
                left: Box::new(unwind),
                right: Box::new(inner_cross),
            }),
            predicate: eq_property_predicate("n", "name", "u"),
            optional_variables: HashSet::new(),
        };
        let params = HashMap::new();
        let rewritten = merge_unwind_in_filters(&plan, &params);

        let LogicalPlan::Filter { input, .. } = &rewritten else {
            panic!("expected outer Filter");
        };
        let LogicalPlan::CrossJoin { right, .. } = input.as_ref() else {
            panic!("expected outer CrossJoin");
        };
        match right.as_ref() {
            LogicalPlan::Filter { predicate, .. } => {
                assert!(
                    matches!(predicate, Expr::In { .. }),
                    "expected Expr::In wrapping inner CrossJoin, got {predicate:?}"
                );
            }
            other => panic!(
                "expected Filter wrapping inner CrossJoin, got {other:?}. \
                 This is acceptable behaviour — the IN-list is preserved \
                 above the inner join — but the test should be updated if \
                 wrap_with_filter changes to descend through CrossJoins."
            ),
        }
    }
}