1use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27 array::{ArrayRef, BooleanArray, new_null_array},
28 datatypes::{DataType, Field, Schema, SchemaRef},
29 record_batch::{RecordBatch, RecordBatchOptions},
30};
31pub use datafusion_common::pruning::PruningStatistics;
33use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
34use datafusion_physical_plan::metrics::Count;
35use log::{debug, trace};
36
37use datafusion_common::error::Result;
38use datafusion_common::tree_node::{TransformedResult, TreeNodeRecursion};
39use datafusion_common::{Column, DFSchema, assert_eq_or_internal_err};
40use datafusion_common::{
41 ScalarValue, internal_datafusion_err, plan_datafusion_err, plan_err,
42 tree_node::{Transformed, TreeNode},
43};
44use datafusion_expr_common::operator::Operator;
45use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
46use datafusion_physical_expr::{PhysicalExprRef, expressions as phys_expr};
47use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr_opt;
48use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
49
/// A predicate that can rule out ("prune") containers using their statistics.
///
/// Built by [`PruningPredicate::try_new`] from a physical predicate and the
/// schema it refers to, and applied with [`PruningPredicate::prune`].
#[derive(Debug, Clone)]
pub struct PruningPredicate {
    /// The schema the predicate was built against.
    schema: SchemaRef,
    /// The predicate rewritten to operate on statistics columns; evaluated
    /// against the record batch produced from [`RequiredColumns`].
    predicate_expr: Arc<dyn PhysicalExpr>,
    /// The statistics columns that `predicate_expr` references.
    required_columns: RequiredColumns,
    /// The input expression after snapshotting (and, if the snapshot changed
    /// it, simplification).
    orig_expr: Arc<dyn PhysicalExpr>,
    /// Literal guarantees extracted from `orig_expr`, checked in `prune`
    /// via `PruningStatistics::contained`.
    literal_guarantees: Vec<LiteralGuarantee>,
}
379
380pub fn build_pruning_predicate(
386 predicate: Arc<dyn PhysicalExpr>,
387 file_schema: &SchemaRef,
388 predicate_creation_errors: &Count,
389) -> Option<Arc<PruningPredicate>> {
390 match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
391 Ok(pruning_predicate) => {
392 if !pruning_predicate.always_true() {
393 return Some(Arc::new(pruning_predicate));
394 }
395 }
396 Err(e) => {
397 debug!("Could not create pruning predicate for: {e}");
398 predicate_creation_errors.add(1);
399 }
400 }
401 None
402}
403
/// Decides what replaces a (sub-)predicate that cannot be rewritten into a
/// statistics predicate.
pub trait UnhandledPredicateHook {
    /// Called with the expression that could not be rewritten; the returned
    /// expression is used in its place in the statistics predicate.
    fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
}
412
/// An [`UnhandledPredicateHook`] that replaces every unhandled expression
/// with the same constant expression (a literal `true` by default).
#[derive(Debug, Clone)]
struct ConstantUnhandledPredicateHook {
    /// The expression returned for every unhandled predicate.
    default: Arc<dyn PhysicalExpr>,
}
419
420impl Default for ConstantUnhandledPredicateHook {
421 fn default() -> Self {
422 Self {
423 default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
424 }
425 }
426}
427
428impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
429 fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
430 Arc::clone(&self.default)
431 }
432}
433
434impl PruningPredicate {
435 pub fn try_new(mut expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
464 let tf = snapshot_physical_expr_opt(expr)?;
468 if tf.transformed {
469 let simplifier = PhysicalExprSimplifier::new(&schema);
476 expr = simplifier.simplify(tf.data)?;
477 } else {
478 expr = tf.data;
479 }
480 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
481
482 let mut required_columns = RequiredColumns::new();
484 let predicate_expr = build_predicate_expression(
485 &expr,
486 &schema,
487 &mut required_columns,
488 &unhandled_hook,
489 );
490 let predicate_schema = required_columns.schema();
491 let predicate_expr =
493 PhysicalExprSimplifier::new(&predicate_schema).simplify(predicate_expr)?;
494 let literal_guarantees = LiteralGuarantee::analyze(&expr);
495
496 Ok(Self {
497 schema,
498 predicate_expr,
499 required_columns,
500 orig_expr: expr,
501 literal_guarantees,
502 })
503 }
504
505 pub fn prune<S: PruningStatistics + ?Sized>(
520 &self,
521 statistics: &S,
522 ) -> Result<Vec<bool>> {
523 let mut builder = BoolVecBuilder::new(statistics.num_containers());
524
525 for literal_guarantee in &self.literal_guarantees {
528 let LiteralGuarantee {
529 column,
530 guarantee,
531 literals,
532 } = literal_guarantee;
533 if let Some(results) = statistics.contained(column, literals) {
534 match guarantee {
535 Guarantee::In => builder.combine_array(&results),
540 Guarantee::NotIn => {
546 builder.combine_array(&arrow::compute::not(&results)?)
547 }
548 }
549 if builder.check_all_pruned() {
552 return Ok(builder.build());
553 }
554 }
555 }
556
557 let statistics_batch =
563 build_statistics_record_batch(statistics, &self.required_columns)?;
564
565 builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
567
568 Ok(builder.build())
569 }
570
571 pub fn schema(&self) -> &SchemaRef {
573 &self.schema
574 }
575
576 pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
578 &self.orig_expr
579 }
580
581 pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
583 &self.predicate_expr
584 }
585
586 pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
592 &self.literal_guarantees
593 }
594
595 pub fn always_true(&self) -> bool {
602 is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
603 }
604
605 pub fn required_columns(&self) -> &RequiredColumns {
606 &self.required_columns
607 }
608
609 pub fn literal_columns(&self) -> Vec<String> {
617 let mut seen = HashSet::new();
618 self.literal_guarantees
619 .iter()
620 .map(|e| &e.column.name)
621 .filter(|name| seen.insert(*name))
623 .map(|s| s.to_string())
624 .collect()
625 }
626}
627
/// Accumulates per-container keep/prune decisions.
///
/// Every container starts as `true` ("keep"); combining results can only flip
/// entries to `false`, never back to `true`.
#[derive(Debug)]
struct BoolVecBuilder {
    /// One entry per container; `false` means the container has been pruned.
    inner: Vec<bool>,
}
636
637impl BoolVecBuilder {
638 fn new(num_containers: usize) -> Self {
640 Self {
641 inner: vec![true; num_containers],
643 }
644 }
645
646 fn combine_array(&mut self, array: &BooleanArray) {
654 assert_eq!(array.len(), self.inner.len());
655 for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
656 if let Some(false) = new {
660 *cur = false;
661 }
662 }
663 }
664
665 fn combine_value(&mut self, value: ColumnarValue) {
671 match value {
672 ColumnarValue::Array(array) => {
673 self.combine_array(array.as_boolean());
674 }
675 ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
676 self.inner = vec![false; self.inner.len()];
678 }
679 _ => {
680 }
683 }
684 }
685
686 fn build(self) -> Vec<bool> {
688 self.inner
689 }
690
691 fn check_all_pruned(&self) -> bool {
693 self.inner.iter().all(|&x| !x)
694 }
695}
696
697fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
698 expr.downcast_ref::<phys_expr::Literal>()
699 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
700 .unwrap_or_default()
701}
702
703fn is_always_false(expr: &Arc<dyn PhysicalExpr>) -> bool {
704 expr.downcast_ref::<phys_expr::Literal>()
705 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(false))))
706 .unwrap_or_default()
707}
708
/// The statistics columns a rewritten pruning predicate reads.
///
/// Each entry holds:
/// - the source column,
/// - the kind of statistic needed,
/// - the field describing the statistics column.
///
/// An entry's position in `columns` is the index used for the matching column
/// reference in the rewritten predicate. It is also the column's position in
/// the statistics record batch.
#[derive(Debug, Default, Clone)]
pub struct RequiredColumns {
    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
}
727
728impl RequiredColumns {
729 fn new() -> Self {
730 Self::default()
731 }
732
733 pub fn single_column(&self) -> Option<&phys_expr::Column> {
742 if self.columns.windows(2).all(|w| {
743 let c1 = &w[0].0;
745 let c2 = &w[1].0;
746 c1 == c2
747 }) {
748 self.columns.first().map(|r| &r.0)
749 } else {
750 None
751 }
752 }
753
754 fn schema(&self) -> Schema {
761 let fields = self
762 .columns
763 .iter()
764 .map(|(_c, _t, f)| f.clone())
765 .collect::<Vec<_>>();
766 Schema::new(fields)
767 }
768
769 pub(crate) fn iter(
772 &self,
773 ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
774 self.columns.iter()
775 }
776
777 fn find_stat_column(
778 &self,
779 column: &phys_expr::Column,
780 statistics_type: StatisticsType,
781 ) -> Option<usize> {
782 match statistics_type {
783 StatisticsType::RowCount => {
784 self.columns
786 .iter()
787 .enumerate()
788 .find(|(_i, (_c, t, _f))| t == &statistics_type)
789 .map(|(i, (_c, _t, _f))| i)
790 }
791 _ => self
792 .columns
793 .iter()
794 .enumerate()
795 .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
796 .map(|(i, (_c, _t, _f))| i),
797 }
798 }
799
800 fn stat_column_expr(
809 &mut self,
810 column: &phys_expr::Column,
811 column_expr: &Arc<dyn PhysicalExpr>,
812 field: &Field,
813 stat_type: StatisticsType,
814 ) -> Result<Arc<dyn PhysicalExpr>> {
815 let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
816 Some(idx) => (idx, false),
817 None => (self.columns.len(), true),
818 };
819
820 let column_name = column.name();
821 let stat_column_name = match stat_type {
822 StatisticsType::Min => format!("{column_name}_min"),
823 StatisticsType::Max => format!("{column_name}_max"),
824 StatisticsType::NullCount => format!("{column_name}_null_count"),
825 StatisticsType::RowCount => "row_count".to_string(),
826 };
827
828 let stat_column = phys_expr::Column::new(&stat_column_name, idx);
829
830 if need_to_insert {
832 let nullable = true;
834 let stat_field =
835 Field::new(stat_column.name(), field.data_type().clone(), nullable);
836 self.columns.push((column.clone(), stat_type, stat_field));
837 }
838 rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
839 }
840
841 fn min_column_expr(
843 &mut self,
844 column: &phys_expr::Column,
845 column_expr: &Arc<dyn PhysicalExpr>,
846 field: &Field,
847 ) -> Result<Arc<dyn PhysicalExpr>> {
848 self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
849 }
850
851 fn max_column_expr(
853 &mut self,
854 column: &phys_expr::Column,
855 column_expr: &Arc<dyn PhysicalExpr>,
856 field: &Field,
857 ) -> Result<Arc<dyn PhysicalExpr>> {
858 self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
859 }
860
861 fn null_count_column_expr(
863 &mut self,
864 column: &phys_expr::Column,
865 column_expr: &Arc<dyn PhysicalExpr>,
866 field: &Field,
867 ) -> Result<Arc<dyn PhysicalExpr>> {
868 self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
869 }
870
871 fn row_count_column_expr(
873 &mut self,
874 column: &phys_expr::Column,
875 column_expr: &Arc<dyn PhysicalExpr>,
876 field: &Field,
877 ) -> Result<Arc<dyn PhysicalExpr>> {
878 self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
879 }
880}
881
882impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
883 fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
884 Self { columns }
885 }
886}
887
888fn build_statistics_record_batch<S: PruningStatistics + ?Sized>(
914 statistics: &S,
915 required_columns: &RequiredColumns,
916) -> Result<RecordBatch> {
917 let mut arrays = Vec::<ArrayRef>::new();
918 for (column, statistics_type, stat_field) in required_columns.iter() {
920 let column = Column::from_name(column.name());
921 let data_type = stat_field.data_type();
922
923 let num_containers = statistics.num_containers();
924
925 let array = match statistics_type {
926 StatisticsType::Min => statistics.min_values(&column),
927 StatisticsType::Max => statistics.max_values(&column),
928 StatisticsType::NullCount => statistics.null_counts(&column),
929 StatisticsType::RowCount => statistics.row_counts(),
930 };
931 let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
932
933 assert_eq_or_internal_err!(
934 num_containers,
935 array.len(),
936 "mismatched statistics length. Expected {}, got {}",
937 num_containers,
938 array.len()
939 );
940
941 let array = arrow::compute::cast(&array, data_type)?;
944
945 arrays.push(array);
946 }
947
948 let schema = Arc::new(required_columns.schema());
949 let mut options = RecordBatchOptions::default();
951 options.row_count = Some(statistics.num_containers());
952
953 trace!("Creating statistics batch for {required_columns:#?} with {arrays:#?}");
954
955 RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
956 plan_datafusion_err!("Can not create statistics record batch: {err}")
957 })
958}
959
/// A single-column comparison normalized to `column_expr <op> scalar_expr`,
/// plus the state needed to emit statistics expressions for it.
struct PruningExpressionBuilder<'a> {
    /// The one column referenced by `column_expr`.
    column: phys_expr::Column,
    /// The column side of the comparison (the column, possibly under casts).
    column_expr: Arc<dyn PhysicalExpr>,
    /// The operator, already reversed if the column was originally on the right.
    op: Operator,
    /// The side of the comparison that references no columns.
    scalar_expr: Arc<dyn PhysicalExpr>,
    /// The schema field of `column`.
    field: &'a Field,
    /// Statistics columns collected so far; extended as stat expressions are built.
    required_columns: &'a mut RequiredColumns,
}
968
969impl<'a> PruningExpressionBuilder<'a> {
970 fn try_new(
971 left: &'a Arc<dyn PhysicalExpr>,
972 right: &'a Arc<dyn PhysicalExpr>,
973 left_columns: ColumnReferenceCount,
974 right_columns: ColumnReferenceCount,
975 op: Operator,
976 schema: &'a SchemaRef,
977 required_columns: &'a mut RequiredColumns,
978 ) -> Result<Self> {
979 let (column_expr, scalar_expr, column, correct_operator) = match (
981 left_columns,
982 right_columns,
983 ) {
984 (ColumnReferenceCount::One(column), ColumnReferenceCount::Zero) => {
985 (left, right, column, op)
986 }
987 (ColumnReferenceCount::Zero, ColumnReferenceCount::One(column)) => {
988 (right, left, column, reverse_operator(op)?)
989 }
990 (ColumnReferenceCount::One(_), ColumnReferenceCount::One(_)) => {
991 return plan_err!(
993 "Expression not supported for pruning: left has 1 column, right has 1 column"
994 );
995 }
996 (ColumnReferenceCount::Zero, ColumnReferenceCount::Zero) => {
997 return plan_err!(
999 "Pruning literal expressions is not supported, please call PhysicalExprSimplifier first"
1000 );
1001 }
1002 (ColumnReferenceCount::Many, _) | (_, ColumnReferenceCount::Many) => {
1003 return plan_err!(
1004 "Expression not supported for pruning: left or right has multiple columns"
1005 );
1006 }
1007 };
1008
1009 let df_schema = DFSchema::try_from(Arc::clone(schema))?;
1010 let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
1011 column_expr,
1012 correct_operator,
1013 scalar_expr,
1014 df_schema,
1015 )?;
1016 let field = match schema.column_with_name(column.name()) {
1017 Some((_, f)) => f,
1018 _ => {
1019 return plan_err!("Field not found in schema");
1020 }
1021 };
1022
1023 Ok(Self {
1024 column,
1025 column_expr,
1026 op: correct_operator,
1027 scalar_expr,
1028 field,
1029 required_columns,
1030 })
1031 }
1032
1033 fn op(&self) -> Operator {
1034 self.op
1035 }
1036
1037 fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
1038 &self.scalar_expr
1039 }
1040
1041 fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1042 self.required_columns
1043 .min_column_expr(&self.column, &self.column_expr, self.field)
1044 }
1045
1046 fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1047 self.required_columns
1048 .max_column_expr(&self.column, &self.column_expr, self.field)
1049 }
1050
1051 fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1058 let column_expr = Arc::new(self.column.clone()) as _;
1060
1061 let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1063
1064 self.required_columns.null_count_column_expr(
1065 &self.column,
1066 &column_expr,
1067 null_count_field,
1068 )
1069 }
1070
1071 fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1078 let column_expr = Arc::new(self.column.clone()) as _;
1080
1081 let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1083
1084 self.required_columns.row_count_column_expr(
1085 &self.column,
1086 &column_expr,
1087 row_count_field,
1088 )
1089 }
1090}
1091
1092fn rewrite_expr_to_prunable(
1105 column_expr: &PhysicalExprRef,
1106 op: Operator,
1107 scalar_expr: &PhysicalExprRef,
1108 schema: DFSchema,
1109) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1110 if !is_compare_op(op) {
1111 return plan_err!("rewrite_expr_to_prunable only support compare expression");
1112 }
1113
1114 if column_expr.downcast_ref::<phys_expr::Column>().is_some() {
1115 Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
1117 } else if let Some(cast) = column_expr.downcast_ref::<phys_expr::CastExpr>() {
1118 let (left, op, right) = rewrite_cast_child_to_prunable(
1120 cast.expr(),
1121 cast.cast_type(),
1122 op,
1123 scalar_expr,
1124 schema,
1125 )?;
1126 let left = Arc::new(phys_expr::CastExpr::new_with_target_field(
1127 left,
1128 Arc::clone(cast.target_field()),
1129 None,
1130 ));
1131 Ok((left, op, right))
1136 } else if let Some(try_cast) = column_expr.downcast_ref::<phys_expr::TryCastExpr>() {
1137 let (left, op, right) = rewrite_cast_child_to_prunable(
1139 try_cast.expr(),
1140 try_cast.cast_type(),
1141 op,
1142 scalar_expr,
1143 schema,
1144 )?;
1145 let left = Arc::new(phys_expr::TryCastExpr::new(
1146 left,
1147 try_cast.cast_type().clone(),
1148 ));
1149 Ok((left, op, right))
1150 } else if let Some(neg) = column_expr.downcast_ref::<phys_expr::NegativeExpr>() {
1151 let (left, op, right) =
1153 rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
1154 let right = Arc::new(phys_expr::NegativeExpr::new(right));
1155 Ok((left, reverse_operator(op)?, right))
1156 } else if let Some(not) = column_expr.downcast_ref::<phys_expr::NotExpr>() {
1157 if !matches!(
1159 op,
1160 Operator::Eq
1161 | Operator::NotEq
1162 | Operator::IsDistinctFrom
1163 | Operator::IsNotDistinctFrom
1164 ) {
1165 return plan_err!(
1166 "Not with operator other than Eq / NotEq / IsDistinctFrom / IsNotDistinctFrom is not supported"
1167 );
1168 }
1169 if not.arg().downcast_ref::<phys_expr::Column>().is_some() {
1170 let left = Arc::clone(not.arg());
1171 let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
1172 Ok((left, reverse_operator(op)?, right))
1173 } else {
1174 plan_err!("Not with complex expression {column_expr:?} is not supported")
1175 }
1176 } else {
1177 plan_err!("column expression {column_expr:?} is not supported")
1178 }
1179}
1180
1181fn rewrite_cast_child_to_prunable(
1182 cast_child_expr: &PhysicalExprRef,
1183 cast_type: &DataType,
1184 op: Operator,
1185 scalar_expr: &PhysicalExprRef,
1186 schema: DFSchema,
1187) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1188 verify_support_type_for_prune(
1189 &cast_child_expr.data_type(schema.as_arrow())?,
1190 cast_type,
1191 )?;
1192 rewrite_expr_to_prunable(cast_child_expr, op, scalar_expr, schema)
1193}
1194
1195fn is_compare_op(op: Operator) -> bool {
1196 matches!(
1197 op,
1198 Operator::Eq
1199 | Operator::NotEq
1200 | Operator::Lt
1201 | Operator::LtEq
1202 | Operator::Gt
1203 | Operator::GtEq
1204 | Operator::IsDistinctFrom
1205 | Operator::IsNotDistinctFrom
1206 | Operator::LikeMatch
1207 | Operator::NotLikeMatch
1208 )
1209}
1210
1211fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1216 let from_type = match from_type {
1218 DataType::Dictionary(_, t) => {
1219 return verify_support_type_for_prune(t.as_ref(), to_type);
1220 }
1221 _ => from_type,
1222 };
1223 let to_type = match to_type {
1224 DataType::Dictionary(_, t) => {
1225 return verify_support_type_for_prune(from_type, t.as_ref());
1226 }
1227 _ => to_type,
1228 };
1229 if from_type.is_string() == to_type.is_string() {
1233 Ok(())
1234 } else {
1235 plan_err!(
1236 "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1237 )
1238 }
1239}
1240
1241fn rewrite_column_expr(
1243 e: Arc<dyn PhysicalExpr>,
1244 column_old: &phys_expr::Column,
1245 column_new: &phys_expr::Column,
1246) -> Result<Arc<dyn PhysicalExpr>> {
1247 e.transform(|expr| {
1248 if let Some(column) = expr.downcast_ref::<phys_expr::Column>()
1249 && column == column_old
1250 {
1251 return Ok(Transformed::yes(Arc::new(column_new.clone())));
1252 }
1253
1254 Ok(Transformed::no(expr))
1255 })
1256 .data()
1257}
1258
1259fn reverse_operator(op: Operator) -> Result<Operator> {
1260 op.swap().ok_or_else(|| {
1261 internal_datafusion_err!(
1262 "Could not reverse operator {op} while building pruning predicate"
1263 )
1264 })
1265}
1266
1267fn build_single_column_expr(
1272 column: &phys_expr::Column,
1273 schema: &Schema,
1274 required_columns: &mut RequiredColumns,
1275 is_not: bool, ) -> Option<Arc<dyn PhysicalExpr>> {
1277 let field = schema.field_with_name(column.name()).ok()?;
1278
1279 if *field.data_type() == DataType::Boolean {
1280 let col_ref = Arc::new(column.clone()) as _;
1281
1282 let min = required_columns
1283 .min_column_expr(column, &col_ref, field)
1284 .ok()?;
1285 let max = required_columns
1286 .max_column_expr(column, &col_ref, field)
1287 .ok()?;
1288
1289 if is_not {
1293 Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1296 phys_expr::BinaryExpr::new(min, Operator::And, max),
1297 ))))
1298 } else {
1299 Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1302 }
1303 } else {
1304 None
1305 }
1306}
1307
1308fn build_is_null_column_expr(
1317 expr: &Arc<dyn PhysicalExpr>,
1318 schema: &Schema,
1319 required_columns: &mut RequiredColumns,
1320 with_not: bool,
1321) -> Option<Arc<dyn PhysicalExpr>> {
1322 if let Some(col) = expr.downcast_ref::<phys_expr::Column>() {
1323 let field = schema.field_with_name(col.name()).ok()?;
1324
1325 let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1326 if with_not {
1327 if let Ok(row_count_expr) =
1328 required_columns.row_count_column_expr(col, expr, null_count_field)
1329 {
1330 required_columns
1331 .null_count_column_expr(col, expr, null_count_field)
1332 .map(|null_count_column_expr| {
1333 Arc::new(phys_expr::BinaryExpr::new(
1335 null_count_column_expr,
1336 Operator::NotEq,
1337 row_count_expr,
1338 )) as _
1339 })
1340 .ok()
1341 } else {
1342 None
1343 }
1344 } else {
1345 required_columns
1346 .null_count_column_expr(col, expr, null_count_field)
1347 .map(|null_count_column_expr| {
1348 Arc::new(phys_expr::BinaryExpr::new(
1350 null_count_column_expr,
1351 Operator::Gt,
1352 Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1353 )) as _
1354 })
1355 .ok()
1356 }
1357 } else {
1358 None
1359 }
1360}
1361
/// Largest `IN` list that `build_predicate_expression` expands into a chain of
/// `=` / `!=` comparisons. Longer (or empty) lists are passed to the unhandled
/// predicate hook instead.
const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1365
/// Rewrites predicates into statistics predicates using a configurable
/// [`UnhandledPredicateHook`]. By default, unhandled parts become `true`.
pub struct PredicateRewriter {
    /// Called for sub-expressions that cannot be rewritten.
    unhandled_hook: Arc<dyn UnhandledPredicateHook>,
}
1371
1372impl Default for PredicateRewriter {
1373 fn default() -> Self {
1374 Self {
1375 unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1376 }
1377 }
1378}
1379
1380impl PredicateRewriter {
1381 pub fn new() -> Self {
1383 Self::default()
1384 }
1385
1386 pub fn with_unhandled_hook(
1388 self,
1389 unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1390 ) -> Self {
1391 Self { unhandled_hook }
1392 }
1393
1394 pub fn rewrite_predicate_to_statistics_predicate(
1404 &self,
1405 expr: &Arc<dyn PhysicalExpr>,
1406 schema: &Schema,
1407 ) -> Arc<dyn PhysicalExpr> {
1408 let mut required_columns = RequiredColumns::new();
1409 build_predicate_expression(
1410 expr,
1411 &Arc::new(schema.clone()),
1412 &mut required_columns,
1413 &self.unhandled_hook,
1414 )
1415 }
1416}
1417
1418fn build_predicate_expression(
1428 expr: &Arc<dyn PhysicalExpr>,
1429 schema: &SchemaRef,
1430 required_columns: &mut RequiredColumns,
1431 unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1432) -> Arc<dyn PhysicalExpr> {
1433 if is_always_false(expr) {
1434 return Arc::clone(expr);
1437 }
1438 if let Some(is_null) = expr.downcast_ref::<phys_expr::IsNullExpr>() {
1440 return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1441 .unwrap_or_else(|| unhandled_hook.handle(expr));
1442 }
1443 if let Some(is_not_null) = expr.downcast_ref::<phys_expr::IsNotNullExpr>() {
1444 return build_is_null_column_expr(
1445 is_not_null.arg(),
1446 schema,
1447 required_columns,
1448 true,
1449 )
1450 .unwrap_or_else(|| unhandled_hook.handle(expr));
1451 }
1452 if let Some(col) = expr.downcast_ref::<phys_expr::Column>() {
1453 return build_single_column_expr(col, schema, required_columns, false)
1454 .unwrap_or_else(|| unhandled_hook.handle(expr));
1455 }
1456 if let Some(not) = expr.downcast_ref::<phys_expr::NotExpr>() {
1457 if let Some(col) = not.arg().downcast_ref::<phys_expr::Column>() {
1459 return build_single_column_expr(col, schema, required_columns, true)
1460 .unwrap_or_else(|| unhandled_hook.handle(expr));
1461 } else {
1462 return unhandled_hook.handle(expr);
1463 }
1464 }
1465 if let Some(in_list) = expr.downcast_ref::<phys_expr::InListExpr>() {
1466 if !in_list.list().is_empty()
1467 && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1468 {
1469 let eq_op = if in_list.negated() {
1470 Operator::NotEq
1471 } else {
1472 Operator::Eq
1473 };
1474 let re_op = if in_list.negated() {
1475 Operator::And
1476 } else {
1477 Operator::Or
1478 };
1479 let change_expr = in_list
1480 .list()
1481 .iter()
1482 .map(|e| {
1483 Arc::new(phys_expr::BinaryExpr::new(
1484 Arc::clone(in_list.expr()),
1485 eq_op,
1486 Arc::clone(e),
1487 )) as _
1488 })
1489 .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1490 .unwrap();
1491 return build_predicate_expression(
1492 &change_expr,
1493 schema,
1494 required_columns,
1495 unhandled_hook,
1496 );
1497 } else {
1498 return unhandled_hook.handle(expr);
1499 }
1500 }
1501
1502 let (left, op, right) = {
1503 if let Some(bin_expr) = expr.downcast_ref::<phys_expr::BinaryExpr>() {
1504 (
1505 Arc::clone(bin_expr.left()),
1506 *bin_expr.op(),
1507 Arc::clone(bin_expr.right()),
1508 )
1509 } else if let Some(like_expr) = expr.downcast_ref::<phys_expr::LikeExpr>() {
1510 if like_expr.case_insensitive() {
1511 return unhandled_hook.handle(expr);
1512 }
1513 let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1514 (false, false) => Operator::LikeMatch,
1515 (true, false) => Operator::NotLikeMatch,
1516 (false, true) => Operator::ILikeMatch,
1517 (true, true) => Operator::NotILikeMatch,
1518 };
1519 (
1520 Arc::clone(like_expr.expr()),
1521 op,
1522 Arc::clone(like_expr.pattern()),
1523 )
1524 } else {
1525 return unhandled_hook.handle(expr);
1526 }
1527 };
1528
1529 if op == Operator::And || op == Operator::Or {
1530 let left_expr =
1531 build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1532 let right_expr =
1533 build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1534 let expr = match (&left_expr, op, &right_expr) {
1536 (left, Operator::And, right)
1537 if is_always_false(left) || is_always_false(right) =>
1538 {
1539 Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false))))
1540 }
1541 (left, Operator::And, _) if is_always_true(left) => right_expr,
1542 (_, Operator::And, right) if is_always_true(right) => left_expr,
1543 (left, Operator::Or, right)
1544 if is_always_true(left) || is_always_true(right) =>
1545 {
1546 Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1547 }
1548 (left, Operator::Or, _) if is_always_false(left) => right_expr,
1549 (_, Operator::Or, right) if is_always_false(right) => left_expr,
1550
1551 _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1552 };
1553 return expr;
1554 }
1555
1556 let left_columns = ColumnReferenceCount::from_expression(&left);
1557 let right_columns = ColumnReferenceCount::from_expression(&right);
1558 let expr_builder = PruningExpressionBuilder::try_new(
1559 &left,
1560 &right,
1561 left_columns,
1562 right_columns,
1563 op,
1564 schema,
1565 required_columns,
1566 );
1567 let mut expr_builder = match expr_builder {
1568 Ok(builder) => builder,
1569 Err(e) => {
1572 debug!("Error building pruning expression: {e}");
1573 return unhandled_hook.handle(expr);
1574 }
1575 };
1576
1577 build_statistics_expr(&mut expr_builder)
1578 .unwrap_or_else(|_| unhandled_hook.handle(expr))
1579}
1580
/// How many distinct columns an expression references, as counted by
/// [`ColumnReferenceCount::from_expression`].
#[derive(Debug, PartialEq, Eq)]
enum ColumnReferenceCount {
    /// The expression references no columns.
    Zero,
    /// The expression references exactly one distinct column (that column).
    One(phys_expr::Column),
    /// The expression references two or more distinct columns.
    Many,
}
1599
1600impl ColumnReferenceCount {
1601 fn from_expression(expr: &Arc<dyn PhysicalExpr>) -> Self {
1603 let mut seen = HashSet::<phys_expr::Column>::new();
1604 expr.apply(|expr| {
1605 if let Some(column) = expr.downcast_ref::<phys_expr::Column>() {
1606 seen.insert(column.clone());
1607 if seen.len() > 1 {
1608 return Ok(TreeNodeRecursion::Stop);
1609 }
1610 }
1611 Ok(TreeNodeRecursion::Continue)
1612 })
1613 .expect("no way to return error during recursion");
1615 match seen.len() {
1616 0 => ColumnReferenceCount::Zero,
1617 1 => ColumnReferenceCount::One(
1618 seen.into_iter().next().expect("just checked len==1"),
1619 ),
1620 _ => ColumnReferenceCount::Many,
1621 }
1622 }
1623}
1624
1625fn build_statistics_expr(
1626 expr_builder: &mut PruningExpressionBuilder,
1627) -> Result<Arc<dyn PhysicalExpr>> {
1628 let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1629 Operator::NotEq => build_ne_statistics_expr(expr_builder)?,
1630 Operator::Eq => {
1631 build_eq_statistics_expr(expr_builder)?
1634 }
1635 Operator::IsDistinctFrom => return build_is_distinct_from(expr_builder),
1636 Operator::IsNotDistinctFrom => return build_is_not_distinct_from(expr_builder),
1637 Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1638 Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1639 plan_datafusion_err!(
1640 "LIKE expression with wildcard at the beginning is not supported"
1641 )
1642 })?,
1643 Operator::Gt => {
1644 Arc::new(phys_expr::BinaryExpr::new(
1646 expr_builder.max_column_expr()?,
1647 Operator::Gt,
1648 Arc::clone(expr_builder.scalar_expr()),
1649 ))
1650 }
1651 Operator::GtEq => {
1652 Arc::new(phys_expr::BinaryExpr::new(
1654 expr_builder.max_column_expr()?,
1655 Operator::GtEq,
1656 Arc::clone(expr_builder.scalar_expr()),
1657 ))
1658 }
1659 Operator::Lt => {
1660 Arc::new(phys_expr::BinaryExpr::new(
1662 expr_builder.min_column_expr()?,
1663 Operator::Lt,
1664 Arc::clone(expr_builder.scalar_expr()),
1665 ))
1666 }
1667 Operator::LtEq => {
1668 Arc::new(phys_expr::BinaryExpr::new(
1670 expr_builder.min_column_expr()?,
1671 Operator::LtEq,
1672 Arc::clone(expr_builder.scalar_expr()),
1673 ))
1674 }
1675 _ => {
1677 return plan_err!(
1678 "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1679 );
1680 }
1681 };
1682 let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1683 Ok(statistics_expr)
1684}
1685
1686fn binary_expr(
1687 left: Arc<dyn PhysicalExpr>,
1688 op: Operator,
1689 right: Arc<dyn PhysicalExpr>,
1690) -> Arc<dyn PhysicalExpr> {
1691 Arc::new(phys_expr::BinaryExpr::new(left, op, right))
1692}
1693
1694fn and_expr(
1695 left: Arc<dyn PhysicalExpr>,
1696 right: Arc<dyn PhysicalExpr>,
1697) -> Arc<dyn PhysicalExpr> {
1698 binary_expr(left, Operator::And, right)
1699}
1700
1701fn or_expr(
1702 left: Arc<dyn PhysicalExpr>,
1703 right: Arc<dyn PhysicalExpr>,
1704) -> Arc<dyn PhysicalExpr> {
1705 binary_expr(left, Operator::Or, right)
1706}
1707
1708fn build_eq_statistics_expr(
1709 expr_builder: &mut PruningExpressionBuilder,
1710) -> Result<Arc<dyn PhysicalExpr>> {
1711 let min_column_expr = expr_builder.min_column_expr()?;
1712 let max_column_expr = expr_builder.max_column_expr()?;
1713 Ok(and_expr(
1714 binary_expr(
1715 min_column_expr,
1716 Operator::LtEq,
1717 Arc::clone(expr_builder.scalar_expr()),
1718 ),
1719 binary_expr(
1720 Arc::clone(expr_builder.scalar_expr()),
1721 Operator::LtEq,
1722 max_column_expr,
1723 ),
1724 ))
1725}
1726
1727fn build_ne_statistics_expr(
1728 expr_builder: &mut PruningExpressionBuilder,
1729) -> Result<Arc<dyn PhysicalExpr>> {
1730 let min_column_expr = expr_builder.min_column_expr()?;
1731 let max_column_expr = expr_builder.max_column_expr()?;
1732 Ok(or_expr(
1733 binary_expr(
1734 min_column_expr,
1735 Operator::NotEq,
1736 Arc::clone(expr_builder.scalar_expr()),
1737 ),
1738 binary_expr(
1739 Arc::clone(expr_builder.scalar_expr()),
1740 Operator::NotEq,
1741 max_column_expr,
1742 ),
1743 ))
1744}
1745
1746fn column_has_nulls_expr(
1747 expr_builder: &mut PruningExpressionBuilder,
1748) -> Result<Arc<dyn PhysicalExpr>> {
1749 Ok(binary_expr(
1750 expr_builder.null_count_column_expr()?,
1751 Operator::Gt,
1752 Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1753 ))
1754}
1755
1756fn column_has_non_nulls_expr(
1757 expr_builder: &mut PruningExpressionBuilder,
1758) -> Result<Arc<dyn PhysicalExpr>> {
1759 Ok(binary_expr(
1760 expr_builder.null_count_column_expr()?,
1761 Operator::NotEq,
1762 expr_builder.row_count_column_expr()?,
1763 ))
1764}
1765
1766fn build_is_distinct_from(
1767 expr_builder: &mut PruningExpressionBuilder,
1768) -> Result<Arc<dyn PhysicalExpr>> {
1769 let scalar_expr = Arc::clone(expr_builder.scalar_expr());
1770
1771 Ok(or_expr(
1772 and_expr(
1773 Arc::new(phys_expr::IsNullExpr::new(Arc::clone(&scalar_expr))),
1774 column_has_non_nulls_expr(expr_builder)?,
1775 ),
1776 and_expr(
1777 Arc::new(phys_expr::IsNotNullExpr::new(scalar_expr)),
1778 or_expr(
1779 column_has_nulls_expr(expr_builder)?,
1780 build_ne_statistics_expr(expr_builder)?,
1781 ),
1782 ),
1783 ))
1784}
1785
1786fn build_is_not_distinct_from(
1787 expr_builder: &mut PruningExpressionBuilder,
1788) -> Result<Arc<dyn PhysicalExpr>> {
1789 let scalar_expr = Arc::clone(expr_builder.scalar_expr());
1790
1791 Ok(or_expr(
1792 and_expr(
1793 Arc::new(phys_expr::IsNullExpr::new(Arc::clone(&scalar_expr))),
1794 column_has_nulls_expr(expr_builder)?,
1795 ),
1796 and_expr(
1797 Arc::new(phys_expr::IsNotNullExpr::new(scalar_expr)),
1798 and_expr(
1799 column_has_non_nulls_expr(expr_builder)?,
1800 build_eq_statistics_expr(expr_builder)?,
1801 ),
1802 ),
1803 ))
1804}
1805
1806fn unpack_string(s: &ScalarValue) -> Option<&str> {
1808 s.try_as_str().flatten()
1809}
1810
1811fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1812 if let Some(lit) = expr.downcast_ref::<phys_expr::Literal>() {
1813 let s = unpack_string(lit.value())?;
1814 return Some(s);
1815 }
1816 None
1817}
1818
1819fn build_like_match(
1823 expr_builder: &mut PruningExpressionBuilder,
1824) -> Option<Arc<dyn PhysicalExpr>> {
1825 let min_column_expr = expr_builder.min_column_expr().ok()?;
1836 let max_column_expr = expr_builder.max_column_expr().ok()?;
1837 let scalar_expr = expr_builder.scalar_expr();
1838 let s = extract_string_literal(scalar_expr)?;
1840 let (decoded_prefix, rest) = split_constant_prefix(s);
1842 let has_wildcard = !rest.is_empty();
1843 if has_wildcard && decoded_prefix.is_empty() {
1844 return None;
1846 }
1847 let (lower_bound, upper_bound) = if has_wildcard {
1848 let incremented_prefix = increment_utf8(&decoded_prefix)?;
1849 let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1850 decoded_prefix,
1851 ))));
1852 let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1853 incremented_prefix,
1854 ))));
1855 (lower_bound_lit, upper_bound_lit)
1856 } else {
1857 let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1859 decoded_prefix,
1860 ))));
1861 (Arc::clone(&bound), bound)
1862 };
1863 let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1864 lower_bound,
1865 Operator::LtEq,
1866 Arc::clone(&max_column_expr),
1867 ));
1868 let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1869 Arc::clone(&min_column_expr),
1870 Operator::LtEq,
1871 upper_bound,
1872 ));
1873 let combined = Arc::new(phys_expr::BinaryExpr::new(
1874 upper_bound_expr,
1875 Operator::And,
1876 lower_bound_expr,
1877 ));
1878 Some(combined)
1879}
1880
1881fn build_not_like_match(
1887 expr_builder: &mut PruningExpressionBuilder<'_>,
1888) -> Result<Arc<dyn PhysicalExpr>> {
1889 let min_column_expr = expr_builder.min_column_expr()?;
1892 let max_column_expr = expr_builder.max_column_expr()?;
1893
1894 let scalar_expr = expr_builder.scalar_expr();
1895
1896 let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1897 plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1898 })?;
1899
1900 let (const_prefix, remaining) = split_constant_prefix(pattern);
1901 if const_prefix.is_empty() || remaining != "%" {
1902 return Err(plan_datafusion_err!(
1914 "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1915 ));
1916 }
1917
1918 let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1919 true,
1920 false,
1921 Arc::clone(&min_column_expr),
1922 Arc::clone(scalar_expr),
1923 ));
1924
1925 let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1926 true,
1927 false,
1928 Arc::clone(&max_column_expr),
1929 Arc::clone(scalar_expr),
1930 ));
1931
1932 Ok(Arc::new(phys_expr::BinaryExpr::new(
1933 min_col_not_like_epxr,
1934 Operator::Or,
1935 max_col_not_like_expr,
1936 )))
1937}
1938
/// Splits a LIKE pattern into its literal prefix and the remainder.
///
/// The prefix has `\` escapes decoded: `\x` yields `x`, and a trailing lone
/// `\` is kept as a literal backslash. The remainder starts at the first
/// unescaped `%` or `_` and is empty when the pattern has no wildcard.
fn split_constant_prefix(pattern: &str) -> (String, &str) {
    let mut literal = String::with_capacity(pattern.len());
    let mut pending_escape = false;

    for (pos, ch) in pattern.char_indices() {
        if pending_escape {
            // The escaped character is taken literally, even `%` or `_`.
            literal.push(ch);
            pending_escape = false;
            continue;
        }
        match ch {
            '\\' => pending_escape = true,
            '%' | '_' => return (literal, &pattern[pos..]),
            other => literal.push(other),
        }
    }

    if pending_escape {
        literal.push('\\');
    }
    (literal, "")
}
1955
/// Returns a string that is strictly greater, in code-point order (which is
/// also UTF-8 byte order), than every string starting with `data`.
///
/// The last character that can be incremented is replaced by its successor,
/// and everything after it is dropped. For example `"abc"` becomes `"abd"`,
/// and `"a\u{10FFFF}"` becomes `"b"`.
///
/// Returns `None` for an empty input or when no character can be
/// incremented.
fn increment_utf8(data: &str) -> Option<String> {
    // The noncharacters U+FDD0..=U+FDEF, U+FFFE and U+FFFF are never picked
    // as a successor. Surrogates are already rejected by `char::from_u32`,
    // and a `char` can never reach U+110000, so no further range check is
    // needed.
    fn is_allowed_successor(c: char) -> bool {
        !matches!(u32::from(c), 0xFDD0..=0xFDEF | 0xFFFE | 0xFFFF)
    }

    let chars: Vec<char> = data.chars().collect();

    // Scan backwards; the first incrementable character ends the result.
    chars.iter().enumerate().rev().find_map(|(pos, &c)| {
        char::from_u32(u32::from(c) + 1)
            .filter(|&next| is_allowed_successor(next))
            .map(|next| {
                chars[..pos]
                    .iter()
                    .copied()
                    .chain(std::iter::once(next))
                    .collect::<String>()
            })
    })
}
2001
2002fn wrap_null_count_check_expr(
2023 statistics_expr: Arc<dyn PhysicalExpr>,
2024 expr_builder: &mut PruningExpressionBuilder,
2025) -> Result<Arc<dyn PhysicalExpr>> {
2026 Ok(and_expr(
2028 column_has_non_nulls_expr(expr_builder)?,
2029 statistics_expr,
2030 ))
2031}
2032
/// The kind of per-container statistic that a required statistics column holds.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum StatisticsType {
    /// Minimum value of a column (e.g. the `<col>_min` field).
    Min,
    /// Maximum value of a column (e.g. the `<col>_max` field).
    Max,
    /// Number of null values of a column.
    NullCount,
    /// Number of rows in the container.
    RowCount,
}
2040
2041#[cfg(test)]
2042mod tests {
2043 use std::collections::HashMap;
2044 use std::ops::{Not, Rem};
2045
2046 use super::*;
2047 use datafusion_common::test_util::batches_to_string;
2048 use datafusion_expr::{and, col, lit, or};
2049 use datafusion_physical_expr::utils::collect_columns;
2050 use insta::assert_snapshot;
2051
2052 use arrow::array::Decimal128Array;
2053 use arrow::{
2054 array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
2055 datatypes::TimeUnit,
2056 };
2057 use datafusion_expr::expr::InList;
2058 use datafusion_expr::{BinaryExpr, Expr, cast, is_null, try_cast};
2059 use datafusion_functions_nested::expr_fn::{array_has, make_array};
2060 use datafusion_physical_expr::expressions::{
2061 self as phys_expr, DynamicFilterPhysicalExpr,
2062 };
2063 use datafusion_physical_expr::planner::logical2physical;
2064 use itertools::Itertools;
2065
    /// Statistics for one column across a set of containers, used by the tests.
    ///
    /// `assert_invariants` checks that every populated array has the same
    /// length, i.e. one entry per container.
    #[derive(Debug, Default)]
    struct ContainerStats {
        /// Minimum value per container.
        min: Option<ArrayRef>,
        /// Maximum value per container.
        max: Option<ArrayRef>,
        /// Null count per container (`UInt64` when built via `with_null_counts`).
        null_counts: Option<ArrayRef>,
        /// Row count per container (`UInt64` when built via `with_row_counts`).
        row_counts: Option<ArrayRef>,
        /// `(value set, per-container answer)` pairs returned by `contained`
        /// when the requested value set matches exactly.
        contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
    }
2085
2086 impl ContainerStats {
2087 fn new() -> Self {
2088 Default::default()
2089 }
2090 fn new_decimal128(
2091 min: impl IntoIterator<Item = Option<i128>>,
2092 max: impl IntoIterator<Item = Option<i128>>,
2093 precision: u8,
2094 scale: i8,
2095 ) -> Self {
2096 Self::new()
2097 .with_min(Arc::new(
2098 min.into_iter()
2099 .collect::<Decimal128Array>()
2100 .with_precision_and_scale(precision, scale)
2101 .unwrap(),
2102 ))
2103 .with_max(Arc::new(
2104 max.into_iter()
2105 .collect::<Decimal128Array>()
2106 .with_precision_and_scale(precision, scale)
2107 .unwrap(),
2108 ))
2109 }
2110
2111 fn new_i64(
2112 min: impl IntoIterator<Item = Option<i64>>,
2113 max: impl IntoIterator<Item = Option<i64>>,
2114 ) -> Self {
2115 Self::new()
2116 .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
2117 .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
2118 }
2119
2120 fn new_i32(
2121 min: impl IntoIterator<Item = Option<i32>>,
2122 max: impl IntoIterator<Item = Option<i32>>,
2123 ) -> Self {
2124 Self::new()
2125 .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
2126 .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
2127 }
2128
2129 fn new_utf8<'a>(
2130 min: impl IntoIterator<Item = Option<&'a str>>,
2131 max: impl IntoIterator<Item = Option<&'a str>>,
2132 ) -> Self {
2133 Self::new()
2134 .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
2135 .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
2136 }
2137
2138 fn new_bool(
2139 min: impl IntoIterator<Item = Option<bool>>,
2140 max: impl IntoIterator<Item = Option<bool>>,
2141 ) -> Self {
2142 Self::new()
2143 .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
2144 .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
2145 }
2146
2147 fn min(&self) -> Option<ArrayRef> {
2148 self.min.clone()
2149 }
2150
2151 fn max(&self) -> Option<ArrayRef> {
2152 self.max.clone()
2153 }
2154
2155 fn null_counts(&self) -> Option<ArrayRef> {
2156 self.null_counts.clone()
2157 }
2158
2159 fn row_counts(&self) -> Option<ArrayRef> {
2160 self.row_counts.clone()
2161 }
2162
2163 fn arrays(&self) -> Vec<ArrayRef> {
2165 let contained_arrays = self
2166 .contained
2167 .iter()
2168 .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
2169
2170 [
2171 self.min.as_ref().cloned(),
2172 self.max.as_ref().cloned(),
2173 self.null_counts.as_ref().cloned(),
2174 self.row_counts.as_ref().cloned(),
2175 ]
2176 .into_iter()
2177 .flatten()
2178 .chain(contained_arrays)
2179 .collect()
2180 }
2181
2182 fn len(&self) -> usize {
2186 self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
2188 }
2189
2190 fn assert_invariants(&self) {
2192 let mut prev_len = None;
2193
2194 for len in self.arrays().iter().map(|a| a.len()) {
2195 match prev_len {
2197 None => {
2198 prev_len = Some(len);
2199 }
2200 Some(prev_len) => {
2201 assert_eq!(prev_len, len);
2202 }
2203 }
2204 }
2205 }
2206
2207 fn with_min(mut self, min: ArrayRef) -> Self {
2209 self.min = Some(min);
2210 self
2211 }
2212
2213 fn with_max(mut self, max: ArrayRef) -> Self {
2215 self.max = Some(max);
2216 self
2217 }
2218
2219 fn with_null_counts(
2222 mut self,
2223 counts: impl IntoIterator<Item = Option<u64>>,
2224 ) -> Self {
2225 let null_counts: ArrayRef =
2226 Arc::new(counts.into_iter().collect::<UInt64Array>());
2227
2228 self.assert_invariants();
2229 self.null_counts = Some(null_counts);
2230 self
2231 }
2232
2233 fn with_row_counts(
2236 mut self,
2237 counts: impl IntoIterator<Item = Option<u64>>,
2238 ) -> Self {
2239 let row_counts: ArrayRef =
2240 Arc::new(counts.into_iter().collect::<UInt64Array>());
2241
2242 self.assert_invariants();
2243 self.row_counts = Some(row_counts);
2244 self
2245 }
2246
2247 #[allow(clippy::allow_attributes, clippy::mutable_key_type)] pub fn with_contained(
2250 mut self,
2251 values: impl IntoIterator<Item = ScalarValue>,
2252 contained: impl IntoIterator<Item = Option<bool>>,
2253 ) -> Self {
2254 let contained: BooleanArray = contained.into_iter().collect();
2255 let values: HashSet<_> = values.into_iter().collect();
2256
2257 self.contained.push((values, contained));
2258 self.assert_invariants();
2259 self
2260 }
2261
2262 #[allow(clippy::allow_attributes, clippy::mutable_key_type)] fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2265 self.contained
2267 .iter()
2268 .find(|(values, _contained)| values == find_values)
2269 .map(|(_values, contained)| contained.clone())
2270 }
2271 }
2272
    /// In-memory `PruningStatistics` for tests, keyed by column.
    #[derive(Debug, Default)]
    struct TestStatistics {
        /// Statistics for each column.
        stats: HashMap<Column, ContainerStats>,
    }
2278
2279 impl TestStatistics {
2280 fn new() -> Self {
2281 Self::default()
2282 }
2283
2284 fn with(
2285 mut self,
2286 name: impl Into<String>,
2287 container_stats: ContainerStats,
2288 ) -> Self {
2289 let col = Column::from_name(name.into());
2290 self.stats.insert(col, container_stats);
2291 self
2292 }
2293
2294 fn with_null_counts(
2298 mut self,
2299 name: impl Into<String>,
2300 counts: impl IntoIterator<Item = Option<u64>>,
2301 ) -> Self {
2302 let col = Column::from_name(name.into());
2303
2304 let container_stats = self
2306 .stats
2307 .remove(&col)
2308 .unwrap_or_default()
2309 .with_null_counts(counts);
2310
2311 self.stats.insert(col, container_stats);
2313 self
2314 }
2315
2316 fn with_row_counts(
2320 mut self,
2321 name: impl Into<String>,
2322 counts: impl IntoIterator<Item = Option<u64>>,
2323 ) -> Self {
2324 let col = Column::from_name(name.into());
2325
2326 let container_stats = self
2328 .stats
2329 .remove(&col)
2330 .unwrap_or_default()
2331 .with_row_counts(counts);
2332
2333 self.stats.insert(col, container_stats);
2335 self
2336 }
2337
2338 fn with_contained(
2340 mut self,
2341 name: impl Into<String>,
2342 values: impl IntoIterator<Item = ScalarValue>,
2343 contained: impl IntoIterator<Item = Option<bool>>,
2344 ) -> Self {
2345 let col = Column::from_name(name.into());
2346
2347 let container_stats = self
2349 .stats
2350 .remove(&col)
2351 .unwrap_or_default()
2352 .with_contained(values, contained);
2353
2354 self.stats.insert(col, container_stats);
2356 self
2357 }
2358 }
2359
2360 impl PruningStatistics for TestStatistics {
2361 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2362 self.stats
2363 .get(column)
2364 .map(|container_stats| container_stats.min())
2365 .unwrap_or(None)
2366 }
2367
2368 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2369 self.stats
2370 .get(column)
2371 .map(|container_stats| container_stats.max())
2372 .unwrap_or(None)
2373 }
2374
2375 fn num_containers(&self) -> usize {
2376 self.stats
2377 .values()
2378 .next()
2379 .map(|container_stats| container_stats.len())
2380 .unwrap_or(0)
2381 }
2382
2383 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2384 self.stats
2385 .get(column)
2386 .map(|container_stats| container_stats.null_counts())
2387 .unwrap_or(None)
2388 }
2389
2390 fn row_counts(&self) -> Option<ArrayRef> {
2391 self.stats
2392 .values()
2393 .find_map(|container_stats| container_stats.row_counts())
2394 }
2395
2396 fn contained(
2397 &self,
2398 column: &Column,
2399 values: &HashSet<ScalarValue>,
2400 ) -> Option<BooleanArray> {
2401 self.stats
2402 .get(column)
2403 .and_then(|container_stats| container_stats.contained(values))
2404 }
2405 }
2406
    /// `PruningStatistics` that returns the same min/max arrays for every
    /// column and has no null counts, row counts, or `contained` information.
    struct OneContainerStats {
        /// Min array returned for any column.
        min_values: Option<ArrayRef>,
        /// Max array returned for any column.
        max_values: Option<ArrayRef>,
        /// Reported container count, independent of the array lengths.
        num_containers: usize,
    }
2413
2414 impl PruningStatistics for OneContainerStats {
2415 fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2416 self.min_values.clone()
2417 }
2418
2419 fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2420 self.max_values.clone()
2421 }
2422
2423 fn num_containers(&self) -> usize {
2424 self.num_containers
2425 }
2426
2427 fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2428 None
2429 }
2430
2431 fn row_counts(&self) -> Option<ArrayRef> {
2432 None
2433 }
2434
2435 fn contained(
2436 &self,
2437 _column: &Column,
2438 _values: &HashSet<ScalarValue>,
2439 ) -> Option<BooleanArray> {
2440 None
2441 }
2442 }
2443
    #[test]
    // Two columns in one predicate must share a single `row_count` statistics
    // column (it appears as `row_count@3` in both halves), and the required
    // schema must not contain duplicate fields.
    fn test_unique_row_count_field_and_column() {
        let schema: SchemaRef = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Int32, true),
        ]));
        let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
        let expr = logical2physical(&expr, &schema);
        let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
        assert_eq!(
            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
            p.predicate_expr.to_string()
        );

        // Every required field must be unique.
        let mut fields = HashSet::new();
        for (_col, _ty, field) in p.required_columns().iter() {
            let was_new = fields.insert(field);
            if !was_new {
                panic!(
                    "Duplicate field in required schema: {field:?}. Previous fields:\n{fields:#?}"
                );
            }
        }
    }
2474
    #[test]
    fn prune_all_rows_null_counts() {
        // null_count == row_count: every row is null, so `i = 0` can be
        // pruned even though min/max claim 0.
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let statistics = TestStatistics::new().with(
            "i",
            ContainerStats::new_i32(
                vec![Some(0)], // min
                vec![Some(0)], // max
            )
            .with_null_counts(vec![Some(1)])
            .with_row_counts(vec![Some(1)]),
        );
        let expected_ret = &[false];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);

        // Same, but with null min/max values.
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let container_stats = ContainerStats {
            min: Some(Arc::new(Int32Array::from(vec![None]))),
            max: Some(Arc::new(Int32Array::from(vec![None]))),
            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
            ..ContainerStats::default()
        };
        let statistics = TestStatistics::new().with("i", container_stats);
        let expected_ret = &[false];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);

        // Unknown null count: the container cannot be proven all-null, so
        // `i = 0` is kept; `i > 0` is still pruned by max = 0.
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let container_stats = ContainerStats {
            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
            null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
            ..ContainerStats::default()
        };
        let statistics = TestStatistics::new().with("i", container_stats);
        let expected_ret = &[true];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
        let expected_ret = &[false];
        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);

        // Unknown row count: likewise, all-null cannot be concluded.
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let container_stats = ContainerStats {
            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
            row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
            ..ContainerStats::default()
        };
        let statistics = TestStatistics::new().with("i", container_stats);
        let expected_ret = &[true];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
        let expected_ret = &[false];
        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
    }
2535
    #[test]
    fn prune_missing_statistics() {
        // Container 0 has no min and container 1 has no max; the missing bound
        // alone never causes a container to be pruned.
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let container_stats = ContainerStats {
            min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
            max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
            ..ContainerStats::default()
        };
        let statistics = TestStatistics::new().with("i", container_stats);
        // i = 0: neither container can be ruled out
        let expected_ret = &[true, true];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
        // i > 0: container 0 is ruled out by its max of 0
        let expected_ret = &[false, true];
        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
        // i < 0: container 1 is ruled out by its min of 0
        let expected_ret = &[true, false];
        prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
    }
2556
    #[test]
    fn prune_null_stats() {
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));

        // One container whose single row is null (null_count == row_count).
        let statistics = TestStatistics::new().with(
            "i",
            ContainerStats::new_i32(
                vec![Some(0)], // min
                vec![Some(0)], // max
            )
            .with_null_counts(vec![Some(1)])
            .with_row_counts(vec![Some(1)]),
        );

        // The container holds no non-null value, so `i = 0` cannot match.
        let expected_ret = &[false];

        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
    }
2578
    #[test]
    fn test_build_statistics_record_batch() {
        // Request min of s1, max of s2, and both max and min of s3.
        let required_columns = RequiredColumns::from(vec![
            // min of original column s1, named s1_min
            (
                phys_expr::Column::new("s1", 1),
                StatisticsType::Min,
                Field::new("s1_min", DataType::Int32, true),
            ),
            // max of original column s2, named s2_max
            (
                phys_expr::Column::new("s2", 2),
                StatisticsType::Max,
                Field::new("s2_max", DataType::Int32, true),
            ),
            // max of original column s3, named s3_max
            (
                phys_expr::Column::new("s3", 3),
                StatisticsType::Max,
                Field::new("s3_max", DataType::Utf8, true),
            ),
            // min of original column s3, named s3_min
            (
                phys_expr::Column::new("s3", 3),
                StatisticsType::Min,
                Field::new("s3_min", DataType::Utf8, true),
            ),
        ]);

        // Four containers, with gaps in the statistics.
        let statistics = TestStatistics::new()
            .with(
                "s1",
                ContainerStats::new_i32(
                    vec![None, None, Some(9), None],  // min
                    vec![Some(10), None, None, None], // max
                ),
            )
            .with(
                "s2",
                ContainerStats::new_i32(
                    vec![Some(2), None, None, None],  // min
                    vec![Some(20), None, None, None], // max
                ),
            )
            .with(
                "s3",
                ContainerStats::new_utf8(
                    vec![Some("a"), None, None, None],      // min
                    vec![Some("q"), None, Some("r"), None], // max
                ),
            );

        // One output column per required statistic, one row per container.
        let batch =
            build_statistics_record_batch(&statistics, &required_columns).unwrap();
        assert_snapshot!(batches_to_string(&[batch]), @r"
        +--------+--------+--------+--------+
        | s1_min | s2_max | s3_max | s3_min |
        +--------+--------+--------+--------+
        |        | 20     | q      | a      |
        |        |        |        |        |
        | 9      |        | r      |        |
        |        |        |        |        |
        +--------+--------+--------+--------+
        ");
    }
2645
    #[test]
    fn test_build_statistics_casting() {
        // The required field is a nanosecond timestamp while the statistics
        // arrays are Int64, so the values must be cast to the field type.
        let required_columns = RequiredColumns::from(vec![(
            phys_expr::Column::new("s3", 3),
            StatisticsType::Min,
            Field::new(
                "s1_min",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
        )]);

        // Note the type mismatch: Int64 arrays for a Timestamp field.
        let statistics = OneContainerStats {
            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
            num_containers: 1,
        };

        let batch =
            build_statistics_record_batch(&statistics, &required_columns).unwrap();

        // 10 is interpreted as 10ns after the epoch.
        assert_snapshot!(batches_to_string(&[batch]), @r"
        +-------------------------------+
        | s1_min                        |
        +-------------------------------+
        | 1970-01-01T00:00:00.000000010 |
        +-------------------------------+
        ");
    }
2680
    #[test]
    fn test_build_statistics_no_required_stats() {
        // No statistics columns are requested at all.
        let required_columns = RequiredColumns::new();

        let statistics = OneContainerStats {
            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
            num_containers: 1,
        };

        let batch =
            build_statistics_record_batch(&statistics, &required_columns).unwrap();
        // The batch still has a row for the single container.
        assert_eq!(batch.num_rows(), 1);
    }
2695
    #[test]
    fn test_build_statistics_inconsistent_types() {
        // The required field is Utf8, but the statistics are Binary.
        let required_columns = RequiredColumns::from(vec![(
            phys_expr::Column::new("s3", 3),
            StatisticsType::Min,
            Field::new("s1_min", DataType::Utf8, true),
        )]);

        // The single min value is the byte 0xFF, which is not valid UTF-8.
        let statistics = OneContainerStats {
            min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
            max_values: None,
            num_containers: 1,
        };

        // Building succeeds, but the unusable value comes back as null.
        let batch =
            build_statistics_record_batch(&statistics, &required_columns).unwrap();
        assert_snapshot!(batches_to_string(&[batch]), @r"
        +--------+
        | s1_min |
        +--------+
        |        |
        +--------+
        ");
    }
2724
    #[test]
    fn test_build_statistics_inconsistent_length() {
        let required_columns = RequiredColumns::from(vec![(
            phys_expr::Column::new("s1", 3),
            StatisticsType::Min,
            Field::new("s1_min", DataType::Int64, true),
        )]);

        // Three containers are reported but the arrays hold only one value.
        let statistics = OneContainerStats {
            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
            num_containers: 3,
        };

        // The length mismatch must be reported as an error, not ignored.
        let result =
            build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
        assert!(
            result
                .to_string()
                .contains("mismatched statistics length. Expected 3, got 1"),
            "{}",
            result
        );
    }
2751
    #[test]
    fn row_group_predicate_eq() -> Result<()> {
        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
        // `c1 = 1` may match only if the container has a non-null value and
        // 1 lies within [min, max].
        let expected_expr =
            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";

        // column on the left
        let expr = col("c1").eq(lit(1));
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        // column on the right gives the same rewrite
        let expr = lit(1).eq(col("c1"));
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        Ok(())
    }
2772
    #[test]
    fn row_group_predicate_not_eq() -> Result<()> {
        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
        // `c1 != 1` can be ruled out only when min == max == 1.
        let expected_expr =
            "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";

        // column on the left
        let expr = col("c1").not_eq(lit(1));
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        // column on the right gives the same rewrite
        let expr = lit(1).not_eq(col("c1"));
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        Ok(())
    }
2793
    #[test]
    fn row_group_predicate_gt() -> Result<()> {
        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
        // `c1 > 1` needs only the max statistic.
        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";

        // column on the left
        let expr = col("c1").gt(lit(1));
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        // `1 < c1` is the mirrored form of the same predicate
        let expr = lit(1).lt(col("c1"));
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        Ok(())
    }
2813
    #[test]
    fn row_group_predicate_gt_eq() -> Result<()> {
        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
        // `c1 >= 1` needs only the max statistic.
        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";

        // column on the left
        let expr = col("c1").gt_eq(lit(1));
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
        // `1 <= c1` is the mirrored form of the same predicate
        let expr = lit(1).lt_eq(col("c1"));
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        Ok(())
    }
2832
    #[test]
    fn row_group_predicate_lt() -> Result<()> {
        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
        // `c1 < 1` needs only the min statistic.
        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";

        // column on the left
        let expr = col("c1").lt(lit(1));
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        // `1 > c1` is the mirrored form of the same predicate
        let expr = lit(1).gt(col("c1"));
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        Ok(())
    }
2852
    #[test]
    fn row_group_predicate_lt_eq() -> Result<()> {
        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
        // `c1 <= 1` needs only the min statistic.
        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";

        // column on the left
        let expr = col("c1").lt_eq(lit(1));
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
        // `1 >= c1` is the mirrored form of the same predicate
        let expr = lit(1).gt_eq(col("c1"));
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        Ok(())
    }
2871
    #[test]
    fn row_group_predicate_and() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, false),
            Field::new("c2", DataType::Int32, false),
            Field::new("c3", DataType::Int32, false),
        ]);
        let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
        // `c2 < c3` compares two columns and has no rewrite, so only the
        // `c1 < 1` conjunct survives.
        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        Ok(())
    }
2888
    #[test]
    fn row_group_predicate_or() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, false),
            Field::new("c2", DataType::Int32, false),
        ]);
        let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
        // `c2 % 2 = 0` has no rewrite, so the whole OR degrades to `true`
        // (nothing can be pruned).
        let expected_expr = "true";
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        Ok(())
    }
2904
    #[test]
    fn row_group_predicate_not() -> Result<()> {
        // NOT on a non-boolean (Int32) column has no rewrite: `true`.
        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
        let expected_expr = "true";

        let expr = col("c1").not();
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        Ok(())
    }
2917
    #[test]
    fn row_group_predicate_not_bool() -> Result<()> {
        // NOT on a boolean column is rewritten in terms of min/max.
        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
        let expected_expr = "NOT c1_min@0 AND c1_max@1";

        let expr = col("c1").not();
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        Ok(())
    }
2930
    #[test]
    fn row_group_predicate_bool() -> Result<()> {
        // A bare boolean column may be true if either min or max is true.
        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
        let expected_expr = "c1_min@0 OR c1_max@1";

        let expr = col("c1");
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);

        Ok(())
    }
2943
    #[test]
    // A non-boolean predicate (`lit(1)`) must not prune anything.
    fn row_group_predicate_non_boolean() {
        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
        let statistics = TestStatistics::new()
            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
        let expected_ret = &[true];
        prune_with_expr(lit(1), &schema, &statistics, expected_ret);
    }
2953
    #[test]
    // A predicate that simplifies to constant false (`1 = 2`) prunes every
    // container, regardless of statistics.
    fn row_group_predicate_literal_false() {
        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
        let statistics = TestStatistics::new()
            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
        let expected_ret = &[false];
        prune_with_simplified_expr(lit(1).eq(lit(2)), &schema, &statistics, expected_ret);
    }
2966
    #[test]
    // A predicate that simplifies to constant true (`1 = 1`) keeps every
    // container.
    fn row_group_predicate_literal_true() {
        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
        let statistics = TestStatistics::new()
            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
        let expected_ret = &[true];
        prune_with_simplified_expr(lit(1).eq(lit(1)), &schema, &statistics, expected_ret);
    }
2978
    #[test]
    // A predicate comparing against NULL (`1 = NULL`) keeps the container.
    fn row_group_predicate_literal_null() {
        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
        let statistics = TestStatistics::new()
            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
        let expected_ret = &[true];
        prune_with_simplified_expr(
            lit(1).eq(lit(ScalarValue::Null)),
            &schema,
            &statistics,
            expected_ret,
        );
    }
2995
/// Compound literal-only predicates (arithmetic, AND, OR) are simplified to a
/// constant before pruning; each case pairs the predicate with whether the
/// single container is kept.
#[test]
fn row_group_predicate_complex_literals() {
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
    let stats = TestStatistics::new()
        .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));

    let cases = [
        ((lit(1) + lit(2)).gt(lit(0)), true),
        ((lit(1) + lit(2)).lt(lit(0)), false),
        (lit(true).and(lit(false)), false),
        (lit(true).or(lit(false)), true),
        (lit(1).lt(lit(2)).and(lit(3).gt(lit(1))), true),
        (lit(1).gt(lit(2)).or(lit(3).lt(lit(1))), false),
    ];

    for (predicate, keep) in cases {
        prune_with_simplified_expr(predicate, &schema, &stats, &[keep]);
    }
}
3052
/// A dynamic filter `c1 > 5 AND part = 'B'` whose `part` child has been
/// replaced by the literal `'A'` must still prune: `'A' = 'B'` can never hold,
/// so the container is pruned.
#[test]
fn row_group_predicate_dynamic_filter_with_literals() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("part", DataType::Utf8, true),
    ]));
    let statistics = TestStatistics::new().with_row_counts("c1", vec![Some(10)]);

    // Wrap the physical predicate in a dynamic filter over the columns it reads.
    let logical = col("c1").gt(lit(5)).and(col("part").eq(lit("B")));
    let inner = logical2physical(&logical, &schema);
    let referenced: Vec<Arc<dyn PhysicalExpr>> = collect_columns(&inner)
        .iter()
        .map(|column| Arc::new(column.clone()) as Arc<dyn PhysicalExpr>)
        .collect();
    let dynamic: Arc<dyn PhysicalExpr> =
        Arc::new(DynamicFilterPhysicalExpr::new(referenced, inner));

    // Substitute the constant 'A' for the `part` column; keep every other child.
    let substituted = dynamic
        .children()
        .into_iter()
        .map(|child| match child.downcast_ref::<phys_expr::Column>() {
            Some(column) if column.name() == "part" => {
                Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
                    "A".to_string(),
                )))) as Arc<dyn PhysicalExpr>
            }
            _ => Arc::clone(child),
        })
        .collect_vec();

    let filter = dynamic.with_new_children(substituted).unwrap();
    let pruning = PruningPredicate::try_new(filter, Arc::clone(&schema)).unwrap();
    assert_eq!(pruning.prune(&statistics).unwrap(), &[false]);
}
3100
/// `c1 < true` on a boolean column is rewritten against `c1_min`, guarded by a
/// "not all NULL" check.
#[test]
fn row_group_predicate_lt_bool() -> Result<()> {
    let bool_schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
    let predicate = col("c1").lt(lit(true));

    let mut columns = RequiredColumns::new();
    let rewritten = test_build_predicate_expression(&predicate, &bool_schema, &mut columns);
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@1 != row_count@2 AND c1_min@0 < true"
    );

    Ok(())
}
3115
/// Rewriting `c1 < 1 AND (c2 = 2 OR c2 = 3)` must register exactly six
/// statistics columns, in the order the rewrite first requests them, each
/// with a nullable field because the statistics may be missing.
///
/// `row_count` is shared between `c1` and `c2` (it is registered once, for
/// `c1`), and the second disjunct `c2 = 3` must not add any new entries.
#[test]
fn row_group_predicate_required_columns() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    let mut required_columns = RequiredColumns::new();
    let expr = col("c1")
        .lt(lit(1))
        .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
    let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
    let predicate_expr =
        test_build_predicate_expression(&expr, &schema, &mut required_columns);
    assert_eq!(predicate_expr.to_string(), expected_expr);
    // Printed so the full column list is visible if the comparison below fails.
    println!("required_columns: {required_columns:#?}");

    // One entry per (column, statistic). This replaces the previous
    // index-by-index assertions, which checked `columns[2]` (row_count) twice.
    let expected_columns = [
        // `c1 < 1` needs c1's min, null count and the row count.
        (
            phys_expr::Column::new("c1", 0),
            StatisticsType::Min,
            Field::new("c1_min", DataType::Int32, true),
        ),
        (
            phys_expr::Column::new("c1", 0),
            StatisticsType::NullCount,
            Field::new("c1_null_count", DataType::UInt64, true),
        ),
        (
            phys_expr::Column::new("c1", 0),
            StatisticsType::RowCount,
            Field::new("row_count", DataType::UInt64, true),
        ),
        // `c2 = 2` adds c2's min, max and null count and reuses `row_count`.
        (
            phys_expr::Column::new("c2", 1),
            StatisticsType::Min,
            Field::new("c2_min", DataType::Int32, true),
        ),
        (
            phys_expr::Column::new("c2", 1),
            StatisticsType::Max,
            Field::new("c2_max", DataType::Int32, true),
        ),
        (
            phys_expr::Column::new("c2", 1),
            StatisticsType::NullCount,
            Field::new("c2_null_count", DataType::UInt64, true),
        ),
    ];
    // Compares the length as well as every entry, so a duplicate or an extra
    // statistics column is caught here.
    assert_eq!(required_columns.columns, expected_columns);

    Ok(())
}
3206
/// `c1 IN (1, 2, 3)` becomes an OR of per-value min/max range checks.
#[test]
fn row_group_predicate_in_list() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    let values: Vec<Expr> = (1..=3).map(lit).collect();
    let in_list = Expr::InList(InList::new(Box::new(col("c1")), values, false));

    let rewritten =
        test_build_predicate_expression(&in_list, &schema, &mut RequiredColumns::new());
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1"
    );

    Ok(())
}
3226
/// An empty IN list cannot be turned into a pruning predicate; the result is
/// the always-true predicate.
#[test]
fn row_group_predicate_in_list_empty() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    let empty_in_list = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));

    let rewritten =
        test_build_predicate_expression(&empty_in_list, &schema, &mut RequiredColumns::new());
    assert_eq!(rewritten.to_string(), "true");

    Ok(())
}
3242
/// `c1 NOT IN (1, 2, 3)` becomes an AND of per-value "not a single-valued
/// container equal to v" checks.
#[test]
fn row_group_predicate_in_list_negated() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    let values: Vec<Expr> = (1..=3).map(lit).collect();
    let not_in_list = Expr::InList(InList::new(Box::new(col("c1")), values, true));

    let rewritten =
        test_build_predicate_expression(&not_in_list, &schema, &mut RequiredColumns::new());
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)"
    );

    Ok(())
}
3262
/// `c1 BETWEEN 1 AND 5` must rewrite to exactly the same pruning predicate as
/// its expansion `c1 >= 1 AND c1 <= 5`.
#[test]
fn row_group_predicate_between() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    // Each rewrite gets its own fresh set of required columns.
    let rewrite = |expr: Expr| {
        test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new())
            .to_string()
    };

    let between = rewrite(col("c1").between(lit(1), lit(5)));
    let expanded = rewrite(col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5))));
    assert_eq!(between, expanded);

    Ok(())
}
3285
/// An IN list on `c1` combined with a BETWEEN on `c2` produces the IN-list
/// disjunction ANDed with the two BETWEEN bound checks.
#[test]
fn row_group_predicate_between_with_in_list() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    let combined = col("c1")
        .in_list(vec![lit(1), lit(2)], false)
        .and(col("c2").between(lit(4), lit(5)));

    let rewritten =
        test_build_predicate_expression(&combined, &schema, &mut RequiredColumns::new());
    assert_eq!(
        rewritten.to_string(),
        "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5"
    );

    Ok(())
}
3308
/// An IN list with 21 values is not expanded. Presumably this exceeds the
/// rewrite's IN-list size limit (TODO: confirm the exact limit). The result
/// is the always-true predicate.
#[test]
fn row_group_predicate_in_list_to_many_values() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let many_values = (1..=21).map(lit).collect();
    let big_in_list = col("c1").in_list(many_values, false);

    let rewritten =
        test_build_predicate_expression(&big_in_list, &schema, &mut RequiredColumns::new());
    assert_eq!(rewritten.to_string(), "true");

    Ok(())
}
3324
/// Widening integer casts (`CAST`/`TRY_CAST` of Int32 to Int64) are pushed
/// onto the min/max statistics, whichever side of the comparison the column
/// is on.
#[test]
fn row_group_predicate_cast_int_int() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let one = || lit(ScalarValue::Int64(Some(1)));
    let check = |expr: Expr, expected: &str| {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), expected);
    };

    let eq_expected = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
    check(cast(col("c1"), DataType::Int64).eq(one()), eq_expected);
    check(one().eq(cast(col("c1"), DataType::Int64)), eq_expected);

    let gt_expected = "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
    check(try_cast(col("c1"), DataType::Int64).gt(one()), gt_expected);
    check(one().lt(try_cast(col("c1"), DataType::Int64)), gt_expected);

    Ok(())
}
3362
/// A string-to-string cast (Utf8View column cast to Utf8) is pushed onto the
/// min/max statistics in both operand orders.
#[test]
fn row_group_predicate_cast_string_string() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let literal = || lit(ScalarValue::Utf8(Some("1".to_string())));
    let as_utf8 = || cast(col("c1"), DataType::Utf8);

    for expr in [as_utf8().eq(literal()), literal().eq(as_utf8())] {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(
            rewritten.to_string(),
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Utf8) <= 1 AND 1 <= CAST(c1_max@1 AS Utf8)"
        );
    }

    Ok(())
}
3384
/// Casting a string column to Int32 is not pushed onto the statistics; the
/// result is the always-true predicate in both operand orders.
#[test]
fn row_group_predicate_cast_string_int() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let literal = || lit(ScalarValue::Int32(Some(1)));
    let as_int = || cast(col("c1"), DataType::Int32);

    for expr in [as_int().eq(literal()), literal().eq(as_int())] {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3404
/// Casting an Int32 column to a string is not pushed onto the statistics;
/// the result is the always-true predicate in both operand orders.
#[test]
fn row_group_predicate_cast_int_string() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let literal = || lit(ScalarValue::Utf8(Some("1".to_string())));
    let as_string = || cast(col("c1"), DataType::Utf8);

    for expr in [as_string().eq(literal()), literal().eq(as_string())] {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3426
/// A Date32 -> Date64 cast is pushed onto the min/max statistics in both
/// operand orders.
#[test]
fn row_group_predicate_date_date() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
    let date = || lit(ScalarValue::Date64(Some(123)));
    let as_date64 = || cast(col("c1"), DataType::Date64);

    for expr in [as_date64().eq(date()), date().eq(as_date64())] {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(
            rewritten.to_string(),
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Date64) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Date64)"
        );
    }

    Ok(())
}
3448
/// Casting a Date32 column to a dictionary-encoded string is not pushed onto
/// the statistics; the result is the always-true predicate in both orders.
#[test]
fn row_group_predicate_dict_string_date() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
    let as_dict_string = || {
        cast(
            col("c1"),
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
        )
    };
    let day = || lit(ScalarValue::Utf8(Some("2024-01-01".to_string())));

    for expr in [as_dict_string().eq(day()), day().eq(as_dict_string())] {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3476
/// Casting a dictionary-encoded string column to Date32 is not pushed onto
/// the statistics; the result is the always-true predicate in both orders.
#[test]
fn row_group_predicate_date_dict_string() -> Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    let schema = Schema::new(vec![Field::new("c1", dict_type, false)]);
    let day = || lit(ScalarValue::Date32(Some(123)));
    let as_date = || cast(col("c1"), DataType::Date32);

    for expr in [as_date().eq(day()), day().eq(as_date())] {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3503
/// A `Dictionary(UInt8, Utf8)` column compared with a Utf8 literal is used
/// directly. Re-keying it to `Dictionary(UInt16, Utf8)` keeps the cast on
/// the statistics.
#[test]
fn row_group_predicate_dict_dict_same_value_type() -> Result<()> {
    let schema = Schema::new(vec![Field::new(
        "c1",
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
        false,
    )]);
    let needle = || lit(ScalarValue::Utf8(Some("test".to_string())));
    let check = |expr: Expr, expected: &str| {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), expected);
    };

    check(
        col("c1").eq(needle()),
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1",
    );

    let rekeyed = cast(
        col("c1"),
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
    );
    check(
        rekeyed.eq(needle()),
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Utf8)) <= test AND test <= CAST(c1_max@1 AS Dictionary(UInt16, Utf8))",
    );

    Ok(())
}
3534
/// A `Dictionary(UInt8, Int32)` column cast to Int64 keeps the cast on the
/// min/max statistics.
#[test]
fn row_group_predicate_dict_dict_different_value_type() -> Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Int32));
    let schema = Schema::new(vec![Field::new("c1", dict_type, false)]);
    let predicate =
        cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(123))));

    let rewritten =
        test_build_predicate_expression(&predicate, &schema, &mut RequiredColumns::new());
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 123 AND 123 <= CAST(c1_max@1 AS Int64)"
    );

    Ok(())
}
3554
/// A nested dictionary column (`Dictionary(UInt8, Dictionary(UInt16, Utf8))`)
/// compared with a Utf8 literal is rewritten without any cast.
#[test]
fn row_group_predicate_nested_dict() -> Result<()> {
    let inner = DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
    let outer = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(inner));
    let schema = Schema::new(vec![Field::new("c1", outer, false)]);
    let predicate = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));

    let rewritten =
        test_build_predicate_expression(&predicate, &schema, &mut RequiredColumns::new());
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1"
    );

    Ok(())
}
3580
/// Casting `Dictionary(UInt8, Date32)` to `Dictionary(UInt16, Date64)` keeps
/// the cast on the min/max statistics.
#[test]
fn row_group_predicate_dict_date_dict_date() -> Result<()> {
    let source_type =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Date32));
    let target_type =
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Date64));
    let schema = Schema::new(vec![Field::new("c1", source_type, false)]);
    let predicate = cast(col("c1"), target_type).eq(lit(ScalarValue::Date64(Some(123))));

    let rewritten =
        test_build_predicate_expression(&predicate, &schema, &mut RequiredColumns::new());
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Date64)) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Dictionary(UInt16, Date64))"
    );

    Ok(())
}
3603
/// Casting a Utf8 column to Date32 is not pushed onto the statistics; the
/// result is the always-true predicate in both operand orders.
#[test]
fn row_group_predicate_date_string() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
    let day = || lit(ScalarValue::Date32(Some(123)));
    let as_date = || cast(col("c1"), DataType::Date32);

    for expr in [as_date().eq(day()), day().eq(as_date())] {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3625
/// Casting a Date32 column to Utf8 is not pushed onto the statistics; the
/// result is the always-true predicate in both operand orders.
#[test]
fn row_group_predicate_string_date() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
    let day = || lit(ScalarValue::Utf8(Some("2024-01-01".to_string())));
    let as_string = || cast(col("c1"), DataType::Utf8);

    for expr in [as_string().eq(day()), day().eq(as_string())] {
        let rewritten =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3647
/// `CAST(c1 AS Int64) [NOT] IN (1, 2, 3)` keeps the cast on the statistics
/// for both the plain and the negated IN list.
#[test]
fn row_group_predicate_cast_list() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let cast_in_list = |negated: bool| {
        Expr::InList(InList::new(
            Box::new(cast(col("c1"), DataType::Int64)),
            (1..=3).map(|v| lit(ScalarValue::Int64(Some(v)))).collect(),
            negated,
        ))
    };

    let cases = [
        (
            false,
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)",
        ),
        (
            true,
            "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))",
        ),
    ];

    for (negated, expected) in cases {
        let rewritten = test_build_predicate_expression(
            &cast_in_list(negated),
            &schema,
            &mut RequiredColumns::new(),
        );
        assert_eq!(rewritten.to_string(), expected);
    }

    Ok(())
}
3682
/// Pruning on decimal columns whose statistics are stored as Int32, Int64, or
/// Decimal128. Every case expects the same keep-mask.
#[test]
fn prune_decimal_data() {
    let expected = [false, true, false, true];
    let i32_stats = || {
        TestStatistics::new().with(
            "s1",
            ContainerStats::new_i32(
                vec![Some(0), Some(4), None, Some(3)],
                vec![Some(5), Some(6), Some(4), None],
            ),
        )
    };

    // Decimal128(9, 2) with Int32 statistics: compared directly, via CAST and via TRY_CAST.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "s1",
        DataType::Decimal128(9, 2),
        true,
    )]));
    let widened = || lit(ScalarValue::Decimal128(Some(5000), 14, 3));
    prune_with_expr(
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
        &schema,
        &i32_stats(),
        &expected,
    );
    prune_with_expr(
        cast(col("s1"), DataType::Decimal128(14, 3)).gt(widened()),
        &schema,
        &i32_stats(),
        &expected,
    );
    prune_with_expr(
        try_cast(col("s1"), DataType::Decimal128(14, 3)).gt(widened()),
        &schema,
        &i32_stats(),
        &expected,
    );

    // Decimal128(18, 2) with Int64 statistics.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "s1",
        DataType::Decimal128(18, 2),
        true,
    )]));
    let i64_stats = TestStatistics::new().with(
        "s1",
        ContainerStats::new_i64(
            vec![Some(0), Some(4), None, Some(3)],
            vec![Some(5), Some(6), Some(4), None],
        ),
    );
    prune_with_expr(
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
        &schema,
        &i64_stats,
        &expected,
    );

    // Decimal128(23, 2) with Decimal128 statistics.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "s1",
        DataType::Decimal128(23, 2),
        true,
    )]));
    let decimal_stats = TestStatistics::new().with(
        "s1",
        ContainerStats::new_decimal128(
            vec![Some(0), Some(400), None, Some(300)],
            vec![Some(500), Some(600), Some(400), None],
            23,
            2,
        ),
    );
    prune_with_expr(
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
        &schema,
        &decimal_stats,
        &expected,
    );
}
3783
/// `s2 > 5`, both as written and after widening `s2` to Int64, prunes only
/// the first container (whose max is exactly 5). Statistics are supplied for
/// `s2` only.
#[test]
fn prune_api() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("s1", DataType::Utf8, true),
        Field::new("s2", DataType::Int32, true),
    ]));
    let mins = vec![Some(0), Some(4), None, Some(3)];
    let maxes = vec![Some(5), Some(6), None, None];
    let statistics = TestStatistics::new().with("s2", ContainerStats::new_i32(mins, maxes));

    for predicate in [
        col("s2").gt(lit(5)),
        cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
    ] {
        prune_with_expr(predicate, &schema, &statistics, &[false, true, true, true]);
    }
}
3818
/// `s1 != 'M'` prunes only the container whose min and max are both `'M'`.
#[test]
fn prune_not_eq_data() {
    let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
    let mins = vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")];
    let maxes = vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None];
    let statistics = TestStatistics::new().with("s1", ContainerStats::new_utf8(mins, maxes));

    prune_with_expr(
        col("s1").not_eq(lit("M")),
        &schema,
        &statistics,
        &[true, true, true, false, true, true],
    );
}
3843
3844 fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3860 let schema =
3861 Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3862
3863 let statistics = TestStatistics::new().with(
3864 "b1",
3865 ContainerStats::new_bool(
3866 vec![Some(false), Some(false), Some(true), None, Some(false)], vec![Some(false), Some(true), Some(true), None, None], ),
3869 );
3870 let expected_true = vec![false, true, true, true, true];
3871 let expected_false = vec![true, true, false, true, true];
3872
3873 (schema, statistics, expected_true, expected_false)
3874 }
3875
/// Constant predicates ignore the statistics: `true` keeps all five
/// containers and `false` prunes them all.
#[test]
fn prune_bool_const_expr() {
    let (schema, statistics, _, _) = bool_setup();

    for keep in [true, false] {
        prune_with_expr(lit(keep), &schema, &statistics, &[keep; 5]);
    }
}
3896
/// A bare boolean column predicate prunes exactly as `bool_setup` expects for
/// `b1`.
#[test]
fn prune_bool_column() {
    let (schema, statistics, keep_mask, _) = bool_setup();
    let predicate = col("b1");
    prune_with_expr(predicate, &schema, &statistics, &keep_mask);
}
3909
/// `NOT b1` prunes exactly as `bool_setup` expects for the negated column.
#[test]
fn prune_bool_not_column() {
    let (schema, statistics, _, keep_mask) = bool_setup();
    let predicate = col("b1").not();
    prune_with_expr(predicate, &schema, &statistics, &keep_mask);
}
3922
/// `b1 = true` must prune identically to the bare column `b1`.
#[test]
fn prune_bool_column_eq_true() {
    let (schema, statistics, keep_mask, _) = bool_setup();
    let predicate = col("b1").eq(lit(true));
    prune_with_expr(predicate, &schema, &statistics, &keep_mask);
}
3935
/// `(NOT b1) = true` must prune identically to `NOT b1`.
#[test]
fn prune_bool_not_column_eq_true() {
    let (schema, statistics, _, keep_mask) = bool_setup();
    let predicate = col("b1").not().eq(lit(true));
    prune_with_expr(predicate, &schema, &statistics, &keep_mask);
}
3948
3949 fn int32_setup() -> (SchemaRef, TestStatistics) {
3959 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3960
3961 let statistics = TestStatistics::new().with(
3962 "i",
3963 ContainerStats::new_i32(
3964 vec![Some(-5), Some(1), Some(-11), None, Some(1)], vec![Some(5), Some(11), Some(-1), None, None], ),
3967 );
3968 (schema, statistics)
3969 }
3970
/// `i > 0` and the equivalent `-i < 0` prune only the container whose range
/// is entirely negative (`[-11, -1]`).
#[test]
fn prune_int32_col_gt_zero() {
    let (schema, statistics) = int32_setup();
    let expected_ret = &[true, true, false, true, true];

    for predicate in [
        col("i").gt(lit(0)),
        Expr::Negative(Box::new(col("i"))).lt(lit(0)),
    ] {
        prune_with_expr(predicate, &schema, &statistics, expected_ret);
    }
}
3994
/// `i <= 0` and the equivalent `-i >= 0` prune the containers whose minimum
/// is already positive (`[1, 11]` and `[1, NULL]`).
#[test]
fn prune_int32_col_lte_zero() {
    let (schema, statistics) = int32_setup();
    let expected_ret = &[true, false, true, true, false];

    for predicate in [
        col("i").lt_eq(lit(0)),
        Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
    ] {
        prune_with_expr(predicate, &schema, &statistics, expected_ret);
    }
}
4023
/// Comparisons after casting the Int32 column (or its negation) to Utf8,
/// via CAST or TRY_CAST, cannot prune anything: every container is kept.
#[test]
fn prune_int32_col_lte_zero_cast() {
    let (schema, statistics) = int32_setup();
    let expected_ret = &[true, true, true, true, true];
    let negated = || Expr::Negative(Box::new(col("i")));

    let predicates = [
        cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
        try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
        cast(negated(), DataType::Utf8).gt_eq(lit("0")),
        try_cast(negated(), DataType::Utf8).gt_eq(lit("0")),
    ];
    for predicate in predicates {
        prune_with_expr(predicate, &schema, &statistics, expected_ret);
    }
}
4068
/// `i = 0` keeps only the containers whose range may contain 0 (`[-5, 5]`)
/// or is unknown (`[NULL, NULL]`).
#[test]
fn prune_int32_col_eq_zero() {
    let (schema, statistics) = int32_setup();
    let predicate = col("i").eq(lit(0));
    prune_with_expr(
        predicate,
        &schema,
        &statistics,
        &[true, false, false, true, false],
    );
}
4089
/// `IS NOT DISTINCT FROM` pruning, without and then with row/null counts.
#[test]
fn prune_int32_col_is_not_distinct_from() {
    let (schema, statistics) = int32_setup();

    // Min/max only: behaves like `i = 0`, in either operand order.
    let like_eq_zero = &[true, false, false, true, false];
    prune_with_expr(
        is_not_distinct_from(col("i"), lit(0)),
        &schema,
        &statistics,
        like_eq_zero,
    );
    prune_with_expr(
        is_not_distinct_from(lit(0), col("i")),
        &schema,
        &statistics,
        like_eq_zero,
    );

    // With counts: the container holding only NULLs (4 nulls of 4 rows) is
    // now pruned for a non-null literal...
    let statistics = statistics
        .with_row_counts("i", vec![Some(10), Some(9), None, Some(4), Some(10)])
        .with_null_counts("i", vec![Some(0), Some(1), None, Some(4), Some(0)]);
    prune_with_expr(
        is_not_distinct_from(col("i"), lit(0)),
        &schema,
        &statistics,
        &[true, false, false, false, false],
    );

    // ...and comparing against NULL keeps exactly the containers whose null
    // count is non-zero or unknown.
    prune_with_expr(
        is_not_distinct_from(col("i"), lit(ScalarValue::Int32(None))),
        &schema,
        &statistics,
        &[false, true, true, true, false],
    );
}
4134
/// `IS DISTINCT FROM` pruning with row and null counts available.
#[test]
fn prune_int32_col_is_distinct_from() {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let container_stats = ContainerStats::new_i32(
        vec![Some(0), Some(0), Some(5), None],
        vec![Some(0), Some(2), Some(5), None],
    )
    .with_row_counts(vec![Some(2), Some(2), Some(2), Some(2)])
    .with_null_counts(vec![Some(0), Some(0), Some(0), Some(2)]);
    let statistics = TestStatistics::new().with("i", container_stats);

    // Only the null-free container whose every value is 0 is pruned, in
    // either operand order.
    let distinct_from_zero = &[false, true, true, true];
    for predicate in [
        is_distinct_from(col("i"), lit(0)),
        is_distinct_from(lit(0), col("i")),
    ] {
        prune_with_expr(predicate, &schema, &statistics, distinct_from_zero);
    }

    // Against NULL, only the all-NULL container is pruned.
    prune_with_expr(
        is_distinct_from(col("i"), lit(ScalarValue::Int32(None))),
        &schema,
        &statistics,
        &[true, true, true, false],
    );
}
4173
#[test]
fn prune_int32_col_eq_zero_cast() {
    let (schema, statistics) = int32_setup();

    // Widening `i` to Int64 (with CAST or TRY_CAST) must prune exactly like
    // the uncast `i = 0`.
    let keep = &[true, false, false, true, false];
    let widened = [
        cast(col("i"), DataType::Int64).eq(lit(0i64)),
        try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
    ];
    for predicate in widened {
        prune_with_expr(predicate, &schema, &statistics, keep);
    }
}
4200
#[test]
fn prune_int32_col_eq_zero_cast_as_str() {
    let (schema, statistics) = int32_setup();

    // Comparing the column cast to Utf8 is expected to prune nothing: every
    // one of the five containers is kept.
    let predicate = cast(col("i"), DataType::Utf8).eq(lit("0"));
    prune_with_expr(predicate, &schema, &statistics, &[true; 5]);
}
4223
#[test]
fn prune_int32_col_lt_neg_one() {
    let (schema, statistics) = int32_setup();

    // `i > -1` and its negated form `-i < 1` must produce identical pruning.
    // NOTE(review): despite the test name, the predicate checked is `i > -1`.
    let keep = &[true, true, false, true, true];
    for predicate in [
        col("i").gt(lit(-1)),
        Expr::Negative(Box::new(col("i"))).lt(lit(1)),
    ] {
        prune_with_expr(predicate, &schema, &statistics, keep);
    }
}
4252
#[test]
fn prune_int32_is_null() {
    let (schema, statistics) = int32_setup();
    let is_null_predicate = || col("i").is_null();

    // With the base statistics from `int32_setup`, nothing is pruned.
    prune_with_expr(is_null_predicate(), &schema, &statistics, &[true; 5]);

    // Once null counts are supplied, containers reporting zero nulls are
    // pruned, while unknown (`None`) counts are kept.
    let statistics =
        statistics.with_null_counts("i", vec![Some(0), Some(1), None, None, Some(0)]);
    prune_with_expr(
        is_null_predicate(),
        &schema,
        &statistics,
        &[false, true, true, true, false],
    );
}
4291
#[test]
fn prune_int32_column_is_known_all_null() {
    let (schema, statistics) = int32_setup();
    let lt_zero = || col("i").lt(lit(0));
    let baseline = &[true, false, true, true, false];

    prune_with_expr(lt_zero(), &schema, &statistics, baseline);

    // Row counts on their own do not change the outcome.
    let statistics = statistics.with_row_counts(
        "i",
        vec![Some(10), Some(9), None, Some(4), Some(10)],
    );
    prune_with_expr(lt_zero(), &schema, &statistics, baseline);

    // Adding null counts reveals that container 3 (4 rows, 4 nulls) is
    // entirely null, so it is pruned too.
    let statistics = statistics.with_null_counts(
        "i",
        vec![Some(0), Some(1), None, Some(4), Some(0)],
    );
    prune_with_expr(
        lt_zero(),
        &schema,
        &statistics,
        &[true, false, true, false, false],
    );
}
4363
#[test]
fn prune_cast_scalar() {
    let (schema, statistics) = int32_setup();

    // Each spelling of "i > 0" below must prune the same way:
    // - cast on the literal;
    // - CAST or TRY_CAST on the column;
    // - the negated cast column compared with `<`.
    let keep = &[true, true, false, true, true];
    let spellings = vec![
        col("i").gt(cast(lit(ScalarValue::Int64(Some(0))), DataType::Int32)),
        cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
        try_cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
        Expr::Negative(Box::new(cast(col("i"), DataType::Int64)))
            .lt(lit(ScalarValue::Int64(Some(0)))),
    ];
    for predicate in spellings {
        prune_with_expr(predicate, &schema, &statistics, keep);
    }
}
4403
#[test]
fn test_increment_utf8() {
    // Each case pairs an input with the expected result of `increment_utf8`:
    // `Some(s)` when it increments to `s`, `None` when it cannot be incremented.
    let cases: &[(&str, Option<&str>)] = &[
        // Plain ASCII: the last character is bumped.
        ("abc", Some("abd")),
        ("abz", Some("ab{")),
        // Multi-byte characters and UTF-8 encoding-width boundaries.
        ("~", Some("\u{7f}")),
        ("\u{7f}", Some("\u{80}")),
        ("ß", Some("à")),
        ("℣", Some("ℤ")),
        ("\u{7FF}", Some("\u{800}")),
        ("\u{FFFF}", Some("\u{10000}")),
        // Nothing that can be incremented.
        ("", None),
        ("\u{10FFFF}", None),
        // If the last character cannot be bumped, the previous one is, and the
        // tail is dropped.
        ("a\u{10FFFF}", Some("b")),
        // The code point after U+D7FF is a surrogate.
        ("a\u{D7FF}", Some("b")),
        ("\u{D7FF}", None),
        // The code point after U+FDCF starts the noncharacter range.
        ("a\u{FDCF}", Some("b")),
        ("\u{FDCF}", None),
        // U+10FFFF is the largest valid code point.
        ("a\u{10FFFF}", Some("b")),
        ("\u{10FFFF}", None),
    ];

    for &(input, expected) in cases {
        match expected {
            Some(next) => assert_eq!(increment_utf8(input).unwrap(), next),
            None => assert!(increment_utf8(input).is_none()),
        }
    }
}
4443
4444 fn utf8_setup() -> (SchemaRef, TestStatistics) {
4457 let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4458
4459 let statistics = TestStatistics::new().with(
4460 "s1",
4461 ContainerStats::new_utf8(
4462 vec![
4463 Some("A"),
4464 Some("A"),
4465 Some("N"),
4466 Some("M"),
4467 None,
4468 Some("A"),
4469 Some(""),
4470 Some(""),
4471 Some("AB"),
4472 Some("A\u{10ffff}\u{10ffff}"),
4473 ], vec![
4475 Some("Z"),
4476 Some("L"),
4477 Some("Z"),
4478 Some("M"),
4479 None,
4480 None,
4481 Some("A"),
4482 Some(""),
4483 Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
4484 Some("A\u{10ffff}\u{10ffff}"),
4485 ], ),
4487 );
4488 (schema, statistics)
4489 }
4490
#[test]
fn prune_utf8_eq() {
    let (schema, statistics) = utf8_setup();

    // (literal, expected keep flag for each of the ten `utf8_setup` containers)
    let cases = [
        ("A", [true, true, false, false, true, true, true, false, false, false]),
        ("", [false, false, false, false, true, false, true, true, false, false]),
    ];
    for (literal, expected) in cases {
        prune_with_expr(col("s1").eq(lit(literal)), &schema, &statistics, &expected);
    }
}
4547
#[test]
fn prune_utf8_not_eq() {
    let (schema, statistics) = utf8_setup();

    // Only a container whose min and max both equal the literal can be pruned;
    // for "" that is the ["", ""] container.
    let cases = [
        ("A", [true; 10]),
        ("", [true, true, true, true, true, true, true, false, true, true]),
    ];
    for (literal, expected) in cases {
        prune_with_expr(
            col("s1").not_eq(lit(literal)),
            &schema,
            &statistics,
            &expected,
        );
    }
}
4604
#[test]
fn prune_utf8_like_one() {
    let (schema, statistics) = utf8_setup();

    // Patterns that contain the single-character wildcard `_`.
    // - A pattern with a literal prefix can prune.
    // - A pattern that starts with a wildcard prunes nothing.
    // - The empty pattern behaves like `= ''`.
    let cases = [
        ("A_", [true, true, false, false, true, true, true, false, true, true]),
        ("_A_", [true; 10]),
        ("_", [true; 10]),
        ("", [false, false, false, false, true, false, true, true, false, false]),
    ];
    for (pattern, expected) in cases {
        prune_with_expr(col("s1").like(lit(pattern)), &schema, &statistics, &expected);
    }
}
4713
#[test]
fn prune_utf8_like_many() {
    let (schema, statistics) = utf8_setup();

    // Patterns that contain the multi-character wildcard `%`.
    // - A pattern with a literal prefix can prune.
    // - A pattern that starts with a wildcard prunes nothing.
    // - The empty pattern behaves like `= ''`.
    let cases = [
        ("A%", [true, true, false, false, true, true, true, false, true, true]),
        ("%A%", [true; 10]),
        ("%", [true; 10]),
        ("", [false, false, false, false, true, false, true, true, false, false]),
    ];
    for (pattern, expected) in cases {
        prune_with_expr(col("s1").like(lit(pattern)), &schema, &statistics, &expected);
    }
}
4822
#[test]
fn prune_utf8_like_escaped_chars() {
    let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

    // (min, max) of six containers whose values use the LIKE metacharacters
    // `_` and `%`, and backslashes, as ordinary characters.
    let bounds = [
        ("foo_aaa", "foo_zzz"),
        (r#"foo\aaa"#, r#"foo\zzz"#),
        ("foo", "foozzz"),
        ("bar", "baz"),
        ("foo%aaa", "foo%zzz"),
        ("%foo_aaa", "%foo_zzz"),
    ];
    let mins: Vec<_> = bounds.iter().map(|&(min, _)| Some(min)).collect();
    let maxes: Vec<_> = bounds.iter().map(|&(_, max)| Some(max)).collect();
    let statistics =
        TestStatistics::new().with("s1", ContainerStats::new_utf8(mins, maxes));

    // In every pattern, a `\`-escaped character is treated literally.
    let cases = [
        (r#"foo\_%"#, [true, false, true, false, false, false]),
        (r#"foo\\%"#, [false, true, true, false, false, false]),
        (r#"foo\%%"#, [false, false, true, false, true, false]),
        (r#"foo\_"#, [false, false, true, false, false, false]),
        (r#"\%foo%"#, [false, false, false, false, false, true]),
        (r#"foo\%\_"#, [false, false, true, false, false, false]),
        (r#"foo\\bar%"#, [false, true, true, false, false, false]),
    ];
    for (pattern, expected) in cases {
        prune_with_expr(col("s1").like(lit(pattern)), &schema, &statistics, &expected);
    }
}
4990
#[test]
fn prune_utf8_not_like_one() {
    let (schema, statistics) = utf8_setup();

    // A trailing single-character wildcard means no container can be proven
    // to consist solely of matches, so all ten are kept.
    let predicate = col("s1").not_like(lit("A\u{10ffff}_"));
    prune_with_expr(predicate, &schema, &statistics, &[true; 10]);
}
5022
#[test]
fn prune_utf8_not_like_many() {
    let (schema, statistics) = utf8_setup();

    // Only a pure prefix pattern (`prefix%`) prunes. Here that is the last
    // container, whose min and max both start with "A\u{10ffff}".
    let cases = [
        (
            "A\u{10ffff}%",
            [true, true, true, true, true, true, true, true, true, false],
        ),
        ("A\u{10ffff}%\u{10ffff}", [true; 10]),
        ("A\u{10ffff}%\u{10ffff}_", [true; 10]),
    ];
    for (pattern, expected) in cases {
        prune_with_expr(
            col("s1").not_like(lit(pattern)),
            &schema,
            &statistics,
            &expected,
        );
    }

    // Escaped `%`: the first container only holds strings starting with the
    // literal "A%" and is pruned; the second only holds "A" and is kept.
    let statistics = TestStatistics::new().with(
        "s1",
        ContainerStats::new_utf8(vec![Some("A%a"), Some("A")], vec![Some("A%c"), Some("A")]),
    );
    prune_with_expr(
        col("s1").not_like(lit("A\\%%")),
        &schema,
        &statistics,
        &[false, true],
    );
}
5116
#[test]
fn test_rewrite_expr_to_prunable() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    // Each (left, op, right) triple is already prunable, so the rewrite must
    // return both sides unchanged:
    // - a bare column;
    // - a decimal CAST of the column;
    // - a TRY_CAST of the column.
    let cases = [
        (col("a"), Operator::Eq, lit(ScalarValue::Int32(Some(12)))),
        (
            cast(col("a"), DataType::Decimal128(20, 3)),
            Operator::Gt,
            lit(ScalarValue::Decimal128(Some(12), 20, 3)),
        ),
        (
            try_cast(col("a"), DataType::Int64),
            Operator::Gt,
            lit(ScalarValue::Int64(Some(12))),
        ),
    ];

    for (left, op, right) in cases {
        let left = logical2physical(&left, &schema);
        let right = logical2physical(&right, &schema);
        let (rewritten_left, _, rewritten_right) =
            rewrite_expr_to_prunable(&left, op, &right, df_schema.clone()).unwrap();
        assert_eq!(rewritten_left.to_string(), left.to_string());
        assert_eq!(rewritten_right.to_string(), right.to_string());
    }
}
5165
#[test]
fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
    /// Replaces any predicate the rewriter cannot handle with the literal `42`.
    struct CustomUnhandledHook;

    impl UnhandledPredicateHook for CustomUnhandledHook {
        fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
            Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
        }
    }

    // Statistics are only available for `a`. The predicates are planned
    // against a wider schema that also contains `b`.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let schema_with_b = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]);

    let rewriter =
        PredicateRewriter::new().with_unhandled_hook(Arc::new(CustomUnhandledHook {}));
    let rewrite = |logical| {
        let physical = logical2physical(&logical, &schema_with_b);
        rewriter.rewrite_predicate_to_statistics_predicate(&physical, &schema)
    };
    // The hook's output, planned against `schema`.
    let hook_literal = || logical2physical(&lit(42), &schema);

    // `a = 12` is handled without the hook. A default rewriter gives the
    // expected translation of that half of an AND.
    let known_expression = col("a").eq(lit(12));
    let known_expression_transformed = PredicateRewriter::new()
        .rewrite_predicate_to_statistics_predicate(
            &logical2physical(&known_expression, &schema),
            &schema,
        );

    let unhandled_predicates = [
        // `b` has no statistics
        col("b").eq(lit(12)),
        // not translated by the rewriter (per the expectations below)
        array_has(make_array(vec![lit(1)]), col("a")),
    ];
    for unhandled in unhandled_predicates {
        // On its own, the whole predicate becomes the hook's output.
        let rewritten = rewrite(unhandled.clone());
        assert_eq!(rewritten.to_string(), hook_literal().to_string());

        // Under AND, the known half survives and only the unhandled half is replaced.
        let expected = phys_expr::BinaryExpr::new(
            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
            Operator::And,
            hook_literal(),
        );
        let rewritten = rewrite(known_expression.clone().and(unhandled));
        assert_eq!(rewritten.to_string(), expected.to_string());
    }
}
5233
#[test]
fn test_rewrite_expr_to_prunable_error() {
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    // The rewrite must reject both of these left-hand sides when compared
    // against an Int64 literal:
    // - casting the Utf8 column `a` to Int64;
    // - a bare `a IS NULL`.
    let rejected = [cast(col("a"), DataType::Int64), is_null(col("a"))];
    for left in rejected {
        let left = logical2physical(&left, &schema);
        let right = logical2physical(&lit(ScalarValue::Int64(Some(12))), &schema);
        let outcome =
            rewrite_expr_to_prunable(&left, Operator::Gt, &right, df_schema.clone());
        assert!(outcome.is_err());
    }
}
5262
#[test]
fn prune_with_contained_one_column() {
    let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

    // Per-container `contained` answers for nine containers. Judging by the
    // expectations below:
    // - `Some(true)` means every value is in the given set;
    // - `Some(false)` means no value is in it;
    // - `None` means unknown.
    #[rustfmt::skip]
    let foo_contained = [
        Some(true), Some(false), None,
        Some(true), Some(false), None,
        Some(true), Some(false), None,
    ];
    #[rustfmt::skip]
    let bar_contained = [
        Some(true), Some(true), Some(true),
        Some(false), Some(false), Some(false),
        None, None, None,
    ];
    #[rustfmt::skip]
    let foo_or_bar_contained = [
        None, None, None,
        Some(true), Some(true), Some(true),
        Some(false), Some(false), Some(false),
    ];
    let statistics = TestStatistics::new()
        .with_contained("s1", [ScalarValue::from("foo")], foo_contained)
        .with_contained("s1", [ScalarValue::from("bar")], bar_contained)
        .with_contained(
            "s1",
            [ScalarValue::from("foo"), ScalarValue::from("bar")],
            foo_or_bar_contained,
        );

    let s1_eq = |v: &str| col("s1").eq(lit(v));
    let s1_ne = |v: &str| col("s1").not_eq(lit(v));
    let keep_all = [true; 9];

    let cases = vec![
        (s1_eq("foo"), [true, false, true, true, false, true, true, false, true]),
        (s1_eq("bar"), [true, true, true, false, false, false, true, true, true]),
        // no `contained` information for "baz"
        (s1_eq("baz"), keep_all),
        // can never be true, but the statistics cannot prove it
        (s1_eq("foo").and(s1_eq("bar")), keep_all),
        // presumably treated as `s1 IN ('foo', 'bar')`, using the combined set
        (
            s1_eq("foo").or(s1_eq("bar")),
            [true, true, true, true, true, true, false, false, false],
        ),
        (s1_eq("foo").or(s1_eq("baz")), keep_all),
        (s1_eq("foo").or(s1_eq("bar")).or(s1_eq("baz")), keep_all),
        (s1_ne("foo"), [false, true, true, false, true, true, false, true, true]),
        (s1_ne("bar"), [false, false, false, true, true, true, true, true, true]),
        (
            s1_ne("foo").and(s1_ne("bar")),
            [true, true, true, false, false, false, true, true, true],
        ),
        (s1_ne("foo").and(s1_ne("bar")).and(s1_ne("baz")), keep_all),
        (s1_ne("foo").or(s1_ne("bar")), keep_all),
        (s1_ne("foo").or(s1_ne("bar")).or(s1_ne("baz")), keep_all),
    ];
    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
5465
#[test]
fn prune_with_contained_two_columns() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("s1", DataType::Utf8, true),
        Field::new("s2", DataType::Utf8, true),
    ]));

    // `contained` answers for nine containers: "foo" in s1, "bar" in s2.
    #[rustfmt::skip]
    let s1_foo_contained = [
        Some(true), Some(false), None,
        Some(true), Some(false), None,
        Some(true), Some(false), None,
    ];
    #[rustfmt::skip]
    let s2_bar_contained = [
        Some(true), Some(true), Some(true),
        Some(false), Some(false), Some(false),
        None, None, None,
    ];
    let statistics = TestStatistics::new()
        .with_contained("s1", [ScalarValue::from("foo")], s1_foo_contained)
        .with_contained("s2", [ScalarValue::from("bar")], s2_bar_contained);

    let keep_all = [true; 9];
    let cases = vec![
        (col("s1").eq(lit("foo")), [true, false, true, true, false, true, true, false, true]),
        // an OR spanning two columns prunes nothing
        (col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar"))), keep_all),
        (
            col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
            [false, false, false, true, false, true, true, false, true],
        ),
        (
            col("s1").not_eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
            [false, false, false, false, true, true, false, true, true],
        ),
        (
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
            [false, true, true, false, true, true, false, true, true],
        ),
        // the LIKE on its own prunes nothing here
        (col("s1").like(lit("foo%bar%")), keep_all),
        (
            col("s1").like(lit("foo%bar%")).and(col("s2").eq(lit("bar"))),
            [true, true, true, false, false, false, true, true, true],
        ),
        (col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))), keep_all),
    ];
    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
5603
#[test]
fn prune_with_range_and_contained() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("i", DataType::Int32, true),
        Field::new("s", DataType::Utf8, true),
    ]));

    // Nine containers. `i` cycles through [-5, 5], [10, 20] and unknown bounds.
    // "foo"-containment of `s` is Some(true) / Some(false) / None in blocks of three.
    let i_mins = [Some(-5), Some(10), None].repeat(3);
    let i_maxes = [Some(5), Some(20), None].repeat(3);
    #[rustfmt::skip]
    let s_foo_contained = [
        Some(true), Some(true), Some(true),
        Some(false), Some(false), Some(false),
        None, None, None,
    ];
    let statistics = TestStatistics::new()
        .with("i", ContainerStats::new_i32(i_mins, i_maxes))
        .with_contained("s", [ScalarValue::from("foo")], s_foo_contained);

    let cases = vec![
        (
            col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
            [true, false, true, false, false, false, true, false, true],
        ),
        (
            col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
            [false, false, false, true, false, true, true, false, true],
        ),
        // an OR across the two columns prunes nothing
        (col("i").eq(lit(0)).or(col("s").eq(lit("foo"))), [true; 9]),
    ];
    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
5696
5697 fn prune_with_expr(
5704 expr: Expr,
5705 schema: &SchemaRef,
5706 statistics: &TestStatistics,
5707 expected: &[bool],
5708 ) {
5709 println!("Pruning with expr: {expr}");
5710 let expr = logical2physical(&expr, schema);
5711 let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5712 let result = p.prune(statistics).unwrap();
5713 assert_eq!(result, expected);
5714 }
5715
5716 fn prune_with_simplified_expr(
5717 expr: Expr,
5718 schema: &SchemaRef,
5719 statistics: &TestStatistics,
5720 expected: &[bool],
5721 ) {
5722 println!("Pruning with expr: {expr}");
5723 let expr = logical2physical(&expr, schema);
5724 let simplifier = PhysicalExprSimplifier::new(schema);
5725 let expr = simplifier.simplify(expr).unwrap();
5726 let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5727 let result = p.prune(statistics).unwrap();
5728 assert_eq!(result, expected);
5729 }
5730
5731 fn is_not_distinct_from(left: Expr, right: Expr) -> Expr {
5732 Expr::BinaryExpr(BinaryExpr::new(
5733 Box::new(left),
5734 Operator::IsNotDistinctFrom,
5735 Box::new(right),
5736 ))
5737 }
5738
5739 fn is_distinct_from(left: Expr, right: Expr) -> Expr {
5740 Expr::BinaryExpr(BinaryExpr::new(
5741 Box::new(left),
5742 Operator::IsDistinctFrom,
5743 Box::new(right),
5744 ))
5745 }
5746
5747 fn test_build_predicate_expression(
5748 expr: &Expr,
5749 schema: &Schema,
5750 required_columns: &mut RequiredColumns,
5751 ) -> Arc<dyn PhysicalExpr> {
5752 let expr = logical2physical(expr, schema);
5753 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
5754 build_predicate_expression(
5755 &expr,
5756 &Arc::new(schema.clone()),
5757 required_columns,
5758 &unhandled_hook,
5759 )
5760 }
5761
#[test]
fn test_build_predicate_expression_with_false() {
    // A bare literal `false` must come back unchanged.
    let schema = Schema::empty();
    let always_false = lit(ScalarValue::Boolean(Some(false)));
    let mut required_columns = RequiredColumns::new();
    let built =
        test_build_predicate_expression(&always_false, &schema, &mut required_columns);
    let expected = logical2physical(&always_false, &schema);
    assert_eq!(&built, &expected);
}
5771
#[test]
fn test_build_predicate_expression_with_and_false() {
    // `c1 = 'a' AND false` must collapse to the literal `false`.
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let always_false = || lit(ScalarValue::Boolean(Some(false)));
    let predicate = and(col("c1").eq(lit("a")), always_false());
    let mut required_columns = RequiredColumns::new();
    let built = test_build_predicate_expression(&predicate, &schema, &mut required_columns);
    let expected = logical2physical(&always_false(), &schema);
    assert_eq!(&built, &expected);
}
5784
#[test]
fn test_build_predicate_expression_with_or_false() {
    // In `c1 = 'a' OR false` the `false` arm contributes nothing. Only the
    // statistics predicate for `c1 = 'a'` remains.
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let predicate = or(
        col("c1").eq(lit("a")),
        lit(ScalarValue::Boolean(Some(false))),
    );
    let mut required_columns = RequiredColumns::new();
    let built = test_build_predicate_expression(&predicate, &schema, &mut required_columns);
    assert_eq!(
        built.to_string(),
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1"
    );
}
5799}