1use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27 array::{new_null_array, ArrayRef, BooleanArray},
28 datatypes::{DataType, Field, Schema, SchemaRef},
29 record_batch::{RecordBatch, RecordBatchOptions},
30};
31pub use datafusion_common::pruning::PruningStatistics;
33use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
34use datafusion_physical_plan::metrics::Count;
35use log::{debug, trace};
36
37use datafusion_common::error::Result;
38use datafusion_common::tree_node::TransformedResult;
39use datafusion_common::{
40 internal_datafusion_err, internal_err, plan_datafusion_err, plan_err,
41 tree_node::{Transformed, TreeNode},
42 ScalarValue,
43};
44use datafusion_common::{Column, DFSchema};
45use datafusion_expr_common::operator::Operator;
46use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
47use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
48use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
49use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
50
/// A filter predicate rewritten so that it can be evaluated against
/// per-container statistics rather than against row data.
///
/// Built by [`PruningPredicate::try_new`]. [`PruningPredicate::prune`] then
/// produces one `bool` per container, where `false` marks a container for
/// which the statistics rule out any matching row.
#[derive(Debug, Clone)]
pub struct PruningPredicate {
    /// Schema that was passed to [`PruningPredicate::try_new`].
    schema: SchemaRef,
    /// Rewritten (and simplified) predicate that is evaluated against the
    /// statistics record batch.
    predicate_expr: Arc<dyn PhysicalExpr>,
    /// Statistics columns referenced by `predicate_expr`, in the order they
    /// are laid out in the statistics record batch.
    required_columns: RequiredColumns,
    /// The predicate this was built from, after `snapshot_physical_expr`.
    orig_expr: Arc<dyn PhysicalExpr>,
    /// Result of `LiteralGuarantee::analyze` on `orig_expr`. These are checked
    /// through `PruningStatistics::contained` before `predicate_expr` runs.
    literal_guarantees: Vec<LiteralGuarantee>,
}
380
381pub fn build_pruning_predicate(
387 predicate: Arc<dyn PhysicalExpr>,
388 file_schema: &SchemaRef,
389 predicate_creation_errors: &Count,
390) -> Option<Arc<PruningPredicate>> {
391 match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
392 Ok(pruning_predicate) => {
393 if !pruning_predicate.always_true() {
394 return Some(Arc::new(pruning_predicate));
395 }
396 }
397 Err(e) => {
398 debug!("Could not create pruning predicate for: {e}");
399 predicate_creation_errors.add(1);
400 }
401 }
402 None
403}
404
/// Hook invoked by the predicate rewriter for every sub-expression that it
/// cannot translate into a statistics-based expression.
pub trait UnhandledPredicateHook {
    /// Returns the expression to use in place of the untranslatable `expr`.
    fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
}
413
/// [`UnhandledPredicateHook`] that answers every request with the same,
/// pre-built expression.
#[derive(Debug, Clone)]
struct ConstantUnhandledPredicateHook {
    /// Expression handed out for every unhandled predicate.
    default: Arc<dyn PhysicalExpr>,
}
420
421impl Default for ConstantUnhandledPredicateHook {
422 fn default() -> Self {
423 Self {
424 default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
425 }
426 }
427}
428
429impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
430 fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
431 Arc::clone(&self.default)
432 }
433}
434
435impl PruningPredicate {
436 pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
459 let expr = snapshot_physical_expr(expr)?;
462 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
463
464 let mut required_columns = RequiredColumns::new();
466 let predicate_expr = build_predicate_expression(
467 &expr,
468 &schema,
469 &mut required_columns,
470 &unhandled_hook,
471 );
472 let predicate_schema = required_columns.schema();
473 let predicate_expr =
475 PhysicalExprSimplifier::new(&predicate_schema).simplify(predicate_expr)?;
476
477 let literal_guarantees = LiteralGuarantee::analyze(&expr);
478
479 Ok(Self {
480 schema,
481 predicate_expr,
482 required_columns,
483 orig_expr: expr,
484 literal_guarantees,
485 })
486 }
487
488 pub fn prune<S: PruningStatistics + ?Sized>(
503 &self,
504 statistics: &S,
505 ) -> Result<Vec<bool>> {
506 let mut builder = BoolVecBuilder::new(statistics.num_containers());
507
508 for literal_guarantee in &self.literal_guarantees {
511 let LiteralGuarantee {
512 column,
513 guarantee,
514 literals,
515 } = literal_guarantee;
516 if let Some(results) = statistics.contained(column, literals) {
517 match guarantee {
518 Guarantee::In => builder.combine_array(&results),
523 Guarantee::NotIn => {
529 builder.combine_array(&arrow::compute::not(&results)?)
530 }
531 }
532 if builder.check_all_pruned() {
535 return Ok(builder.build());
536 }
537 }
538 }
539
540 let statistics_batch =
546 build_statistics_record_batch(statistics, &self.required_columns)?;
547
548 builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
550
551 Ok(builder.build())
552 }
553
554 pub fn schema(&self) -> &SchemaRef {
556 &self.schema
557 }
558
559 pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
561 &self.orig_expr
562 }
563
564 pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
566 &self.predicate_expr
567 }
568
569 pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
575 &self.literal_guarantees
576 }
577
578 pub fn always_true(&self) -> bool {
585 is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
586 }
587
588 #[allow(dead_code)]
590 pub fn required_columns(&self) -> &RequiredColumns {
591 &self.required_columns
592 }
593
594 pub fn literal_columns(&self) -> Vec<String> {
602 let mut seen = HashSet::new();
603 self.literal_guarantees
604 .iter()
605 .map(|e| &e.column.name)
606 .filter(|name| seen.insert(*name))
608 .map(|s| s.to_string())
609 .collect()
610 }
611}
612
/// Accumulates the per-container keep/prune decision while a
/// [`PruningPredicate`] is evaluated.
#[derive(Debug)]
struct BoolVecBuilder {
    /// One entry per container; starts as `true` and is only ever flipped to
    /// `false`.
    inner: Vec<bool>,
}
621
622impl BoolVecBuilder {
623 fn new(num_containers: usize) -> Self {
625 Self {
626 inner: vec![true; num_containers],
628 }
629 }
630
631 fn combine_array(&mut self, array: &BooleanArray) {
639 assert_eq!(array.len(), self.inner.len());
640 for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
641 if let Some(false) = new {
645 *cur = false;
646 }
647 }
648 }
649
650 fn combine_value(&mut self, value: ColumnarValue) {
656 match value {
657 ColumnarValue::Array(array) => {
658 self.combine_array(array.as_boolean());
659 }
660 ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
661 self.inner = vec![false; self.inner.len()];
663 }
664 _ => {
665 }
668 }
669 }
670
671 fn build(self) -> Vec<bool> {
673 self.inner
674 }
675
676 fn check_all_pruned(&self) -> bool {
678 self.inner.iter().all(|&x| !x)
679 }
680}
681
682fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
683 expr.as_any()
684 .downcast_ref::<phys_expr::Literal>()
685 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
686 .unwrap_or_default()
687}
688
689fn is_always_false(expr: &Arc<dyn PhysicalExpr>) -> bool {
690 expr.as_any()
691 .downcast_ref::<phys_expr::Literal>()
692 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(false))))
693 .unwrap_or_default()
694}
695
/// The statistics columns a rewritten predicate refers to.
///
/// The position of an entry is the index of the corresponding column in the
/// statistics record batch.
#[derive(Debug, Default, Clone)]
pub struct RequiredColumns {
    /// For each statistics column: the source column it describes, which
    /// statistic is needed, and the field (name, type) it has in the
    /// statistics record batch.
    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
}
714
715impl RequiredColumns {
716 fn new() -> Self {
717 Self::default()
718 }
719
720 #[allow(dead_code)]
729 pub fn single_column(&self) -> Option<&phys_expr::Column> {
731 if self.columns.windows(2).all(|w| {
732 let c1 = &w[0].0;
734 let c2 = &w[1].0;
735 c1 == c2
736 }) {
737 self.columns.first().map(|r| &r.0)
738 } else {
739 None
740 }
741 }
742
743 fn schema(&self) -> Schema {
750 let fields = self
751 .columns
752 .iter()
753 .map(|(_c, _t, f)| f.clone())
754 .collect::<Vec<_>>();
755 Schema::new(fields)
756 }
757
758 pub(crate) fn iter(
761 &self,
762 ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
763 self.columns.iter()
764 }
765
766 fn find_stat_column(
767 &self,
768 column: &phys_expr::Column,
769 statistics_type: StatisticsType,
770 ) -> Option<usize> {
771 match statistics_type {
772 StatisticsType::RowCount => {
773 self.columns
775 .iter()
776 .enumerate()
777 .find(|(_i, (_c, t, _f))| t == &statistics_type)
778 .map(|(i, (_c, _t, _f))| i)
779 }
780 _ => self
781 .columns
782 .iter()
783 .enumerate()
784 .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
785 .map(|(i, (_c, _t, _f))| i),
786 }
787 }
788
789 fn stat_column_expr(
798 &mut self,
799 column: &phys_expr::Column,
800 column_expr: &Arc<dyn PhysicalExpr>,
801 field: &Field,
802 stat_type: StatisticsType,
803 ) -> Result<Arc<dyn PhysicalExpr>> {
804 let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
805 Some(idx) => (idx, false),
806 None => (self.columns.len(), true),
807 };
808
809 let column_name = column.name();
810 let stat_column_name = match stat_type {
811 StatisticsType::Min => format!("{column_name}_min"),
812 StatisticsType::Max => format!("{column_name}_max"),
813 StatisticsType::NullCount => format!("{column_name}_null_count"),
814 StatisticsType::RowCount => "row_count".to_string(),
815 };
816
817 let stat_column = phys_expr::Column::new(&stat_column_name, idx);
818
819 if need_to_insert {
821 let nullable = true;
823 let stat_field =
824 Field::new(stat_column.name(), field.data_type().clone(), nullable);
825 self.columns.push((column.clone(), stat_type, stat_field));
826 }
827 rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
828 }
829
830 fn min_column_expr(
832 &mut self,
833 column: &phys_expr::Column,
834 column_expr: &Arc<dyn PhysicalExpr>,
835 field: &Field,
836 ) -> Result<Arc<dyn PhysicalExpr>> {
837 self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
838 }
839
840 fn max_column_expr(
842 &mut self,
843 column: &phys_expr::Column,
844 column_expr: &Arc<dyn PhysicalExpr>,
845 field: &Field,
846 ) -> Result<Arc<dyn PhysicalExpr>> {
847 self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
848 }
849
850 fn null_count_column_expr(
852 &mut self,
853 column: &phys_expr::Column,
854 column_expr: &Arc<dyn PhysicalExpr>,
855 field: &Field,
856 ) -> Result<Arc<dyn PhysicalExpr>> {
857 self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
858 }
859
860 fn row_count_column_expr(
862 &mut self,
863 column: &phys_expr::Column,
864 column_expr: &Arc<dyn PhysicalExpr>,
865 field: &Field,
866 ) -> Result<Arc<dyn PhysicalExpr>> {
867 self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
868 }
869}
870
871impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
872 fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
873 Self { columns }
874 }
875}
876
877fn build_statistics_record_batch<S: PruningStatistics + ?Sized>(
903 statistics: &S,
904 required_columns: &RequiredColumns,
905) -> Result<RecordBatch> {
906 let mut arrays = Vec::<ArrayRef>::new();
907 for (column, statistics_type, stat_field) in required_columns.iter() {
909 let column = Column::from_name(column.name());
910 let data_type = stat_field.data_type();
911
912 let num_containers = statistics.num_containers();
913
914 let array = match statistics_type {
915 StatisticsType::Min => statistics.min_values(&column),
916 StatisticsType::Max => statistics.max_values(&column),
917 StatisticsType::NullCount => statistics.null_counts(&column),
918 StatisticsType::RowCount => statistics.row_counts(&column),
919 };
920 let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
921
922 if num_containers != array.len() {
923 return internal_err!(
924 "mismatched statistics length. Expected {}, got {}",
925 num_containers,
926 array.len()
927 );
928 }
929
930 let array = arrow::compute::cast(&array, data_type)?;
933
934 arrays.push(array);
935 }
936
937 let schema = Arc::new(required_columns.schema());
938 let mut options = RecordBatchOptions::default();
940 options.row_count = Some(statistics.num_containers());
941
942 trace!("Creating statistics batch for {required_columns:#?} with {arrays:#?}");
943
944 RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
945 plan_datafusion_err!("Can not create statistics record batch: {err}")
946 })
947}
948
/// Working state for rewriting one `column-expression <op> scalar-expression`
/// comparison into an expression over statistics columns.
struct PruningExpressionBuilder<'a> {
    /// The single column referenced by the comparison.
    column: phys_expr::Column,
    /// The side of the comparison that contains `column`, after
    /// `rewrite_expr_to_prunable`.
    column_expr: Arc<dyn PhysicalExpr>,
    /// Comparison operator, oriented as `column_expr <op> scalar_expr`.
    op: Operator,
    /// The side of the comparison that references no column.
    scalar_expr: Arc<dyn PhysicalExpr>,
    /// Schema field of `column`.
    field: &'a Field,
    /// Registry that newly needed statistics columns are added to.
    required_columns: &'a mut RequiredColumns,
}
957
958impl<'a> PruningExpressionBuilder<'a> {
959 fn try_new(
960 left: &'a Arc<dyn PhysicalExpr>,
961 right: &'a Arc<dyn PhysicalExpr>,
962 op: Operator,
963 schema: &'a SchemaRef,
964 required_columns: &'a mut RequiredColumns,
965 ) -> Result<Self> {
966 let left_columns = collect_columns(left);
968 let right_columns = collect_columns(right);
969 let (column_expr, scalar_expr, columns, correct_operator) =
970 match (left_columns.len(), right_columns.len()) {
971 (1, 0) => (left, right, left_columns, op),
972 (0, 1) => (right, left, right_columns, reverse_operator(op)?),
973 _ => {
974 return plan_err!(
976 "Multi-column expressions are not currently supported"
977 );
978 }
979 };
980
981 let df_schema = DFSchema::try_from(Arc::clone(schema))?;
982 let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
983 column_expr,
984 correct_operator,
985 scalar_expr,
986 df_schema,
987 )?;
988 let column = columns.iter().next().unwrap().clone();
989 let field = match schema.column_with_name(column.name()) {
990 Some((_, f)) => f,
991 _ => {
992 return plan_err!("Field not found in schema");
993 }
994 };
995
996 Ok(Self {
997 column,
998 column_expr,
999 op: correct_operator,
1000 scalar_expr,
1001 field,
1002 required_columns,
1003 })
1004 }
1005
1006 fn op(&self) -> Operator {
1007 self.op
1008 }
1009
1010 fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
1011 &self.scalar_expr
1012 }
1013
1014 fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1015 self.required_columns
1016 .min_column_expr(&self.column, &self.column_expr, self.field)
1017 }
1018
1019 fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1020 self.required_columns
1021 .max_column_expr(&self.column, &self.column_expr, self.field)
1022 }
1023
1024 fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1031 let column_expr = Arc::new(self.column.clone()) as _;
1033
1034 let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1036
1037 self.required_columns.null_count_column_expr(
1038 &self.column,
1039 &column_expr,
1040 null_count_field,
1041 )
1042 }
1043
1044 fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1051 let column_expr = Arc::new(self.column.clone()) as _;
1053
1054 let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1056
1057 self.required_columns.row_count_column_expr(
1058 &self.column,
1059 &column_expr,
1060 row_count_field,
1061 )
1062 }
1063}
1064
1065fn rewrite_expr_to_prunable(
1078 column_expr: &PhysicalExprRef,
1079 op: Operator,
1080 scalar_expr: &PhysicalExprRef,
1081 schema: DFSchema,
1082) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1083 if !is_compare_op(op) {
1084 return plan_err!("rewrite_expr_to_prunable only support compare expression");
1085 }
1086
1087 let column_expr_any = column_expr.as_any();
1088
1089 if column_expr_any
1090 .downcast_ref::<phys_expr::Column>()
1091 .is_some()
1092 {
1093 Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
1095 } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
1096 let arrow_schema = schema.as_arrow();
1098 let from_type = cast.expr().data_type(arrow_schema)?;
1099 verify_support_type_for_prune(&from_type, cast.cast_type())?;
1100 let (left, op, right) =
1101 rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
1102 let left = Arc::new(phys_expr::CastExpr::new(
1103 left,
1104 cast.cast_type().clone(),
1105 None,
1106 ));
1107 Ok((left, op, right))
1108 } else if let Some(try_cast) =
1109 column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
1110 {
1111 let arrow_schema = schema.as_arrow();
1113 let from_type = try_cast.expr().data_type(arrow_schema)?;
1114 verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
1115 let (left, op, right) =
1116 rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
1117 let left = Arc::new(phys_expr::TryCastExpr::new(
1118 left,
1119 try_cast.cast_type().clone(),
1120 ));
1121 Ok((left, op, right))
1122 } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
1123 let (left, op, right) =
1125 rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
1126 let right = Arc::new(phys_expr::NegativeExpr::new(right));
1127 Ok((left, reverse_operator(op)?, right))
1128 } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
1129 if op != Operator::Eq && op != Operator::NotEq {
1131 return plan_err!("Not with operator other than Eq / NotEq is not supported");
1132 }
1133 if not
1134 .arg()
1135 .as_any()
1136 .downcast_ref::<phys_expr::Column>()
1137 .is_some()
1138 {
1139 let left = Arc::clone(not.arg());
1140 let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
1141 Ok((left, reverse_operator(op)?, right))
1142 } else {
1143 plan_err!("Not with complex expression {column_expr:?} is not supported")
1144 }
1145 } else {
1146 plan_err!("column expression {column_expr:?} is not supported")
1147 }
1148}
1149
1150fn is_compare_op(op: Operator) -> bool {
1151 matches!(
1152 op,
1153 Operator::Eq
1154 | Operator::NotEq
1155 | Operator::Lt
1156 | Operator::LtEq
1157 | Operator::Gt
1158 | Operator::GtEq
1159 | Operator::LikeMatch
1160 | Operator::NotLikeMatch
1161 )
1162}
1163
1164fn is_string_type(data_type: &DataType) -> bool {
1165 matches!(
1166 data_type,
1167 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1168 )
1169}
1170
1171fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1176 let from_type = match from_type {
1178 DataType::Dictionary(_, t) => {
1179 return verify_support_type_for_prune(t.as_ref(), to_type)
1180 }
1181 _ => from_type,
1182 };
1183 let to_type = match to_type {
1184 DataType::Dictionary(_, t) => {
1185 return verify_support_type_for_prune(from_type, t.as_ref())
1186 }
1187 _ => to_type,
1188 };
1189 if is_string_type(from_type) == is_string_type(to_type) {
1193 Ok(())
1194 } else {
1195 plan_err!(
1196 "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1197 )
1198 }
1199}
1200
1201fn rewrite_column_expr(
1203 e: Arc<dyn PhysicalExpr>,
1204 column_old: &phys_expr::Column,
1205 column_new: &phys_expr::Column,
1206) -> Result<Arc<dyn PhysicalExpr>> {
1207 e.transform(|expr| {
1208 if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1209 if column == column_old {
1210 return Ok(Transformed::yes(Arc::new(column_new.clone())));
1211 }
1212 }
1213
1214 Ok(Transformed::no(expr))
1215 })
1216 .data()
1217}
1218
1219fn reverse_operator(op: Operator) -> Result<Operator> {
1220 op.swap().ok_or_else(|| {
1221 internal_datafusion_err!(
1222 "Could not reverse operator {op} while building pruning predicate"
1223 )
1224 })
1225}
1226
1227fn build_single_column_expr(
1232 column: &phys_expr::Column,
1233 schema: &Schema,
1234 required_columns: &mut RequiredColumns,
1235 is_not: bool, ) -> Option<Arc<dyn PhysicalExpr>> {
1237 let field = schema.field_with_name(column.name()).ok()?;
1238
1239 if matches!(field.data_type(), &DataType::Boolean) {
1240 let col_ref = Arc::new(column.clone()) as _;
1241
1242 let min = required_columns
1243 .min_column_expr(column, &col_ref, field)
1244 .ok()?;
1245 let max = required_columns
1246 .max_column_expr(column, &col_ref, field)
1247 .ok()?;
1248
1249 if is_not {
1253 Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1256 phys_expr::BinaryExpr::new(min, Operator::And, max),
1257 ))))
1258 } else {
1259 Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1262 }
1263 } else {
1264 None
1265 }
1266}
1267
1268fn build_is_null_column_expr(
1277 expr: &Arc<dyn PhysicalExpr>,
1278 schema: &Schema,
1279 required_columns: &mut RequiredColumns,
1280 with_not: bool,
1281) -> Option<Arc<dyn PhysicalExpr>> {
1282 if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1283 let field = schema.field_with_name(col.name()).ok()?;
1284
1285 let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1286 if with_not {
1287 if let Ok(row_count_expr) =
1288 required_columns.row_count_column_expr(col, expr, null_count_field)
1289 {
1290 required_columns
1291 .null_count_column_expr(col, expr, null_count_field)
1292 .map(|null_count_column_expr| {
1293 Arc::new(phys_expr::BinaryExpr::new(
1295 null_count_column_expr,
1296 Operator::NotEq,
1297 row_count_expr,
1298 )) as _
1299 })
1300 .ok()
1301 } else {
1302 None
1303 }
1304 } else {
1305 required_columns
1306 .null_count_column_expr(col, expr, null_count_field)
1307 .map(|null_count_column_expr| {
1308 Arc::new(phys_expr::BinaryExpr::new(
1310 null_count_column_expr,
1311 Operator::Gt,
1312 Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1313 )) as _
1314 })
1315 .ok()
1316 }
1317 } else {
1318 None
1319 }
1320}
1321
/// Largest `IN` list that `build_predicate_expression` expands into a chain
/// of `=` / `!=` comparisons. Longer (and empty) lists are handed to the
/// unhandled-predicate hook instead.
const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1325
/// Rewrites predicates into expressions over statistics columns, with a
/// configurable fallback for sub-expressions that cannot be rewritten.
pub struct PredicateRewriter {
    /// Called for every sub-expression the rewrite cannot handle.
    unhandled_hook: Arc<dyn UnhandledPredicateHook>,
}
1331
1332impl Default for PredicateRewriter {
1333 fn default() -> Self {
1334 Self {
1335 unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1336 }
1337 }
1338}
1339
1340impl PredicateRewriter {
1341 pub fn new() -> Self {
1343 Self::default()
1344 }
1345
1346 pub fn with_unhandled_hook(
1348 self,
1349 unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1350 ) -> Self {
1351 Self { unhandled_hook }
1352 }
1353
1354 pub fn rewrite_predicate_to_statistics_predicate(
1364 &self,
1365 expr: &Arc<dyn PhysicalExpr>,
1366 schema: &Schema,
1367 ) -> Arc<dyn PhysicalExpr> {
1368 let mut required_columns = RequiredColumns::new();
1369 build_predicate_expression(
1370 expr,
1371 &Arc::new(schema.clone()),
1372 &mut required_columns,
1373 &self.unhandled_hook,
1374 )
1375 }
1376}
1377
1378fn build_predicate_expression(
1388 expr: &Arc<dyn PhysicalExpr>,
1389 schema: &SchemaRef,
1390 required_columns: &mut RequiredColumns,
1391 unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1392) -> Arc<dyn PhysicalExpr> {
1393 if is_always_false(expr) {
1394 return Arc::clone(expr);
1397 }
1398 let expr_any = expr.as_any();
1400 if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
1401 return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1402 .unwrap_or_else(|| unhandled_hook.handle(expr));
1403 }
1404 if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
1405 return build_is_null_column_expr(
1406 is_not_null.arg(),
1407 schema,
1408 required_columns,
1409 true,
1410 )
1411 .unwrap_or_else(|| unhandled_hook.handle(expr));
1412 }
1413 if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
1414 return build_single_column_expr(col, schema, required_columns, false)
1415 .unwrap_or_else(|| unhandled_hook.handle(expr));
1416 }
1417 if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
1418 if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
1420 return build_single_column_expr(col, schema, required_columns, true)
1421 .unwrap_or_else(|| unhandled_hook.handle(expr));
1422 } else {
1423 return unhandled_hook.handle(expr);
1424 }
1425 }
1426 if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
1427 if !in_list.list().is_empty()
1428 && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1429 {
1430 let eq_op = if in_list.negated() {
1431 Operator::NotEq
1432 } else {
1433 Operator::Eq
1434 };
1435 let re_op = if in_list.negated() {
1436 Operator::And
1437 } else {
1438 Operator::Or
1439 };
1440 let change_expr = in_list
1441 .list()
1442 .iter()
1443 .map(|e| {
1444 Arc::new(phys_expr::BinaryExpr::new(
1445 Arc::clone(in_list.expr()),
1446 eq_op,
1447 Arc::clone(e),
1448 )) as _
1449 })
1450 .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1451 .unwrap();
1452 return build_predicate_expression(
1453 &change_expr,
1454 schema,
1455 required_columns,
1456 unhandled_hook,
1457 );
1458 } else {
1459 return unhandled_hook.handle(expr);
1460 }
1461 }
1462
1463 let (left, op, right) = {
1464 if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
1465 (
1466 Arc::clone(bin_expr.left()),
1467 *bin_expr.op(),
1468 Arc::clone(bin_expr.right()),
1469 )
1470 } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
1471 if like_expr.case_insensitive() {
1472 return unhandled_hook.handle(expr);
1473 }
1474 let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1475 (false, false) => Operator::LikeMatch,
1476 (true, false) => Operator::NotLikeMatch,
1477 (false, true) => Operator::ILikeMatch,
1478 (true, true) => Operator::NotILikeMatch,
1479 };
1480 (
1481 Arc::clone(like_expr.expr()),
1482 op,
1483 Arc::clone(like_expr.pattern()),
1484 )
1485 } else {
1486 return unhandled_hook.handle(expr);
1487 }
1488 };
1489
1490 if op == Operator::And || op == Operator::Or {
1491 let left_expr =
1492 build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1493 let right_expr =
1494 build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1495 let expr = match (&left_expr, op, &right_expr) {
1497 (left, Operator::And, right)
1498 if is_always_false(left) || is_always_false(right) =>
1499 {
1500 Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false))))
1501 }
1502 (left, Operator::And, _) if is_always_true(left) => right_expr,
1503 (_, Operator::And, right) if is_always_true(right) => left_expr,
1504 (left, Operator::Or, right)
1505 if is_always_true(left) || is_always_true(right) =>
1506 {
1507 Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1508 }
1509 (left, Operator::Or, _) if is_always_false(left) => right_expr,
1510 (_, Operator::Or, right) if is_always_false(right) => left_expr,
1511
1512 _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1513 };
1514 return expr;
1515 }
1516
1517 let expr_builder =
1518 PruningExpressionBuilder::try_new(&left, &right, op, schema, required_columns);
1519 let mut expr_builder = match expr_builder {
1520 Ok(builder) => builder,
1521 Err(e) => {
1524 debug!("Error building pruning expression: {e}");
1525 return unhandled_hook.handle(expr);
1526 }
1527 };
1528
1529 build_statistics_expr(&mut expr_builder)
1530 .unwrap_or_else(|_| unhandled_hook.handle(expr))
1531}
1532
1533fn build_statistics_expr(
1534 expr_builder: &mut PruningExpressionBuilder,
1535) -> Result<Arc<dyn PhysicalExpr>> {
1536 let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1537 Operator::NotEq => {
1538 let min_column_expr = expr_builder.min_column_expr()?;
1542 let max_column_expr = expr_builder.max_column_expr()?;
1543 Arc::new(phys_expr::BinaryExpr::new(
1544 Arc::new(phys_expr::BinaryExpr::new(
1545 min_column_expr,
1546 Operator::NotEq,
1547 Arc::clone(expr_builder.scalar_expr()),
1548 )),
1549 Operator::Or,
1550 Arc::new(phys_expr::BinaryExpr::new(
1551 Arc::clone(expr_builder.scalar_expr()),
1552 Operator::NotEq,
1553 max_column_expr,
1554 )),
1555 ))
1556 }
1557 Operator::Eq => {
1558 let min_column_expr = expr_builder.min_column_expr()?;
1561 let max_column_expr = expr_builder.max_column_expr()?;
1562 Arc::new(phys_expr::BinaryExpr::new(
1563 Arc::new(phys_expr::BinaryExpr::new(
1564 min_column_expr,
1565 Operator::LtEq,
1566 Arc::clone(expr_builder.scalar_expr()),
1567 )),
1568 Operator::And,
1569 Arc::new(phys_expr::BinaryExpr::new(
1570 Arc::clone(expr_builder.scalar_expr()),
1571 Operator::LtEq,
1572 max_column_expr,
1573 )),
1574 ))
1575 }
1576 Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1577 Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1578 plan_datafusion_err!(
1579 "LIKE expression with wildcard at the beginning is not supported"
1580 )
1581 })?,
1582 Operator::Gt => {
1583 Arc::new(phys_expr::BinaryExpr::new(
1585 expr_builder.max_column_expr()?,
1586 Operator::Gt,
1587 Arc::clone(expr_builder.scalar_expr()),
1588 ))
1589 }
1590 Operator::GtEq => {
1591 Arc::new(phys_expr::BinaryExpr::new(
1593 expr_builder.max_column_expr()?,
1594 Operator::GtEq,
1595 Arc::clone(expr_builder.scalar_expr()),
1596 ))
1597 }
1598 Operator::Lt => {
1599 Arc::new(phys_expr::BinaryExpr::new(
1601 expr_builder.min_column_expr()?,
1602 Operator::Lt,
1603 Arc::clone(expr_builder.scalar_expr()),
1604 ))
1605 }
1606 Operator::LtEq => {
1607 Arc::new(phys_expr::BinaryExpr::new(
1609 expr_builder.min_column_expr()?,
1610 Operator::LtEq,
1611 Arc::clone(expr_builder.scalar_expr()),
1612 ))
1613 }
1614 _ => {
1616 return plan_err!(
1617 "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1618 );
1619 }
1620 };
1621 let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1622 Ok(statistics_expr)
1623}
1624
1625fn unpack_string(s: &ScalarValue) -> Option<&str> {
1627 s.try_as_str().flatten()
1628}
1629
1630fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1631 if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
1632 let s = unpack_string(lit.value())?;
1633 return Some(s);
1634 }
1635 None
1636}
1637
1638fn build_like_match(
1642 expr_builder: &mut PruningExpressionBuilder,
1643) -> Option<Arc<dyn PhysicalExpr>> {
1644 let min_column_expr = expr_builder.min_column_expr().ok()?;
1653 let max_column_expr = expr_builder.max_column_expr().ok()?;
1654 let scalar_expr = expr_builder.scalar_expr();
1655 let s = extract_string_literal(scalar_expr)?;
1657 let first_wildcard_index = s.find(['%', '_']);
1659 if first_wildcard_index == Some(0) {
1660 return None;
1662 }
1663 let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
1664 let prefix = &s[..wildcard_index];
1665 let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1666 prefix.to_string(),
1667 ))));
1668 let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1669 increment_utf8(prefix)?,
1670 ))));
1671 (lower_bound_lit, upper_bound_lit)
1672 } else {
1673 let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1675 s.to_string(),
1676 ))));
1677 (Arc::clone(&bound), bound)
1678 };
1679 let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1680 lower_bound,
1681 Operator::LtEq,
1682 Arc::clone(&max_column_expr),
1683 ));
1684 let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1685 Arc::clone(&min_column_expr),
1686 Operator::LtEq,
1687 upper_bound,
1688 ));
1689 let combined = Arc::new(phys_expr::BinaryExpr::new(
1690 upper_bound_expr,
1691 Operator::And,
1692 lower_bound_expr,
1693 ));
1694 Some(combined)
1695}
1696
1697fn build_not_like_match(
1703 expr_builder: &mut PruningExpressionBuilder<'_>,
1704) -> Result<Arc<dyn PhysicalExpr>> {
1705 let min_column_expr = expr_builder.min_column_expr()?;
1708 let max_column_expr = expr_builder.max_column_expr()?;
1709
1710 let scalar_expr = expr_builder.scalar_expr();
1711
1712 let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1713 plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1714 })?;
1715
1716 let (const_prefix, remaining) = split_constant_prefix(pattern);
1717 if const_prefix.is_empty() || remaining != "%" {
1718 return Err(plan_datafusion_err!(
1730 "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1731 ));
1732 }
1733
1734 let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1735 true,
1736 false,
1737 Arc::clone(&min_column_expr),
1738 Arc::clone(scalar_expr),
1739 ));
1740
1741 let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1742 true,
1743 false,
1744 Arc::clone(&max_column_expr),
1745 Arc::clone(scalar_expr),
1746 ));
1747
1748 Ok(Arc::new(phys_expr::BinaryExpr::new(
1749 min_col_not_like_epxr,
1750 Operator::Or,
1751 max_col_not_like_expr,
1752 )))
1753}
1754
/// Splits a LIKE `pattern` at its first unescaped wildcard (`%` or `_`).
///
/// Returns `(constant_prefix, rest)`, where `rest` starts with that wildcard.
/// If the pattern has no unescaped wildcard, the whole pattern is returned as
/// the prefix and `rest` is empty. Escape sequences are left in the prefix
/// as written.
///
/// A backslash escapes the character that follows it, including another
/// backslash. `a\%` therefore has no wildcard, while in `a\\%` the `%` is a
/// real wildcard preceded by an escaped backslash.
fn split_constant_prefix(pattern: &str) -> (&str, &str) {
    // True while the current character is escaped by the one before it.
    let mut escaped = false;
    for (idx, ch) in pattern.char_indices() {
        if escaped {
            escaped = false;
            continue;
        }
        match ch {
            '\\' => escaped = true,
            '%' | '_' => return (&pattern[..idx], &pattern[idx..]),
            _ => {}
        }
    }
    (pattern, "")
}
1770
/// Returns a string that sorts strictly after `data`, produced by bumping one
/// code point.
///
/// Working from the last character towards the first, the first character
/// whose successor code point is an acceptable `char` is replaced by that
/// successor and everything after it is dropped. Returns `None` when no
/// character can be bumped (including the empty string).
///
/// A successor is rejected when it is not a valid `char` at all (for example
/// a surrogate) or when it is one of the code points excluded by
/// `is_valid_unicode`; in that case the search moves on to the preceding
/// character instead of skipping ahead.
fn increment_utf8(data: &str) -> Option<String> {
    /// Rejects `U+FFFE`, `U+FFFF`, `U+FDD0..=U+FDEF`, and anything at or above
    /// `0x110000`.
    fn is_valid_unicode(c: char) -> bool {
        let cp = u32::from(c);
        let excluded = matches!(cp, 0xFFFE | 0xFFFF | 0xFDD0..=0xFDEF);
        !excluded && cp < 0x110000
    }

    let chars: Vec<char> = data.chars().collect();

    chars.iter().enumerate().rev().find_map(|(pos, &current)| {
        // `char::from_u32` yields `None` for values that are not scalar values.
        let bumped = char::from_u32(u32::from(current) + 1)
            .filter(|candidate| is_valid_unicode(*candidate))?;

        // Keep the untouched head, then append the bumped character.
        let mut result: String = chars[..pos].iter().collect();
        result.push(bumped);
        Some(result)
    })
}
1816
1817fn wrap_null_count_check_expr(
1838 statistics_expr: Arc<dyn PhysicalExpr>,
1839 expr_builder: &mut PruningExpressionBuilder,
1840) -> Result<Arc<dyn PhysicalExpr>> {
1841 let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new(
1843 expr_builder.null_count_column_expr()?,
1844 Operator::NotEq,
1845 expr_builder.row_count_column_expr()?,
1846 ));
1847
1848 Ok(Arc::new(phys_expr::BinaryExpr::new(
1850 not_when_null_count_eq_row_count,
1851 Operator::And,
1852 statistics_expr,
1853 )))
1854}
1855
/// The kind of statistic a required statistics column refers to.
///
/// Each required column is recorded together with one of these variants and
/// the field describing the statistics column (for example `Min` paired with
/// a field named `c1_min`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum StatisticsType {
    /// Minimum value of the column
    Min,
    /// Maximum value of the column
    Max,
    /// Number of null values in the column
    NullCount,
    /// Number of rows
    RowCount,
}
1863
1864#[cfg(test)]
1865mod tests {
1866 use std::collections::HashMap;
1867 use std::ops::{Not, Rem};
1868
1869 use super::*;
1870 use datafusion_common::test_util::batches_to_string;
1871 use datafusion_expr::{and, col, lit, or};
1872 use insta::assert_snapshot;
1873
1874 use arrow::array::Decimal128Array;
1875 use arrow::{
1876 array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
1877 datatypes::TimeUnit,
1878 };
1879 use datafusion_expr::expr::InList;
1880 use datafusion_expr::{cast, is_null, try_cast, Expr};
1881 use datafusion_functions_nested::expr_fn::{array_has, make_array};
1882 use datafusion_physical_expr::expressions as phys_expr;
1883 use datafusion_physical_expr::planner::logical2physical;
1884
    /// Test helper holding the statistics arrays for a single column.
    ///
    /// Every array that is present is expected to have the same length; see
    /// `assert_invariants`.
    #[derive(Debug, Default)]
    struct ContainerStats {
        /// Optional array of minimum values, handed out by `min()`
        min: Option<ArrayRef>,
        /// Optional array of maximum values, handed out by `max()`
        max: Option<ArrayRef>,
        /// Optional array of null counts, handed out by `null_counts()`
        null_counts: Option<ArrayRef>,
        /// Optional array of row counts, handed out by `row_counts()`
        row_counts: Option<ArrayRef>,
        /// `(values, contained)` pairs: `contained(find_values)` returns the
        /// boolean array of the first pair whose value set equals
        /// `find_values`
        contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
    }
1904
1905 impl ContainerStats {
1906 fn new() -> Self {
1907 Default::default()
1908 }
1909 fn new_decimal128(
1910 min: impl IntoIterator<Item = Option<i128>>,
1911 max: impl IntoIterator<Item = Option<i128>>,
1912 precision: u8,
1913 scale: i8,
1914 ) -> Self {
1915 Self::new()
1916 .with_min(Arc::new(
1917 min.into_iter()
1918 .collect::<Decimal128Array>()
1919 .with_precision_and_scale(precision, scale)
1920 .unwrap(),
1921 ))
1922 .with_max(Arc::new(
1923 max.into_iter()
1924 .collect::<Decimal128Array>()
1925 .with_precision_and_scale(precision, scale)
1926 .unwrap(),
1927 ))
1928 }
1929
1930 fn new_i64(
1931 min: impl IntoIterator<Item = Option<i64>>,
1932 max: impl IntoIterator<Item = Option<i64>>,
1933 ) -> Self {
1934 Self::new()
1935 .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
1936 .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
1937 }
1938
1939 fn new_i32(
1940 min: impl IntoIterator<Item = Option<i32>>,
1941 max: impl IntoIterator<Item = Option<i32>>,
1942 ) -> Self {
1943 Self::new()
1944 .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
1945 .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
1946 }
1947
1948 fn new_utf8<'a>(
1949 min: impl IntoIterator<Item = Option<&'a str>>,
1950 max: impl IntoIterator<Item = Option<&'a str>>,
1951 ) -> Self {
1952 Self::new()
1953 .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
1954 .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
1955 }
1956
1957 fn new_bool(
1958 min: impl IntoIterator<Item = Option<bool>>,
1959 max: impl IntoIterator<Item = Option<bool>>,
1960 ) -> Self {
1961 Self::new()
1962 .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
1963 .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
1964 }
1965
1966 fn min(&self) -> Option<ArrayRef> {
1967 self.min.clone()
1968 }
1969
1970 fn max(&self) -> Option<ArrayRef> {
1971 self.max.clone()
1972 }
1973
1974 fn null_counts(&self) -> Option<ArrayRef> {
1975 self.null_counts.clone()
1976 }
1977
1978 fn row_counts(&self) -> Option<ArrayRef> {
1979 self.row_counts.clone()
1980 }
1981
1982 fn arrays(&self) -> Vec<ArrayRef> {
1984 let contained_arrays = self
1985 .contained
1986 .iter()
1987 .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
1988
1989 [
1990 self.min.as_ref().cloned(),
1991 self.max.as_ref().cloned(),
1992 self.null_counts.as_ref().cloned(),
1993 self.row_counts.as_ref().cloned(),
1994 ]
1995 .into_iter()
1996 .flatten()
1997 .chain(contained_arrays)
1998 .collect()
1999 }
2000
2001 fn len(&self) -> usize {
2005 self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
2007 }
2008
2009 fn assert_invariants(&self) {
2011 let mut prev_len = None;
2012
2013 for len in self.arrays().iter().map(|a| a.len()) {
2014 match prev_len {
2016 None => {
2017 prev_len = Some(len);
2018 }
2019 Some(prev_len) => {
2020 assert_eq!(prev_len, len);
2021 }
2022 }
2023 }
2024 }
2025
2026 fn with_min(mut self, min: ArrayRef) -> Self {
2028 self.min = Some(min);
2029 self
2030 }
2031
2032 fn with_max(mut self, max: ArrayRef) -> Self {
2034 self.max = Some(max);
2035 self
2036 }
2037
2038 fn with_null_counts(
2041 mut self,
2042 counts: impl IntoIterator<Item = Option<u64>>,
2043 ) -> Self {
2044 let null_counts: ArrayRef =
2045 Arc::new(counts.into_iter().collect::<UInt64Array>());
2046
2047 self.assert_invariants();
2048 self.null_counts = Some(null_counts);
2049 self
2050 }
2051
2052 fn with_row_counts(
2055 mut self,
2056 counts: impl IntoIterator<Item = Option<u64>>,
2057 ) -> Self {
2058 let row_counts: ArrayRef =
2059 Arc::new(counts.into_iter().collect::<UInt64Array>());
2060
2061 self.assert_invariants();
2062 self.row_counts = Some(row_counts);
2063 self
2064 }
2065
2066 pub fn with_contained(
2068 mut self,
2069 values: impl IntoIterator<Item = ScalarValue>,
2070 contained: impl IntoIterator<Item = Option<bool>>,
2071 ) -> Self {
2072 let contained: BooleanArray = contained.into_iter().collect();
2073 let values: HashSet<_> = values.into_iter().collect();
2074
2075 self.contained.push((values, contained));
2076 self.assert_invariants();
2077 self
2078 }
2079
2080 fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2082 self.contained
2084 .iter()
2085 .find(|(values, _contained)| values == find_values)
2086 .map(|(_values, contained)| contained.clone())
2087 }
2088 }
2089
    /// Test implementation of `PruningStatistics` backed by per-column
    /// `ContainerStats`.
    #[derive(Debug, Default)]
    struct TestStatistics {
        /// Statistics for each column, keyed by the column they describe
        stats: HashMap<Column, ContainerStats>,
    }
2095
2096 impl TestStatistics {
2097 fn new() -> Self {
2098 Self::default()
2099 }
2100
2101 fn with(
2102 mut self,
2103 name: impl Into<String>,
2104 container_stats: ContainerStats,
2105 ) -> Self {
2106 let col = Column::from_name(name.into());
2107 self.stats.insert(col, container_stats);
2108 self
2109 }
2110
2111 fn with_null_counts(
2115 mut self,
2116 name: impl Into<String>,
2117 counts: impl IntoIterator<Item = Option<u64>>,
2118 ) -> Self {
2119 let col = Column::from_name(name.into());
2120
2121 let container_stats = self
2123 .stats
2124 .remove(&col)
2125 .unwrap_or_default()
2126 .with_null_counts(counts);
2127
2128 self.stats.insert(col, container_stats);
2130 self
2131 }
2132
2133 fn with_row_counts(
2137 mut self,
2138 name: impl Into<String>,
2139 counts: impl IntoIterator<Item = Option<u64>>,
2140 ) -> Self {
2141 let col = Column::from_name(name.into());
2142
2143 let container_stats = self
2145 .stats
2146 .remove(&col)
2147 .unwrap_or_default()
2148 .with_row_counts(counts);
2149
2150 self.stats.insert(col, container_stats);
2152 self
2153 }
2154
2155 fn with_contained(
2157 mut self,
2158 name: impl Into<String>,
2159 values: impl IntoIterator<Item = ScalarValue>,
2160 contained: impl IntoIterator<Item = Option<bool>>,
2161 ) -> Self {
2162 let col = Column::from_name(name.into());
2163
2164 let container_stats = self
2166 .stats
2167 .remove(&col)
2168 .unwrap_or_default()
2169 .with_contained(values, contained);
2170
2171 self.stats.insert(col, container_stats);
2173 self
2174 }
2175 }
2176
2177 impl PruningStatistics for TestStatistics {
2178 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2179 self.stats
2180 .get(column)
2181 .map(|container_stats| container_stats.min())
2182 .unwrap_or(None)
2183 }
2184
2185 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2186 self.stats
2187 .get(column)
2188 .map(|container_stats| container_stats.max())
2189 .unwrap_or(None)
2190 }
2191
2192 fn num_containers(&self) -> usize {
2193 self.stats
2194 .values()
2195 .next()
2196 .map(|container_stats| container_stats.len())
2197 .unwrap_or(0)
2198 }
2199
2200 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2201 self.stats
2202 .get(column)
2203 .map(|container_stats| container_stats.null_counts())
2204 .unwrap_or(None)
2205 }
2206
2207 fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
2208 self.stats
2209 .get(column)
2210 .map(|container_stats| container_stats.row_counts())
2211 .unwrap_or(None)
2212 }
2213
2214 fn contained(
2215 &self,
2216 column: &Column,
2217 values: &HashSet<ScalarValue>,
2218 ) -> Option<BooleanArray> {
2219 self.stats
2220 .get(column)
2221 .and_then(|container_stats| container_stats.contained(values))
2222 }
2223 }
2224
    /// Statistics source that hands back the same min / max arrays for every
    /// column and reports an explicitly configured container count.
    struct OneContainerStats {
        /// Returned by `min_values` regardless of the requested column
        min_values: Option<ArrayRef>,
        /// Returned by `max_values` regardless of the requested column
        max_values: Option<ArrayRef>,
        /// Returned by `num_containers`; not derived from the arrays above
        num_containers: usize,
    }
2231
2232 impl PruningStatistics for OneContainerStats {
2233 fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2234 self.min_values.clone()
2235 }
2236
2237 fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2238 self.max_values.clone()
2239 }
2240
2241 fn num_containers(&self) -> usize {
2242 self.num_containers
2243 }
2244
2245 fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2246 None
2247 }
2248
2249 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
2250 None
2251 }
2252
2253 fn contained(
2254 &self,
2255 _column: &Column,
2256 _values: &HashSet<ScalarValue>,
2257 ) -> Option<BooleanArray> {
2258 None
2259 }
2260 }
2261
2262 #[test]
2265 fn test_unique_row_count_field_and_column() {
2266 let schema: SchemaRef = Arc::new(Schema::new(vec![
2268 Field::new("c1", DataType::Int32, true),
2269 Field::new("c2", DataType::Int32, true),
2270 ]));
2271 let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2272 let expr = logical2physical(&expr, &schema);
2273 let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
2274 assert_eq!(
2276 "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2277 p.predicate_expr.to_string()
2278 );
2279
2280 let mut fields = HashSet::new();
2283 for (_col, _ty, field) in p.required_columns().iter() {
2284 let was_new = fields.insert(field);
2285 if !was_new {
2286 panic!(
2287 "Duplicate field in required schema: {field:?}. Previous fields:\n{fields:#?}"
2288 );
2289 }
2290 }
2291 }
2292
2293 #[test]
2294 fn prune_all_rows_null_counts() {
2295 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2298 let statistics = TestStatistics::new().with(
2299 "i",
2300 ContainerStats::new_i32(
2301 vec![Some(0)], vec![Some(0)], )
2304 .with_null_counts(vec![Some(1)])
2305 .with_row_counts(vec![Some(1)]),
2306 );
2307 let expected_ret = &[false];
2308 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2309
2310 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2312 let container_stats = ContainerStats {
2313 min: Some(Arc::new(Int32Array::from(vec![None]))),
2314 max: Some(Arc::new(Int32Array::from(vec![None]))),
2315 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2316 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2317 ..ContainerStats::default()
2318 };
2319 let statistics = TestStatistics::new().with("i", container_stats);
2320 let expected_ret = &[false];
2321 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2322
2323 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2325 let container_stats = ContainerStats {
2326 min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2327 max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2328 null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2329 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2330 ..ContainerStats::default()
2331 };
2332 let statistics = TestStatistics::new().with("i", container_stats);
2333 let expected_ret = &[true];
2334 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2335 let expected_ret = &[false];
2336 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2337
2338 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2340 let container_stats = ContainerStats {
2341 min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2342 max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2343 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2344 row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2345 ..ContainerStats::default()
2346 };
2347 let statistics = TestStatistics::new().with("i", container_stats);
2348 let expected_ret = &[true];
2349 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2350 let expected_ret = &[false];
2351 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2352 }
2353
2354 #[test]
2355 fn prune_missing_statistics() {
2356 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2359 let container_stats = ContainerStats {
2360 min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
2361 max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
2362 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
2363 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
2364 ..ContainerStats::default()
2365 };
2366 let statistics = TestStatistics::new().with("i", container_stats);
2367 let expected_ret = &[true, true];
2368 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2369 let expected_ret = &[false, true];
2370 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2371 let expected_ret = &[true, false];
2372 prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
2373 }
2374
2375 #[test]
2376 fn prune_null_stats() {
2377 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2380
2381 let statistics = TestStatistics::new().with(
2382 "i",
2383 ContainerStats::new_i32(
2384 vec![Some(0)], vec![Some(0)], )
2387 .with_null_counts(vec![Some(1)])
2388 .with_row_counts(vec![Some(1)]),
2389 );
2390
2391 let expected_ret = &[false];
2392
2393 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2395 }
2396
2397 #[test]
2398 fn test_build_statistics_record_batch() {
2399 let required_columns = RequiredColumns::from(vec![
2401 (
2403 phys_expr::Column::new("s1", 1),
2404 StatisticsType::Min,
2405 Field::new("s1_min", DataType::Int32, true),
2406 ),
2407 (
2409 phys_expr::Column::new("s2", 2),
2410 StatisticsType::Max,
2411 Field::new("s2_max", DataType::Int32, true),
2412 ),
2413 (
2415 phys_expr::Column::new("s3", 3),
2416 StatisticsType::Max,
2417 Field::new("s3_max", DataType::Utf8, true),
2418 ),
2419 (
2421 phys_expr::Column::new("s3", 3),
2422 StatisticsType::Min,
2423 Field::new("s3_min", DataType::Utf8, true),
2424 ),
2425 ]);
2426
2427 let statistics = TestStatistics::new()
2428 .with(
2429 "s1",
2430 ContainerStats::new_i32(
2431 vec![None, None, Some(9), None], vec![Some(10), None, None, None], ),
2434 )
2435 .with(
2436 "s2",
2437 ContainerStats::new_i32(
2438 vec![Some(2), None, None, None], vec![Some(20), None, None, None], ),
2441 )
2442 .with(
2443 "s3",
2444 ContainerStats::new_utf8(
2445 vec![Some("a"), None, None, None], vec![Some("q"), None, Some("r"), None], ),
2448 );
2449
2450 let batch =
2451 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2452 assert_snapshot!(batches_to_string(&[batch]), @r"
2453 +--------+--------+--------+--------+
2454 | s1_min | s2_max | s3_max | s3_min |
2455 +--------+--------+--------+--------+
2456 | | 20 | q | a |
2457 | | | | |
2458 | 9 | | r | |
2459 | | | | |
2460 +--------+--------+--------+--------+
2461 ");
2462 }
2463
2464 #[test]
2465 fn test_build_statistics_casting() {
2466 let required_columns = RequiredColumns::from(vec![(
2471 phys_expr::Column::new("s3", 3),
2472 StatisticsType::Min,
2473 Field::new(
2474 "s1_min",
2475 DataType::Timestamp(TimeUnit::Nanosecond, None),
2476 true,
2477 ),
2478 )]);
2479
2480 let statistics = OneContainerStats {
2482 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2483 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2484 num_containers: 1,
2485 };
2486
2487 let batch =
2488 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2489
2490 assert_snapshot!(batches_to_string(&[batch]), @r"
2491 +-------------------------------+
2492 | s1_min |
2493 +-------------------------------+
2494 | 1970-01-01T00:00:00.000000010 |
2495 +-------------------------------+
2496 ");
2497 }
2498
2499 #[test]
2500 fn test_build_statistics_no_required_stats() {
2501 let required_columns = RequiredColumns::new();
2502
2503 let statistics = OneContainerStats {
2504 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2505 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2506 num_containers: 1,
2507 };
2508
2509 let batch =
2510 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2511 assert_eq!(batch.num_rows(), 1); }
2513
2514 #[test]
2515 fn test_build_statistics_inconsistent_types() {
2516 let required_columns = RequiredColumns::from(vec![(
2520 phys_expr::Column::new("s3", 3),
2521 StatisticsType::Min,
2522 Field::new("s1_min", DataType::Utf8, true),
2523 )]);
2524
2525 let statistics = OneContainerStats {
2527 min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
2528 max_values: None,
2529 num_containers: 1,
2530 };
2531
2532 let batch =
2533 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2534 assert_snapshot!(batches_to_string(&[batch]), @r"
2535 +--------+
2536 | s1_min |
2537 +--------+
2538 | |
2539 +--------+
2540 ");
2541 }
2542
2543 #[test]
2544 fn test_build_statistics_inconsistent_length() {
2545 let required_columns = RequiredColumns::from(vec![(
2547 phys_expr::Column::new("s1", 3),
2548 StatisticsType::Min,
2549 Field::new("s1_min", DataType::Int64, true),
2550 )]);
2551
2552 let statistics = OneContainerStats {
2554 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2555 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2556 num_containers: 3,
2557 };
2558
2559 let result =
2560 build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
2561 assert!(
2562 result
2563 .to_string()
2564 .contains("mismatched statistics length. Expected 3, got 1"),
2565 "{}",
2566 result
2567 );
2568 }
2569
2570 #[test]
2571 fn row_group_predicate_eq() -> Result<()> {
2572 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2573 let expected_expr =
2574 "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2575
2576 let expr = col("c1").eq(lit(1));
2578 let predicate_expr =
2579 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2580 assert_eq!(predicate_expr.to_string(), expected_expr);
2581
2582 let expr = lit(1).eq(col("c1"));
2584 let predicate_expr =
2585 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2586 assert_eq!(predicate_expr.to_string(), expected_expr);
2587
2588 Ok(())
2589 }
2590
2591 #[test]
2592 fn row_group_predicate_not_eq() -> Result<()> {
2593 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2594 let expected_expr =
2595 "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2596
2597 let expr = col("c1").not_eq(lit(1));
2599 let predicate_expr =
2600 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2601 assert_eq!(predicate_expr.to_string(), expected_expr);
2602
2603 let expr = lit(1).not_eq(col("c1"));
2605 let predicate_expr =
2606 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2607 assert_eq!(predicate_expr.to_string(), expected_expr);
2608
2609 Ok(())
2610 }
2611
2612 #[test]
2613 fn row_group_predicate_gt() -> Result<()> {
2614 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2615 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
2616
2617 let expr = col("c1").gt(lit(1));
2619 let predicate_expr =
2620 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2621 assert_eq!(predicate_expr.to_string(), expected_expr);
2622
2623 let expr = lit(1).lt(col("c1"));
2625 let predicate_expr =
2626 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2627 assert_eq!(predicate_expr.to_string(), expected_expr);
2628
2629 Ok(())
2630 }
2631
2632 #[test]
2633 fn row_group_predicate_gt_eq() -> Result<()> {
2634 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2635 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
2636
2637 let expr = col("c1").gt_eq(lit(1));
2639 let predicate_expr =
2640 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2641 assert_eq!(predicate_expr.to_string(), expected_expr);
2642 let expr = lit(1).lt_eq(col("c1"));
2644 let predicate_expr =
2645 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2646 assert_eq!(predicate_expr.to_string(), expected_expr);
2647
2648 Ok(())
2649 }
2650
2651 #[test]
2652 fn row_group_predicate_lt() -> Result<()> {
2653 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2654 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2655
2656 let expr = col("c1").lt(lit(1));
2658 let predicate_expr =
2659 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2660 assert_eq!(predicate_expr.to_string(), expected_expr);
2661
2662 let expr = lit(1).gt(col("c1"));
2664 let predicate_expr =
2665 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2666 assert_eq!(predicate_expr.to_string(), expected_expr);
2667
2668 Ok(())
2669 }
2670
2671 #[test]
2672 fn row_group_predicate_lt_eq() -> Result<()> {
2673 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2674 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
2675
2676 let expr = col("c1").lt_eq(lit(1));
2678 let predicate_expr =
2679 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2680 assert_eq!(predicate_expr.to_string(), expected_expr);
2681 let expr = lit(1).gt_eq(col("c1"));
2683 let predicate_expr =
2684 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2685 assert_eq!(predicate_expr.to_string(), expected_expr);
2686
2687 Ok(())
2688 }
2689
2690 #[test]
2691 fn row_group_predicate_and() -> Result<()> {
2692 let schema = Schema::new(vec![
2693 Field::new("c1", DataType::Int32, false),
2694 Field::new("c2", DataType::Int32, false),
2695 Field::new("c3", DataType::Int32, false),
2696 ]);
2697 let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2699 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2700 let predicate_expr =
2701 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2702 assert_eq!(predicate_expr.to_string(), expected_expr);
2703
2704 Ok(())
2705 }
2706
2707 #[test]
2708 fn row_group_predicate_or() -> Result<()> {
2709 let schema = Schema::new(vec![
2710 Field::new("c1", DataType::Int32, false),
2711 Field::new("c2", DataType::Int32, false),
2712 ]);
2713 let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
2715 let expected_expr = "true";
2716 let predicate_expr =
2717 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2718 assert_eq!(predicate_expr.to_string(), expected_expr);
2719
2720 Ok(())
2721 }
2722
2723 #[test]
2724 fn row_group_predicate_not() -> Result<()> {
2725 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2726 let expected_expr = "true";
2727
2728 let expr = col("c1").not();
2729 let predicate_expr =
2730 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2731 assert_eq!(predicate_expr.to_string(), expected_expr);
2732
2733 Ok(())
2734 }
2735
2736 #[test]
2737 fn row_group_predicate_not_bool() -> Result<()> {
2738 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2739 let expected_expr = "NOT c1_min@0 AND c1_max@1";
2740
2741 let expr = col("c1").not();
2742 let predicate_expr =
2743 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2744 assert_eq!(predicate_expr.to_string(), expected_expr);
2745
2746 Ok(())
2747 }
2748
2749 #[test]
2750 fn row_group_predicate_bool() -> Result<()> {
2751 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2752 let expected_expr = "c1_min@0 OR c1_max@1";
2753
2754 let expr = col("c1");
2755 let predicate_expr =
2756 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2757 assert_eq!(predicate_expr.to_string(), expected_expr);
2758
2759 Ok(())
2760 }
2761
2762 #[test]
2763 fn row_group_predicate_lt_bool() -> Result<()> {
2764 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2765 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < true";
2766
2767 let expr = col("c1").lt(lit(true));
2770 let predicate_expr =
2771 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2772 assert_eq!(predicate_expr.to_string(), expected_expr);
2773
2774 Ok(())
2775 }
2776
2777 #[test]
2778 fn row_group_predicate_required_columns() -> Result<()> {
2779 let schema = Schema::new(vec![
2780 Field::new("c1", DataType::Int32, false),
2781 Field::new("c2", DataType::Int32, false),
2782 ]);
2783 let mut required_columns = RequiredColumns::new();
2784 let expr = col("c1")
2786 .lt(lit(1))
2787 .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
2788 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
2789 let predicate_expr =
2790 test_build_predicate_expression(&expr, &schema, &mut required_columns);
2791 assert_eq!(predicate_expr.to_string(), expected_expr);
2792 println!("required_columns: {required_columns:#?}"); let c1_min_field = Field::new("c1_min", DataType::Int32, false);
2795 assert_eq!(
2796 required_columns.columns[0],
2797 (
2798 phys_expr::Column::new("c1", 0),
2799 StatisticsType::Min,
2800 c1_min_field.with_nullable(true) )
2802 );
2803 let c1_null_count_field = Field::new("c1_null_count", DataType::UInt64, false);
2805 assert_eq!(
2806 required_columns.columns[1],
2807 (
2808 phys_expr::Column::new("c1", 0),
2809 StatisticsType::NullCount,
2810 c1_null_count_field.with_nullable(true) )
2812 );
2813 let row_count_field = Field::new("row_count", DataType::UInt64, false);
2815 assert_eq!(
2816 required_columns.columns[2],
2817 (
2818 phys_expr::Column::new("c1", 0),
2819 StatisticsType::RowCount,
2820 row_count_field.with_nullable(true) )
2822 );
2823 let c2_min_field = Field::new("c2_min", DataType::Int32, false);
2825 assert_eq!(
2826 required_columns.columns[3],
2827 (
2828 phys_expr::Column::new("c2", 1),
2829 StatisticsType::Min,
2830 c2_min_field.with_nullable(true) )
2832 );
2833 let c2_max_field = Field::new("c2_max", DataType::Int32, false);
2834 assert_eq!(
2835 required_columns.columns[4],
2836 (
2837 phys_expr::Column::new("c2", 1),
2838 StatisticsType::Max,
2839 c2_max_field.with_nullable(true) )
2841 );
2842 let c2_null_count_field = Field::new("c2_null_count", DataType::UInt64, false);
2844 assert_eq!(
2845 required_columns.columns[5],
2846 (
2847 phys_expr::Column::new("c2", 1),
2848 StatisticsType::NullCount,
2849 c2_null_count_field.with_nullable(true) )
2851 );
2852 let row_count_field = Field::new("row_count", DataType::UInt64, false);
2854 assert_eq!(
2855 required_columns.columns[2],
2856 (
2857 phys_expr::Column::new("c1", 0),
2858 StatisticsType::RowCount,
2859 row_count_field.with_nullable(true) )
2861 );
2862 assert_eq!(required_columns.columns.len(), 6);
2864
2865 Ok(())
2866 }
2867
2868 #[test]
2869 fn row_group_predicate_in_list() -> Result<()> {
2870 let schema = Schema::new(vec![
2871 Field::new("c1", DataType::Int32, false),
2872 Field::new("c2", DataType::Int32, false),
2873 ]);
2874 let expr = Expr::InList(InList::new(
2876 Box::new(col("c1")),
2877 vec![lit(1), lit(2), lit(3)],
2878 false,
2879 ));
2880 let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
2881 let predicate_expr =
2882 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2883 assert_eq!(predicate_expr.to_string(), expected_expr);
2884
2885 Ok(())
2886 }
2887
2888 #[test]
2889 fn row_group_predicate_in_list_empty() -> Result<()> {
2890 let schema = Schema::new(vec![
2891 Field::new("c1", DataType::Int32, false),
2892 Field::new("c2", DataType::Int32, false),
2893 ]);
2894 let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));
2896 let expected_expr = "true";
2897 let predicate_expr =
2898 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2899 assert_eq!(predicate_expr.to_string(), expected_expr);
2900
2901 Ok(())
2902 }
2903
2904 #[test]
2905 fn row_group_predicate_in_list_negated() -> Result<()> {
2906 let schema = Schema::new(vec![
2907 Field::new("c1", DataType::Int32, false),
2908 Field::new("c2", DataType::Int32, false),
2909 ]);
2910 let expr = Expr::InList(InList::new(
2912 Box::new(col("c1")),
2913 vec![lit(1), lit(2), lit(3)],
2914 true,
2915 ));
2916 let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
2917 let predicate_expr =
2918 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2919 assert_eq!(predicate_expr.to_string(), expected_expr);
2920
2921 Ok(())
2922 }
2923
/// `c1 BETWEEN 1 AND 5` must build the same pruning predicate (compared by
/// display string) as the explicit `c1 >= 1 AND c1 <= 5`.
#[test]
fn row_group_predicate_between() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);

    // Builds the predicate with a fresh `RequiredColumns` and renders it.
    let render = |expr: &Expr| {
        test_build_predicate_expression(expr, &schema, &mut RequiredColumns::new())
            .to_string()
    };

    let as_between = col("c1").between(lit(1), lit(5));
    let as_range = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));

    let rendered_between = render(&as_between);
    let rendered_range = render(&as_range);
    assert_eq!(rendered_between, rendered_range);

    Ok(())
}
2946
/// Combines `c1 IN (1, 2)` with `c2 BETWEEN 4 AND 5` via `AND` and checks the
/// display form of the resulting pruning predicate.
#[test]
fn row_group_predicate_between_with_in_list() -> Result<()> {
    let int_field = |name: &str| Field::new(name, DataType::Int32, false);
    let schema = Schema::new(vec![int_field("c1"), int_field("c2")]);

    let c1_in_list = col("c1").in_list(vec![lit(1), lit(2)], false);
    let c2_between = col("c2").between(lit(4), lit(5));
    let combined = c1_in_list.and(c2_between);

    let rendered =
        test_build_predicate_expression(&combined, &schema, &mut RequiredColumns::new())
            .to_string();
    assert_eq!(
        rendered,
        "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5"
    );

    Ok(())
}
2969
/// An `IN` list with 21 values on `c1` is expected to yield the predicate
/// `true` instead of a per-value expansion.
#[test]
fn row_group_predicate_in_list_to_many_values() -> Result<()> {
    let c1 = Field::new("c1", DataType::Int32, false);
    let schema = Schema::new(vec![c1]);

    // 21 literal values: 1 through 21 inclusive.
    let values: Vec<Expr> = (1..=21).map(lit).collect();
    let in_long_list = col("c1").in_list(values, false);

    let mut required_columns = RequiredColumns::new();
    let built =
        test_build_predicate_expression(&in_long_list, &schema, &mut required_columns);
    assert_eq!(built.to_string(), "true");

    Ok(())
}
2985
/// Int32 column cast (`CAST` / `TRY_CAST`) to Int64 and compared with an
/// Int64 literal: the cast is expected to be applied to the min/max
/// statistics columns, regardless of which side of the operator holds the
/// column.
#[test]
fn row_group_predicate_cast_int_int() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let one = || lit(ScalarValue::Int64(Some(1)));

    // CAST(c1 AS Int64) = 1, with the column on the left and on the right.
    let expected_eq = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
    let eq_cases = [
        cast(col("c1"), DataType::Int64).eq(one()),
        one().eq(cast(col("c1"), DataType::Int64)),
    ];
    for expr in eq_cases {
        let built =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(built.to_string(), expected_eq);
    }

    // TRY_CAST(c1 AS Int64) > 1 and the mirrored 1 < TRY_CAST(c1 AS Int64).
    let expected_gt =
        "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
    let gt_cases = [
        try_cast(col("c1"), DataType::Int64).gt(one()),
        one().lt(try_cast(col("c1"), DataType::Int64)),
    ];
    for expr in gt_cases {
        let built =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(built.to_string(), expected_gt);
    }

    Ok(())
}
3023
/// Utf8View column cast to Utf8 and compared for equality with a Utf8
/// literal: the cast is expected to be kept on the min/max columns, for both
/// operand orders.
#[test]
fn row_group_predicate_cast_string_string() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let literal_one = || lit(ScalarValue::Utf8(Some("1".to_string())));
    let cast_c1 = || cast(col("c1"), DataType::Utf8);

    let expected = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Utf8) <= 1 AND 1 <= CAST(c1_max@1 AS Utf8)";
    for expr in [cast_c1().eq(literal_one()), literal_one().eq(cast_c1())] {
        let built =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(built.to_string(), expected);
    }

    Ok(())
}
3045
/// Utf8View column cast to Int32 and compared with an Int32 literal: the
/// expected predicate is `true` for both operand orders.
#[test]
fn row_group_predicate_cast_string_int() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let literal_one = || lit(ScalarValue::Int32(Some(1)));
    let cast_c1 = || cast(col("c1"), DataType::Int32);

    let candidates = [cast_c1().eq(literal_one()), literal_one().eq(cast_c1())];
    for expr in candidates {
        let rendered =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new())
                .to_string();
        assert_eq!(rendered, "true");
    }

    Ok(())
}
3065
/// Int32 column cast to Utf8 and compared with a Utf8 literal: the expected
/// predicate is `true` for both operand orders.
#[test]
fn row_group_predicate_cast_int_string() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let literal_one = || lit(ScalarValue::Utf8(Some("1".to_string())));
    let cast_c1 = || cast(col("c1"), DataType::Utf8);

    let candidates = [cast_c1().eq(literal_one()), literal_one().eq(cast_c1())];
    for expr in candidates {
        let rendered =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new())
                .to_string();
        assert_eq!(rendered, "true");
    }

    Ok(())
}
3087
/// Date32 column cast to Date64 and compared with a Date64 literal: the cast
/// is expected to be applied to the min/max columns, for both operand orders.
#[test]
fn row_group_predicate_date_date() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
    let date_literal = || lit(ScalarValue::Date64(Some(123)));
    let cast_c1 = || cast(col("c1"), DataType::Date64);

    let expected = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Date64) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Date64)";
    for expr in [cast_c1().eq(date_literal()), date_literal().eq(cast_c1())] {
        let built =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(built.to_string(), expected);
    }

    Ok(())
}
3109
/// Date32 column cast to `Dictionary(UInt8, Utf8)` and compared with a Utf8
/// literal: the expected predicate is `true` for both operand orders.
#[test]
fn row_group_predicate_dict_string_date() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);

    let dict_utf8 = || {
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8))
    };
    let date_text = || lit(ScalarValue::Utf8(Some("2024-01-01".to_string())));

    let candidates = [
        cast(col("c1"), dict_utf8()).eq(date_text()),
        date_text().eq(cast(col("c1"), dict_utf8())),
    ];
    for expr in candidates {
        let rendered =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new())
                .to_string();
        assert_eq!(rendered, "true");
    }

    Ok(())
}
3137
/// `Dictionary(UInt8, Utf8)` column cast to Date32 and compared with a Date32
/// literal: the expected predicate is `true` for both operand orders.
#[test]
fn row_group_predicate_date_dict_string() -> Result<()> {
    let dict_utf8 =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    let schema = Schema::new(vec![Field::new("c1", dict_utf8, false)]);

    let date_literal = || lit(ScalarValue::Date32(Some(123)));
    let cast_c1 = || cast(col("c1"), DataType::Date32);

    let candidates = [cast_c1().eq(date_literal()), date_literal().eq(cast_c1())];
    for expr in candidates {
        let rendered =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new())
                .to_string();
        assert_eq!(rendered, "true");
    }

    Ok(())
}
3164
/// `Dictionary(UInt8, Utf8)` column compared with a Utf8 literal, first
/// directly and then through a cast to `Dictionary(UInt16, Utf8)`; both are
/// expected to yield min/max range checks.
#[test]
fn row_group_predicate_dict_dict_same_value_type() -> Result<()> {
    let utf8_dict =
        |key: DataType| DataType::Dictionary(Box::new(key), Box::new(DataType::Utf8));
    let schema = Schema::new(vec![Field::new("c1", utf8_dict(DataType::UInt8), false)]);
    let test_literal = || lit(ScalarValue::Utf8(Some("test".to_string())));

    let cases = [
        // Column compared directly, without any cast.
        (
            col("c1").eq(test_literal()),
            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1",
        ),
        // Column cast to a dictionary with a different key type but the same value type.
        (
            cast(col("c1"), utf8_dict(DataType::UInt16)).eq(test_literal()),
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Utf8)) <= test AND test <= CAST(c1_max@1 AS Dictionary(UInt16, Utf8))",
        ),
    ];
    for (expr, expected) in cases {
        let built =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(built.to_string(), expected);
    }

    Ok(())
}
3195
/// `Dictionary(UInt8, Int32)` column cast to Int64 and compared with an Int64
/// literal: the cast is expected to be applied to the min/max columns.
#[test]
fn row_group_predicate_dict_dict_different_value_type() -> Result<()> {
    let dict_int32 =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Int32));
    let schema = Schema::new(vec![Field::new("c1", dict_int32, false)]);

    let cast_eq_literal =
        cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(123))));

    let rendered = test_build_predicate_expression(
        &cast_eq_literal,
        &schema,
        &mut RequiredColumns::new(),
    )
    .to_string();
    assert_eq!(
        rendered,
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 123 AND 123 <= CAST(c1_max@1 AS Int64)"
    );

    Ok(())
}
3215
/// Column typed `Dictionary(UInt8, Dictionary(UInt16, Utf8))` compared with a
/// Utf8 literal: a plain min/max range check is expected.
#[test]
fn row_group_predicate_nested_dict() -> Result<()> {
    let inner_dict =
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
    let nested_dict =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(inner_dict));
    let schema = Schema::new(vec![Field::new("c1", nested_dict, false)]);

    let eq_literal = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));

    let rendered =
        test_build_predicate_expression(&eq_literal, &schema, &mut RequiredColumns::new())
            .to_string();
    assert_eq!(
        rendered,
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1"
    );

    Ok(())
}
3241
/// `Dictionary(UInt8, Date32)` column cast to `Dictionary(UInt16, Date64)` and
/// compared with a Date64 literal: the cast is expected to be applied to the
/// min/max columns.
#[test]
fn row_group_predicate_dict_date_dict_date() -> Result<()> {
    let source_type =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Date32));
    let target_type =
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Date64));
    let schema = Schema::new(vec![Field::new("c1", source_type, false)]);

    let cast_eq_literal =
        cast(col("c1"), target_type).eq(lit(ScalarValue::Date64(Some(123))));

    let rendered = test_build_predicate_expression(
        &cast_eq_literal,
        &schema,
        &mut RequiredColumns::new(),
    )
    .to_string();
    assert_eq!(
        rendered,
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Date64)) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Dictionary(UInt16, Date64))"
    );

    Ok(())
}
3264
/// Utf8 column cast to Date32 and compared with a Date32 literal: the
/// expected predicate is `true` for both operand orders.
#[test]
fn row_group_predicate_date_string() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
    let date_literal = || lit(ScalarValue::Date32(Some(123)));
    let cast_c1 = || cast(col("c1"), DataType::Date32);

    let candidates = [cast_c1().eq(date_literal()), date_literal().eq(cast_c1())];
    for expr in candidates {
        let rendered =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new())
                .to_string();
        assert_eq!(rendered, "true");
    }

    Ok(())
}
3286
/// Date32 column cast to Utf8 and compared with a Utf8 literal: the expected
/// predicate is `true` for both operand orders.
#[test]
fn row_group_predicate_string_date() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
    let date_text = || lit(ScalarValue::Utf8(Some("2024-01-01".to_string())));
    let cast_c1 = || cast(col("c1"), DataType::Utf8);

    let candidates = [cast_c1().eq(date_text()), date_text().eq(cast_c1())];
    for expr in candidates {
        let rendered =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new())
                .to_string();
        assert_eq!(rendered, "true");
    }

    Ok(())
}
3308
/// `CAST(c1 AS Int64) [NOT] IN (1, 2, 3)` on an Int32 column: the plain form
/// is expected to expand into `OR`-ed range checks and the negated form into
/// `AND`-ed inequality checks, both over cast min/max columns.
#[test]
fn row_group_predicate_cast_list() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);

    // Int64 literals 1, 2, 3 used as the list in both cases.
    let int64_values = || -> Vec<Expr> {
        (1..=3)
            .map(|value| lit(ScalarValue::Int64(Some(value))))
            .collect()
    };

    // (negated, expected display form)
    let cases = [
        (
            false,
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)",
        ),
        (
            true,
            "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))",
        ),
    ];
    for (negated, expected) in cases {
        let expr = cast(col("c1"), DataType::Int64).in_list(int64_values(), negated);
        let built =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(built.to_string(), expected);
    }

    Ok(())
}
3343
/// Prunes a `>` comparison on a nullable Decimal128 column `s1` for several
/// precisions, with statistics supplied as i32, i64 and decimal128 values.
/// Every case expects the same per-container result.
#[test]
fn prune_decimal_data() {
    let expected = [false, true, false, true];

    // Schema with a single nullable `s1: Decimal128(precision, 2)` field.
    let decimal_schema = |precision: u8| {
        Arc::new(Schema::new(vec![Field::new(
            "s1",
            DataType::Decimal128(precision, 2),
            true,
        )]))
    };

    // Decimal128(9, 2) column with i32 statistics: direct comparison, then
    // through CAST and TRY_CAST to Decimal128(14, 3).
    let schema = decimal_schema(9);
    let i32_statistics = || {
        TestStatistics::new().with(
            "s1",
            ContainerStats::new_i32(
                vec![Some(0), Some(4), None, Some(3)],
                vec![Some(5), Some(6), Some(4), None],
            ),
        )
    };
    let wide_threshold = || lit(ScalarValue::Decimal128(Some(5000), 14, 3));
    let i32_cases = [
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
        cast(col("s1"), DataType::Decimal128(14, 3)).gt(wide_threshold()),
        try_cast(col("s1"), DataType::Decimal128(14, 3)).gt(wide_threshold()),
    ];
    for expr in i32_cases {
        prune_with_expr(expr, &schema, &i32_statistics(), &expected);
    }

    // Decimal128(18, 2) column with i64 statistics.
    let schema = decimal_schema(18);
    let i64_statistics = TestStatistics::new().with(
        "s1",
        ContainerStats::new_i64(
            vec![Some(0), Some(4), None, Some(3)],
            vec![Some(5), Some(6), Some(4), None],
        ),
    );
    prune_with_expr(
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
        &schema,
        &i64_statistics,
        &expected,
    );

    // Decimal128(23, 2) column with decimal128 statistics.
    let schema = decimal_schema(23);
    let decimal_statistics = TestStatistics::new().with(
        "s1",
        ContainerStats::new_decimal128(
            vec![Some(0), Some(400), None, Some(300)],
            vec![Some(500), Some(600), Some(400), None],
            23,
            2,
        ),
    );
    prune_with_expr(
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
        &schema,
        &decimal_statistics,
        &expected,
    );
}
3444
/// Prunes `s2 > 5` (directly and through a cast to Int64) against a schema
/// with two columns where statistics exist only for `s2`.
#[test]
fn prune_api() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("s1", DataType::Utf8, true),
        Field::new("s2", DataType::Int32, true),
    ]));

    let s2_mins = vec![Some(0), Some(4), None, Some(3)];
    let s2_maxes = vec![Some(5), Some(6), None, None];
    let statistics =
        TestStatistics::new().with("s2", ContainerStats::new_i32(s2_mins, s2_maxes));

    let expected = [false, true, true, true];
    let exprs = [
        col("s2").gt(lit(5)),
        cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
    ];
    for expr in exprs {
        prune_with_expr(expr, &schema, &statistics, &expected);
    }
}
3479
/// Prunes `s1 != "M"` against six containers of Utf8 min/max statistics;
/// only the container whose min and max are both "M" expects `false`.
#[test]
fn prune_not_eq_data() {
    let s1 = Field::new("s1", DataType::Utf8, true);
    let schema = Arc::new(Schema::new(vec![s1]));

    let mins = vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")];
    let maxes = vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None];
    let statistics =
        TestStatistics::new().with("s1", ContainerStats::new_utf8(mins, maxes));

    let expected = [true, true, true, false, true, true];
    prune_with_expr(col("s1").not_eq(lit("M")), &schema, &statistics, &expected);
}
3504
3505 fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3521 let schema =
3522 Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3523
3524 let statistics = TestStatistics::new().with(
3525 "b1",
3526 ContainerStats::new_bool(
3527 vec![Some(false), Some(false), Some(true), None, Some(false)], vec![Some(false), Some(true), Some(true), None, None], ),
3530 );
3531 let expected_true = vec![false, true, true, true, true];
3532 let expected_false = vec![true, true, false, true, true];
3533
3534 (schema, statistics, expected_true, expected_false)
3535 }
3536
/// A constant predicate ignores the statistics: literal `true` expects all
/// five containers to be `true`, literal `false` expects all `false`.
#[test]
fn prune_bool_const_expr() {
    let (schema, statistics, ..) = bool_setup();

    for (constant, expected) in [(true, [true; 5]), (false, [false; 5])] {
        prune_with_expr(lit(constant), &schema, &statistics, &expected);
    }
}
3557
/// Prunes on the bare boolean column `b1` and expects the third value
/// returned by `bool_setup`.
#[test]
fn prune_bool_column() {
    let (schema, statistics, expected_for_true, _) = bool_setup();

    let bare_column = col("b1");
    prune_with_expr(bare_column, &schema, &statistics, &expected_for_true);
}
3570
/// Prunes on `NOT b1` and expects the fourth value returned by `bool_setup`.
#[test]
fn prune_bool_not_column() {
    let (schema, statistics, _, expected_for_false) = bool_setup();

    let negated_column = col("b1").not();
    prune_with_expr(negated_column, &schema, &statistics, &expected_for_false);
}
3583
/// Prunes on `b1 = true` and expects the same result as the bare column
/// (the third value returned by `bool_setup`).
#[test]
fn prune_bool_column_eq_true() {
    let (schema, statistics, expected_for_true, _) = bool_setup();

    let column_is_true = col("b1").eq(lit(true));
    prune_with_expr(column_is_true, &schema, &statistics, &expected_for_true);
}
3596
/// Prunes on `NOT b1 = true` and expects the same result as `NOT b1`
/// (the fourth value returned by `bool_setup`).
#[test]
fn prune_bool_not_column_eq_true() {
    let (schema, statistics, _, expected_for_false) = bool_setup();

    let negated_is_true = col("b1").not().eq(lit(true));
    prune_with_expr(negated_is_true, &schema, &statistics, &expected_for_false);
}
3609
3610 fn int32_setup() -> (SchemaRef, TestStatistics) {
3620 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3621
3622 let statistics = TestStatistics::new().with(
3623 "i",
3624 ContainerStats::new_i32(
3625 vec![Some(-5), Some(1), Some(-11), None, Some(1)], vec![Some(5), Some(11), Some(-1), None, None], ),
3628 );
3629 (schema, statistics)
3630 }
3631
/// Prunes `i > 0` and the equivalent `-i < 0` against the `int32_setup`
/// containers; both must give the same per-container result.
#[test]
fn prune_int32_col_gt_zero() {
    let (schema, statistics) = int32_setup();
    let expected = [true, true, false, true, true];

    let exprs = [
        col("i").gt(lit(0)),
        Expr::Negative(Box::new(col("i"))).lt(lit(0)),
    ];
    for expr in exprs {
        prune_with_expr(expr, &schema, &statistics, &expected);
    }
}
3655
/// Prunes `i <= 0` and the equivalent `-i >= 0` against the `int32_setup`
/// containers; both must give the same per-container result.
#[test]
fn prune_int32_col_lte_zero() {
    let (schema, statistics) = int32_setup();
    let expected = [true, false, true, true, false];

    let exprs = [
        col("i").lt_eq(lit(0)),
        Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
    ];
    for expr in exprs {
        prune_with_expr(expr, &schema, &statistics, &expected);
    }
}
3684
/// Compares the Int32 column `i` (and `-i`) with the string `"0"` after a
/// `CAST` / `TRY_CAST` to Utf8; every container is expected to be `true`.
#[test]
fn prune_int32_col_lte_zero_cast() {
    let (schema, statistics) = int32_setup();
    let expected = [true; 5];

    let negated_i = || Expr::Negative(Box::new(col("i")));
    let exprs = [
        cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
        try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
        cast(negated_i(), DataType::Utf8).gt_eq(lit("0")),
        try_cast(negated_i(), DataType::Utf8).gt_eq(lit("0")),
    ];
    for expr in exprs {
        prune_with_expr(expr, &schema, &statistics, &expected);
    }
}
3729
/// Prunes `i = 0` against the `int32_setup` containers.
#[test]
fn prune_int32_col_eq_zero() {
    let (schema, statistics) = int32_setup();

    let expected = [true, false, false, true, false];
    let i_equals_zero = col("i").eq(lit(0));
    prune_with_expr(i_equals_zero, &schema, &statistics, &expected);
}
3750
/// Prunes `CAST(i AS Int64) = 0` and `TRY_CAST(i AS Int64) = 0`; both are
/// expected to give the same result as the uncast `i = 0` test.
#[test]
fn prune_int32_col_eq_zero_cast() {
    let (schema, statistics) = int32_setup();
    let expected = [true, false, false, true, false];

    let exprs = [
        cast(col("i"), DataType::Int64).eq(lit(0i64)),
        try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
    ];
    for expr in exprs {
        prune_with_expr(expr, &schema, &statistics, &expected);
    }
}
3777
/// Prunes `CAST(i AS Utf8) = "0"`; every container is expected to be `true`.
#[test]
fn prune_int32_col_eq_zero_cast_as_str() {
    let (schema, statistics) = int32_setup();

    let expected = [true; 5];
    let cast_equals_zero = cast(col("i"), DataType::Utf8).eq(lit("0"));
    prune_with_expr(cast_equals_zero, &schema, &statistics, &expected);
}
3800
/// Prunes `i > -1` and the equivalent `-i < 1` against the `int32_setup`
/// containers; both must give the same per-container result.
#[test]
fn prune_int32_col_lt_neg_one() {
    let (schema, statistics) = int32_setup();
    let expected = [true, true, false, true, true];

    let exprs = [
        col("i").gt(lit(-1)),
        Expr::Negative(Box::new(col("i"))).lt(lit(1)),
    ];
    for expr in exprs {
        prune_with_expr(expr, &schema, &statistics, &expected);
    }
}
3829
/// Prunes `i IS NULL` twice: first with the plain `int32_setup` statistics
/// (all containers expected `true`), then after null counts are supplied,
/// where containers with a known null count of 0 are expected `false`.
#[test]
fn prune_int32_is_null() {
    let (schema, statistics) = int32_setup();
    let i_is_null = || col("i").is_null();

    // No null counts supplied yet.
    prune_with_expr(i_is_null(), &schema, &statistics, &[true; 5]);

    // Same statistics, now with per-container null counts for `i`.
    let null_counts = vec![Some(0), Some(1), None, None, Some(0)];
    let with_null_counts = statistics.with_null_counts("i", null_counts);

    prune_with_expr(
        i_is_null(),
        &schema,
        &with_null_counts,
        &[false, true, true, true, false],
    );
}
3868
/// Prunes `i < 0` in three stages: with min/max only, after adding row
/// counts (same expectation), and after adding null counts. In the last
/// stage the fourth container has 4 nulls out of 4 rows and its expected
/// result changes from `true` to `false`.
#[test]
fn prune_int32_column_is_known_all_null() {
    let (schema, statistics) = int32_setup();
    let i_lt_zero = || col("i").lt(lit(0));

    // Stage 1: min/max statistics only.
    let expected_before_nulls = [true, false, true, true, false];
    prune_with_expr(i_lt_zero(), &schema, &statistics, &expected_before_nulls);

    // Stage 2: row counts added; the expectation is unchanged.
    let row_counts = vec![Some(10), Some(9), None, Some(4), Some(10)];
    let with_row_counts = statistics.with_row_counts("i", row_counts);
    prune_with_expr(
        i_lt_zero(),
        &schema,
        &with_row_counts,
        &expected_before_nulls,
    );

    // Stage 3: null counts added on top of the row counts.
    let null_counts = vec![Some(0), Some(1), None, Some(4), Some(0)];
    let with_null_counts = with_row_counts.with_null_counts("i", null_counts);
    let expected_after_nulls = [true, false, true, false, false];
    prune_with_expr(
        i_lt_zero(),
        &schema,
        &with_null_counts,
        &expected_after_nulls,
    );
}
3940
/// Prunes four variants of "`i` greater than zero" that involve casts: a cast
/// on the literal, `CAST` / `TRY_CAST` on the column, and a negated cast
/// column compared with `<`. All expect the same per-container result.
#[test]
fn prune_cast_column_scalar() {
    let (schema, statistics) = int32_setup();
    let expected = [true, true, false, true, true];

    let zero_i64 = || lit(ScalarValue::Int64(Some(0)));
    let exprs = [
        // Cast applied to the literal side.
        col("i").gt(cast(zero_i64(), DataType::Int32)),
        // Cast / try-cast applied to the column side.
        cast(col("i"), DataType::Int64).gt(zero_i64()),
        try_cast(col("i"), DataType::Int64).gt(zero_i64()),
        // Negation wrapped around the cast column.
        Expr::Negative(Box::new(cast(col("i"), DataType::Int64))).lt(zero_i64()),
    ];
    for expr in exprs {
        prune_with_expr(expr, &schema, &statistics, &expected);
    }
}
3980
/// Table-driven check of `increment_utf8`: each row pairs an input with the
/// expected output, where `None` means no incremented string is returned.
#[test]
fn test_increment_utf8() {
    let cases: [(&str, Option<&str>); 17] = [
        // Plain ASCII.
        ("abc", Some("abd")),
        ("abz", Some("ab{")),
        // Increments that cross into a longer code-point range.
        ("~", Some("\u{7f}")),
        ("\u{7f}", Some("\u{80}")),
        ("ß", Some("à")),
        ("℣", Some("ℤ")),
        ("\u{7FF}", Some("\u{800}")),
        ("\u{FFFF}", Some("\u{10000}")),
        // Empty input and the largest code point.
        ("", None),
        ("\u{10FFFF}", None),
        ("a\u{10FFFF}", Some("b")),
        // Last character is U+D7FF.
        ("a\u{D7FF}", Some("b")),
        ("\u{D7FF}", None),
        // Last character is U+FDCF.
        ("a\u{FDCF}", Some("b")),
        ("\u{FDCF}", None),
        // Last character is U+10FFFF (checked a second time, as before).
        ("a\u{10FFFF}", Some("b")),
        ("\u{10FFFF}", None),
    ];

    for (input, expected) in cases {
        assert_eq!(increment_utf8(input).as_deref(), expected);
    }
}
4020
4021 fn utf8_setup() -> (SchemaRef, TestStatistics) {
4034 let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4035
4036 let statistics = TestStatistics::new().with(
4037 "s1",
4038 ContainerStats::new_utf8(
4039 vec![
4040 Some("A"),
4041 Some("A"),
4042 Some("N"),
4043 Some("M"),
4044 None,
4045 Some("A"),
4046 Some(""),
4047 Some(""),
4048 Some("AB"),
4049 Some("A\u{10ffff}\u{10ffff}"),
4050 ], vec![
4052 Some("Z"),
4053 Some("L"),
4054 Some("Z"),
4055 Some("M"),
4056 None,
4057 None,
4058 Some("A"),
4059 Some(""),
4060 Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
4061 Some("A\u{10ffff}\u{10ffff}"),
4062 ], ),
4064 );
4065 (schema, statistics)
4066 }
4067
/// Prunes `s1 = "A"` and `s1 = ""` against the ten `utf8_setup` containers.
/// Each expected array holds one flag per container, in container order.
#[test]
fn prune_utf8_eq() {
    let (schema, statistics) = utf8_setup();

    let cases = [
        (
            col("s1").eq(lit("A")),
            [true, true, false, false, true, true, true, false, false, false],
        ),
        (
            col("s1").eq(lit("")),
            [false, false, false, false, true, false, true, true, false, false],
        ),
    ];
    for (expr, expected) in cases {
        prune_with_expr(expr, &schema, &statistics, &expected);
    }
}
4124
/// Prunes `s1 != "A"` and `s1 != ""` against the ten `utf8_setup`
/// containers. Only the container whose min and max are both `""` expects
/// `false`, and only for the `!= ""` case.
#[test]
fn prune_utf8_not_eq() {
    let (schema, statistics) = utf8_setup();

    let cases = [
        (col("s1").not_eq(lit("A")), [true; 10]),
        (
            col("s1").not_eq(lit("")),
            [true, true, true, true, true, true, true, false, true, true],
        ),
    ];
    for (expr, expected) in cases {
        prune_with_expr(expr, &schema, &statistics, &expected);
    }
}
4181
/// Prunes `LIKE` patterns built around the single-character wildcard `_`
/// (plus the empty pattern) against the ten `utf8_setup` containers. Each
/// expected array holds one flag per container, in container order.
#[test]
fn prune_utf8_like_one() {
    let (schema, statistics) = utf8_setup();

    // (pattern, expected per-container result)
    let cases = [
        (
            "A_",
            [true, true, false, false, true, true, true, false, true, true],
        ),
        ("_A_", [true; 10]),
        ("_", [true; 10]),
        (
            "",
            [false, false, false, false, true, false, true, true, false, false],
        ),
    ];
    for (pattern, expected) in cases {
        let expr = col("s1").like(lit(pattern));
        prune_with_expr(expr, &schema, &statistics, &expected);
    }
}
4290
/// Prunes `LIKE` patterns built around the multi-character wildcard `%`
/// (plus the empty pattern) against the ten `utf8_setup` containers. Each
/// expected array holds one flag per container, in container order.
#[test]
fn prune_utf8_like_many() {
    let (schema, statistics) = utf8_setup();

    // (pattern, expected per-container result)
    let cases = [
        (
            "A%",
            [true, true, false, false, true, true, true, false, true, true],
        ),
        ("%A%", [true; 10]),
        ("%", [true; 10]),
        (
            "",
            [false, false, false, false, true, false, true, true, false, false],
        ),
    ];
    for (pattern, expected) in cases {
        let expr = col("s1").like(lit(pattern));
        prune_with_expr(expr, &schema, &statistics, &expected);
    }
}
4399
/// Prunes `s1 NOT LIKE 'A\u{10ffff}_'` against the ten `utf8_setup`
/// containers; every container is expected to be `true`.
#[test]
fn prune_utf8_not_like_one() {
    let (schema, statistics) = utf8_setup();

    let pattern = lit("A\u{10ffff}_");
    let not_like = col("s1").not_like(pattern);

    let expected = [true; 10];
    prune_with_expr(not_like, &schema, &statistics, &expected);
}
4431
/// Prunes `NOT LIKE` patterns containing `%`. The first three patterns run
/// against the ten `utf8_setup` containers; the last one, with an escaped
/// `%`, runs against its own two-container statistics.
#[test]
fn prune_utf8_not_like_many() {
    let (schema, statistics) = utf8_setup();

    // (pattern, expected per-container result) against the shared fixture.
    let shared_fixture_cases = [
        (
            "A\u{10ffff}%",
            [true, true, true, true, true, true, true, true, true, false],
        ),
        ("A\u{10ffff}%\u{10ffff}", [true; 10]),
        ("A\u{10ffff}%\u{10ffff}_", [true; 10]),
    ];
    for (pattern, expected) in shared_fixture_cases {
        let expr = col("s1").not_like(lit(pattern));
        prune_with_expr(expr, &schema, &statistics, &expected);
    }

    // Pattern with a backslash-escaped `%` followed by a wildcard `%`,
    // checked against two containers: ["A%a", "A%c"] and ["A", "A"].
    let escaped_percent = col("s1").not_like(lit("A\\%%"));
    let two_containers = TestStatistics::new().with(
        "s1",
        ContainerStats::new_utf8(
            vec![Some("A%a"), Some("A")],
            vec![Some("A%c"), Some("A")],
        ),
    );
    prune_with_expr(escaped_percent, &schema, &two_containers, &[false, true]);
}
4525
#[test]
fn test_rewrite_expr_to_prunable() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    // (left-hand side, operator, right-hand literal) triples. For each one the
    // rewrite is expected to hand both sides back unchanged (compared via
    // their string rendering).
    let cases = vec![
        // plain column compared with a literal
        (col("a"), Operator::Eq, lit(ScalarValue::Int32(Some(12)))),
        // cast(column) compared with a literal
        (
            cast(col("a"), DataType::Decimal128(20, 3)),
            Operator::Gt,
            lit(ScalarValue::Decimal128(Some(12), 20, 3)),
        ),
        // try_cast(column) compared with a literal
        (
            try_cast(col("a"), DataType::Int64),
            Operator::Gt,
            lit(ScalarValue::Int64(Some(12))),
        ),
    ];

    for (lhs, op, rhs) in cases {
        let lhs = logical2physical(&lhs, &schema);
        let rhs = logical2physical(&rhs, &schema);

        let (rewritten_lhs, _, rewritten_rhs) =
            rewrite_expr_to_prunable(&lhs, op, &rhs, df_schema.clone()).unwrap();

        assert_eq!(rewritten_lhs.to_string(), lhs.to_string());
        assert_eq!(rewritten_rhs.to_string(), rhs.to_string());
    }
}
4574
#[test]
fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
    /// Hook that answers every expression it is handed with the Int32
    /// literal 42, which makes its involvement easy to spot in the output.
    struct CustomUnhandledHook;

    impl UnhandledPredicateHook for CustomUnhandledHook {
        fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
            let replacement = phys_expr::Literal::new(ScalarValue::Int32(Some(42)));
            Arc::new(replacement)
        }
    }

    // `schema` only has column `a`; `schema_with_b` additionally has `b`.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let schema_with_b = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]);

    let rewriter =
        PredicateRewriter::new().with_unhandled_hook(Arc::new(CustomUnhandledHook {}));

    // Physical planning uses the wider schema so that `b` resolves; the
    // statistics rewrite itself is done against the narrower `schema`.
    let rewrite_with_hook = |logical: Expr| {
        let physical = logical2physical(&logical, &schema_with_b);
        rewriter.rewrite_predicate_to_statistics_predicate(&physical, &schema)
    };

    // A predicate on `a`, rewritten with a default rewriter as the reference
    // for the "handled" half of the conjunctions below.
    let supported = col("a").eq(lit(12));
    let supported_physical = logical2physical(&supported, &schema);
    let supported_rewritten = PredicateRewriter::new()
        .rewrite_predicate_to_statistics_predicate(&supported_physical, &schema);

    // The two expected renderings: the hook's literal on its own, and the
    // reference rewrite AND-ed with the hook's literal.
    let hook_only = logical2physical(&lit(42), &schema).to_string();
    let supported_and_hook = phys_expr::BinaryExpr::new(
        Arc::<dyn PhysicalExpr>::clone(&supported_rewritten),
        Operator::And,
        logical2physical(&lit(42), &schema),
    )
    .to_string();

    // Inputs that are expected to end up in the hook: a comparison on column
    // `b`, and an `array_has` call.
    let on_column_b = col("b").eq(lit(12));
    let array_has_call = array_has(make_array(vec![lit(1)]), col("a"));

    let cases = vec![
        (on_column_b.clone(), &hook_only),
        (supported.clone().and(on_column_b), &supported_and_hook),
        (array_has_call.clone(), &hook_only),
        (supported.and(array_has_call), &supported_and_hook),
    ];
    for (input, expected) in cases {
        let actual = rewrite_with_hook(input).to_string();
        assert_eq!(actual, *expected);
    }
}
4642
#[test]
fn test_rewrite_expr_to_prunable_error() {
    // Column `a` is Utf8 here.
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    // Left-hand sides for which `rewrite_expr_to_prunable` must return an
    // error: a cast of the Utf8 column to Int64, and an `IS NULL` expression.
    let rejected_lhs = vec![cast(col("a"), DataType::Int64), is_null(col("a"))];

    for lhs in rejected_lhs {
        let lhs = logical2physical(&lhs, &schema);
        let rhs = logical2physical(&lit(ScalarValue::Int64(Some(12))), &schema);

        let outcome =
            rewrite_expr_to_prunable(&lhs, Operator::Gt, &rhs, df_schema.clone());
        assert!(outcome.is_err());
    }
}
4671
#[test]
fn prune_with_contained_one_column() {
    let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

    // The three possible per-container `contained` answers.
    // NOTE(review): presumably Some(true) = "only these values",
    // Some(false) = "none of these values", None = "unknown" -- confirm
    // against `PruningStatistics::contained`.
    let yes = Some(true);
    let no = Some(false);
    let unknown: Option<bool> = None;

    // Nine containers; each `with_contained` call registers the answers for
    // one literal set on column `s1`.
    let statistics = TestStatistics::new()
        .with_contained(
            "s1",
            [ScalarValue::from("foo")],
            [yes, no, unknown, yes, no, unknown, yes, no, unknown],
        )
        .with_contained(
            "s1",
            [ScalarValue::from("bar")],
            [yes, yes, yes, no, no, no, unknown, unknown, unknown],
        )
        .with_contained(
            "s1",
            [ScalarValue::from("foo"), ScalarValue::from("bar")],
            [unknown, unknown, unknown, yes, yes, yes, no, no, no],
        );

    // Predicate builders for `s1 = <value>` and `s1 != <value>`.
    let s1_eq = |value: &'static str| col("s1").eq(lit(value));
    let s1_ne = |value: &'static str| col("s1").not_eq(lit(value));

    // Expected result when nothing can be ruled out.
    let keep_all = [true; 9];

    // (predicate, expected per-container result), checked in order.
    let cases: Vec<(Expr, [bool; 9])> = vec![
        // s1 = 'foo'
        (
            s1_eq("foo"),
            [true, false, true, true, false, true, true, false, true],
        ),
        // s1 = 'bar'
        (
            s1_eq("bar"),
            [true, true, true, false, false, false, true, true, true],
        ),
        // s1 = 'baz' (no statistics registered for 'baz')
        (s1_eq("baz"), keep_all),
        // s1 = 'foo' AND s1 = 'bar'
        (s1_eq("foo").and(s1_eq("bar")), keep_all),
        // s1 = 'foo' OR s1 = 'bar'
        (
            s1_eq("foo").or(s1_eq("bar")),
            [true, true, true, true, true, true, false, false, false],
        ),
        // s1 = 'foo' OR s1 = 'baz'
        (s1_eq("foo").or(s1_eq("baz")), keep_all),
        // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
        (s1_eq("foo").or(s1_eq("bar")).or(s1_eq("baz")), keep_all),
        // s1 != 'foo'
        (
            s1_ne("foo"),
            [false, true, true, false, true, true, false, true, true],
        ),
        // s1 != 'bar'
        (
            s1_ne("bar"),
            [false, false, false, true, true, true, true, true, true],
        ),
        // s1 != 'foo' AND s1 != 'bar'
        (
            s1_ne("foo").and(s1_ne("bar")),
            [true, true, true, false, false, false, true, true, true],
        ),
        // s1 != 'foo' AND s1 != 'bar' AND s1 != 'baz'
        (s1_ne("foo").and(s1_ne("bar")).and(s1_ne("baz")), keep_all),
        // s1 != 'foo' OR s1 != 'bar'
        (s1_ne("foo").or(s1_ne("bar")), keep_all),
        // s1 != 'foo' OR s1 != 'bar' OR s1 != 'baz'
        (s1_ne("foo").or(s1_ne("bar")).or(s1_ne("baz")), keep_all),
    ];

    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4874
#[test]
fn prune_with_contained_two_columns() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("s1", DataType::Utf8, true),
        Field::new("s2", DataType::Utf8, true),
    ]));

    // The three possible per-container `contained` answers.
    // NOTE(review): presumably Some(true) = "only these values",
    // Some(false) = "none of these values", None = "unknown" -- confirm
    // against `PruningStatistics::contained`.
    let yes = Some(true);
    let no = Some(false);
    let unknown: Option<bool> = None;

    // Nine containers: answers for 'foo' on `s1` and for 'bar' on `s2`.
    let statistics = TestStatistics::new()
        .with_contained(
            "s1",
            [ScalarValue::from("foo")],
            [yes, no, unknown, yes, no, unknown, yes, no, unknown],
        )
        .with_contained(
            "s2",
            [ScalarValue::from("bar")],
            [yes, yes, yes, no, no, no, unknown, unknown, unknown],
        );

    // Predicate builders.
    let eq = |column: &'static str, value: &'static str| col(column).eq(lit(value));
    let ne = |column: &'static str, value: &'static str| col(column).not_eq(lit(value));
    let s1_like = || col("s1").like(lit("foo%bar%"));

    // Expected result when nothing can be ruled out.
    let keep_all = [true; 9];

    // (predicate, expected per-container result), checked in order.
    let cases: Vec<(Expr, [bool; 9])> = vec![
        // s1 = 'foo'
        (
            eq("s1", "foo"),
            [true, false, true, true, false, true, true, false, true],
        ),
        // s1 = 'foo' OR s2 = 'bar'
        (eq("s1", "foo").or(eq("s2", "bar")), keep_all),
        // s1 = 'foo' AND s2 != 'bar'
        (
            eq("s1", "foo").and(ne("s2", "bar")),
            [false, false, false, true, false, true, true, false, true],
        ),
        // s1 != 'foo' AND s2 != 'bar'
        (
            ne("s1", "foo").and(ne("s2", "bar")),
            [false, false, false, false, true, true, false, true, true],
        ),
        // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
        (
            ne("s1", "foo").and(eq("s2", "bar").or(eq("s2", "baz"))),
            [false, true, true, false, true, true, false, true, true],
        ),
        // s1 LIKE 'foo%bar%'
        (s1_like(), keep_all),
        // s1 LIKE 'foo%bar%' AND s2 = 'bar'
        (
            s1_like().and(eq("s2", "bar")),
            [true, true, true, false, false, false, true, true, true],
        ),
        // s1 LIKE 'foo%bar%' OR s2 = 'bar'
        (s1_like().or(eq("s2", "bar")), keep_all),
    ];

    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
5012
#[test]
fn prune_with_range_and_contained() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("i", DataType::Int32, true),
        Field::new("s", DataType::Utf8, true),
    ]));

    // Min/max for `i` over nine containers: the three-entry pattern
    // (-5..5, 10..20, no bounds) repeated three times.
    let min_i: Vec<Option<i32>> = [Some(-5), Some(10), None].repeat(3);
    let max_i: Vec<Option<i32>> = [Some(5), Some(20), None].repeat(3);

    // `contained` answers for 'foo' on `s`, in runs of three so that each
    // answer is paired with each of the three range shapes above.
    let yes = Some(true);
    let no = Some(false);
    let unknown: Option<bool> = None;

    let statistics = TestStatistics::new()
        .with("i", ContainerStats::new_i32(min_i, max_i))
        .with_contained(
            "s",
            [ScalarValue::from("foo")],
            [yes, yes, yes, no, no, no, unknown, unknown, unknown],
        );

    // Predicate builder for `i = 0`.
    let i_is_zero = || col("i").eq(lit(0));

    // (predicate, expected per-container result), checked in order.
    let cases: Vec<(Expr, [bool; 9])> = vec![
        // i = 0 AND s = 'foo'
        (
            i_is_zero().and(col("s").eq(lit("foo"))),
            [true, false, true, false, false, false, true, false, true],
        ),
        // i = 0 AND s != 'foo'
        (
            i_is_zero().and(col("s").not_eq(lit("foo"))),
            [false, false, false, true, false, true, true, false, true],
        ),
        // i = 0 OR s = 'foo'
        (i_is_zero().or(col("s").eq(lit("foo"))), [true; 9]),
    ];

    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
5105
5106 fn prune_with_expr(
5113 expr: Expr,
5114 schema: &SchemaRef,
5115 statistics: &TestStatistics,
5116 expected: &[bool],
5117 ) {
5118 println!("Pruning with expr: {expr}");
5119 let expr = logical2physical(&expr, schema);
5120 let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5121 let result = p.prune(statistics).unwrap();
5122 assert_eq!(result, expected);
5123 }
5124
5125 fn test_build_predicate_expression(
5126 expr: &Expr,
5127 schema: &Schema,
5128 required_columns: &mut RequiredColumns,
5129 ) -> Arc<dyn PhysicalExpr> {
5130 let expr = logical2physical(expr, schema);
5131 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
5132 build_predicate_expression(
5133 &expr,
5134 &Arc::new(schema.clone()),
5135 required_columns,
5136 &unhandled_hook,
5137 )
5138 }
5139
#[test]
fn test_build_predicate_expression_with_false() {
    // A bare `false` literal is expected to come back as the same literal.
    let schema = Schema::empty();
    let literal_false = lit(ScalarValue::Boolean(Some(false)));

    let mut required = RequiredColumns::new();
    let actual = test_build_predicate_expression(&literal_false, &schema, &mut required);

    let expected = logical2physical(&literal_false, &schema);
    assert_eq!(&actual, &expected);
}
5149
#[test]
fn test_build_predicate_expression_with_and_false() {
    // `c1 = 'a' AND false` is expected to reduce to the literal `false`.
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let literal_false = lit(ScalarValue::Boolean(Some(false)));
    let conjunction = and(col("c1").eq(lit("a")), literal_false.clone());

    let mut required = RequiredColumns::new();
    let actual = test_build_predicate_expression(&conjunction, &schema, &mut required);

    let expected = logical2physical(&literal_false, &schema);
    assert_eq!(&actual, &expected);
}
5162
#[test]
fn test_build_predicate_expression_with_or_false() {
    // `c1 = 'a' OR false`: the expected rendering contains only the
    // statistics form of `c1 = 'a'`, with no trace of the `false` branch.
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let disjunction = or(
        col("c1").eq(lit("a")),
        lit(ScalarValue::Boolean(Some(false))),
    );

    let mut required = RequiredColumns::new();
    let actual = test_build_predicate_expression(&disjunction, &schema, &mut required);

    assert_eq!(
        actual.to_string(),
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1"
    );
}
5177}