1use std::cmp::Ordering;
21use std::collections::{BTreeSet, HashSet};
22use std::sync::Arc;
23
24use crate::expr::{Alias, Sort, WildcardOptions, WindowFunctionParams};
25use crate::expr_rewriter::strip_outer_reference;
26use crate::{
27 and, BinaryExpr, Expr, ExprSchemable, Filter, GroupingSet, LogicalPlan, Operator,
28};
29use datafusion_expr_common::signature::{Signature, TypeSignature};
30
31use arrow::datatypes::{DataType, Field, Schema};
32use datafusion_common::tree_node::{
33 Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
34};
35use datafusion_common::utils::get_at_indices;
36use datafusion_common::{
37 internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, HashMap,
38 Result, TableReference,
39};
40
41#[cfg(not(feature = "sql"))]
42use crate::expr::{ExceptSelectItem, ExcludeSelectItem};
43use indexmap::IndexSet;
44#[cfg(feature = "sql")]
45use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem};
46
47pub use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
48
49pub use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
52
53pub fn grouping_set_expr_count(group_expr: &[Expr]) -> Result<usize> {
56 if let Some(Expr::GroupingSet(grouping_set)) = group_expr.first() {
57 if group_expr.len() > 1 {
58 return plan_err!(
59 "Invalid group by expressions, GroupingSet must be the only expression"
60 );
61 }
62 Ok(grouping_set.distinct_expr().len() + 1)
64 } else {
65 grouping_set_to_exprlist(group_expr).map(|exprs| exprs.len())
66 }
67}
68
/// Enumerates every subset of `slice` as a vector of borrowed items.
///
/// Subset number `mask` contains the items whose index bit is set in `mask`.
/// Items appear in ascending index order, and subsets are produced for
/// `mask = 0, 1, …, 2^len - 1` (so the empty subset comes first).
///
/// # Errors
/// Returns an error string when `slice` has 64 or more elements, because the
/// subset mask is a `u64`.
fn powerset<T>(slice: &[T]) -> Result<Vec<Vec<&T>>, String> {
    if slice.len() >= 64 {
        return Err("The size of the set must be less than 64.".into());
    }

    let subset_count: u64 = 1u64 << slice.len();
    let subsets: Vec<Vec<&T>> = (0..subset_count)
        .map(|mask| {
            slice
                .iter()
                .enumerate()
                .filter(|(idx, _)| (mask & (1u64 << *idx)) != 0)
                .map(|(_, item)| item)
                .collect::<Vec<&T>>()
        })
        .collect();
    Ok(subsets)
}
107
108fn check_grouping_set_size_limit(size: usize) -> Result<()> {
110 let max_grouping_set_size = 65535;
111 if size > max_grouping_set_size {
112 return plan_err!("The number of group_expression in grouping_set exceeds the maximum limit {max_grouping_set_size}, found {size}");
113 }
114
115 Ok(())
116}
117
118fn check_grouping_sets_size_limit(size: usize) -> Result<()> {
120 let max_grouping_sets_size = 4096;
121 if size > max_grouping_sets_size {
122 return plan_err!("The number of grouping_set in grouping_sets exceeds the maximum limit {max_grouping_sets_size}, found {size}");
123 }
124
125 Ok(())
126}
127
128fn merge_grouping_set<T: Clone>(left: &[T], right: &[T]) -> Result<Vec<T>> {
140 check_grouping_set_size_limit(left.len() + right.len())?;
141 Ok(left.iter().chain(right.iter()).cloned().collect())
142}
143
144fn cross_join_grouping_sets<T: Clone>(
157 left: &[Vec<T>],
158 right: &[Vec<T>],
159) -> Result<Vec<Vec<T>>> {
160 let grouping_sets_size = left.len() * right.len();
161
162 check_grouping_sets_size_limit(grouping_sets_size)?;
163
164 let mut result = Vec::with_capacity(grouping_sets_size);
165 for le in left {
166 for re in right {
167 result.push(merge_grouping_set(le, re)?);
168 }
169 }
170 Ok(result)
171}
172
173pub fn enumerate_grouping_sets(group_expr: Vec<Expr>) -> Result<Vec<Expr>> {
194 let has_grouping_set = group_expr
195 .iter()
196 .any(|expr| matches!(expr, Expr::GroupingSet(_)));
197 if !has_grouping_set || group_expr.len() == 1 {
198 return Ok(group_expr);
199 }
200 let partial_sets = group_expr
202 .iter()
203 .map(|expr| {
204 let exprs = match expr {
205 Expr::GroupingSet(GroupingSet::GroupingSets(grouping_sets)) => {
206 check_grouping_sets_size_limit(grouping_sets.len())?;
207 grouping_sets.iter().map(|e| e.iter().collect()).collect()
208 }
209 Expr::GroupingSet(GroupingSet::Cube(group_exprs)) => {
210 let grouping_sets = powerset(group_exprs)
211 .map_err(|e| plan_datafusion_err!("{}", e))?;
212 check_grouping_sets_size_limit(grouping_sets.len())?;
213 grouping_sets
214 }
215 Expr::GroupingSet(GroupingSet::Rollup(group_exprs)) => {
216 let size = group_exprs.len();
217 let slice = group_exprs.as_slice();
218 check_grouping_sets_size_limit(size * (size + 1) / 2 + 1)?;
219 (0..(size + 1))
220 .map(|i| slice[0..i].iter().collect())
221 .collect()
222 }
223 expr => vec![vec![expr]],
224 };
225 Ok(exprs)
226 })
227 .collect::<Result<Vec<_>>>()?;
228
229 let grouping_sets = partial_sets
231 .into_iter()
232 .map(Ok)
233 .reduce(|l, r| cross_join_grouping_sets(&l?, &r?))
234 .transpose()?
235 .map(|e| {
236 e.into_iter()
237 .map(|e| e.into_iter().cloned().collect())
238 .collect()
239 })
240 .unwrap_or_default();
241
242 Ok(vec![Expr::GroupingSet(GroupingSet::GroupingSets(
243 grouping_sets,
244 ))])
245}
246
247pub fn grouping_set_to_exprlist(group_expr: &[Expr]) -> Result<Vec<&Expr>> {
250 if let Some(Expr::GroupingSet(grouping_set)) = group_expr.first() {
251 if group_expr.len() > 1 {
252 return plan_err!(
253 "Invalid group by expressions, GroupingSet must be the only expression"
254 );
255 }
256 Ok(grouping_set.distinct_expr())
257 } else {
258 Ok(group_expr
259 .iter()
260 .collect::<IndexSet<_>>()
261 .into_iter()
262 .collect())
263 }
264}
265
266pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
269 expr.apply(|expr| {
270 match expr {
271 Expr::Column(qc) => {
272 accum.insert(qc.clone());
273 }
274 #[expect(deprecated)]
279 Expr::Unnest(_)
280 | Expr::ScalarVariable(_, _)
281 | Expr::Alias(_)
282 | Expr::Literal(_, _)
283 | Expr::BinaryExpr { .. }
284 | Expr::Like { .. }
285 | Expr::SimilarTo { .. }
286 | Expr::Not(_)
287 | Expr::IsNotNull(_)
288 | Expr::IsNull(_)
289 | Expr::IsTrue(_)
290 | Expr::IsFalse(_)
291 | Expr::IsUnknown(_)
292 | Expr::IsNotTrue(_)
293 | Expr::IsNotFalse(_)
294 | Expr::IsNotUnknown(_)
295 | Expr::Negative(_)
296 | Expr::Between { .. }
297 | Expr::Case { .. }
298 | Expr::Cast { .. }
299 | Expr::TryCast { .. }
300 | Expr::ScalarFunction(..)
301 | Expr::WindowFunction { .. }
302 | Expr::AggregateFunction { .. }
303 | Expr::GroupingSet(_)
304 | Expr::InList { .. }
305 | Expr::Exists { .. }
306 | Expr::InSubquery(_)
307 | Expr::ScalarSubquery(_)
308 | Expr::Wildcard { .. }
309 | Expr::Placeholder(_)
310 | Expr::OuterReferenceColumn { .. } => {}
311 }
312 Ok(TreeNodeRecursion::Continue)
313 })
314 .map(|_| ())
315}
316
317fn get_excluded_columns(
320 opt_exclude: Option<&ExcludeSelectItem>,
321 opt_except: Option<&ExceptSelectItem>,
322 schema: &DFSchema,
323 qualifier: Option<&TableReference>,
324) -> Result<Vec<Column>> {
325 let mut idents = vec![];
326 if let Some(excepts) = opt_except {
327 idents.push(&excepts.first_element);
328 idents.extend(&excepts.additional_elements);
329 }
330 if let Some(exclude) = opt_exclude {
331 match exclude {
332 ExcludeSelectItem::Single(ident) => idents.push(ident),
333 ExcludeSelectItem::Multiple(idents_inner) => idents.extend(idents_inner),
334 }
335 }
336 let n_elem = idents.len();
338 let unique_idents = idents.into_iter().collect::<HashSet<_>>();
339 if n_elem != unique_idents.len() {
342 return plan_err!("EXCLUDE or EXCEPT contains duplicate column names");
343 }
344
345 let mut result = vec![];
346 for ident in unique_idents.into_iter() {
347 let col_name = ident.value.as_str();
348 let (qualifier, field) = schema.qualified_field_with_name(qualifier, col_name)?;
349 result.push(Column::from((qualifier, field)));
350 }
351 Ok(result)
352}
353
354fn get_exprs_except_skipped(
356 schema: &DFSchema,
357 columns_to_skip: HashSet<Column>,
358) -> Vec<Expr> {
359 if columns_to_skip.is_empty() {
360 schema.iter().map(Expr::from).collect::<Vec<Expr>>()
361 } else {
362 schema
363 .columns()
364 .iter()
365 .filter_map(|c| {
366 if !columns_to_skip.contains(c) {
367 Some(Expr::Column(c.clone()))
368 } else {
369 None
370 }
371 })
372 .collect::<Vec<Expr>>()
373 }
374}
375
376fn exclude_using_columns(plan: &LogicalPlan) -> Result<HashSet<Column>> {
380 let using_columns = plan.using_columns()?;
381 let excluded = using_columns
382 .into_iter()
383 .flat_map(|cols| {
385 let mut cols = cols.into_iter().collect::<Vec<_>>();
386 cols.sort();
389 let mut out_column_names: HashSet<String> = HashSet::new();
390 cols.into_iter().filter_map(move |c| {
391 if out_column_names.contains(&c.name) {
392 Some(c)
393 } else {
394 out_column_names.insert(c.name);
395 None
396 }
397 })
398 })
399 .collect::<HashSet<_>>();
400 Ok(excluded)
401}
402
403pub fn expand_wildcard(
405 schema: &DFSchema,
406 plan: &LogicalPlan,
407 wildcard_options: Option<&WildcardOptions>,
408) -> Result<Vec<Expr>> {
409 let mut columns_to_skip = exclude_using_columns(plan)?;
410 let excluded_columns = if let Some(WildcardOptions {
411 exclude: opt_exclude,
412 except: opt_except,
413 ..
414 }) = wildcard_options
415 {
416 get_excluded_columns(opt_exclude.as_ref(), opt_except.as_ref(), schema, None)?
417 } else {
418 vec![]
419 };
420 columns_to_skip.extend(excluded_columns);
422 Ok(get_exprs_except_skipped(schema, columns_to_skip))
423}
424
425pub fn expand_qualified_wildcard(
427 qualifier: &TableReference,
428 schema: &DFSchema,
429 wildcard_options: Option<&WildcardOptions>,
430) -> Result<Vec<Expr>> {
431 let qualified_indices = schema.fields_indices_with_qualified(qualifier);
432 let projected_func_dependencies = schema
433 .functional_dependencies()
434 .project_functional_dependencies(&qualified_indices, qualified_indices.len());
435 let fields_with_qualified = get_at_indices(schema.fields(), &qualified_indices)?;
436 if fields_with_qualified.is_empty() {
437 return plan_err!("Invalid qualifier {qualifier}");
438 }
439
440 let qualified_schema = Arc::new(Schema::new_with_metadata(
441 fields_with_qualified,
442 schema.metadata().clone(),
443 ));
444 let qualified_dfschema =
445 DFSchema::try_from_qualified_schema(qualifier.clone(), &qualified_schema)?
446 .with_functional_dependencies(projected_func_dependencies)?;
447 let excluded_columns = if let Some(WildcardOptions {
448 exclude: opt_exclude,
449 except: opt_except,
450 ..
451 }) = wildcard_options
452 {
453 get_excluded_columns(
454 opt_exclude.as_ref(),
455 opt_except.as_ref(),
456 schema,
457 Some(qualifier),
458 )?
459 } else {
460 vec![]
461 };
462 let mut columns_to_skip = HashSet::new();
464 columns_to_skip.extend(excluded_columns);
465 Ok(get_exprs_except_skipped(
466 &qualified_dfschema,
467 columns_to_skip,
468 ))
469}
470
/// Sort key of a window expression. Each entry pairs a sort expression with a
/// flag: `true` if the entry was produced from the PARTITION BY clause, `false`
/// if it came only from ORDER BY (as built by `generate_sort_key`).
type WindowSortKey = Vec<(Sort, bool)>;
474
475pub fn generate_sort_key(
477 partition_by: &[Expr],
478 order_by: &[Sort],
479) -> Result<WindowSortKey> {
480 let normalized_order_by_keys = order_by
481 .iter()
482 .map(|e| {
483 let Sort { expr, .. } = e;
484 Sort::new(expr.clone(), true, false)
485 })
486 .collect::<Vec<_>>();
487
488 let mut final_sort_keys = vec![];
489 let mut is_partition_flag = vec![];
490 partition_by.iter().for_each(|e| {
491 let e = e.clone().sort(true, false);
494 if let Some(pos) = normalized_order_by_keys.iter().position(|key| key.eq(&e)) {
495 let order_by_key = &order_by[pos];
496 if !final_sort_keys.contains(order_by_key) {
497 final_sort_keys.push(order_by_key.clone());
498 is_partition_flag.push(true);
499 }
500 } else if !final_sort_keys.contains(&e) {
501 final_sort_keys.push(e);
502 is_partition_flag.push(true);
503 }
504 });
505
506 order_by.iter().for_each(|e| {
507 if !final_sort_keys.contains(e) {
508 final_sort_keys.push(e.clone());
509 is_partition_flag.push(false);
510 }
511 });
512 let res = final_sort_keys
513 .into_iter()
514 .zip(is_partition_flag)
515 .collect::<Vec<_>>();
516 Ok(res)
517}
518
519pub fn compare_sort_expr(
522 sort_expr_a: &Sort,
523 sort_expr_b: &Sort,
524 schema: &DFSchemaRef,
525) -> Ordering {
526 let Sort {
527 expr: expr_a,
528 asc: asc_a,
529 nulls_first: nulls_first_a,
530 } = sort_expr_a;
531
532 let Sort {
533 expr: expr_b,
534 asc: asc_b,
535 nulls_first: nulls_first_b,
536 } = sort_expr_b;
537
538 let ref_indexes_a = find_column_indexes_referenced_by_expr(expr_a, schema);
539 let ref_indexes_b = find_column_indexes_referenced_by_expr(expr_b, schema);
540 for (idx_a, idx_b) in ref_indexes_a.iter().zip(ref_indexes_b.iter()) {
541 match idx_a.cmp(idx_b) {
542 Ordering::Less => {
543 return Ordering::Less;
544 }
545 Ordering::Greater => {
546 return Ordering::Greater;
547 }
548 Ordering::Equal => {}
549 }
550 }
551 match ref_indexes_a.len().cmp(&ref_indexes_b.len()) {
552 Ordering::Less => return Ordering::Greater,
553 Ordering::Greater => {
554 return Ordering::Less;
555 }
556 Ordering::Equal => {}
557 }
558 match (asc_a, asc_b) {
559 (true, false) => {
560 return Ordering::Greater;
561 }
562 (false, true) => {
563 return Ordering::Less;
564 }
565 _ => {}
566 }
567 match (nulls_first_a, nulls_first_b) {
568 (true, false) => {
569 return Ordering::Less;
570 }
571 (false, true) => {
572 return Ordering::Greater;
573 }
574 _ => {}
575 }
576 Ordering::Equal
577}
578
579pub fn group_window_expr_by_sort_keys(
581 window_expr: impl IntoIterator<Item = Expr>,
582) -> Result<Vec<(WindowSortKey, Vec<Expr>)>> {
583 let mut result = vec![];
584 window_expr.into_iter().try_for_each(|expr| match &expr {
585 Expr::WindowFunction(window_fun) => {
586 let WindowFunctionParams{ partition_by, order_by, ..} = &window_fun.as_ref().params;
587 let sort_key = generate_sort_key(partition_by, order_by)?;
588 if let Some((_, values)) = result.iter_mut().find(
589 |group: &&mut (WindowSortKey, Vec<Expr>)| matches!(group, (key, _) if *key == sort_key),
590 ) {
591 values.push(expr);
592 } else {
593 result.push((sort_key, vec![expr]))
594 }
595 Ok(())
596 }
597 other => internal_err!(
598 "Impossibly got non-window expr {other:?}"
599 ),
600 })?;
601 Ok(result)
602}
603
604pub fn find_aggregate_exprs<'a>(exprs: impl IntoIterator<Item = &'a Expr>) -> Vec<Expr> {
608 find_exprs_in_exprs(exprs, &|nested_expr| {
609 matches!(nested_expr, Expr::AggregateFunction { .. })
610 })
611}
612
613pub fn find_window_exprs<'a>(exprs: impl IntoIterator<Item = &'a Expr>) -> Vec<Expr> {
616 find_exprs_in_exprs(exprs, &|nested_expr| {
617 matches!(nested_expr, Expr::WindowFunction { .. })
618 })
619}
620
621pub fn find_out_reference_exprs(expr: &Expr) -> Vec<Expr> {
624 find_exprs_in_expr(expr, &|nested_expr| {
625 matches!(nested_expr, Expr::OuterReferenceColumn { .. })
626 })
627}
628
629fn find_exprs_in_exprs<'a, F>(
633 exprs: impl IntoIterator<Item = &'a Expr>,
634 test_fn: &F,
635) -> Vec<Expr>
636where
637 F: Fn(&Expr) -> bool,
638{
639 exprs
640 .into_iter()
641 .flat_map(|expr| find_exprs_in_expr(expr, test_fn))
642 .fold(vec![], |mut acc, expr| {
643 if !acc.contains(&expr) {
644 acc.push(expr)
645 }
646 acc
647 })
648}
649
650fn find_exprs_in_expr<F>(expr: &Expr, test_fn: &F) -> Vec<Expr>
654where
655 F: Fn(&Expr) -> bool,
656{
657 let mut exprs = vec![];
658 expr.apply(|expr| {
659 if test_fn(expr) {
660 if !(exprs.contains(expr)) {
661 exprs.push(expr.clone())
662 }
663 return Ok(TreeNodeRecursion::Jump);
665 }
666
667 Ok(TreeNodeRecursion::Continue)
668 })
669 .expect("no way to return error during recursion");
671 exprs
672}
673
674pub fn inspect_expr_pre<F, E>(expr: &Expr, mut f: F) -> Result<(), E>
676where
677 F: FnMut(&Expr) -> Result<(), E>,
678{
679 let mut err = Ok(());
680 expr.apply(|expr| {
681 if let Err(e) = f(expr) {
682 err = Err(e);
684 Ok(TreeNodeRecursion::Stop)
685 } else {
686 Ok(TreeNodeRecursion::Continue)
688 }
689 })
690 .expect("no way to return error during recursion");
692
693 err
694}
695
696pub fn exprlist_to_fields<'a>(
714 exprs: impl IntoIterator<Item = &'a Expr>,
715 plan: &LogicalPlan,
716) -> Result<Vec<(Option<TableReference>, Arc<Field>)>> {
717 let input_schema = plan.schema();
719 exprs
720 .into_iter()
721 .map(|e| e.to_field(input_schema))
722 .collect()
723}
724
725pub fn columnize_expr(e: Expr, input: &LogicalPlan) -> Result<Expr> {
741 let output_exprs = match input.columnized_output_exprs() {
742 Ok(exprs) if !exprs.is_empty() => exprs,
743 _ => return Ok(e),
744 };
745 let exprs_map: HashMap<&Expr, Column> = output_exprs.into_iter().collect();
746 e.transform_down(|node: Expr| match exprs_map.get(&node) {
747 Some(column) => Ok(Transformed::new(
748 Expr::Column(column.clone()),
749 true,
750 TreeNodeRecursion::Jump,
751 )),
752 None => Ok(Transformed::no(node)),
753 })
754 .data()
755}
756
757pub fn find_column_exprs(exprs: &[Expr]) -> Vec<Expr> {
760 exprs
761 .iter()
762 .flat_map(find_columns_referenced_by_expr)
763 .map(Expr::Column)
764 .collect()
765}
766
767pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec<Column> {
768 let mut exprs = vec![];
769 e.apply(|expr| {
770 if let Expr::Column(c) = expr {
771 exprs.push(c.clone())
772 }
773 Ok(TreeNodeRecursion::Continue)
774 })
775 .expect("Unexpected error");
777 exprs
778}
779
780pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
782 match expr {
783 Expr::Column(col) => {
784 let (qualifier, field) = plan.schema().qualified_field_from_column(col)?;
785 Ok(Expr::from(Column::from((qualifier, field))))
786 }
787 _ => Ok(Expr::Column(Column::from_name(
788 expr.schema_name().to_string(),
789 ))),
790 }
791}
792
793pub(crate) fn find_column_indexes_referenced_by_expr(
796 e: &Expr,
797 schema: &DFSchemaRef,
798) -> Vec<usize> {
799 let mut indexes = vec![];
800 e.apply(|expr| {
801 match expr {
802 Expr::Column(qc) => {
803 if let Ok(idx) = schema.index_of_column(qc) {
804 indexes.push(idx);
805 }
806 }
807 Expr::Literal(_, _) => {
808 indexes.push(usize::MAX);
809 }
810 _ => {}
811 }
812 Ok(TreeNodeRecursion::Continue)
813 })
814 .unwrap();
815 indexes
816}
817
818pub fn can_hash(data_type: &DataType) -> bool {
822 match data_type {
823 DataType::Null => true,
824 DataType::Boolean => true,
825 DataType::Int8 => true,
826 DataType::Int16 => true,
827 DataType::Int32 => true,
828 DataType::Int64 => true,
829 DataType::UInt8 => true,
830 DataType::UInt16 => true,
831 DataType::UInt32 => true,
832 DataType::UInt64 => true,
833 DataType::Float16 => true,
834 DataType::Float32 => true,
835 DataType::Float64 => true,
836 DataType::Decimal32(_, _) => true,
837 DataType::Decimal64(_, _) => true,
838 DataType::Decimal128(_, _) => true,
839 DataType::Decimal256(_, _) => true,
840 DataType::Timestamp(_, _) => true,
841 DataType::Utf8 => true,
842 DataType::LargeUtf8 => true,
843 DataType::Utf8View => true,
844 DataType::Binary => true,
845 DataType::LargeBinary => true,
846 DataType::BinaryView => true,
847 DataType::Date32 => true,
848 DataType::Date64 => true,
849 DataType::Time32(_) => true,
850 DataType::Time64(_) => true,
851 DataType::Duration(_) => true,
852 DataType::Interval(_) => true,
853 DataType::FixedSizeBinary(_) => true,
854 DataType::Dictionary(key_type, value_type) => {
855 DataType::is_dictionary_key_type(key_type) && can_hash(value_type)
856 }
857 DataType::List(value_type) => can_hash(value_type.data_type()),
858 DataType::LargeList(value_type) => can_hash(value_type.data_type()),
859 DataType::FixedSizeList(value_type, _) => can_hash(value_type.data_type()),
860 DataType::Map(map_struct, true | false) => can_hash(map_struct.data_type()),
861 DataType::Struct(fields) => fields.iter().all(|f| can_hash(f.data_type())),
862
863 DataType::ListView(_)
864 | DataType::LargeListView(_)
865 | DataType::Union(_, _)
866 | DataType::RunEndEncoded(_, _) => false,
867 }
868}
869
870pub fn check_all_columns_from_schema(
872 columns: &HashSet<&Column>,
873 schema: &DFSchema,
874) -> Result<bool> {
875 for col in columns.iter() {
876 let exist = schema.is_column_from_schema(col);
877 if !exist {
878 return Ok(false);
879 }
880 }
881
882 Ok(true)
883}
884
885pub fn find_valid_equijoin_key_pair(
894 left_key: &Expr,
895 right_key: &Expr,
896 left_schema: &DFSchema,
897 right_schema: &DFSchema,
898) -> Result<Option<(Expr, Expr)>> {
899 let left_using_columns = left_key.column_refs();
900 let right_using_columns = right_key.column_refs();
901
902 if left_using_columns.is_empty() || right_using_columns.is_empty() {
904 return Ok(None);
905 }
906
907 if check_all_columns_from_schema(&left_using_columns, left_schema)?
908 && check_all_columns_from_schema(&right_using_columns, right_schema)?
909 {
910 return Ok(Some((left_key.clone(), right_key.clone())));
911 } else if check_all_columns_from_schema(&right_using_columns, left_schema)?
912 && check_all_columns_from_schema(&left_using_columns, right_schema)?
913 {
914 return Ok(Some((right_key.clone(), left_key.clone())));
915 }
916
917 Ok(None)
918}
919
920pub fn generate_signature_error_msg(
932 func_name: &str,
933 func_signature: Signature,
934 input_expr_types: &[DataType],
935) -> String {
936 let candidate_signatures = func_signature
937 .type_signature
938 .to_string_repr_with_names(func_signature.parameter_names.as_deref())
939 .iter()
940 .map(|args_str| format!("\t{func_name}({args_str})"))
941 .collect::<Vec<String>>()
942 .join("\n");
943
944 format!(
945 "No function matches the given name and argument types '{}({})'. You might need to add explicit type casts.\n\tCandidate functions:\n{}",
946 func_name, TypeSignature::join_types(input_expr_types, ", "), candidate_signatures
947 )
948}
949
950pub fn split_conjunction(expr: &Expr) -> Vec<&Expr> {
954 split_conjunction_impl(expr, vec![])
955}
956
957fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> Vec<&'a Expr> {
958 match expr {
959 Expr::BinaryExpr(BinaryExpr {
960 right,
961 op: Operator::And,
962 left,
963 }) => {
964 let exprs = split_conjunction_impl(left, exprs);
965 split_conjunction_impl(right, exprs)
966 }
967 Expr::Alias(Alias { expr, .. }) => split_conjunction_impl(expr, exprs),
968 other => {
969 exprs.push(other);
970 exprs
971 }
972 }
973}
974
975pub fn iter_conjunction(expr: &Expr) -> impl Iterator<Item = &Expr> {
979 let mut stack = vec![expr];
980 std::iter::from_fn(move || {
981 while let Some(expr) = stack.pop() {
982 match expr {
983 Expr::BinaryExpr(BinaryExpr {
984 right,
985 op: Operator::And,
986 left,
987 }) => {
988 stack.push(right);
989 stack.push(left);
990 }
991 Expr::Alias(Alias { expr, .. }) => stack.push(expr),
992 other => return Some(other),
993 }
994 }
995 None
996 })
997}
998
999pub fn iter_conjunction_owned(expr: Expr) -> impl Iterator<Item = Expr> {
1003 let mut stack = vec![expr];
1004 std::iter::from_fn(move || {
1005 while let Some(expr) = stack.pop() {
1006 match expr {
1007 Expr::BinaryExpr(BinaryExpr {
1008 right,
1009 op: Operator::And,
1010 left,
1011 }) => {
1012 stack.push(*right);
1013 stack.push(*left);
1014 }
1015 Expr::Alias(Alias { expr, .. }) => stack.push(*expr),
1016 other => return Some(other),
1017 }
1018 }
1019 None
1020 })
1021}
1022
1023pub fn split_conjunction_owned(expr: Expr) -> Vec<Expr> {
1042 split_binary_owned(expr, Operator::And)
1043}
1044
1045pub fn split_binary_owned(expr: Expr, op: Operator) -> Vec<Expr> {
1065 split_binary_owned_impl(expr, op, vec![])
1066}
1067
1068fn split_binary_owned_impl(
1069 expr: Expr,
1070 operator: Operator,
1071 mut exprs: Vec<Expr>,
1072) -> Vec<Expr> {
1073 match expr {
1074 Expr::BinaryExpr(BinaryExpr { right, op, left }) if op == operator => {
1075 let exprs = split_binary_owned_impl(*left, operator, exprs);
1076 split_binary_owned_impl(*right, operator, exprs)
1077 }
1078 Expr::Alias(Alias { expr, .. }) => {
1079 split_binary_owned_impl(*expr, operator, exprs)
1080 }
1081 other => {
1082 exprs.push(other);
1083 exprs
1084 }
1085 }
1086}
1087
1088pub fn split_binary(expr: &Expr, op: Operator) -> Vec<&Expr> {
1092 split_binary_impl(expr, op, vec![])
1093}
1094
1095fn split_binary_impl<'a>(
1096 expr: &'a Expr,
1097 operator: Operator,
1098 mut exprs: Vec<&'a Expr>,
1099) -> Vec<&'a Expr> {
1100 match expr {
1101 Expr::BinaryExpr(BinaryExpr { right, op, left }) if *op == operator => {
1102 let exprs = split_binary_impl(left, operator, exprs);
1103 split_binary_impl(right, operator, exprs)
1104 }
1105 Expr::Alias(Alias { expr, .. }) => split_binary_impl(expr, operator, exprs),
1106 other => {
1107 exprs.push(other);
1108 exprs
1109 }
1110 }
1111}
1112
1113pub fn conjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
1133 filters.into_iter().reduce(Expr::and)
1134}
1135
1136pub fn disjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
1156 filters.into_iter().reduce(Expr::or)
1157}
1158
1159pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> Result<LogicalPlan> {
1174 let predicate = predicates
1176 .iter()
1177 .skip(1)
1178 .fold(predicates[0].clone(), |acc, predicate| {
1179 and(acc, (*predicate).to_owned())
1180 });
1181
1182 Ok(LogicalPlan::Filter(Filter::try_new(
1183 predicate,
1184 Arc::new(plan),
1185 )?))
1186}
1187
1188pub fn find_join_exprs(exprs: Vec<&Expr>) -> Result<(Vec<Expr>, Vec<Expr>)> {
1199 let mut joins = vec![];
1200 let mut others = vec![];
1201 for filter in exprs.into_iter() {
1202 if filter.contains_outer() {
1204 if !matches!(filter, Expr::BinaryExpr(BinaryExpr{ left, op: Operator::Eq, right }) if left.eq(right))
1205 {
1206 joins.push(strip_outer_reference((*filter).clone()));
1207 }
1208 } else {
1209 others.push((*filter).clone());
1210 }
1211 }
1212
1213 Ok((joins, others))
1214}
1215
1216pub fn only_or_err<T>(slice: &[T]) -> Result<&T> {
1226 match slice {
1227 [it] => Ok(it),
1228 [] => plan_err!("No items found!"),
1229 _ => plan_err!("More than one item found!"),
1230 }
1231}
1232
1233pub fn merge_schema(inputs: &[&LogicalPlan]) -> DFSchema {
1238 if inputs.len() == 1 {
1239 inputs[0].schema().as_ref().clone()
1240 } else {
1241 inputs.iter().map(|input| input.schema()).fold(
1242 DFSchema::empty(),
1243 |mut lhs, rhs| {
1244 lhs.merge(rhs);
1245 lhs
1246 },
1247 )
1248 }
1249}
1250
/// Builds a state field name of the form `name[state_name]`.
pub fn format_state_name(name: &str, state_name: &str) -> String {
    let mut out = String::with_capacity(name.len() + state_name.len() + 2);
    out.push_str(name);
    out.push('[');
    out.push_str(state_name);
    out.push(']');
    out
}
1255
1256pub fn collect_subquery_cols(
1258 exprs: &[Expr],
1259 subquery_schema: &DFSchema,
1260) -> Result<BTreeSet<Column>> {
1261 exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
1262 let mut using_cols: Vec<Column> = vec![];
1263 for col in expr.column_refs().into_iter() {
1264 if subquery_schema.has_column(col) {
1265 using_cols.push(col.clone());
1266 }
1267 }
1268
1269 cols.extend(using_cols);
1270 Result::<_>::Ok(cols)
1271 })
1272}
1273
1274#[cfg(test)]
1275mod tests {
1276 use super::*;
1277 use crate::{
1278 col, cube,
1279 expr::WindowFunction,
1280 expr_vec_fmt, grouping_set, lit, rollup,
1281 test::function_stub::{max_udaf, min_udaf, sum_udaf},
1282 Cast, ExprFunctionExt, WindowFunctionDefinition,
1283 };
1284 use arrow::datatypes::{UnionFields, UnionMode};
1285 use datafusion_expr_common::signature::{TypeSignature, Volatility};
1286
1287 #[test]
1288 fn test_group_window_expr_by_sort_keys_empty_case() -> Result<()> {
1289 let result = group_window_expr_by_sort_keys(vec![])?;
1290 let expected: Vec<(WindowSortKey, Vec<Expr>)> = vec![];
1291 assert_eq!(expected, result);
1292 Ok(())
1293 }
1294
1295 #[test]
1296 fn test_group_window_expr_by_sort_keys_empty_window() -> Result<()> {
1297 let max1 = Expr::from(WindowFunction::new(
1298 WindowFunctionDefinition::AggregateUDF(max_udaf()),
1299 vec![col("name")],
1300 ));
1301 let max2 = Expr::from(WindowFunction::new(
1302 WindowFunctionDefinition::AggregateUDF(max_udaf()),
1303 vec![col("name")],
1304 ));
1305 let min3 = Expr::from(WindowFunction::new(
1306 WindowFunctionDefinition::AggregateUDF(min_udaf()),
1307 vec![col("name")],
1308 ));
1309 let sum4 = Expr::from(WindowFunction::new(
1310 WindowFunctionDefinition::AggregateUDF(sum_udaf()),
1311 vec![col("age")],
1312 ));
1313 let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
1314 let result = group_window_expr_by_sort_keys(exprs.to_vec())?;
1315 let key = vec![];
1316 let expected: Vec<(WindowSortKey, Vec<Expr>)> =
1317 vec![(key, vec![max1, max2, min3, sum4])];
1318 assert_eq!(expected, result);
1319 Ok(())
1320 }
1321
1322 #[test]
1323 fn test_group_window_expr_by_sort_keys() -> Result<()> {
1324 let age_asc = Sort::new(col("age"), true, true);
1325 let name_desc = Sort::new(col("name"), false, true);
1326 let created_at_desc = Sort::new(col("created_at"), false, true);
1327 let max1 = Expr::from(WindowFunction::new(
1328 WindowFunctionDefinition::AggregateUDF(max_udaf()),
1329 vec![col("name")],
1330 ))
1331 .order_by(vec![age_asc.clone(), name_desc.clone()])
1332 .build()
1333 .unwrap();
1334 let max2 = Expr::from(WindowFunction::new(
1335 WindowFunctionDefinition::AggregateUDF(max_udaf()),
1336 vec![col("name")],
1337 ));
1338 let min3 = Expr::from(WindowFunction::new(
1339 WindowFunctionDefinition::AggregateUDF(min_udaf()),
1340 vec![col("name")],
1341 ))
1342 .order_by(vec![age_asc.clone(), name_desc.clone()])
1343 .build()
1344 .unwrap();
1345 let sum4 = Expr::from(WindowFunction::new(
1346 WindowFunctionDefinition::AggregateUDF(sum_udaf()),
1347 vec![col("age")],
1348 ))
1349 .order_by(vec![
1350 name_desc.clone(),
1351 age_asc.clone(),
1352 created_at_desc.clone(),
1353 ])
1354 .build()
1355 .unwrap();
1356 let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
1358 let result = group_window_expr_by_sort_keys(exprs.to_vec())?;
1359
1360 let key1 = vec![(age_asc.clone(), false), (name_desc.clone(), false)];
1361 let key2 = vec![];
1362 let key3 = vec![
1363 (name_desc, false),
1364 (age_asc, false),
1365 (created_at_desc, false),
1366 ];
1367
1368 let expected: Vec<(WindowSortKey, Vec<Expr>)> = vec![
1369 (key1, vec![max1, min3]),
1370 (key2, vec![max2]),
1371 (key3, vec![sum4]),
1372 ];
1373 assert_eq!(expected, result);
1374 Ok(())
1375 }
1376
1377 #[test]
1378 fn avoid_generate_duplicate_sort_keys() -> Result<()> {
1379 let asc_or_desc = [true, false];
1380 let nulls_first_or_last = [true, false];
1381 let partition_by = &[col("age"), col("name"), col("created_at")];
1382 for asc_ in asc_or_desc {
1383 for nulls_first_ in nulls_first_or_last {
1384 let order_by = &[
1385 Sort {
1386 expr: col("age"),
1387 asc: asc_,
1388 nulls_first: nulls_first_,
1389 },
1390 Sort {
1391 expr: col("name"),
1392 asc: asc_,
1393 nulls_first: nulls_first_,
1394 },
1395 ];
1396
1397 let expected = vec![
1398 (
1399 Sort {
1400 expr: col("age"),
1401 asc: asc_,
1402 nulls_first: nulls_first_,
1403 },
1404 true,
1405 ),
1406 (
1407 Sort {
1408 expr: col("name"),
1409 asc: asc_,
1410 nulls_first: nulls_first_,
1411 },
1412 true,
1413 ),
1414 (
1415 Sort {
1416 expr: col("created_at"),
1417 asc: true,
1418 nulls_first: false,
1419 },
1420 true,
1421 ),
1422 ];
1423 let result = generate_sort_key(partition_by, order_by)?;
1424 assert_eq!(expected, result);
1425 }
1426 }
1427 Ok(())
1428 }
1429
#[test]
fn test_enumerate_grouping_sets() -> Result<()> {
    let columns = vec![col("col1"), col("col2"), col("col3")];
    let simple = col("simple_col");
    let cube_expr = cube(columns.clone());
    let rollup_expr = rollup(columns.clone());
    let explicit_sets = grouping_set(vec![columns]);

    // Each case pairs a GROUP BY list with the rendering expected once
    // `enumerate_grouping_sets` has expanded it. Cases run in order.
    let cases: Vec<(Vec<Expr>, &str)> = vec![
        // A lone plain column is returned as-is.
        (vec![simple.clone()], "[simple_col]"),
        // A lone CUBE is left unexpanded.
        (vec![cube_expr.clone()], "[CUBE (col1, col2, col3)]"),
        // A lone ROLLUP is left unexpanded.
        (vec![rollup_expr.clone()], "[ROLLUP (col1, col2, col3)]"),
        // Column + CUBE: the column is prefixed to every subset of the cube.
        (
            vec![simple.clone(), cube_expr.clone()],
            "[GROUPING SETS (\
            (simple_col), \
            (simple_col, col1), \
            (simple_col, col2), \
            (simple_col, col1, col2), \
            (simple_col, col3), \
            (simple_col, col1, col3), \
            (simple_col, col2, col3), \
            (simple_col, col1, col2, col3))]",
        ),
        // Column + ROLLUP: the column is prefixed to every rollup prefix.
        (
            vec![simple.clone(), rollup_expr.clone()],
            "[GROUPING SETS (\
            (simple_col), \
            (simple_col, col1), \
            (simple_col, col1, col2), \
            (simple_col, col1, col2, col3))]",
        ),
        // Column + explicit GROUPING SETS holding one set.
        (
            vec![simple.clone(), explicit_sets.clone()],
            "[GROUPING SETS (\
            (simple_col, col1, col2, col3))]",
        ),
        // Column + GROUPING SETS + ROLLUP: the single explicit set is
        // combined with each rollup prefix.
        (
            vec![simple.clone(), explicit_sets, rollup_expr.clone()],
            "[GROUPING SETS (\
            (simple_col, col1, col2, col3), \
            (simple_col, col1, col2, col3, col1), \
            (simple_col, col1, col2, col3, col1, col2), \
            (simple_col, col1, col2, col3, col1, col2, col3))]",
        ),
        // Column + CUBE + ROLLUP: each of the 8 cube subsets is followed by
        // each of the 4 rollup prefixes (duplicate columns are kept).
        (
            vec![simple, cube_expr, rollup_expr],
            "[GROUPING SETS (\
            (simple_col), \
            (simple_col, col1), \
            (simple_col, col1, col2), \
            (simple_col, col1, col2, col3), \
            (simple_col, col1), \
            (simple_col, col1, col1), \
            (simple_col, col1, col1, col2), \
            (simple_col, col1, col1, col2, col3), \
            (simple_col, col2), \
            (simple_col, col2, col1), \
            (simple_col, col2, col1, col2), \
            (simple_col, col2, col1, col2, col3), \
            (simple_col, col1, col2), \
            (simple_col, col1, col2, col1), \
            (simple_col, col1, col2, col1, col2), \
            (simple_col, col1, col2, col1, col2, col3), \
            (simple_col, col3), \
            (simple_col, col3, col1), \
            (simple_col, col3, col1, col2), \
            (simple_col, col3, col1, col2, col3), \
            (simple_col, col1, col3), \
            (simple_col, col1, col3, col1), \
            (simple_col, col1, col3, col1, col2), \
            (simple_col, col1, col3, col1, col2, col3), \
            (simple_col, col2, col3), \
            (simple_col, col2, col3, col1), \
            (simple_col, col2, col3, col1, col2), \
            (simple_col, col2, col3, col1, col2, col3), \
            (simple_col, col1, col2, col3), \
            (simple_col, col1, col2, col3, col1), \
            (simple_col, col1, col2, col3, col1, col2), \
            (simple_col, col1, col2, col3, col1, col2, col3))]",
        ),
    ];

    for (group_by, expected) in cases {
        let sets = enumerate_grouping_sets(group_by)?;
        let rendered = format!("[{}]", expr_vec_fmt!(sets));
        assert_eq!(expected, &rendered);
    }

    Ok(())
}
#[test]
fn test_split_conjunction() {
    // A bare column contains no `AND`, so it is the only element returned.
    let single = col("a");
    let parts = split_conjunction(&single);
    assert_eq!(vec![&single], parts);
}
1555
#[test]
fn test_split_conjunction_two() {
    let lhs = col("a").eq(lit(5));
    let rhs = col("b");
    // `a = 5 AND b` splits into its two conjuncts, left operand first.
    let conjunction_expr = lhs.clone().and(rhs.clone());
    assert_eq!(split_conjunction(&conjunction_expr), vec![&lhs, &rhs]);
}
1565
#[test]
fn test_split_conjunction_alias() {
    let expr = col("a").eq(lit(5)).and(col("b").alias("the_alias"));
    let expr1 = col("a").eq(lit(5));
    // The aliased `b` is expected back as the bare column: splitting looks
    // through the `Alias` wrapper instead of returning it.
    let expr2 = col("b");

    let result = split_conjunction(&expr);
    assert_eq!(result, vec![&expr1, &expr2]);
}
1575
#[test]
fn test_split_conjunction_or() {
    // The root operator is `OR`, so the expression is returned unsplit.
    let disjunct = col("a").eq(lit(5)).or(col("b"));
    assert_eq!(split_conjunction(&disjunct), vec![&disjunct]);
}
1582
#[test]
fn test_split_binary_owned() {
    // A non-binary expression comes back as a one-element vector.
    let column = col("a");
    let expected = vec![column.clone()];
    assert_eq!(split_binary_owned(column, Operator::And), expected);
}
1588
#[test]
fn test_split_binary_owned_two() {
    let left = col("a").eq(lit(5));
    let right = col("b");
    let combined = left.clone().and(right.clone());
    // Both operands of the top-level `AND` are returned, left to right.
    assert_eq!(
        split_binary_owned(combined, Operator::And),
        vec![left, right]
    );
}
1596
#[test]
fn test_split_binary_owned_different_op() {
    // The root is `OR` but we ask to split on `AND`, so nothing is split.
    let or_expr = col("a").eq(lit(5)).or(col("b"));
    let expected = vec![or_expr.clone()];
    assert_eq!(split_binary_owned(or_expr, Operator::And), expected);
}
1606
#[test]
fn test_split_conjunction_owned() {
    let column = col("a");
    let expected = vec![column.clone()];
    // A single column has nothing to split and is returned by value.
    assert_eq!(split_conjunction_owned(column), expected);
}
1612
#[test]
fn test_split_conjunction_owned_two() {
    // `a = 5 AND b` is consumed and split into its two owned conjuncts.
    let predicate = col("a").eq(lit(5)).and(col("b"));
    let parts = split_conjunction_owned(predicate);
    assert_eq!(parts, vec![col("a").eq(lit(5)), col("b")]);
}
1620
#[test]
fn test_split_conjunction_owned_alias() {
    let predicate = col("a").eq(lit(5)).and(col("b").alias("the_alias"));
    // The alias around `b` is expected to be dropped, leaving the bare column.
    let expected = vec![col("a").eq(lit(5)), col("b")];
    assert_eq!(split_conjunction_owned(predicate), expected);
}
1632
#[test]
fn test_conjunction_empty() {
    // Combining zero predicates with `AND` yields no expression at all.
    let combined = conjunction(Vec::new());
    assert!(combined.is_none());
}
1637
#[test]
fn test_conjunction() {
    // Input: `[a, b, c]`.
    let inputs = vec![col("a"), col("b"), col("c")];
    let combined = conjunction(inputs);

    // The predicates are folded left to right: `(a AND b) AND c` ...
    let left_nested = col("a").and(col("b")).and(col("c"));
    assert_eq!(combined, Some(left_nested));

    // ... which is structurally different from `a AND (b AND c)`.
    let right_nested = col("a").and(col("b").and(col("c")));
    assert_ne!(combined, Some(right_nested));
}
1649
#[test]
fn test_disjunction_empty() {
    // Combining zero predicates with `OR` yields no expression at all.
    let combined = disjunction(Vec::new());
    assert!(combined.is_none());
}
1654
#[test]
fn test_disjunction() {
    // Input: `[a, b, c]`.
    let inputs = vec![col("a"), col("b"), col("c")];
    let combined = disjunction(inputs);

    // The predicates are folded left to right: `(a OR b) OR c` ...
    let left_nested = col("a").or(col("b")).or(col("c"));
    assert_eq!(combined, Some(left_nested));

    // ... which is structurally different from `a OR (b OR c)`.
    let right_nested = col("a").or(col("b").or(col("c")));
    assert_ne!(combined, Some(right_nested));
}
1666
#[test]
fn test_split_conjunction_owned_or() {
    // A top-level `OR` is not a conjunction, so it is returned intact.
    let disjunction_expr = col("a").eq(lit(5)).or(col("b"));
    let expected = vec![disjunction_expr.clone()];
    assert_eq!(split_conjunction_owned(disjunction_expr), expected);
}
1672
#[test]
fn test_collect_expr() -> Result<()> {
    let mut accum: HashSet<Column> = HashSet::new();

    // Visit the same `CAST(a AS Float64)` twice; the set must still hold `a`
    // only once.
    for _ in 0..2 {
        let cast_a = Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64));
        expr_to_columns(&cast_a, &mut accum)?;
    }

    assert_eq!(1, accum.len());
    assert!(accum.contains(&Column::from_name("a")));
    Ok(())
}
1688
#[test]
fn test_can_hash() {
    let members = [
        (0, Arc::new(Field::new("A", DataType::Int32, true))),
        (1, Arc::new(Field::new("B", DataType::Float64, true))),
    ];
    let union_fields: UnionFields = members.into_iter().collect();
    let union_type = DataType::Union(union_fields, UnionMode::Sparse);

    // `can_hash` rejects a sparse union at the top level ...
    assert!(!can_hash(&union_type));

    // ... and also when the union is the element type of a list.
    let element = Field::new("my_union", union_type, true);
    let list_union_type = DataType::List(Arc::new(element));
    assert!(!can_hash(&list_union_type));
}
1705
#[test]
fn test_generate_signature_error_msg_with_parameter_names() {
    // Two exact overloads: (Utf8, Int64) and (Utf8, Int64, Int64).
    let two_args = TypeSignature::Exact(vec![DataType::Utf8, DataType::Int64]);
    let three_args =
        TypeSignature::Exact(vec![DataType::Utf8, DataType::Int64, DataType::Int64]);
    let names: Vec<String> = ["str", "start_pos", "length"]
        .iter()
        .map(|name| name.to_string())
        .collect();
    let sig = Signature::one_of(vec![two_args, three_args], Volatility::Immutable)
        .with_parameter_names(names)
        .expect("valid parameter names");

    // A single Utf8 argument fits neither overload's arity; the message is
    // expected to render each candidate using the declared parameter names.
    let error_msg = generate_signature_error_msg("substr", sig, &[DataType::Utf8]);

    assert!(
        error_msg.contains("str: Utf8, start_pos: Int64"),
        "Expected 'str: Utf8, start_pos: Int64' in error message, got: {error_msg}"
    );
    assert!(
        error_msg.contains("str: Utf8, start_pos: Int64, length: Int64"),
        "Expected 'str: Utf8, start_pos: Int64, length: Int64' in error message, got: {error_msg}"
    );
}
1738
#[test]
fn test_generate_signature_error_msg_without_parameter_names() {
    // No parameter names are declared, so candidates render by type only.
    let variants = vec![TypeSignature::Any(2), TypeSignature::Any(3)];
    let sig = Signature::one_of(variants, Volatility::Immutable);

    let error_msg = generate_signature_error_msg("my_func", sig, &[DataType::Int32]);
    assert!(
        error_msg.contains("Any, Any"),
        "Expected 'Any, Any' without parameter names, got: {error_msg}"
    );
}
1753}