1use std::ops::Deref;
19use std::sync::Arc;
20
21use crate::expressions::Column;
22use crate::utils::collect_columns;
23use crate::PhysicalExpr;
24
25use arrow::datatypes::{Field, Schema, SchemaRef};
26use datafusion_common::stats::{ColumnStatistics, Precision};
27use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
28use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
29
30use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
31use indexmap::IndexMap;
32use itertools::Itertools;
33
34#[derive(Debug, Clone)]
43pub struct ProjectionExpr {
44 pub expr: Arc<dyn PhysicalExpr>,
46 pub alias: String,
48}
49
50impl std::fmt::Display for ProjectionExpr {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 if self.expr.to_string() == self.alias {
53 write!(f, "{}", self.alias)
54 } else {
55 write!(f, "{} AS {}", self.expr, self.alias)
56 }
57 }
58}
59
60impl ProjectionExpr {
61 pub fn new(expr: Arc<dyn PhysicalExpr>, alias: String) -> Self {
63 Self { expr, alias }
64 }
65
66 pub fn new_from_expression(
68 expr: Arc<dyn PhysicalExpr>,
69 schema: &Schema,
70 ) -> Result<Self> {
71 let field = expr.return_field(schema)?;
72 Ok(Self {
73 expr,
74 alias: field.name().to_string(),
75 })
76 }
77}
78
79impl From<(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
80 fn from(value: (Arc<dyn PhysicalExpr>, String)) -> Self {
81 Self::new(value.0, value.1)
82 }
83}
84
85impl From<&(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
86 fn from(value: &(Arc<dyn PhysicalExpr>, String)) -> Self {
87 Self::new(Arc::clone(&value.0), value.1.clone())
88 }
89}
90
91impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
92 fn from(value: ProjectionExpr) -> Self {
93 (value.expr, value.alias)
94 }
95}
96
97#[derive(Debug, Clone)]
103pub struct ProjectionExprs {
104 exprs: Vec<ProjectionExpr>,
105}
106
107impl std::fmt::Display for ProjectionExprs {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 let exprs: Vec<String> = self.exprs.iter().map(|e| e.to_string()).collect();
110 write!(f, "Projection[{}]", exprs.join(", "))
111 }
112}
113
114impl From<Vec<ProjectionExpr>> for ProjectionExprs {
115 fn from(value: Vec<ProjectionExpr>) -> Self {
116 Self { exprs: value }
117 }
118}
119
120impl From<&[ProjectionExpr]> for ProjectionExprs {
121 fn from(value: &[ProjectionExpr]) -> Self {
122 Self {
123 exprs: value.to_vec(),
124 }
125 }
126}
127
128impl FromIterator<ProjectionExpr> for ProjectionExprs {
129 fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
130 Self {
131 exprs: exprs.into_iter().collect::<Vec<_>>(),
132 }
133 }
134}
135
136impl AsRef<[ProjectionExpr]> for ProjectionExprs {
137 fn as_ref(&self) -> &[ProjectionExpr] {
138 &self.exprs
139 }
140}
141
142impl ProjectionExprs {
143 pub fn new<I>(exprs: I) -> Self
144 where
145 I: IntoIterator<Item = ProjectionExpr>,
146 {
147 Self {
148 exprs: exprs.into_iter().collect::<Vec<_>>(),
149 }
150 }
151
152 pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Self {
196 let projection_exprs = indices.iter().map(|&i| {
197 let field = schema.field(i);
198 ProjectionExpr {
199 expr: Arc::new(Column::new(field.name(), i)),
200 alias: field.name().clone(),
201 }
202 });
203
204 Self::from_iter(projection_exprs)
205 }
206
207 pub fn iter(&self) -> impl Iterator<Item = &ProjectionExpr> {
209 self.exprs.iter()
210 }
211
212 pub fn projection_mapping(
214 &self,
215 input_schema: &SchemaRef,
216 ) -> Result<ProjectionMapping> {
217 ProjectionMapping::try_new(
218 self.exprs
219 .iter()
220 .map(|p| (Arc::clone(&p.expr), p.alias.clone())),
221 input_schema,
222 )
223 }
224
225 pub fn expr_iter(&self) -> impl Iterator<Item = Arc<dyn PhysicalExpr>> + '_ {
227 self.exprs.iter().map(|e| Arc::clone(&e.expr))
228 }
229
230 pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
296 let mut new_exprs = Vec::with_capacity(other.exprs.len());
297 for proj_expr in &other.exprs {
298 let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
299 .ok_or_else(|| {
300 internal_datafusion_err!(
301 "Failed to combine projections: expression {} could not be applied on top of existing projections {}",
302 proj_expr.expr,
303 self.exprs.iter().map(|e| format!("{e}")).join(", ")
304 )
305 })?;
306 new_exprs.push(ProjectionExpr {
307 expr: new_expr,
308 alias: proj_expr.alias.clone(),
309 });
310 }
311 Ok(ProjectionExprs::new(new_exprs))
312 }
313
314 pub fn column_indices(&self) -> Vec<usize> {
319 self.exprs
320 .iter()
321 .flat_map(|e| collect_columns(&e.expr).into_iter().map(|col| col.index()))
322 .sorted_unstable()
323 .dedup()
324 .collect_vec()
325 }
326
327 pub fn ordered_column_indices(&self) -> Vec<usize> {
355 self.exprs
356 .iter()
357 .map(|e| {
358 e.expr
359 .as_any()
360 .downcast_ref::<Column>()
361 .expect("Expected column reference in projection")
362 .index()
363 })
364 .collect()
365 }
366
367 pub fn project_schema(&self, input_schema: &Schema) -> Result<Schema> {
372 let fields: Result<Vec<Field>> = self
373 .exprs
374 .iter()
375 .map(|proj_expr| {
376 let metadata = proj_expr
377 .expr
378 .return_field(input_schema)?
379 .metadata()
380 .clone();
381
382 let field = Field::new(
383 &proj_expr.alias,
384 proj_expr.expr.data_type(input_schema)?,
385 proj_expr.expr.nullable(input_schema)?,
386 )
387 .with_metadata(metadata);
388
389 Ok(field)
390 })
391 .collect();
392
393 Ok(Schema::new_with_metadata(
394 fields?,
395 input_schema.metadata().clone(),
396 ))
397 }
398
399 pub fn project_statistics(
403 &self,
404 mut stats: datafusion_common::Statistics,
405 input_schema: &Schema,
406 ) -> Result<datafusion_common::Statistics> {
407 let mut primitive_row_size = 0;
408 let mut primitive_row_size_possible = true;
409 let mut column_statistics = vec![];
410
411 for proj_expr in &self.exprs {
412 let expr = &proj_expr.expr;
413 let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
414 stats.column_statistics[col.index()].clone()
415 } else {
416 ColumnStatistics::new_unknown()
419 };
420 column_statistics.push(col_stats);
421 let data_type = expr.data_type(input_schema)?;
422 if let Some(value) = data_type.primitive_width() {
423 primitive_row_size += value;
424 continue;
425 }
426 primitive_row_size_possible = false;
427 }
428
429 if primitive_row_size_possible {
430 stats.total_byte_size =
431 Precision::Exact(primitive_row_size).multiply(&stats.num_rows);
432 }
433 stats.column_statistics = column_statistics;
434 Ok(stats)
435 }
436}
437
438impl<'a> IntoIterator for &'a ProjectionExprs {
439 type Item = &'a ProjectionExpr;
440 type IntoIter = std::slice::Iter<'a, ProjectionExpr>;
441
442 fn into_iter(self) -> Self::IntoIter {
443 self.exprs.iter()
444 }
445}
446
447impl IntoIterator for ProjectionExprs {
448 type Item = ProjectionExpr;
449 type IntoIter = std::vec::IntoIter<ProjectionExpr>;
450
451 fn into_iter(self) -> Self::IntoIter {
452 self.exprs.into_iter()
453 }
454}
455
/// Rewrites the column references in `expr` relative to `projected_exprs`.
///
/// The mode is selected by `sync_with_child`:
///
/// - `true`: every [`Column`] in `expr` is treated as an index into
///   `projected_exprs` and replaced by the expression stored at that
///   position. An expression written against the projection's output thus
///   becomes one written against the projection's input.
/// - `false`: every [`Column`] in `expr` is looked up among the entries of
///   `projected_exprs` whose expression is a bare `Column` with the same name
///   and index. It is replaced by a new `Column` named after that entry's
///   alias and indexed by the entry's position. As soon as one column has no
///   match, the remaining nodes are left untouched.
///
/// # Returns
/// - `Ok(Some(expr))` if at least one column was visited and every visited
///   column was rewritten.
/// - `Ok(None)` if `expr` contains no `Column`, or (in `false` mode) if some
///   column had no matching projected column.
///
/// # Errors
/// In `true` mode, returns an internal error if a column index is out of
/// bounds for `projected_exprs`.
pub fn update_expr(
    expr: &Arc<dyn PhysicalExpr>,
    projected_exprs: &[ProjectionExpr],
    sync_with_child: bool,
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
    #[derive(Debug, PartialEq)]
    enum RewriteState {
        /// No `Column` node has been visited yet.
        Unchanged,
        /// Every `Column` visited so far has been rewritten.
        RewrittenValid,
        /// A `Column` could not be rewritten; the result will be discarded.
        RewrittenInvalid,
    }

    let mut state = RewriteState::Unchanged;

    let new_expr = Arc::clone(expr)
        .transform_up(|expr| {
            // Once a column failed to resolve the result is discarded anyway,
            // so the remaining nodes are passed through untouched.
            if state == RewriteState::RewrittenInvalid {
                return Ok(Transformed::no(expr));
            }

            let Some(column) = expr.as_any().downcast_ref::<Column>() else {
                return Ok(Transformed::no(expr));
            };
            if sync_with_child {
                state = RewriteState::RewrittenValid;
                // Replace the column with the projected expression at its index.
                let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| {
                    internal_datafusion_err!(
                        "Column index {} out of bounds for projected expressions of length {}",
                        column.index(),
                        projected_exprs.len()
                    )
                })?;
                Ok(Transformed::yes(Arc::clone(&projected_expr.expr)))
            } else {
                // Assume failure until a matching projected column is found.
                state = RewriteState::RewrittenInvalid;
                // Look for a projected bare column with the same name and index,
                // and point at its position in the projection's output.
                projected_exprs
                    .iter()
                    .enumerate()
                    .find_map(|(index, proj_expr)| {
                        proj_expr.expr.as_any().downcast_ref::<Column>().and_then(
                            |projected_column| {
                                (column.name().eq(projected_column.name())
                                    && column.index() == projected_column.index())
                                .then(|| {
                                    state = RewriteState::RewrittenValid;
                                    Arc::new(Column::new(&proj_expr.alias, index)) as _
                                })
                            },
                        )
                    })
                    .map_or_else(
                        || Ok(Transformed::no(expr)),
                        |c| Ok(Transformed::yes(c)),
                    )
            }
        })
        .data()?;

    Ok((state == RewriteState::RewrittenValid).then_some(new_expr))
}
550
551#[derive(Clone, Debug, Default)]
554pub struct ProjectionTargets {
555 exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
559}
560
561impl ProjectionTargets {
562 pub fn first(&self) -> &(Arc<dyn PhysicalExpr>, usize) {
564 self.exprs_indices.first().unwrap()
566 }
567
568 pub fn push(&mut self, target: (Arc<dyn PhysicalExpr>, usize)) {
570 self.exprs_indices.push(target);
571 }
572}
573
574impl Deref for ProjectionTargets {
575 type Target = [(Arc<dyn PhysicalExpr>, usize)];
576
577 fn deref(&self) -> &Self::Target {
578 &self.exprs_indices
579 }
580}
581
582impl From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets {
583 fn from(exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>) -> Self {
584 Self { exprs_indices }
585 }
586}
587
588#[derive(Clone, Debug)]
591pub struct ProjectionMapping {
592 map: IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>,
595}
596
597impl ProjectionMapping {
598 pub fn try_new(
612 expr: impl IntoIterator<Item = (Arc<dyn PhysicalExpr>, String)>,
613 input_schema: &SchemaRef,
614 ) -> Result<Self> {
615 let mut map = IndexMap::<_, ProjectionTargets>::new();
617 for (expr_idx, (expr, name)) in expr.into_iter().enumerate() {
618 let target_expr = Arc::new(Column::new(&name, expr_idx)) as _;
619 let source_expr = expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
620 Some(col) => {
621 let idx = col.index();
626 let matching_field = input_schema.field(idx);
627 let matching_name = matching_field.name();
628 if col.name() != matching_name {
629 return internal_err!(
630 "Input field name {} does not match with the projection expression {}",
631 matching_name,
632 col.name()
633 );
634 }
635 let matching_column = Column::new(matching_name, idx);
636 Ok(Transformed::yes(Arc::new(matching_column)))
637 }
638 None => Ok(Transformed::no(e)),
639 })
640 .data()?;
641 map.entry(source_expr)
642 .or_default()
643 .push((target_expr, expr_idx));
644 }
645 Ok(Self { map })
646 }
647
648 pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Result<Self> {
653 let projection_exprs = indices.iter().map(|index| {
654 let field = schema.field(*index);
655 let column = Arc::new(Column::new(field.name(), *index));
656 (column as _, field.name().clone())
657 });
658 ProjectionMapping::try_new(projection_exprs, schema)
659 }
660}
661
662impl Deref for ProjectionMapping {
663 type Target = IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>;
664
665 fn deref(&self) -> &Self::Target {
666 &self.map
667 }
668}
669
670impl FromIterator<(Arc<dyn PhysicalExpr>, ProjectionTargets)> for ProjectionMapping {
671 fn from_iter<T: IntoIterator<Item = (Arc<dyn PhysicalExpr>, ProjectionTargets)>>(
672 iter: T,
673 ) -> Self {
674 Self {
675 map: IndexMap::from_iter(iter),
676 }
677 }
678}
679
680pub fn project_orderings(
693 orderings: &[LexOrdering],
694 schema: &SchemaRef,
695) -> Vec<LexOrdering> {
696 let mut projected_orderings = vec![];
697
698 for ordering in orderings {
699 projected_orderings.extend(project_ordering(ordering, schema));
700 }
701
702 projected_orderings
703}
704
705pub fn project_ordering(
735 ordering: &LexOrdering,
736 schema: &SchemaRef,
737) -> Option<LexOrdering> {
738 let mut projected_exprs = vec![];
739 for PhysicalSortExpr { expr, options } in ordering.iter() {
740 let transformed = Arc::clone(expr).transform_up(|expr| {
741 let Some(col) = expr.as_any().downcast_ref::<Column>() else {
742 return Ok(Transformed::no(expr));
743 };
744
745 let name = col.name();
746 if let Some((idx, _)) = schema.column_with_name(name) {
747 Ok(Transformed::yes(Arc::new(Column::new(name, idx))))
749 } else {
750 plan_err!("")
753 }
754 });
755
756 match transformed {
757 Ok(transformed) => {
758 projected_exprs.push(PhysicalSortExpr::new(transformed.data, *options));
759 }
760 Err(_) => {
761 break;
764 }
765 }
766 }
767
768 LexOrdering::new(projected_exprs)
769}
770
771#[cfg(test)]
772pub(crate) mod tests {
773 use std::collections::HashMap;
774
775 use super::*;
776 use crate::equivalence::{convert_to_orderings, EquivalenceProperties};
777 use crate::expressions::{col, BinaryExpr, Literal};
778 use crate::utils::tests::TestScalarUDF;
779 use crate::{PhysicalExprRef, ScalarFunctionExpr};
780
781 use arrow::compute::SortOptions;
782 use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
783 use datafusion_common::config::ConfigOptions;
784 use datafusion_common::{ScalarValue, Statistics};
785 use datafusion_expr::{Operator, ScalarUDF};
786 use insta::assert_snapshot;
787
788 pub(crate) fn output_schema(
789 mapping: &ProjectionMapping,
790 input_schema: &Arc<Schema>,
791 ) -> Result<SchemaRef> {
792 let mut fields = vec![];
794 for (source, targets) in mapping.iter() {
795 let data_type = source.data_type(input_schema)?;
796 let nullable = source.nullable(input_schema)?;
797 for (target, _) in targets.iter() {
798 let Some(column) = target.as_any().downcast_ref::<Column>() else {
799 return plan_err!("Expects to have column");
800 };
801 fields.push(Field::new(column.name(), data_type.clone(), nullable));
802 }
803 }
804
805 let output_schema = Arc::new(Schema::new_with_metadata(
806 fields,
807 input_schema.metadata().clone(),
808 ));
809
810 Ok(output_schema)
811 }
812
    /// Table-driven check of `EquivalenceProperties::project`: each case gives
    /// the input orderings, the projection expressions (with output names), and
    /// the orderings expected on the projected output (by output column name).
    #[test]
    fn project_orderings() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
            Field::new("e", DataType::Int32, true),
            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
        ]));
        let col_a = &col("a", &schema)?;
        let col_b = &col("b", &schema)?;
        let col_c = &col("c", &schema)?;
        let col_d = &col("d", &schema)?;
        let col_e = &col("e", &schema)?;
        let col_ts = &col("ts", &schema)?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            Arc::clone(col_a),
            Operator::Plus,
            Arc::clone(col_b),
        )) as Arc<dyn PhysicalExpr>;
        let b_plus_d = Arc::new(BinaryExpr::new(
            Arc::clone(col_b),
            Operator::Plus,
            Arc::clone(col_d),
        )) as Arc<dyn PhysicalExpr>;
        let b_plus_e = Arc::new(BinaryExpr::new(
            Arc::clone(col_b),
            Operator::Plus,
            Arc::clone(col_e),
        )) as Arc<dyn PhysicalExpr>;
        let c_plus_d = Arc::new(BinaryExpr::new(
            Arc::clone(col_c),
            Operator::Plus,
            Arc::clone(col_d),
        )) as Arc<dyn PhysicalExpr>;

        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };
        let option_desc = SortOptions {
            descending: true,
            nulls_first: true,
        };

        // Each case: (input orderings, projection exprs, expected output orderings).
        let test_cases = vec![
            // ---------- TEST CASE 0 ----------
            (
                // orderings
                vec![
                    // [b ASC]
                    vec![(col_b, option_asc)],
                ],
                // projection exprs
                vec![(col_b, "b_new".to_string()), (col_a, "a_new".to_string())],
                // expected
                vec![
                    // [b_new ASC]
                    vec![("b_new", option_asc)],
                ],
            ),
            // ---------- TEST CASE 1 ----------
            (
                // orderings: none
                vec![
                ],
                // projection exprs
                vec![(col_c, "c_new".to_string()), (col_b, "b_new".to_string())],
                // expected: none
                vec![
                ],
            ),
            // ---------- TEST CASE 2 ----------
            (
                // orderings
                vec![
                    // [ts ASC]
                    vec![(col_ts, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_ts, "ts_new".to_string()),
                ],
                // expected
                vec![
                    // [ts_new ASC]
                    vec![("ts_new", option_asc)],
                ],
            ),
            // ---------- TEST CASE 3 ----------
            (
                // orderings
                vec![
                    // [a ASC, ts ASC]
                    vec![(col_a, option_asc), (col_ts, option_asc)],
                    // [b ASC, ts ASC]
                    vec![(col_b, option_asc), (col_ts, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_ts, "ts_new".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, ts_new ASC]
                    vec![("a_new", option_asc), ("ts_new", option_asc)],
                    // [b_new ASC, ts_new ASC]
                    vec![("b_new", option_asc), ("ts_new", option_asc)],
                ],
            ),
            // ---------- TEST CASE 4 ----------
            (
                // orderings
                vec![
                    // [a + b ASC]
                    vec![(&a_plus_b, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    // [a + b ASC]
                    vec![("a+b", option_asc)],
                ],
            ),
            // ---------- TEST CASE 5 ----------
            (
                // orderings
                vec![
                    // [a + b ASC, c ASC]
                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_c, "c_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    // [a + b ASC, c_new ASC]
                    vec![("a+b", option_asc), ("c_new", option_asc)],
                ],
            ),
            // ---------- TEST CASE 6 ----------
            (
                // orderings
                vec![
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    // [a ASC, d ASC]
                    vec![(col_a, option_asc), (col_d, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (&b_plus_d, "b+d".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC]
                    vec![("a_new", option_asc), ("b_new", option_asc)],
                    // [a_new ASC, d_new ASC]
                    vec![("a_new", option_asc), ("d_new", option_asc)],
                    // [a_new ASC, b+d ASC]
                    vec![("a_new", option_asc), ("b+d", option_asc)],
                ],
            ),
            // ---------- TEST CASE 7 ----------
            (
                // orderings
                vec![
                    // [b + d ASC]
                    vec![(&b_plus_d, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (&b_plus_d, "b+d".to_string()),
                ],
                // expected
                vec![
                    // [b+d ASC]
                    vec![("b+d", option_asc)],
                ],
            ),
            // ---------- TEST CASE 8 ----------
            (
                // orderings
                vec![
                    // [a ASC, d ASC, b ASC]
                    vec![
                        (col_a, option_asc),
                        (col_d, option_asc),
                        (col_b, option_asc),
                    ],
                    vec![(col_c, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (col_c, "c_new".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, d_new ASC, b_new ASC]
                    vec![
                        ("a_new", option_asc),
                        ("d_new", option_asc),
                        ("b_new", option_asc),
                    ],
                    vec![("c_new", option_asc)],
                ],
            ),
            // ---------- TEST CASE 9 ----------
            (
                // orderings
                vec![
                    vec![
                        // [a ASC, b ASC, c ASC]
                        (col_a, option_asc),
                        (col_b, option_asc),
                        (col_c, option_asc),
                    ],
                    vec![(col_a, option_asc), (col_d, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_c, "c_new".to_string()),
                    (&c_plus_d, "c+d".to_string()),
                ],
                // expected
                vec![
                    vec![
                        // [a_new ASC, b_new ASC, c_new ASC]
                        ("a_new", option_asc),
                        ("b_new", option_asc),
                        ("c_new", option_asc),
                    ],
                    vec![
                        // [a_new ASC, b_new ASC, c+d ASC]
                        ("a_new", option_asc),
                        ("b_new", option_asc),
                        ("c+d", option_asc),
                    ],
                ],
            ),
            // ---------- TEST CASE 10 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC]
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    // [a ASC, d ASC]
                    vec![(col_a, option_asc), (col_d, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&b_plus_d, "b+d".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC]
                    vec![("a_new", option_asc), ("b_new", option_asc)],
                    // [a_new ASC, b+d ASC]
                    vec![("a_new", option_asc), ("b+d", option_asc)],
                ],
            ),
            // ---------- TEST CASE 11 ----------
            (
                // orderings
                vec![
                    vec![
                        // [a ASC, b ASC, c ASC]
                        (col_a, option_asc),
                        (col_b, option_asc),
                        (col_c, option_asc),
                    ],
                ],
                // projection exprs (b is not projected)
                vec![(col_c, "c_new".to_string()), (col_a, "a_new".to_string())],
                // expected
                vec![
                    // [a_new ASC]
                    vec![("a_new", option_asc)],
                ],
            ),
            // ---------- TEST CASE 12 ----------
            (
                // orderings
                vec![
                    vec![
                        // [a ASC, b ASC, c ASC]
                        (col_a, option_asc),
                        (col_b, option_asc),
                        (col_c, option_asc),
                    ],
                    vec![
                        // [a ASC, a + b ASC, c ASC]
                        (col_a, option_asc),
                        (&a_plus_b, option_asc),
                        (col_c, option_asc),
                    ],
                ],
                // projection exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    vec![
                        // [a_new ASC, b_new ASC, c_new ASC]
                        ("a_new", option_asc),
                        ("b_new", option_asc),
                        ("c_new", option_asc),
                    ],
                    vec![
                        // [a_new ASC, a+b ASC, c_new ASC]
                        ("a_new", option_asc),
                        ("a+b", option_asc),
                        ("c_new", option_asc),
                    ],
                ],
            ),
            // ---------- TEST CASE 13 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC]
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    // [c ASC, b ASC]
                    vec![(col_c, option_asc), (col_b, option_asc)],
                    // [d ASC, e ASC]
                    vec![(col_d, option_asc), (col_e, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&b_plus_e, "b+e".to_string()),
                ],
                // expected
                vec![
                    vec![
                        // [a_new ASC, d_new ASC, b+e ASC]
                        ("a_new", option_asc),
                        ("d_new", option_asc),
                        ("b+e", option_asc),
                    ],
                    vec![
                        // [d_new ASC, a_new ASC, b+e ASC]
                        ("d_new", option_asc),
                        ("a_new", option_asc),
                        ("b+e", option_asc),
                    ],
                    vec![
                        // [c_new ASC, d_new ASC, b+e ASC]
                        ("c_new", option_asc),
                        ("d_new", option_asc),
                        ("b+e", option_asc),
                    ],
                    vec![
                        // [d_new ASC, c_new ASC, b+e ASC]
                        ("d_new", option_asc),
                        ("c_new", option_asc),
                        ("b+e", option_asc),
                    ],
                ],
            ),
            // ---------- TEST CASE 14 ----------
            (
                // orderings
                vec![
                    vec![
                        // [a ASC, c ASC, b ASC]
                        (col_a, option_asc),
                        (col_c, option_asc),
                        (col_b, option_asc),
                    ],
                ],
                // projection exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    vec![
                        // [a_new ASC, c_new ASC, a+b ASC]
                        ("a_new", option_asc),
                        ("c_new", option_asc),
                        ("a+b", option_asc),
                    ],
                ],
            ),
            // ---------- TEST CASE 15 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC]
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    // [c ASC, b DESC]
                    vec![(col_c, option_asc), (col_b, option_desc)],
                    // [e ASC]
                    vec![(col_e, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_b, "b_new".to_string()),
                    (&b_plus_e, "b+e".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC]
                    vec![("a_new", option_asc), ("b_new", option_asc)],
                    // [a_new ASC, b+e ASC]
                    vec![("a_new", option_asc), ("b+e", option_asc)],
                    // [c_new ASC, b_new DESC]
                    vec![("c_new", option_asc), ("b_new", option_desc)],
                ],
            ),
        ];

        for (idx, (orderings, proj_exprs, expected)) in test_cases.into_iter().enumerate()
        {
            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

            let orderings = convert_to_orderings(&orderings);
            eq_properties.add_orderings(orderings);

            let proj_exprs = proj_exprs
                .into_iter()
                .map(|(expr, name)| (Arc::clone(expr), name));
            let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
            let output_schema = output_schema(&projection_mapping, &schema)?;

            // Resolve expected output column names against the output schema.
            let expected = expected
                .into_iter()
                .map(|ordering| {
                    ordering
                        .into_iter()
                        .map(|(name, options)| {
                            (col(name, &output_schema).unwrap(), options)
                        })
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();
            let expected = convert_to_orderings(&expected);

            let projected_eq = eq_properties.project(&projection_mapping, output_schema);
            let orderings = projected_eq.oeq_class();

            let err_msg = format!(
                "test_idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
            );

            // Same number of orderings, and every expected one is present
            // (order of the orderings themselves does not matter).
            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
            for expected_ordering in &expected {
                assert!(orderings.contains(expected_ordering), "{}", err_msg)
            }
        }

        Ok(())
    }
1307
    /// Checks ordering projection with a fixed projection that includes a
    /// scalar function (`TestScalarUDF` applied to `c`) and a binary
    /// expression. Expected orderings are given as expressions over the output
    /// schema.
    #[test]
    fn project_orderings2() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
        ]));
        let col_a = &col("a", &schema)?;
        let col_b = &col("b", &schema)?;
        let col_c = &col("c", &schema)?;
        let col_ts = &col("ts", &schema)?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            Arc::clone(col_a),
            Operator::Plus,
            Arc::clone(col_b),
        )) as Arc<dyn PhysicalExpr>;

        let test_fun = Arc::new(ScalarUDF::new_from_impl(TestScalarUDF::new()));

        let round_c = Arc::new(ScalarFunctionExpr::try_new(
            test_fun,
            vec![Arc::clone(col_c)],
            &schema,
            Arc::new(ConfigOptions::default()),
        )?) as PhysicalExprRef;

        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };

        // Projection shared by all cases below.
        let proj_exprs = vec![
            (col_b, "b_new".to_string()),
            (col_a, "a_new".to_string()),
            (col_c, "c_new".to_string()),
            (&round_c, "round_c_res".to_string()),
        ];
        let proj_exprs = proj_exprs
            .into_iter()
            .map(|(expr, name)| (Arc::clone(expr), name));
        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
        let output_schema = output_schema(&projection_mapping, &schema)?;

        let col_a_new = &col("a_new", &output_schema)?;
        let col_b_new = &col("b_new", &output_schema)?;
        let col_c_new = &col("c_new", &output_schema)?;
        let col_round_c_res = &col("round_c_res", &output_schema)?;
        let a_new_plus_b_new = Arc::new(BinaryExpr::new(
            Arc::clone(col_a_new),
            Operator::Plus,
            Arc::clone(col_b_new),
        )) as Arc<dyn PhysicalExpr>;

        // Each case: (input orderings, expected output orderings).
        let test_cases = [
            // ---------- TEST CASE 0 ----------
            (
                // orderings
                vec![
                    // [a ASC]
                    vec![(col_a, option_asc)],
                ],
                // expected
                vec![
                    // [a_new ASC]
                    vec![(col_a_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 1 ----------
            (
                // orderings
                vec![
                    // [a + b ASC]
                    vec![(&a_plus_b, option_asc)],
                ],
                // expected
                vec![
                    // [a_new + b_new ASC]
                    vec![(&a_new_plus_b_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 2 ----------
            (
                // orderings
                vec![
                    // [a ASC, ts ASC] (ts is not projected)
                    vec![(col_a, option_asc), (col_ts, option_asc)],
                ],
                // expected
                vec![
                    // [a_new ASC]
                    vec![(col_a_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 3 ----------
            (
                // orderings
                vec![
                    // [a ASC, ts ASC, b ASC]
                    vec![
                        (col_a, option_asc),
                        (col_ts, option_asc),
                        (col_b, option_asc),
                    ],
                ],
                // expected
                vec![
                    // [a_new ASC]
                    vec![(col_a_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 4 ----------
            (
                // orderings
                vec![
                    // [a ASC, c ASC]
                    vec![(col_a, option_asc), (col_c, option_asc)],
                ],
                // expected
                vec![
                    // [a_new ASC, round_c_res ASC]
                    vec![(col_a_new, option_asc), (col_round_c_res, option_asc)],
                    // [a_new ASC, c_new ASC]
                    vec![(col_a_new, option_asc), (col_c_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 5 ----------
            (
                // orderings
                vec![
                    // [c ASC, b ASC]
                    vec![(col_c, option_asc), (col_b, option_asc)],
                ],
                // expected
                vec![
                    // [round_c_res ASC]
                    vec![(col_round_c_res, option_asc)],
                    // [c_new ASC, b_new ASC]
                    vec![(col_c_new, option_asc), (col_b_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 6 ----------
            (
                // orderings
                vec![
                    // [a + b ASC, c ASC]
                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
                ],
                // expected
                vec![
                    // [a_new + b_new ASC, round_c_res ASC]
                    vec![
                        (&a_new_plus_b_new, option_asc),
                        (col_round_c_res, option_asc),
                    ],
                    // [a_new + b_new ASC, c_new ASC]
                    vec![(&a_new_plus_b_new, option_asc), (col_c_new, option_asc)],
                ],
            ),
        ];

        for (idx, (orderings, expected)) in test_cases.iter().enumerate() {
            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

            let orderings = convert_to_orderings(orderings);
            eq_properties.add_orderings(orderings);

            let expected = convert_to_orderings(expected);

            let projected_eq =
                eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
            let orderings = projected_eq.oeq_class();

            let err_msg = format!(
                "test idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
            );

            // Same count, and every expected ordering is present.
            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
            for expected_ordering in &expected {
                assert!(orderings.contains(expected_ordering), "{}", err_msg)
            }
        }
        Ok(())
    }
1493
    /// Checks ordering projection when equality conditions between input
    /// columns are also registered. The projection keeps `c`, `d` and `a + b`.
    #[test]
    fn project_orderings3() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
            Field::new("e", DataType::Int32, true),
            Field::new("f", DataType::Int32, true),
        ]));
        let col_a = &col("a", &schema)?;
        let col_b = &col("b", &schema)?;
        let col_c = &col("c", &schema)?;
        let col_d = &col("d", &schema)?;
        let col_e = &col("e", &schema)?;
        let col_f = &col("f", &schema)?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            Arc::clone(col_a),
            Operator::Plus,
            Arc::clone(col_b),
        )) as Arc<dyn PhysicalExpr>;

        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };

        // Projection shared by all cases below.
        let proj_exprs = vec![
            (col_c, "c_new".to_string()),
            (col_d, "d_new".to_string()),
            (&a_plus_b, "a+b".to_string()),
        ];
        let proj_exprs = proj_exprs
            .into_iter()
            .map(|(expr, name)| (Arc::clone(expr), name));
        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
        let output_schema = output_schema(&projection_mapping, &schema)?;

        let col_a_plus_b_new = &col("a+b", &output_schema)?;
        let col_c_new = &col("c_new", &output_schema)?;
        let col_d_new = &col("d_new", &output_schema)?;

        // Each case: (input orderings, equal column pairs, expected output orderings).
        let test_cases = vec![
            // ---------- TEST CASE 0 ----------
            (
                // orderings
                vec![
                    // [d ASC, b ASC]
                    vec![(col_d, option_asc), (col_b, option_asc)],
                    // [c ASC, a ASC]
                    vec![(col_c, option_asc), (col_a, option_asc)],
                ],
                // equal conditions
                vec![],
                // expected
                vec![
                    // [d_new ASC, c_new ASC, a+b ASC]
                    vec![
                        (col_d_new, option_asc),
                        (col_c_new, option_asc),
                        (col_a_plus_b_new, option_asc),
                    ],
                    // [c_new ASC, d_new ASC, a+b ASC]
                    vec![
                        (col_c_new, option_asc),
                        (col_d_new, option_asc),
                        (col_a_plus_b_new, option_asc),
                    ],
                ],
            ),
            // ---------- TEST CASE 1 ----------
            (
                // orderings
                vec![
                    // [d ASC, b ASC]
                    vec![(col_d, option_asc), (col_b, option_asc)],
                    // [c ASC, e ASC]
                    vec![(col_c, option_asc), (col_e, option_asc)],
                ],
                // equal conditions: e = a
                vec![(col_e, col_a)],
                // expected
                vec![
                    // [d_new ASC, c_new ASC, a+b ASC]
                    vec![
                        (col_d_new, option_asc),
                        (col_c_new, option_asc),
                        (col_a_plus_b_new, option_asc),
                    ],
                    // [c_new ASC, d_new ASC, a+b ASC]
                    vec![
                        (col_c_new, option_asc),
                        (col_d_new, option_asc),
                        (col_a_plus_b_new, option_asc),
                    ],
                ],
            ),
            // ---------- TEST CASE 2 ----------
            (
                // orderings
                vec![
                    // [d ASC, b ASC]
                    vec![(col_d, option_asc), (col_b, option_asc)],
                    // [c ASC, e ASC]
                    vec![(col_c, option_asc), (col_e, option_asc)],
                ],
                // equal conditions: a = f
                vec![(col_a, col_f)],
                // expected
                vec![
                    // [d_new ASC]
                    vec![(col_d_new, option_asc)],
                    // [c_new ASC]
                    vec![(col_c_new, option_asc)],
                ],
            ),
        ];
        for (orderings, equal_columns, expected) in test_cases {
            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
            for (lhs, rhs) in equal_columns {
                eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?;
            }

            let orderings = convert_to_orderings(&orderings);
            eq_properties.add_orderings(orderings);

            let expected = convert_to_orderings(&expected);

            let projected_eq =
                eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
            let orderings = projected_eq.oeq_class();

            let err_msg = format!(
                "actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
            );

            // Same count, and every expected ordering is present.
            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
            for expected_ordering in &expected {
                assert!(orderings.contains(expected_ordering), "{}", err_msg)
            }
        }

        Ok(())
    }
1638
    /// Statistics fixture: 5 rows, 23 bytes, and three column statistics
    /// (Int64, string and Float32 values, in that order).
    fn get_stats() -> Statistics {
        Statistics {
            num_rows: Precision::Exact(5),
            total_byte_size: Precision::Exact(23),
            column_statistics: vec![
                // col0: Int64 values
                ColumnStatistics {
                    distinct_count: Precision::Exact(5),
                    max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
                    min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
                    sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
                    null_count: Precision::Exact(0),
                },
                // col1: string values, no sum
                ColumnStatistics {
                    distinct_count: Precision::Exact(1),
                    max_value: Precision::Exact(ScalarValue::from("x")),
                    min_value: Precision::Exact(ScalarValue::from("a")),
                    sum_value: Precision::Absent,
                    null_count: Precision::Exact(3),
                },
                // col2: Float32 values, unknown distinct and null counts
                ColumnStatistics {
                    distinct_count: Precision::Absent,
                    max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
                    min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
                    sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
                    null_count: Precision::Absent,
                },
            ],
        }
    }
1668
1669 fn get_schema() -> Schema {
1670 let field_0 = Field::new("col0", DataType::Int64, false);
1671 let field_1 = Field::new("col1", DataType::Utf8, false);
1672 let field_2 = Field::new("col2", DataType::Float32, false);
1673 Schema::new(vec![field_0, field_1, field_2])
1674 }
1675
    /// Projecting `[col1, col0]` reorders the column statistics accordingly
    /// and, because `col1` is Utf8, keeps the input `total_byte_size`.
    #[test]
    fn test_stats_projection_columns_only() {
        let source = get_stats();
        let schema = get_schema();

        let projection = ProjectionExprs::new(vec![
            ProjectionExpr {
                expr: Arc::new(Column::new("col1", 1)),
                alias: "col1".to_string(),
            },
            ProjectionExpr {
                expr: Arc::new(Column::new("col0", 0)),
                alias: "col0".to_string(),
            },
        ]);

        let result = projection.project_statistics(source, &schema).unwrap();

        let expected = Statistics {
            num_rows: Precision::Exact(5),
            total_byte_size: Precision::Exact(23),
            column_statistics: vec![
                // former col1 statistics
                ColumnStatistics {
                    distinct_count: Precision::Exact(1),
                    max_value: Precision::Exact(ScalarValue::from("x")),
                    min_value: Precision::Exact(ScalarValue::from("a")),
                    sum_value: Precision::Absent,
                    null_count: Precision::Exact(3),
                },
                // former col0 statistics
                ColumnStatistics {
                    distinct_count: Precision::Exact(5),
                    max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
                    min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
                    sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
                    null_count: Precision::Exact(0),
                },
            ],
        };

        assert_eq!(result, expected);
    }
1717
#[test]
fn test_stats_projection_column_with_primitive_width_only() {
    // Project only the fixed-width columns `col2, col0` (Float32, Int64).
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col2", 2)), "col2".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "col0".to_string()),
    ]);

    let float_stats = ColumnStatistics {
        null_count: Precision::Absent,
        distinct_count: Precision::Absent,
        min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
        max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
        sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
    };
    let int_stats = ColumnStatistics {
        null_count: Precision::Exact(0),
        distinct_count: Precision::Exact(5),
        min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
        max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
        sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
    };

    // The expected 60 bytes equals 5 rows * (4-byte Float32 + 8-byte Int64).
    // The input fixture reported 23.
    let expected = Statistics {
        num_rows: Precision::Exact(5),
        total_byte_size: Precision::Exact(60),
        column_statistics: vec![float_stats, int_stats],
    };

    let input_stats = get_stats();
    let input_schema = get_schema();
    let actual = projection
        .project_statistics(input_stats, &input_schema)
        .unwrap();

    assert_eq!(actual, expected);
}
1759
#[test]
fn test_projection_new() -> Result<()> {
    let exprs = vec![
        ProjectionExpr {
            expr: Arc::new(Column::new("a", 0)),
            alias: "a".to_string(),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("b", 1)),
            alias: "b".to_string(),
        },
    ];
    // `exprs` is not needed afterwards, so move it instead of cloning it.
    let projection = ProjectionExprs::new(exprs);

    // `new` must keep every expression, in the order given.
    let aliases: Vec<&str> = projection
        .as_ref()
        .iter()
        .map(|e| e.alias.as_str())
        .collect();
    assert_eq!(aliases, ["a", "b"]);
    Ok(())
}
1778
#[test]
fn test_projection_from_vec() -> Result<()> {
    let exprs = vec![ProjectionExpr {
        expr: Arc::new(Column::new("x", 0)),
        alias: "x".to_string(),
    }];
    // Exercise the `Vec<ProjectionExpr>` -> `ProjectionExprs` conversion.
    // `exprs` is not used again, so it is moved rather than cloned.
    let projection: ProjectionExprs = exprs.into();

    let converted = projection.as_ref();
    assert_eq!(converted.len(), 1);
    assert_eq!(converted[0].alias, "x");
    Ok(())
}
1789
#[test]
fn test_projection_as_ref() -> Result<()> {
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col1", 0)), String::from("col1")),
        ProjectionExpr::new(Arc::new(Column::new("col2", 1)), String::from("col2")),
    ]);

    // Borrow the expressions as a plain slice through `AsRef`.
    let slice: &[ProjectionExpr] = projection.as_ref();
    assert_eq!(slice.len(), 2);
    Ok(())
}
1807
#[test]
fn test_column_indices_multiple_columns() -> Result<()> {
    // Builds a plain column reference aliased to its own name.
    let column = |name: &str, index: usize| {
        ProjectionExpr::new(Arc::new(Column::new(name, index)), name.to_string())
    };

    // Columns are listed in descending index order; the indices are expected
    // back in ascending order.
    let projection =
        ProjectionExprs::new(vec![column("c", 5), column("b", 2), column("a", 0)]);
    assert_eq!(projection.column_indices(), vec![0, 2, 5]);
    Ok(())
}
1829
#[test]
fn test_column_indices_duplicates() -> Result<()> {
    // Builds a plain column reference aliased to its own name.
    let column = |name: &str, index: usize| {
        ProjectionExpr::new(Arc::new(Column::new(name, index)), name.to_string())
    };

    // Input column 1 is referenced twice (as `a` and `a2`) and is expected to
    // be reported only once.
    let projection =
        ProjectionExprs::new(vec![column("a", 1), column("b", 3), column("a2", 1)]);
    assert_eq!(projection.column_indices(), vec![1, 3]);
    Ok(())
}
1850
#[test]
fn test_column_indices_unsorted() -> Result<()> {
    // Builds a plain column reference aliased to its own name.
    let column = |name: &str, index: usize| {
        ProjectionExpr::new(Arc::new(Column::new(name, index)), name.to_string())
    };

    // Indices appear as 5, 1, 3 in the projection and are expected sorted.
    let projection =
        ProjectionExprs::new(vec![column("c", 5), column("a", 1), column("b", 3)]);
    assert_eq!(projection.column_indices(), vec![1, 3, 5]);
    Ok(())
}
1871
#[test]
fn test_column_indices_complex_expr() -> Result<()> {
    // `a@1 + b@4` contributes both of its column references.
    let a_plus_b = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 1)),
        Operator::Plus,
        Arc::new(Column::new("b", 4)),
    ));

    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(a_plus_b, "sum".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()),
    ]);
    assert_eq!(projection.column_indices(), vec![1, 2, 4]);
    Ok(())
}
1894
#[test]
fn test_column_indices_empty() -> Result<()> {
    // A projection without expressions references no input columns.
    let projection = ProjectionExprs::new(vec![]);
    let indices = projection.column_indices();
    assert!(indices.is_empty());
    Ok(())
}
1901
#[test]
fn test_merge_simple_columns() -> Result<()> {
    // Inner projection: renames input columns `c, b, a` to `x, y, z`.
    let inner = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("c", 2)), "x".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("b", 1)), "y".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("a", 0)), "z".to_string()),
    ]);

    // Outer projection over the inner one's output: `y AS col2, x AS col1`.
    let outer = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("y", 1)), "col2".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("x", 0)), "col1".to_string()),
    ]);

    // After merging, the outer aliases point directly at the original inputs.
    let merged = inner.try_merge(&outer)?;
    assert_snapshot!(merged.to_string(), @"Projection[b@1 AS col2, c@2 AS col1]");

    Ok(())
}
1938
#[test]
fn test_merge_with_expressions() -> Result<()> {
    // Inner projection: renames input columns `c, b, a` to `x, y, z`.
    let inner = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("c", 2)), "x".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("b", 1)), "y".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("a", 0)), "z".to_string()),
    ]);

    // Outer projection computes over the inner output: `y + z AS c2, x + 1 AS c1`.
    let y_plus_z = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("y", 1)),
        Operator::Plus,
        Arc::new(Column::new("z", 2)),
    ));
    let x_plus_one = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
    ));
    let outer = ProjectionExprs::new(vec![
        ProjectionExpr::new(y_plus_z, "c2".to_string()),
        ProjectionExpr::new(x_plus_one, "c1".to_string()),
    ]);

    // Merging rewrites the outer column references in terms of the inner inputs.
    let merged = inner.try_merge(&outer)?;
    assert_snapshot!(merged.to_string(), @"Projection[b@1 + a@0 AS c2, c@2 + 1 AS c1]");

    Ok(())
}
1983
#[test]
fn try_merge_error() {
    // The inner projection produces only two output columns...
    let inner = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("a", 0)), "x".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("b", 1)), "y".to_string()),
    ]);

    // ...so an outer reference to column index 5 cannot be resolved.
    let outer = ProjectionExprs::new(vec![ProjectionExpr::new(
        Arc::new(Column::new("z", 5)),
        "result".to_string(),
    )]);

    let expected = "Internal error: Column index 5 out of bounds for projected expressions of length 2";
    let err_msg = inner.try_merge(&outer).unwrap_err().to_string();
    assert!(
        err_msg.contains(expected),
        "Unexpected error message: {err_msg}",
    );
}
2011
#[test]
fn test_project_schema_simple_columns() -> Result<()> {
    let input_schema = get_schema();

    // `col2 AS c, col0 AS a`: output fields take the alias and keep the input type.
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col2", 2)), "c".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "a".to_string()),
    ]);

    let output_schema = projection.project_schema(&input_schema)?;

    let expected = [("c", DataType::Float32), ("a", DataType::Int64)];
    assert_eq!(output_schema.fields().len(), expected.len());
    for (idx, (name, data_type)) in expected.into_iter().enumerate() {
        let field = output_schema.field(idx);
        assert_eq!(field.name(), name);
        assert_eq!(field.data_type(), &data_type);
    }

    Ok(())
}
2044
#[test]
fn test_project_schema_with_expressions() -> Result<()> {
    let input_schema = get_schema();

    // `col0 + 1 AS incremented`, where `col0` is Int64 and the literal is Int64.
    let col0_plus_one = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("col0", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
    ));
    let projection = ProjectionExprs::new(vec![ProjectionExpr::new(
        col0_plus_one,
        "incremented".to_string(),
    )]);

    let output_schema = projection.project_schema(&input_schema)?;

    assert_eq!(output_schema.fields().len(), 1);
    let field = output_schema.field(0);
    assert_eq!(field.name(), "incremented");
    assert_eq!(field.data_type(), &DataType::Int64);

    Ok(())
}
2071
#[test]
fn test_project_schema_preserves_metadata() -> Result<()> {
    let metadata = HashMap::from([("key".to_string(), "value".to_string())]);
    let input_schema = Schema::new(vec![
        Field::new("col0", DataType::Int64, false).with_metadata(metadata.clone()),
        Field::new("col1", DataType::Utf8, false),
    ]);

    // Project `col0` under a new name; its field metadata is expected to survive
    // the rename.
    let projection = ProjectionExprs::new(vec![ProjectionExpr::new(
        Arc::new(Column::new("col0", 0)),
        "renamed".to_string(),
    )]);

    let output_schema = projection.project_schema(&input_schema)?;

    assert_eq!(output_schema.fields().len(), 1);
    let field = output_schema.field(0);
    assert_eq!(field.name(), "renamed");
    assert_eq!(field.metadata(), &metadata);

    Ok(())
}
2101
#[test]
fn test_project_schema_empty() -> Result<()> {
    // Projecting no expressions yields a schema without any fields.
    let projection = ProjectionExprs::new(vec![]);
    let input_schema = get_schema();

    let output_schema = projection.project_schema(&input_schema)?;
    assert!(output_schema.fields().is_empty());

    Ok(())
}
2113
#[test]
fn test_project_statistics_columns_only() -> Result<()> {
    let input_stats = get_stats();
    let input_schema = get_schema();

    // Swap and rename: `col1 AS text, col0 AS num`.
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col1", 1)), "text".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "num".to_string()),
    ]);

    let output_stats = projection.project_statistics(input_stats, &input_schema)?;
    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    let columns = &output_stats.column_statistics;
    assert_eq!(columns.len(), 2);

    // `text` carries the statistics of input `col1`.
    let text = &columns[0];
    assert_eq!(text.distinct_count, Precision::Exact(1));
    assert_eq!(text.max_value, Precision::Exact(ScalarValue::from("x")));

    // `num` carries the statistics of input `col0`.
    let num = &columns[1];
    assert_eq!(num.distinct_count, Precision::Exact(5));
    assert_eq!(num.max_value, Precision::Exact(ScalarValue::Int64(Some(21))));

    Ok(())
}
2161
#[test]
fn test_project_statistics_with_expressions() -> Result<()> {
    let input_stats = get_stats();
    let input_schema = get_schema();

    // `col0 + 1 AS incremented, col1 AS text`.
    let col0_plus_one = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("col0", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
    ));
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(col0_plus_one, "incremented".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("col1", 1)), "text".to_string()),
    ]);

    let output_stats = projection.project_statistics(input_stats, &input_schema)?;
    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    let columns = &output_stats.column_statistics;
    assert_eq!(columns.len(), 2);

    // No statistics are expected for the computed `col0 + 1` column...
    assert_eq!(columns[0].distinct_count, Precision::Absent);
    assert_eq!(columns[0].max_value, Precision::Absent);

    // ...while the plain `col1` reference keeps its input distinct count.
    assert_eq!(columns[1].distinct_count, Precision::Exact(1));

    Ok(())
}
2209
#[test]
fn test_project_statistics_primitive_width_only() -> Result<()> {
    let input_stats = get_stats();
    let input_schema = get_schema();

    // Only fixed-width columns: `col2 AS f` (Float32) and `col0 AS i` (Int64).
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr {
            expr: Arc::new(Column::new("col2", 2)),
            alias: "f".to_string(),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("col0", 0)),
            alias: "i".to_string(),
        },
    ]);

    let output_stats = projection.project_statistics(input_stats, &input_schema)?;

    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    // 60 == 5 rows * (4-byte Float32 + 8-byte Int64).
    assert_eq!(output_stats.total_byte_size, Precision::Exact(60));

    assert_eq!(output_stats.column_statistics.len(), 2);

    // Column statistics must follow the projection order, not the input order.
    // `f` carries `col2`'s statistics and `i` carries `col0`'s. Checking only the
    // length would let a reordering bug slip through.
    assert_eq!(
        output_stats.column_statistics[0].max_value,
        Precision::Exact(ScalarValue::Float32(Some(1.1)))
    );
    assert_eq!(
        output_stats.column_statistics[1].max_value,
        Precision::Exact(ScalarValue::Int64(Some(21)))
    );

    Ok(())
}
2241
#[test]
fn test_project_statistics_empty() -> Result<()> {
    let projection = ProjectionExprs::new(vec![]);
    let input_schema = get_schema();

    let output_stats = projection.project_statistics(get_stats(), &input_schema)?;

    // The row count is kept. The expected result has no column statistics and a
    // total byte size of 0.
    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    assert!(output_stats.column_statistics.is_empty());
    assert_eq!(output_stats.total_byte_size, Precision::Exact(0));

    Ok(())
}
2262}