1use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27 array::{ArrayRef, BooleanArray, new_null_array},
28 datatypes::{DataType, Field, Schema, SchemaRef},
29 record_batch::{RecordBatch, RecordBatchOptions},
30};
31pub use datafusion_common::pruning::PruningStatistics;
33use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
34use datafusion_physical_plan::metrics::Count;
35use log::{debug, trace};
36
37use datafusion_common::error::Result;
38use datafusion_common::tree_node::{TransformedResult, TreeNodeRecursion};
39use datafusion_common::{Column, DFSchema, assert_eq_or_internal_err};
40use datafusion_common::{
41 ScalarValue, internal_datafusion_err, plan_datafusion_err, plan_err,
42 tree_node::{Transformed, TreeNode},
43};
44use datafusion_expr_common::operator::Operator;
45use datafusion_physical_expr::expressions::CastColumnExpr;
46use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
47use datafusion_physical_expr::{PhysicalExprRef, expressions as phys_expr};
48use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr_opt;
49use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
50
51#[derive(Debug, Clone)]
363pub struct PruningPredicate {
364 schema: SchemaRef,
366 predicate_expr: Arc<dyn PhysicalExpr>,
369 required_columns: RequiredColumns,
371 orig_expr: Arc<dyn PhysicalExpr>,
374 literal_guarantees: Vec<LiteralGuarantee>,
379}
380
381pub fn build_pruning_predicate(
387 predicate: Arc<dyn PhysicalExpr>,
388 file_schema: &SchemaRef,
389 predicate_creation_errors: &Count,
390) -> Option<Arc<PruningPredicate>> {
391 match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
392 Ok(pruning_predicate) => {
393 if !pruning_predicate.always_true() {
394 return Some(Arc::new(pruning_predicate));
395 }
396 }
397 Err(e) => {
398 debug!("Could not create pruning predicate for: {e}");
399 predicate_creation_errors.add(1);
400 }
401 }
402 None
403}
404
405pub trait UnhandledPredicateHook {
409 fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
412}
413
414#[derive(Debug, Clone)]
417struct ConstantUnhandledPredicateHook {
418 default: Arc<dyn PhysicalExpr>,
419}
420
421impl Default for ConstantUnhandledPredicateHook {
422 fn default() -> Self {
423 Self {
424 default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
425 }
426 }
427}
428
429impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
430 fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
431 Arc::clone(&self.default)
432 }
433}
434
435impl PruningPredicate {
436 pub fn try_new(mut expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
465 let tf = snapshot_physical_expr_opt(expr)?;
469 if tf.transformed {
470 let simplifier = PhysicalExprSimplifier::new(&schema);
477 expr = simplifier.simplify(tf.data)?;
478 } else {
479 expr = tf.data;
480 }
481 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
482
483 let mut required_columns = RequiredColumns::new();
485 let predicate_expr = build_predicate_expression(
486 &expr,
487 &schema,
488 &mut required_columns,
489 &unhandled_hook,
490 );
491 let predicate_schema = required_columns.schema();
492 let predicate_expr =
494 PhysicalExprSimplifier::new(&predicate_schema).simplify(predicate_expr)?;
495 let literal_guarantees = LiteralGuarantee::analyze(&expr);
496
497 Ok(Self {
498 schema,
499 predicate_expr,
500 required_columns,
501 orig_expr: expr,
502 literal_guarantees,
503 })
504 }
505
506 pub fn prune<S: PruningStatistics + ?Sized>(
521 &self,
522 statistics: &S,
523 ) -> Result<Vec<bool>> {
524 let mut builder = BoolVecBuilder::new(statistics.num_containers());
525
526 for literal_guarantee in &self.literal_guarantees {
529 let LiteralGuarantee {
530 column,
531 guarantee,
532 literals,
533 } = literal_guarantee;
534 if let Some(results) = statistics.contained(column, literals) {
535 match guarantee {
536 Guarantee::In => builder.combine_array(&results),
541 Guarantee::NotIn => {
547 builder.combine_array(&arrow::compute::not(&results)?)
548 }
549 }
550 if builder.check_all_pruned() {
553 return Ok(builder.build());
554 }
555 }
556 }
557
558 let statistics_batch =
564 build_statistics_record_batch(statistics, &self.required_columns)?;
565
566 builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
568
569 Ok(builder.build())
570 }
571
572 pub fn schema(&self) -> &SchemaRef {
574 &self.schema
575 }
576
577 pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
579 &self.orig_expr
580 }
581
582 pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
584 &self.predicate_expr
585 }
586
587 pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
593 &self.literal_guarantees
594 }
595
596 pub fn always_true(&self) -> bool {
603 is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
604 }
605
606 pub fn required_columns(&self) -> &RequiredColumns {
607 &self.required_columns
608 }
609
610 pub fn literal_columns(&self) -> Vec<String> {
618 let mut seen = HashSet::new();
619 self.literal_guarantees
620 .iter()
621 .map(|e| &e.column.name)
622 .filter(|name| seen.insert(*name))
624 .map(|s| s.to_string())
625 .collect()
626 }
627}
628
629#[derive(Debug)]
631struct BoolVecBuilder {
632 inner: Vec<bool>,
636}
637
638impl BoolVecBuilder {
639 fn new(num_containers: usize) -> Self {
641 Self {
642 inner: vec![true; num_containers],
644 }
645 }
646
647 fn combine_array(&mut self, array: &BooleanArray) {
655 assert_eq!(array.len(), self.inner.len());
656 for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
657 if let Some(false) = new {
661 *cur = false;
662 }
663 }
664 }
665
666 fn combine_value(&mut self, value: ColumnarValue) {
672 match value {
673 ColumnarValue::Array(array) => {
674 self.combine_array(array.as_boolean());
675 }
676 ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
677 self.inner = vec![false; self.inner.len()];
679 }
680 _ => {
681 }
684 }
685 }
686
687 fn build(self) -> Vec<bool> {
689 self.inner
690 }
691
692 fn check_all_pruned(&self) -> bool {
694 self.inner.iter().all(|&x| !x)
695 }
696}
697
698fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
699 expr.as_any()
700 .downcast_ref::<phys_expr::Literal>()
701 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
702 .unwrap_or_default()
703}
704
705fn is_always_false(expr: &Arc<dyn PhysicalExpr>) -> bool {
706 expr.as_any()
707 .downcast_ref::<phys_expr::Literal>()
708 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(false))))
709 .unwrap_or_default()
710}
711
712#[derive(Debug, Default, Clone)]
722pub struct RequiredColumns {
723 columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
729}
730
731impl RequiredColumns {
732 fn new() -> Self {
733 Self::default()
734 }
735
736 pub fn single_column(&self) -> Option<&phys_expr::Column> {
745 if self.columns.windows(2).all(|w| {
746 let c1 = &w[0].0;
748 let c2 = &w[1].0;
749 c1 == c2
750 }) {
751 self.columns.first().map(|r| &r.0)
752 } else {
753 None
754 }
755 }
756
757 fn schema(&self) -> Schema {
764 let fields = self
765 .columns
766 .iter()
767 .map(|(_c, _t, f)| f.clone())
768 .collect::<Vec<_>>();
769 Schema::new(fields)
770 }
771
772 pub(crate) fn iter(
775 &self,
776 ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
777 self.columns.iter()
778 }
779
780 fn find_stat_column(
781 &self,
782 column: &phys_expr::Column,
783 statistics_type: StatisticsType,
784 ) -> Option<usize> {
785 match statistics_type {
786 StatisticsType::RowCount => {
787 self.columns
789 .iter()
790 .enumerate()
791 .find(|(_i, (_c, t, _f))| t == &statistics_type)
792 .map(|(i, (_c, _t, _f))| i)
793 }
794 _ => self
795 .columns
796 .iter()
797 .enumerate()
798 .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
799 .map(|(i, (_c, _t, _f))| i),
800 }
801 }
802
803 fn stat_column_expr(
812 &mut self,
813 column: &phys_expr::Column,
814 column_expr: &Arc<dyn PhysicalExpr>,
815 field: &Field,
816 stat_type: StatisticsType,
817 ) -> Result<Arc<dyn PhysicalExpr>> {
818 let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
819 Some(idx) => (idx, false),
820 None => (self.columns.len(), true),
821 };
822
823 let column_name = column.name();
824 let stat_column_name = match stat_type {
825 StatisticsType::Min => format!("{column_name}_min"),
826 StatisticsType::Max => format!("{column_name}_max"),
827 StatisticsType::NullCount => format!("{column_name}_null_count"),
828 StatisticsType::RowCount => "row_count".to_string(),
829 };
830
831 let stat_column = phys_expr::Column::new(&stat_column_name, idx);
832
833 if need_to_insert {
835 let nullable = true;
837 let stat_field =
838 Field::new(stat_column.name(), field.data_type().clone(), nullable);
839 self.columns.push((column.clone(), stat_type, stat_field));
840 }
841 rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
842 }
843
844 fn min_column_expr(
846 &mut self,
847 column: &phys_expr::Column,
848 column_expr: &Arc<dyn PhysicalExpr>,
849 field: &Field,
850 ) -> Result<Arc<dyn PhysicalExpr>> {
851 self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
852 }
853
854 fn max_column_expr(
856 &mut self,
857 column: &phys_expr::Column,
858 column_expr: &Arc<dyn PhysicalExpr>,
859 field: &Field,
860 ) -> Result<Arc<dyn PhysicalExpr>> {
861 self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
862 }
863
864 fn null_count_column_expr(
866 &mut self,
867 column: &phys_expr::Column,
868 column_expr: &Arc<dyn PhysicalExpr>,
869 field: &Field,
870 ) -> Result<Arc<dyn PhysicalExpr>> {
871 self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
872 }
873
874 fn row_count_column_expr(
876 &mut self,
877 column: &phys_expr::Column,
878 column_expr: &Arc<dyn PhysicalExpr>,
879 field: &Field,
880 ) -> Result<Arc<dyn PhysicalExpr>> {
881 self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
882 }
883}
884
885impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
886 fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
887 Self { columns }
888 }
889}
890
891fn build_statistics_record_batch<S: PruningStatistics + ?Sized>(
917 statistics: &S,
918 required_columns: &RequiredColumns,
919) -> Result<RecordBatch> {
920 let mut arrays = Vec::<ArrayRef>::new();
921 for (column, statistics_type, stat_field) in required_columns.iter() {
923 let column = Column::from_name(column.name());
924 let data_type = stat_field.data_type();
925
926 let num_containers = statistics.num_containers();
927
928 let array = match statistics_type {
929 StatisticsType::Min => statistics.min_values(&column),
930 StatisticsType::Max => statistics.max_values(&column),
931 StatisticsType::NullCount => statistics.null_counts(&column),
932 StatisticsType::RowCount => statistics.row_counts(&column),
933 };
934 let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
935
936 assert_eq_or_internal_err!(
937 num_containers,
938 array.len(),
939 "mismatched statistics length. Expected {}, got {}",
940 num_containers,
941 array.len()
942 );
943
944 let array = arrow::compute::cast(&array, data_type)?;
947
948 arrays.push(array);
949 }
950
951 let schema = Arc::new(required_columns.schema());
952 let mut options = RecordBatchOptions::default();
954 options.row_count = Some(statistics.num_containers());
955
956 trace!("Creating statistics batch for {required_columns:#?} with {arrays:#?}");
957
958 RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
959 plan_datafusion_err!("Can not create statistics record batch: {err}")
960 })
961}
962
963struct PruningExpressionBuilder<'a> {
964 column: phys_expr::Column,
965 column_expr: Arc<dyn PhysicalExpr>,
966 op: Operator,
967 scalar_expr: Arc<dyn PhysicalExpr>,
968 field: &'a Field,
969 required_columns: &'a mut RequiredColumns,
970}
971
972impl<'a> PruningExpressionBuilder<'a> {
973 fn try_new(
974 left: &'a Arc<dyn PhysicalExpr>,
975 right: &'a Arc<dyn PhysicalExpr>,
976 left_columns: ColumnReferenceCount,
977 right_columns: ColumnReferenceCount,
978 op: Operator,
979 schema: &'a SchemaRef,
980 required_columns: &'a mut RequiredColumns,
981 ) -> Result<Self> {
982 let (column_expr, scalar_expr, column, correct_operator) = match (
984 left_columns,
985 right_columns,
986 ) {
987 (ColumnReferenceCount::One(column), ColumnReferenceCount::Zero) => {
988 (left, right, column, op)
989 }
990 (ColumnReferenceCount::Zero, ColumnReferenceCount::One(column)) => {
991 (right, left, column, reverse_operator(op)?)
992 }
993 (ColumnReferenceCount::One(_), ColumnReferenceCount::One(_)) => {
994 return plan_err!(
996 "Expression not supported for pruning: left has 1 column, right has 1 column"
997 );
998 }
999 (ColumnReferenceCount::Zero, ColumnReferenceCount::Zero) => {
1000 return plan_err!(
1002 "Pruning literal expressions is not supported, please call PhysicalExprSimplifier first"
1003 );
1004 }
1005 (ColumnReferenceCount::Many, _) | (_, ColumnReferenceCount::Many) => {
1006 return plan_err!(
1007 "Expression not supported for pruning: left or right has multiple columns"
1008 );
1009 }
1010 };
1011
1012 let df_schema = DFSchema::try_from(Arc::clone(schema))?;
1013 let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
1014 column_expr,
1015 correct_operator,
1016 scalar_expr,
1017 df_schema,
1018 )?;
1019 let field = match schema.column_with_name(column.name()) {
1020 Some((_, f)) => f,
1021 _ => {
1022 return plan_err!("Field not found in schema");
1023 }
1024 };
1025
1026 Ok(Self {
1027 column,
1028 column_expr,
1029 op: correct_operator,
1030 scalar_expr,
1031 field,
1032 required_columns,
1033 })
1034 }
1035
1036 fn op(&self) -> Operator {
1037 self.op
1038 }
1039
1040 fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
1041 &self.scalar_expr
1042 }
1043
1044 fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1045 self.required_columns
1046 .min_column_expr(&self.column, &self.column_expr, self.field)
1047 }
1048
1049 fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1050 self.required_columns
1051 .max_column_expr(&self.column, &self.column_expr, self.field)
1052 }
1053
1054 fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1061 let column_expr = Arc::new(self.column.clone()) as _;
1063
1064 let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1066
1067 self.required_columns.null_count_column_expr(
1068 &self.column,
1069 &column_expr,
1070 null_count_field,
1071 )
1072 }
1073
1074 fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1081 let column_expr = Arc::new(self.column.clone()) as _;
1083
1084 let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1086
1087 self.required_columns.row_count_column_expr(
1088 &self.column,
1089 &column_expr,
1090 row_count_field,
1091 )
1092 }
1093}
1094
1095fn rewrite_expr_to_prunable(
1108 column_expr: &PhysicalExprRef,
1109 op: Operator,
1110 scalar_expr: &PhysicalExprRef,
1111 schema: DFSchema,
1112) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1113 if !is_compare_op(op) {
1114 return plan_err!("rewrite_expr_to_prunable only support compare expression");
1115 }
1116
1117 let column_expr_any = column_expr.as_any();
1118
1119 if column_expr_any
1120 .downcast_ref::<phys_expr::Column>()
1121 .is_some()
1122 {
1123 Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
1125 } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
1126 let arrow_schema = schema.as_arrow();
1128 let from_type = cast.expr().data_type(arrow_schema)?;
1129 verify_support_type_for_prune(&from_type, cast.cast_type())?;
1130 let (left, op, right) =
1131 rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
1132 let left = Arc::new(phys_expr::CastExpr::new(
1133 left,
1134 cast.cast_type().clone(),
1135 None,
1136 ));
1137 Ok((left, op, right))
1138 } else if let Some(cast_col) = column_expr_any.downcast_ref::<CastColumnExpr>() {
1139 let arrow_schema = schema.as_arrow();
1141 let from_type = cast_col.expr().data_type(arrow_schema)?;
1142 let to_type = cast_col.target_field().data_type();
1143 verify_support_type_for_prune(&from_type, to_type)?;
1144 let (left, op, right) =
1145 rewrite_expr_to_prunable(cast_col.expr(), op, scalar_expr, schema)?;
1146 let left = Arc::new(phys_expr::CastExpr::new(left, to_type.clone(), None));
1151 Ok((left, op, right))
1152 } else if let Some(try_cast) =
1153 column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
1154 {
1155 let arrow_schema = schema.as_arrow();
1157 let from_type = try_cast.expr().data_type(arrow_schema)?;
1158 verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
1159 let (left, op, right) =
1160 rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
1161 let left = Arc::new(phys_expr::TryCastExpr::new(
1162 left,
1163 try_cast.cast_type().clone(),
1164 ));
1165 Ok((left, op, right))
1166 } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
1167 let (left, op, right) =
1169 rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
1170 let right = Arc::new(phys_expr::NegativeExpr::new(right));
1171 Ok((left, reverse_operator(op)?, right))
1172 } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
1173 if op != Operator::Eq && op != Operator::NotEq {
1175 return plan_err!("Not with operator other than Eq / NotEq is not supported");
1176 }
1177 if not
1178 .arg()
1179 .as_any()
1180 .downcast_ref::<phys_expr::Column>()
1181 .is_some()
1182 {
1183 let left = Arc::clone(not.arg());
1184 let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
1185 Ok((left, reverse_operator(op)?, right))
1186 } else {
1187 plan_err!("Not with complex expression {column_expr:?} is not supported")
1188 }
1189 } else {
1190 plan_err!("column expression {column_expr:?} is not supported")
1191 }
1192}
1193
1194fn is_compare_op(op: Operator) -> bool {
1195 matches!(
1196 op,
1197 Operator::Eq
1198 | Operator::NotEq
1199 | Operator::Lt
1200 | Operator::LtEq
1201 | Operator::Gt
1202 | Operator::GtEq
1203 | Operator::LikeMatch
1204 | Operator::NotLikeMatch
1205 )
1206}
1207
1208fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1213 let from_type = match from_type {
1215 DataType::Dictionary(_, t) => {
1216 return verify_support_type_for_prune(t.as_ref(), to_type);
1217 }
1218 _ => from_type,
1219 };
1220 let to_type = match to_type {
1221 DataType::Dictionary(_, t) => {
1222 return verify_support_type_for_prune(from_type, t.as_ref());
1223 }
1224 _ => to_type,
1225 };
1226 if from_type.is_string() == to_type.is_string() {
1230 Ok(())
1231 } else {
1232 plan_err!(
1233 "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1234 )
1235 }
1236}
1237
1238fn rewrite_column_expr(
1240 e: Arc<dyn PhysicalExpr>,
1241 column_old: &phys_expr::Column,
1242 column_new: &phys_expr::Column,
1243) -> Result<Arc<dyn PhysicalExpr>> {
1244 e.transform(|expr| {
1245 if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>()
1246 && column == column_old
1247 {
1248 return Ok(Transformed::yes(Arc::new(column_new.clone())));
1249 }
1250
1251 Ok(Transformed::no(expr))
1252 })
1253 .data()
1254}
1255
1256fn reverse_operator(op: Operator) -> Result<Operator> {
1257 op.swap().ok_or_else(|| {
1258 internal_datafusion_err!(
1259 "Could not reverse operator {op} while building pruning predicate"
1260 )
1261 })
1262}
1263
1264fn build_single_column_expr(
1269 column: &phys_expr::Column,
1270 schema: &Schema,
1271 required_columns: &mut RequiredColumns,
1272 is_not: bool, ) -> Option<Arc<dyn PhysicalExpr>> {
1274 let field = schema.field_with_name(column.name()).ok()?;
1275
1276 if *field.data_type() == DataType::Boolean {
1277 let col_ref = Arc::new(column.clone()) as _;
1278
1279 let min = required_columns
1280 .min_column_expr(column, &col_ref, field)
1281 .ok()?;
1282 let max = required_columns
1283 .max_column_expr(column, &col_ref, field)
1284 .ok()?;
1285
1286 if is_not {
1290 Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1293 phys_expr::BinaryExpr::new(min, Operator::And, max),
1294 ))))
1295 } else {
1296 Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1299 }
1300 } else {
1301 None
1302 }
1303}
1304
1305fn build_is_null_column_expr(
1314 expr: &Arc<dyn PhysicalExpr>,
1315 schema: &Schema,
1316 required_columns: &mut RequiredColumns,
1317 with_not: bool,
1318) -> Option<Arc<dyn PhysicalExpr>> {
1319 if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1320 let field = schema.field_with_name(col.name()).ok()?;
1321
1322 let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1323 if with_not {
1324 if let Ok(row_count_expr) =
1325 required_columns.row_count_column_expr(col, expr, null_count_field)
1326 {
1327 required_columns
1328 .null_count_column_expr(col, expr, null_count_field)
1329 .map(|null_count_column_expr| {
1330 Arc::new(phys_expr::BinaryExpr::new(
1332 null_count_column_expr,
1333 Operator::NotEq,
1334 row_count_expr,
1335 )) as _
1336 })
1337 .ok()
1338 } else {
1339 None
1340 }
1341 } else {
1342 required_columns
1343 .null_count_column_expr(col, expr, null_count_field)
1344 .map(|null_count_column_expr| {
1345 Arc::new(phys_expr::BinaryExpr::new(
1347 null_count_column_expr,
1348 Operator::Gt,
1349 Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1350 )) as _
1351 })
1352 .ok()
1353 }
1354 } else {
1355 None
1356 }
1357}
1358
/// Largest `IN` / `NOT IN` list that `build_predicate_expression` expands into
/// a chain of per-value comparisons; longer lists go to the unhandled hook.
const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1362
1363pub struct PredicateRewriter {
1366 unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1367}
1368
1369impl Default for PredicateRewriter {
1370 fn default() -> Self {
1371 Self {
1372 unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1373 }
1374 }
1375}
1376
1377impl PredicateRewriter {
1378 pub fn new() -> Self {
1380 Self::default()
1381 }
1382
1383 pub fn with_unhandled_hook(
1385 self,
1386 unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1387 ) -> Self {
1388 Self { unhandled_hook }
1389 }
1390
1391 pub fn rewrite_predicate_to_statistics_predicate(
1401 &self,
1402 expr: &Arc<dyn PhysicalExpr>,
1403 schema: &Schema,
1404 ) -> Arc<dyn PhysicalExpr> {
1405 let mut required_columns = RequiredColumns::new();
1406 build_predicate_expression(
1407 expr,
1408 &Arc::new(schema.clone()),
1409 &mut required_columns,
1410 &self.unhandled_hook,
1411 )
1412 }
1413}
1414
1415fn build_predicate_expression(
1425 expr: &Arc<dyn PhysicalExpr>,
1426 schema: &SchemaRef,
1427 required_columns: &mut RequiredColumns,
1428 unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1429) -> Arc<dyn PhysicalExpr> {
1430 if is_always_false(expr) {
1431 return Arc::clone(expr);
1434 }
1435 let expr_any = expr.as_any();
1437 if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
1438 return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1439 .unwrap_or_else(|| unhandled_hook.handle(expr));
1440 }
1441 if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
1442 return build_is_null_column_expr(
1443 is_not_null.arg(),
1444 schema,
1445 required_columns,
1446 true,
1447 )
1448 .unwrap_or_else(|| unhandled_hook.handle(expr));
1449 }
1450 if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
1451 return build_single_column_expr(col, schema, required_columns, false)
1452 .unwrap_or_else(|| unhandled_hook.handle(expr));
1453 }
1454 if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
1455 if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
1457 return build_single_column_expr(col, schema, required_columns, true)
1458 .unwrap_or_else(|| unhandled_hook.handle(expr));
1459 } else {
1460 return unhandled_hook.handle(expr);
1461 }
1462 }
1463 if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
1464 if !in_list.list().is_empty()
1465 && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1466 {
1467 let eq_op = if in_list.negated() {
1468 Operator::NotEq
1469 } else {
1470 Operator::Eq
1471 };
1472 let re_op = if in_list.negated() {
1473 Operator::And
1474 } else {
1475 Operator::Or
1476 };
1477 let change_expr = in_list
1478 .list()
1479 .iter()
1480 .map(|e| {
1481 Arc::new(phys_expr::BinaryExpr::new(
1482 Arc::clone(in_list.expr()),
1483 eq_op,
1484 Arc::clone(e),
1485 )) as _
1486 })
1487 .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1488 .unwrap();
1489 return build_predicate_expression(
1490 &change_expr,
1491 schema,
1492 required_columns,
1493 unhandled_hook,
1494 );
1495 } else {
1496 return unhandled_hook.handle(expr);
1497 }
1498 }
1499
1500 let (left, op, right) = {
1501 if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
1502 (
1503 Arc::clone(bin_expr.left()),
1504 *bin_expr.op(),
1505 Arc::clone(bin_expr.right()),
1506 )
1507 } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
1508 if like_expr.case_insensitive() {
1509 return unhandled_hook.handle(expr);
1510 }
1511 let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1512 (false, false) => Operator::LikeMatch,
1513 (true, false) => Operator::NotLikeMatch,
1514 (false, true) => Operator::ILikeMatch,
1515 (true, true) => Operator::NotILikeMatch,
1516 };
1517 (
1518 Arc::clone(like_expr.expr()),
1519 op,
1520 Arc::clone(like_expr.pattern()),
1521 )
1522 } else {
1523 return unhandled_hook.handle(expr);
1524 }
1525 };
1526
1527 if op == Operator::And || op == Operator::Or {
1528 let left_expr =
1529 build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1530 let right_expr =
1531 build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1532 let expr = match (&left_expr, op, &right_expr) {
1534 (left, Operator::And, right)
1535 if is_always_false(left) || is_always_false(right) =>
1536 {
1537 Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false))))
1538 }
1539 (left, Operator::And, _) if is_always_true(left) => right_expr,
1540 (_, Operator::And, right) if is_always_true(right) => left_expr,
1541 (left, Operator::Or, right)
1542 if is_always_true(left) || is_always_true(right) =>
1543 {
1544 Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1545 }
1546 (left, Operator::Or, _) if is_always_false(left) => right_expr,
1547 (_, Operator::Or, right) if is_always_false(right) => left_expr,
1548
1549 _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1550 };
1551 return expr;
1552 }
1553
1554 let left_columns = ColumnReferenceCount::from_expression(&left);
1555 let right_columns = ColumnReferenceCount::from_expression(&right);
1556 let expr_builder = PruningExpressionBuilder::try_new(
1557 &left,
1558 &right,
1559 left_columns,
1560 right_columns,
1561 op,
1562 schema,
1563 required_columns,
1564 );
1565 let mut expr_builder = match expr_builder {
1566 Ok(builder) => builder,
1567 Err(e) => {
1570 debug!("Error building pruning expression: {e}");
1571 return unhandled_hook.handle(expr);
1572 }
1573 };
1574
1575 build_statistics_expr(&mut expr_builder)
1576 .unwrap_or_else(|_| unhandled_hook.handle(expr))
1577}
1578
1579#[derive(Debug, PartialEq, Eq)]
1589enum ColumnReferenceCount {
1590 Zero,
1592 One(phys_expr::Column),
1594 Many,
1596}
1597
1598impl ColumnReferenceCount {
1599 fn from_expression(expr: &Arc<dyn PhysicalExpr>) -> Self {
1601 let mut seen = HashSet::<phys_expr::Column>::new();
1602 expr.apply(|expr| {
1603 if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1604 seen.insert(column.clone());
1605 if seen.len() > 1 {
1606 return Ok(TreeNodeRecursion::Stop);
1607 }
1608 }
1609 Ok(TreeNodeRecursion::Continue)
1610 })
1611 .expect("no way to return error during recursion");
1613 match seen.len() {
1614 0 => ColumnReferenceCount::Zero,
1615 1 => ColumnReferenceCount::One(
1616 seen.into_iter().next().expect("just checked len==1"),
1617 ),
1618 _ => ColumnReferenceCount::Many,
1619 }
1620 }
1621}
1622
1623fn build_statistics_expr(
1624 expr_builder: &mut PruningExpressionBuilder,
1625) -> Result<Arc<dyn PhysicalExpr>> {
1626 let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1627 Operator::NotEq => {
1628 let min_column_expr = expr_builder.min_column_expr()?;
1632 let max_column_expr = expr_builder.max_column_expr()?;
1633 Arc::new(phys_expr::BinaryExpr::new(
1634 Arc::new(phys_expr::BinaryExpr::new(
1635 min_column_expr,
1636 Operator::NotEq,
1637 Arc::clone(expr_builder.scalar_expr()),
1638 )),
1639 Operator::Or,
1640 Arc::new(phys_expr::BinaryExpr::new(
1641 Arc::clone(expr_builder.scalar_expr()),
1642 Operator::NotEq,
1643 max_column_expr,
1644 )),
1645 ))
1646 }
1647 Operator::Eq => {
1648 let min_column_expr = expr_builder.min_column_expr()?;
1651 let max_column_expr = expr_builder.max_column_expr()?;
1652 Arc::new(phys_expr::BinaryExpr::new(
1653 Arc::new(phys_expr::BinaryExpr::new(
1654 min_column_expr,
1655 Operator::LtEq,
1656 Arc::clone(expr_builder.scalar_expr()),
1657 )),
1658 Operator::And,
1659 Arc::new(phys_expr::BinaryExpr::new(
1660 Arc::clone(expr_builder.scalar_expr()),
1661 Operator::LtEq,
1662 max_column_expr,
1663 )),
1664 ))
1665 }
1666 Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1667 Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1668 plan_datafusion_err!(
1669 "LIKE expression with wildcard at the beginning is not supported"
1670 )
1671 })?,
1672 Operator::Gt => {
1673 Arc::new(phys_expr::BinaryExpr::new(
1675 expr_builder.max_column_expr()?,
1676 Operator::Gt,
1677 Arc::clone(expr_builder.scalar_expr()),
1678 ))
1679 }
1680 Operator::GtEq => {
1681 Arc::new(phys_expr::BinaryExpr::new(
1683 expr_builder.max_column_expr()?,
1684 Operator::GtEq,
1685 Arc::clone(expr_builder.scalar_expr()),
1686 ))
1687 }
1688 Operator::Lt => {
1689 Arc::new(phys_expr::BinaryExpr::new(
1691 expr_builder.min_column_expr()?,
1692 Operator::Lt,
1693 Arc::clone(expr_builder.scalar_expr()),
1694 ))
1695 }
1696 Operator::LtEq => {
1697 Arc::new(phys_expr::BinaryExpr::new(
1699 expr_builder.min_column_expr()?,
1700 Operator::LtEq,
1701 Arc::clone(expr_builder.scalar_expr()),
1702 ))
1703 }
1704 _ => {
1706 return plan_err!(
1707 "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1708 );
1709 }
1710 };
1711 let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1712 Ok(statistics_expr)
1713}
1714
1715fn unpack_string(s: &ScalarValue) -> Option<&str> {
1717 s.try_as_str().flatten()
1718}
1719
1720fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1721 if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
1722 let s = unpack_string(lit.value())?;
1723 return Some(s);
1724 }
1725 None
1726}
1727
1728fn build_like_match(
1732 expr_builder: &mut PruningExpressionBuilder,
1733) -> Option<Arc<dyn PhysicalExpr>> {
1734 let min_column_expr = expr_builder.min_column_expr().ok()?;
1743 let max_column_expr = expr_builder.max_column_expr().ok()?;
1744 let scalar_expr = expr_builder.scalar_expr();
1745 let s = extract_string_literal(scalar_expr)?;
1747 let first_wildcard_index = s.find(['%', '_']);
1749 if first_wildcard_index == Some(0) {
1750 return None;
1752 }
1753 let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
1754 let prefix = &s[..wildcard_index];
1755 let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1756 prefix.to_string(),
1757 ))));
1758 let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1759 increment_utf8(prefix)?,
1760 ))));
1761 (lower_bound_lit, upper_bound_lit)
1762 } else {
1763 let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1765 s.to_string(),
1766 ))));
1767 (Arc::clone(&bound), bound)
1768 };
1769 let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1770 lower_bound,
1771 Operator::LtEq,
1772 Arc::clone(&max_column_expr),
1773 ));
1774 let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1775 Arc::clone(&min_column_expr),
1776 Operator::LtEq,
1777 upper_bound,
1778 ));
1779 let combined = Arc::new(phys_expr::BinaryExpr::new(
1780 upper_bound_expr,
1781 Operator::And,
1782 lower_bound_expr,
1783 ));
1784 Some(combined)
1785}
1786
1787fn build_not_like_match(
1793 expr_builder: &mut PruningExpressionBuilder<'_>,
1794) -> Result<Arc<dyn PhysicalExpr>> {
1795 let min_column_expr = expr_builder.min_column_expr()?;
1798 let max_column_expr = expr_builder.max_column_expr()?;
1799
1800 let scalar_expr = expr_builder.scalar_expr();
1801
1802 let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1803 plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1804 })?;
1805
1806 let (const_prefix, remaining) = split_constant_prefix(pattern);
1807 if const_prefix.is_empty() || remaining != "%" {
1808 return Err(plan_datafusion_err!(
1820 "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1821 ));
1822 }
1823
1824 let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1825 true,
1826 false,
1827 Arc::clone(&min_column_expr),
1828 Arc::clone(scalar_expr),
1829 ));
1830
1831 let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1832 true,
1833 false,
1834 Arc::clone(&max_column_expr),
1835 Arc::clone(scalar_expr),
1836 ));
1837
1838 Ok(Arc::new(phys_expr::BinaryExpr::new(
1839 min_col_not_like_epxr,
1840 Operator::Or,
1841 max_col_not_like_expr,
1842 )))
1843}
1844
/// Splits `pattern` at its first `%` or `_` that is not directly preceded by a
/// backslash, returning `(text before it, text from it onwards)`.
///
/// When no such wildcard exists the whole pattern is returned as the first element and
/// the second element is empty.
fn split_constant_prefix(pattern: &str) -> (&str, &str) {
    // The character seen on the previous iteration; `None` only for the first one.
    let mut previous: Option<char> = None;

    for (byte_offset, current) in pattern.char_indices() {
        let is_wildcard = matches!(current, '%' | '_');
        if is_wildcard && previous != Some('\\') {
            return pattern.split_at(byte_offset);
        }
        previous = Some(current);
    }

    (pattern, "")
}
1860
/// Produces a string that sorts after every string starting with `data`.
///
/// Working from the end of `data`, the last code point that can be bumped by one is
/// replaced by its successor and everything after it is dropped. A code point cannot be
/// bumped when its successor is not a valid `char` (a surrogate, or past U+10FFFF) or
/// is one of the noncharacters U+FFFE, U+FFFF, U+FDD0..=U+FDEF.
///
/// Returns `None` when `data` is empty or no code point in it can be bumped.
fn increment_utf8(data: &str) -> Option<String> {
    /// Whether `c` is acceptable as a replacement code point.
    fn is_valid_unicode(c: char) -> bool {
        // No upper-bound check is needed: a `char` is at most U+10FFFF by construction.
        !matches!(u32::from(c), 0xFFFE | 0xFFFF | 0xFDD0..=0xFDEF)
    }

    let mut code_points: Vec<char> = data.chars().collect();

    // Discard trailing code points until one of them can be incremented.
    while let Some(last) = code_points.pop() {
        // `last` is at most 0x10FFFF, so the addition cannot overflow; `from_u32`
        // rejects surrogates and out-of-range values.
        let successor =
            char::from_u32(u32::from(last) + 1).filter(|next| is_valid_unicode(*next));

        if let Some(next) = successor {
            code_points.push(next);
            return Some(code_points.into_iter().collect());
        }
    }

    None
}
1906
1907fn wrap_null_count_check_expr(
1928 statistics_expr: Arc<dyn PhysicalExpr>,
1929 expr_builder: &mut PruningExpressionBuilder,
1930) -> Result<Arc<dyn PhysicalExpr>> {
1931 let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new(
1933 expr_builder.null_count_column_expr()?,
1934 Operator::NotEq,
1935 expr_builder.row_count_column_expr()?,
1936 ));
1937
1938 Ok(Arc::new(phys_expr::BinaryExpr::new(
1940 not_when_null_count_eq_row_count,
1941 Operator::And,
1942 statistics_expr,
1943 )))
1944}
1945
/// Identifies which kind of statistic a required statistics column refers to.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum StatisticsType {
    /// The minimum-value statistic of a column.
    Min,
    /// The maximum-value statistic of a column.
    Max,
    /// The null-count statistic of a column.
    NullCount,
    /// The row-count statistic.
    RowCount,
}
1953
1954#[cfg(test)]
1955mod tests {
1956 use std::collections::HashMap;
1957 use std::ops::{Not, Rem};
1958
1959 use super::*;
1960 use datafusion_common::test_util::batches_to_string;
1961 use datafusion_expr::{and, col, lit, or};
1962 use datafusion_physical_expr::utils::collect_columns;
1963 use insta::assert_snapshot;
1964
1965 use arrow::array::Decimal128Array;
1966 use arrow::{
1967 array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
1968 datatypes::TimeUnit,
1969 };
1970 use datafusion_expr::expr::InList;
1971 use datafusion_expr::{Expr, cast, is_null, try_cast};
1972 use datafusion_functions_nested::expr_fn::{array_has, make_array};
1973 use datafusion_physical_expr::expressions::{
1974 self as phys_expr, DynamicFilterPhysicalExpr,
1975 };
1976 use datafusion_physical_expr::planner::logical2physical;
1977 use itertools::Itertools;
1978
    /// Test fixture holding the statistics arrays for one column.
    ///
    /// Every field is optional; `assert_invariants` checks that all arrays that are
    /// present have the same length.
    #[derive(Debug, Default)]
    struct ContainerStats {
        /// Minimum values, if set.
        min: Option<ArrayRef>,
        /// Maximum values, if set.
        max: Option<ArrayRef>,
        /// Null counts, if set (built as a `UInt64Array` by `with_null_counts`).
        null_counts: Option<ArrayRef>,
        /// Row counts, if set (built as a `UInt64Array` by `with_row_counts`).
        row_counts: Option<ArrayRef>,
        /// Pairs of (set of values, boolean array) returned by `contained` when it is
        /// asked about exactly that set of values.
        contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
    }
1998
1999 impl ContainerStats {
2000 fn new() -> Self {
2001 Default::default()
2002 }
2003 fn new_decimal128(
2004 min: impl IntoIterator<Item = Option<i128>>,
2005 max: impl IntoIterator<Item = Option<i128>>,
2006 precision: u8,
2007 scale: i8,
2008 ) -> Self {
2009 Self::new()
2010 .with_min(Arc::new(
2011 min.into_iter()
2012 .collect::<Decimal128Array>()
2013 .with_precision_and_scale(precision, scale)
2014 .unwrap(),
2015 ))
2016 .with_max(Arc::new(
2017 max.into_iter()
2018 .collect::<Decimal128Array>()
2019 .with_precision_and_scale(precision, scale)
2020 .unwrap(),
2021 ))
2022 }
2023
2024 fn new_i64(
2025 min: impl IntoIterator<Item = Option<i64>>,
2026 max: impl IntoIterator<Item = Option<i64>>,
2027 ) -> Self {
2028 Self::new()
2029 .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
2030 .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
2031 }
2032
2033 fn new_i32(
2034 min: impl IntoIterator<Item = Option<i32>>,
2035 max: impl IntoIterator<Item = Option<i32>>,
2036 ) -> Self {
2037 Self::new()
2038 .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
2039 .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
2040 }
2041
2042 fn new_utf8<'a>(
2043 min: impl IntoIterator<Item = Option<&'a str>>,
2044 max: impl IntoIterator<Item = Option<&'a str>>,
2045 ) -> Self {
2046 Self::new()
2047 .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
2048 .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
2049 }
2050
2051 fn new_bool(
2052 min: impl IntoIterator<Item = Option<bool>>,
2053 max: impl IntoIterator<Item = Option<bool>>,
2054 ) -> Self {
2055 Self::new()
2056 .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
2057 .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
2058 }
2059
2060 fn min(&self) -> Option<ArrayRef> {
2061 self.min.clone()
2062 }
2063
2064 fn max(&self) -> Option<ArrayRef> {
2065 self.max.clone()
2066 }
2067
2068 fn null_counts(&self) -> Option<ArrayRef> {
2069 self.null_counts.clone()
2070 }
2071
2072 fn row_counts(&self) -> Option<ArrayRef> {
2073 self.row_counts.clone()
2074 }
2075
2076 fn arrays(&self) -> Vec<ArrayRef> {
2078 let contained_arrays = self
2079 .contained
2080 .iter()
2081 .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
2082
2083 [
2084 self.min.as_ref().cloned(),
2085 self.max.as_ref().cloned(),
2086 self.null_counts.as_ref().cloned(),
2087 self.row_counts.as_ref().cloned(),
2088 ]
2089 .into_iter()
2090 .flatten()
2091 .chain(contained_arrays)
2092 .collect()
2093 }
2094
2095 fn len(&self) -> usize {
2099 self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
2101 }
2102
2103 fn assert_invariants(&self) {
2105 let mut prev_len = None;
2106
2107 for len in self.arrays().iter().map(|a| a.len()) {
2108 match prev_len {
2110 None => {
2111 prev_len = Some(len);
2112 }
2113 Some(prev_len) => {
2114 assert_eq!(prev_len, len);
2115 }
2116 }
2117 }
2118 }
2119
2120 fn with_min(mut self, min: ArrayRef) -> Self {
2122 self.min = Some(min);
2123 self
2124 }
2125
2126 fn with_max(mut self, max: ArrayRef) -> Self {
2128 self.max = Some(max);
2129 self
2130 }
2131
2132 fn with_null_counts(
2135 mut self,
2136 counts: impl IntoIterator<Item = Option<u64>>,
2137 ) -> Self {
2138 let null_counts: ArrayRef =
2139 Arc::new(counts.into_iter().collect::<UInt64Array>());
2140
2141 self.assert_invariants();
2142 self.null_counts = Some(null_counts);
2143 self
2144 }
2145
2146 fn with_row_counts(
2149 mut self,
2150 counts: impl IntoIterator<Item = Option<u64>>,
2151 ) -> Self {
2152 let row_counts: ArrayRef =
2153 Arc::new(counts.into_iter().collect::<UInt64Array>());
2154
2155 self.assert_invariants();
2156 self.row_counts = Some(row_counts);
2157 self
2158 }
2159
2160 pub fn with_contained(
2162 mut self,
2163 values: impl IntoIterator<Item = ScalarValue>,
2164 contained: impl IntoIterator<Item = Option<bool>>,
2165 ) -> Self {
2166 let contained: BooleanArray = contained.into_iter().collect();
2167 let values: HashSet<_> = values.into_iter().collect();
2168
2169 self.contained.push((values, contained));
2170 self.assert_invariants();
2171 self
2172 }
2173
2174 fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2176 self.contained
2178 .iter()
2179 .find(|(values, _contained)| values == find_values)
2180 .map(|(_values, contained)| contained.clone())
2181 }
2182 }
2183
    /// Test implementation of `PruningStatistics` backed by per-column
    /// `ContainerStats`.
    #[derive(Debug, Default)]
    struct TestStatistics {
        /// Statistics keyed by column (built with `Column::from_name`).
        stats: HashMap<Column, ContainerStats>,
    }
2189
2190 impl TestStatistics {
2191 fn new() -> Self {
2192 Self::default()
2193 }
2194
2195 fn with(
2196 mut self,
2197 name: impl Into<String>,
2198 container_stats: ContainerStats,
2199 ) -> Self {
2200 let col = Column::from_name(name.into());
2201 self.stats.insert(col, container_stats);
2202 self
2203 }
2204
2205 fn with_null_counts(
2209 mut self,
2210 name: impl Into<String>,
2211 counts: impl IntoIterator<Item = Option<u64>>,
2212 ) -> Self {
2213 let col = Column::from_name(name.into());
2214
2215 let container_stats = self
2217 .stats
2218 .remove(&col)
2219 .unwrap_or_default()
2220 .with_null_counts(counts);
2221
2222 self.stats.insert(col, container_stats);
2224 self
2225 }
2226
2227 fn with_row_counts(
2231 mut self,
2232 name: impl Into<String>,
2233 counts: impl IntoIterator<Item = Option<u64>>,
2234 ) -> Self {
2235 let col = Column::from_name(name.into());
2236
2237 let container_stats = self
2239 .stats
2240 .remove(&col)
2241 .unwrap_or_default()
2242 .with_row_counts(counts);
2243
2244 self.stats.insert(col, container_stats);
2246 self
2247 }
2248
2249 fn with_contained(
2251 mut self,
2252 name: impl Into<String>,
2253 values: impl IntoIterator<Item = ScalarValue>,
2254 contained: impl IntoIterator<Item = Option<bool>>,
2255 ) -> Self {
2256 let col = Column::from_name(name.into());
2257
2258 let container_stats = self
2260 .stats
2261 .remove(&col)
2262 .unwrap_or_default()
2263 .with_contained(values, contained);
2264
2265 self.stats.insert(col, container_stats);
2267 self
2268 }
2269 }
2270
2271 impl PruningStatistics for TestStatistics {
2272 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2273 self.stats
2274 .get(column)
2275 .map(|container_stats| container_stats.min())
2276 .unwrap_or(None)
2277 }
2278
2279 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2280 self.stats
2281 .get(column)
2282 .map(|container_stats| container_stats.max())
2283 .unwrap_or(None)
2284 }
2285
2286 fn num_containers(&self) -> usize {
2287 self.stats
2288 .values()
2289 .next()
2290 .map(|container_stats| container_stats.len())
2291 .unwrap_or(0)
2292 }
2293
2294 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2295 self.stats
2296 .get(column)
2297 .map(|container_stats| container_stats.null_counts())
2298 .unwrap_or(None)
2299 }
2300
2301 fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
2302 self.stats
2303 .get(column)
2304 .map(|container_stats| container_stats.row_counts())
2305 .unwrap_or(None)
2306 }
2307
2308 fn contained(
2309 &self,
2310 column: &Column,
2311 values: &HashSet<ScalarValue>,
2312 ) -> Option<BooleanArray> {
2313 self.stats
2314 .get(column)
2315 .and_then(|container_stats| container_stats.contained(values))
2316 }
2317 }
2318
    /// Test implementation of `PruningStatistics` that returns the same min/max
    /// arrays for every column and reports a caller-chosen container count.
    struct OneContainerStats {
        /// Array returned by `min_values` for any column.
        min_values: Option<ArrayRef>,
        /// Array returned by `max_values` for any column.
        max_values: Option<ArrayRef>,
        /// Value returned by `num_containers`; not checked against the arrays here.
        num_containers: usize,
    }
2325
2326 impl PruningStatistics for OneContainerStats {
2327 fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2328 self.min_values.clone()
2329 }
2330
2331 fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2332 self.max_values.clone()
2333 }
2334
2335 fn num_containers(&self) -> usize {
2336 self.num_containers
2337 }
2338
2339 fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2340 None
2341 }
2342
2343 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
2344 None
2345 }
2346
2347 fn contained(
2348 &self,
2349 _column: &Column,
2350 _values: &HashSet<ScalarValue>,
2351 ) -> Option<BooleanArray> {
2352 None
2353 }
2354 }
2355
2356 #[test]
2359 fn test_unique_row_count_field_and_column() {
2360 let schema: SchemaRef = Arc::new(Schema::new(vec![
2362 Field::new("c1", DataType::Int32, true),
2363 Field::new("c2", DataType::Int32, true),
2364 ]));
2365 let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2366 let expr = logical2physical(&expr, &schema);
2367 let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
2368 assert_eq!(
2370 "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2371 p.predicate_expr.to_string()
2372 );
2373
2374 let mut fields = HashSet::new();
2377 for (_col, _ty, field) in p.required_columns().iter() {
2378 let was_new = fields.insert(field);
2379 if !was_new {
2380 panic!(
2381 "Duplicate field in required schema: {field:?}. Previous fields:\n{fields:#?}"
2382 );
2383 }
2384 }
2385 }
2386
2387 #[test]
2388 fn prune_all_rows_null_counts() {
2389 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2392 let statistics = TestStatistics::new().with(
2393 "i",
2394 ContainerStats::new_i32(
2395 vec![Some(0)], vec![Some(0)], )
2398 .with_null_counts(vec![Some(1)])
2399 .with_row_counts(vec![Some(1)]),
2400 );
2401 let expected_ret = &[false];
2402 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2403
2404 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2406 let container_stats = ContainerStats {
2407 min: Some(Arc::new(Int32Array::from(vec![None]))),
2408 max: Some(Arc::new(Int32Array::from(vec![None]))),
2409 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2410 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2411 ..ContainerStats::default()
2412 };
2413 let statistics = TestStatistics::new().with("i", container_stats);
2414 let expected_ret = &[false];
2415 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2416
2417 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2419 let container_stats = ContainerStats {
2420 min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2421 max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2422 null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2423 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2424 ..ContainerStats::default()
2425 };
2426 let statistics = TestStatistics::new().with("i", container_stats);
2427 let expected_ret = &[true];
2428 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2429 let expected_ret = &[false];
2430 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2431
2432 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2434 let container_stats = ContainerStats {
2435 min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2436 max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2437 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2438 row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2439 ..ContainerStats::default()
2440 };
2441 let statistics = TestStatistics::new().with("i", container_stats);
2442 let expected_ret = &[true];
2443 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2444 let expected_ret = &[false];
2445 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2446 }
2447
2448 #[test]
2449 fn prune_missing_statistics() {
2450 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2453 let container_stats = ContainerStats {
2454 min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
2455 max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
2456 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
2457 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
2458 ..ContainerStats::default()
2459 };
2460 let statistics = TestStatistics::new().with("i", container_stats);
2461 let expected_ret = &[true, true];
2462 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2463 let expected_ret = &[false, true];
2464 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2465 let expected_ret = &[true, false];
2466 prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
2467 }
2468
2469 #[test]
2470 fn prune_null_stats() {
2471 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2474
2475 let statistics = TestStatistics::new().with(
2476 "i",
2477 ContainerStats::new_i32(
2478 vec![Some(0)], vec![Some(0)], )
2481 .with_null_counts(vec![Some(1)])
2482 .with_row_counts(vec![Some(1)]),
2483 );
2484
2485 let expected_ret = &[false];
2486
2487 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2489 }
2490
    /// Builds a statistics batch for four required columns over three source
    /// columns and snapshots the resulting table.
    #[test]
    fn test_build_statistics_record_batch() {
        // Each entry is (source column, statistic kind, output field).
        let required_columns = RequiredColumns::from(vec![
            // min of s1, output as "s1_min"
            (
                phys_expr::Column::new("s1", 1),
                StatisticsType::Min,
                Field::new("s1_min", DataType::Int32, true),
            ),
            // max of s2, output as "s2_max"
            (
                phys_expr::Column::new("s2", 2),
                StatisticsType::Max,
                Field::new("s2_max", DataType::Int32, true),
            ),
            // max of s3, output as "s3_max"
            (
                phys_expr::Column::new("s3", 3),
                StatisticsType::Max,
                Field::new("s3_max", DataType::Utf8, true),
            ),
            // min of s3, output as "s3_min"
            (
                phys_expr::Column::new("s3", 3),
                StatisticsType::Min,
                Field::new("s3_min", DataType::Utf8, true),
            ),
        ]);

        // Four containers per column; arguments are (min values, max values).
        let statistics = TestStatistics::new()
            .with(
                "s1",
                ContainerStats::new_i32(
                    vec![None, None, Some(9), None],
                    vec![Some(10), None, None, None],
                ),
            )
            .with(
                "s2",
                ContainerStats::new_i32(
                    vec![Some(2), None, None, None],
                    vec![Some(20), None, None, None],
                ),
            )
            .with(
                "s3",
                ContainerStats::new_utf8(
                    vec![Some("a"), None, None, None],
                    vec![Some("q"), None, Some("r"), None],
                ),
            );

        let batch =
            build_statistics_record_batch(&statistics, &required_columns).unwrap();
        // One row per container; only the requested statistic of each column appears.
        assert_snapshot!(batches_to_string(&[batch]), @r"
 +--------+--------+--------+--------+
 | s1_min | s2_max | s3_max | s3_min |
 +--------+--------+--------+--------+
 | | 20 | q | a |
 | | | | |
 | 9 | | r | |
 | | | | |
 +--------+--------+--------+--------+
 ");
    }
2557
    /// The required field is declared as a nanosecond timestamp while the statistics
    /// supply `Int64` values; the snapshot shows the value rendered as a timestamp.
    #[test]
    fn test_build_statistics_casting() {
        let required_columns = RequiredColumns::from(vec![(
            phys_expr::Column::new("s3", 3),
            StatisticsType::Min,
            Field::new(
                "s1_min",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
        )]);

        // A single container whose statistics are plain Int64 values.
        let statistics = OneContainerStats {
            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
            num_containers: 1,
        };

        let batch =
            build_statistics_record_batch(&statistics, &required_columns).unwrap();

        // The min value 10 appears as 10 nanoseconds after the epoch.
        assert_snapshot!(batches_to_string(&[batch]), @r"
 +-------------------------------+
 | s1_min |
 +-------------------------------+
 | 1970-01-01T00:00:00.000000010 |
 +-------------------------------+
 ");
    }
2592
2593 #[test]
2594 fn test_build_statistics_no_required_stats() {
2595 let required_columns = RequiredColumns::new();
2596
2597 let statistics = OneContainerStats {
2598 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2599 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2600 num_containers: 1,
2601 };
2602
2603 let batch =
2604 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2605 assert_eq!(batch.num_rows(), 1); }
2607
    /// The required field is declared as `Utf8` while the statistics supply a binary
    /// array holding the single byte 255; the build succeeds and the snapshot shows an
    /// empty cell.
    #[test]
    fn test_build_statistics_inconsistent_types() {
        let required_columns = RequiredColumns::from(vec![(
            phys_expr::Column::new("s3", 3),
            StatisticsType::Min,
            Field::new("s1_min", DataType::Utf8, true),
        )]);

        // Only min values are supplied, as raw bytes rather than strings.
        let statistics = OneContainerStats {
            min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
            max_values: None,
            num_containers: 1,
        };

        let batch =
            build_statistics_record_batch(&statistics, &required_columns).unwrap();
        assert_snapshot!(batches_to_string(&[batch]), @r"
 +--------+
 | s1_min |
 +--------+
 | |
 +--------+
 ");
    }
2636
2637 #[test]
2638 fn test_build_statistics_inconsistent_length() {
2639 let required_columns = RequiredColumns::from(vec![(
2641 phys_expr::Column::new("s1", 3),
2642 StatisticsType::Min,
2643 Field::new("s1_min", DataType::Int64, true),
2644 )]);
2645
2646 let statistics = OneContainerStats {
2648 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2649 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2650 num_containers: 3,
2651 };
2652
2653 let result =
2654 build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
2655 assert!(
2656 result
2657 .to_string()
2658 .contains("mismatched statistics length. Expected 3, got 1"),
2659 "{}",
2660 result
2661 );
2662 }
2663
2664 #[test]
2665 fn row_group_predicate_eq() -> Result<()> {
2666 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2667 let expected_expr =
2668 "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2669
2670 let expr = col("c1").eq(lit(1));
2672 let predicate_expr =
2673 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2674 assert_eq!(predicate_expr.to_string(), expected_expr);
2675
2676 let expr = lit(1).eq(col("c1"));
2678 let predicate_expr =
2679 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2680 assert_eq!(predicate_expr.to_string(), expected_expr);
2681
2682 Ok(())
2683 }
2684
2685 #[test]
2686 fn row_group_predicate_not_eq() -> Result<()> {
2687 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2688 let expected_expr =
2689 "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2690
2691 let expr = col("c1").not_eq(lit(1));
2693 let predicate_expr =
2694 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2695 assert_eq!(predicate_expr.to_string(), expected_expr);
2696
2697 let expr = lit(1).not_eq(col("c1"));
2699 let predicate_expr =
2700 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2701 assert_eq!(predicate_expr.to_string(), expected_expr);
2702
2703 Ok(())
2704 }
2705
2706 #[test]
2707 fn row_group_predicate_gt() -> Result<()> {
2708 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2709 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
2710
2711 let expr = col("c1").gt(lit(1));
2713 let predicate_expr =
2714 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2715 assert_eq!(predicate_expr.to_string(), expected_expr);
2716
2717 let expr = lit(1).lt(col("c1"));
2719 let predicate_expr =
2720 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2721 assert_eq!(predicate_expr.to_string(), expected_expr);
2722
2723 Ok(())
2724 }
2725
2726 #[test]
2727 fn row_group_predicate_gt_eq() -> Result<()> {
2728 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2729 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
2730
2731 let expr = col("c1").gt_eq(lit(1));
2733 let predicate_expr =
2734 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2735 assert_eq!(predicate_expr.to_string(), expected_expr);
2736 let expr = lit(1).lt_eq(col("c1"));
2738 let predicate_expr =
2739 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2740 assert_eq!(predicate_expr.to_string(), expected_expr);
2741
2742 Ok(())
2743 }
2744
2745 #[test]
2746 fn row_group_predicate_lt() -> Result<()> {
2747 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2748 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2749
2750 let expr = col("c1").lt(lit(1));
2752 let predicate_expr =
2753 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2754 assert_eq!(predicate_expr.to_string(), expected_expr);
2755
2756 let expr = lit(1).gt(col("c1"));
2758 let predicate_expr =
2759 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2760 assert_eq!(predicate_expr.to_string(), expected_expr);
2761
2762 Ok(())
2763 }
2764
2765 #[test]
2766 fn row_group_predicate_lt_eq() -> Result<()> {
2767 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2768 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
2769
2770 let expr = col("c1").lt_eq(lit(1));
2772 let predicate_expr =
2773 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2774 assert_eq!(predicate_expr.to_string(), expected_expr);
2775 let expr = lit(1).gt_eq(col("c1"));
2777 let predicate_expr =
2778 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2779 assert_eq!(predicate_expr.to_string(), expected_expr);
2780
2781 Ok(())
2782 }
2783
2784 #[test]
2785 fn row_group_predicate_and() -> Result<()> {
2786 let schema = Schema::new(vec![
2787 Field::new("c1", DataType::Int32, false),
2788 Field::new("c2", DataType::Int32, false),
2789 Field::new("c3", DataType::Int32, false),
2790 ]);
2791 let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2793 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2794 let predicate_expr =
2795 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2796 assert_eq!(predicate_expr.to_string(), expected_expr);
2797
2798 Ok(())
2799 }
2800
2801 #[test]
2802 fn row_group_predicate_or() -> Result<()> {
2803 let schema = Schema::new(vec![
2804 Field::new("c1", DataType::Int32, false),
2805 Field::new("c2", DataType::Int32, false),
2806 ]);
2807 let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
2809 let expected_expr = "true";
2810 let predicate_expr =
2811 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2812 assert_eq!(predicate_expr.to_string(), expected_expr);
2813
2814 Ok(())
2815 }
2816
2817 #[test]
2818 fn row_group_predicate_not() -> Result<()> {
2819 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2820 let expected_expr = "true";
2821
2822 let expr = col("c1").not();
2823 let predicate_expr =
2824 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2825 assert_eq!(predicate_expr.to_string(), expected_expr);
2826
2827 Ok(())
2828 }
2829
2830 #[test]
2831 fn row_group_predicate_not_bool() -> Result<()> {
2832 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2833 let expected_expr = "NOT c1_min@0 AND c1_max@1";
2834
2835 let expr = col("c1").not();
2836 let predicate_expr =
2837 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2838 assert_eq!(predicate_expr.to_string(), expected_expr);
2839
2840 Ok(())
2841 }
2842
2843 #[test]
2844 fn row_group_predicate_bool() -> Result<()> {
2845 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2846 let expected_expr = "c1_min@0 OR c1_max@1";
2847
2848 let expr = col("c1");
2849 let predicate_expr =
2850 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2851 assert_eq!(predicate_expr.to_string(), expected_expr);
2852
2853 Ok(())
2854 }
2855
2856 #[test]
2858 fn row_group_predicate_non_boolean() {
2859 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2860 let statistics = TestStatistics::new()
2861 .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2862 let expected_ret = &[true];
2863 prune_with_expr(lit(1), &schema, &statistics, expected_ret);
2864 }
2865
2866 #[test]
2870 fn row_group_predicate_literal_false() {
2871 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2873 let statistics = TestStatistics::new()
2874 .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2875 let expected_ret = &[false];
2876 prune_with_simplified_expr(lit(1).eq(lit(2)), &schema, &statistics, expected_ret);
2877 }
2878
2879 #[test]
2882 fn row_group_predicate_literal_true() {
2883 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2885 let statistics = TestStatistics::new()
2886 .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2887 let expected_ret = &[true];
2888 prune_with_simplified_expr(lit(1).eq(lit(1)), &schema, &statistics, expected_ret);
2889 }
2890
2891 #[test]
2894 fn row_group_predicate_literal_null() {
2895 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2897 let statistics = TestStatistics::new()
2898 .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2899 let expected_ret = &[true];
2900 prune_with_simplified_expr(
2901 lit(1).eq(lit(ScalarValue::Null)),
2902 &schema,
2903 &statistics,
2904 expected_ret,
2905 );
2906 }
2907
2908 #[test]
2911 fn row_group_predicate_complex_literals() {
2912 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2913 let statistics = TestStatistics::new()
2914 .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2915
2916 prune_with_simplified_expr(
2918 (lit(1) + lit(2)).gt(lit(0)),
2919 &schema,
2920 &statistics,
2921 &[true],
2922 );
2923
2924 prune_with_simplified_expr(
2926 (lit(1) + lit(2)).lt(lit(0)),
2927 &schema,
2928 &statistics,
2929 &[false],
2930 );
2931
2932 prune_with_simplified_expr(
2934 lit(true).and(lit(false)),
2935 &schema,
2936 &statistics,
2937 &[false],
2938 );
2939
2940 prune_with_simplified_expr(
2942 lit(true).or(lit(false)),
2943 &schema,
2944 &statistics,
2945 &[true],
2946 );
2947
2948 prune_with_simplified_expr(
2950 lit(1).lt(lit(2)).and(lit(3).gt(lit(1))),
2951 &schema,
2952 &statistics,
2953 &[true],
2954 );
2955
2956 prune_with_simplified_expr(
2958 lit(1).gt(lit(2)).or(lit(3).lt(lit(1))),
2959 &schema,
2960 &statistics,
2961 &[false],
2962 );
2963 }
2964
/// Builds a `DynamicFilterPhysicalExpr` over `c1 > 5 AND part = 'B'`, replaces the
/// `part` child column with the literal `'A'`, and checks that the resulting
/// pruning predicate yields `[false]` for the single container.
#[test]
fn row_group_predicate_dynamic_filter_with_literals() {
    let fields = vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("part", DataType::Utf8, true),
    ];
    let schema = Arc::new(Schema::new(fields));
    let statistics = TestStatistics::new().with_row_counts("c1", vec![Some(10)]);

    let logical_filter = col("c1").gt(lit(5)).and(col("part").eq(lit("B")));
    let inner_expr = logical2physical(&logical_filter, &schema);
    // The dynamic filter's children are the columns referenced by the inner expression.
    let column_children = collect_columns(&inner_expr)
        .iter()
        .map(|column| Arc::new(column.clone()) as Arc<dyn PhysicalExpr>)
        .collect_vec();
    let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
        column_children,
        inner_expr,
    )) as Arc<dyn PhysicalExpr>;

    // Swap the `part` column for a literal; every other child is kept as-is.
    let remapped_children = dynamic_filter
        .children()
        .into_iter()
        .map(
            |child| match child.as_any().downcast_ref::<phys_expr::Column>() {
                Some(column) if column.name() == "part" => {
                    Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
                        "A".to_string(),
                    )))) as Arc<dyn PhysicalExpr>
                }
                _ => Arc::clone(child),
            },
        )
        .collect_vec();
    let remapped_filter = dynamic_filter
        .with_new_children(remapped_children)
        .unwrap();

    let pruning_predicate =
        PruningPredicate::try_new(remapped_filter, Arc::clone(&schema)).unwrap();
    let result = pruning_predicate.prune(&statistics).unwrap();
    assert_eq!(result, &[false]);
}
3013
/// Checks the rewritten predicate text for `c1 < true` on a boolean column.
#[test]
fn row_group_predicate_lt_bool() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
    let mut required_columns = RequiredColumns::new();

    let rewritten = test_build_predicate_expression(
        &col("c1").lt(lit(true)),
        &schema,
        &mut required_columns,
    );
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@1 != row_count@2 AND c1_min@0 < true"
    );

    Ok(())
}
3028
/// Checks the rewritten predicate text and the exact entries recorded in
/// `RequiredColumns` for `c1 < 1 AND (c2 = 2 OR c2 = 3)`.
#[test]
fn row_group_predicate_required_columns() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    let mut required_columns = RequiredColumns::new();

    let c2_is_2_or_3 = col("c2").eq(lit(2)).or(col("c2").eq(lit(3)));
    let expr = col("c1").lt(lit(1)).and(c2_is_2_or_3);
    let predicate_expr =
        test_build_predicate_expression(&expr, &schema, &mut required_columns);
    assert_eq!(
        predicate_expr.to_string(),
        "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)"
    );
    println!("required_columns: {required_columns:#?}");

    let c1 = || phys_expr::Column::new("c1", 0);
    let c2 = || phys_expr::Column::new("c2", 1);
    // Every expected statistics field is created non-nullable and then
    // switched to nullable via `with_nullable(true)`.
    let stat_field = |name: &str, data_type: DataType| {
        Field::new(name, data_type, false).with_nullable(true)
    };

    // (index into `required_columns.columns`, expected entry), checked in order.
    // Index 2 (`row_count`) is deliberately checked a second time at the end.
    let expected_entries = [
        (
            0,
            (
                c1(),
                StatisticsType::Min,
                stat_field("c1_min", DataType::Int32),
            ),
        ),
        (
            1,
            (
                c1(),
                StatisticsType::NullCount,
                stat_field("c1_null_count", DataType::UInt64),
            ),
        ),
        (
            2,
            (
                c1(),
                StatisticsType::RowCount,
                stat_field("row_count", DataType::UInt64),
            ),
        ),
        (
            3,
            (
                c2(),
                StatisticsType::Min,
                stat_field("c2_min", DataType::Int32),
            ),
        ),
        (
            4,
            (
                c2(),
                StatisticsType::Max,
                stat_field("c2_max", DataType::Int32),
            ),
        ),
        (
            5,
            (
                c2(),
                StatisticsType::NullCount,
                stat_field("c2_null_count", DataType::UInt64),
            ),
        ),
        (
            2,
            (
                c1(),
                StatisticsType::RowCount,
                stat_field("row_count", DataType::UInt64),
            ),
        ),
    ];
    for (position, expected) in expected_entries {
        assert_eq!(required_columns.columns[position], expected);
    }
    assert_eq!(required_columns.columns.len(), 6);

    Ok(())
}
3119
/// Checks the rewritten predicate text for `c1 IN (1, 2, 3)`.
#[test]
fn row_group_predicate_in_list() -> Result<()> {
    let fields = vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ];
    let schema = Schema::new(fields);

    let candidates = vec![lit(1), lit(2), lit(3)];
    let in_list = Expr::InList(InList::new(Box::new(col("c1")), candidates, false));

    let mut required_columns = RequiredColumns::new();
    let rewritten =
        test_build_predicate_expression(&in_list, &schema, &mut required_columns);
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1"
    );

    Ok(())
}
3139
/// Checks that an `IN` list with no values is rewritten to the constant `true`.
#[test]
fn row_group_predicate_in_list_empty() -> Result<()> {
    let fields = vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ];
    let schema = Schema::new(fields);

    let empty_in_list = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));

    let mut required_columns = RequiredColumns::new();
    let rewritten =
        test_build_predicate_expression(&empty_in_list, &schema, &mut required_columns);
    assert_eq!(rewritten.to_string(), "true");

    Ok(())
}
3155
/// Checks the rewritten predicate text for `c1 NOT IN (1, 2, 3)`.
#[test]
fn row_group_predicate_in_list_negated() -> Result<()> {
    let fields = vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ];
    let schema = Schema::new(fields);

    let excluded = vec![lit(1), lit(2), lit(3)];
    let not_in_list = Expr::InList(InList::new(Box::new(col("c1")), excluded, true));

    let mut required_columns = RequiredColumns::new();
    let rewritten =
        test_build_predicate_expression(&not_in_list, &schema, &mut required_columns);
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)"
    );

    Ok(())
}
3175
/// Checks that `c1 BETWEEN 1 AND 5` is rewritten to the same predicate text as
/// the explicit conjunction `c1 >= 1 AND c1 <= 5`.
#[test]
fn row_group_predicate_between() -> Result<()> {
    let fields = vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ];
    let schema = Schema::new(fields);

    let between = col("c1").between(lit(1), lit(5));
    let explicit_range = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));

    let from_between =
        test_build_predicate_expression(&between, &schema, &mut RequiredColumns::new());
    let from_explicit = test_build_predicate_expression(
        &explicit_range,
        &schema,
        &mut RequiredColumns::new(),
    );
    assert_eq!(from_between.to_string(), from_explicit.to_string());

    Ok(())
}
3198
/// Checks the rewritten predicate text for
/// `c1 IN (1, 2) AND c2 BETWEEN 4 AND 5`.
#[test]
fn row_group_predicate_between_with_in_list() -> Result<()> {
    let fields = vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ];
    let schema = Schema::new(fields);

    let c1_in_list = col("c1").in_list(vec![lit(1), lit(2)], false);
    let c2_between = col("c2").between(lit(4), lit(5));
    let combined = c1_in_list.and(c2_between);

    let mut required_columns = RequiredColumns::new();
    let rewritten =
        test_build_predicate_expression(&combined, &schema, &mut required_columns);
    assert_eq!(
        rewritten.to_string(),
        "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5"
    );

    Ok(())
}
3221
/// Checks that an `IN` list with 21 values is rewritten to the constant `true`.
#[test]
fn row_group_predicate_in_list_to_many_values() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);

    let values: Vec<Expr> = (1..=21).map(lit).collect();
    let large_in_list = col("c1").in_list(values, false);

    let mut required_columns = RequiredColumns::new();
    let rewritten =
        test_build_predicate_expression(&large_in_list, &schema, &mut required_columns);
    assert_eq!(rewritten.to_string(), "true");

    Ok(())
}
3237
/// Checks the rewritten predicate text for comparisons between an Int32 column
/// cast (or try-cast) to Int64 and an Int64 literal, with the cast on either side.
#[test]
fn row_group_predicate_cast_int_int() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let one = || lit(ScalarValue::Int64(Some(1)));

    // `CAST(c1) = 1` and `1 = CAST(c1)` must produce the same text.
    let eq_expected = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
    let eq_exprs = [
        cast(col("c1"), DataType::Int64).eq(one()),
        one().eq(cast(col("c1"), DataType::Int64)),
    ];
    for expr in eq_exprs {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), eq_expected);
    }

    // `TRY_CAST(c1) > 1` and `1 < TRY_CAST(c1)` must produce the same text.
    let gt_expected =
        "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
    let gt_exprs = [
        try_cast(col("c1"), DataType::Int64).gt(one()),
        one().lt(try_cast(col("c1"), DataType::Int64)),
    ];
    for expr in gt_exprs {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), gt_expected);
    }

    Ok(())
}
3275
/// Checks the rewritten predicate text for a Utf8View column cast to Utf8 and
/// compared for equality with a Utf8 literal, with the cast on either side.
#[test]
fn row_group_predicate_cast_string_string() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let one = || lit(ScalarValue::Utf8(Some("1".to_string())));

    let exprs = [
        cast(col("c1"), DataType::Utf8).eq(one()),
        one().eq(cast(col("c1"), DataType::Utf8)),
    ];
    for expr in exprs {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(
            rewritten.to_string(),
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Utf8) <= 1 AND 1 <= CAST(c1_max@1 AS Utf8)"
        );
    }

    Ok(())
}
3297
/// Checks that comparing a Utf8View column cast to Int32 against an Int32 literal
/// is rewritten to the constant `true`, with the cast on either side.
#[test]
fn row_group_predicate_cast_string_int() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let one = || lit(ScalarValue::Int32(Some(1)));

    let exprs = [
        cast(col("c1"), DataType::Int32).eq(one()),
        one().eq(cast(col("c1"), DataType::Int32)),
    ];
    for expr in exprs {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3317
/// Checks that comparing an Int32 column cast to Utf8 against a Utf8 literal
/// is rewritten to the constant `true`, with the cast on either side.
#[test]
fn row_group_predicate_cast_int_string() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let one = || lit(ScalarValue::Utf8(Some("1".to_string())));

    let exprs = [
        cast(col("c1"), DataType::Utf8).eq(one()),
        one().eq(cast(col("c1"), DataType::Utf8)),
    ];
    for expr in exprs {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3339
/// Checks the rewritten predicate text for a Date32 column cast to Date64 and
/// compared for equality with a Date64 literal, with the cast on either side.
#[test]
fn row_group_predicate_date_date() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
    let date_literal = || lit(ScalarValue::Date64(Some(123)));

    let exprs = [
        cast(col("c1"), DataType::Date64).eq(date_literal()),
        date_literal().eq(cast(col("c1"), DataType::Date64)),
    ];
    for expr in exprs {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(
            rewritten.to_string(),
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Date64) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Date64)"
        );
    }

    Ok(())
}
3361
/// Checks that comparing a Date32 column cast to `Dictionary(UInt8, Utf8)` against
/// a Utf8 literal is rewritten to the constant `true`, with the cast on either side.
#[test]
fn row_group_predicate_dict_string_date() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);

    let dict_utf8 =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    let date_text = || lit(ScalarValue::Utf8(Some("2024-01-01".to_string())));

    let exprs = [
        cast(col("c1"), dict_utf8.clone()).eq(date_text()),
        date_text().eq(cast(col("c1"), dict_utf8.clone())),
    ];
    for expr in exprs {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3389
/// Checks that comparing a `Dictionary(UInt8, Utf8)` column cast to Date32 against
/// a Date32 literal is rewritten to the constant `true`, with the cast on either side.
#[test]
fn row_group_predicate_date_dict_string() -> Result<()> {
    let dict_utf8 =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    let schema = Schema::new(vec![Field::new("c1", dict_utf8, false)]);
    let date_literal = || lit(ScalarValue::Date32(Some(123)));

    let exprs = [
        cast(col("c1"), DataType::Date32).eq(date_literal()),
        date_literal().eq(cast(col("c1"), DataType::Date32)),
    ];
    for expr in exprs {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3416
/// Checks the rewritten predicate text for a `Dictionary(UInt8, Utf8)` column
/// compared with a Utf8 literal, both directly and after a cast to
/// `Dictionary(UInt16, Utf8)`.
#[test]
fn row_group_predicate_dict_dict_same_value_type() -> Result<()> {
    let column_type =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    let schema = Schema::new(vec![Field::new("c1", column_type, false)]);
    let test_literal = || lit(ScalarValue::Utf8(Some("test".to_string())));

    // Direct comparison: no cast appears in the rewritten predicate.
    let direct = col("c1").eq(test_literal());
    let rewritten =
        test_build_predicate_expression(&direct, &schema, &mut RequiredColumns::new());
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1"
    );

    // Cast to a dictionary with a different key type but the same value type.
    let wider_keys =
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
    let with_cast = cast(col("c1"), wider_keys).eq(test_literal());
    let rewritten =
        test_build_predicate_expression(&with_cast, &schema, &mut RequiredColumns::new());
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Utf8)) <= test AND test <= CAST(c1_max@1 AS Dictionary(UInt16, Utf8))"
    );

    Ok(())
}
3447
/// Checks the rewritten predicate text for a `Dictionary(UInt8, Int32)` column
/// cast to Int64 and compared for equality with an Int64 literal.
#[test]
fn row_group_predicate_dict_dict_different_value_type() -> Result<()> {
    let column_type =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Int32));
    let schema = Schema::new(vec![Field::new("c1", column_type, false)]);

    let comparison =
        cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(123))));
    let mut required_columns = RequiredColumns::new();
    let rewritten =
        test_build_predicate_expression(&comparison, &schema, &mut required_columns);
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 123 AND 123 <= CAST(c1_max@1 AS Int64)"
    );

    Ok(())
}
3467
/// Checks the rewritten predicate text for a column whose type is a dictionary
/// of a dictionary of Utf8, compared for equality with a Utf8 literal.
#[test]
fn row_group_predicate_nested_dict() -> Result<()> {
    let inner_dict =
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
    let nested_dict =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(inner_dict));
    let schema = Schema::new(vec![Field::new("c1", nested_dict, false)]);

    let comparison = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
    let mut required_columns = RequiredColumns::new();
    let rewritten =
        test_build_predicate_expression(&comparison, &schema, &mut required_columns);
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1"
    );

    Ok(())
}
3493
/// Checks the rewritten predicate text for a `Dictionary(UInt8, Date32)` column
/// cast to `Dictionary(UInt16, Date64)` and compared with a Date64 literal.
#[test]
fn row_group_predicate_dict_date_dict_date() -> Result<()> {
    let column_type =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Date32));
    let schema = Schema::new(vec![Field::new("c1", column_type, false)]);

    let target_type =
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Date64));
    let comparison =
        cast(col("c1"), target_type).eq(lit(ScalarValue::Date64(Some(123))));

    let mut required_columns = RequiredColumns::new();
    let rewritten =
        test_build_predicate_expression(&comparison, &schema, &mut required_columns);
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Date64)) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Dictionary(UInt16, Date64))"
    );

    Ok(())
}
3516
/// Checks that comparing a Utf8 column cast to Date32 against a Date32 literal
/// is rewritten to the constant `true`, with the cast on either side.
#[test]
fn row_group_predicate_date_string() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
    let date_literal = || lit(ScalarValue::Date32(Some(123)));

    let exprs = [
        cast(col("c1"), DataType::Date32).eq(date_literal()),
        date_literal().eq(cast(col("c1"), DataType::Date32)),
    ];
    for expr in exprs {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3538
/// Checks that comparing a Date32 column cast to Utf8 against a Utf8 literal
/// is rewritten to the constant `true`, with the cast on either side.
#[test]
fn row_group_predicate_string_date() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
    let date_text = || lit(ScalarValue::Utf8(Some("2024-01-01".to_string())));

    let exprs = [
        cast(col("c1"), DataType::Utf8).eq(date_text()),
        date_text().eq(cast(col("c1"), DataType::Utf8)),
    ];
    for expr in exprs {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3560
/// Checks the rewritten predicate text for `CAST(c1 AS Int64) IN (1, 2, 3)` and
/// its negated form.
#[test]
fn row_group_predicate_cast_list() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);

    // (negated flag, expected rewritten predicate), checked in this order.
    let cases = [
        (
            false,
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)",
        ),
        (
            true,
            "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))",
        ),
    ];

    for (negated, expected_expr) in cases {
        let values = vec![
            lit(ScalarValue::Int64(Some(1))),
            lit(ScalarValue::Int64(Some(2))),
            lit(ScalarValue::Int64(Some(3))),
        ];
        let expr = Expr::InList(InList::new(
            Box::new(cast(col("c1"), DataType::Int64)),
            values,
            negated,
        ));
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), expected_expr);
    }

    Ok(())
}
3595
/// Prunes four containers with `s1 > <decimal literal>` for Decimal128 columns of
/// several precisions, using i32, i64 and decimal128 backed statistics.
#[test]
fn prune_decimal_data() {
    // The same verdict is expected for every variant below.
    let expected = [false, true, false, true];

    // Decimal128(9, 2) column with i32 statistics, reused for the plain,
    // CAST and TRY_CAST comparisons.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "s1",
        DataType::Decimal128(9, 2),
        true,
    )]));
    let i32_statistics = || {
        TestStatistics::new().with(
            "s1",
            ContainerStats::new_i32(
                vec![Some(0), Some(4), None, Some(3)],
                vec![Some(5), Some(6), Some(4), None],
            ),
        )
    };

    prune_with_expr(
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
        &schema,
        &i32_statistics(),
        &expected,
    );

    prune_with_expr(
        cast(col("s1"), DataType::Decimal128(14, 3))
            .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
        &schema,
        &i32_statistics(),
        &expected,
    );

    prune_with_expr(
        try_cast(col("s1"), DataType::Decimal128(14, 3))
            .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
        &schema,
        &i32_statistics(),
        &expected,
    );

    // Decimal128(18, 2) column with i64 statistics.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "s1",
        DataType::Decimal128(18, 2),
        true,
    )]));
    let i64_statistics = TestStatistics::new().with(
        "s1",
        ContainerStats::new_i64(
            vec![Some(0), Some(4), None, Some(3)],
            vec![Some(5), Some(6), Some(4), None],
        ),
    );
    prune_with_expr(
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
        &schema,
        &i64_statistics,
        &expected,
    );

    // Decimal128(23, 2) column with decimal128 statistics.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "s1",
        DataType::Decimal128(23, 2),
        true,
    )]));
    let decimal_statistics = TestStatistics::new().with(
        "s1",
        ContainerStats::new_decimal128(
            vec![Some(0), Some(400), None, Some(300)],
            vec![Some(500), Some(600), Some(400), None],
            23,
            2,
        ),
    );
    prune_with_expr(
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
        &schema,
        &decimal_statistics,
        &expected,
    );
}
3696
/// Prunes four containers with `s2 > 5`, both directly and with `s2` cast to Int64,
/// using a schema that also contains a column (`s1`) without statistics.
#[test]
fn prune_api() {
    let fields = vec![
        Field::new("s1", DataType::Utf8, true),
        Field::new("s2", DataType::Int32, true),
    ];
    let schema = Arc::new(Schema::new(fields));

    let s2_mins = vec![Some(0), Some(4), None, Some(3)];
    let s2_maxes = vec![Some(5), Some(6), None, None];
    let statistics =
        TestStatistics::new().with("s2", ContainerStats::new_i32(s2_mins, s2_maxes));

    let expected = [false, true, true, true];
    let predicates = [
        col("s2").gt(lit(5)),
        cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
    ];
    for predicate in predicates {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
3731
/// Prunes six containers with `s1 != 'M'` over Utf8 min/max statistics.
#[test]
fn prune_not_eq_data() {
    let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

    let mins = vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")];
    let maxes = vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None];
    let statistics =
        TestStatistics::new().with("s1", ContainerStats::new_utf8(mins, maxes));

    // Only the container whose min and max are both "M" is expected to be `false`.
    let expected = [true, true, true, false, true, true];
    prune_with_expr(col("s1").not_eq(lit("M")), &schema, &statistics, &expected);
}
3756
3757 fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3773 let schema =
3774 Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3775
3776 let statistics = TestStatistics::new().with(
3777 "b1",
3778 ContainerStats::new_bool(
3779 vec![Some(false), Some(false), Some(true), None, Some(false)], vec![Some(false), Some(true), Some(true), None, None], ),
3782 );
3783 let expected_true = vec![false, true, true, true, true];
3784 let expected_false = vec![true, true, false, true, true];
3785
3786 (schema, statistics, expected_true, expected_false)
3787 }
3788
/// Checks that the constant predicates `true` and `false` give an all-`true` and
/// an all-`false` result respectively for the five `bool_setup` containers.
#[test]
fn prune_bool_const_expr() {
    let (schema, statistics, _, _) = bool_setup();

    let cases = [(lit(true), [true; 5]), (lit(false), [false; 5])];
    for (constant, expected) in cases {
        prune_with_expr(constant, &schema, &statistics, &expected);
    }
}
3809
/// Prunes the `bool_setup` containers with the bare column `b1` as the predicate.
#[test]
fn prune_bool_column() {
    let (schema, statistics, when_true, _) = bool_setup();
    let predicate = col("b1");

    prune_with_expr(predicate, &schema, &statistics, &when_true);
}
3822
/// Prunes the `bool_setup` containers with `NOT b1` as the predicate.
#[test]
fn prune_bool_not_column() {
    let (schema, statistics, _, when_false) = bool_setup();
    let predicate = col("b1").not();

    prune_with_expr(predicate, &schema, &statistics, &when_false);
}
3835
/// Prunes the `bool_setup` containers with `b1 = true` as the predicate.
#[test]
fn prune_bool_column_eq_true() {
    let (schema, statistics, when_true, _) = bool_setup();
    let predicate = col("b1").eq(lit(true));

    prune_with_expr(predicate, &schema, &statistics, &when_true);
}
3848
/// Prunes the `bool_setup` containers with `(NOT b1) = true` as the predicate.
#[test]
fn prune_bool_not_column_eq_true() {
    let (schema, statistics, _, when_false) = bool_setup();
    let predicate = col("b1").not().eq(lit(true));

    prune_with_expr(predicate, &schema, &statistics, &when_false);
}
3861
3862 fn int32_setup() -> (SchemaRef, TestStatistics) {
3872 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3873
3874 let statistics = TestStatistics::new().with(
3875 "i",
3876 ContainerStats::new_i32(
3877 vec![Some(-5), Some(1), Some(-11), None, Some(1)], vec![Some(5), Some(11), Some(-1), None, None], ),
3880 );
3881 (schema, statistics)
3882 }
3883
/// Prunes the `int32_setup` containers with `i > 0` and with `-i < 0`, expecting
/// the same result for both forms.
#[test]
fn prune_int32_col_gt_zero() {
    let (schema, statistics) = int32_setup();
    let expected = [true, true, false, true, true];

    let predicates = [
        col("i").gt(lit(0)),
        Expr::Negative(Box::new(col("i"))).lt(lit(0)),
    ];
    for predicate in predicates {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
3907
/// Prunes the `int32_setup` containers with `i <= 0` and with `-i >= 0`, expecting
/// the same result for both forms.
#[test]
fn prune_int32_col_lte_zero() {
    let (schema, statistics) = int32_setup();
    let expected = [true, false, true, true, false];

    let predicates = [
        col("i").lt_eq(lit(0)),
        Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
    ];
    for predicate in predicates {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
3936
/// Checks that string comparisons on `i` (or `-i`) cast/try-cast to Utf8 prune
/// nothing: every `int32_setup` container is expected to be `true`.
#[test]
fn prune_int32_col_lte_zero_cast() {
    let (schema, statistics) = int32_setup();
    let expected = [true, true, true, true, true];

    let negated_i = || Expr::Negative(Box::new(col("i")));
    let predicates = [
        cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
        try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
        cast(negated_i(), DataType::Utf8).gt_eq(lit("0")),
        try_cast(negated_i(), DataType::Utf8).gt_eq(lit("0")),
    ];
    for predicate in predicates {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
3981
/// Prunes the `int32_setup` containers with `i = 0`.
#[test]
fn prune_int32_col_eq_zero() {
    let (schema, statistics) = int32_setup();
    let expected = [true, false, false, true, false];

    let predicate = col("i").eq(lit(0));
    prune_with_expr(predicate, &schema, &statistics, &expected);
}
4002
/// Prunes the `int32_setup` containers with `i` cast/try-cast to Int64 and
/// compared for equality with `0i64`.
#[test]
fn prune_int32_col_eq_zero_cast() {
    let (schema, statistics) = int32_setup();
    let expected = [true, false, false, true, false];

    let predicates = [
        cast(col("i"), DataType::Int64).eq(lit(0i64)),
        try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
    ];
    for predicate in predicates {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4029
/// Checks that `CAST(i AS Utf8) = '0'` prunes nothing: every `int32_setup`
/// container is expected to be `true`.
#[test]
fn prune_int32_col_eq_zero_cast_as_str() {
    let (schema, statistics) = int32_setup();
    let expected = [true, true, true, true, true];

    let predicate = cast(col("i"), DataType::Utf8).eq(lit("0"));
    prune_with_expr(predicate, &schema, &statistics, &expected);
}
4052
/// Prunes the `int32_setup` containers with `i > -1` and with `-i < 1`, expecting
/// the same result for both forms.
#[test]
fn prune_int32_col_lt_neg_one() {
    let (schema, statistics) = int32_setup();
    let expected = [true, true, false, true, true];

    let predicates = [
        col("i").gt(lit(-1)),
        Expr::Negative(Box::new(col("i"))).lt(lit(1)),
    ];
    for predicate in predicates {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4081
/// Prunes with `i IS NULL`, first using the `int32_setup` statistics as-is and
/// then after attaching explicit null counts.
#[test]
fn prune_int32_is_null() {
    let (schema, statistics) = int32_setup();

    // Before null counts are attached, no container is expected to be pruned.
    prune_with_expr(
        col("i").is_null(),
        &schema,
        &statistics,
        &[true, true, true, true, true],
    );

    let statistics = statistics
        .with_null_counts("i", vec![Some(0), Some(1), None, None, Some(0)]);

    // Containers whose null count is `Some(0)` are now expected to be `false`.
    prune_with_expr(
        col("i").is_null(),
        &schema,
        &statistics,
        &[false, true, true, true, false],
    );
}
4120
/// Prunes with `i < 0` three times: with min/max statistics only, after adding
/// row counts, and after additionally adding null counts.
#[test]
fn prune_int32_column_is_known_all_null() {
    let (schema, statistics) = int32_setup();
    let check = |stats: &TestStatistics, expected: &[bool]| {
        prune_with_expr(col("i").lt(lit(0)), &schema, stats, expected);
    };

    let without_null_counts = [true, false, true, true, false];
    check(&statistics, &without_null_counts);

    // Row counts alone are expected to leave the result unchanged.
    let statistics = statistics.with_row_counts(
        "i",
        vec![Some(10), Some(9), None, Some(4), Some(10)],
    );
    check(&statistics, &without_null_counts);

    // With null counts added, the fourth container (row count 4, null count 4)
    // is expected to flip to `false`.
    let statistics = statistics.with_null_counts(
        "i",
        vec![Some(0), Some(1), None, Some(4), Some(0)],
    );
    check(&statistics, &[true, false, true, false, false]);
}
4192
/// Prunes the `int32_setup` containers with four "greater than zero" predicates
/// that place casts on the literal, the column, or a negated column, expecting
/// the same result for each.
#[test]
fn prune_cast_column_scalar() {
    let (schema, statistics) = int32_setup();
    let expected = [true, true, false, true, true];
    let zero_i64 = || lit(ScalarValue::Int64(Some(0)));

    let predicates = [
        col("i").gt(cast(zero_i64(), DataType::Int32)),
        cast(col("i"), DataType::Int64).gt(zero_i64()),
        try_cast(col("i"), DataType::Int64).gt(zero_i64()),
        Expr::Negative(Box::new(cast(col("i"), DataType::Int64))).lt(zero_i64()),
    ];
    for predicate in predicates {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4232
/// Table-driven check of `increment_utf8`: `Some(s)` means the call must return
/// `s`, `None` means the call must return `None`.
#[test]
fn test_increment_utf8() {
    let cases = [
        ("abc", Some("abd")),
        ("abz", Some("ab{")),
        ("~", Some("\u{7f}")),
        ("\u{7f}", Some("\u{80}")),
        ("ß", Some("à")),
        ("℣", Some("ℤ")),
        ("\u{7FF}", Some("\u{800}")),
        ("\u{FFFF}", Some("\u{10000}")),
        ("", None),
        ("\u{10FFFF}", None),
        // When the last character has no successor, the expected result drops
        // it and increments the preceding character instead.
        ("a\u{10FFFF}", Some("b")),
        ("a\u{D7FF}", Some("b")),
        ("\u{D7FF}", None),
        ("a\u{FDCF}", Some("b")),
        ("\u{FDCF}", None),
        ("a\u{10FFFF}", Some("b")),
        ("\u{10FFFF}", None),
    ];

    for (input, expected) in cases {
        match expected {
            Some(next) => assert_eq!(increment_utf8(input).unwrap(), next),
            None => assert!(increment_utf8(input).is_none()),
        }
    }
}
4272
4273 fn utf8_setup() -> (SchemaRef, TestStatistics) {
4286 let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4287
4288 let statistics = TestStatistics::new().with(
4289 "s1",
4290 ContainerStats::new_utf8(
4291 vec![
4292 Some("A"),
4293 Some("A"),
4294 Some("N"),
4295 Some("M"),
4296 None,
4297 Some("A"),
4298 Some(""),
4299 Some(""),
4300 Some("AB"),
4301 Some("A\u{10ffff}\u{10ffff}"),
4302 ], vec![
4304 Some("Z"),
4305 Some("L"),
4306 Some("Z"),
4307 Some("M"),
4308 None,
4309 None,
4310 Some("A"),
4311 Some(""),
4312 Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
4313 Some("A\u{10ffff}\u{10ffff}"),
4314 ], ),
4316 );
4317 (schema, statistics)
4318 }
4319
#[test]
fn prune_utf8_eq() {
    // Equality predicates on `s1`, checked against the ten containers from
    // `utf8_setup`. Each case pairs a predicate with its expected result,
    // one entry per container in fixture order.
    let (schema, statistics) = utf8_setup();

    #[rustfmt::skip]
    let cases: Vec<(Expr, [bool; 10])> = vec![
        (
            col("s1").eq(lit("A")),
            [true, true, false, false, true, true, true, false, false, false],
        ),
        (
            col("s1").eq(lit("")),
            [false, false, false, false, true, false, true, true, false, false],
        ),
    ];

    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4376
#[test]
fn prune_utf8_not_eq() {
    // Inequality predicates on `s1`, checked against the ten containers from
    // `utf8_setup`. Only `s1 != ''` has a `false` entry: container 7, whose
    // two bounds are both the empty string.
    let (schema, statistics) = utf8_setup();

    #[rustfmt::skip]
    let cases: Vec<(Expr, [bool; 10])> = vec![
        (
            col("s1").not_eq(lit("A")),
            [true; 10],
        ),
        (
            col("s1").not_eq(lit("")),
            [true, true, true, true, true, true, true, false, true, true],
        ),
    ];

    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4433
#[test]
fn prune_utf8_like_one() {
    // `LIKE` patterns built around the single-character wildcard `_`, plus the
    // empty pattern, checked against the ten containers from `utf8_setup`.
    const ALL_TRUE: [bool; 10] = [true; 10];
    let (schema, statistics) = utf8_setup();

    #[rustfmt::skip]
    let cases: Vec<(&str, [bool; 10])> = vec![
        // Literal prefix followed by a wildcard.
        ("A_", [true, true, false, false, true, true, true, false, true, true]),
        // Patterns that start with a wildcard: every entry is `true`.
        ("_A_", ALL_TRUE),
        ("_", ALL_TRUE),
        // Empty pattern: same expectation as `s1 = ''` in `prune_utf8_eq`.
        ("", [false, false, false, false, true, false, true, true, false, false]),
    ];

    for (pattern, expected) in cases {
        let predicate = col("s1").like(lit(pattern));
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4542
#[test]
fn prune_utf8_like_many() {
    // `LIKE` patterns built around the multi-character wildcard `%`, plus the
    // empty pattern, checked against the ten containers from `utf8_setup`.
    const ALL_TRUE: [bool; 10] = [true; 10];
    let (schema, statistics) = utf8_setup();

    #[rustfmt::skip]
    let cases: Vec<(&str, [bool; 10])> = vec![
        // Literal prefix followed by a wildcard.
        ("A%", [true, true, false, false, true, true, true, false, true, true]),
        // Patterns that start with a wildcard: every entry is `true`.
        ("%A%", ALL_TRUE),
        ("%", ALL_TRUE),
        // Empty pattern: same expectation as `s1 = ''` in `prune_utf8_eq`.
        ("", [false, false, false, false, true, false, true, true, false, false]),
    ];

    for (pattern, expected) in cases {
        let predicate = col("s1").like(lit(pattern));
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4651
#[test]
fn prune_utf8_not_like_one() {
    // `s1 NOT LIKE 'A\u{10ffff}_'` against the ten containers from
    // `utf8_setup`: every entry of the expected result is `true`.
    let (schema, statistics) = utf8_setup();

    let predicate = col("s1").not_like(lit("A\u{10ffff}_"));
    let expected = [true; 10];

    prune_with_expr(predicate, &schema, &statistics, &expected);
}
4683
#[test]
fn prune_utf8_not_like_many() {
    const ALL_TRUE: [bool; 10] = [true; 10];
    let (schema, statistics) = utf8_setup();

    // Part 1: `NOT LIKE` patterns against the ten containers from `utf8_setup`.
    #[rustfmt::skip]
    let cases: Vec<(&str, [bool; 10])> = vec![
        // Only the last container, whose two bounds are both
        // "A\u{10ffff}\u{10ffff}", is expected to be `false`.
        ("A\u{10ffff}%", [true, true, true, true, true, true, true, true, true, false]),
        // With more pattern text after the `%`, every entry is `true`.
        ("A\u{10ffff}%\u{10ffff}", ALL_TRUE),
        ("A\u{10ffff}%\u{10ffff}_", ALL_TRUE),
    ];
    for (pattern, expected) in cases {
        let predicate = col("s1").not_like(lit(pattern));
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }

    // Part 2: a pattern with an escaped `%` followed by a wildcard `%`,
    // evaluated against a dedicated two-container fixture.
    let escaped_percent = col("s1").not_like(lit("A\\%%"));
    let two_containers = TestStatistics::new().with(
        "s1",
        ContainerStats::new_utf8(
            vec![Some("A%a"), Some("A")],
            vec![Some("A%c"), Some("A")],
        ),
    );
    prune_with_expr(escaped_percent, &schema, &two_containers, &[false, true]);
}
4777
#[test]
fn test_rewrite_expr_to_prunable() {
    // For each supported shape, `rewrite_expr_to_prunable` must succeed and
    // return left/right expressions that print exactly like its inputs.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    let assert_unchanged = |left: Expr, op: Operator, right: Expr| {
        let left_phys = logical2physical(&left, &schema);
        let right_phys = logical2physical(&right, &schema);
        let (new_left, _, new_right) =
            rewrite_expr_to_prunable(&left_phys, op, &right_phys, df_schema.clone())
                .unwrap();
        assert_eq!(new_left.to_string(), left_phys.to_string());
        assert_eq!(new_right.to_string(), right_phys.to_string());
    };

    // Bare column: a = 12
    assert_unchanged(col("a"), Operator::Eq, lit(ScalarValue::Int32(Some(12))));

    // CAST(a AS DECIMAL(20, 3)) > decimal literal
    assert_unchanged(
        cast(col("a"), DataType::Decimal128(20, 3)),
        Operator::Gt,
        lit(ScalarValue::Decimal128(Some(12), 20, 3)),
    );

    // TRY_CAST(a AS BIGINT) > 12
    assert_unchanged(
        try_cast(col("a"), DataType::Int64),
        Operator::Gt,
        lit(ScalarValue::Int64(Some(12))),
    );
}
4826
#[test]
fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
    // Hook that answers with the literal 42 for whatever expression it is given.
    struct CustomUnhandledHook;

    impl UnhandledPredicateHook for CustomUnhandledHook {
        fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
            Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
        }
    }

    // Predicates are planned against a schema containing `a` and `b`, but
    // rewritten against a schema that only contains `a`.
    let field_a = || Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![field_a()]);
    let schema_with_b =
        Schema::new(vec![field_a(), Field::new("b", DataType::Int32, true)]);

    let hooked_rewriter =
        PredicateRewriter::new().with_unhandled_hook(Arc::new(CustomUnhandledHook));
    let rewrite_with_hook = |logical: Expr| {
        let physical = logical2physical(&logical, &schema_with_b);
        hooked_rewriter.rewrite_predicate_to_statistics_predicate(&physical, &schema)
    };

    // Reference rewrite of `a = 12`, produced by a rewriter without the custom hook.
    let handled = col("a").eq(lit(12));
    let handled_rewritten = PredicateRewriter::new()
        .rewrite_predicate_to_statistics_predicate(
            &logical2physical(&handled, &schema),
            &schema,
        );

    // Expected shapes: the hook literal alone, and `<handled rewrite> AND <hook literal>`.
    let hook_literal = || logical2physical(&lit(42), &schema);
    let handled_and_hook = || {
        phys_expr::BinaryExpr::new(
            Arc::clone(&handled_rewritten),
            Operator::And,
            hook_literal(),
        )
    };

    // A predicate on column `b`, alone and AND-ed with the handled predicate.
    let on_column_b = col("b").eq(lit(12));
    assert_eq!(
        rewrite_with_hook(on_column_b.clone()).to_string(),
        hook_literal().to_string()
    );
    assert_eq!(
        rewrite_with_hook(handled.clone().and(on_column_b)).to_string(),
        handled_and_hook().to_string()
    );

    // An `array_has` call over column `a`, alone and AND-ed with the handled predicate.
    let array_has_call = array_has(make_array(vec![lit(1)]), col("a"));
    assert_eq!(
        rewrite_with_hook(array_has_call.clone()).to_string(),
        hook_literal().to_string()
    );
    assert_eq!(
        rewrite_with_hook(handled.and(array_has_call)).to_string(),
        handled_and_hook().to_string()
    );
}
4894
#[test]
fn test_rewrite_expr_to_prunable_error() {
    // Both left-hand sides below must make `rewrite_expr_to_prunable` return
    // an error when compared (`>`) with the Int64 literal 12.
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    let rewrite_fails = |left: Expr| {
        let left_phys = logical2physical(&left, &schema);
        let right_phys = logical2physical(&lit(ScalarValue::Int64(Some(12))), &schema);
        rewrite_expr_to_prunable(
            &left_phys,
            Operator::Gt,
            &right_phys,
            df_schema.clone(),
        )
        .is_err()
    };

    // CAST(a AS BIGINT), where `a` is a Utf8 column.
    assert!(rewrite_fails(cast(col("a"), DataType::Int64)));

    // a IS NULL used as the left-hand side of the comparison.
    assert!(rewrite_fails(is_null(col("a"))));
}
4923
#[test]
fn prune_with_contained_one_column() {
    const ALL_TRUE: [bool; 9] = [true; 9];

    let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

    // Nine containers with `contained` answers registered for three value
    // sets on `s1`: {"foo"}, {"bar"} and {"foo", "bar"}.
    let yes = Some(true);
    let no = Some(false);
    let unknown: Option<bool> = None;
    let statistics = TestStatistics::new()
        .with_contained(
            "s1",
            [ScalarValue::from("foo")],
            [yes, no, unknown, yes, no, unknown, yes, no, unknown],
        )
        .with_contained(
            "s1",
            [ScalarValue::from("bar")],
            [yes, yes, yes, no, no, no, unknown, unknown, unknown],
        )
        .with_contained(
            "s1",
            [ScalarValue::from("foo"), ScalarValue::from("bar")],
            [unknown, unknown, unknown, yes, yes, yes, no, no, no],
        );

    let s1_eq = |value: &str| col("s1").eq(lit(value));
    let s1_ne = |value: &str| col("s1").not_eq(lit(value));

    // `(predicate, expected)` rows, evaluated in order.
    #[rustfmt::skip]
    let cases: Vec<(Expr, [bool; 9])> = vec![
        // s1 = 'foo'
        (s1_eq("foo"), [true, false, true, true, false, true, true, false, true]),
        // s1 = 'bar'
        (s1_eq("bar"), [true, true, true, false, false, false, true, true, true]),
        // s1 = 'baz' (no `contained` answers were registered for "baz")
        (s1_eq("baz"), ALL_TRUE),
        // s1 = 'foo' AND s1 = 'bar'
        (s1_eq("foo").and(s1_eq("bar")), ALL_TRUE),
        // s1 = 'foo' OR s1 = 'bar'
        (s1_eq("foo").or(s1_eq("bar")), [true, true, true, true, true, true, false, false, false]),
        // s1 = 'foo' OR s1 = 'baz'
        (s1_eq("foo").or(s1_eq("baz")), ALL_TRUE),
        // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
        (s1_eq("foo").or(s1_eq("bar")).or(s1_eq("baz")), ALL_TRUE),
        // s1 != 'foo'
        (s1_ne("foo"), [false, true, true, false, true, true, false, true, true]),
        // s1 != 'bar'
        (s1_ne("bar"), [false, false, false, true, true, true, true, true, true]),
        // s1 != 'foo' AND s1 != 'bar'
        (s1_ne("foo").and(s1_ne("bar")), [true, true, true, false, false, false, true, true, true]),
        // s1 != 'foo' AND s1 != 'bar' AND s1 != 'baz'
        (s1_ne("foo").and(s1_ne("bar")).and(s1_ne("baz")), ALL_TRUE),
        // s1 != 'foo' OR s1 != 'bar'
        (s1_ne("foo").or(s1_ne("bar")), ALL_TRUE),
        // s1 != 'foo' OR s1 != 'bar' OR s1 != 'baz'
        (s1_ne("foo").or(s1_ne("bar")).or(s1_ne("baz")), ALL_TRUE),
    ];

    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
5126
#[test]
fn prune_with_contained_two_columns() {
    const ALL_TRUE: [bool; 9] = [true; 9];

    let schema = Arc::new(Schema::new(vec![
        Field::new("s1", DataType::Utf8, true),
        Field::new("s2", DataType::Utf8, true),
    ]));

    // Nine containers: `contained` answers for "foo" on `s1` and for "bar" on
    // `s2`, arranged so that every pairing of the three answers occurs once.
    let yes = Some(true);
    let no = Some(false);
    let unknown: Option<bool> = None;
    let statistics = TestStatistics::new()
        .with_contained(
            "s1",
            [ScalarValue::from("foo")],
            [yes, no, unknown, yes, no, unknown, yes, no, unknown],
        )
        .with_contained(
            "s2",
            [ScalarValue::from("bar")],
            [yes, yes, yes, no, no, no, unknown, unknown, unknown],
        );

    let s1_like = || col("s1").like(lit("foo%bar%"));

    // `(predicate, expected)` rows, evaluated in order.
    #[rustfmt::skip]
    let cases: Vec<(Expr, [bool; 9])> = vec![
        // s1 = 'foo'
        (
            col("s1").eq(lit("foo")),
            [true, false, true, true, false, true, true, false, true],
        ),
        // s1 = 'foo' OR s2 = 'bar'
        (
            col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar"))),
            ALL_TRUE,
        ),
        // s1 = 'foo' AND s2 != 'bar'
        (
            col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
            [false, false, false, true, false, true, true, false, true],
        ),
        // s1 != 'foo' AND s2 != 'bar'
        (
            col("s1").not_eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
            [false, false, false, false, true, true, false, true, true],
        ),
        // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
        (
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
            [false, true, true, false, true, true, false, true, true],
        ),
        // s1 LIKE 'foo%bar%'
        (
            s1_like(),
            ALL_TRUE,
        ),
        // s1 LIKE 'foo%bar%' AND s2 = 'bar'
        (
            s1_like().and(col("s2").eq(lit("bar"))),
            [true, true, true, false, false, false, true, true, true],
        ),
        // s1 LIKE 'foo%bar%' OR s2 = 'bar'
        (
            s1_like().or(col("s2").eq(lit("bar"))),
            ALL_TRUE,
        ),
    ];

    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
5264
#[test]
fn prune_with_range_and_contained() {
    // Combines bound-based statistics on Int32 column `i` with `contained`
    // statistics on Utf8 column `s`, over nine containers.
    let schema = Arc::new(Schema::new(vec![
        Field::new("i", DataType::Int32, true),
        Field::new("s", DataType::Utf8, true),
    ]));

    // The bounds for `i` repeat with period three: (-5, 5), (10, 20), (None, None).
    // NOTE(review): first list assumed to be minimums, second maximums —
    // confirm against `ContainerStats::new_i32`.
    let repeat_three_times = |pattern: [Option<i32>; 3]| -> Vec<Option<i32>> {
        pattern.iter().copied().cycle().take(9).collect()
    };
    let i_first = repeat_three_times([Some(-5), Some(10), None]);
    let i_second = repeat_three_times([Some(5), Some(20), None]);

    // The `contained` answers for "foo" on `s` change every three containers.
    let yes = Some(true);
    let no = Some(false);
    let unknown: Option<bool> = None;
    let statistics = TestStatistics::new()
        .with("i", ContainerStats::new_i32(i_first, i_second))
        .with_contained(
            "s",
            [ScalarValue::from("foo")],
            [yes, yes, yes, no, no, no, unknown, unknown, unknown],
        );

    let i_is_zero = || col("i").eq(lit(0));

    // `(predicate, expected)` rows, evaluated in order.
    #[rustfmt::skip]
    let cases: Vec<(Expr, [bool; 9])> = vec![
        // i = 0 AND s = 'foo'
        (
            i_is_zero().and(col("s").eq(lit("foo"))),
            [true, false, true, false, false, false, true, false, true],
        ),
        // i = 0 AND s != 'foo'
        (
            i_is_zero().and(col("s").not_eq(lit("foo"))),
            [false, false, false, true, false, true, true, false, true],
        ),
        // i = 0 OR s = 'foo'
        (
            i_is_zero().or(col("s").eq(lit("foo"))),
            [true; 9],
        ),
    ];

    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
5357
5358 fn prune_with_expr(
5365 expr: Expr,
5366 schema: &SchemaRef,
5367 statistics: &TestStatistics,
5368 expected: &[bool],
5369 ) {
5370 println!("Pruning with expr: {expr}");
5371 let expr = logical2physical(&expr, schema);
5372 let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5373 let result = p.prune(statistics).unwrap();
5374 assert_eq!(result, expected);
5375 }
5376
5377 fn prune_with_simplified_expr(
5378 expr: Expr,
5379 schema: &SchemaRef,
5380 statistics: &TestStatistics,
5381 expected: &[bool],
5382 ) {
5383 println!("Pruning with expr: {expr}");
5384 let expr = logical2physical(&expr, schema);
5385 let simplifier = PhysicalExprSimplifier::new(schema);
5386 let expr = simplifier.simplify(expr).unwrap();
5387 let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5388 let result = p.prune(statistics).unwrap();
5389 assert_eq!(result, expected);
5390 }
5391
5392 fn test_build_predicate_expression(
5393 expr: &Expr,
5394 schema: &Schema,
5395 required_columns: &mut RequiredColumns,
5396 ) -> Arc<dyn PhysicalExpr> {
5397 let expr = logical2physical(expr, schema);
5398 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
5399 build_predicate_expression(
5400 &expr,
5401 &Arc::new(schema.clone()),
5402 required_columns,
5403 &unhandled_hook,
5404 )
5405 }
5406
#[test]
fn test_build_predicate_expression_with_false() {
    // A bare `false` literal must come back as the planned literal itself.
    let schema = Schema::empty();
    let always_false = lit(ScalarValue::Boolean(Some(false)));

    let mut required_columns = RequiredColumns::new();
    let actual =
        test_build_predicate_expression(&always_false, &schema, &mut required_columns);

    let expected = logical2physical(&always_false, &schema);
    assert_eq!(&actual, &expected);
}
5416
#[test]
fn test_build_predicate_expression_with_and_false() {
    // `c1 = 'a' AND false` must be rewritten to the plain `false` literal.
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let always_false = || lit(ScalarValue::Boolean(Some(false)));
    let conjunction = and(col("c1").eq(lit("a")), always_false());

    let mut required_columns = RequiredColumns::new();
    let actual =
        test_build_predicate_expression(&conjunction, &schema, &mut required_columns);

    let expected = logical2physical(&always_false(), &schema);
    assert_eq!(&actual, &expected);
}
5429
#[test]
fn test_build_predicate_expression_with_or_false() {
    // `c1 = 'a' OR false` must be rewritten to a statistics predicate that
    // prints as the expected string below, with no trace of the `false` branch.
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let disjunction = or(
        col("c1").eq(lit("a")),
        lit(ScalarValue::Boolean(Some(false))),
    );

    let mut required_columns = RequiredColumns::new();
    let rewritten =
        test_build_predicate_expression(&disjunction, &schema, &mut required_columns);

    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1"
    );
}
5444}