1use std::cmp::Ordering;
21use std::collections::{BTreeSet, HashSet};
22use std::sync::Arc;
23
24use crate::expr::{Alias, Sort, WildcardOptions, WindowFunctionParams};
25use crate::expr_rewriter::strip_outer_reference;
26use crate::{
27 BinaryExpr, Expr, ExprSchemable, Filter, GroupingSet, LogicalPlan, Operator, and,
28};
29use datafusion_expr_common::signature::{Signature, TypeSignature};
30
31use arrow::datatypes::{DataType, Field, Schema};
32use datafusion_common::tree_node::{
33 Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
34};
35use datafusion_common::utils::get_at_indices;
36use datafusion_common::{
37 Column, DFSchema, DFSchemaRef, HashMap, Result, TableReference, internal_err,
38 plan_err,
39};
40
41#[cfg(not(feature = "sql"))]
42use crate::expr::{ExceptSelectItem, ExcludeSelectItem};
43use indexmap::IndexSet;
44#[cfg(feature = "sql")]
45use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem};
46
47pub use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
48
49pub use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
52
53pub fn grouping_set_expr_count(group_expr: &[Expr]) -> Result<usize> {
56 if let Some(Expr::GroupingSet(grouping_set)) = group_expr.first() {
57 if group_expr.len() > 1 {
58 return plan_err!(
59 "Invalid group by expressions, GroupingSet must be the only expression"
60 );
61 }
62 Ok(grouping_set.distinct_expr().len() + 1)
64 } else {
65 grouping_set_to_exprlist(group_expr).map(|exprs| exprs.len())
66 }
67}
68
/// Yields, for every bit mask in `0..2^len`, the positions of its set bits in
/// ascending order.
///
/// Each yielded vector is the index list of one subset of a `len`-element
/// collection; subsets come out in increasing mask order, starting with the
/// empty subset.
fn powerset_indices(len: usize) -> impl Iterator<Item = Vec<usize>> {
    let subset_count: u64 = 1 << len;
    (0..subset_count).map(|mask| {
        let mut members = Vec::new();
        let mut remaining = mask;
        while remaining != 0 {
            // The lowest set bit is the next (smallest) member index.
            members.push(remaining.trailing_zeros() as usize);
            // Clear that lowest set bit.
            remaining &= remaining - 1;
        }
        members
    })
}
85
86pub fn powerset<T>(slice: &[T]) -> Result<Vec<Vec<&T>>> {
104 if slice.len() >= 64 {
105 return plan_err!("The size of the set must be less than 64");
106 }
107
108 Ok(powerset_indices(slice.len())
109 .map(|indices| indices.iter().map(|&idx| &slice[idx]).collect())
110 .collect())
111}
112
113fn check_grouping_set_size_limit(size: usize) -> Result<()> {
115 let max_grouping_set_size = 65535;
116 if size > max_grouping_set_size {
117 return plan_err!(
118 "The number of group_expression in grouping_set exceeds the maximum limit {max_grouping_set_size}, found {size}"
119 );
120 }
121
122 Ok(())
123}
124
125fn check_grouping_sets_size_limit(size: usize) -> Result<()> {
127 let max_grouping_sets_size = 4096;
128 if size > max_grouping_sets_size {
129 return plan_err!(
130 "The number of grouping_set in grouping_sets exceeds the maximum limit {max_grouping_sets_size}, found {size}"
131 );
132 }
133
134 Ok(())
135}
136
137fn merge_grouping_set<T: Clone>(left: &[T], right: &[T]) -> Result<Vec<T>> {
149 check_grouping_set_size_limit(left.len() + right.len())?;
150 Ok(left.iter().chain(right.iter()).cloned().collect())
151}
152
153fn cross_join_grouping_sets<T: Clone>(
166 left: &[Vec<T>],
167 right: &[Vec<T>],
168) -> Result<Vec<Vec<T>>> {
169 let grouping_sets_size = left.len() * right.len();
170
171 check_grouping_sets_size_limit(grouping_sets_size)?;
172
173 let mut result = Vec::with_capacity(grouping_sets_size);
174 for le in left {
175 for re in right {
176 result.push(merge_grouping_set(le, re)?);
177 }
178 }
179 Ok(result)
180}
181
182pub fn enumerate_grouping_sets(group_expr: Vec<Expr>) -> Result<Vec<Expr>> {
203 let has_grouping_set = group_expr
204 .iter()
205 .any(|expr| matches!(expr, Expr::GroupingSet(_)));
206 if !has_grouping_set || group_expr.len() == 1 {
207 return Ok(group_expr);
208 }
209 let partial_sets = group_expr
211 .iter()
212 .map(|expr| {
213 let exprs = match expr {
214 Expr::GroupingSet(GroupingSet::GroupingSets(grouping_sets)) => {
215 check_grouping_sets_size_limit(grouping_sets.len())?;
216 grouping_sets.iter().map(|e| e.iter().collect()).collect()
217 }
218 Expr::GroupingSet(GroupingSet::Cube(group_exprs)) => {
219 let grouping_sets = powerset(group_exprs)?;
220 check_grouping_sets_size_limit(grouping_sets.len())?;
221 grouping_sets
222 }
223 Expr::GroupingSet(GroupingSet::Rollup(group_exprs)) => {
224 let size = group_exprs.len();
225 let slice = group_exprs.as_slice();
226 check_grouping_sets_size_limit(size * (size + 1) / 2 + 1)?;
227 (0..(size + 1))
228 .map(|i| slice[0..i].iter().collect())
229 .collect()
230 }
231 expr => vec![vec![expr]],
232 };
233 Ok(exprs)
234 })
235 .collect::<Result<Vec<_>>>()?;
236
237 let grouping_sets = partial_sets
239 .into_iter()
240 .map(Ok)
241 .reduce(|l, r| cross_join_grouping_sets(&l?, &r?))
242 .transpose()?
243 .map(|e| {
244 e.into_iter()
245 .map(|e| e.into_iter().cloned().collect())
246 .collect()
247 })
248 .unwrap_or_default();
249
250 Ok(vec![Expr::GroupingSet(GroupingSet::GroupingSets(
251 grouping_sets,
252 ))])
253}
254
255pub fn grouping_set_to_exprlist(group_expr: &[Expr]) -> Result<Vec<&Expr>> {
258 if let Some(Expr::GroupingSet(grouping_set)) = group_expr.first() {
259 if group_expr.len() > 1 {
260 return plan_err!(
261 "Invalid group by expressions, GroupingSet must be the only expression"
262 );
263 }
264 Ok(grouping_set.distinct_expr())
265 } else {
266 Ok(group_expr
267 .iter()
268 .collect::<IndexSet<_>>()
269 .into_iter()
270 .collect())
271 }
272}
273
274pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
277 expr.apply(|expr| {
278 match expr {
279 Expr::Column(qc) => {
280 accum.insert(qc.clone());
281 }
282 #[expect(deprecated)]
287 Expr::Unnest(_)
288 | Expr::ScalarVariable(_, _)
289 | Expr::Alias(_)
290 | Expr::Literal(_, _)
291 | Expr::BinaryExpr { .. }
292 | Expr::Like { .. }
293 | Expr::SimilarTo { .. }
294 | Expr::Not(_)
295 | Expr::IsNotNull(_)
296 | Expr::IsNull(_)
297 | Expr::IsTrue(_)
298 | Expr::IsFalse(_)
299 | Expr::IsUnknown(_)
300 | Expr::IsNotTrue(_)
301 | Expr::IsNotFalse(_)
302 | Expr::IsNotUnknown(_)
303 | Expr::Negative(_)
304 | Expr::Between { .. }
305 | Expr::Case { .. }
306 | Expr::Cast { .. }
307 | Expr::TryCast { .. }
308 | Expr::ScalarFunction(..)
309 | Expr::WindowFunction { .. }
310 | Expr::AggregateFunction { .. }
311 | Expr::GroupingSet(_)
312 | Expr::InList { .. }
313 | Expr::Exists { .. }
314 | Expr::InSubquery(_)
315 | Expr::ScalarSubquery(_)
316 | Expr::Wildcard { .. }
317 | Expr::Placeholder(_)
318 | Expr::OuterReferenceColumn { .. } => {}
319 }
320 Ok(TreeNodeRecursion::Continue)
321 })
322 .map(|_| ())
323}
324
325fn get_excluded_columns(
328 opt_exclude: Option<&ExcludeSelectItem>,
329 opt_except: Option<&ExceptSelectItem>,
330 schema: &DFSchema,
331 qualifier: Option<&TableReference>,
332) -> Result<Vec<Column>> {
333 let mut idents = vec![];
334 if let Some(excepts) = opt_except {
335 idents.push(&excepts.first_element);
336 idents.extend(&excepts.additional_elements);
337 }
338 if let Some(exclude) = opt_exclude {
339 match exclude {
340 ExcludeSelectItem::Single(ident) => idents.push(ident),
341 ExcludeSelectItem::Multiple(idents_inner) => idents.extend(idents_inner),
342 }
343 }
344 let n_elem = idents.len();
346 let unique_idents = idents.into_iter().collect::<HashSet<_>>();
347 if n_elem != unique_idents.len() {
350 return plan_err!("EXCLUDE or EXCEPT contains duplicate column names");
351 }
352
353 let mut result = vec![];
354 for ident in unique_idents.into_iter() {
355 let col_name = ident.value.as_str();
356 let (qualifier, field) = schema.qualified_field_with_name(qualifier, col_name)?;
357 result.push(Column::from((qualifier, field)));
358 }
359 Ok(result)
360}
361
362fn get_exprs_except_skipped(
364 schema: &DFSchema,
365 columns_to_skip: &HashSet<Column>,
366) -> Vec<Expr> {
367 if columns_to_skip.is_empty() {
368 schema.iter().map(Expr::from).collect::<Vec<Expr>>()
369 } else {
370 schema
371 .columns()
372 .iter()
373 .filter_map(|c| {
374 if !columns_to_skip.contains(c) {
375 Some(Expr::Column(c.clone()))
376 } else {
377 None
378 }
379 })
380 .collect::<Vec<Expr>>()
381 }
382}
383
384fn exclude_using_columns(plan: &LogicalPlan) -> Result<HashSet<Column>> {
388 let using_columns = plan.using_columns()?;
389 let excluded = using_columns
390 .into_iter()
391 .flat_map(|cols| {
393 let mut cols = cols.into_iter().collect::<Vec<_>>();
394 cols.sort();
397 let mut out_column_names: HashSet<String> = HashSet::new();
398 cols.into_iter().filter_map(move |c| {
399 if out_column_names.contains(&c.name) {
400 Some(c)
401 } else {
402 out_column_names.insert(c.name);
403 None
404 }
405 })
406 })
407 .collect::<HashSet<_>>();
408 Ok(excluded)
409}
410
411pub fn expand_wildcard(
413 schema: &DFSchema,
414 plan: &LogicalPlan,
415 wildcard_options: Option<&WildcardOptions>,
416) -> Result<Vec<Expr>> {
417 let mut columns_to_skip = exclude_using_columns(plan)?;
418 let excluded_columns = if let Some(WildcardOptions {
419 exclude: opt_exclude,
420 except: opt_except,
421 ..
422 }) = wildcard_options
423 {
424 get_excluded_columns(opt_exclude.as_ref(), opt_except.as_ref(), schema, None)?
425 } else {
426 vec![]
427 };
428 columns_to_skip.extend(excluded_columns);
430 Ok(get_exprs_except_skipped(schema, &columns_to_skip))
431}
432
433pub fn expand_qualified_wildcard(
435 qualifier: &TableReference,
436 schema: &DFSchema,
437 wildcard_options: Option<&WildcardOptions>,
438) -> Result<Vec<Expr>> {
439 let qualified_indices = schema.fields_indices_with_qualified(qualifier);
440 let projected_func_dependencies = schema
441 .functional_dependencies()
442 .project_functional_dependencies(&qualified_indices, qualified_indices.len());
443 let fields_with_qualified = get_at_indices(schema.fields(), &qualified_indices)?;
444 if fields_with_qualified.is_empty() {
445 return plan_err!("Invalid qualifier {qualifier}");
446 }
447
448 let qualified_schema = Arc::new(Schema::new_with_metadata(
449 fields_with_qualified,
450 schema.metadata().clone(),
451 ));
452 let qualified_dfschema =
453 DFSchema::try_from_qualified_schema(qualifier.clone(), &qualified_schema)?
454 .with_functional_dependencies(projected_func_dependencies)?;
455 let excluded_columns = if let Some(WildcardOptions {
456 exclude: opt_exclude,
457 except: opt_except,
458 ..
459 }) = wildcard_options
460 {
461 get_excluded_columns(
462 opt_exclude.as_ref(),
463 opt_except.as_ref(),
464 schema,
465 Some(qualifier),
466 )?
467 } else {
468 vec![]
469 };
470 let mut columns_to_skip = HashSet::new();
472 columns_to_skip.extend(excluded_columns);
473 Ok(get_exprs_except_skipped(
474 &qualified_dfschema,
475 &columns_to_skip,
476 ))
477}
478
/// Sort key of a window expression: each entry pairs a [`Sort`] with a flag
/// that is `true` when the entry was contributed by `PARTITION BY` and
/// `false` when it comes only from `ORDER BY` (see [`generate_sort_key`]).
type WindowSortKey = Vec<(Sort, bool)>;
482
483pub fn generate_sort_key(
485 partition_by: &[Expr],
486 order_by: &[Sort],
487) -> Result<WindowSortKey> {
488 let normalized_order_by_keys = order_by
489 .iter()
490 .map(|e| {
491 let Sort { expr, .. } = e;
492 Sort::new(expr.clone(), true, false)
493 })
494 .collect::<Vec<_>>();
495
496 let mut final_sort_keys = vec![];
497 let mut is_partition_flag = vec![];
498 partition_by.iter().for_each(|e| {
499 let e = e.clone().sort(true, false);
502 if let Some(pos) = normalized_order_by_keys.iter().position(|key| key.eq(&e)) {
503 let order_by_key = &order_by[pos];
504 if !final_sort_keys.contains(order_by_key) {
505 final_sort_keys.push(order_by_key.clone());
506 is_partition_flag.push(true);
507 }
508 } else if !final_sort_keys.contains(&e) {
509 final_sort_keys.push(e);
510 is_partition_flag.push(true);
511 }
512 });
513
514 order_by.iter().for_each(|e| {
515 if !final_sort_keys.contains(e) {
516 final_sort_keys.push(e.clone());
517 is_partition_flag.push(false);
518 }
519 });
520 let res = final_sort_keys
521 .into_iter()
522 .zip(is_partition_flag)
523 .collect::<Vec<_>>();
524 Ok(res)
525}
526
527pub fn compare_sort_expr(
530 sort_expr_a: &Sort,
531 sort_expr_b: &Sort,
532 schema: &DFSchemaRef,
533) -> Ordering {
534 let Sort {
535 expr: expr_a,
536 asc: asc_a,
537 nulls_first: nulls_first_a,
538 } = sort_expr_a;
539
540 let Sort {
541 expr: expr_b,
542 asc: asc_b,
543 nulls_first: nulls_first_b,
544 } = sort_expr_b;
545
546 let ref_indexes_a = find_column_indexes_referenced_by_expr(expr_a, schema);
547 let ref_indexes_b = find_column_indexes_referenced_by_expr(expr_b, schema);
548 for (idx_a, idx_b) in ref_indexes_a.iter().zip(ref_indexes_b.iter()) {
549 match idx_a.cmp(idx_b) {
550 Ordering::Less => {
551 return Ordering::Less;
552 }
553 Ordering::Greater => {
554 return Ordering::Greater;
555 }
556 Ordering::Equal => {}
557 }
558 }
559 match ref_indexes_a.len().cmp(&ref_indexes_b.len()) {
560 Ordering::Less => return Ordering::Greater,
561 Ordering::Greater => {
562 return Ordering::Less;
563 }
564 Ordering::Equal => {}
565 }
566 match (asc_a, asc_b) {
567 (true, false) => {
568 return Ordering::Greater;
569 }
570 (false, true) => {
571 return Ordering::Less;
572 }
573 _ => {}
574 }
575 match (nulls_first_a, nulls_first_b) {
576 (true, false) => {
577 return Ordering::Less;
578 }
579 (false, true) => {
580 return Ordering::Greater;
581 }
582 _ => {}
583 }
584 Ordering::Equal
585}
586
587pub fn group_window_expr_by_sort_keys(
589 window_expr: impl IntoIterator<Item = Expr>,
590) -> Result<Vec<(WindowSortKey, Vec<Expr>)>> {
591 let mut result = vec![];
592 window_expr.into_iter().try_for_each(|expr| match &expr {
593 Expr::WindowFunction(window_fun) => {
594 let WindowFunctionParams{ partition_by, order_by, ..} = &window_fun.as_ref().params;
595 let sort_key = generate_sort_key(partition_by, order_by)?;
596 if let Some((_, values)) = result.iter_mut().find(
597 |group: &&mut (WindowSortKey, Vec<Expr>)| matches!(group, (key, _) if *key == sort_key),
598 ) {
599 values.push(expr);
600 } else {
601 result.push((sort_key, vec![expr]))
602 }
603 Ok(())
604 }
605 other => internal_err!(
606 "Impossibly got non-window expr {other:?}"
607 ),
608 })?;
609 Ok(result)
610}
611
612pub fn find_aggregate_exprs<'a>(exprs: impl IntoIterator<Item = &'a Expr>) -> Vec<Expr> {
616 find_exprs_in_exprs(exprs, &|nested_expr| {
617 matches!(nested_expr, Expr::AggregateFunction { .. })
618 })
619}
620
621pub fn find_window_exprs<'a>(exprs: impl IntoIterator<Item = &'a Expr>) -> Vec<Expr> {
624 find_exprs_in_exprs(exprs, &|nested_expr| {
625 matches!(nested_expr, Expr::WindowFunction { .. })
626 })
627}
628
629pub fn find_out_reference_exprs(expr: &Expr) -> Vec<Expr> {
632 find_exprs_in_expr(expr, &|nested_expr| {
633 matches!(nested_expr, Expr::OuterReferenceColumn { .. })
634 })
635}
636
637fn find_exprs_in_exprs<'a, F>(
641 exprs: impl IntoIterator<Item = &'a Expr>,
642 test_fn: &F,
643) -> Vec<Expr>
644where
645 F: Fn(&Expr) -> bool,
646{
647 exprs
648 .into_iter()
649 .flat_map(|expr| find_exprs_in_expr(expr, test_fn))
650 .fold(vec![], |mut acc, expr| {
651 if !acc.contains(&expr) {
652 acc.push(expr)
653 }
654 acc
655 })
656}
657
658fn find_exprs_in_expr<F>(expr: &Expr, test_fn: &F) -> Vec<Expr>
662where
663 F: Fn(&Expr) -> bool,
664{
665 let mut exprs = vec![];
666 expr.apply(|expr| {
667 if test_fn(expr) {
668 if !(exprs.contains(expr)) {
669 exprs.push(expr.clone())
670 }
671 return Ok(TreeNodeRecursion::Jump);
673 }
674
675 Ok(TreeNodeRecursion::Continue)
676 })
677 .expect("no way to return error during recursion");
679 exprs
680}
681
682pub fn inspect_expr_pre<F, E>(expr: &Expr, mut f: F) -> Result<(), E>
684where
685 F: FnMut(&Expr) -> Result<(), E>,
686{
687 let mut err = Ok(());
688 expr.apply(|expr| {
689 if let Err(e) = f(expr) {
690 err = Err(e);
692 Ok(TreeNodeRecursion::Stop)
693 } else {
694 Ok(TreeNodeRecursion::Continue)
696 }
697 })
698 .expect("no way to return error during recursion");
700
701 err
702}
703
704pub fn exprlist_to_fields<'a>(
722 exprs: impl IntoIterator<Item = &'a Expr>,
723 plan: &LogicalPlan,
724) -> Result<Vec<(Option<TableReference>, Arc<Field>)>> {
725 let input_schema = plan.schema();
727 exprs
728 .into_iter()
729 .map(|e| e.to_field(input_schema))
730 .collect()
731}
732
733pub fn columnize_expr(e: Expr, input: &LogicalPlan) -> Result<Expr> {
749 let output_exprs = match input.columnized_output_exprs() {
750 Ok(exprs) if !exprs.is_empty() => exprs,
751 _ => return Ok(e),
752 };
753 let exprs_map: HashMap<&Expr, Column> = output_exprs.into_iter().collect();
754 e.transform_down(|node: Expr| match exprs_map.get(&node) {
755 Some(column) => Ok(Transformed::new(
756 Expr::Column(column.clone()),
757 true,
758 TreeNodeRecursion::Jump,
759 )),
760 None => Ok(Transformed::no(node)),
761 })
762 .data()
763}
764
765pub fn find_column_exprs(exprs: &[Expr]) -> Vec<Expr> {
768 exprs
769 .iter()
770 .flat_map(find_columns_referenced_by_expr)
771 .map(Expr::Column)
772 .collect()
773}
774
775pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec<Column> {
776 let mut exprs = vec![];
777 e.apply(|expr| {
778 if let Expr::Column(c) = expr {
779 exprs.push(c.clone())
780 }
781 Ok(TreeNodeRecursion::Continue)
782 })
783 .expect("Unexpected error");
785 exprs
786}
787
788pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
790 match expr {
791 Expr::Column(col) => {
792 let (qualifier, field) = plan.schema().qualified_field_from_column(col)?;
793 Ok(Expr::from(Column::from((qualifier, field))))
794 }
795 _ => Ok(Expr::Column(Column::from_name(
796 expr.schema_name().to_string(),
797 ))),
798 }
799}
800
801pub(crate) fn find_column_indexes_referenced_by_expr(
804 e: &Expr,
805 schema: &DFSchemaRef,
806) -> Vec<usize> {
807 let mut indexes = vec![];
808 e.apply(|expr| {
809 match expr {
810 Expr::Column(qc) => {
811 if let Ok(idx) = schema.index_of_column(qc) {
812 indexes.push(idx);
813 }
814 }
815 Expr::Literal(_, _) => {
816 indexes.push(usize::MAX);
817 }
818 _ => {}
819 }
820 Ok(TreeNodeRecursion::Continue)
821 })
822 .unwrap();
823 indexes
824}
825
826pub fn can_hash(data_type: &DataType) -> bool {
830 match data_type {
831 DataType::Null => true,
832 DataType::Boolean => true,
833 DataType::Int8 => true,
834 DataType::Int16 => true,
835 DataType::Int32 => true,
836 DataType::Int64 => true,
837 DataType::UInt8 => true,
838 DataType::UInt16 => true,
839 DataType::UInt32 => true,
840 DataType::UInt64 => true,
841 DataType::Float16 => true,
842 DataType::Float32 => true,
843 DataType::Float64 => true,
844 DataType::Decimal32(_, _) => true,
845 DataType::Decimal64(_, _) => true,
846 DataType::Decimal128(_, _) => true,
847 DataType::Decimal256(_, _) => true,
848 DataType::Timestamp(_, _) => true,
849 DataType::Utf8 => true,
850 DataType::LargeUtf8 => true,
851 DataType::Utf8View => true,
852 DataType::Binary => true,
853 DataType::LargeBinary => true,
854 DataType::BinaryView => true,
855 DataType::Date32 => true,
856 DataType::Date64 => true,
857 DataType::Time32(_) => true,
858 DataType::Time64(_) => true,
859 DataType::Duration(_) => true,
860 DataType::Interval(_) => true,
861 DataType::FixedSizeBinary(_) => true,
862 DataType::Dictionary(key_type, value_type) => {
863 DataType::is_dictionary_key_type(key_type) && can_hash(value_type)
864 }
865 DataType::List(value_type) => can_hash(value_type.data_type()),
866 DataType::LargeList(value_type) => can_hash(value_type.data_type()),
867 DataType::FixedSizeList(value_type, _) => can_hash(value_type.data_type()),
868 DataType::Map(map_struct, true | false) => can_hash(map_struct.data_type()),
869 DataType::Struct(fields) => fields.iter().all(|f| can_hash(f.data_type())),
870
871 DataType::ListView(_)
872 | DataType::LargeListView(_)
873 | DataType::Union(_, _)
874 | DataType::RunEndEncoded(_, _) => false,
875 }
876}
877
878pub fn check_all_columns_from_schema(
880 columns: &HashSet<&Column>,
881 schema: &DFSchema,
882) -> Result<bool> {
883 for col in columns.iter() {
884 let exist = schema.is_column_from_schema(col);
885 if !exist {
886 return Ok(false);
887 }
888 }
889
890 Ok(true)
891}
892
893pub fn find_valid_equijoin_key_pair(
902 left_key: &Expr,
903 right_key: &Expr,
904 left_schema: &DFSchema,
905 right_schema: &DFSchema,
906) -> Result<Option<(Expr, Expr)>> {
907 let left_using_columns = left_key.column_refs();
908 let right_using_columns = right_key.column_refs();
909
910 if left_using_columns.is_empty() || right_using_columns.is_empty() {
912 return Ok(None);
913 }
914
915 if check_all_columns_from_schema(&left_using_columns, left_schema)?
916 && check_all_columns_from_schema(&right_using_columns, right_schema)?
917 {
918 return Ok(Some((left_key.clone(), right_key.clone())));
919 } else if check_all_columns_from_schema(&right_using_columns, left_schema)?
920 && check_all_columns_from_schema(&left_using_columns, right_schema)?
921 {
922 return Ok(Some((right_key.clone(), left_key.clone())));
923 }
924
925 Ok(None)
926}
927
928#[expect(clippy::needless_pass_by_value)]
940pub fn generate_signature_error_msg(
941 func_name: &str,
942 func_signature: Signature,
943 input_expr_types: &[DataType],
944) -> String {
945 let candidate_signatures = func_signature
946 .type_signature
947 .to_string_repr_with_names(func_signature.parameter_names.as_deref())
948 .iter()
949 .map(|args_str| format!("\t{func_name}({args_str})"))
950 .collect::<Vec<String>>()
951 .join("\n");
952
953 format!(
954 "No function matches the given name and argument types '{}({})'. You might need to add explicit type casts.\n\tCandidate functions:\n{}",
955 func_name,
956 TypeSignature::join_types(input_expr_types, ", "),
957 candidate_signatures
958 )
959}
960
961pub fn split_conjunction(expr: &Expr) -> Vec<&Expr> {
965 split_conjunction_impl(expr, vec![])
966}
967
968fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> Vec<&'a Expr> {
969 match expr {
970 Expr::BinaryExpr(BinaryExpr {
971 right,
972 op: Operator::And,
973 left,
974 }) => {
975 let exprs = split_conjunction_impl(left, exprs);
976 split_conjunction_impl(right, exprs)
977 }
978 Expr::Alias(Alias { expr, .. }) => split_conjunction_impl(expr, exprs),
979 other => {
980 exprs.push(other);
981 exprs
982 }
983 }
984}
985
986pub fn iter_conjunction(expr: &Expr) -> impl Iterator<Item = &Expr> {
990 let mut stack = vec![expr];
991 std::iter::from_fn(move || {
992 while let Some(expr) = stack.pop() {
993 match expr {
994 Expr::BinaryExpr(BinaryExpr {
995 right,
996 op: Operator::And,
997 left,
998 }) => {
999 stack.push(right);
1000 stack.push(left);
1001 }
1002 Expr::Alias(Alias { expr, .. }) => stack.push(expr),
1003 other => return Some(other),
1004 }
1005 }
1006 None
1007 })
1008}
1009
1010pub fn iter_conjunction_owned(expr: Expr) -> impl Iterator<Item = Expr> {
1014 let mut stack = vec![expr];
1015 std::iter::from_fn(move || {
1016 while let Some(expr) = stack.pop() {
1017 match expr {
1018 Expr::BinaryExpr(BinaryExpr {
1019 right,
1020 op: Operator::And,
1021 left,
1022 }) => {
1023 stack.push(*right);
1024 stack.push(*left);
1025 }
1026 Expr::Alias(Alias { expr, .. }) => stack.push(*expr),
1027 other => return Some(other),
1028 }
1029 }
1030 None
1031 })
1032}
1033
1034pub fn split_conjunction_owned(expr: Expr) -> Vec<Expr> {
1053 split_binary_owned(expr, Operator::And)
1054}
1055
1056pub fn split_binary_owned(expr: Expr, op: Operator) -> Vec<Expr> {
1076 split_binary_owned_impl(expr, op, vec![])
1077}
1078
1079fn split_binary_owned_impl(
1080 expr: Expr,
1081 operator: Operator,
1082 mut exprs: Vec<Expr>,
1083) -> Vec<Expr> {
1084 match expr {
1085 Expr::BinaryExpr(BinaryExpr { right, op, left }) if op == operator => {
1086 let exprs = split_binary_owned_impl(*left, operator, exprs);
1087 split_binary_owned_impl(*right, operator, exprs)
1088 }
1089 Expr::Alias(Alias { expr, .. }) => {
1090 split_binary_owned_impl(*expr, operator, exprs)
1091 }
1092 other => {
1093 exprs.push(other);
1094 exprs
1095 }
1096 }
1097}
1098
1099pub fn split_binary(expr: &Expr, op: Operator) -> Vec<&Expr> {
1103 split_binary_impl(expr, op, vec![])
1104}
1105
1106fn split_binary_impl<'a>(
1107 expr: &'a Expr,
1108 operator: Operator,
1109 mut exprs: Vec<&'a Expr>,
1110) -> Vec<&'a Expr> {
1111 match expr {
1112 Expr::BinaryExpr(BinaryExpr { right, op, left }) if *op == operator => {
1113 let exprs = split_binary_impl(left, operator, exprs);
1114 split_binary_impl(right, operator, exprs)
1115 }
1116 Expr::Alias(Alias { expr, .. }) => split_binary_impl(expr, operator, exprs),
1117 other => {
1118 exprs.push(other);
1119 exprs
1120 }
1121 }
1122}
1123
1124pub fn conjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
1144 filters.into_iter().reduce(Expr::and)
1145}
1146
1147pub fn disjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
1167 filters.into_iter().reduce(Expr::or)
1168}
1169
1170pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> Result<LogicalPlan> {
1185 let predicate = predicates
1187 .iter()
1188 .skip(1)
1189 .fold(predicates[0].clone(), |acc, predicate| {
1190 and(acc, (*predicate).to_owned())
1191 });
1192
1193 Ok(LogicalPlan::Filter(Filter::try_new(
1194 predicate,
1195 Arc::new(plan),
1196 )?))
1197}
1198
1199pub fn find_join_exprs(exprs: Vec<&Expr>) -> Result<(Vec<Expr>, Vec<Expr>)> {
1210 let mut joins = vec![];
1211 let mut others = vec![];
1212 for filter in exprs.into_iter() {
1213 if filter.contains_outer() {
1215 if !matches!(filter, Expr::BinaryExpr(BinaryExpr{ left, op: Operator::Eq, right }) if left.eq(right))
1216 {
1217 joins.push(strip_outer_reference((*filter).clone()));
1218 }
1219 } else {
1220 others.push((*filter).clone());
1221 }
1222 }
1223
1224 Ok((joins, others))
1225}
1226
1227pub fn only_or_err<T>(slice: &[T]) -> Result<&T> {
1237 match slice {
1238 [it] => Ok(it),
1239 [] => plan_err!("No items found!"),
1240 _ => plan_err!("More than one item found!"),
1241 }
1242}
1243
1244pub fn merge_schema(inputs: &[&LogicalPlan]) -> DFSchema {
1249 if inputs.len() == 1 {
1250 inputs[0].schema().as_ref().clone()
1251 } else {
1252 inputs.iter().map(|input| input.schema()).fold(
1253 DFSchema::empty(),
1254 |mut lhs, rhs| {
1255 lhs.merge(rhs);
1256 lhs
1257 },
1258 )
1259 }
1260}
1261
/// Builds a state field name of the form `name[state_name]`.
pub fn format_state_name(name: &str, state_name: &str) -> String {
    // Two extra bytes for the surrounding brackets.
    let mut formatted = String::with_capacity(name.len() + state_name.len() + 2);
    formatted.push_str(name);
    formatted.push('[');
    formatted.push_str(state_name);
    formatted.push(']');
    formatted
}
1266
1267pub fn collect_subquery_cols(
1269 exprs: &[Expr],
1270 subquery_schema: &DFSchema,
1271) -> Result<BTreeSet<Column>> {
1272 exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
1273 let mut using_cols: Vec<Column> = vec![];
1274 for col in expr.column_refs().into_iter() {
1275 if subquery_schema.has_column(col) {
1276 using_cols.push(col.clone());
1277 }
1278 }
1279
1280 cols.extend(using_cols);
1281 Result::<_>::Ok(cols)
1282 })
1283}
1284
1285#[cfg(test)]
1286mod tests {
1287 use super::*;
1288 use crate::{
1289 Cast, ExprFunctionExt, WindowFunctionDefinition, col, cube,
1290 expr::WindowFunction,
1291 expr_vec_fmt, grouping_set, lit, rollup,
1292 test::function_stub::{max_udaf, min_udaf, sum_udaf},
1293 };
1294 use arrow::datatypes::{UnionFields, UnionMode};
1295 use datafusion_expr_common::signature::{TypeSignature, Volatility};
1296
1297 #[test]
1298 fn test_group_window_expr_by_sort_keys_empty_case() -> Result<()> {
1299 let result = group_window_expr_by_sort_keys(vec![])?;
1300 let expected: Vec<(WindowSortKey, Vec<Expr>)> = vec![];
1301 assert_eq!(expected, result);
1302 Ok(())
1303 }
1304
1305 #[test]
1306 fn test_group_window_expr_by_sort_keys_empty_window() -> Result<()> {
1307 let max1 = Expr::from(WindowFunction::new(
1308 WindowFunctionDefinition::AggregateUDF(max_udaf()),
1309 vec![col("name")],
1310 ));
1311 let max2 = Expr::from(WindowFunction::new(
1312 WindowFunctionDefinition::AggregateUDF(max_udaf()),
1313 vec![col("name")],
1314 ));
1315 let min3 = Expr::from(WindowFunction::new(
1316 WindowFunctionDefinition::AggregateUDF(min_udaf()),
1317 vec![col("name")],
1318 ));
1319 let sum4 = Expr::from(WindowFunction::new(
1320 WindowFunctionDefinition::AggregateUDF(sum_udaf()),
1321 vec![col("age")],
1322 ));
1323 let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
1324 let result = group_window_expr_by_sort_keys(exprs.to_vec())?;
1325 let key = vec![];
1326 let expected: Vec<(WindowSortKey, Vec<Expr>)> =
1327 vec![(key, vec![max1, max2, min3, sum4])];
1328 assert_eq!(expected, result);
1329 Ok(())
1330 }
1331
1332 #[test]
1333 fn test_group_window_expr_by_sort_keys() -> Result<()> {
1334 let age_asc = Sort::new(col("age"), true, true);
1335 let name_desc = Sort::new(col("name"), false, true);
1336 let created_at_desc = Sort::new(col("created_at"), false, true);
1337 let max1 = Expr::from(WindowFunction::new(
1338 WindowFunctionDefinition::AggregateUDF(max_udaf()),
1339 vec![col("name")],
1340 ))
1341 .order_by(vec![age_asc.clone(), name_desc.clone()])
1342 .build()
1343 .unwrap();
1344 let max2 = Expr::from(WindowFunction::new(
1345 WindowFunctionDefinition::AggregateUDF(max_udaf()),
1346 vec![col("name")],
1347 ));
1348 let min3 = Expr::from(WindowFunction::new(
1349 WindowFunctionDefinition::AggregateUDF(min_udaf()),
1350 vec![col("name")],
1351 ))
1352 .order_by(vec![age_asc.clone(), name_desc.clone()])
1353 .build()
1354 .unwrap();
1355 let sum4 = Expr::from(WindowFunction::new(
1356 WindowFunctionDefinition::AggregateUDF(sum_udaf()),
1357 vec![col("age")],
1358 ))
1359 .order_by(vec![
1360 name_desc.clone(),
1361 age_asc.clone(),
1362 created_at_desc.clone(),
1363 ])
1364 .build()
1365 .unwrap();
1366 let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
1368 let result = group_window_expr_by_sort_keys(exprs.to_vec())?;
1369
1370 let key1 = vec![(age_asc.clone(), false), (name_desc.clone(), false)];
1371 let key2 = vec![];
1372 let key3 = vec![
1373 (name_desc, false),
1374 (age_asc, false),
1375 (created_at_desc, false),
1376 ];
1377
1378 let expected: Vec<(WindowSortKey, Vec<Expr>)> = vec![
1379 (key1, vec![max1, min3]),
1380 (key2, vec![max2]),
1381 (key3, vec![sum4]),
1382 ];
1383 assert_eq!(expected, result);
1384 Ok(())
1385 }
1386
1387 #[test]
1388 fn avoid_generate_duplicate_sort_keys() -> Result<()> {
1389 let asc_or_desc = [true, false];
1390 let nulls_first_or_last = [true, false];
1391 let partition_by = &[col("age"), col("name"), col("created_at")];
1392 for asc_ in asc_or_desc {
1393 for nulls_first_ in nulls_first_or_last {
1394 let order_by = &[
1395 Sort {
1396 expr: col("age"),
1397 asc: asc_,
1398 nulls_first: nulls_first_,
1399 },
1400 Sort {
1401 expr: col("name"),
1402 asc: asc_,
1403 nulls_first: nulls_first_,
1404 },
1405 ];
1406
1407 let expected = vec![
1408 (
1409 Sort {
1410 expr: col("age"),
1411 asc: asc_,
1412 nulls_first: nulls_first_,
1413 },
1414 true,
1415 ),
1416 (
1417 Sort {
1418 expr: col("name"),
1419 asc: asc_,
1420 nulls_first: nulls_first_,
1421 },
1422 true,
1423 ),
1424 (
1425 Sort {
1426 expr: col("created_at"),
1427 asc: true,
1428 nulls_first: false,
1429 },
1430 true,
1431 ),
1432 ];
1433 let result = generate_sort_key(partition_by, order_by)?;
1434 assert_eq!(expected, result);
1435 }
1436 }
1437 Ok(())
1438 }
1439
#[test]
fn test_enumerate_grouping_sets() -> Result<()> {
    // Building blocks shared by every scenario below.
    let columns = vec![col("col1"), col("col2"), col("col3")];
    let plain = col("simple_col");
    let cube_expr = cube(columns.clone());
    let rollup_expr = rollup(columns.clone());
    let sets_expr = grouping_set(vec![columns]);

    // Each case pairs a GROUP BY expression list with the text expected once
    // `enumerate_grouping_sets` has processed it.
    let cases = vec![
        // A single plain column, CUBE or ROLLUP is rendered unchanged.
        (vec![plain.clone()], "[simple_col]"),
        (vec![cube_expr.clone()], "[CUBE (col1, col2, col3)]"),
        (vec![rollup_expr.clone()], "[ROLLUP (col1, col2, col3)]"),
        // Plain column + CUBE: one grouping set per subset of the cube columns.
        (
            vec![plain.clone(), cube_expr.clone()],
            "[GROUPING SETS (\
            (simple_col), \
            (simple_col, col1), \
            (simple_col, col2), \
            (simple_col, col1, col2), \
            (simple_col, col3), \
            (simple_col, col1, col3), \
            (simple_col, col2, col3), \
            (simple_col, col1, col2, col3))]",
        ),
        // Plain column + ROLLUP: one grouping set per prefix of the rollup columns.
        (
            vec![plain.clone(), rollup_expr.clone()],
            "[GROUPING SETS (\
            (simple_col), \
            (simple_col, col1), \
            (simple_col, col1, col2), \
            (simple_col, col1, col2, col3))]",
        ),
        // Plain column + explicit GROUPING SETS with a single set.
        (
            vec![plain.clone(), sets_expr.clone()],
            "[GROUPING SETS (\
            (simple_col, col1, col2, col3))]",
        ),
        // Plain column + GROUPING SETS + ROLLUP.
        (
            vec![plain.clone(), sets_expr, rollup_expr.clone()],
            "[GROUPING SETS (\
            (simple_col, col1, col2, col3), \
            (simple_col, col1, col2, col3, col1), \
            (simple_col, col1, col2, col3, col1, col2), \
            (simple_col, col1, col2, col3, col1, col2, col3))]",
        ),
        // Plain column + CUBE + ROLLUP: every cube subset paired with every rollup prefix.
        (
            vec![plain, cube_expr, rollup_expr],
            "[GROUPING SETS (\
            (simple_col), \
            (simple_col, col1), \
            (simple_col, col1, col2), \
            (simple_col, col1, col2, col3), \
            (simple_col, col1), \
            (simple_col, col1, col1), \
            (simple_col, col1, col1, col2), \
            (simple_col, col1, col1, col2, col3), \
            (simple_col, col2), \
            (simple_col, col2, col1), \
            (simple_col, col2, col1, col2), \
            (simple_col, col2, col1, col2, col3), \
            (simple_col, col1, col2), \
            (simple_col, col1, col2, col1), \
            (simple_col, col1, col2, col1, col2), \
            (simple_col, col1, col2, col1, col2, col3), \
            (simple_col, col3), \
            (simple_col, col3, col1), \
            (simple_col, col3, col1, col2), \
            (simple_col, col3, col1, col2, col3), \
            (simple_col, col1, col3), \
            (simple_col, col1, col3, col1), \
            (simple_col, col1, col3, col1, col2), \
            (simple_col, col1, col3, col1, col2, col3), \
            (simple_col, col2, col3), \
            (simple_col, col2, col3, col1), \
            (simple_col, col2, col3, col1, col2), \
            (simple_col, col2, col3, col1, col2, col3), \
            (simple_col, col1, col2, col3), \
            (simple_col, col1, col2, col3, col1), \
            (simple_col, col1, col2, col3, col1, col2), \
            (simple_col, col1, col2, col3, col1, col2, col3))]",
        ),
    ];

    for (group_by, expected) in cases {
        let enumerated = enumerate_grouping_sets(group_by)?;
        let rendered = format!("[{}]", expr_vec_fmt!(enumerated));
        assert_eq!(expected, &rendered);
    }

    Ok(())
}
#[test]
fn test_split_conjunction() {
    // A lone column comes back as the single element of the split.
    let lone_column = col("a");
    assert_eq!(split_conjunction(&lone_column), vec![&lone_column]);
}
1565
#[test]
fn test_split_conjunction_two() {
    // `a = 5 AND b` is split into its two operands, left operand first.
    let lhs = col("a").eq(lit(5));
    let rhs = col("b");
    let combined = lhs.clone().and(rhs.clone());

    assert_eq!(split_conjunction(&combined), vec![&lhs, &rhs]);
}
1575
#[test]
fn test_split_conjunction_alias() {
    // The right operand is aliased in the input, yet the expected pieces are
    // the bare `a = 5` and the un-aliased `b`.
    let lhs = col("a").eq(lit(5));
    let bare_rhs = col("b");
    let combined = lhs.clone().and(bare_rhs.clone().alias("the_alias"));

    assert_eq!(split_conjunction(&combined), vec![&lhs, &bare_rhs]);
}
1585
#[test]
fn test_split_conjunction_or() {
    // A top-level `OR` is not split: the whole expression is the only piece.
    let either = col("a").eq(lit(5)).or(col("b"));
    assert_eq!(split_conjunction(&either), vec![&either]);
}
1592
#[test]
fn test_split_binary_owned() {
    // Splitting a lone column on `AND` yields just that column.
    let lone_column = col("a");
    let pieces = split_binary_owned(lone_column.clone(), Operator::And);
    assert_eq!(pieces, vec![lone_column]);
}
1598
#[test]
fn test_split_binary_owned_two() {
    // `a = 5 AND b` split on `AND` yields both operands, left operand first.
    let lhs = col("a").eq(lit(5));
    let rhs = col("b");
    let pieces = split_binary_owned(lhs.clone().and(rhs.clone()), Operator::And);
    assert_eq!(pieces, vec![lhs, rhs]);
}
1606
#[test]
fn test_split_binary_owned_different_op() {
    // The top-level operator is `OR`, so splitting on `AND` returns the
    // expression untouched.
    let either = col("a").eq(lit(5)).or(col("b"));
    let pieces = split_binary_owned(either.clone(), Operator::And);
    assert_eq!(pieces, vec![either]);
}
1616
#[test]
fn test_split_conjunction_owned() {
    // A lone column is returned as the only piece.
    let lone_column = col("a");
    let pieces = split_conjunction_owned(lone_column.clone());
    assert_eq!(pieces, vec![lone_column]);
}
1622
#[test]
fn test_split_conjunction_owned_two() {
    // `a = 5 AND b` yields both operands, left operand first.
    let lhs = col("a").eq(lit(5));
    let rhs = col("b");
    let pieces = split_conjunction_owned(lhs.clone().and(rhs.clone()));
    assert_eq!(pieces, vec![lhs, rhs]);
}
1630
#[test]
fn test_split_conjunction_owned_alias() {
    // The right operand is aliased in the input; the expected output holds
    // the un-aliased `b`.
    let lhs = col("a").eq(lit(5));
    let bare_rhs = col("b");
    let combined = lhs.clone().and(bare_rhs.clone().alias("the_alias"));

    assert_eq!(split_conjunction_owned(combined), vec![lhs, bare_rhs]);
}
1642
#[test]
fn test_conjunction_empty() {
    // With nothing to combine there is no resulting expression.
    assert!(conjunction(vec![]).is_none());
}
1647
#[test]
fn test_conjunction() {
    let (a, b, c) = (col("a"), col("b"), col("c"));
    let combined = conjunction(vec![a.clone(), b.clone(), c.clone()]);

    // The result nests to the left: (a AND b) AND c ...
    let left_nested = a.clone().and(b.clone()).and(c.clone());
    assert_eq!(combined, Some(left_nested));

    // ... which is a different tree from a AND (b AND c).
    let right_nested = a.and(b.and(c));
    assert_ne!(combined, Some(right_nested));
}
1659
#[test]
fn test_disjunction_empty() {
    // With nothing to combine there is no resulting expression.
    assert!(disjunction(vec![]).is_none());
}
1664
#[test]
fn test_disjunction() {
    let (a, b, c) = (col("a"), col("b"), col("c"));
    let combined = disjunction(vec![a.clone(), b.clone(), c.clone()]);

    // The result nests to the left: (a OR b) OR c ...
    let left_nested = a.clone().or(b.clone()).or(c.clone());
    assert_eq!(combined, Some(left_nested));

    // ... which is a different tree from a OR (b OR c).
    let right_nested = a.or(b.or(c));
    assert_ne!(combined, Some(right_nested));
}
1676
#[test]
fn test_split_conjunction_owned_or() {
    // A top-level `OR` is not split: the whole expression is the only piece.
    let either = col("a").eq(lit(5)).or(col("b"));
    let pieces = split_conjunction_owned(either.clone());
    assert_eq!(pieces, vec![either]);
}
1682
#[test]
fn test_collect_expr() -> Result<()> {
    let mut seen = HashSet::<Column>::new();

    // Collect columns from the same `CAST(a AS Float64)` twice; the set must
    // still hold a single entry for `a` afterwards.
    for _ in 0..2 {
        let cast_of_a = Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64));
        expr_to_columns(&cast_of_a, &mut seen)?;
    }

    assert_eq!(1, seen.len());
    assert!(seen.contains(&Column::from_name("a")));
    Ok(())
}
1698
#[test]
fn test_can_hash() {
    // A sparse union of an Int32 member and a Float64 member is reported as
    // not hashable.
    let sparse_union = DataType::Union(
        [
            (0, Arc::new(Field::new("A", DataType::Int32, true))),
            (1, Arc::new(Field::new("B", DataType::Float64, true))),
        ]
        .into_iter()
        .collect::<UnionFields>(),
        UnionMode::Sparse,
    );
    assert!(!can_hash(&sparse_union));

    // Wrapping that union in a list does not make it hashable either.
    let list_of_union =
        DataType::List(Arc::new(Field::new("my_union", sparse_union, true)));
    assert!(!can_hash(&list_of_union));
}
1715
#[test]
fn test_generate_signature_error_msg_with_parameter_names() {
    // Two exact overloads (2 and 3 arguments) sharing one list of parameter names.
    let two_args = TypeSignature::Exact(vec![DataType::Utf8, DataType::Int64]);
    let three_args =
        TypeSignature::Exact(vec![DataType::Utf8, DataType::Int64, DataType::Int64]);
    let parameter_names = ["str", "start_pos", "length"]
        .into_iter()
        .map(String::from)
        .collect::<Vec<String>>();

    let sig = Signature::one_of(vec![two_args, three_args], Volatility::Immutable)
        .with_parameter_names(parameter_names)
        .expect("valid parameter names");

    // Call with a single Utf8 argument, which matches neither overload's arity.
    let supplied = [DataType::Utf8];
    let error_msg = generate_signature_error_msg("substr", sig, &supplied);

    // Both overloads must be listed with `name: type` pairs.
    assert!(
        error_msg.contains("str: Utf8, start_pos: Int64"),
        "Expected 'str: Utf8, start_pos: Int64' in error message, got: {error_msg}"
    );
    assert!(
        error_msg.contains("str: Utf8, start_pos: Int64, length: Int64"),
        "Expected 'str: Utf8, start_pos: Int64, length: Int64' in error message, got: {error_msg}"
    );
}
1748
#[test]
fn test_generate_signature_error_msg_without_parameter_names() {
    // No parameter names are attached, so the message should list bare types.
    let overloads = vec![TypeSignature::Any(2), TypeSignature::Any(3)];
    let sig = Signature::one_of(overloads, Volatility::Immutable);

    let supplied = [DataType::Int32];
    let error_msg = generate_signature_error_msg("my_func", sig, &supplied);

    assert!(
        error_msg.contains("Any, Any"),
        "Expected 'Any, Any' without parameter names, got: {error_msg}"
    );
}
1763}