1use std::ops::Deref;
21use std::sync::Arc;
22
23use crate::PhysicalExpr;
24use crate::expressions::{Column, Literal};
25use crate::utils::collect_columns;
26
27use arrow::array::{RecordBatch, RecordBatchOptions};
28use arrow::datatypes::{Field, Schema, SchemaRef};
29use datafusion_common::stats::{ColumnStatistics, Precision};
30use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
31use datafusion_common::{
32 Result, ScalarValue, assert_or_internal_err, internal_datafusion_err, plan_err,
33};
34
35use datafusion_physical_expr_common::metrics::ExecutionPlanMetricsSet;
36use datafusion_physical_expr_common::metrics::ExpressionEvaluatorMetrics;
37use datafusion_physical_expr_common::physical_expr::fmt_sql;
38use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
39use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays_with_metrics;
40use indexmap::IndexMap;
41use itertools::Itertools;
42
/// A single expression in a projection, paired with the name (alias) of the
/// output column it produces.
#[derive(Debug, Clone)]
pub struct ProjectionExpr {
    /// Expression evaluated against the projection's input.
    pub expr: Arc<dyn PhysicalExpr>,
    /// Name of the output field produced by `expr`.
    pub alias: String,
}
60
61impl PartialEq for ProjectionExpr {
62 fn eq(&self, other: &Self) -> bool {
63 let ProjectionExpr { expr, alias } = self;
64 expr.eq(&other.expr) && *alias == other.alias
65 }
66}
67
// Marker impl: equality is defined by the manual `PartialEq` impl for
// `ProjectionExpr`, which compares both `expr` and `alias`.
impl Eq for ProjectionExpr {}
69
70impl std::fmt::Display for ProjectionExpr {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 if self.expr.to_string() == self.alias {
73 write!(f, "{}", self.alias)
74 } else {
75 write!(f, "{} AS {}", self.expr, self.alias)
76 }
77 }
78}
79
80impl ProjectionExpr {
81 pub fn new(expr: Arc<dyn PhysicalExpr>, alias: impl Into<String>) -> Self {
83 let alias = alias.into();
84 Self { expr, alias }
85 }
86
87 pub fn new_from_expression(
89 expr: Arc<dyn PhysicalExpr>,
90 schema: &Schema,
91 ) -> Result<Self> {
92 let field = expr.return_field(schema)?;
93 Ok(Self {
94 expr,
95 alias: field.name().to_string(),
96 })
97 }
98}
99
100impl From<(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
101 fn from(value: (Arc<dyn PhysicalExpr>, String)) -> Self {
102 Self::new(value.0, value.1)
103 }
104}
105
106impl From<&(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
107 fn from(value: &(Arc<dyn PhysicalExpr>, String)) -> Self {
108 Self::new(Arc::clone(&value.0), value.1.clone())
109 }
110}
111
112impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
113 fn from(value: ProjectionExpr) -> Self {
114 (value.expr, value.alias)
115 }
116}
117
/// An ordered list of [`ProjectionExpr`]s. The position of each entry is the
/// index of the output column it produces.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionExprs {
    /// Projection expressions in output-column order.
    exprs: Vec<ProjectionExpr>,
}
130
131impl std::fmt::Display for ProjectionExprs {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 let exprs: Vec<String> = self.exprs.iter().map(|e| e.to_string()).collect();
134 write!(f, "Projection[{}]", exprs.join(", "))
135 }
136}
137
138impl From<Vec<ProjectionExpr>> for ProjectionExprs {
139 fn from(value: Vec<ProjectionExpr>) -> Self {
140 Self { exprs: value }
141 }
142}
143
144impl From<&[ProjectionExpr]> for ProjectionExprs {
145 fn from(value: &[ProjectionExpr]) -> Self {
146 Self {
147 exprs: value.to_vec(),
148 }
149 }
150}
151
152impl FromIterator<ProjectionExpr> for ProjectionExprs {
153 fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
154 Self {
155 exprs: exprs.into_iter().collect::<Vec<_>>(),
156 }
157 }
158}
159
160impl AsRef<[ProjectionExpr]> for ProjectionExprs {
161 fn as_ref(&self) -> &[ProjectionExpr] {
162 &self.exprs
163 }
164}
165
166impl ProjectionExprs {
167 pub fn new<I>(exprs: I) -> Self
168 where
169 I: IntoIterator<Item = ProjectionExpr>,
170 {
171 Self {
172 exprs: exprs.into_iter().collect::<Vec<_>>(),
173 }
174 }
175
176 pub fn from_indices(indices: &[usize], schema: &Schema) -> Self {
220 let projection_exprs = indices.iter().map(|&i| {
221 let field = schema.field(i);
222 ProjectionExpr {
223 expr: Arc::new(Column::new(field.name(), i)),
224 alias: field.name().clone(),
225 }
226 });
227
228 Self::from_iter(projection_exprs)
229 }
230
231 pub fn iter(&self) -> impl Iterator<Item = &ProjectionExpr> {
233 self.exprs.iter()
234 }
235
236 pub fn projection_mapping(
238 &self,
239 input_schema: &SchemaRef,
240 ) -> Result<ProjectionMapping> {
241 ProjectionMapping::try_new(
242 self.exprs
243 .iter()
244 .map(|p| (Arc::clone(&p.expr), p.alias.clone())),
245 input_schema,
246 )
247 }
248
249 pub fn expr_iter(&self) -> impl Iterator<Item = Arc<dyn PhysicalExpr>> + '_ {
251 self.exprs.iter().map(|e| Arc::clone(&e.expr))
252 }
253
254 pub fn try_map_exprs<F>(self, mut f: F) -> Result<Self>
283 where
284 F: FnMut(Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>,
285 {
286 let exprs = self
287 .exprs
288 .into_iter()
289 .map(|mut proj| {
290 proj.expr = f(proj.expr)?;
291 Ok(proj)
292 })
293 .collect::<Result<Vec<_>>>()?;
294 Ok(Self::new(exprs))
295 }
296
297 pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
363 let mut new_exprs = Vec::with_capacity(other.exprs.len());
364 for proj_expr in &other.exprs {
365 let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
366 .ok_or_else(|| {
367 internal_datafusion_err!(
368 "Failed to combine projections: expression {} could not be applied on top of existing projections {}",
369 proj_expr.expr,
370 self.exprs.iter().map(|e| format!("{e}")).join(", ")
371 )
372 })?;
373 new_exprs.push(ProjectionExpr {
374 expr: new_expr,
375 alias: proj_expr.alias.clone(),
376 });
377 }
378 Ok(ProjectionExprs::new(new_exprs))
379 }
380
381 pub fn column_indices(&self) -> Vec<usize> {
386 self.exprs
387 .iter()
388 .flat_map(|e| collect_columns(&e.expr).into_iter().map(|col| col.index()))
389 .sorted_unstable()
390 .dedup()
391 .collect_vec()
392 }
393
394 #[deprecated(
426 since = "52.0.0",
427 note = "Use column_indices() instead. This method will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
428 )]
429 pub fn ordered_column_indices(&self) -> Vec<usize> {
430 self.exprs
431 .iter()
432 .map(|e| {
433 e.expr
434 .as_any()
435 .downcast_ref::<Column>()
436 .expect("Expected column reference in projection")
437 .index()
438 })
439 .collect()
440 }
441
442 pub fn project_schema(&self, input_schema: &Schema) -> Result<Schema> {
447 let fields: Result<Vec<Field>> = self
448 .exprs
449 .iter()
450 .map(|proj_expr| {
451 let metadata = proj_expr
452 .expr
453 .return_field(input_schema)?
454 .metadata()
455 .clone();
456
457 let field = Field::new(
458 &proj_expr.alias,
459 proj_expr.expr.data_type(input_schema)?,
460 proj_expr.expr.nullable(input_schema)?,
461 )
462 .with_metadata(metadata);
463
464 Ok(field)
465 })
466 .collect();
467
468 Ok(Schema::new_with_metadata(
469 fields?,
470 input_schema.metadata().clone(),
471 ))
472 }
473
474 pub fn make_projector(&self, input_schema: &Schema) -> Result<Projector> {
483 let output_schema = Arc::new(self.project_schema(input_schema)?);
484 Ok(Projector {
485 projection: self.clone(),
486 output_schema,
487 expression_metrics: None,
488 })
489 }
490
491 pub fn create_expression_metrics(
492 &self,
493 metrics: &ExecutionPlanMetricsSet,
494 partition: usize,
495 ) -> ExpressionEvaluatorMetrics {
496 let labels: Vec<String> = self
497 .exprs
498 .iter()
499 .map(|proj_expr| {
500 let expr_sql = fmt_sql(proj_expr.expr.as_ref()).to_string();
501 if proj_expr.expr.to_string() == proj_expr.alias {
502 expr_sql
503 } else {
504 format!("{expr_sql} AS {}", proj_expr.alias)
505 }
506 })
507 .collect();
508 ExpressionEvaluatorMetrics::new(metrics, partition, labels)
509 }
510
511 pub fn project_statistics(
604 &self,
605 mut stats: datafusion_common::Statistics,
606 output_schema: &Schema,
607 ) -> Result<datafusion_common::Statistics> {
608 let mut column_statistics = vec![];
609
610 for proj_expr in &self.exprs {
611 let expr = &proj_expr.expr;
612 let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
613 std::mem::take(&mut stats.column_statistics[col.index()])
614 } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
615 let data_type = expr.data_type(output_schema)?;
617
618 if literal.value().is_null() {
619 let null_count = match stats.num_rows {
620 Precision::Exact(num_rows) => Precision::Exact(num_rows),
621 _ => Precision::Absent,
622 };
623
624 ColumnStatistics {
625 min_value: Precision::Exact(literal.value().clone()),
626 max_value: Precision::Exact(literal.value().clone()),
627 distinct_count: Precision::Exact(1),
628 null_count,
629 sum_value: Precision::Exact(literal.value().clone()),
630 byte_size: Precision::Exact(0),
631 }
632 } else {
633 let value = literal.value();
634 let distinct_count = Precision::Exact(1);
635 let null_count = Precision::Exact(0);
636
637 let byte_size = if let Some(byte_width) = data_type.primitive_width()
638 {
639 stats.num_rows.multiply(&Precision::Exact(byte_width))
640 } else {
641 Precision::Absent
643 };
644
645 let sum_value = Precision::<ScalarValue>::from(stats.num_rows)
646 .cast_to(&value.data_type())
647 .ok()
648 .map(|row_count| {
649 Precision::Exact(value.clone()).multiply(&row_count)
650 })
651 .unwrap_or(Precision::Absent);
652
653 ColumnStatistics {
654 min_value: Precision::Exact(value.clone()),
655 max_value: Precision::Exact(value.clone()),
656 distinct_count,
657 null_count,
658 sum_value,
659 byte_size,
660 }
661 }
662 } else {
663 ColumnStatistics::new_unknown()
666 };
667 column_statistics.push(col_stats);
668 }
669 stats.calculate_total_byte_size(output_schema);
670 stats.column_statistics = column_statistics;
671 Ok(stats)
672 }
673}
674
675impl<'a> IntoIterator for &'a ProjectionExprs {
676 type Item = &'a ProjectionExpr;
677 type IntoIter = std::slice::Iter<'a, ProjectionExpr>;
678
679 fn into_iter(self) -> Self::IntoIter {
680 self.exprs.iter()
681 }
682}
683
/// A [`ProjectionExprs`] paired with its precomputed output schema. It is
/// applied to record batches via [`Projector::project_batch`].
#[derive(Clone, Debug)]
pub struct Projector {
    /// The expressions evaluated for every output column.
    projection: ProjectionExprs,
    /// Schema of the batches produced by [`Projector::project_batch`].
    output_schema: SchemaRef,
    /// Optional per-expression evaluation metrics. `ProjectionExprs::make_projector`
    /// leaves this `None`; [`Projector::with_metrics`] sets it.
    expression_metrics: Option<ExpressionEvaluatorMetrics>,
}
699
700impl Projector {
701 pub fn with_metrics(
706 &self,
707 metrics: &ExecutionPlanMetricsSet,
708 partition: usize,
709 ) -> Self {
710 let expr_metrics = self
711 .projection
712 .create_expression_metrics(metrics, partition);
713 Self {
714 expression_metrics: Some(expr_metrics),
715 projection: self.projection.clone(),
716 output_schema: Arc::clone(&self.output_schema),
717 }
718 }
719
720 pub fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
727 let arrays = evaluate_expressions_to_arrays_with_metrics(
728 self.projection.exprs.iter().map(|p| &p.expr),
729 batch,
730 self.expression_metrics.as_ref(),
731 )?;
732
733 if arrays.is_empty() {
734 let options =
735 RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
736 RecordBatch::try_new_with_options(
737 Arc::clone(&self.output_schema),
738 arrays,
739 &options,
740 )
741 .map_err(Into::into)
742 } else {
743 RecordBatch::try_new(Arc::clone(&self.output_schema), arrays)
744 .map_err(Into::into)
745 }
746 }
747
748 pub fn output_schema(&self) -> &SchemaRef {
749 &self.output_schema
750 }
751
752 pub fn projection(&self) -> &ProjectionExprs {
753 &self.projection
754 }
755}
756
757impl IntoIterator for ProjectionExprs {
758 type Item = ProjectionExpr;
759 type IntoIter = std::vec::IntoIter<ProjectionExpr>;
760
761 fn into_iter(self) -> Self::IntoIter {
762 self.exprs.into_iter()
763 }
764}
765
/// Rewrites the [`Column`] references in `expr` relative to `projected_exprs`.
///
/// * When `sync_with_child` is `true`, each column's index is looked up in
///   `projected_exprs`, and the column is replaced by the expression stored at
///   that position. An out-of-bounds index yields an internal error.
/// * When `sync_with_child` is `false`, each column must match, by both name
///   and index, a projected expression that is itself a plain [`Column`]. It
///   is then replaced by a new column named after that projection's alias and
///   indexed by the projection's position. If any column finds no match, the
///   rewrite is abandoned.
///
/// Returns `Ok(None)` when the rewrite was abandoned. Otherwise returns
/// `Ok(Some(rewritten))`, which is `expr` itself when it contains no columns.
pub fn update_expr(
    expr: &Arc<dyn PhysicalExpr>,
    projected_exprs: &[ProjectionExpr],
    sync_with_child: bool,
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
    /// Outcome of the bottom-up rewrite below.
    #[derive(Debug, PartialEq)]
    enum RewriteState {
        /// No column has been visited yet.
        Unchanged,
        /// Every column visited so far has been rewritten.
        RewrittenValid,
        /// Some column could not be matched; remaining nodes are left untouched
        /// and the function returns `Ok(None)`.
        RewrittenInvalid,
    }

    let mut state = RewriteState::Unchanged;

    let new_expr = Arc::clone(expr)
        .transform_up(|expr| {
            // Once a column failed to match, stop rewriting anything else.
            if state == RewriteState::RewrittenInvalid {
                return Ok(Transformed::no(expr));
            }

            let Some(column) = expr.as_any().downcast_ref::<Column>() else {
                return Ok(Transformed::no(expr));
            };
            if sync_with_child {
                state = RewriteState::RewrittenValid;
                // Substitute the projected expression found at this column's index.
                let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| {
                    internal_datafusion_err!(
                        "Column index {} out of bounds for projected expressions of length {}",
                        column.index(),
                        projected_exprs.len()
                    )
                })?;
                Ok(Transformed::yes(Arc::clone(&projected_expr.expr)))
            } else {
                // Assume failure; flipped back to valid below only if a
                // matching projected column is found.
                state = RewriteState::RewrittenInvalid;
                projected_exprs
                    .iter()
                    .enumerate()
                    .find_map(|(index, proj_expr)| {
                        proj_expr.expr.as_any().downcast_ref::<Column>().and_then(
                            |projected_column| {
                                (column.name().eq(projected_column.name())
                                    && column.index() == projected_column.index())
                                .then(|| {
                                    state = RewriteState::RewrittenValid;
                                    // Point at the projection's output position,
                                    // named after its alias.
                                    Arc::new(Column::new(&proj_expr.alias, index)) as _
                                })
                            },
                        )
                    })
                    .map_or_else(
                        || Ok(Transformed::no(expr)),
                        |c| Ok(Transformed::yes(c)),
                    )
            }
        })
        .data()?;

    match state {
        RewriteState::RewrittenInvalid => Ok(None),
        RewriteState::Unchanged | RewriteState::RewrittenValid => Ok(Some(new_expr)),
    }
}
866
/// The output targets of one source expression in a [`ProjectionMapping`].
/// Each entry pairs an output column expression with its index in the
/// projection's output.
#[derive(Clone, Debug, Default)]
pub struct ProjectionTargets {
    /// `(target expression, output index)` pairs, in insertion order.
    exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
}
876
877impl ProjectionTargets {
878 pub fn first(&self) -> &(Arc<dyn PhysicalExpr>, usize) {
880 self.exprs_indices.first().unwrap()
882 }
883
884 pub fn push(&mut self, target: (Arc<dyn PhysicalExpr>, usize)) {
886 self.exprs_indices.push(target);
887 }
888}
889
890impl Deref for ProjectionTargets {
891 type Target = [(Arc<dyn PhysicalExpr>, usize)];
892
893 fn deref(&self) -> &Self::Target {
894 &self.exprs_indices
895 }
896}
897
898impl From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets {
899 fn from(exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>) -> Self {
900 Self { exprs_indices }
901 }
902}
903
/// Maps each source expression (over the projection's input) to the output
/// columns it is projected to.
///
/// Entries keep the order in which each source expression first appears in
/// the projection.
#[derive(Clone, Debug)]
pub struct ProjectionMapping {
    /// Source expression -> its output targets.
    map: IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>,
}
912
913impl ProjectionMapping {
914 pub fn try_new(
928 expr: impl IntoIterator<Item = (Arc<dyn PhysicalExpr>, String)>,
929 input_schema: &SchemaRef,
930 ) -> Result<Self> {
931 let mut map = IndexMap::<_, ProjectionTargets>::new();
933 for (expr_idx, (expr, name)) in expr.into_iter().enumerate() {
934 let target_expr = Arc::new(Column::new(&name, expr_idx)) as _;
935 let source_expr = expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
936 Some(col) => {
937 let idx = col.index();
942 let matching_field = input_schema.field(idx);
943 let matching_name = matching_field.name();
944 assert_or_internal_err!(
945 col.name() == matching_name,
946 "Input field name {matching_name} does not match with the projection expression {}",
947 col.name()
948 );
949 let matching_column = Column::new(matching_name, idx);
950 Ok(Transformed::yes(Arc::new(matching_column)))
951 }
952 None => Ok(Transformed::no(e)),
953 })
954 .data()?;
955 map.entry(source_expr)
956 .or_default()
957 .push((target_expr, expr_idx));
958 }
959 Ok(Self { map })
960 }
961
962 pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Result<Self> {
967 let projection_exprs = indices.iter().map(|index| {
968 let field = schema.field(*index);
969 let column = Arc::new(Column::new(field.name(), *index));
970 (column as _, field.name().clone())
971 });
972 ProjectionMapping::try_new(projection_exprs, schema)
973 }
974}
975
976impl Deref for ProjectionMapping {
977 type Target = IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>;
978
979 fn deref(&self) -> &Self::Target {
980 &self.map
981 }
982}
983
984impl FromIterator<(Arc<dyn PhysicalExpr>, ProjectionTargets)> for ProjectionMapping {
985 fn from_iter<T: IntoIterator<Item = (Arc<dyn PhysicalExpr>, ProjectionTargets)>>(
986 iter: T,
987 ) -> Self {
988 Self {
989 map: IndexMap::from_iter(iter),
990 }
991 }
992}
993
994pub fn project_orderings(
1007 orderings: &[LexOrdering],
1008 schema: &SchemaRef,
1009) -> Vec<LexOrdering> {
1010 let mut projected_orderings = vec![];
1011
1012 for ordering in orderings {
1013 projected_orderings.extend(project_ordering(ordering, schema));
1014 }
1015
1016 projected_orderings
1017}
1018
1019pub fn project_ordering(
1049 ordering: &LexOrdering,
1050 schema: &SchemaRef,
1051) -> Option<LexOrdering> {
1052 let mut projected_exprs = vec![];
1053 for PhysicalSortExpr { expr, options } in ordering.iter() {
1054 let transformed = Arc::clone(expr).transform_up(|expr| {
1055 let Some(col) = expr.as_any().downcast_ref::<Column>() else {
1056 return Ok(Transformed::no(expr));
1057 };
1058
1059 let name = col.name();
1060 if let Some((idx, _)) = schema.column_with_name(name) {
1061 Ok(Transformed::yes(Arc::new(Column::new(name, idx))))
1063 } else {
1064 plan_err!("")
1067 }
1068 });
1069
1070 match transformed {
1071 Ok(transformed) => {
1072 projected_exprs.push(PhysicalSortExpr::new(transformed.data, *options));
1073 }
1074 Err(_) => {
1075 break;
1078 }
1079 }
1080 }
1081
1082 LexOrdering::new(projected_exprs)
1083}
1084
1085#[cfg(test)]
1086pub(crate) mod tests {
1087 use std::collections::HashMap;
1088
1089 use super::*;
1090 use crate::equivalence::{EquivalenceProperties, convert_to_orderings};
1091 use crate::expressions::{BinaryExpr, Literal, col};
1092 use crate::utils::tests::TestScalarUDF;
1093 use crate::{PhysicalExprRef, ScalarFunctionExpr};
1094
1095 use arrow::compute::SortOptions;
1096 use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
1097 use datafusion_common::config::ConfigOptions;
1098 use datafusion_common::stats::Precision;
1099 use datafusion_common::{ScalarValue, Statistics};
1100 use datafusion_expr::{Operator, ScalarUDF};
1101 use insta::assert_snapshot;
1102
1103 pub(crate) fn output_schema(
1104 mapping: &ProjectionMapping,
1105 input_schema: &Arc<Schema>,
1106 ) -> Result<SchemaRef> {
1107 let mut fields = vec![];
1109 for (source, targets) in mapping.iter() {
1110 let data_type = source.data_type(input_schema)?;
1111 let nullable = source.nullable(input_schema)?;
1112 for (target, _) in targets.iter() {
1113 let Some(column) = target.as_any().downcast_ref::<Column>() else {
1114 return plan_err!("Expects to have column");
1115 };
1116 fields.push(Field::new(column.name(), data_type.clone(), nullable));
1117 }
1118 }
1119
1120 let output_schema = Arc::new(Schema::new_with_metadata(
1121 fields,
1122 input_schema.metadata().clone(),
1123 ));
1124
1125 Ok(output_schema)
1126 }
1127
    /// Checks that `EquivalenceProperties::project` maps input orderings to
    /// the expected orderings over the projection's output schema.
    ///
    /// Each test case is `(input orderings, projection exprs as (expr, alias),
    /// expected output orderings)`. Expected orderings name output columns.
    #[test]
    fn project_orderings() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
            Field::new("e", DataType::Int32, true),
            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
        ]));
        let col_a = &col("a", &schema)?;
        let col_b = &col("b", &schema)?;
        let col_c = &col("c", &schema)?;
        let col_d = &col("d", &schema)?;
        let col_e = &col("e", &schema)?;
        let col_ts = &col("ts", &schema)?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            Arc::clone(col_a),
            Operator::Plus,
            Arc::clone(col_b),
        )) as Arc<dyn PhysicalExpr>;
        let b_plus_d = Arc::new(BinaryExpr::new(
            Arc::clone(col_b),
            Operator::Plus,
            Arc::clone(col_d),
        )) as Arc<dyn PhysicalExpr>;
        let b_plus_e = Arc::new(BinaryExpr::new(
            Arc::clone(col_b),
            Operator::Plus,
            Arc::clone(col_e),
        )) as Arc<dyn PhysicalExpr>;
        let c_plus_d = Arc::new(BinaryExpr::new(
            Arc::clone(col_c),
            Operator::Plus,
            Arc::clone(col_d),
        )) as Arc<dyn PhysicalExpr>;

        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };
        let option_desc = SortOptions {
            descending: true,
            nulls_first: true,
        };

        let test_cases = vec![
            (
                // orderings
                vec![
                    vec![(col_b, option_asc)],
                ],
                // projection exprs
                vec![(col_b, "b_new".to_string()), (col_a, "a_new".to_string())],
                // expected
                vec![
                    vec![("b_new", option_asc)],
                ],
            ),
            (
                // orderings (none)
                vec![
                ],
                // projection exprs
                vec![(col_c, "c_new".to_string()), (col_b, "b_new".to_string())],
                // expected (none)
                vec![
                ],
            ),
            (
                // orderings
                vec![
                    vec![(col_ts, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_ts, "ts_new".to_string()),
                ],
                // expected
                vec![
                    vec![("ts_new", option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(col_a, option_asc), (col_ts, option_asc)],
                    vec![(col_b, option_asc), (col_ts, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_ts, "ts_new".to_string()),
                ],
                // expected
                vec![
                    vec![("a_new", option_asc), ("ts_new", option_asc)],
                    vec![("b_new", option_asc), ("ts_new", option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(&a_plus_b, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    vec![("a+b", option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_c, "c_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    vec![("a+b", option_asc), ("c_new", option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    vec![(col_a, option_asc), (col_d, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (&b_plus_d, "b+d".to_string()),
                ],
                // expected
                vec![
                    vec![("a_new", option_asc), ("b_new", option_asc)],
                    vec![("a_new", option_asc), ("d_new", option_asc)],
                    vec![("a_new", option_asc), ("b+d", option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(&b_plus_d, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (&b_plus_d, "b+d".to_string()),
                ],
                // expected
                vec![
                    vec![("b+d", option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![
                        (col_a, option_asc),
                        (col_d, option_asc),
                        (col_b, option_asc),
                    ],
                    vec![(col_c, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (col_c, "c_new".to_string()),
                ],
                // expected
                vec![
                    vec![
                        ("a_new", option_asc),
                        ("d_new", option_asc),
                        ("b_new", option_asc),
                    ],
                    vec![("c_new", option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![
                        (col_a, option_asc),
                        (col_b, option_asc),
                        (col_c, option_asc),
                    ],
                    vec![(col_a, option_asc), (col_d, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_c, "c_new".to_string()),
                    (&c_plus_d, "c+d".to_string()),
                ],
                // expected
                vec![
                    vec![
                        ("a_new", option_asc),
                        ("b_new", option_asc),
                        ("c_new", option_asc),
                    ],
                    vec![
                        ("a_new", option_asc),
                        ("b_new", option_asc),
                        ("c+d", option_asc),
                    ],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    vec![(col_a, option_asc), (col_d, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&b_plus_d, "b+d".to_string()),
                ],
                // expected
                vec![
                    vec![("a_new", option_asc), ("b_new", option_asc)],
                    vec![("a_new", option_asc), ("b+d", option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![
                        (col_a, option_asc),
                        (col_b, option_asc),
                        (col_c, option_asc),
                    ],
                ],
                // projection exprs
                vec![(col_c, "c_new".to_string()), (col_a, "a_new".to_string())],
                // expected
                vec![
                    vec![("a_new", option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![
                        (col_a, option_asc),
                        (col_b, option_asc),
                        (col_c, option_asc),
                    ],
                    vec![
                        (col_a, option_asc),
                        (&a_plus_b, option_asc),
                        (col_c, option_asc),
                    ],
                ],
                // projection exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    vec![
                        ("a_new", option_asc),
                        ("b_new", option_asc),
                        ("c_new", option_asc),
                    ],
                    vec![
                        ("a_new", option_asc),
                        ("a+b", option_asc),
                        ("c_new", option_asc),
                    ],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    vec![(col_c, option_asc), (col_b, option_asc)],
                    vec![(col_d, option_asc), (col_e, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&b_plus_e, "b+e".to_string()),
                ],
                // expected
                vec![
                    vec![
                        ("a_new", option_asc),
                        ("d_new", option_asc),
                        ("b+e", option_asc),
                    ],
                    vec![
                        ("d_new", option_asc),
                        ("a_new", option_asc),
                        ("b+e", option_asc),
                    ],
                    vec![
                        ("c_new", option_asc),
                        ("d_new", option_asc),
                        ("b+e", option_asc),
                    ],
                    vec![
                        ("d_new", option_asc),
                        ("c_new", option_asc),
                        ("b+e", option_asc),
                    ],
                ],
            ),
            (
                // orderings
                vec![
                    vec![
                        (col_a, option_asc),
                        (col_c, option_asc),
                        (col_b, option_asc),
                    ],
                ],
                // projection exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    vec![
                        ("a_new", option_asc),
                        ("c_new", option_asc),
                        ("a+b", option_asc),
                    ],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    vec![(col_c, option_asc), (col_b, option_desc)],
                    vec![(col_e, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_b, "b_new".to_string()),
                    (&b_plus_e, "b+e".to_string()),
                ],
                // expected
                vec![
                    vec![("a_new", option_asc), ("b_new", option_asc)],
                    vec![("a_new", option_asc), ("b+e", option_asc)],
                    vec![("c_new", option_asc), ("b_new", option_desc)],
                ],
            ),
        ];

        for (idx, (orderings, proj_exprs, expected)) in test_cases.into_iter().enumerate()
        {
            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

            let orderings = convert_to_orderings(&orderings);
            eq_properties.add_orderings(orderings);

            let proj_exprs = proj_exprs
                .into_iter()
                .map(|(expr, name)| (Arc::clone(expr), name));
            let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
            let output_schema = output_schema(&projection_mapping, &schema)?;

            // Expected orderings are written with output column names; resolve
            // them against the projected schema.
            let expected = expected
                .into_iter()
                .map(|ordering| {
                    ordering
                        .into_iter()
                        .map(|(name, options)| {
                            (col(name, &output_schema).unwrap(), options)
                        })
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();
            let expected = convert_to_orderings(&expected);

            let projected_eq = eq_properties.project(&projection_mapping, output_schema);
            let orderings = projected_eq.oeq_class();

            let err_msg = format!(
                "test_idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
            );

            // Same number of orderings, and every expected ordering is present
            // (the order of orderings within the class is not compared).
            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
            for expected_ordering in &expected {
                assert!(orderings.contains(expected_ordering), "{}", err_msg)
            }
        }

        Ok(())
    }
1622
    /// Checks ordering projection with a fixed projection that also applies a
    /// scalar function (`TestScalarUDF`) to `c`.
    ///
    /// Each test case is `(input orderings, expected output orderings)`. The
    /// expected results treat `round_c_res` as ordered whenever `c` is.
    #[test]
    fn project_orderings2() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
        ]));
        let col_a = &col("a", &schema)?;
        let col_b = &col("b", &schema)?;
        let col_c = &col("c", &schema)?;
        let col_ts = &col("ts", &schema)?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            Arc::clone(col_a),
            Operator::Plus,
            Arc::clone(col_b),
        )) as Arc<dyn PhysicalExpr>;

        let test_fun = Arc::new(ScalarUDF::new_from_impl(TestScalarUDF::new()));

        let round_c = Arc::new(ScalarFunctionExpr::try_new(
            test_fun,
            vec![Arc::clone(col_c)],
            &schema,
            Arc::new(ConfigOptions::default()),
        )?) as PhysicalExprRef;

        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };

        // The same projection is used for every test case below.
        let proj_exprs = vec![
            (col_b, "b_new".to_string()),
            (col_a, "a_new".to_string()),
            (col_c, "c_new".to_string()),
            (&round_c, "round_c_res".to_string()),
        ];
        let proj_exprs = proj_exprs
            .into_iter()
            .map(|(expr, name)| (Arc::clone(expr), name));
        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
        let output_schema = output_schema(&projection_mapping, &schema)?;

        // Expressions over the output schema, used in the expected orderings.
        let col_a_new = &col("a_new", &output_schema)?;
        let col_b_new = &col("b_new", &output_schema)?;
        let col_c_new = &col("c_new", &output_schema)?;
        let col_round_c_res = &col("round_c_res", &output_schema)?;
        let a_new_plus_b_new = Arc::new(BinaryExpr::new(
            Arc::clone(col_a_new),
            Operator::Plus,
            Arc::clone(col_b_new),
        )) as Arc<dyn PhysicalExpr>;

        let test_cases = [
            (
                // orderings
                vec![
                    vec![(col_a, option_asc)],
                ],
                // expected
                vec![
                    vec![(col_a_new, option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(&a_plus_b, option_asc)],
                ],
                // expected
                vec![
                    vec![(&a_new_plus_b_new, option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(col_a, option_asc), (col_ts, option_asc)],
                ],
                // expected: `ts` is not projected, so only the prefix survives
                vec![
                    vec![(col_a_new, option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![
                        (col_a, option_asc),
                        (col_ts, option_asc),
                        (col_b, option_asc),
                    ],
                ],
                // expected: truncated at the unprojected `ts`
                vec![
                    vec![(col_a_new, option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(col_a, option_asc), (col_c, option_asc)],
                ],
                // expected
                vec![
                    vec![(col_a_new, option_asc), (col_round_c_res, option_asc)],
                    vec![(col_a_new, option_asc), (col_c_new, option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(col_c, option_asc), (col_b, option_asc)],
                ],
                // expected
                vec![
                    vec![(col_round_c_res, option_asc)],
                    vec![(col_c_new, option_asc), (col_b_new, option_asc)],
                ],
            ),
            (
                // orderings
                vec![
                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
                ],
                // expected
                vec![
                    vec![
                        (&a_new_plus_b_new, option_asc),
                        (col_round_c_res, option_asc),
                    ],
                    vec![(&a_new_plus_b_new, option_asc), (col_c_new, option_asc)],
                ],
            ),
        ];

        for (idx, (orderings, expected)) in test_cases.iter().enumerate() {
            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

            let orderings = convert_to_orderings(orderings);
            eq_properties.add_orderings(orderings);

            let expected = convert_to_orderings(expected);

            let projected_eq =
                eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
            let orderings = projected_eq.oeq_class();

            let err_msg = format!(
                "test idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
            );

            // Same number of orderings, and every expected ordering is present
            // (the order of orderings within the class is not compared).
            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
            for expected_ordering in &expected {
                assert!(orderings.contains(expected_ordering), "{}", err_msg)
            }
        }
        Ok(())
    }
1808
/// Projects orderings through `[c AS c_new, d AS d_new, a + b AS a+b]` and checks
/// which orderings survive, optionally under extra column equalities.
#[test]
fn project_orderings3() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
        Field::new("e", DataType::Int32, true),
        Field::new("f", DataType::Int32, true),
    ]));
    let col_a = &col("a", &schema)?;
    let col_b = &col("b", &schema)?;
    let col_c = &col("c", &schema)?;
    let col_d = &col("d", &schema)?;
    let col_e = &col("e", &schema)?;
    let col_f = &col("f", &schema)?;
    let a_plus_b = Arc::new(BinaryExpr::new(
        Arc::clone(col_a),
        Operator::Plus,
        Arc::clone(col_b),
    )) as Arc<dyn PhysicalExpr>;

    let option_asc = SortOptions {
        descending: false,
        nulls_first: false,
    };

    let proj_exprs = vec![
        (col_c, "c_new".to_string()),
        (col_d, "d_new".to_string()),
        (&a_plus_b, "a+b".to_string()),
    ];
    let proj_exprs = proj_exprs
        .into_iter()
        .map(|(expr, name)| (Arc::clone(expr), name));
    let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
    let output_schema = output_schema(&projection_mapping, &schema)?;

    let col_a_plus_b_new = &col("a+b", &output_schema)?;
    let col_c_new = &col("c_new", &output_schema)?;
    let col_d_new = &col("d_new", &output_schema)?;

    // Each case is (input orderings, equal column pairs, expected output orderings).
    let test_cases = vec![
        (
            // [d ASC, b ASC] and [c ASC, a ASC] both hold. Rows tied on (d, c)
            // are therefore ordered by both b and a, so a + b is ordered there too.
            vec![
                vec![(col_d, option_asc), (col_b, option_asc)],
                vec![(col_c, option_asc), (col_a, option_asc)],
            ],
            vec![],
            vec![
                vec![
                    (col_d_new, option_asc),
                    (col_c_new, option_asc),
                    (col_a_plus_b_new, option_asc),
                ],
                vec![
                    (col_c_new, option_asc),
                    (col_d_new, option_asc),
                    (col_a_plus_b_new, option_asc),
                ],
            ],
        ),
        (
            // With e = a, [c ASC, e ASC] is equivalent to [c ASC, a ASC]: same result
            // as the previous case.
            vec![
                vec![(col_d, option_asc), (col_b, option_asc)],
                vec![(col_c, option_asc), (col_e, option_asc)],
            ],
            vec![(col_e, col_a)],
            vec![
                vec![
                    (col_d_new, option_asc),
                    (col_c_new, option_asc),
                    (col_a_plus_b_new, option_asc),
                ],
                vec![
                    (col_c_new, option_asc),
                    (col_d_new, option_asc),
                    (col_a_plus_b_new, option_asc),
                ],
            ],
        ),
        (
            // a = f does not relate e to any projected expression, so only the
            // projected leading columns d and c survive.
            vec![
                vec![(col_d, option_asc), (col_b, option_asc)],
                vec![(col_c, option_asc), (col_e, option_asc)],
            ],
            vec![(col_a, col_f)],
            vec![
                vec![(col_d_new, option_asc)],
                vec![(col_c_new, option_asc)],
            ],
        ),
    ];

    for (idx, (orderings, equal_columns, expected)) in test_cases.into_iter().enumerate() {
        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
        for (lhs, rhs) in equal_columns {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?;
        }

        let orderings = convert_to_orderings(&orderings);
        eq_properties.add_orderings(orderings);

        let expected = convert_to_orderings(&expected);

        let projected_eq =
            eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
        let orderings = projected_eq.oeq_class();

        // Include the case index so a failure points at the offending case,
        // matching the message format used by the other ordering tests.
        let err_msg = format!(
            "test idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
        );

        assert_eq!(orderings.len(), expected.len(), "{err_msg}");
        for expected_ordering in &expected {
            assert!(orderings.contains(expected_ordering), "{err_msg}");
        }
    }

    Ok(())
}
1953
1954 fn get_stats() -> Statistics {
1955 Statistics {
1956 num_rows: Precision::Exact(5),
1957 total_byte_size: Precision::Exact(23),
1958 column_statistics: vec![
1959 ColumnStatistics {
1960 distinct_count: Precision::Exact(5),
1961 max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
1962 min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
1963 sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
1964 null_count: Precision::Exact(0),
1965 byte_size: Precision::Absent,
1966 },
1967 ColumnStatistics {
1968 distinct_count: Precision::Exact(1),
1969 max_value: Precision::Exact(ScalarValue::from("x")),
1970 min_value: Precision::Exact(ScalarValue::from("a")),
1971 sum_value: Precision::Absent,
1972 null_count: Precision::Exact(3),
1973 byte_size: Precision::Absent,
1974 },
1975 ColumnStatistics {
1976 distinct_count: Precision::Absent,
1977 max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
1978 min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
1979 sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
1980 null_count: Precision::Absent,
1981 byte_size: Precision::Absent,
1982 },
1983 ],
1984 }
1985 }
1986
1987 fn get_schema() -> Schema {
1988 let field_0 = Field::new("col0", DataType::Int64, false);
1989 let field_1 = Field::new("col1", DataType::Utf8, false);
1990 let field_2 = Field::new("col2", DataType::Float32, false);
1991 Schema::new(vec![field_0, field_1, field_2])
1992 }
1993
/// Projecting plain columns (col1, col0) carries their statistics over in projection order.
#[test]
fn test_stats_projection_columns_only() {
    let schema = get_schema();
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col1", 1)), "col1"),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "col0"),
    ]);
    let projected_schema = projection.project_schema(&schema).unwrap();

    let result = projection
        .project_statistics(get_stats(), &projected_schema)
        .unwrap();

    let col1_stats = ColumnStatistics {
        distinct_count: Precision::Exact(1),
        max_value: Precision::Exact(ScalarValue::from("x")),
        min_value: Precision::Exact(ScalarValue::from("a")),
        sum_value: Precision::Absent,
        null_count: Precision::Exact(3),
        byte_size: Precision::Absent,
    };
    let col0_stats = ColumnStatistics {
        distinct_count: Precision::Exact(5),
        max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
        min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
        sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
        null_count: Precision::Exact(0),
        byte_size: Precision::Absent,
    };

    // The output keeps a variable-width Utf8 column. The expected byte size is the
    // input's 23, downgraded to `Inexact`.
    let expected = Statistics {
        num_rows: Precision::Exact(5),
        total_byte_size: Precision::Inexact(23),
        column_statistics: vec![col1_stats, col0_stats],
    };

    assert_eq!(result, expected);
}
2041
/// Projecting only fixed-width columns (col2, col0) yields an exact total byte size.
#[test]
fn test_stats_projection_column_with_primitive_width_only() {
    let schema = get_schema();
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col2", 2)), "col2"),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "col0"),
    ]);
    let projected_schema = projection.project_schema(&schema).unwrap();

    let result = projection
        .project_statistics(get_stats(), &projected_schema)
        .unwrap();

    let col2_stats = ColumnStatistics {
        distinct_count: Precision::Absent,
        max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
        min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
        sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
        null_count: Precision::Absent,
        byte_size: Precision::Absent,
    };
    let col0_stats = ColumnStatistics {
        distinct_count: Precision::Exact(5),
        max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
        min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
        sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
        null_count: Precision::Exact(0),
        byte_size: Precision::Absent,
    };

    // 60 == 5 rows * (4 bytes Float32 + 8 bytes Int64).
    let expected = Statistics {
        num_rows: Precision::Exact(5),
        total_byte_size: Precision::Exact(60),
        column_statistics: vec![col2_stats, col0_stats],
    };

    assert_eq!(result, expected);
}
2087
/// `ProjectionExprs::new` keeps every expression it is given, in order.
#[test]
fn test_projection_new() -> Result<()> {
    let exprs = vec![
        ProjectionExpr {
            expr: Arc::new(Column::new("a", 0)),
            alias: "a".to_string(),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("b", 1)),
            alias: "b".to_string(),
        },
    ];
    // Clone so the stored expressions can be compared against the originals.
    let projection = ProjectionExprs::new(exprs.clone());
    let stored: &[ProjectionExpr] = projection.as_ref();
    assert_eq!(stored.len(), 2);
    assert_eq!(stored, exprs.as_slice());
    Ok(())
}
2106
/// Converting a `Vec<ProjectionExpr>` into `ProjectionExprs` keeps its contents.
#[test]
fn test_projection_from_vec() -> Result<()> {
    let exprs = vec![ProjectionExpr {
        expr: Arc::new(Column::new("x", 0)),
        alias: "x".to_string(),
    }];
    // Clone so the converted projection can be compared against the source vector.
    let projection: ProjectionExprs = exprs.clone().into();
    let stored: &[ProjectionExpr] = projection.as_ref();
    assert_eq!(stored.len(), 1);
    assert_eq!(stored, exprs.as_slice());
    Ok(())
}
2117
/// `ProjectionExprs` can be borrowed as a `&[ProjectionExpr]` slice.
#[test]
fn test_projection_as_ref() -> Result<()> {
    let projection = ProjectionExprs::new(
        [("col1", 0), ("col2", 1)]
            .into_iter()
            .map(|(name, index)| ProjectionExpr::new(Arc::new(Column::new(name, index)), name))
            .collect::<Vec<_>>(),
    );
    let slice: &[ProjectionExpr] = projection.as_ref();
    assert_eq!(slice.len(), 2);
    Ok(())
}
2135
/// Column indices referenced by several plain columns come back in ascending order.
#[test]
fn test_column_indices_multiple_columns() -> Result<()> {
    let column = |name: &str, index: usize| {
        ProjectionExpr::new(Arc::new(Column::new(name, index)), name)
    };
    // Listed in descending index order on purpose.
    let projection =
        ProjectionExprs::new(vec![column("c", 5), column("b", 2), column("a", 0)]);
    assert_eq!(projection.column_indices(), vec![0, 2, 5]);
    Ok(())
}
2157
/// An input index referenced by more than one expression is reported only once.
#[test]
fn test_column_indices_duplicates() -> Result<()> {
    let column = |name: &str, index: usize| {
        ProjectionExpr::new(Arc::new(Column::new(name, index)), name)
    };
    let projection = ProjectionExprs::new(vec![
        column("a", 1),
        column("b", 3),
        // Input index 1 again, under a different name.
        column("a2", 1),
    ]);
    assert_eq!(projection.column_indices(), vec![1, 3]);
    Ok(())
}
2178
/// Unsorted column references produce a sorted list of indices.
#[test]
fn test_column_indices_unsorted() -> Result<()> {
    let column = |name: &str, index: usize| {
        ProjectionExpr::new(Arc::new(Column::new(name, index)), name)
    };
    let projection =
        ProjectionExprs::new(vec![column("c", 5), column("a", 1), column("b", 3)]);
    assert_eq!(projection.column_indices(), vec![1, 3, 5]);
    Ok(())
}
2199
/// Columns nested inside an expression (`a@1 + b@4`) are included in the indices.
#[test]
fn test_column_indices_complex_expr() -> Result<()> {
    let sum: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 1)),
        Operator::Plus,
        Arc::new(Column::new("b", 4)),
    ));
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(sum, "sum"),
        ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c"),
    ]);
    assert_eq!(projection.column_indices(), vec![1, 2, 4]);
    Ok(())
}
2222
/// An empty projection references no input columns.
#[test]
fn test_column_indices_empty() -> Result<()> {
    let projection = ProjectionExprs::new(Vec::<ProjectionExpr>::new());
    let indices: Vec<usize> = projection.column_indices();
    assert!(indices.is_empty());
    Ok(())
}
2229
/// Merging two column-only projections rewrites the top projection's references
/// in terms of the base projection's inputs.
#[test]
fn test_merge_simple_columns() -> Result<()> {
    let renamed = |name: &str, index: usize, alias: &str| {
        ProjectionExpr::new(Arc::new(Column::new(name, index)), alias)
    };

    // Base output: [x = c@2, y = b@1, z = a@0].
    let base_projection = ProjectionExprs::new(vec![
        renamed("c", 2, "x"),
        renamed("b", 1, "y"),
        renamed("a", 0, "z"),
    ]);
    // Top reads the base output: [col2 = y@1, col1 = x@0].
    let top_projection =
        ProjectionExprs::new(vec![renamed("y", 1, "col2"), renamed("x", 0, "col1")]);

    let merged = base_projection.try_merge(&top_projection)?;
    assert_snapshot!(merged.to_string(), @"Projection[b@1 AS col2, c@2 AS col1]");

    Ok(())
}
2266
/// Merging substitutes base expressions into compound top-level expressions.
#[test]
fn test_merge_with_expressions() -> Result<()> {
    let renamed = |name: &str, index: usize, alias: &str| {
        ProjectionExpr::new(Arc::new(Column::new(name, index)), alias)
    };

    // Base output: [x = c@2, y = b@1, z = a@0].
    let base_projection = ProjectionExprs::new(vec![
        renamed("c", 2, "x"),
        renamed("b", 1, "y"),
        renamed("a", 0, "z"),
    ]);

    // Top: [c2 = y@1 + z@2, c1 = x@0 + 1].
    let y_plus_z = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("y", 1)),
        Operator::Plus,
        Arc::new(Column::new("z", 2)),
    ));
    let x_plus_one = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
    ));
    let top_projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(y_plus_z, "c2"),
        ProjectionExpr::new(x_plus_one, "c1"),
    ]);

    let merged = base_projection.try_merge(&top_projection)?;
    assert_snapshot!(merged.to_string(), @"Projection[b@1 + a@0 AS c2, c@2 + 1 AS c1]");

    Ok(())
}
2311
/// A top projection that references a column the base does not produce fails to merge.
#[test]
fn try_merge_error() {
    // The base produces just two output columns: x@0 and y@1.
    let base = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("a", 0)), "x"),
        ProjectionExpr::new(Arc::new(Column::new("b", 1)), "y"),
    ]);
    // Index 5 is out of range for the base's output.
    let top = ProjectionExprs::new(vec![ProjectionExpr::new(
        Arc::new(Column::new("z", 5)),
        "result",
    )]);

    let err_msg = base.try_merge(&top).unwrap_err().to_string();
    let expected =
        "Internal error: Column index 5 out of bounds for projected expressions of length 2";
    assert!(err_msg.contains(expected), "Unexpected error message: {err_msg}");
}
2339
/// A constant-only top projection merges cleanly onto an empty base projection.
#[test]
fn test_merge_empty_projection_with_literal() -> Result<()> {
    let base_projection = ProjectionExprs::new(Vec::<ProjectionExpr>::new());

    // References no base column at all, only a constant.
    let constant: Arc<dyn PhysicalExpr> =
        Arc::new(Literal::new(ScalarValue::Int64(Some(1))));
    let top_projection =
        ProjectionExprs::new(vec![ProjectionExpr::new(constant, "Int64(1)")]);

    let merged = base_projection.try_merge(&top_projection)?;
    assert_snapshot!(merged.to_string(), @"Projection[1 AS Int64(1)]");

    Ok(())
}
2363
/// A bare literal passes through `update_expr` unchanged, even with no projection.
#[test]
fn test_update_expr_with_literal() -> Result<()> {
    let no_projection: Vec<ProjectionExpr> = Vec::new();
    let literal_expr: Arc<dyn PhysicalExpr> =
        Arc::new(Literal::new(ScalarValue::Int64(Some(42))));

    let result_expr = update_expr(&literal_expr, &no_projection, true)?
        .expect("Literal expression should be valid");

    let literal = result_expr.as_any().downcast_ref::<Literal>().unwrap();
    assert_eq!(literal.value(), &ScalarValue::Int64(Some(42)));

    Ok(())
}
2387
/// In `10 + x@0`, `update_expr` keeps the literal and replaces `x@0` with the
/// projected expression `a@5`.
#[test]
fn test_update_expr_with_complex_literal_expr() -> Result<()> {
    let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Literal::new(ScalarValue::Int64(Some(10)))),
        Operator::Plus,
        Arc::new(Column::new("x", 0)),
    ));

    // Output column 0 ("x") of the base projection is input column a@5.
    let base_projection = vec![ProjectionExpr {
        expr: Arc::new(Column::new("a", 5)),
        alias: "x".to_string(),
    }];

    let result_expr =
        update_expr(&expr, &base_projection, true)?.expect("Expression should be valid");
    let binary = result_expr
        .as_any()
        .downcast_ref::<BinaryExpr>()
        .expect("Should be a BinaryExpr");

    // The literal operand is carried over with its value intact.
    let left_literal = binary
        .left()
        .as_any()
        .downcast_ref::<Literal>()
        .expect("Left should be a Literal");
    assert_eq!(left_literal.value(), &ScalarValue::Int64(Some(10)));

    // x@0 is rewritten to the base projection's expression a@5.
    let right_col = binary
        .right()
        .as_any()
        .downcast_ref::<Column>()
        .expect("Right should be a Column");
    assert_eq!(right_col.name(), "a");
    assert_eq!(right_col.index(), 5);

    Ok(())
}
2427
/// Renamed column projections take their alias as field name and keep the input type.
#[test]
fn test_project_schema_simple_columns() -> Result<()> {
    let input_schema = get_schema();
    // col2 -> c, col0 -> a
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col2", 2)), "c"),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "a"),
    ]);

    let output_schema = projection.project_schema(&input_schema)?;
    assert_eq!(output_schema.fields().len(), 2);

    let expected_fields = [("c", DataType::Float32), ("a", DataType::Int64)];
    for (position, (name, data_type)) in expected_fields.into_iter().enumerate() {
        let field = output_schema.field(position);
        assert_eq!(field.name(), name);
        assert_eq!(field.data_type(), &data_type);
    }

    Ok(())
}
2460
/// A computed expression (`col0 + 1`) yields a field named by its alias with the
/// expression's result type.
#[test]
fn test_project_schema_with_expressions() -> Result<()> {
    let input_schema = get_schema();
    let incremented: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("col0", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
    ));
    let projection =
        ProjectionExprs::new(vec![ProjectionExpr::new(incremented, "incremented")]);

    let output_schema = projection.project_schema(&input_schema)?;
    assert_eq!(output_schema.fields().len(), 1);

    let field = output_schema.field(0);
    assert_eq!(field.name(), "incremented");
    assert_eq!(field.data_type(), &DataType::Int64);

    Ok(())
}
2487
/// Renaming a column keeps the input field's metadata on the output field.
#[test]
fn test_project_schema_preserves_metadata() -> Result<()> {
    let metadata = HashMap::from([("key".to_string(), "value".to_string())]);
    let input_schema = Schema::new(vec![
        Field::new("col0", DataType::Int64, false).with_metadata(metadata.clone()),
        Field::new("col1", DataType::Utf8, false),
    ]);
    let projection = ProjectionExprs::new(vec![ProjectionExpr::new(
        Arc::new(Column::new("col0", 0)),
        "renamed",
    )]);

    let output_schema = projection.project_schema(&input_schema)?;
    assert_eq!(output_schema.fields().len(), 1);

    let field = output_schema.field(0);
    assert_eq!(field.name(), "renamed");
    assert_eq!(field.metadata(), &metadata);

    Ok(())
}
2517
/// An empty projection produces a schema with no fields.
#[test]
fn test_project_schema_empty() -> Result<()> {
    let projection = ProjectionExprs::new(Vec::<ProjectionExpr>::new());
    let output_schema = projection.project_schema(&get_schema())?;
    assert_eq!(output_schema.fields().len(), 0);
    Ok(())
}
2529
/// Renamed column projections carry each source column's statistics to its output slot.
#[test]
fn test_project_statistics_columns_only() -> Result<()> {
    let input_schema = get_schema();
    // col1 -> text, col0 -> num
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col1", 1)), "text"),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "num"),
    ]);
    let projected_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(get_stats(), &projected_schema)?;

    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    let columns = &output_stats.column_statistics;
    assert_eq!(columns.len(), 2);

    // "text" carries col1's statistics.
    assert_eq!(columns[0].distinct_count, Precision::Exact(1));
    assert_eq!(columns[0].max_value, Precision::Exact(ScalarValue::from("x")));

    // "num" carries col0's statistics.
    assert_eq!(columns[1].distinct_count, Precision::Exact(5));
    assert_eq!(
        columns[1].max_value,
        Precision::Exact(ScalarValue::Int64(Some(21)))
    );

    Ok(())
}
2580
/// A computed column gets absent statistics; a plain column keeps its own.
#[test]
fn test_project_statistics_with_expressions() -> Result<()> {
    let input_schema = get_schema();
    let incremented: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("col0", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
    ));
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(incremented, "incremented"),
        ProjectionExpr::new(Arc::new(Column::new("col1", 1)), "text"),
    ]);
    let projected_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(get_stats(), &projected_schema)?;

    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    let columns = &output_stats.column_statistics;
    assert_eq!(columns.len(), 2);

    // "incremented" (col0 + 1): statistics are not derived for the expression.
    assert_eq!(columns[0].distinct_count, Precision::Absent);
    assert_eq!(columns[0].max_value, Precision::Absent);

    // "text" keeps col1's statistics.
    assert_eq!(columns[1].distinct_count, Precision::Exact(1));

    Ok(())
}
2631
/// With only fixed-width output columns the total byte size is exact.
#[test]
fn test_project_statistics_primitive_width_only() -> Result<()> {
    let input_schema = get_schema();
    // col2 (Float32) -> f, col0 (Int64) -> i
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col2", 2)), "f"),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "i"),
    ]);
    let projected_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(get_stats(), &projected_schema)?;

    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    // 60 == 5 rows * (4 bytes Float32 + 8 bytes Int64).
    assert_eq!(output_stats.total_byte_size, Precision::Exact(60));
    assert_eq!(output_stats.column_statistics.len(), 2);

    Ok(())
}
2666
/// An empty projection keeps the row count but has no columns and zero bytes.
#[test]
fn test_project_statistics_empty() -> Result<()> {
    let input_schema = get_schema();
    let projection = ProjectionExprs::new(Vec::<ProjectionExpr>::new());
    let projected_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(get_stats(), &projected_schema)?;

    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    assert_eq!(output_stats.column_statistics.len(), 0);
    assert_eq!(output_stats.total_byte_size, Precision::Exact(0));

    Ok(())
}
2690
/// A non-null Int64 literal projects to exact, constant column statistics.
#[test]
fn test_project_statistics_with_literal() -> Result<()> {
    let input_schema = get_schema();
    let constant_expr: Arc<dyn PhysicalExpr> =
        Arc::new(Literal::new(ScalarValue::Int64(Some(42))));
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(constant_expr, "constant"),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "num"),
    ]);
    let projected_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(get_stats(), &projected_schema)?;

    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    assert_eq!(output_stats.column_statistics.len(), 2);

    // "constant": every one of the 5 rows holds 42.
    let constant = &output_stats.column_statistics[0];
    let forty_two = Precision::Exact(ScalarValue::Int64(Some(42)));
    assert_eq!(constant.min_value, forty_two);
    assert_eq!(constant.max_value, forty_two);
    assert_eq!(constant.distinct_count, Precision::Exact(1));
    assert_eq!(constant.null_count, Precision::Exact(0));
    // 40 == 5 rows * 8 bytes.
    assert_eq!(constant.byte_size, Precision::Exact(40));
    // 210 == 42 * 5 rows.
    assert_eq!(
        constant.sum_value,
        Precision::Exact(ScalarValue::Int64(Some(210)))
    );

    // "num" keeps col0's statistics.
    let num = &output_stats.column_statistics[1];
    assert_eq!(num.distinct_count, Precision::Exact(5));
    assert_eq!(num.max_value, Precision::Exact(ScalarValue::Int64(Some(21))));

    Ok(())
}
2760
/// A NULL Int64 literal projects to an all-null column with null bounds and sum.
#[test]
fn test_project_statistics_with_null_literal() -> Result<()> {
    let input_schema = get_schema();
    let null_expr: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Int64(None)));
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(null_expr, "null_col"),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "num"),
    ]);
    let projected_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(get_stats(), &projected_schema)?;

    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    assert_eq!(output_stats.column_statistics.len(), 2);

    // "null_col": all 5 rows are NULL.
    let null_col = &output_stats.column_statistics[0];
    let null_value = Precision::Exact(ScalarValue::Int64(None));
    assert_eq!(null_col.min_value, null_value);
    assert_eq!(null_col.max_value, null_value);
    assert_eq!(null_col.distinct_count, Precision::Exact(1));
    assert_eq!(null_col.null_count, Precision::Exact(5));
    assert_eq!(null_col.byte_size, Precision::Exact(0));
    assert_eq!(null_col.sum_value, Precision::Exact(ScalarValue::Int64(None)));

    // "num" keeps col0's statistics.
    let num = &output_stats.column_statistics[1];
    assert_eq!(num.distinct_count, Precision::Exact(5));
    assert_eq!(num.max_value, Precision::Exact(ScalarValue::Int64(Some(21))));

    Ok(())
}
2828
/// A Utf8 literal gets exact bounds and counts, but no byte size or sum.
#[test]
fn test_project_statistics_with_complex_type_literal() -> Result<()> {
    let input_schema = get_schema();
    let text_expr: Arc<dyn PhysicalExpr> =
        Arc::new(Literal::new(ScalarValue::Utf8(Some("hello".to_string()))));
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(text_expr, "text"),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "num"),
    ]);
    let projected_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(get_stats(), &projected_schema)?;

    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    assert_eq!(output_stats.column_statistics.len(), 2);

    // "text": constant "hello" in every row.
    let text = &output_stats.column_statistics[0];
    let hello = Precision::Exact(ScalarValue::Utf8(Some("hello".to_string())));
    assert_eq!(text.min_value, hello);
    assert_eq!(text.max_value, hello);
    assert_eq!(text.distinct_count, Precision::Exact(1));
    assert_eq!(text.null_count, Precision::Exact(0));
    // Variable-width value: no byte size is reported.
    assert_eq!(text.byte_size, Precision::Absent);
    // Strings have no sum.
    assert_eq!(text.sum_value, Precision::Absent);

    // "num" keeps col0's statistics.
    let num = &output_stats.column_statistics[1];
    assert_eq!(num.distinct_count, Precision::Exact(5));
    assert_eq!(num.max_value, Precision::Exact(ScalarValue::Int64(Some(21))));

    Ok(())
}
2903}