1use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27 array::{ArrayRef, BooleanArray, new_null_array},
28 datatypes::{DataType, Field, Schema, SchemaRef},
29 record_batch::{RecordBatch, RecordBatchOptions},
30};
31pub use datafusion_common::pruning::PruningStatistics;
33use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
34use datafusion_physical_plan::metrics::Count;
35use log::{debug, trace};
36
37use datafusion_common::error::Result;
38use datafusion_common::tree_node::{TransformedResult, TreeNodeRecursion};
39use datafusion_common::{Column, DFSchema, assert_eq_or_internal_err};
40use datafusion_common::{
41 ScalarValue, internal_datafusion_err, plan_datafusion_err, plan_err,
42 tree_node::{Transformed, TreeNode},
43};
44use datafusion_expr_common::operator::Operator;
45use datafusion_physical_expr::expressions::CastColumnExpr;
46use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
47use datafusion_physical_expr::{PhysicalExprRef, expressions as phys_expr};
48use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr_opt;
49use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
50
51#[derive(Debug, Clone)]
363pub struct PruningPredicate {
364 schema: SchemaRef,
366 predicate_expr: Arc<dyn PhysicalExpr>,
369 required_columns: RequiredColumns,
371 orig_expr: Arc<dyn PhysicalExpr>,
374 literal_guarantees: Vec<LiteralGuarantee>,
379}
380
381pub fn build_pruning_predicate(
387 predicate: Arc<dyn PhysicalExpr>,
388 file_schema: &SchemaRef,
389 predicate_creation_errors: &Count,
390) -> Option<Arc<PruningPredicate>> {
391 match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
392 Ok(pruning_predicate) => {
393 if !pruning_predicate.always_true() {
394 return Some(Arc::new(pruning_predicate));
395 }
396 }
397 Err(e) => {
398 debug!("Could not create pruning predicate for: {e}");
399 predicate_creation_errors.add(1);
400 }
401 }
402 None
403}
404
405pub trait UnhandledPredicateHook {
409 fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
412}
413
414#[derive(Debug, Clone)]
417struct ConstantUnhandledPredicateHook {
418 default: Arc<dyn PhysicalExpr>,
419}
420
421impl Default for ConstantUnhandledPredicateHook {
422 fn default() -> Self {
423 Self {
424 default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
425 }
426 }
427}
428
429impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
430 fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
431 Arc::clone(&self.default)
432 }
433}
434
435impl PruningPredicate {
436 pub fn try_new(mut expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
465 let tf = snapshot_physical_expr_opt(expr)?;
469 if tf.transformed {
470 let simplifier = PhysicalExprSimplifier::new(&schema);
477 expr = simplifier.simplify(tf.data)?;
478 } else {
479 expr = tf.data;
480 }
481 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
482
483 let mut required_columns = RequiredColumns::new();
485 let predicate_expr = build_predicate_expression(
486 &expr,
487 &schema,
488 &mut required_columns,
489 &unhandled_hook,
490 );
491 let predicate_schema = required_columns.schema();
492 let predicate_expr =
494 PhysicalExprSimplifier::new(&predicate_schema).simplify(predicate_expr)?;
495
496 let literal_guarantees = LiteralGuarantee::analyze(&expr);
497
498 Ok(Self {
499 schema,
500 predicate_expr,
501 required_columns,
502 orig_expr: expr,
503 literal_guarantees,
504 })
505 }
506
507 pub fn prune<S: PruningStatistics + ?Sized>(
522 &self,
523 statistics: &S,
524 ) -> Result<Vec<bool>> {
525 let mut builder = BoolVecBuilder::new(statistics.num_containers());
526
527 for literal_guarantee in &self.literal_guarantees {
530 let LiteralGuarantee {
531 column,
532 guarantee,
533 literals,
534 } = literal_guarantee;
535 if let Some(results) = statistics.contained(column, literals) {
536 match guarantee {
537 Guarantee::In => builder.combine_array(&results),
542 Guarantee::NotIn => {
548 builder.combine_array(&arrow::compute::not(&results)?)
549 }
550 }
551 if builder.check_all_pruned() {
554 return Ok(builder.build());
555 }
556 }
557 }
558
559 let statistics_batch =
565 build_statistics_record_batch(statistics, &self.required_columns)?;
566
567 builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
569
570 Ok(builder.build())
571 }
572
573 pub fn schema(&self) -> &SchemaRef {
575 &self.schema
576 }
577
578 pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
580 &self.orig_expr
581 }
582
583 pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
585 &self.predicate_expr
586 }
587
588 pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
594 &self.literal_guarantees
595 }
596
597 pub fn always_true(&self) -> bool {
604 is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
605 }
606
607 pub fn required_columns(&self) -> &RequiredColumns {
608 &self.required_columns
609 }
610
611 pub fn literal_columns(&self) -> Vec<String> {
619 let mut seen = HashSet::new();
620 self.literal_guarantees
621 .iter()
622 .map(|e| &e.column.name)
623 .filter(|name| seen.insert(*name))
625 .map(|s| s.to_string())
626 .collect()
627 }
628}
629
630#[derive(Debug)]
632struct BoolVecBuilder {
633 inner: Vec<bool>,
637}
638
639impl BoolVecBuilder {
640 fn new(num_containers: usize) -> Self {
642 Self {
643 inner: vec![true; num_containers],
645 }
646 }
647
648 fn combine_array(&mut self, array: &BooleanArray) {
656 assert_eq!(array.len(), self.inner.len());
657 for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
658 if let Some(false) = new {
662 *cur = false;
663 }
664 }
665 }
666
667 fn combine_value(&mut self, value: ColumnarValue) {
673 match value {
674 ColumnarValue::Array(array) => {
675 self.combine_array(array.as_boolean());
676 }
677 ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
678 self.inner = vec![false; self.inner.len()];
680 }
681 _ => {
682 }
685 }
686 }
687
688 fn build(self) -> Vec<bool> {
690 self.inner
691 }
692
693 fn check_all_pruned(&self) -> bool {
695 self.inner.iter().all(|&x| !x)
696 }
697}
698
699fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
700 expr.as_any()
701 .downcast_ref::<phys_expr::Literal>()
702 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
703 .unwrap_or_default()
704}
705
706fn is_always_false(expr: &Arc<dyn PhysicalExpr>) -> bool {
707 expr.as_any()
708 .downcast_ref::<phys_expr::Literal>()
709 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(false))))
710 .unwrap_or_default()
711}
712
713#[derive(Debug, Default, Clone)]
723pub struct RequiredColumns {
724 columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
730}
731
732impl RequiredColumns {
733 fn new() -> Self {
734 Self::default()
735 }
736
737 pub fn single_column(&self) -> Option<&phys_expr::Column> {
746 if self.columns.windows(2).all(|w| {
747 let c1 = &w[0].0;
749 let c2 = &w[1].0;
750 c1 == c2
751 }) {
752 self.columns.first().map(|r| &r.0)
753 } else {
754 None
755 }
756 }
757
758 fn schema(&self) -> Schema {
765 let fields = self
766 .columns
767 .iter()
768 .map(|(_c, _t, f)| f.clone())
769 .collect::<Vec<_>>();
770 Schema::new(fields)
771 }
772
773 pub(crate) fn iter(
776 &self,
777 ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
778 self.columns.iter()
779 }
780
781 fn find_stat_column(
782 &self,
783 column: &phys_expr::Column,
784 statistics_type: StatisticsType,
785 ) -> Option<usize> {
786 match statistics_type {
787 StatisticsType::RowCount => {
788 self.columns
790 .iter()
791 .enumerate()
792 .find(|(_i, (_c, t, _f))| t == &statistics_type)
793 .map(|(i, (_c, _t, _f))| i)
794 }
795 _ => self
796 .columns
797 .iter()
798 .enumerate()
799 .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
800 .map(|(i, (_c, _t, _f))| i),
801 }
802 }
803
804 fn stat_column_expr(
813 &mut self,
814 column: &phys_expr::Column,
815 column_expr: &Arc<dyn PhysicalExpr>,
816 field: &Field,
817 stat_type: StatisticsType,
818 ) -> Result<Arc<dyn PhysicalExpr>> {
819 let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
820 Some(idx) => (idx, false),
821 None => (self.columns.len(), true),
822 };
823
824 let column_name = column.name();
825 let stat_column_name = match stat_type {
826 StatisticsType::Min => format!("{column_name}_min"),
827 StatisticsType::Max => format!("{column_name}_max"),
828 StatisticsType::NullCount => format!("{column_name}_null_count"),
829 StatisticsType::RowCount => "row_count".to_string(),
830 };
831
832 let stat_column = phys_expr::Column::new(&stat_column_name, idx);
833
834 if need_to_insert {
836 let nullable = true;
838 let stat_field =
839 Field::new(stat_column.name(), field.data_type().clone(), nullable);
840 self.columns.push((column.clone(), stat_type, stat_field));
841 }
842 rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
843 }
844
845 fn min_column_expr(
847 &mut self,
848 column: &phys_expr::Column,
849 column_expr: &Arc<dyn PhysicalExpr>,
850 field: &Field,
851 ) -> Result<Arc<dyn PhysicalExpr>> {
852 self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
853 }
854
855 fn max_column_expr(
857 &mut self,
858 column: &phys_expr::Column,
859 column_expr: &Arc<dyn PhysicalExpr>,
860 field: &Field,
861 ) -> Result<Arc<dyn PhysicalExpr>> {
862 self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
863 }
864
865 fn null_count_column_expr(
867 &mut self,
868 column: &phys_expr::Column,
869 column_expr: &Arc<dyn PhysicalExpr>,
870 field: &Field,
871 ) -> Result<Arc<dyn PhysicalExpr>> {
872 self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
873 }
874
875 fn row_count_column_expr(
877 &mut self,
878 column: &phys_expr::Column,
879 column_expr: &Arc<dyn PhysicalExpr>,
880 field: &Field,
881 ) -> Result<Arc<dyn PhysicalExpr>> {
882 self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
883 }
884}
885
886impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
887 fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
888 Self { columns }
889 }
890}
891
892fn build_statistics_record_batch<S: PruningStatistics + ?Sized>(
918 statistics: &S,
919 required_columns: &RequiredColumns,
920) -> Result<RecordBatch> {
921 let mut arrays = Vec::<ArrayRef>::new();
922 for (column, statistics_type, stat_field) in required_columns.iter() {
924 let column = Column::from_name(column.name());
925 let data_type = stat_field.data_type();
926
927 let num_containers = statistics.num_containers();
928
929 let array = match statistics_type {
930 StatisticsType::Min => statistics.min_values(&column),
931 StatisticsType::Max => statistics.max_values(&column),
932 StatisticsType::NullCount => statistics.null_counts(&column),
933 StatisticsType::RowCount => statistics.row_counts(&column),
934 };
935 let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
936
937 assert_eq_or_internal_err!(
938 num_containers,
939 array.len(),
940 "mismatched statistics length. Expected {}, got {}",
941 num_containers,
942 array.len()
943 );
944
945 let array = arrow::compute::cast(&array, data_type)?;
948
949 arrays.push(array);
950 }
951
952 let schema = Arc::new(required_columns.schema());
953 let mut options = RecordBatchOptions::default();
955 options.row_count = Some(statistics.num_containers());
956
957 trace!("Creating statistics batch for {required_columns:#?} with {arrays:#?}");
958
959 RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
960 plan_datafusion_err!("Can not create statistics record batch: {err}")
961 })
962}
963
964struct PruningExpressionBuilder<'a> {
965 column: phys_expr::Column,
966 column_expr: Arc<dyn PhysicalExpr>,
967 op: Operator,
968 scalar_expr: Arc<dyn PhysicalExpr>,
969 field: &'a Field,
970 required_columns: &'a mut RequiredColumns,
971}
972
973impl<'a> PruningExpressionBuilder<'a> {
974 fn try_new(
975 left: &'a Arc<dyn PhysicalExpr>,
976 right: &'a Arc<dyn PhysicalExpr>,
977 left_columns: ColumnReferenceCount,
978 right_columns: ColumnReferenceCount,
979 op: Operator,
980 schema: &'a SchemaRef,
981 required_columns: &'a mut RequiredColumns,
982 ) -> Result<Self> {
983 let (column_expr, scalar_expr, column, correct_operator) = match (
985 left_columns,
986 right_columns,
987 ) {
988 (ColumnReferenceCount::One(column), ColumnReferenceCount::Zero) => {
989 (left, right, column, op)
990 }
991 (ColumnReferenceCount::Zero, ColumnReferenceCount::One(column)) => {
992 (right, left, column, reverse_operator(op)?)
993 }
994 (ColumnReferenceCount::One(_), ColumnReferenceCount::One(_)) => {
995 return plan_err!(
997 "Expression not supported for pruning: left has 1 column, right has 1 column"
998 );
999 }
1000 (ColumnReferenceCount::Zero, ColumnReferenceCount::Zero) => {
1001 return plan_err!(
1003 "Pruning literal expressions is not supported, please call PhysicalExprSimplifier first"
1004 );
1005 }
1006 (ColumnReferenceCount::Many, _) | (_, ColumnReferenceCount::Many) => {
1007 return plan_err!(
1008 "Expression not supported for pruning: left or right has multiple columns"
1009 );
1010 }
1011 };
1012
1013 let df_schema = DFSchema::try_from(Arc::clone(schema))?;
1014 let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
1015 column_expr,
1016 correct_operator,
1017 scalar_expr,
1018 df_schema,
1019 )?;
1020 let field = match schema.column_with_name(column.name()) {
1021 Some((_, f)) => f,
1022 _ => {
1023 return plan_err!("Field not found in schema");
1024 }
1025 };
1026
1027 Ok(Self {
1028 column,
1029 column_expr,
1030 op: correct_operator,
1031 scalar_expr,
1032 field,
1033 required_columns,
1034 })
1035 }
1036
1037 fn op(&self) -> Operator {
1038 self.op
1039 }
1040
1041 fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
1042 &self.scalar_expr
1043 }
1044
1045 fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1046 self.required_columns
1047 .min_column_expr(&self.column, &self.column_expr, self.field)
1048 }
1049
1050 fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1051 self.required_columns
1052 .max_column_expr(&self.column, &self.column_expr, self.field)
1053 }
1054
1055 fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1062 let column_expr = Arc::new(self.column.clone()) as _;
1064
1065 let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1067
1068 self.required_columns.null_count_column_expr(
1069 &self.column,
1070 &column_expr,
1071 null_count_field,
1072 )
1073 }
1074
1075 fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1082 let column_expr = Arc::new(self.column.clone()) as _;
1084
1085 let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1087
1088 self.required_columns.row_count_column_expr(
1089 &self.column,
1090 &column_expr,
1091 row_count_field,
1092 )
1093 }
1094}
1095
1096fn rewrite_expr_to_prunable(
1109 column_expr: &PhysicalExprRef,
1110 op: Operator,
1111 scalar_expr: &PhysicalExprRef,
1112 schema: DFSchema,
1113) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1114 if !is_compare_op(op) {
1115 return plan_err!("rewrite_expr_to_prunable only support compare expression");
1116 }
1117
1118 let column_expr_any = column_expr.as_any();
1119
1120 if column_expr_any
1121 .downcast_ref::<phys_expr::Column>()
1122 .is_some()
1123 {
1124 Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
1126 } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
1127 let arrow_schema = schema.as_arrow();
1129 let from_type = cast.expr().data_type(arrow_schema)?;
1130 verify_support_type_for_prune(&from_type, cast.cast_type())?;
1131 let (left, op, right) =
1132 rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
1133 let left = Arc::new(phys_expr::CastExpr::new(
1134 left,
1135 cast.cast_type().clone(),
1136 None,
1137 ));
1138 Ok((left, op, right))
1139 } else if let Some(cast_col) = column_expr_any.downcast_ref::<CastColumnExpr>() {
1140 let arrow_schema = schema.as_arrow();
1142 let from_type = cast_col.expr().data_type(arrow_schema)?;
1143 let to_type = cast_col.target_field().data_type();
1144 verify_support_type_for_prune(&from_type, to_type)?;
1145 let (left, op, right) =
1146 rewrite_expr_to_prunable(cast_col.expr(), op, scalar_expr, schema)?;
1147 let left = Arc::new(phys_expr::CastExpr::new(left, to_type.clone(), None));
1152 Ok((left, op, right))
1153 } else if let Some(try_cast) =
1154 column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
1155 {
1156 let arrow_schema = schema.as_arrow();
1158 let from_type = try_cast.expr().data_type(arrow_schema)?;
1159 verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
1160 let (left, op, right) =
1161 rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
1162 let left = Arc::new(phys_expr::TryCastExpr::new(
1163 left,
1164 try_cast.cast_type().clone(),
1165 ));
1166 Ok((left, op, right))
1167 } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
1168 let (left, op, right) =
1170 rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
1171 let right = Arc::new(phys_expr::NegativeExpr::new(right));
1172 Ok((left, reverse_operator(op)?, right))
1173 } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
1174 if op != Operator::Eq && op != Operator::NotEq {
1176 return plan_err!("Not with operator other than Eq / NotEq is not supported");
1177 }
1178 if not
1179 .arg()
1180 .as_any()
1181 .downcast_ref::<phys_expr::Column>()
1182 .is_some()
1183 {
1184 let left = Arc::clone(not.arg());
1185 let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
1186 Ok((left, reverse_operator(op)?, right))
1187 } else {
1188 plan_err!("Not with complex expression {column_expr:?} is not supported")
1189 }
1190 } else {
1191 plan_err!("column expression {column_expr:?} is not supported")
1192 }
1193}
1194
1195fn is_compare_op(op: Operator) -> bool {
1196 matches!(
1197 op,
1198 Operator::Eq
1199 | Operator::NotEq
1200 | Operator::Lt
1201 | Operator::LtEq
1202 | Operator::Gt
1203 | Operator::GtEq
1204 | Operator::LikeMatch
1205 | Operator::NotLikeMatch
1206 )
1207}
1208
1209fn is_string_type(data_type: &DataType) -> bool {
1210 matches!(
1211 data_type,
1212 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1213 )
1214}
1215
1216fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1221 let from_type = match from_type {
1223 DataType::Dictionary(_, t) => {
1224 return verify_support_type_for_prune(t.as_ref(), to_type);
1225 }
1226 _ => from_type,
1227 };
1228 let to_type = match to_type {
1229 DataType::Dictionary(_, t) => {
1230 return verify_support_type_for_prune(from_type, t.as_ref());
1231 }
1232 _ => to_type,
1233 };
1234 if is_string_type(from_type) == is_string_type(to_type) {
1238 Ok(())
1239 } else {
1240 plan_err!(
1241 "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1242 )
1243 }
1244}
1245
1246fn rewrite_column_expr(
1248 e: Arc<dyn PhysicalExpr>,
1249 column_old: &phys_expr::Column,
1250 column_new: &phys_expr::Column,
1251) -> Result<Arc<dyn PhysicalExpr>> {
1252 e.transform(|expr| {
1253 if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>()
1254 && column == column_old
1255 {
1256 return Ok(Transformed::yes(Arc::new(column_new.clone())));
1257 }
1258
1259 Ok(Transformed::no(expr))
1260 })
1261 .data()
1262}
1263
1264fn reverse_operator(op: Operator) -> Result<Operator> {
1265 op.swap().ok_or_else(|| {
1266 internal_datafusion_err!(
1267 "Could not reverse operator {op} while building pruning predicate"
1268 )
1269 })
1270}
1271
1272fn build_single_column_expr(
1277 column: &phys_expr::Column,
1278 schema: &Schema,
1279 required_columns: &mut RequiredColumns,
1280 is_not: bool, ) -> Option<Arc<dyn PhysicalExpr>> {
1282 let field = schema.field_with_name(column.name()).ok()?;
1283
1284 if matches!(field.data_type(), &DataType::Boolean) {
1285 let col_ref = Arc::new(column.clone()) as _;
1286
1287 let min = required_columns
1288 .min_column_expr(column, &col_ref, field)
1289 .ok()?;
1290 let max = required_columns
1291 .max_column_expr(column, &col_ref, field)
1292 .ok()?;
1293
1294 if is_not {
1298 Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1301 phys_expr::BinaryExpr::new(min, Operator::And, max),
1302 ))))
1303 } else {
1304 Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1307 }
1308 } else {
1309 None
1310 }
1311}
1312
1313fn build_is_null_column_expr(
1322 expr: &Arc<dyn PhysicalExpr>,
1323 schema: &Schema,
1324 required_columns: &mut RequiredColumns,
1325 with_not: bool,
1326) -> Option<Arc<dyn PhysicalExpr>> {
1327 if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1328 let field = schema.field_with_name(col.name()).ok()?;
1329
1330 let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1331 if with_not {
1332 if let Ok(row_count_expr) =
1333 required_columns.row_count_column_expr(col, expr, null_count_field)
1334 {
1335 required_columns
1336 .null_count_column_expr(col, expr, null_count_field)
1337 .map(|null_count_column_expr| {
1338 Arc::new(phys_expr::BinaryExpr::new(
1340 null_count_column_expr,
1341 Operator::NotEq,
1342 row_count_expr,
1343 )) as _
1344 })
1345 .ok()
1346 } else {
1347 None
1348 }
1349 } else {
1350 required_columns
1351 .null_count_column_expr(col, expr, null_count_field)
1352 .map(|null_count_column_expr| {
1353 Arc::new(phys_expr::BinaryExpr::new(
1355 null_count_column_expr,
1356 Operator::Gt,
1357 Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1358 )) as _
1359 })
1360 .ok()
1361 }
1362 } else {
1363 None
1364 }
1365}
1366
1367const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1370
1371pub struct PredicateRewriter {
1374 unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1375}
1376
1377impl Default for PredicateRewriter {
1378 fn default() -> Self {
1379 Self {
1380 unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1381 }
1382 }
1383}
1384
1385impl PredicateRewriter {
1386 pub fn new() -> Self {
1388 Self::default()
1389 }
1390
1391 pub fn with_unhandled_hook(
1393 self,
1394 unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1395 ) -> Self {
1396 Self { unhandled_hook }
1397 }
1398
1399 pub fn rewrite_predicate_to_statistics_predicate(
1409 &self,
1410 expr: &Arc<dyn PhysicalExpr>,
1411 schema: &Schema,
1412 ) -> Arc<dyn PhysicalExpr> {
1413 let mut required_columns = RequiredColumns::new();
1414 build_predicate_expression(
1415 expr,
1416 &Arc::new(schema.clone()),
1417 &mut required_columns,
1418 &self.unhandled_hook,
1419 )
1420 }
1421}
1422
1423fn build_predicate_expression(
1433 expr: &Arc<dyn PhysicalExpr>,
1434 schema: &SchemaRef,
1435 required_columns: &mut RequiredColumns,
1436 unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1437) -> Arc<dyn PhysicalExpr> {
1438 if is_always_false(expr) {
1439 return Arc::clone(expr);
1442 }
1443 let expr_any = expr.as_any();
1445 if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
1446 return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1447 .unwrap_or_else(|| unhandled_hook.handle(expr));
1448 }
1449 if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
1450 return build_is_null_column_expr(
1451 is_not_null.arg(),
1452 schema,
1453 required_columns,
1454 true,
1455 )
1456 .unwrap_or_else(|| unhandled_hook.handle(expr));
1457 }
1458 if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
1459 return build_single_column_expr(col, schema, required_columns, false)
1460 .unwrap_or_else(|| unhandled_hook.handle(expr));
1461 }
1462 if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
1463 if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
1465 return build_single_column_expr(col, schema, required_columns, true)
1466 .unwrap_or_else(|| unhandled_hook.handle(expr));
1467 } else {
1468 return unhandled_hook.handle(expr);
1469 }
1470 }
1471 if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
1472 if !in_list.list().is_empty()
1473 && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1474 {
1475 let eq_op = if in_list.negated() {
1476 Operator::NotEq
1477 } else {
1478 Operator::Eq
1479 };
1480 let re_op = if in_list.negated() {
1481 Operator::And
1482 } else {
1483 Operator::Or
1484 };
1485 let change_expr = in_list
1486 .list()
1487 .iter()
1488 .map(|e| {
1489 Arc::new(phys_expr::BinaryExpr::new(
1490 Arc::clone(in_list.expr()),
1491 eq_op,
1492 Arc::clone(e),
1493 )) as _
1494 })
1495 .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1496 .unwrap();
1497 return build_predicate_expression(
1498 &change_expr,
1499 schema,
1500 required_columns,
1501 unhandled_hook,
1502 );
1503 } else {
1504 return unhandled_hook.handle(expr);
1505 }
1506 }
1507
1508 let (left, op, right) = {
1509 if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
1510 (
1511 Arc::clone(bin_expr.left()),
1512 *bin_expr.op(),
1513 Arc::clone(bin_expr.right()),
1514 )
1515 } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
1516 if like_expr.case_insensitive() {
1517 return unhandled_hook.handle(expr);
1518 }
1519 let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1520 (false, false) => Operator::LikeMatch,
1521 (true, false) => Operator::NotLikeMatch,
1522 (false, true) => Operator::ILikeMatch,
1523 (true, true) => Operator::NotILikeMatch,
1524 };
1525 (
1526 Arc::clone(like_expr.expr()),
1527 op,
1528 Arc::clone(like_expr.pattern()),
1529 )
1530 } else {
1531 return unhandled_hook.handle(expr);
1532 }
1533 };
1534
1535 if op == Operator::And || op == Operator::Or {
1536 let left_expr =
1537 build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1538 let right_expr =
1539 build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1540 let expr = match (&left_expr, op, &right_expr) {
1542 (left, Operator::And, right)
1543 if is_always_false(left) || is_always_false(right) =>
1544 {
1545 Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false))))
1546 }
1547 (left, Operator::And, _) if is_always_true(left) => right_expr,
1548 (_, Operator::And, right) if is_always_true(right) => left_expr,
1549 (left, Operator::Or, right)
1550 if is_always_true(left) || is_always_true(right) =>
1551 {
1552 Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1553 }
1554 (left, Operator::Or, _) if is_always_false(left) => right_expr,
1555 (_, Operator::Or, right) if is_always_false(right) => left_expr,
1556
1557 _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1558 };
1559 return expr;
1560 }
1561
1562 let left_columns = ColumnReferenceCount::from_expression(&left);
1563 let right_columns = ColumnReferenceCount::from_expression(&right);
1564 let expr_builder = PruningExpressionBuilder::try_new(
1565 &left,
1566 &right,
1567 left_columns,
1568 right_columns,
1569 op,
1570 schema,
1571 required_columns,
1572 );
1573 let mut expr_builder = match expr_builder {
1574 Ok(builder) => builder,
1575 Err(e) => {
1578 debug!("Error building pruning expression: {e}");
1579 return unhandled_hook.handle(expr);
1580 }
1581 };
1582
1583 build_statistics_expr(&mut expr_builder)
1584 .unwrap_or_else(|_| unhandled_hook.handle(expr))
1585}
1586
1587#[derive(Debug, PartialEq, Eq)]
1597enum ColumnReferenceCount {
1598 Zero,
1600 One(phys_expr::Column),
1602 Many,
1604}
1605
1606impl ColumnReferenceCount {
1607 fn from_expression(expr: &Arc<dyn PhysicalExpr>) -> Self {
1609 let mut seen = HashSet::<phys_expr::Column>::new();
1610 expr.apply(|expr| {
1611 if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1612 seen.insert(column.clone());
1613 if seen.len() > 1 {
1614 return Ok(TreeNodeRecursion::Stop);
1615 }
1616 }
1617 Ok(TreeNodeRecursion::Continue)
1618 })
1619 .expect("no way to return error during recursion");
1621 match seen.len() {
1622 0 => ColumnReferenceCount::Zero,
1623 1 => ColumnReferenceCount::One(
1624 seen.into_iter().next().expect("just checked len==1"),
1625 ),
1626 _ => ColumnReferenceCount::Many,
1627 }
1628 }
1629}
1630
1631fn build_statistics_expr(
1632 expr_builder: &mut PruningExpressionBuilder,
1633) -> Result<Arc<dyn PhysicalExpr>> {
1634 let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1635 Operator::NotEq => {
1636 let min_column_expr = expr_builder.min_column_expr()?;
1640 let max_column_expr = expr_builder.max_column_expr()?;
1641 Arc::new(phys_expr::BinaryExpr::new(
1642 Arc::new(phys_expr::BinaryExpr::new(
1643 min_column_expr,
1644 Operator::NotEq,
1645 Arc::clone(expr_builder.scalar_expr()),
1646 )),
1647 Operator::Or,
1648 Arc::new(phys_expr::BinaryExpr::new(
1649 Arc::clone(expr_builder.scalar_expr()),
1650 Operator::NotEq,
1651 max_column_expr,
1652 )),
1653 ))
1654 }
1655 Operator::Eq => {
1656 let min_column_expr = expr_builder.min_column_expr()?;
1659 let max_column_expr = expr_builder.max_column_expr()?;
1660 Arc::new(phys_expr::BinaryExpr::new(
1661 Arc::new(phys_expr::BinaryExpr::new(
1662 min_column_expr,
1663 Operator::LtEq,
1664 Arc::clone(expr_builder.scalar_expr()),
1665 )),
1666 Operator::And,
1667 Arc::new(phys_expr::BinaryExpr::new(
1668 Arc::clone(expr_builder.scalar_expr()),
1669 Operator::LtEq,
1670 max_column_expr,
1671 )),
1672 ))
1673 }
1674 Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1675 Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1676 plan_datafusion_err!(
1677 "LIKE expression with wildcard at the beginning is not supported"
1678 )
1679 })?,
1680 Operator::Gt => {
1681 Arc::new(phys_expr::BinaryExpr::new(
1683 expr_builder.max_column_expr()?,
1684 Operator::Gt,
1685 Arc::clone(expr_builder.scalar_expr()),
1686 ))
1687 }
1688 Operator::GtEq => {
1689 Arc::new(phys_expr::BinaryExpr::new(
1691 expr_builder.max_column_expr()?,
1692 Operator::GtEq,
1693 Arc::clone(expr_builder.scalar_expr()),
1694 ))
1695 }
1696 Operator::Lt => {
1697 Arc::new(phys_expr::BinaryExpr::new(
1699 expr_builder.min_column_expr()?,
1700 Operator::Lt,
1701 Arc::clone(expr_builder.scalar_expr()),
1702 ))
1703 }
1704 Operator::LtEq => {
1705 Arc::new(phys_expr::BinaryExpr::new(
1707 expr_builder.min_column_expr()?,
1708 Operator::LtEq,
1709 Arc::clone(expr_builder.scalar_expr()),
1710 ))
1711 }
1712 _ => {
1714 return plan_err!(
1715 "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1716 );
1717 }
1718 };
1719 let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1720 Ok(statistics_expr)
1721}
1722
1723fn unpack_string(s: &ScalarValue) -> Option<&str> {
1725 s.try_as_str().flatten()
1726}
1727
1728fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1729 if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
1730 let s = unpack_string(lit.value())?;
1731 return Some(s);
1732 }
1733 None
1734}
1735
1736fn build_like_match(
1740 expr_builder: &mut PruningExpressionBuilder,
1741) -> Option<Arc<dyn PhysicalExpr>> {
1742 let min_column_expr = expr_builder.min_column_expr().ok()?;
1751 let max_column_expr = expr_builder.max_column_expr().ok()?;
1752 let scalar_expr = expr_builder.scalar_expr();
1753 let s = extract_string_literal(scalar_expr)?;
1755 let first_wildcard_index = s.find(['%', '_']);
1757 if first_wildcard_index == Some(0) {
1758 return None;
1760 }
1761 let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
1762 let prefix = &s[..wildcard_index];
1763 let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1764 prefix.to_string(),
1765 ))));
1766 let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1767 increment_utf8(prefix)?,
1768 ))));
1769 (lower_bound_lit, upper_bound_lit)
1770 } else {
1771 let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1773 s.to_string(),
1774 ))));
1775 (Arc::clone(&bound), bound)
1776 };
1777 let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1778 lower_bound,
1779 Operator::LtEq,
1780 Arc::clone(&max_column_expr),
1781 ));
1782 let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1783 Arc::clone(&min_column_expr),
1784 Operator::LtEq,
1785 upper_bound,
1786 ));
1787 let combined = Arc::new(phys_expr::BinaryExpr::new(
1788 upper_bound_expr,
1789 Operator::And,
1790 lower_bound_expr,
1791 ));
1792 Some(combined)
1793}
1794
1795fn build_not_like_match(
1801 expr_builder: &mut PruningExpressionBuilder<'_>,
1802) -> Result<Arc<dyn PhysicalExpr>> {
1803 let min_column_expr = expr_builder.min_column_expr()?;
1806 let max_column_expr = expr_builder.max_column_expr()?;
1807
1808 let scalar_expr = expr_builder.scalar_expr();
1809
1810 let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1811 plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1812 })?;
1813
1814 let (const_prefix, remaining) = split_constant_prefix(pattern);
1815 if const_prefix.is_empty() || remaining != "%" {
1816 return Err(plan_datafusion_err!(
1828 "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1829 ));
1830 }
1831
1832 let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1833 true,
1834 false,
1835 Arc::clone(&min_column_expr),
1836 Arc::clone(scalar_expr),
1837 ));
1838
1839 let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1840 true,
1841 false,
1842 Arc::clone(&max_column_expr),
1843 Arc::clone(scalar_expr),
1844 ));
1845
1846 Ok(Arc::new(phys_expr::BinaryExpr::new(
1847 min_col_not_like_epxr,
1848 Operator::Or,
1849 max_col_not_like_expr,
1850 )))
1851}
1852
1853fn split_constant_prefix(pattern: &str) -> (&str, &str) {
1855 let char_indices = pattern.char_indices().collect::<Vec<_>>();
1856 for i in 0..char_indices.len() {
1857 let (idx, char) = char_indices[i];
1858 if char == '%' || char == '_' {
1859 if i != 0 && char_indices[i - 1].1 == '\\' {
1860 continue;
1862 }
1863 return (&pattern[..idx], &pattern[idx..]);
1864 }
1865 }
1866 (pattern, "")
1867}
1868
1869fn increment_utf8(data: &str) -> Option<String> {
1877 fn is_valid_unicode(c: char) -> bool {
1879 let cp = c as u32;
1880
1881 if [0xFFFE, 0xFFFF].contains(&cp) || (0xFDD0..=0xFDEF).contains(&cp) {
1883 return false;
1884 }
1885
1886 if cp >= 0x110000 {
1888 return false;
1889 }
1890
1891 true
1892 }
1893
1894 let mut code_points: Vec<char> = data.chars().collect();
1896
1897 for idx in (0..code_points.len()).rev() {
1899 let original = code_points[idx] as u32;
1900
1901 if let Some(next_char) = char::from_u32(original + 1)
1903 && is_valid_unicode(next_char)
1904 {
1905 code_points[idx] = next_char;
1906 code_points.truncate(idx + 1);
1908 return Some(code_points.into_iter().collect());
1909 }
1910 }
1911
1912 None
1913}
1914
1915fn wrap_null_count_check_expr(
1936 statistics_expr: Arc<dyn PhysicalExpr>,
1937 expr_builder: &mut PruningExpressionBuilder,
1938) -> Result<Arc<dyn PhysicalExpr>> {
1939 let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new(
1941 expr_builder.null_count_column_expr()?,
1942 Operator::NotEq,
1943 expr_builder.row_count_column_expr()?,
1944 ));
1945
1946 Ok(Arc::new(phys_expr::BinaryExpr::new(
1948 not_when_null_count_eq_row_count,
1949 Operator::And,
1950 statistics_expr,
1951 )))
1952}
1953
1954#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1955pub(crate) enum StatisticsType {
1956 Min,
1957 Max,
1958 NullCount,
1959 RowCount,
1960}
1961
1962#[cfg(test)]
1963mod tests {
1964 use std::collections::HashMap;
1965 use std::ops::{Not, Rem};
1966
1967 use super::*;
1968 use datafusion_common::test_util::batches_to_string;
1969 use datafusion_expr::{and, col, lit, or};
1970 use datafusion_physical_expr::utils::collect_columns;
1971 use insta::assert_snapshot;
1972
1973 use arrow::array::Decimal128Array;
1974 use arrow::{
1975 array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
1976 datatypes::TimeUnit,
1977 };
1978 use datafusion_expr::expr::InList;
1979 use datafusion_expr::{Expr, cast, is_null, try_cast};
1980 use datafusion_functions_nested::expr_fn::{array_has, make_array};
1981 use datafusion_physical_expr::expressions::{
1982 self as phys_expr, DynamicFilterPhysicalExpr,
1983 };
1984 use datafusion_physical_expr::planner::logical2physical;
1985 use itertools::Itertools;
1986
1987 #[derive(Debug, Default)]
1988 struct ContainerStats {
1996 min: Option<ArrayRef>,
1997 max: Option<ArrayRef>,
1998 null_counts: Option<ArrayRef>,
2000 row_counts: Option<ArrayRef>,
2001 contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
2005 }
2006
2007 impl ContainerStats {
2008 fn new() -> Self {
2009 Default::default()
2010 }
2011 fn new_decimal128(
2012 min: impl IntoIterator<Item = Option<i128>>,
2013 max: impl IntoIterator<Item = Option<i128>>,
2014 precision: u8,
2015 scale: i8,
2016 ) -> Self {
2017 Self::new()
2018 .with_min(Arc::new(
2019 min.into_iter()
2020 .collect::<Decimal128Array>()
2021 .with_precision_and_scale(precision, scale)
2022 .unwrap(),
2023 ))
2024 .with_max(Arc::new(
2025 max.into_iter()
2026 .collect::<Decimal128Array>()
2027 .with_precision_and_scale(precision, scale)
2028 .unwrap(),
2029 ))
2030 }
2031
2032 fn new_i64(
2033 min: impl IntoIterator<Item = Option<i64>>,
2034 max: impl IntoIterator<Item = Option<i64>>,
2035 ) -> Self {
2036 Self::new()
2037 .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
2038 .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
2039 }
2040
2041 fn new_i32(
2042 min: impl IntoIterator<Item = Option<i32>>,
2043 max: impl IntoIterator<Item = Option<i32>>,
2044 ) -> Self {
2045 Self::new()
2046 .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
2047 .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
2048 }
2049
2050 fn new_utf8<'a>(
2051 min: impl IntoIterator<Item = Option<&'a str>>,
2052 max: impl IntoIterator<Item = Option<&'a str>>,
2053 ) -> Self {
2054 Self::new()
2055 .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
2056 .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
2057 }
2058
2059 fn new_bool(
2060 min: impl IntoIterator<Item = Option<bool>>,
2061 max: impl IntoIterator<Item = Option<bool>>,
2062 ) -> Self {
2063 Self::new()
2064 .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
2065 .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
2066 }
2067
2068 fn min(&self) -> Option<ArrayRef> {
2069 self.min.clone()
2070 }
2071
2072 fn max(&self) -> Option<ArrayRef> {
2073 self.max.clone()
2074 }
2075
2076 fn null_counts(&self) -> Option<ArrayRef> {
2077 self.null_counts.clone()
2078 }
2079
2080 fn row_counts(&self) -> Option<ArrayRef> {
2081 self.row_counts.clone()
2082 }
2083
2084 fn arrays(&self) -> Vec<ArrayRef> {
2086 let contained_arrays = self
2087 .contained
2088 .iter()
2089 .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
2090
2091 [
2092 self.min.as_ref().cloned(),
2093 self.max.as_ref().cloned(),
2094 self.null_counts.as_ref().cloned(),
2095 self.row_counts.as_ref().cloned(),
2096 ]
2097 .into_iter()
2098 .flatten()
2099 .chain(contained_arrays)
2100 .collect()
2101 }
2102
2103 fn len(&self) -> usize {
2107 self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
2109 }
2110
2111 fn assert_invariants(&self) {
2113 let mut prev_len = None;
2114
2115 for len in self.arrays().iter().map(|a| a.len()) {
2116 match prev_len {
2118 None => {
2119 prev_len = Some(len);
2120 }
2121 Some(prev_len) => {
2122 assert_eq!(prev_len, len);
2123 }
2124 }
2125 }
2126 }
2127
2128 fn with_min(mut self, min: ArrayRef) -> Self {
2130 self.min = Some(min);
2131 self
2132 }
2133
2134 fn with_max(mut self, max: ArrayRef) -> Self {
2136 self.max = Some(max);
2137 self
2138 }
2139
2140 fn with_null_counts(
2143 mut self,
2144 counts: impl IntoIterator<Item = Option<u64>>,
2145 ) -> Self {
2146 let null_counts: ArrayRef =
2147 Arc::new(counts.into_iter().collect::<UInt64Array>());
2148
2149 self.assert_invariants();
2150 self.null_counts = Some(null_counts);
2151 self
2152 }
2153
2154 fn with_row_counts(
2157 mut self,
2158 counts: impl IntoIterator<Item = Option<u64>>,
2159 ) -> Self {
2160 let row_counts: ArrayRef =
2161 Arc::new(counts.into_iter().collect::<UInt64Array>());
2162
2163 self.assert_invariants();
2164 self.row_counts = Some(row_counts);
2165 self
2166 }
2167
2168 pub fn with_contained(
2170 mut self,
2171 values: impl IntoIterator<Item = ScalarValue>,
2172 contained: impl IntoIterator<Item = Option<bool>>,
2173 ) -> Self {
2174 let contained: BooleanArray = contained.into_iter().collect();
2175 let values: HashSet<_> = values.into_iter().collect();
2176
2177 self.contained.push((values, contained));
2178 self.assert_invariants();
2179 self
2180 }
2181
2182 fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2184 self.contained
2186 .iter()
2187 .find(|(values, _contained)| values == find_values)
2188 .map(|(_values, contained)| contained.clone())
2189 }
2190 }
2191
2192 #[derive(Debug, Default)]
2193 struct TestStatistics {
2194 stats: HashMap<Column, ContainerStats>,
2196 }
2197
2198 impl TestStatistics {
2199 fn new() -> Self {
2200 Self::default()
2201 }
2202
2203 fn with(
2204 mut self,
2205 name: impl Into<String>,
2206 container_stats: ContainerStats,
2207 ) -> Self {
2208 let col = Column::from_name(name.into());
2209 self.stats.insert(col, container_stats);
2210 self
2211 }
2212
2213 fn with_null_counts(
2217 mut self,
2218 name: impl Into<String>,
2219 counts: impl IntoIterator<Item = Option<u64>>,
2220 ) -> Self {
2221 let col = Column::from_name(name.into());
2222
2223 let container_stats = self
2225 .stats
2226 .remove(&col)
2227 .unwrap_or_default()
2228 .with_null_counts(counts);
2229
2230 self.stats.insert(col, container_stats);
2232 self
2233 }
2234
2235 fn with_row_counts(
2239 mut self,
2240 name: impl Into<String>,
2241 counts: impl IntoIterator<Item = Option<u64>>,
2242 ) -> Self {
2243 let col = Column::from_name(name.into());
2244
2245 let container_stats = self
2247 .stats
2248 .remove(&col)
2249 .unwrap_or_default()
2250 .with_row_counts(counts);
2251
2252 self.stats.insert(col, container_stats);
2254 self
2255 }
2256
2257 fn with_contained(
2259 mut self,
2260 name: impl Into<String>,
2261 values: impl IntoIterator<Item = ScalarValue>,
2262 contained: impl IntoIterator<Item = Option<bool>>,
2263 ) -> Self {
2264 let col = Column::from_name(name.into());
2265
2266 let container_stats = self
2268 .stats
2269 .remove(&col)
2270 .unwrap_or_default()
2271 .with_contained(values, contained);
2272
2273 self.stats.insert(col, container_stats);
2275 self
2276 }
2277 }
2278
2279 impl PruningStatistics for TestStatistics {
2280 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2281 self.stats
2282 .get(column)
2283 .map(|container_stats| container_stats.min())
2284 .unwrap_or(None)
2285 }
2286
2287 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2288 self.stats
2289 .get(column)
2290 .map(|container_stats| container_stats.max())
2291 .unwrap_or(None)
2292 }
2293
2294 fn num_containers(&self) -> usize {
2295 self.stats
2296 .values()
2297 .next()
2298 .map(|container_stats| container_stats.len())
2299 .unwrap_or(0)
2300 }
2301
2302 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2303 self.stats
2304 .get(column)
2305 .map(|container_stats| container_stats.null_counts())
2306 .unwrap_or(None)
2307 }
2308
2309 fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
2310 self.stats
2311 .get(column)
2312 .map(|container_stats| container_stats.row_counts())
2313 .unwrap_or(None)
2314 }
2315
2316 fn contained(
2317 &self,
2318 column: &Column,
2319 values: &HashSet<ScalarValue>,
2320 ) -> Option<BooleanArray> {
2321 self.stats
2322 .get(column)
2323 .and_then(|container_stats| container_stats.contained(values))
2324 }
2325 }
2326
2327 struct OneContainerStats {
2329 min_values: Option<ArrayRef>,
2330 max_values: Option<ArrayRef>,
2331 num_containers: usize,
2332 }
2333
2334 impl PruningStatistics for OneContainerStats {
2335 fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2336 self.min_values.clone()
2337 }
2338
2339 fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2340 self.max_values.clone()
2341 }
2342
2343 fn num_containers(&self) -> usize {
2344 self.num_containers
2345 }
2346
2347 fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2348 None
2349 }
2350
2351 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
2352 None
2353 }
2354
2355 fn contained(
2356 &self,
2357 _column: &Column,
2358 _values: &HashSet<ScalarValue>,
2359 ) -> Option<BooleanArray> {
2360 None
2361 }
2362 }
2363
2364 #[test]
2367 fn test_unique_row_count_field_and_column() {
2368 let schema: SchemaRef = Arc::new(Schema::new(vec![
2370 Field::new("c1", DataType::Int32, true),
2371 Field::new("c2", DataType::Int32, true),
2372 ]));
2373 let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2374 let expr = logical2physical(&expr, &schema);
2375 let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
2376 assert_eq!(
2378 "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2379 p.predicate_expr.to_string()
2380 );
2381
2382 let mut fields = HashSet::new();
2385 for (_col, _ty, field) in p.required_columns().iter() {
2386 let was_new = fields.insert(field);
2387 if !was_new {
2388 panic!(
2389 "Duplicate field in required schema: {field:?}. Previous fields:\n{fields:#?}"
2390 );
2391 }
2392 }
2393 }
2394
2395 #[test]
2396 fn prune_all_rows_null_counts() {
2397 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2400 let statistics = TestStatistics::new().with(
2401 "i",
2402 ContainerStats::new_i32(
2403 vec![Some(0)], vec![Some(0)], )
2406 .with_null_counts(vec![Some(1)])
2407 .with_row_counts(vec![Some(1)]),
2408 );
2409 let expected_ret = &[false];
2410 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2411
2412 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2414 let container_stats = ContainerStats {
2415 min: Some(Arc::new(Int32Array::from(vec![None]))),
2416 max: Some(Arc::new(Int32Array::from(vec![None]))),
2417 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2418 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2419 ..ContainerStats::default()
2420 };
2421 let statistics = TestStatistics::new().with("i", container_stats);
2422 let expected_ret = &[false];
2423 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2424
2425 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2427 let container_stats = ContainerStats {
2428 min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2429 max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2430 null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2431 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2432 ..ContainerStats::default()
2433 };
2434 let statistics = TestStatistics::new().with("i", container_stats);
2435 let expected_ret = &[true];
2436 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2437 let expected_ret = &[false];
2438 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2439
2440 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2442 let container_stats = ContainerStats {
2443 min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2444 max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2445 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2446 row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2447 ..ContainerStats::default()
2448 };
2449 let statistics = TestStatistics::new().with("i", container_stats);
2450 let expected_ret = &[true];
2451 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2452 let expected_ret = &[false];
2453 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2454 }
2455
2456 #[test]
2457 fn prune_missing_statistics() {
2458 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2461 let container_stats = ContainerStats {
2462 min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
2463 max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
2464 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
2465 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
2466 ..ContainerStats::default()
2467 };
2468 let statistics = TestStatistics::new().with("i", container_stats);
2469 let expected_ret = &[true, true];
2470 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2471 let expected_ret = &[false, true];
2472 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2473 let expected_ret = &[true, false];
2474 prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
2475 }
2476
2477 #[test]
2478 fn prune_null_stats() {
2479 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2482
2483 let statistics = TestStatistics::new().with(
2484 "i",
2485 ContainerStats::new_i32(
2486 vec![Some(0)], vec![Some(0)], )
2489 .with_null_counts(vec![Some(1)])
2490 .with_row_counts(vec![Some(1)]),
2491 );
2492
2493 let expected_ret = &[false];
2494
2495 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2497 }
2498
2499 #[test]
2500 fn test_build_statistics_record_batch() {
2501 let required_columns = RequiredColumns::from(vec![
2503 (
2505 phys_expr::Column::new("s1", 1),
2506 StatisticsType::Min,
2507 Field::new("s1_min", DataType::Int32, true),
2508 ),
2509 (
2511 phys_expr::Column::new("s2", 2),
2512 StatisticsType::Max,
2513 Field::new("s2_max", DataType::Int32, true),
2514 ),
2515 (
2517 phys_expr::Column::new("s3", 3),
2518 StatisticsType::Max,
2519 Field::new("s3_max", DataType::Utf8, true),
2520 ),
2521 (
2523 phys_expr::Column::new("s3", 3),
2524 StatisticsType::Min,
2525 Field::new("s3_min", DataType::Utf8, true),
2526 ),
2527 ]);
2528
2529 let statistics = TestStatistics::new()
2530 .with(
2531 "s1",
2532 ContainerStats::new_i32(
2533 vec![None, None, Some(9), None], vec![Some(10), None, None, None], ),
2536 )
2537 .with(
2538 "s2",
2539 ContainerStats::new_i32(
2540 vec![Some(2), None, None, None], vec![Some(20), None, None, None], ),
2543 )
2544 .with(
2545 "s3",
2546 ContainerStats::new_utf8(
2547 vec![Some("a"), None, None, None], vec![Some("q"), None, Some("r"), None], ),
2550 );
2551
2552 let batch =
2553 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2554 assert_snapshot!(batches_to_string(&[batch]), @r"
2555 +--------+--------+--------+--------+
2556 | s1_min | s2_max | s3_max | s3_min |
2557 +--------+--------+--------+--------+
2558 | | 20 | q | a |
2559 | | | | |
2560 | 9 | | r | |
2561 | | | | |
2562 +--------+--------+--------+--------+
2563 ");
2564 }
2565
2566 #[test]
2567 fn test_build_statistics_casting() {
2568 let required_columns = RequiredColumns::from(vec![(
2573 phys_expr::Column::new("s3", 3),
2574 StatisticsType::Min,
2575 Field::new(
2576 "s1_min",
2577 DataType::Timestamp(TimeUnit::Nanosecond, None),
2578 true,
2579 ),
2580 )]);
2581
2582 let statistics = OneContainerStats {
2584 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2585 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2586 num_containers: 1,
2587 };
2588
2589 let batch =
2590 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2591
2592 assert_snapshot!(batches_to_string(&[batch]), @r"
2593 +-------------------------------+
2594 | s1_min |
2595 +-------------------------------+
2596 | 1970-01-01T00:00:00.000000010 |
2597 +-------------------------------+
2598 ");
2599 }
2600
2601 #[test]
2602 fn test_build_statistics_no_required_stats() {
2603 let required_columns = RequiredColumns::new();
2604
2605 let statistics = OneContainerStats {
2606 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2607 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2608 num_containers: 1,
2609 };
2610
2611 let batch =
2612 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2613 assert_eq!(batch.num_rows(), 1); }
2615
2616 #[test]
2617 fn test_build_statistics_inconsistent_types() {
2618 let required_columns = RequiredColumns::from(vec![(
2622 phys_expr::Column::new("s3", 3),
2623 StatisticsType::Min,
2624 Field::new("s1_min", DataType::Utf8, true),
2625 )]);
2626
2627 let statistics = OneContainerStats {
2629 min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
2630 max_values: None,
2631 num_containers: 1,
2632 };
2633
2634 let batch =
2635 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2636 assert_snapshot!(batches_to_string(&[batch]), @r"
2637 +--------+
2638 | s1_min |
2639 +--------+
2640 | |
2641 +--------+
2642 ");
2643 }
2644
2645 #[test]
2646 fn test_build_statistics_inconsistent_length() {
2647 let required_columns = RequiredColumns::from(vec![(
2649 phys_expr::Column::new("s1", 3),
2650 StatisticsType::Min,
2651 Field::new("s1_min", DataType::Int64, true),
2652 )]);
2653
2654 let statistics = OneContainerStats {
2656 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2657 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2658 num_containers: 3,
2659 };
2660
2661 let result =
2662 build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
2663 assert!(
2664 result
2665 .to_string()
2666 .contains("mismatched statistics length. Expected 3, got 1"),
2667 "{}",
2668 result
2669 );
2670 }
2671
2672 #[test]
2673 fn row_group_predicate_eq() -> Result<()> {
2674 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2675 let expected_expr =
2676 "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2677
2678 let expr = col("c1").eq(lit(1));
2680 let predicate_expr =
2681 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2682 assert_eq!(predicate_expr.to_string(), expected_expr);
2683
2684 let expr = lit(1).eq(col("c1"));
2686 let predicate_expr =
2687 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2688 assert_eq!(predicate_expr.to_string(), expected_expr);
2689
2690 Ok(())
2691 }
2692
2693 #[test]
2694 fn row_group_predicate_not_eq() -> Result<()> {
2695 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2696 let expected_expr =
2697 "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2698
2699 let expr = col("c1").not_eq(lit(1));
2701 let predicate_expr =
2702 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2703 assert_eq!(predicate_expr.to_string(), expected_expr);
2704
2705 let expr = lit(1).not_eq(col("c1"));
2707 let predicate_expr =
2708 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2709 assert_eq!(predicate_expr.to_string(), expected_expr);
2710
2711 Ok(())
2712 }
2713
2714 #[test]
2715 fn row_group_predicate_gt() -> Result<()> {
2716 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2717 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
2718
2719 let expr = col("c1").gt(lit(1));
2721 let predicate_expr =
2722 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2723 assert_eq!(predicate_expr.to_string(), expected_expr);
2724
2725 let expr = lit(1).lt(col("c1"));
2727 let predicate_expr =
2728 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2729 assert_eq!(predicate_expr.to_string(), expected_expr);
2730
2731 Ok(())
2732 }
2733
2734 #[test]
2735 fn row_group_predicate_gt_eq() -> Result<()> {
2736 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2737 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
2738
2739 let expr = col("c1").gt_eq(lit(1));
2741 let predicate_expr =
2742 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2743 assert_eq!(predicate_expr.to_string(), expected_expr);
2744 let expr = lit(1).lt_eq(col("c1"));
2746 let predicate_expr =
2747 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2748 assert_eq!(predicate_expr.to_string(), expected_expr);
2749
2750 Ok(())
2751 }
2752
2753 #[test]
2754 fn row_group_predicate_lt() -> Result<()> {
2755 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2756 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2757
2758 let expr = col("c1").lt(lit(1));
2760 let predicate_expr =
2761 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2762 assert_eq!(predicate_expr.to_string(), expected_expr);
2763
2764 let expr = lit(1).gt(col("c1"));
2766 let predicate_expr =
2767 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2768 assert_eq!(predicate_expr.to_string(), expected_expr);
2769
2770 Ok(())
2771 }
2772
2773 #[test]
2774 fn row_group_predicate_lt_eq() -> Result<()> {
2775 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2776 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
2777
2778 let expr = col("c1").lt_eq(lit(1));
2780 let predicate_expr =
2781 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2782 assert_eq!(predicate_expr.to_string(), expected_expr);
2783 let expr = lit(1).gt_eq(col("c1"));
2785 let predicate_expr =
2786 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2787 assert_eq!(predicate_expr.to_string(), expected_expr);
2788
2789 Ok(())
2790 }
2791
2792 #[test]
2793 fn row_group_predicate_and() -> Result<()> {
2794 let schema = Schema::new(vec![
2795 Field::new("c1", DataType::Int32, false),
2796 Field::new("c2", DataType::Int32, false),
2797 Field::new("c3", DataType::Int32, false),
2798 ]);
2799 let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2801 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2802 let predicate_expr =
2803 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2804 assert_eq!(predicate_expr.to_string(), expected_expr);
2805
2806 Ok(())
2807 }
2808
2809 #[test]
2810 fn row_group_predicate_or() -> Result<()> {
2811 let schema = Schema::new(vec![
2812 Field::new("c1", DataType::Int32, false),
2813 Field::new("c2", DataType::Int32, false),
2814 ]);
2815 let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
2817 let expected_expr = "true";
2818 let predicate_expr =
2819 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2820 assert_eq!(predicate_expr.to_string(), expected_expr);
2821
2822 Ok(())
2823 }
2824
2825 #[test]
2826 fn row_group_predicate_not() -> Result<()> {
2827 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2828 let expected_expr = "true";
2829
2830 let expr = col("c1").not();
2831 let predicate_expr =
2832 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2833 assert_eq!(predicate_expr.to_string(), expected_expr);
2834
2835 Ok(())
2836 }
2837
2838 #[test]
2839 fn row_group_predicate_not_bool() -> Result<()> {
2840 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2841 let expected_expr = "NOT c1_min@0 AND c1_max@1";
2842
2843 let expr = col("c1").not();
2844 let predicate_expr =
2845 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2846 assert_eq!(predicate_expr.to_string(), expected_expr);
2847
2848 Ok(())
2849 }
2850
2851 #[test]
2852 fn row_group_predicate_bool() -> Result<()> {
2853 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2854 let expected_expr = "c1_min@0 OR c1_max@1";
2855
2856 let expr = col("c1");
2857 let predicate_expr =
2858 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2859 assert_eq!(predicate_expr.to_string(), expected_expr);
2860
2861 Ok(())
2862 }
2863
2864 #[test]
2866 fn row_group_predicate_non_boolean() {
2867 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2868 let statistics = TestStatistics::new()
2869 .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2870 let expected_ret = &[true];
2871 prune_with_expr(lit(1), &schema, &statistics, expected_ret);
2872 }
2873
2874 #[test]
2878 fn row_group_predicate_literal_false() {
2879 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2881 let statistics = TestStatistics::new()
2882 .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2883 let expected_ret = &[false];
2884 prune_with_simplified_expr(lit(1).eq(lit(2)), &schema, &statistics, expected_ret);
2885 }
2886
2887 #[test]
2890 fn row_group_predicate_literal_true() {
2891 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2893 let statistics = TestStatistics::new()
2894 .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2895 let expected_ret = &[true];
2896 prune_with_simplified_expr(lit(1).eq(lit(1)), &schema, &statistics, expected_ret);
2897 }
2898
2899 #[test]
2902 fn row_group_predicate_literal_null() {
2903 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2905 let statistics = TestStatistics::new()
2906 .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2907 let expected_ret = &[true];
2908 prune_with_simplified_expr(
2909 lit(1).eq(lit(ScalarValue::Null)),
2910 &schema,
2911 &statistics,
2912 expected_ret,
2913 );
2914 }
2915
2916 #[test]
2919 fn row_group_predicate_complex_literals() {
2920 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2921 let statistics = TestStatistics::new()
2922 .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2923
2924 prune_with_simplified_expr(
2926 (lit(1) + lit(2)).gt(lit(0)),
2927 &schema,
2928 &statistics,
2929 &[true],
2930 );
2931
2932 prune_with_simplified_expr(
2934 (lit(1) + lit(2)).lt(lit(0)),
2935 &schema,
2936 &statistics,
2937 &[false],
2938 );
2939
2940 prune_with_simplified_expr(
2942 lit(true).and(lit(false)),
2943 &schema,
2944 &statistics,
2945 &[false],
2946 );
2947
2948 prune_with_simplified_expr(
2950 lit(true).or(lit(false)),
2951 &schema,
2952 &statistics,
2953 &[true],
2954 );
2955
2956 prune_with_simplified_expr(
2958 lit(1).lt(lit(2)).and(lit(3).gt(lit(1))),
2959 &schema,
2960 &statistics,
2961 &[true],
2962 );
2963
2964 prune_with_simplified_expr(
2966 lit(1).gt(lit(2)).or(lit(3).lt(lit(1))),
2967 &schema,
2968 &statistics,
2969 &[false],
2970 );
2971 }
2972
2973 #[test]
2975 fn row_group_predicate_dynamic_filter_with_literals() {
2976 let schema = Arc::new(Schema::new(vec![
2977 Field::new("c1", DataType::Int32, true),
2978 Field::new("part", DataType::Utf8, true),
2979 ]));
2980 let statistics = TestStatistics::new()
2981 .with_row_counts("c1", vec![Some(10)]);
2983 let dynamic_filter_expr = col("c1").gt(lit(5)).and(col("part").eq(lit("B")));
2984 let phys_expr = logical2physical(&dynamic_filter_expr, &schema);
2985 let children = collect_columns(&phys_expr)
2986 .iter()
2987 .map(|c| Arc::new(c.clone()) as Arc<dyn PhysicalExpr>)
2988 .collect_vec();
2989 let dynamic_phys_expr =
2990 Arc::new(DynamicFilterPhysicalExpr::new(children, phys_expr))
2991 as Arc<dyn PhysicalExpr>;
2992 let remapped_expr = dynamic_phys_expr
2994 .children()
2995 .into_iter()
2996 .map(|child_expr| {
2997 let Some(col_expr) =
2998 child_expr.as_any().downcast_ref::<phys_expr::Column>()
2999 else {
3000 return Arc::clone(child_expr);
3001 };
3002 if col_expr.name() == "part" {
3003 Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
3005 "A".to_string(),
3006 )))) as Arc<dyn PhysicalExpr>
3007 } else {
3008 Arc::clone(child_expr)
3009 }
3010 })
3011 .collect_vec();
3012 let dynamic_filter_expr =
3013 dynamic_phys_expr.with_new_children(remapped_expr).unwrap();
3014 let expected = &[false];
3016 let p =
3017 PruningPredicate::try_new(dynamic_filter_expr, Arc::clone(&schema)).unwrap();
3018 let result = p.prune(&statistics).unwrap();
3019 assert_eq!(result, expected);
3020 }
3021
3022 #[test]
3023 fn row_group_predicate_lt_bool() -> Result<()> {
3024 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
3025 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < true";
3026
3027 let expr = col("c1").lt(lit(true));
3030 let predicate_expr =
3031 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3032 assert_eq!(predicate_expr.to_string(), expected_expr);
3033
3034 Ok(())
3035 }
3036
3037 #[test]
3038 fn row_group_predicate_required_columns() -> Result<()> {
3039 let schema = Schema::new(vec![
3040 Field::new("c1", DataType::Int32, false),
3041 Field::new("c2", DataType::Int32, false),
3042 ]);
3043 let mut required_columns = RequiredColumns::new();
3044 let expr = col("c1")
3046 .lt(lit(1))
3047 .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
3048 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
3049 let predicate_expr =
3050 test_build_predicate_expression(&expr, &schema, &mut required_columns);
3051 assert_eq!(predicate_expr.to_string(), expected_expr);
3052 println!("required_columns: {required_columns:#?}"); let c1_min_field = Field::new("c1_min", DataType::Int32, false);
3055 assert_eq!(
3056 required_columns.columns[0],
3057 (
3058 phys_expr::Column::new("c1", 0),
3059 StatisticsType::Min,
3060 c1_min_field.with_nullable(true) )
3062 );
3063 let c1_null_count_field = Field::new("c1_null_count", DataType::UInt64, false);
3065 assert_eq!(
3066 required_columns.columns[1],
3067 (
3068 phys_expr::Column::new("c1", 0),
3069 StatisticsType::NullCount,
3070 c1_null_count_field.with_nullable(true) )
3072 );
3073 let row_count_field = Field::new("row_count", DataType::UInt64, false);
3075 assert_eq!(
3076 required_columns.columns[2],
3077 (
3078 phys_expr::Column::new("c1", 0),
3079 StatisticsType::RowCount,
3080 row_count_field.with_nullable(true) )
3082 );
3083 let c2_min_field = Field::new("c2_min", DataType::Int32, false);
3085 assert_eq!(
3086 required_columns.columns[3],
3087 (
3088 phys_expr::Column::new("c2", 1),
3089 StatisticsType::Min,
3090 c2_min_field.with_nullable(true) )
3092 );
3093 let c2_max_field = Field::new("c2_max", DataType::Int32, false);
3094 assert_eq!(
3095 required_columns.columns[4],
3096 (
3097 phys_expr::Column::new("c2", 1),
3098 StatisticsType::Max,
3099 c2_max_field.with_nullable(true) )
3101 );
3102 let c2_null_count_field = Field::new("c2_null_count", DataType::UInt64, false);
3104 assert_eq!(
3105 required_columns.columns[5],
3106 (
3107 phys_expr::Column::new("c2", 1),
3108 StatisticsType::NullCount,
3109 c2_null_count_field.with_nullable(true) )
3111 );
3112 let row_count_field = Field::new("row_count", DataType::UInt64, false);
3114 assert_eq!(
3115 required_columns.columns[2],
3116 (
3117 phys_expr::Column::new("c1", 0),
3118 StatisticsType::RowCount,
3119 row_count_field.with_nullable(true) )
3121 );
3122 assert_eq!(required_columns.columns.len(), 6);
3124
3125 Ok(())
3126 }
3127
3128 #[test]
3129 fn row_group_predicate_in_list() -> Result<()> {
3130 let schema = Schema::new(vec![
3131 Field::new("c1", DataType::Int32, false),
3132 Field::new("c2", DataType::Int32, false),
3133 ]);
3134 let expr = Expr::InList(InList::new(
3136 Box::new(col("c1")),
3137 vec![lit(1), lit(2), lit(3)],
3138 false,
3139 ));
3140 let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
3141 let predicate_expr =
3142 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3143 assert_eq!(predicate_expr.to_string(), expected_expr);
3144
3145 Ok(())
3146 }
3147
3148 #[test]
3149 fn row_group_predicate_in_list_empty() -> Result<()> {
3150 let schema = Schema::new(vec![
3151 Field::new("c1", DataType::Int32, false),
3152 Field::new("c2", DataType::Int32, false),
3153 ]);
3154 let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));
3156 let expected_expr = "true";
3157 let predicate_expr =
3158 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3159 assert_eq!(predicate_expr.to_string(), expected_expr);
3160
3161 Ok(())
3162 }
3163
3164 #[test]
3165 fn row_group_predicate_in_list_negated() -> Result<()> {
3166 let schema = Schema::new(vec![
3167 Field::new("c1", DataType::Int32, false),
3168 Field::new("c2", DataType::Int32, false),
3169 ]);
3170 let expr = Expr::InList(InList::new(
3172 Box::new(col("c1")),
3173 vec![lit(1), lit(2), lit(3)],
3174 true,
3175 ));
3176 let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
3177 let predicate_expr =
3178 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3179 assert_eq!(predicate_expr.to_string(), expected_expr);
3180
3181 Ok(())
3182 }
3183
3184 #[test]
3185 fn row_group_predicate_between() -> Result<()> {
3186 let schema = Schema::new(vec![
3187 Field::new("c1", DataType::Int32, false),
3188 Field::new("c2", DataType::Int32, false),
3189 ]);
3190
3191 let expr1 = col("c1").between(lit(1), lit(5));
3193
3194 let expr2 = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));
3196
3197 let predicate_expr1 =
3198 test_build_predicate_expression(&expr1, &schema, &mut RequiredColumns::new());
3199
3200 let predicate_expr2 =
3201 test_build_predicate_expression(&expr2, &schema, &mut RequiredColumns::new());
3202 assert_eq!(predicate_expr1.to_string(), predicate_expr2.to_string());
3203
3204 Ok(())
3205 }
3206
3207 #[test]
3208 fn row_group_predicate_between_with_in_list() -> Result<()> {
3209 let schema = Schema::new(vec![
3210 Field::new("c1", DataType::Int32, false),
3211 Field::new("c2", DataType::Int32, false),
3212 ]);
3213 let expr1 = col("c1").in_list(vec![lit(1), lit(2)], false);
3215
3216 let expr2 = col("c2").between(lit(4), lit(5));
3218
3219 let expr3 = expr1.and(expr2);
3221
3222 let expected_expr = "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5";
3223 let predicate_expr =
3224 test_build_predicate_expression(&expr3, &schema, &mut RequiredColumns::new());
3225 assert_eq!(predicate_expr.to_string(), expected_expr);
3226
3227 Ok(())
3228 }
3229
3230 #[test]
3231 fn row_group_predicate_in_list_to_many_values() -> Result<()> {
3232 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3233 let expr = col("c1").in_list((1..=21).map(lit).collect(), false);
3237
3238 let expected_expr = "true";
3239 let predicate_expr =
3240 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3241 assert_eq!(predicate_expr.to_string(), expected_expr);
3242
3243 Ok(())
3244 }
3245
3246 #[test]
3247 fn row_group_predicate_cast_int_int() -> Result<()> {
3248 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3249 let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
3250
3251 let expr = cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
3254 let predicate_expr =
3255 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3256 assert_eq!(predicate_expr.to_string(), expected_expr);
3257
3258 let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64));
3260 let predicate_expr =
3261 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3262 assert_eq!(predicate_expr.to_string(), expected_expr);
3263
3264 let expected_expr =
3265 "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
3266
3267 let expr =
3269 try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
3270 let predicate_expr =
3271 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3272 assert_eq!(predicate_expr.to_string(), expected_expr);
3273
3274 let expr =
3276 lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64));
3277 let predicate_expr =
3278 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3279 assert_eq!(predicate_expr.to_string(), expected_expr);
3280
3281 Ok(())
3282 }
3283
3284 #[test]
3285 fn row_group_predicate_cast_string_string() -> Result<()> {
3286 let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3287 let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Utf8) <= 1 AND 1 <= CAST(c1_max@1 AS Utf8)";
3288
3289 let expr = cast(col("c1"), DataType::Utf8)
3291 .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3292 let predicate_expr =
3293 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3294 assert_eq!(predicate_expr.to_string(), expected_expr);
3295
3296 let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3298 .eq(cast(col("c1"), DataType::Utf8));
3299 let predicate_expr =
3300 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3301 assert_eq!(predicate_expr.to_string(), expected_expr);
3302
3303 Ok(())
3304 }
3305
3306 #[test]
3307 fn row_group_predicate_cast_string_int() -> Result<()> {
3308 let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3309 let expected_expr = "true";
3310
3311 let expr = cast(col("c1"), DataType::Int32).eq(lit(ScalarValue::Int32(Some(1))));
3313 let predicate_expr =
3314 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3315 assert_eq!(predicate_expr.to_string(), expected_expr);
3316
3317 let expr = lit(ScalarValue::Int32(Some(1))).eq(cast(col("c1"), DataType::Int32));
3319 let predicate_expr =
3320 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3321 assert_eq!(predicate_expr.to_string(), expected_expr);
3322
3323 Ok(())
3324 }
3325
3326 #[test]
3327 fn row_group_predicate_cast_int_string() -> Result<()> {
3328 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3329 let expected_expr = "true";
3330
3331 let expr = cast(col("c1"), DataType::Utf8)
3333 .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3334 let predicate_expr =
3335 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3336 assert_eq!(predicate_expr.to_string(), expected_expr);
3337
3338 let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3340 .eq(cast(col("c1"), DataType::Utf8));
3341 let predicate_expr =
3342 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3343 assert_eq!(predicate_expr.to_string(), expected_expr);
3344
3345 Ok(())
3346 }
3347
3348 #[test]
3349 fn row_group_predicate_date_date() -> Result<()> {
3350 let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3351 let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Date64) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Date64)";
3352
3353 let expr =
3355 cast(col("c1"), DataType::Date64).eq(lit(ScalarValue::Date64(Some(123))));
3356 let predicate_expr =
3357 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3358 assert_eq!(predicate_expr.to_string(), expected_expr);
3359
3360 let expr =
3362 lit(ScalarValue::Date64(Some(123))).eq(cast(col("c1"), DataType::Date64));
3363 let predicate_expr =
3364 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3365 assert_eq!(predicate_expr.to_string(), expected_expr);
3366
3367 Ok(())
3368 }
3369
3370 #[test]
3371 fn row_group_predicate_dict_string_date() -> Result<()> {
3372 let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3374 let expected_expr = "true";
3375
3376 let expr = cast(
3378 col("c1"),
3379 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3380 )
3381 .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3382 let predicate_expr =
3383 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3384 assert_eq!(predicate_expr.to_string(), expected_expr);
3385
3386 let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))).eq(cast(
3388 col("c1"),
3389 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3390 ));
3391 let predicate_expr =
3392 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3393 assert_eq!(predicate_expr.to_string(), expected_expr);
3394
3395 Ok(())
3396 }
3397
3398 #[test]
3399 fn row_group_predicate_date_dict_string() -> Result<()> {
3400 let schema = Schema::new(vec![Field::new(
3402 "c1",
3403 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3404 false,
3405 )]);
3406 let expected_expr = "true";
3407
3408 let expr =
3410 cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3411 let predicate_expr =
3412 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3413 assert_eq!(predicate_expr.to_string(), expected_expr);
3414
3415 let expr =
3417 lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3418 let predicate_expr =
3419 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3420 assert_eq!(predicate_expr.to_string(), expected_expr);
3421
3422 Ok(())
3423 }
3424
3425 #[test]
3426 fn row_group_predicate_dict_dict_same_value_type() -> Result<()> {
3427 let schema = Schema::new(vec![Field::new(
3429 "c1",
3430 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3431 false,
3432 )]);
3433
3434 let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3436 let predicate_expr =
3437 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3438 let expected_expr =
3439 "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3440 assert_eq!(predicate_expr.to_string(), expected_expr);
3441
3442 let expr = cast(
3444 col("c1"),
3445 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3446 )
3447 .eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3448 let predicate_expr =
3449 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3450 let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Utf8)) <= test AND test <= CAST(c1_max@1 AS Dictionary(UInt16, Utf8))";
3451 assert_eq!(predicate_expr.to_string(), expected_expr);
3452
3453 Ok(())
3454 }
3455
3456 #[test]
3457 fn row_group_predicate_dict_dict_different_value_type() -> Result<()> {
3458 let schema = Schema::new(vec![Field::new(
3460 "c1",
3461 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Int32)),
3462 false,
3463 )]);
3464 let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 123 AND 123 <= CAST(c1_max@1 AS Int64)";
3465
3466 let expr =
3468 cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(123))));
3469 let predicate_expr =
3470 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3471 assert_eq!(predicate_expr.to_string(), expected_expr);
3472
3473 Ok(())
3474 }
3475
3476 #[test]
3477 fn row_group_predicate_nested_dict() -> Result<()> {
3478 let schema = Schema::new(vec![Field::new(
3480 "c1",
3481 DataType::Dictionary(
3482 Box::new(DataType::UInt8),
3483 Box::new(DataType::Dictionary(
3484 Box::new(DataType::UInt16),
3485 Box::new(DataType::Utf8),
3486 )),
3487 ),
3488 false,
3489 )]);
3490 let expected_expr =
3491 "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3492
3493 let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3495 let predicate_expr =
3496 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3497 assert_eq!(predicate_expr.to_string(), expected_expr);
3498
3499 Ok(())
3500 }
3501
3502 #[test]
3503 fn row_group_predicate_dict_date_dict_date() -> Result<()> {
3504 let schema = Schema::new(vec![Field::new(
3506 "c1",
3507 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Date32)),
3508 false,
3509 )]);
3510 let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Date64)) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Dictionary(UInt16, Date64))";
3511
3512 let expr = cast(
3514 col("c1"),
3515 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Date64)),
3516 )
3517 .eq(lit(ScalarValue::Date64(Some(123))));
3518 let predicate_expr =
3519 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3520 assert_eq!(predicate_expr.to_string(), expected_expr);
3521
3522 Ok(())
3523 }
3524
3525 #[test]
3526 fn row_group_predicate_date_string() -> Result<()> {
3527 let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
3528 let expected_expr = "true";
3529
3530 let expr =
3532 cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3533 let predicate_expr =
3534 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3535 assert_eq!(predicate_expr.to_string(), expected_expr);
3536
3537 let expr =
3539 lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3540 let predicate_expr =
3541 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3542 assert_eq!(predicate_expr.to_string(), expected_expr);
3543
3544 Ok(())
3545 }
3546
3547 #[test]
3548 fn row_group_predicate_string_date() -> Result<()> {
3549 let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3550 let expected_expr = "true";
3551
3552 let expr = cast(col("c1"), DataType::Utf8)
3554 .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3555 let predicate_expr =
3556 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3557 assert_eq!(predicate_expr.to_string(), expected_expr);
3558
3559 let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string())))
3561 .eq(cast(col("c1"), DataType::Utf8));
3562 let predicate_expr =
3563 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3564 assert_eq!(predicate_expr.to_string(), expected_expr);
3565
3566 Ok(())
3567 }
3568
3569 #[test]
3570 fn row_group_predicate_cast_list() -> Result<()> {
3571 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3572 let expr = Expr::InList(InList::new(
3574 Box::new(cast(col("c1"), DataType::Int64)),
3575 vec![
3576 lit(ScalarValue::Int64(Some(1))),
3577 lit(ScalarValue::Int64(Some(2))),
3578 lit(ScalarValue::Int64(Some(3))),
3579 ],
3580 false,
3581 ));
3582 let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
3583 let predicate_expr =
3584 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3585 assert_eq!(predicate_expr.to_string(), expected_expr);
3586
3587 let expr = Expr::InList(InList::new(
3588 Box::new(cast(col("c1"), DataType::Int64)),
3589 vec![
3590 lit(ScalarValue::Int64(Some(1))),
3591 lit(ScalarValue::Int64(Some(2))),
3592 lit(ScalarValue::Int64(Some(3))),
3593 ],
3594 true,
3595 ));
3596 let expected_expr = "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
3597 let predicate_expr =
3598 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3599 assert_eq!(predicate_expr.to_string(), expected_expr);
3600
3601 Ok(())
3602 }
3603
3604 #[test]
3605 fn prune_decimal_data() {
3606 let schema = Arc::new(Schema::new(vec![Field::new(
3608 "s1",
3609 DataType::Decimal128(9, 2),
3610 true,
3611 )]));
3612
3613 prune_with_expr(
3614 col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
3616 &schema,
3617 &TestStatistics::new().with(
3620 "s1",
3621 ContainerStats::new_i32(
3622 vec![Some(0), Some(4), None, Some(3)], vec![Some(5), Some(6), Some(4), None], ),
3625 ),
3626 &[false, true, false, true],
3627 );
3628
3629 prune_with_expr(
3630 cast(col("s1"), DataType::Decimal128(14, 3))
3632 .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3633 &schema,
3634 &TestStatistics::new().with(
3635 "s1",
3636 ContainerStats::new_i32(
3637 vec![Some(0), Some(4), None, Some(3)], vec![Some(5), Some(6), Some(4), None], ),
3640 ),
3641 &[false, true, false, true],
3642 );
3643
3644 prune_with_expr(
3645 try_cast(col("s1"), DataType::Decimal128(14, 3))
3647 .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3648 &schema,
3649 &TestStatistics::new().with(
3650 "s1",
3651 ContainerStats::new_i32(
3652 vec![Some(0), Some(4), None, Some(3)], vec![Some(5), Some(6), Some(4), None], ),
3655 ),
3656 &[false, true, false, true],
3657 );
3658
3659 let schema = Arc::new(Schema::new(vec![Field::new(
3661 "s1",
3662 DataType::Decimal128(18, 2),
3663 true,
3664 )]));
3665 prune_with_expr(
3666 col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
3668 &schema,
3669 &TestStatistics::new().with(
3672 "s1",
3673 ContainerStats::new_i64(
3674 vec![Some(0), Some(4), None, Some(3)], vec![Some(5), Some(6), Some(4), None], ),
3677 ),
3678 &[false, true, false, true],
3679 );
3680
3681 let schema = Arc::new(Schema::new(vec![Field::new(
3683 "s1",
3684 DataType::Decimal128(23, 2),
3685 true,
3686 )]));
3687
3688 prune_with_expr(
3689 col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
3691 &schema,
3692 &TestStatistics::new().with(
3693 "s1",
3694 ContainerStats::new_decimal128(
3695 vec![Some(0), Some(400), None, Some(300)], vec![Some(500), Some(600), Some(400), None], 23,
3698 2,
3699 ),
3700 ),
3701 &[false, true, false, true],
3702 );
3703 }
3704
3705 #[test]
3706 fn prune_api() {
3707 let schema = Arc::new(Schema::new(vec![
3708 Field::new("s1", DataType::Utf8, true),
3709 Field::new("s2", DataType::Int32, true),
3710 ]));
3711
3712 let statistics = TestStatistics::new().with(
3713 "s2",
3714 ContainerStats::new_i32(
3715 vec![Some(0), Some(4), None, Some(3)], vec![Some(5), Some(6), None, None], ),
3718 );
3719 prune_with_expr(
3720 col("s2").gt(lit(5)),
3722 &schema,
3723 &statistics,
3724 &[false, true, true, true],
3729 );
3730
3731 prune_with_expr(
3732 cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
3734 &schema,
3735 &statistics,
3736 &[false, true, true, true],
3737 );
3738 }
3739
3740 #[test]
3741 fn prune_not_eq_data() {
3742 let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
3743
3744 prune_with_expr(
3745 col("s1").not_eq(lit("M")),
3747 &schema,
3748 &TestStatistics::new().with(
3749 "s1",
3750 ContainerStats::new_utf8(
3751 vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")], vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None], ),
3754 ),
3755 &[true, true, true, false, true, true],
3762 );
3763 }
3764
3765 fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3781 let schema =
3782 Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3783
3784 let statistics = TestStatistics::new().with(
3785 "b1",
3786 ContainerStats::new_bool(
3787 vec![Some(false), Some(false), Some(true), None, Some(false)], vec![Some(false), Some(true), Some(true), None, None], ),
3790 );
3791 let expected_true = vec![false, true, true, true, true];
3792 let expected_false = vec![true, true, false, true, true];
3793
3794 (schema, statistics, expected_true, expected_false)
3795 }
3796
3797 #[test]
3798 fn prune_bool_const_expr() {
3799 let (schema, statistics, _, _) = bool_setup();
3800
3801 prune_with_expr(
3802 lit(true),
3804 &schema,
3805 &statistics,
3806 &[true, true, true, true, true],
3807 );
3808
3809 prune_with_expr(
3810 lit(false),
3812 &schema,
3813 &statistics,
3814 &[false, false, false, false, false],
3815 );
3816 }
3817
3818 #[test]
3819 fn prune_bool_column() {
3820 let (schema, statistics, expected_true, _) = bool_setup();
3821
3822 prune_with_expr(
3823 col("b1"),
3825 &schema,
3826 &statistics,
3827 &expected_true,
3828 );
3829 }
3830
3831 #[test]
3832 fn prune_bool_not_column() {
3833 let (schema, statistics, _, expected_false) = bool_setup();
3834
3835 prune_with_expr(
3836 col("b1").not(),
3838 &schema,
3839 &statistics,
3840 &expected_false,
3841 );
3842 }
3843
3844 #[test]
3845 fn prune_bool_column_eq_true() {
3846 let (schema, statistics, expected_true, _) = bool_setup();
3847
3848 prune_with_expr(
3849 col("b1").eq(lit(true)),
3851 &schema,
3852 &statistics,
3853 &expected_true,
3854 );
3855 }
3856
3857 #[test]
3858 fn prune_bool_not_column_eq_true() {
3859 let (schema, statistics, _, expected_false) = bool_setup();
3860
3861 prune_with_expr(
3862 col("b1").not().eq(lit(true)),
3864 &schema,
3865 &statistics,
3866 &expected_false,
3867 );
3868 }
3869
3870 fn int32_setup() -> (SchemaRef, TestStatistics) {
3880 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3881
3882 let statistics = TestStatistics::new().with(
3883 "i",
3884 ContainerStats::new_i32(
3885 vec![Some(-5), Some(1), Some(-11), None, Some(1)], vec![Some(5), Some(11), Some(-1), None, None], ),
3888 );
3889 (schema, statistics)
3890 }
3891
3892 #[test]
3893 fn prune_int32_col_gt_zero() {
3894 let (schema, statistics) = int32_setup();
3895
3896 let expected_ret = &[true, true, false, true, true];
3903
3904 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
3906
3907 prune_with_expr(
3909 Expr::Negative(Box::new(col("i"))).lt(lit(0)),
3910 &schema,
3911 &statistics,
3912 expected_ret,
3913 );
3914 }
3915
3916 #[test]
3917 fn prune_int32_col_lte_zero() {
3918 let (schema, statistics) = int32_setup();
3919
3920 let expected_ret = &[true, false, true, true, false];
3927
3928 prune_with_expr(
3929 col("i").lt_eq(lit(0)),
3931 &schema,
3932 &statistics,
3933 expected_ret,
3934 );
3935
3936 prune_with_expr(
3937 Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
3939 &schema,
3940 &statistics,
3941 expected_ret,
3942 );
3943 }
3944
3945 #[test]
3946 fn prune_int32_col_lte_zero_cast() {
3947 let (schema, statistics) = int32_setup();
3948
3949 let expected_ret = &[true, true, true, true, true];
3956
3957 prune_with_expr(
3958 cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
3960 &schema,
3961 &statistics,
3962 expected_ret,
3963 );
3964
3965 prune_with_expr(
3966 try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
3968 &schema,
3969 &statistics,
3970 expected_ret,
3971 );
3972
3973 prune_with_expr(
3974 cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
3976 &schema,
3977 &statistics,
3978 expected_ret,
3979 );
3980
3981 prune_with_expr(
3982 try_cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
3984 &schema,
3985 &statistics,
3986 expected_ret,
3987 );
3988 }
3989
3990 #[test]
3991 fn prune_int32_col_eq_zero() {
3992 let (schema, statistics) = int32_setup();
3993
3994 let expected_ret = &[true, false, false, true, false];
4001
4002 prune_with_expr(
4003 col("i").eq(lit(0)),
4005 &schema,
4006 &statistics,
4007 expected_ret,
4008 );
4009 }
4010
4011 #[test]
4012 fn prune_int32_col_eq_zero_cast() {
4013 let (schema, statistics) = int32_setup();
4014
4015 let expected_ret = &[true, false, false, true, false];
4022
4023 prune_with_expr(
4024 cast(col("i"), DataType::Int64).eq(lit(0i64)),
4025 &schema,
4026 &statistics,
4027 expected_ret,
4028 );
4029
4030 prune_with_expr(
4031 try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
4032 &schema,
4033 &statistics,
4034 expected_ret,
4035 );
4036 }
4037
4038 #[test]
4039 fn prune_int32_col_eq_zero_cast_as_str() {
4040 let (schema, statistics) = int32_setup();
4041
4042 let expected_ret = &[true, true, true, true, true];
4052
4053 prune_with_expr(
4054 cast(col("i"), DataType::Utf8).eq(lit("0")),
4055 &schema,
4056 &statistics,
4057 expected_ret,
4058 );
4059 }
4060
4061 #[test]
4062 fn prune_int32_col_lt_neg_one() {
4063 let (schema, statistics) = int32_setup();
4064
4065 let expected_ret = &[true, true, false, true, true];
4072
4073 prune_with_expr(
4074 col("i").gt(lit(-1)),
4076 &schema,
4077 &statistics,
4078 expected_ret,
4079 );
4080
4081 prune_with_expr(
4082 Expr::Negative(Box::new(col("i"))).lt(lit(1)),
4084 &schema,
4085 &statistics,
4086 expected_ret,
4087 );
4088 }
4089
4090 #[test]
4091 fn prune_int32_is_null() {
4092 let (schema, statistics) = int32_setup();
4093
4094 let expected_ret = &[true, true, true, true, true];
4097
4098 prune_with_expr(
4099 col("i").is_null(),
4101 &schema,
4102 &statistics,
4103 expected_ret,
4104 );
4105
4106 let statistics = statistics.with_null_counts(
4108 "i",
4109 vec![
4110 Some(0), Some(1), None, None, Some(0), ],
4116 );
4117
4118 let expected_ret = &[false, true, true, true, false];
4119
4120 prune_with_expr(
4121 col("i").is_null(),
4123 &schema,
4124 &statistics,
4125 expected_ret,
4126 );
4127 }
4128
4129 #[test]
4130 fn prune_int32_column_is_known_all_null() {
4131 let (schema, statistics) = int32_setup();
4132
4133 let expected_ret = &[true, false, true, true, false];
4140
4141 prune_with_expr(
4142 col("i").lt(lit(0)),
4144 &schema,
4145 &statistics,
4146 expected_ret,
4147 );
4148
4149 let statistics = statistics.with_row_counts(
4151 "i",
4152 vec![
4153 Some(10), Some(9), None, Some(4),
4157 Some(10),
4158 ],
4159 );
4160
4161 prune_with_expr(
4163 col("i").lt(lit(0)),
4165 &schema,
4166 &statistics,
4167 expected_ret,
4168 );
4169
4170 let statistics = statistics.with_null_counts(
4172 "i",
4173 vec![
4174 Some(0), Some(1), None, Some(4), Some(0), ],
4180 );
4181
4182 let expected_ret = &[true, false, true, false, false];
4191
4192 prune_with_expr(
4193 col("i").lt(lit(0)),
4195 &schema,
4196 &statistics,
4197 expected_ret,
4198 );
4199 }
4200
4201 #[test]
4202 fn prune_cast_column_scalar() {
4203 let (schema, statistics) = int32_setup();
4205 let expected_ret = &[true, true, false, true, true];
4206
4207 prune_with_expr(
4208 col("i").gt(cast(lit(ScalarValue::Int64(Some(0))), DataType::Int32)),
4210 &schema,
4211 &statistics,
4212 expected_ret,
4213 );
4214
4215 prune_with_expr(
4216 cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
4218 &schema,
4219 &statistics,
4220 expected_ret,
4221 );
4222
4223 prune_with_expr(
4224 try_cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
4226 &schema,
4227 &statistics,
4228 expected_ret,
4229 );
4230
4231 prune_with_expr(
4232 Expr::Negative(Box::new(cast(col("i"), DataType::Int64)))
4234 .lt(lit(ScalarValue::Int64(Some(0)))),
4235 &schema,
4236 &statistics,
4237 expected_ret,
4238 );
4239 }
4240
4241 #[test]
4242 fn test_increment_utf8() {
4243 assert_eq!(increment_utf8("abc").unwrap(), "abd");
4245 assert_eq!(increment_utf8("abz").unwrap(), "ab{");
4246
4247 assert_eq!(increment_utf8("~").unwrap(), "\u{7f}"); assert_eq!(increment_utf8("\u{7f}").unwrap(), "\u{80}"); assert_eq!(increment_utf8("ß").unwrap(), "à"); assert_eq!(increment_utf8("℣").unwrap(), "ℤ"); assert_eq!(increment_utf8("\u{7FF}").unwrap(), "\u{800}"); assert_eq!(increment_utf8("\u{FFFF}").unwrap(), "\u{10000}"); assert!(increment_utf8("").is_none());
4263 assert!(increment_utf8("\u{10FFFF}").is_none()); assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
4267
4268 assert_eq!(increment_utf8("a\u{D7FF}").unwrap(), "b");
4270 assert!(increment_utf8("\u{D7FF}").is_none());
4271
4272 assert_eq!(increment_utf8("a\u{FDCF}").unwrap(), "b");
4274 assert!(increment_utf8("\u{FDCF}").is_none());
4275
4276 assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
4278 assert!(increment_utf8("\u{10FFFF}").is_none()); }
4280
4281 fn utf8_setup() -> (SchemaRef, TestStatistics) {
4294 let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4295
4296 let statistics = TestStatistics::new().with(
4297 "s1",
4298 ContainerStats::new_utf8(
4299 vec![
4300 Some("A"),
4301 Some("A"),
4302 Some("N"),
4303 Some("M"),
4304 None,
4305 Some("A"),
4306 Some(""),
4307 Some(""),
4308 Some("AB"),
4309 Some("A\u{10ffff}\u{10ffff}"),
4310 ], vec![
4312 Some("Z"),
4313 Some("L"),
4314 Some("Z"),
4315 Some("M"),
4316 None,
4317 None,
4318 Some("A"),
4319 Some(""),
4320 Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
4321 Some("A\u{10ffff}\u{10ffff}"),
4322 ], ),
4324 );
4325 (schema, statistics)
4326 }
4327
4328 #[test]
4329 fn prune_utf8_eq() {
4330 let (schema, statistics) = utf8_setup();
4331
4332 let expr = col("s1").eq(lit("A"));
4333 #[rustfmt::skip]
4334 let expected_ret = &[
4335 true,
4337 true,
4339 false,
4341 false,
4343 true,
4345 true,
4347 true,
4349 false,
4351 false,
4353 false,
4355 ];
4356 prune_with_expr(expr, &schema, &statistics, expected_ret);
4357
4358 let expr = col("s1").eq(lit(""));
4359 #[rustfmt::skip]
4360 let expected_ret = &[
4361 false,
4363 false,
4365 false,
4367 false,
4369 true,
4371 false,
4373 true,
4375 true,
4377 false,
4379 false,
4381 ];
4382 prune_with_expr(expr, &schema, &statistics, expected_ret);
4383 }
4384
4385 #[test]
4386 fn prune_utf8_not_eq() {
4387 let (schema, statistics) = utf8_setup();
4388
4389 let expr = col("s1").not_eq(lit("A"));
4390 #[rustfmt::skip]
4391 let expected_ret = &[
4392 true,
4394 true,
4396 true,
4398 true,
4400 true,
4402 true,
4404 true,
4406 true,
4408 true,
4410 true,
4412 ];
4413 prune_with_expr(expr, &schema, &statistics, expected_ret);
4414
4415 let expr = col("s1").not_eq(lit(""));
4416 #[rustfmt::skip]
4417 let expected_ret = &[
4418 true,
4420 true,
4422 true,
4424 true,
4426 true,
4428 true,
4430 true,
4432 false,
4434 true,
4436 true,
4438 ];
4439 prune_with_expr(expr, &schema, &statistics, expected_ret);
4440 }
4441
4442 #[test]
4443 fn prune_utf8_like_one() {
4444 let (schema, statistics) = utf8_setup();
4445
4446 let expr = col("s1").like(lit("A_"));
4447 #[rustfmt::skip]
4448 let expected_ret = &[
4449 true,
4451 true,
4453 false,
4455 false,
4457 true,
4459 true,
4461 true,
4463 false,
4465 true,
4467 true,
4469 ];
4470 prune_with_expr(expr, &schema, &statistics, expected_ret);
4471
4472 let expr = col("s1").like(lit("_A_"));
4473 #[rustfmt::skip]
4474 let expected_ret = &[
4475 true,
4477 true,
4479 true,
4481 true,
4483 true,
4485 true,
4487 true,
4489 true,
4491 true,
4493 true,
4495 ];
4496 prune_with_expr(expr, &schema, &statistics, expected_ret);
4497
4498 let expr = col("s1").like(lit("_"));
4499 #[rustfmt::skip]
4500 let expected_ret = &[
4501 true,
4503 true,
4505 true,
4507 true,
4509 true,
4511 true,
4513 true,
4515 true,
4517 true,
4519 true,
4521 ];
4522 prune_with_expr(expr, &schema, &statistics, expected_ret);
4523
4524 let expr = col("s1").like(lit(""));
4525 #[rustfmt::skip]
4526 let expected_ret = &[
4527 false,
4529 false,
4531 false,
4533 false,
4535 true,
4537 false,
4539 true,
4541 true,
4543 false,
4545 false,
4547 ];
4548 prune_with_expr(expr, &schema, &statistics, expected_ret);
4549 }
4550
4551 #[test]
4552 fn prune_utf8_like_many() {
4553 let (schema, statistics) = utf8_setup();
4554
4555 let expr = col("s1").like(lit("A%"));
4556 #[rustfmt::skip]
4557 let expected_ret = &[
4558 true,
4560 true,
4562 false,
4564 false,
4566 true,
4568 true,
4570 true,
4572 false,
4574 true,
4576 true,
4578 ];
4579 prune_with_expr(expr, &schema, &statistics, expected_ret);
4580
4581 let expr = col("s1").like(lit("%A%"));
4582 #[rustfmt::skip]
4583 let expected_ret = &[
4584 true,
4586 true,
4588 true,
4590 true,
4592 true,
4594 true,
4596 true,
4598 true,
4600 true,
4602 true,
4604 ];
4605 prune_with_expr(expr, &schema, &statistics, expected_ret);
4606
4607 let expr = col("s1").like(lit("%"));
4608 #[rustfmt::skip]
4609 let expected_ret = &[
4610 true,
4612 true,
4614 true,
4616 true,
4618 true,
4620 true,
4622 true,
4624 true,
4626 true,
4628 true,
4630 ];
4631 prune_with_expr(expr, &schema, &statistics, expected_ret);
4632
4633 let expr = col("s1").like(lit(""));
4634 #[rustfmt::skip]
4635 let expected_ret = &[
4636 false,
4638 false,
4640 false,
4642 false,
4644 true,
4646 false,
4648 true,
4650 true,
4652 false,
4654 false,
4656 ];
4657 prune_with_expr(expr, &schema, &statistics, expected_ret);
4658 }
4659
4660 #[test]
4661 fn prune_utf8_not_like_one() {
4662 let (schema, statistics) = utf8_setup();
4663
4664 let expr = col("s1").not_like(lit("A\u{10ffff}_"));
4665 #[rustfmt::skip]
4666 let expected_ret = &[
4667 true,
4669 true,
4671 true,
4673 true,
4675 true,
4677 true,
4679 true,
4681 true,
4683 true,
4685 true,
4688 ];
4689 prune_with_expr(expr, &schema, &statistics, expected_ret);
4690 }
4691
4692 #[test]
4693 fn prune_utf8_not_like_many() {
4694 let (schema, statistics) = utf8_setup();
4695
4696 let expr = col("s1").not_like(lit("A\u{10ffff}%"));
4697 #[rustfmt::skip]
4698 let expected_ret = &[
4699 true,
4701 true,
4703 true,
4705 true,
4707 true,
4709 true,
4711 true,
4713 true,
4715 true,
4717 false,
4719 ];
4720 prune_with_expr(expr, &schema, &statistics, expected_ret);
4721
4722 let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}"));
4723 #[rustfmt::skip]
4724 let expected_ret = &[
4725 true,
4727 true,
4729 true,
4731 true,
4733 true,
4735 true,
4737 true,
4739 true,
4741 true,
4743 true,
4745 ];
4746 prune_with_expr(expr, &schema, &statistics, expected_ret);
4747
4748 let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}_"));
4749 #[rustfmt::skip]
4750 let expected_ret = &[
4751 true,
4753 true,
4755 true,
4757 true,
4759 true,
4761 true,
4763 true,
4765 true,
4767 true,
4769 true,
4771 ];
4772 prune_with_expr(expr, &schema, &statistics, expected_ret);
4773
4774 let expr = col("s1").not_like(lit("A\\%%"));
4775 let statistics = TestStatistics::new().with(
4776 "s1",
4777 ContainerStats::new_utf8(
4778 vec![Some("A%a"), Some("A")],
4779 vec![Some("A%c"), Some("A")],
4780 ),
4781 );
4782 let expected_ret = &[false, true];
4783 prune_with_expr(expr, &schema, &statistics, expected_ret);
4784 }
4785
4786 #[test]
4787 fn test_rewrite_expr_to_prunable() {
4788 let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4789 let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4790
4791 let left_input = col("a");
4793 let left_input = logical2physical(&left_input, &schema);
4794 let right_input = lit(ScalarValue::Int32(Some(12)));
4795 let right_input = logical2physical(&right_input, &schema);
4796 let (result_left, _, result_right) = rewrite_expr_to_prunable(
4797 &left_input,
4798 Operator::Eq,
4799 &right_input,
4800 df_schema.clone(),
4801 )
4802 .unwrap();
4803 assert_eq!(result_left.to_string(), left_input.to_string());
4804 assert_eq!(result_right.to_string(), right_input.to_string());
4805
4806 let left_input = cast(col("a"), DataType::Decimal128(20, 3));
4808 let left_input = logical2physical(&left_input, &schema);
4809 let right_input = lit(ScalarValue::Decimal128(Some(12), 20, 3));
4810 let right_input = logical2physical(&right_input, &schema);
4811 let (result_left, _, result_right) = rewrite_expr_to_prunable(
4812 &left_input,
4813 Operator::Gt,
4814 &right_input,
4815 df_schema.clone(),
4816 )
4817 .unwrap();
4818 assert_eq!(result_left.to_string(), left_input.to_string());
4819 assert_eq!(result_right.to_string(), right_input.to_string());
4820
4821 let left_input = try_cast(col("a"), DataType::Int64);
4823 let left_input = logical2physical(&left_input, &schema);
4824 let right_input = lit(ScalarValue::Int64(Some(12)));
4825 let right_input = logical2physical(&right_input, &schema);
4826 let (result_left, _, result_right) =
4827 rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema)
4828 .unwrap();
4829 assert_eq!(result_left.to_string(), left_input.to_string());
4830 assert_eq!(result_right.to_string(), right_input.to_string());
4831
4832 }
4834
4835 #[test]
4836 fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
4837 struct CustomUnhandledHook;
4838
4839 impl UnhandledPredicateHook for CustomUnhandledHook {
4840 fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
4844 Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
4845 }
4846 }
4847
4848 let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4849 let schema_with_b = Schema::new(vec![
4850 Field::new("a", DataType::Int32, true),
4851 Field::new("b", DataType::Int32, true),
4852 ]);
4853
4854 let rewriter = PredicateRewriter::new()
4855 .with_unhandled_hook(Arc::new(CustomUnhandledHook {}));
4856
4857 let transform_expr = |expr| {
4858 let expr = logical2physical(&expr, &schema_with_b);
4859 rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
4860 };
4861
4862 let known_expression = col("a").eq(lit(12));
4864 let known_expression_transformed = PredicateRewriter::new()
4865 .rewrite_predicate_to_statistics_predicate(
4866 &logical2physical(&known_expression, &schema),
4867 &schema,
4868 );
4869
4870 let input = col("b").eq(lit(12));
4872 let expected = logical2physical(&lit(42), &schema);
4873 let transformed = transform_expr(input.clone());
4874 assert_eq!(transformed.to_string(), expected.to_string());
4875
4876 let input = known_expression.clone().and(input.clone());
4878 let expected = phys_expr::BinaryExpr::new(
4879 Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4880 Operator::And,
4881 logical2physical(&lit(42), &schema),
4882 );
4883 let transformed = transform_expr(input.clone());
4884 assert_eq!(transformed.to_string(), expected.to_string());
4885
4886 let input = array_has(make_array(vec![lit(1)]), col("a"));
4888 let expected = logical2physical(&lit(42), &schema);
4889 let transformed = transform_expr(input.clone());
4890 assert_eq!(transformed.to_string(), expected.to_string());
4891
4892 let input = known_expression.and(input);
4894 let expected = phys_expr::BinaryExpr::new(
4895 Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4896 Operator::And,
4897 logical2physical(&lit(42), &schema),
4898 );
4899 let transformed = transform_expr(input.clone());
4900 assert_eq!(transformed.to_string(), expected.to_string());
4901 }
4902
4903 #[test]
4904 fn test_rewrite_expr_to_prunable_error() {
4905 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
4908 let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4909 let left_input = cast(col("a"), DataType::Int64);
4910 let left_input = logical2physical(&left_input, &schema);
4911 let right_input = lit(ScalarValue::Int64(Some(12)));
4912 let right_input = logical2physical(&right_input, &schema);
4913 let result = rewrite_expr_to_prunable(
4914 &left_input,
4915 Operator::Gt,
4916 &right_input,
4917 df_schema.clone(),
4918 );
4919 assert!(result.is_err());
4920
4921 let left_input = is_null(col("a"));
4923 let left_input = logical2physical(&left_input, &schema);
4924 let right_input = lit(ScalarValue::Int64(Some(12)));
4925 let right_input = logical2physical(&right_input, &schema);
4926 let result =
4927 rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema);
4928 assert!(result.is_err());
4929 }
4931
4932 #[test]
4933 fn prune_with_contained_one_column() {
4934 let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4935
4936 let statistics = TestStatistics::new()
4938 .with_contained(
4939 "s1",
4940 [ScalarValue::from("foo")],
4941 [
4942 Some(true),
4944 Some(false),
4946 None,
4948 Some(true),
4950 Some(false),
4952 None,
4954 Some(true),
4956 Some(false),
4958 None,
4960 ],
4961 )
4962 .with_contained(
4963 "s1",
4964 [ScalarValue::from("bar")],
4965 [
4966 Some(true),
4968 Some(true),
4969 Some(true),
4970 Some(false),
4972 Some(false),
4973 Some(false),
4974 None,
4976 None,
4977 None,
4978 ],
4979 )
4980 .with_contained(
4981 "s1",
4984 [ScalarValue::from("foo"), ScalarValue::from("bar")],
4985 [
4986 None,
4988 None,
4989 None,
4990 Some(true),
4992 Some(true),
4993 Some(true),
4994 Some(false),
4996 Some(false),
4997 Some(false),
4998 ],
4999 );
5000
5001 prune_with_expr(
5003 col("s1").eq(lit("foo")),
5004 &schema,
5005 &statistics,
5006 &[true, false, true, true, false, true, true, false, true],
5008 );
5009
5010 prune_with_expr(
5012 col("s1").eq(lit("bar")),
5013 &schema,
5014 &statistics,
5015 &[true, true, true, false, false, false, true, true, true],
5017 );
5018
5019 prune_with_expr(
5021 col("s1").eq(lit("baz")),
5022 &schema,
5023 &statistics,
5024 &[true, true, true, true, true, true, true, true, true],
5026 );
5027
5028 prune_with_expr(
5030 col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
5031 &schema,
5032 &statistics,
5033 &[true, true, true, true, true, true, true, true, true],
5037 );
5038
5039 prune_with_expr(
5041 col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
5042 &schema,
5043 &statistics,
5044 &[true, true, true, true, true, true, false, false, false],
5046 );
5047
5048 prune_with_expr(
5050 col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
5051 &schema,
5052 &statistics,
5053 &[true, true, true, true, true, true, true, true, true],
5055 );
5056
5057 prune_with_expr(
5059 col("s1")
5060 .eq(lit("foo"))
5061 .or(col("s1").eq(lit("bar")))
5062 .or(col("s1").eq(lit("baz"))),
5063 &schema,
5064 &statistics,
5065 &[true, true, true, true, true, true, true, true, true],
5068 );
5069
5070 prune_with_expr(
5072 col("s1").not_eq(lit("foo")),
5073 &schema,
5074 &statistics,
5075 &[false, true, true, false, true, true, false, true, true],
5077 );
5078
5079 prune_with_expr(
5081 col("s1").not_eq(lit("bar")),
5082 &schema,
5083 &statistics,
5084 &[false, false, false, true, true, true, true, true, true],
5086 );
5087
5088 prune_with_expr(
5090 col("s1")
5091 .not_eq(lit("foo"))
5092 .and(col("s1").not_eq(lit("bar"))),
5093 &schema,
5094 &statistics,
5095 &[true, true, true, false, false, false, true, true, true],
5097 );
5098
5099 prune_with_expr(
5101 col("s1")
5102 .not_eq(lit("foo"))
5103 .and(col("s1").not_eq(lit("bar")))
5104 .and(col("s1").not_eq(lit("baz"))),
5105 &schema,
5106 &statistics,
5107 &[true, true, true, true, true, true, true, true, true],
5109 );
5110
5111 prune_with_expr(
5113 col("s1")
5114 .not_eq(lit("foo"))
5115 .or(col("s1").not_eq(lit("bar"))),
5116 &schema,
5117 &statistics,
5118 &[true, true, true, true, true, true, true, true, true],
5120 );
5121
5122 prune_with_expr(
5124 col("s1")
5125 .not_eq(lit("foo"))
5126 .or(col("s1").not_eq(lit("bar")))
5127 .or(col("s1").not_eq(lit("baz"))),
5128 &schema,
5129 &statistics,
5130 &[true, true, true, true, true, true, true, true, true],
5132 );
5133 }
5134
5135 #[test]
5136 fn prune_with_contained_two_columns() {
5137 let schema = Arc::new(Schema::new(vec![
5138 Field::new("s1", DataType::Utf8, true),
5139 Field::new("s2", DataType::Utf8, true),
5140 ]));
5141
5142 let statistics = TestStatistics::new()
5144 .with_contained(
5145 "s1",
5146 [ScalarValue::from("foo")],
5147 [
5148 Some(true),
5150 Some(false),
5152 None,
5154 Some(true),
5156 Some(false),
5158 None,
5160 Some(true),
5162 Some(false),
5164 None,
5166 ],
5167 )
5168 .with_contained(
5169 "s2", [ScalarValue::from("bar")],
5171 [
5172 Some(true),
5174 Some(true),
5175 Some(true),
5176 Some(false),
5178 Some(false),
5179 Some(false),
5180 None,
5182 None,
5183 None,
5184 ],
5185 );
5186
5187 prune_with_expr(
5189 col("s1").eq(lit("foo")),
5190 &schema,
5191 &statistics,
5192 &[true, false, true, true, false, true, true, false, true],
5194 );
5195
5196 let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
5198 prune_with_expr(
5199 expr,
5200 &schema,
5201 &statistics,
5202 &[true, true, true, true, true, true, true, true, true],
5204 );
5205
5206 prune_with_expr(
5208 col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
5209 &schema,
5210 &statistics,
5211 &[false, false, false, true, false, true, true, false, true],
5215 );
5216
5217 prune_with_expr(
5219 col("s1")
5220 .not_eq(lit("foo"))
5221 .and(col("s2").not_eq(lit("bar"))),
5222 &schema,
5223 &statistics,
5224 &[false, false, false, false, true, true, false, true, true],
5228 );
5229
5230 prune_with_expr(
5232 col("s1")
5233 .not_eq(lit("foo"))
5234 .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
5235 &schema,
5236 &statistics,
5237 &[false, true, true, false, true, true, false, true, true],
5240 );
5241
5242 prune_with_expr(
5244 col("s1").like(lit("foo%bar%")),
5245 &schema,
5246 &statistics,
5247 &[true, true, true, true, true, true, true, true, true],
5249 );
5250
5251 prune_with_expr(
5253 col("s1")
5254 .like(lit("foo%bar%"))
5255 .and(col("s2").eq(lit("bar"))),
5256 &schema,
5257 &statistics,
5258 &[true, true, true, false, false, false, true, true, true],
5260 );
5261
5262 prune_with_expr(
5264 col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
5265 &schema,
5266 &statistics,
5267 &[true, true, true, true, true, true, true, true, true],
5270 );
5271 }
5272
5273 #[test]
5274 fn prune_with_range_and_contained() {
5275 let schema = Arc::new(Schema::new(vec![
5277 Field::new("i", DataType::Int32, true),
5278 Field::new("s", DataType::Utf8, true),
5279 ]));
5280
5281 let statistics = TestStatistics::new()
5282 .with(
5283 "i",
5284 ContainerStats::new_i32(
5285 vec![
5289 Some(-5),
5290 Some(10),
5291 None,
5292 Some(-5),
5293 Some(10),
5294 None,
5295 Some(-5),
5296 Some(10),
5297 None,
5298 ], vec![
5300 Some(5),
5301 Some(20),
5302 None,
5303 Some(5),
5304 Some(20),
5305 None,
5306 Some(5),
5307 Some(20),
5308 None,
5309 ], ),
5311 )
5312 .with_contained(
5314 "s",
5315 [ScalarValue::from("foo")],
5316 [
5317 Some(true),
5319 Some(true),
5320 Some(true),
5321 Some(false),
5323 Some(false),
5324 Some(false),
5325 None,
5327 None,
5328 None,
5329 ],
5330 );
5331
5332 prune_with_expr(
5334 col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
5335 &schema,
5336 &statistics,
5337 &[true, false, true, false, false, false, true, false, true],
5342 );
5343
5344 prune_with_expr(
5346 col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
5347 &schema,
5348 &statistics,
5349 &[false, false, false, true, false, true, true, false, true],
5353 );
5354
5355 prune_with_expr(
5357 col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
5358 &schema,
5359 &statistics,
5360 &[true, true, true, true, true, true, true, true, true],
5363 );
5364 }
5365
5366 fn prune_with_expr(
5373 expr: Expr,
5374 schema: &SchemaRef,
5375 statistics: &TestStatistics,
5376 expected: &[bool],
5377 ) {
5378 println!("Pruning with expr: {expr}");
5379 let expr = logical2physical(&expr, schema);
5380 let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5381 let result = p.prune(statistics).unwrap();
5382 assert_eq!(result, expected);
5383 }
5384
5385 fn prune_with_simplified_expr(
5386 expr: Expr,
5387 schema: &SchemaRef,
5388 statistics: &TestStatistics,
5389 expected: &[bool],
5390 ) {
5391 println!("Pruning with expr: {expr}");
5392 let expr = logical2physical(&expr, schema);
5393 let simplifier = PhysicalExprSimplifier::new(schema);
5394 let expr = simplifier.simplify(expr).unwrap();
5395 let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5396 let result = p.prune(statistics).unwrap();
5397 assert_eq!(result, expected);
5398 }
5399
5400 fn test_build_predicate_expression(
5401 expr: &Expr,
5402 schema: &Schema,
5403 required_columns: &mut RequiredColumns,
5404 ) -> Arc<dyn PhysicalExpr> {
5405 let expr = logical2physical(expr, schema);
5406 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
5407 build_predicate_expression(
5408 &expr,
5409 &Arc::new(schema.clone()),
5410 required_columns,
5411 &unhandled_hook,
5412 )
5413 }
5414
5415 #[test]
5416 fn test_build_predicate_expression_with_false() {
5417 let expr = lit(ScalarValue::Boolean(Some(false)));
5418 let schema = Schema::empty();
5419 let res =
5420 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5421 let expected = logical2physical(&expr, &schema);
5422 assert_eq!(&res, &expected);
5423 }
5424
5425 #[test]
5426 fn test_build_predicate_expression_with_and_false() {
5427 let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5428 let expr = and(
5429 col("c1").eq(lit("a")),
5430 lit(ScalarValue::Boolean(Some(false))),
5431 );
5432 let res =
5433 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5434 let expected = logical2physical(&lit(ScalarValue::Boolean(Some(false))), &schema);
5435 assert_eq!(&res, &expected);
5436 }
5437
5438 #[test]
5439 fn test_build_predicate_expression_with_or_false() {
5440 let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5441 let left_expr = col("c1").eq(lit("a"));
5442 let right_expr = lit(ScalarValue::Boolean(Some(false)));
5443 let res = test_build_predicate_expression(
5444 &or(left_expr.clone(), right_expr.clone()),
5445 &schema,
5446 &mut RequiredColumns::new(),
5447 );
5448 let expected =
5449 "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1";
5450 assert_eq!(res.to_string(), expected);
5451 }
5452}