1use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27 array::{new_null_array, ArrayRef, BooleanArray},
28 datatypes::{DataType, Field, Schema, SchemaRef},
29 record_batch::{RecordBatch, RecordBatchOptions},
30};
31pub use datafusion_common::pruning::PruningStatistics;
33use log::{debug, trace};
34
35use datafusion_common::error::{DataFusionError, Result};
36use datafusion_common::tree_node::TransformedResult;
37use datafusion_common::{
38 internal_err, plan_datafusion_err, plan_err,
39 tree_node::{Transformed, TreeNode},
40 ScalarValue,
41};
42use datafusion_common::{Column, DFSchema};
43use datafusion_expr_common::operator::Operator;
44use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
45use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
46use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
47use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
48
/// A predicate rewritten so that it can be evaluated against per-container
/// statistics instead of against the rows themselves.
///
/// Built with [`PruningPredicate::try_new`] and evaluated with
/// [`PruningPredicate::prune`], which yields one `bool` per container.
#[derive(Debug, Clone)]
pub struct PruningPredicate {
    /// Schema handed to `try_new`; used to resolve column fields while
    /// building `predicate_expr`.
    schema: SchemaRef,
    /// The predicate rewritten in terms of statistics columns.
    predicate_expr: Arc<dyn PhysicalExpr>,
    /// The statistics columns that `predicate_expr` refers to.
    required_columns: RequiredColumns,
    /// The snapshotted input predicate from which `predicate_expr` was built.
    orig_expr: Arc<dyn PhysicalExpr>,
    /// Guarantees produced by `LiteralGuarantee::analyze` on `orig_expr`;
    /// checked via `PruningStatistics::contained` during `prune`.
    literal_guarantees: Vec<LiteralGuarantee>,
}
378
/// Callback consulted whenever a (sub-)expression cannot be rewritten into a
/// statistics predicate.
pub trait UnhandledPredicateHook {
    /// Returns the expression that should stand in for the unhandled `expr`
    /// in the rewritten predicate.
    fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
}
387
/// An [`UnhandledPredicateHook`] that answers every unhandled expression with
/// the same fixed replacement expression.
#[derive(Debug, Clone)]
struct ConstantUnhandledPredicateHook {
    /// The expression returned for every unhandled predicate.
    default: Arc<dyn PhysicalExpr>,
}
394
395impl Default for ConstantUnhandledPredicateHook {
396 fn default() -> Self {
397 Self {
398 default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
399 }
400 }
401}
402
403impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
404 fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
405 Arc::clone(&self.default)
406 }
407}
408
409impl PruningPredicate {
410 pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
433 let expr = snapshot_physical_expr(expr)?;
436 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
437
438 let mut required_columns = RequiredColumns::new();
440 let predicate_expr = build_predicate_expression(
441 &expr,
442 schema.as_ref(),
443 &mut required_columns,
444 &unhandled_hook,
445 );
446
447 let literal_guarantees = LiteralGuarantee::analyze(&expr);
448
449 Ok(Self {
450 schema,
451 predicate_expr,
452 required_columns,
453 orig_expr: expr,
454 literal_guarantees,
455 })
456 }
457
458 pub fn prune<S: PruningStatistics>(&self, statistics: &S) -> Result<Vec<bool>> {
473 let mut builder = BoolVecBuilder::new(statistics.num_containers());
474
475 for literal_guarantee in &self.literal_guarantees {
478 let LiteralGuarantee {
479 column,
480 guarantee,
481 literals,
482 } = literal_guarantee;
483 if let Some(results) = statistics.contained(column, literals) {
484 match guarantee {
485 Guarantee::In => builder.combine_array(&results),
490 Guarantee::NotIn => {
496 builder.combine_array(&arrow::compute::not(&results)?)
497 }
498 }
499 if builder.check_all_pruned() {
502 return Ok(builder.build());
503 }
504 }
505 }
506
507 let statistics_batch =
513 build_statistics_record_batch(statistics, &self.required_columns)?;
514
515 builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
517
518 Ok(builder.build())
519 }
520
521 pub fn schema(&self) -> &SchemaRef {
523 &self.schema
524 }
525
526 pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
528 &self.orig_expr
529 }
530
531 pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
533 &self.predicate_expr
534 }
535
536 pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
542 &self.literal_guarantees
543 }
544
545 pub fn always_true(&self) -> bool {
552 is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
553 }
554
555 #[allow(dead_code)]
557 pub fn required_columns(&self) -> &RequiredColumns {
558 &self.required_columns
559 }
560
561 pub fn literal_columns(&self) -> Vec<String> {
569 let mut seen = HashSet::new();
570 self.literal_guarantees
571 .iter()
572 .map(|e| &e.column.name)
573 .filter(|name| seen.insert(*name))
575 .map(|s| s.to_string())
576 .collect()
577 }
578}
579
/// Accumulates the per-container keep/prune decision while pruning.
#[derive(Debug)]
struct BoolVecBuilder {
    /// One entry per container; starts out `true` and is switched to `false`
    /// once the container has been ruled out.
    inner: Vec<bool>,
}
588
589impl BoolVecBuilder {
590 fn new(num_containers: usize) -> Self {
592 Self {
593 inner: vec![true; num_containers],
595 }
596 }
597
598 fn combine_array(&mut self, array: &BooleanArray) {
606 assert_eq!(array.len(), self.inner.len());
607 for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
608 if let Some(false) = new {
612 *cur = false;
613 }
614 }
615 }
616
617 fn combine_value(&mut self, value: ColumnarValue) {
623 match value {
624 ColumnarValue::Array(array) => {
625 self.combine_array(array.as_boolean());
626 }
627 ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
628 self.inner = vec![false; self.inner.len()];
630 }
631 _ => {
632 }
635 }
636 }
637
638 fn build(self) -> Vec<bool> {
640 self.inner
641 }
642
643 fn check_all_pruned(&self) -> bool {
645 self.inner.iter().all(|&x| !x)
646 }
647}
648
649fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
650 expr.as_any()
651 .downcast_ref::<phys_expr::Literal>()
652 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
653 .unwrap_or_default()
654}
655
656fn is_always_false(expr: &Arc<dyn PhysicalExpr>) -> bool {
657 expr.as_any()
658 .downcast_ref::<phys_expr::Literal>()
659 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(false))))
660 .unwrap_or_default()
661}
662
/// The statistics columns that a rewritten pruning predicate refers to.
#[derive(Debug, Default, Clone)]
pub struct RequiredColumns {
    /// One entry per statistics column: the source column, which statistic of
    /// it is needed, and the field describing the statistics column. The
    /// position of an entry is the index used by the rewritten predicate.
    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
}
681
682impl RequiredColumns {
683 fn new() -> Self {
684 Self::default()
685 }
686
687 #[allow(dead_code)]
696 pub fn single_column(&self) -> Option<&phys_expr::Column> {
698 if self.columns.windows(2).all(|w| {
699 let c1 = &w[0].0;
701 let c2 = &w[1].0;
702 c1 == c2
703 }) {
704 self.columns.first().map(|r| &r.0)
705 } else {
706 None
707 }
708 }
709
710 pub(crate) fn iter(
713 &self,
714 ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
715 self.columns.iter()
716 }
717
718 fn find_stat_column(
719 &self,
720 column: &phys_expr::Column,
721 statistics_type: StatisticsType,
722 ) -> Option<usize> {
723 match statistics_type {
724 StatisticsType::RowCount => {
725 self.columns
727 .iter()
728 .enumerate()
729 .find(|(_i, (_c, t, _f))| t == &statistics_type)
730 .map(|(i, (_c, _t, _f))| i)
731 }
732 _ => self
733 .columns
734 .iter()
735 .enumerate()
736 .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
737 .map(|(i, (_c, _t, _f))| i),
738 }
739 }
740
741 fn stat_column_expr(
750 &mut self,
751 column: &phys_expr::Column,
752 column_expr: &Arc<dyn PhysicalExpr>,
753 field: &Field,
754 stat_type: StatisticsType,
755 ) -> Result<Arc<dyn PhysicalExpr>> {
756 let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
757 Some(idx) => (idx, false),
758 None => (self.columns.len(), true),
759 };
760
761 let column_name = column.name();
762 let stat_column_name = match stat_type {
763 StatisticsType::Min => format!("{column_name}_min"),
764 StatisticsType::Max => format!("{column_name}_max"),
765 StatisticsType::NullCount => format!("{column_name}_null_count"),
766 StatisticsType::RowCount => "row_count".to_string(),
767 };
768
769 let stat_column = phys_expr::Column::new(&stat_column_name, idx);
770
771 if need_to_insert {
773 let nullable = true;
775 let stat_field =
776 Field::new(stat_column.name(), field.data_type().clone(), nullable);
777 self.columns.push((column.clone(), stat_type, stat_field));
778 }
779 rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
780 }
781
782 fn min_column_expr(
784 &mut self,
785 column: &phys_expr::Column,
786 column_expr: &Arc<dyn PhysicalExpr>,
787 field: &Field,
788 ) -> Result<Arc<dyn PhysicalExpr>> {
789 self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
790 }
791
792 fn max_column_expr(
794 &mut self,
795 column: &phys_expr::Column,
796 column_expr: &Arc<dyn PhysicalExpr>,
797 field: &Field,
798 ) -> Result<Arc<dyn PhysicalExpr>> {
799 self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
800 }
801
802 fn null_count_column_expr(
804 &mut self,
805 column: &phys_expr::Column,
806 column_expr: &Arc<dyn PhysicalExpr>,
807 field: &Field,
808 ) -> Result<Arc<dyn PhysicalExpr>> {
809 self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
810 }
811
812 fn row_count_column_expr(
814 &mut self,
815 column: &phys_expr::Column,
816 column_expr: &Arc<dyn PhysicalExpr>,
817 field: &Field,
818 ) -> Result<Arc<dyn PhysicalExpr>> {
819 self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
820 }
821}
822
823impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
824 fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
825 Self { columns }
826 }
827}
828
829fn build_statistics_record_batch<S: PruningStatistics>(
855 statistics: &S,
856 required_columns: &RequiredColumns,
857) -> Result<RecordBatch> {
858 let mut fields = Vec::<Field>::new();
859 let mut arrays = Vec::<ArrayRef>::new();
860 for (column, statistics_type, stat_field) in required_columns.iter() {
862 let column = Column::from_name(column.name());
863 let data_type = stat_field.data_type();
864
865 let num_containers = statistics.num_containers();
866
867 let array = match statistics_type {
868 StatisticsType::Min => statistics.min_values(&column),
869 StatisticsType::Max => statistics.max_values(&column),
870 StatisticsType::NullCount => statistics.null_counts(&column),
871 StatisticsType::RowCount => statistics.row_counts(&column),
872 };
873 let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
874
875 if num_containers != array.len() {
876 return internal_err!(
877 "mismatched statistics length. Expected {}, got {}",
878 num_containers,
879 array.len()
880 );
881 }
882
883 let array = arrow::compute::cast(&array, data_type)?;
886
887 fields.push(stat_field.clone());
888 arrays.push(array);
889 }
890
891 let schema = Arc::new(Schema::new(fields));
892 let mut options = RecordBatchOptions::default();
894 options.row_count = Some(statistics.num_containers());
895
896 trace!("Creating statistics batch for {required_columns:#?} with {arrays:#?}");
897
898 RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
899 plan_datafusion_err!("Can not create statistics record batch: {err}")
900 })
901}
902
/// Working state for rewriting a single `column_expr <op> scalar_expr`
/// comparison into an expression over statistics columns.
struct PruningExpressionBuilder<'a> {
    /// The single column referenced by the comparison.
    column: phys_expr::Column,
    /// The column side of the comparison, after normalization by
    /// `rewrite_expr_to_prunable`.
    column_expr: Arc<dyn PhysicalExpr>,
    /// The comparison operator, oriented so the column side is on the left.
    op: Operator,
    /// The column-free side of the comparison.
    scalar_expr: Arc<dyn PhysicalExpr>,
    /// Schema field of `column`.
    field: &'a Field,
    /// Collector for the statistics columns the rewritten expression needs.
    required_columns: &'a mut RequiredColumns,
}
911
912impl<'a> PruningExpressionBuilder<'a> {
913 fn try_new(
914 left: &'a Arc<dyn PhysicalExpr>,
915 right: &'a Arc<dyn PhysicalExpr>,
916 op: Operator,
917 schema: &'a Schema,
918 required_columns: &'a mut RequiredColumns,
919 ) -> Result<Self> {
920 let left_columns = collect_columns(left);
922 let right_columns = collect_columns(right);
923 let (column_expr, scalar_expr, columns, correct_operator) =
924 match (left_columns.len(), right_columns.len()) {
925 (1, 0) => (left, right, left_columns, op),
926 (0, 1) => (right, left, right_columns, reverse_operator(op)?),
927 _ => {
928 return plan_err!(
930 "Multi-column expressions are not currently supported"
931 );
932 }
933 };
934
935 let df_schema = DFSchema::try_from(schema.clone())?;
936 let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
937 column_expr,
938 correct_operator,
939 scalar_expr,
940 df_schema,
941 )?;
942 let column = columns.iter().next().unwrap().clone();
943 let field = match schema.column_with_name(column.name()) {
944 Some((_, f)) => f,
945 _ => {
946 return plan_err!("Field not found in schema");
947 }
948 };
949
950 Ok(Self {
951 column,
952 column_expr,
953 op: correct_operator,
954 scalar_expr,
955 field,
956 required_columns,
957 })
958 }
959
960 fn op(&self) -> Operator {
961 self.op
962 }
963
964 fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
965 &self.scalar_expr
966 }
967
968 fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
969 self.required_columns
970 .min_column_expr(&self.column, &self.column_expr, self.field)
971 }
972
973 fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
974 self.required_columns
975 .max_column_expr(&self.column, &self.column_expr, self.field)
976 }
977
978 fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
985 let column_expr = Arc::new(self.column.clone()) as _;
987
988 let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
990
991 self.required_columns.null_count_column_expr(
992 &self.column,
993 &column_expr,
994 null_count_field,
995 )
996 }
997
998 fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1005 let column_expr = Arc::new(self.column.clone()) as _;
1007
1008 let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1010
1011 self.required_columns.row_count_column_expr(
1012 &self.column,
1013 &column_expr,
1014 row_count_field,
1015 )
1016 }
1017}
1018
1019fn rewrite_expr_to_prunable(
1032 column_expr: &PhysicalExprRef,
1033 op: Operator,
1034 scalar_expr: &PhysicalExprRef,
1035 schema: DFSchema,
1036) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1037 if !is_compare_op(op) {
1038 return plan_err!("rewrite_expr_to_prunable only support compare expression");
1039 }
1040
1041 let column_expr_any = column_expr.as_any();
1042
1043 if column_expr_any
1044 .downcast_ref::<phys_expr::Column>()
1045 .is_some()
1046 {
1047 Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
1049 } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
1050 let arrow_schema: SchemaRef = schema.clone().into();
1052 let from_type = cast.expr().data_type(&arrow_schema)?;
1053 verify_support_type_for_prune(&from_type, cast.cast_type())?;
1054 let (left, op, right) =
1055 rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
1056 let left = Arc::new(phys_expr::CastExpr::new(
1057 left,
1058 cast.cast_type().clone(),
1059 None,
1060 ));
1061 Ok((left, op, right))
1062 } else if let Some(try_cast) =
1063 column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
1064 {
1065 let arrow_schema: SchemaRef = schema.clone().into();
1067 let from_type = try_cast.expr().data_type(&arrow_schema)?;
1068 verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
1069 let (left, op, right) =
1070 rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
1071 let left = Arc::new(phys_expr::TryCastExpr::new(
1072 left,
1073 try_cast.cast_type().clone(),
1074 ));
1075 Ok((left, op, right))
1076 } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
1077 let (left, op, right) =
1079 rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
1080 let right = Arc::new(phys_expr::NegativeExpr::new(right));
1081 Ok((left, reverse_operator(op)?, right))
1082 } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
1083 if op != Operator::Eq && op != Operator::NotEq {
1085 return plan_err!("Not with operator other than Eq / NotEq is not supported");
1086 }
1087 if not
1088 .arg()
1089 .as_any()
1090 .downcast_ref::<phys_expr::Column>()
1091 .is_some()
1092 {
1093 let left = Arc::clone(not.arg());
1094 let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
1095 Ok((left, reverse_operator(op)?, right))
1096 } else {
1097 plan_err!("Not with complex expression {column_expr:?} is not supported")
1098 }
1099 } else {
1100 plan_err!("column expression {column_expr:?} is not supported")
1101 }
1102}
1103
1104fn is_compare_op(op: Operator) -> bool {
1105 matches!(
1106 op,
1107 Operator::Eq
1108 | Operator::NotEq
1109 | Operator::Lt
1110 | Operator::LtEq
1111 | Operator::Gt
1112 | Operator::GtEq
1113 | Operator::LikeMatch
1114 | Operator::NotLikeMatch
1115 )
1116}
1117
1118fn is_string_type(data_type: &DataType) -> bool {
1119 matches!(
1120 data_type,
1121 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1122 )
1123}
1124
1125fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1130 let from_type = match from_type {
1132 DataType::Dictionary(_, t) => {
1133 return verify_support_type_for_prune(t.as_ref(), to_type)
1134 }
1135 _ => from_type,
1136 };
1137 let to_type = match to_type {
1138 DataType::Dictionary(_, t) => {
1139 return verify_support_type_for_prune(from_type, t.as_ref())
1140 }
1141 _ => to_type,
1142 };
1143 if is_string_type(from_type) == is_string_type(to_type) {
1147 Ok(())
1148 } else {
1149 plan_err!(
1150 "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1151 )
1152 }
1153}
1154
1155fn rewrite_column_expr(
1157 e: Arc<dyn PhysicalExpr>,
1158 column_old: &phys_expr::Column,
1159 column_new: &phys_expr::Column,
1160) -> Result<Arc<dyn PhysicalExpr>> {
1161 e.transform(|expr| {
1162 if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1163 if column == column_old {
1164 return Ok(Transformed::yes(Arc::new(column_new.clone())));
1165 }
1166 }
1167
1168 Ok(Transformed::no(expr))
1169 })
1170 .data()
1171}
1172
1173fn reverse_operator(op: Operator) -> Result<Operator> {
1174 op.swap().ok_or_else(|| {
1175 DataFusionError::Internal(format!(
1176 "Could not reverse operator {op} while building pruning predicate"
1177 ))
1178 })
1179}
1180
1181fn build_single_column_expr(
1186 column: &phys_expr::Column,
1187 schema: &Schema,
1188 required_columns: &mut RequiredColumns,
1189 is_not: bool, ) -> Option<Arc<dyn PhysicalExpr>> {
1191 let field = schema.field_with_name(column.name()).ok()?;
1192
1193 if matches!(field.data_type(), &DataType::Boolean) {
1194 let col_ref = Arc::new(column.clone()) as _;
1195
1196 let min = required_columns
1197 .min_column_expr(column, &col_ref, field)
1198 .ok()?;
1199 let max = required_columns
1200 .max_column_expr(column, &col_ref, field)
1201 .ok()?;
1202
1203 if is_not {
1207 Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1210 phys_expr::BinaryExpr::new(min, Operator::And, max),
1211 ))))
1212 } else {
1213 Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1216 }
1217 } else {
1218 None
1219 }
1220}
1221
1222fn build_is_null_column_expr(
1231 expr: &Arc<dyn PhysicalExpr>,
1232 schema: &Schema,
1233 required_columns: &mut RequiredColumns,
1234 with_not: bool,
1235) -> Option<Arc<dyn PhysicalExpr>> {
1236 if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1237 let field = schema.field_with_name(col.name()).ok()?;
1238
1239 let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1240 if with_not {
1241 if let Ok(row_count_expr) =
1242 required_columns.row_count_column_expr(col, expr, null_count_field)
1243 {
1244 required_columns
1245 .null_count_column_expr(col, expr, null_count_field)
1246 .map(|null_count_column_expr| {
1247 Arc::new(phys_expr::BinaryExpr::new(
1249 null_count_column_expr,
1250 Operator::NotEq,
1251 row_count_expr,
1252 )) as _
1253 })
1254 .ok()
1255 } else {
1256 None
1257 }
1258 } else {
1259 required_columns
1260 .null_count_column_expr(col, expr, null_count_field)
1261 .map(|null_count_column_expr| {
1262 Arc::new(phys_expr::BinaryExpr::new(
1264 null_count_column_expr,
1265 Operator::Gt,
1266 Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1267 )) as _
1268 })
1269 .ok()
1270 }
1271 } else {
1272 None
1273 }
1274}
1275
/// Largest `IN` / `NOT IN` list that `build_predicate_expression` expands
/// into a chain of comparisons; longer lists go to the unhandled hook.
const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1279
/// Rewrites predicates into statistics predicates, with a configurable
/// policy for the parts that cannot be rewritten.
pub struct PredicateRewriter {
    /// Decides what replaces an expression that cannot be rewritten.
    unhandled_hook: Arc<dyn UnhandledPredicateHook>,
}
1285
1286impl Default for PredicateRewriter {
1287 fn default() -> Self {
1288 Self {
1289 unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1290 }
1291 }
1292}
1293
1294impl PredicateRewriter {
1295 pub fn new() -> Self {
1297 Self::default()
1298 }
1299
1300 pub fn with_unhandled_hook(
1302 self,
1303 unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1304 ) -> Self {
1305 Self { unhandled_hook }
1306 }
1307
1308 pub fn rewrite_predicate_to_statistics_predicate(
1318 &self,
1319 expr: &Arc<dyn PhysicalExpr>,
1320 schema: &Schema,
1321 ) -> Arc<dyn PhysicalExpr> {
1322 let mut required_columns = RequiredColumns::new();
1323 build_predicate_expression(
1324 expr,
1325 schema,
1326 &mut required_columns,
1327 &self.unhandled_hook,
1328 )
1329 }
1330}
1331
1332fn build_predicate_expression(
1342 expr: &Arc<dyn PhysicalExpr>,
1343 schema: &Schema,
1344 required_columns: &mut RequiredColumns,
1345 unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1346) -> Arc<dyn PhysicalExpr> {
1347 if is_always_false(expr) {
1348 return Arc::clone(expr);
1351 }
1352 let expr_any = expr.as_any();
1354 if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
1355 return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1356 .unwrap_or_else(|| unhandled_hook.handle(expr));
1357 }
1358 if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
1359 return build_is_null_column_expr(
1360 is_not_null.arg(),
1361 schema,
1362 required_columns,
1363 true,
1364 )
1365 .unwrap_or_else(|| unhandled_hook.handle(expr));
1366 }
1367 if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
1368 return build_single_column_expr(col, schema, required_columns, false)
1369 .unwrap_or_else(|| unhandled_hook.handle(expr));
1370 }
1371 if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
1372 if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
1374 return build_single_column_expr(col, schema, required_columns, true)
1375 .unwrap_or_else(|| unhandled_hook.handle(expr));
1376 } else {
1377 return unhandled_hook.handle(expr);
1378 }
1379 }
1380 if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
1381 if !in_list.list().is_empty()
1382 && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1383 {
1384 let eq_op = if in_list.negated() {
1385 Operator::NotEq
1386 } else {
1387 Operator::Eq
1388 };
1389 let re_op = if in_list.negated() {
1390 Operator::And
1391 } else {
1392 Operator::Or
1393 };
1394 let change_expr = in_list
1395 .list()
1396 .iter()
1397 .map(|e| {
1398 Arc::new(phys_expr::BinaryExpr::new(
1399 Arc::clone(in_list.expr()),
1400 eq_op,
1401 Arc::clone(e),
1402 )) as _
1403 })
1404 .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1405 .unwrap();
1406 return build_predicate_expression(
1407 &change_expr,
1408 schema,
1409 required_columns,
1410 unhandled_hook,
1411 );
1412 } else {
1413 return unhandled_hook.handle(expr);
1414 }
1415 }
1416
1417 let (left, op, right) = {
1418 if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
1419 (
1420 Arc::clone(bin_expr.left()),
1421 *bin_expr.op(),
1422 Arc::clone(bin_expr.right()),
1423 )
1424 } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
1425 if like_expr.case_insensitive() {
1426 return unhandled_hook.handle(expr);
1427 }
1428 let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1429 (false, false) => Operator::LikeMatch,
1430 (true, false) => Operator::NotLikeMatch,
1431 (false, true) => Operator::ILikeMatch,
1432 (true, true) => Operator::NotILikeMatch,
1433 };
1434 (
1435 Arc::clone(like_expr.expr()),
1436 op,
1437 Arc::clone(like_expr.pattern()),
1438 )
1439 } else {
1440 return unhandled_hook.handle(expr);
1441 }
1442 };
1443
1444 if op == Operator::And || op == Operator::Or {
1445 let left_expr =
1446 build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1447 let right_expr =
1448 build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1449 let expr = match (&left_expr, op, &right_expr) {
1451 (left, Operator::And, right)
1452 if is_always_false(left) || is_always_false(right) =>
1453 {
1454 Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false))))
1455 }
1456 (left, Operator::And, _) if is_always_true(left) => right_expr,
1457 (_, Operator::And, right) if is_always_true(right) => left_expr,
1458 (left, Operator::Or, right)
1459 if is_always_true(left) || is_always_true(right) =>
1460 {
1461 Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1462 }
1463 (left, Operator::Or, _) if is_always_false(left) => right_expr,
1464 (_, Operator::Or, right) if is_always_false(right) => left_expr,
1465
1466 _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1467 };
1468 return expr;
1469 }
1470
1471 let expr_builder =
1472 PruningExpressionBuilder::try_new(&left, &right, op, schema, required_columns);
1473 let mut expr_builder = match expr_builder {
1474 Ok(builder) => builder,
1475 Err(e) => {
1478 debug!("Error building pruning expression: {e}");
1479 return unhandled_hook.handle(expr);
1480 }
1481 };
1482
1483 build_statistics_expr(&mut expr_builder)
1484 .unwrap_or_else(|_| unhandled_hook.handle(expr))
1485}
1486
1487fn build_statistics_expr(
1488 expr_builder: &mut PruningExpressionBuilder,
1489) -> Result<Arc<dyn PhysicalExpr>> {
1490 let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1491 Operator::NotEq => {
1492 let min_column_expr = expr_builder.min_column_expr()?;
1496 let max_column_expr = expr_builder.max_column_expr()?;
1497 Arc::new(phys_expr::BinaryExpr::new(
1498 Arc::new(phys_expr::BinaryExpr::new(
1499 min_column_expr,
1500 Operator::NotEq,
1501 Arc::clone(expr_builder.scalar_expr()),
1502 )),
1503 Operator::Or,
1504 Arc::new(phys_expr::BinaryExpr::new(
1505 Arc::clone(expr_builder.scalar_expr()),
1506 Operator::NotEq,
1507 max_column_expr,
1508 )),
1509 ))
1510 }
1511 Operator::Eq => {
1512 let min_column_expr = expr_builder.min_column_expr()?;
1515 let max_column_expr = expr_builder.max_column_expr()?;
1516 Arc::new(phys_expr::BinaryExpr::new(
1517 Arc::new(phys_expr::BinaryExpr::new(
1518 min_column_expr,
1519 Operator::LtEq,
1520 Arc::clone(expr_builder.scalar_expr()),
1521 )),
1522 Operator::And,
1523 Arc::new(phys_expr::BinaryExpr::new(
1524 Arc::clone(expr_builder.scalar_expr()),
1525 Operator::LtEq,
1526 max_column_expr,
1527 )),
1528 ))
1529 }
1530 Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1531 Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1532 plan_datafusion_err!(
1533 "LIKE expression with wildcard at the beginning is not supported"
1534 )
1535 })?,
1536 Operator::Gt => {
1537 Arc::new(phys_expr::BinaryExpr::new(
1539 expr_builder.max_column_expr()?,
1540 Operator::Gt,
1541 Arc::clone(expr_builder.scalar_expr()),
1542 ))
1543 }
1544 Operator::GtEq => {
1545 Arc::new(phys_expr::BinaryExpr::new(
1547 expr_builder.max_column_expr()?,
1548 Operator::GtEq,
1549 Arc::clone(expr_builder.scalar_expr()),
1550 ))
1551 }
1552 Operator::Lt => {
1553 Arc::new(phys_expr::BinaryExpr::new(
1555 expr_builder.min_column_expr()?,
1556 Operator::Lt,
1557 Arc::clone(expr_builder.scalar_expr()),
1558 ))
1559 }
1560 Operator::LtEq => {
1561 Arc::new(phys_expr::BinaryExpr::new(
1563 expr_builder.min_column_expr()?,
1564 Operator::LtEq,
1565 Arc::clone(expr_builder.scalar_expr()),
1566 ))
1567 }
1568 _ => {
1570 return plan_err!(
1571 "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1572 );
1573 }
1574 };
1575 let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1576 Ok(statistics_expr)
1577}
1578
1579fn unpack_string(s: &ScalarValue) -> Option<&str> {
1581 s.try_as_str().flatten()
1582}
1583
1584fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1585 if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
1586 let s = unpack_string(lit.value())?;
1587 return Some(s);
1588 }
1589 None
1590}
1591
1592fn build_like_match(
1596 expr_builder: &mut PruningExpressionBuilder,
1597) -> Option<Arc<dyn PhysicalExpr>> {
1598 let min_column_expr = expr_builder.min_column_expr().ok()?;
1607 let max_column_expr = expr_builder.max_column_expr().ok()?;
1608 let scalar_expr = expr_builder.scalar_expr();
1609 let s = extract_string_literal(scalar_expr)?;
1611 let first_wildcard_index = s.find(['%', '_']);
1613 if first_wildcard_index == Some(0) {
1614 return None;
1616 }
1617 let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
1618 let prefix = &s[..wildcard_index];
1619 let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1620 prefix.to_string(),
1621 ))));
1622 let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1623 increment_utf8(prefix)?,
1624 ))));
1625 (lower_bound_lit, upper_bound_lit)
1626 } else {
1627 let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1629 s.to_string(),
1630 ))));
1631 (Arc::clone(&bound), bound)
1632 };
1633 let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1634 lower_bound,
1635 Operator::LtEq,
1636 Arc::clone(&max_column_expr),
1637 ));
1638 let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1639 Arc::clone(&min_column_expr),
1640 Operator::LtEq,
1641 upper_bound,
1642 ));
1643 let combined = Arc::new(phys_expr::BinaryExpr::new(
1644 upper_bound_expr,
1645 Operator::And,
1646 lower_bound_expr,
1647 ));
1648 Some(combined)
1649}
1650
1651fn build_not_like_match(
1657 expr_builder: &mut PruningExpressionBuilder<'_>,
1658) -> Result<Arc<dyn PhysicalExpr>> {
1659 let min_column_expr = expr_builder.min_column_expr()?;
1662 let max_column_expr = expr_builder.max_column_expr()?;
1663
1664 let scalar_expr = expr_builder.scalar_expr();
1665
1666 let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1667 plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1668 })?;
1669
1670 let (const_prefix, remaining) = split_constant_prefix(pattern);
1671 if const_prefix.is_empty() || remaining != "%" {
1672 return Err(plan_datafusion_err!(
1684 "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1685 ));
1686 }
1687
1688 let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1689 true,
1690 false,
1691 Arc::clone(&min_column_expr),
1692 Arc::clone(scalar_expr),
1693 ));
1694
1695 let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1696 true,
1697 false,
1698 Arc::clone(&max_column_expr),
1699 Arc::clone(scalar_expr),
1700 ));
1701
1702 Ok(Arc::new(phys_expr::BinaryExpr::new(
1703 min_col_not_like_epxr,
1704 Operator::Or,
1705 max_col_not_like_expr,
1706 )))
1707}
1708
/// Splits `pattern` in front of its first wildcard (`%` or `_`) that is not
/// directly preceded by a backslash.
///
/// Returns `(prefix, rest)` where `rest` starts with that wildcard; when no
/// such wildcard exists the whole pattern is the prefix and `rest` is empty.
/// Backslashes are left in the prefix as they are.
fn split_constant_prefix(pattern: &str) -> (&str, &str) {
    let mut previous: Option<char> = None;
    for (idx, current) in pattern.char_indices() {
        let is_wildcard = current == '%' || current == '_';
        if is_wildcard && previous != Some('\\') {
            return pattern.split_at(idx);
        }
        previous = Some(current);
    }
    (pattern, "")
}
1724
/// Returns a string that sorts after `data`, obtained by bumping the last
/// character that can be bumped and dropping everything behind it.
///
/// Characters are tried from the end of the string towards the start. A
/// character can be bumped when the next code point is a valid `char` and
/// passes `is_valid_unicode`. Returns `None` when no character can be bumped,
/// which includes the empty string.
fn increment_utf8(data: &str) -> Option<String> {
    /// Rejects the code points U+FFFE, U+FFFF and U+FDD0..=U+FDEF, as well
    /// as anything at or above 0x110000.
    fn is_valid_unicode(c: char) -> bool {
        let cp = u32::from(c);
        let rejected_code_point = matches!(cp, 0xFFFE | 0xFFFF | 0xFDD0..=0xFDEF);
        let out_of_range = cp >= 0x110000;
        !(rejected_code_point || out_of_range)
    }

    let code_points: Vec<char> = data.chars().collect();

    code_points
        .iter()
        .enumerate()
        .rev()
        .find_map(|(idx, &original)| {
            // `from_u32` rejects surrogates and values beyond U+10FFFF.
            let bumped = char::from_u32(u32::from(original) + 1)
                .filter(|&candidate| is_valid_unicode(candidate))?;
            // Keep everything before `idx`, then the bumped character.
            let mut incremented: String = code_points[..idx].iter().collect();
            incremented.push(bumped);
            Some(incremented)
        })
}
1770
1771fn wrap_null_count_check_expr(
1792 statistics_expr: Arc<dyn PhysicalExpr>,
1793 expr_builder: &mut PruningExpressionBuilder,
1794) -> Result<Arc<dyn PhysicalExpr>> {
1795 let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new(
1797 expr_builder.null_count_column_expr()?,
1798 Operator::NotEq,
1799 expr_builder.row_count_column_expr()?,
1800 ));
1801
1802 Ok(Arc::new(phys_expr::BinaryExpr::new(
1804 not_when_null_count_eq_row_count,
1805 Operator::And,
1806 statistics_expr,
1807 )))
1808}
1809
/// The kind of statistic that a required statistics column refers to.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum StatisticsType {
    /// Minimum value statistic (the tests in this file name such fields `<column>_min`).
    Min,
    /// Maximum value statistic (the tests in this file name such fields `<column>_max`).
    Max,
    /// Null count statistic (the tests in this file name such fields `<column>_null_count`).
    NullCount,
    /// Row count statistic (the tests in this file name this field `row_count`).
    RowCount,
}
1817
1818#[cfg(test)]
1819mod tests {
1820 use std::collections::HashMap;
1821 use std::ops::{Not, Rem};
1822
1823 use super::*;
1824 use datafusion_common::test_util::batches_to_string;
1825 use datafusion_expr::{and, col, lit, or};
1826 use insta::assert_snapshot;
1827
1828 use arrow::array::Decimal128Array;
1829 use arrow::{
1830 array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
1831 datatypes::TimeUnit,
1832 };
1833 use datafusion_expr::expr::InList;
1834 use datafusion_expr::{cast, is_null, try_cast, Expr};
1835 use datafusion_functions_nested::expr_fn::{array_has, make_array};
1836 use datafusion_physical_expr::expressions as phys_expr;
1837 use datafusion_physical_expr::planner::logical2physical;
1838
/// Test fixture: the statistics arrays available for one column.
///
/// Every array that is present is expected to have the same length
/// (see `assert_invariants`); that length is what `len` reports.
#[derive(Debug, Default)]
struct ContainerStats {
    /// Minimum values, if known.
    min: Option<ArrayRef>,
    /// Maximum values, if known.
    max: Option<ArrayRef>,
    /// Null counts, if known.
    null_counts: Option<ArrayRef>,
    /// Row counts, if known.
    row_counts: Option<ArrayRef>,
    /// Canned answers for `contained`: each entry pairs a set of values with
    /// the boolean array to return when exactly that set is queried.
    contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
}
1858
1859 impl ContainerStats {
1860 fn new() -> Self {
1861 Default::default()
1862 }
1863 fn new_decimal128(
1864 min: impl IntoIterator<Item = Option<i128>>,
1865 max: impl IntoIterator<Item = Option<i128>>,
1866 precision: u8,
1867 scale: i8,
1868 ) -> Self {
1869 Self::new()
1870 .with_min(Arc::new(
1871 min.into_iter()
1872 .collect::<Decimal128Array>()
1873 .with_precision_and_scale(precision, scale)
1874 .unwrap(),
1875 ))
1876 .with_max(Arc::new(
1877 max.into_iter()
1878 .collect::<Decimal128Array>()
1879 .with_precision_and_scale(precision, scale)
1880 .unwrap(),
1881 ))
1882 }
1883
1884 fn new_i64(
1885 min: impl IntoIterator<Item = Option<i64>>,
1886 max: impl IntoIterator<Item = Option<i64>>,
1887 ) -> Self {
1888 Self::new()
1889 .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
1890 .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
1891 }
1892
1893 fn new_i32(
1894 min: impl IntoIterator<Item = Option<i32>>,
1895 max: impl IntoIterator<Item = Option<i32>>,
1896 ) -> Self {
1897 Self::new()
1898 .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
1899 .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
1900 }
1901
1902 fn new_utf8<'a>(
1903 min: impl IntoIterator<Item = Option<&'a str>>,
1904 max: impl IntoIterator<Item = Option<&'a str>>,
1905 ) -> Self {
1906 Self::new()
1907 .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
1908 .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
1909 }
1910
1911 fn new_bool(
1912 min: impl IntoIterator<Item = Option<bool>>,
1913 max: impl IntoIterator<Item = Option<bool>>,
1914 ) -> Self {
1915 Self::new()
1916 .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
1917 .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
1918 }
1919
1920 fn min(&self) -> Option<ArrayRef> {
1921 self.min.clone()
1922 }
1923
1924 fn max(&self) -> Option<ArrayRef> {
1925 self.max.clone()
1926 }
1927
1928 fn null_counts(&self) -> Option<ArrayRef> {
1929 self.null_counts.clone()
1930 }
1931
1932 fn row_counts(&self) -> Option<ArrayRef> {
1933 self.row_counts.clone()
1934 }
1935
1936 fn arrays(&self) -> Vec<ArrayRef> {
1938 let contained_arrays = self
1939 .contained
1940 .iter()
1941 .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
1942
1943 [
1944 self.min.as_ref().cloned(),
1945 self.max.as_ref().cloned(),
1946 self.null_counts.as_ref().cloned(),
1947 self.row_counts.as_ref().cloned(),
1948 ]
1949 .into_iter()
1950 .flatten()
1951 .chain(contained_arrays)
1952 .collect()
1953 }
1954
1955 fn len(&self) -> usize {
1959 self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
1961 }
1962
1963 fn assert_invariants(&self) {
1965 let mut prev_len = None;
1966
1967 for len in self.arrays().iter().map(|a| a.len()) {
1968 match prev_len {
1970 None => {
1971 prev_len = Some(len);
1972 }
1973 Some(prev_len) => {
1974 assert_eq!(prev_len, len);
1975 }
1976 }
1977 }
1978 }
1979
1980 fn with_min(mut self, min: ArrayRef) -> Self {
1982 self.min = Some(min);
1983 self
1984 }
1985
1986 fn with_max(mut self, max: ArrayRef) -> Self {
1988 self.max = Some(max);
1989 self
1990 }
1991
1992 fn with_null_counts(
1995 mut self,
1996 counts: impl IntoIterator<Item = Option<u64>>,
1997 ) -> Self {
1998 let null_counts: ArrayRef =
1999 Arc::new(counts.into_iter().collect::<UInt64Array>());
2000
2001 self.assert_invariants();
2002 self.null_counts = Some(null_counts);
2003 self
2004 }
2005
2006 fn with_row_counts(
2009 mut self,
2010 counts: impl IntoIterator<Item = Option<u64>>,
2011 ) -> Self {
2012 let row_counts: ArrayRef =
2013 Arc::new(counts.into_iter().collect::<UInt64Array>());
2014
2015 self.assert_invariants();
2016 self.row_counts = Some(row_counts);
2017 self
2018 }
2019
2020 pub fn with_contained(
2022 mut self,
2023 values: impl IntoIterator<Item = ScalarValue>,
2024 contained: impl IntoIterator<Item = Option<bool>>,
2025 ) -> Self {
2026 let contained: BooleanArray = contained.into_iter().collect();
2027 let values: HashSet<_> = values.into_iter().collect();
2028
2029 self.contained.push((values, contained));
2030 self.assert_invariants();
2031 self
2032 }
2033
2034 fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2036 self.contained
2038 .iter()
2039 .find(|(values, _contained)| values == find_values)
2040 .map(|(_values, contained)| contained.clone())
2041 }
2042 }
2043
/// Test fixture implementing `PruningStatistics` from an in-memory map.
#[derive(Debug, Default)]
struct TestStatistics {
    /// Statistics for each column, keyed by `Column`.
    stats: HashMap<Column, ContainerStats>,
}
2049
2050 impl TestStatistics {
2051 fn new() -> Self {
2052 Self::default()
2053 }
2054
2055 fn with(
2056 mut self,
2057 name: impl Into<String>,
2058 container_stats: ContainerStats,
2059 ) -> Self {
2060 let col = Column::from_name(name.into());
2061 self.stats.insert(col, container_stats);
2062 self
2063 }
2064
2065 fn with_null_counts(
2069 mut self,
2070 name: impl Into<String>,
2071 counts: impl IntoIterator<Item = Option<u64>>,
2072 ) -> Self {
2073 let col = Column::from_name(name.into());
2074
2075 let container_stats = self
2077 .stats
2078 .remove(&col)
2079 .unwrap_or_default()
2080 .with_null_counts(counts);
2081
2082 self.stats.insert(col, container_stats);
2084 self
2085 }
2086
2087 fn with_row_counts(
2091 mut self,
2092 name: impl Into<String>,
2093 counts: impl IntoIterator<Item = Option<u64>>,
2094 ) -> Self {
2095 let col = Column::from_name(name.into());
2096
2097 let container_stats = self
2099 .stats
2100 .remove(&col)
2101 .unwrap_or_default()
2102 .with_row_counts(counts);
2103
2104 self.stats.insert(col, container_stats);
2106 self
2107 }
2108
2109 fn with_contained(
2111 mut self,
2112 name: impl Into<String>,
2113 values: impl IntoIterator<Item = ScalarValue>,
2114 contained: impl IntoIterator<Item = Option<bool>>,
2115 ) -> Self {
2116 let col = Column::from_name(name.into());
2117
2118 let container_stats = self
2120 .stats
2121 .remove(&col)
2122 .unwrap_or_default()
2123 .with_contained(values, contained);
2124
2125 self.stats.insert(col, container_stats);
2127 self
2128 }
2129 }
2130
2131 impl PruningStatistics for TestStatistics {
2132 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2133 self.stats
2134 .get(column)
2135 .map(|container_stats| container_stats.min())
2136 .unwrap_or(None)
2137 }
2138
2139 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2140 self.stats
2141 .get(column)
2142 .map(|container_stats| container_stats.max())
2143 .unwrap_or(None)
2144 }
2145
2146 fn num_containers(&self) -> usize {
2147 self.stats
2148 .values()
2149 .next()
2150 .map(|container_stats| container_stats.len())
2151 .unwrap_or(0)
2152 }
2153
2154 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2155 self.stats
2156 .get(column)
2157 .map(|container_stats| container_stats.null_counts())
2158 .unwrap_or(None)
2159 }
2160
2161 fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
2162 self.stats
2163 .get(column)
2164 .map(|container_stats| container_stats.row_counts())
2165 .unwrap_or(None)
2166 }
2167
2168 fn contained(
2169 &self,
2170 column: &Column,
2171 values: &HashSet<ScalarValue>,
2172 ) -> Option<BooleanArray> {
2173 self.stats
2174 .get(column)
2175 .and_then(|container_stats| container_stats.contained(values))
2176 }
2177 }
2178
/// Test fixture that returns the same min/max arrays for every column and
/// reports a caller-chosen container count.
struct OneContainerStats {
    /// Returned by `min_values` regardless of the column asked for.
    min_values: Option<ArrayRef>,
    /// Returned by `max_values` regardless of the column asked for.
    max_values: Option<ArrayRef>,
    /// Returned by `num_containers`; not checked against the array lengths here.
    num_containers: usize,
}
2185
2186 impl PruningStatistics for OneContainerStats {
2187 fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2188 self.min_values.clone()
2189 }
2190
2191 fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2192 self.max_values.clone()
2193 }
2194
2195 fn num_containers(&self) -> usize {
2196 self.num_containers
2197 }
2198
2199 fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2200 None
2201 }
2202
2203 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
2204 None
2205 }
2206
2207 fn contained(
2208 &self,
2209 _column: &Column,
2210 _values: &HashSet<ScalarValue>,
2211 ) -> Option<BooleanArray> {
2212 None
2213 }
2214 }
2215
/// A predicate over two columns must share one `row_count` column and must
/// not list any field twice in its required columns.
#[test]
fn test_unique_row_count_field_and_column() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", DataType::Int32, true),
    ]));
    let logical = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
    let physical = logical2physical(&logical, &schema);
    let pruning_predicate =
        PruningPredicate::try_new(physical, Arc::clone(&schema)).unwrap();
    assert_eq!(
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
        pruning_predicate.predicate_expr.to_string()
    );

    // Every field must be inserted exactly once.
    let mut fields = HashSet::new();
    for (_col, _ty, field) in pruning_predicate.required_columns().iter() {
        assert!(
            fields.insert(field),
            "Duplicate field in required schema: {field:?}. Previous fields:\n{fields:#?}"
        );
    }
}
2246
/// Checks pruning of a single container for the four combinations of
/// known/unknown null count and row count exercised below.
#[test]
fn prune_all_rows_null_counts() {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));

    // Single-container statistics for column "i" built from raw values.
    let single_container = |min: Option<i32>,
                            max: Option<i32>,
                            null_count: Option<u64>,
                            row_count: Option<u64>| {
        TestStatistics::new().with(
            "i",
            ContainerStats {
                min: Some(Arc::new(Int32Array::from(vec![min]))),
                max: Some(Arc::new(Int32Array::from(vec![max]))),
                null_counts: Some(Arc::new(UInt64Array::from(vec![null_count]))),
                row_counts: Some(Arc::new(UInt64Array::from(vec![row_count]))),
                ..ContainerStats::default()
            },
        )
    };

    // null_count == row_count with min/max present (built via the builder API)
    let statistics = TestStatistics::new().with(
        "i",
        ContainerStats::new_i32(vec![Some(0)], vec![Some(0)])
            .with_null_counts(vec![Some(1)])
            .with_row_counts(vec![Some(1)]),
    );
    prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, &[false]);

    // null_count == row_count with min/max missing
    let statistics = single_container(None, None, Some(1), Some(1));
    prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, &[false]);

    // null count unknown: only min/max decide
    let statistics = single_container(Some(0), Some(0), None, Some(1));
    prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, &[true]);
    prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, &[false]);

    // row count unknown: only min/max decide
    let statistics = single_container(Some(0), Some(0), Some(1), None);
    prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, &[true]);
    prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, &[false]);
}
2307
/// Two containers, one missing its min and the other missing its max:
/// a container is only pruned when the statistic that is present rules
/// the predicate out.
#[test]
fn prune_missing_statistics() {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));

    let mins = Int32Array::from(vec![None, Some(0)]);
    let maxes = Int32Array::from(vec![Some(0), None]);
    let null_counts = UInt64Array::from(vec![Some(0), Some(0)]);
    let row_counts = UInt64Array::from(vec![Some(1), Some(1)]);

    let statistics = TestStatistics::new().with(
        "i",
        ContainerStats {
            min: Some(Arc::new(mins)),
            max: Some(Arc::new(maxes)),
            null_counts: Some(Arc::new(null_counts)),
            row_counts: Some(Arc::new(row_counts)),
            ..ContainerStats::default()
        },
    );

    prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, &[true, true]);
    prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, &[false, true]);
    prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, &[true, false]);
}
2328
/// A container whose null count equals its row count is pruned for `i = 0`
/// even though its min/max are both 0.
#[test]
fn prune_null_stats() {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));

    let container = ContainerStats::new_i32(vec![Some(0)], vec![Some(0)])
        .with_null_counts(vec![Some(1)])
        .with_row_counts(vec![Some(1)]);
    let statistics = TestStatistics::new().with("i", container);

    prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, &[false]);
}
2350
/// Builds a statistics record batch for four required columns over four
/// containers and snapshots the result.
#[test]
fn test_build_statistics_record_batch() {
    // Request s1_min, s2_max, s3_max and s3_min, in that order.
    let required_columns = RequiredColumns::from(vec![
        (
            phys_expr::Column::new("s1", 1),
            StatisticsType::Min,
            Field::new("s1_min", DataType::Int32, true),
        ),
        (
            phys_expr::Column::new("s2", 2),
            StatisticsType::Max,
            Field::new("s2_max", DataType::Int32, true),
        ),
        (
            phys_expr::Column::new("s3", 3),
            StatisticsType::Max,
            Field::new("s3_max", DataType::Utf8, true),
        ),
        (
            phys_expr::Column::new("s3", 3),
            StatisticsType::Min,
            Field::new("s3_min", DataType::Utf8, true),
        ),
    ]);

    // For each column the first vec holds the mins and the second the maxes.
    let statistics = TestStatistics::new()
        .with(
            "s1",
            ContainerStats::new_i32(
                vec![None, None, Some(9), None], vec![Some(10), None, None, None], ),
        )
        .with(
            "s2",
            ContainerStats::new_i32(
                vec![Some(2), None, None, None], vec![Some(20), None, None, None], ),
        )
        .with(
            "s3",
            ContainerStats::new_utf8(
                vec![Some("a"), None, None, None], vec![Some("q"), None, Some("r"), None], ),
        );

    let batch =
        build_statistics_record_batch(&statistics, &required_columns).unwrap();
    // One row per container; only the requested statistic of each column appears.
    assert_snapshot!(batches_to_string(&[batch]), @r"
    +--------+--------+--------+--------+
    | s1_min | s2_max | s3_max | s3_min |
    +--------+--------+--------+--------+
    | | 20 | q | a |
    | | | | |
    | 9 | | r | |
    | | | | |
    +--------+--------+--------+--------+
    ");
}
2417
/// The statistics source returns `Int64` minimums while the required field
/// is declared as a nanosecond timestamp; the snapshot shows the value 10
/// rendered as a timestamp.
#[test]
fn test_build_statistics_casting() {
    let required_columns = RequiredColumns::from(vec![(
        phys_expr::Column::new("s3", 3),
        StatisticsType::Min,
        Field::new(
            "s1_min",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        ),
    )]);

    // Min/max are plain Int64 values, not timestamps.
    let statistics = OneContainerStats {
        min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
        max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
        num_containers: 1,
    };

    let batch =
        build_statistics_record_batch(&statistics, &required_columns).unwrap();

    assert_snapshot!(batches_to_string(&[batch]), @r"
    +-------------------------------+
    | s1_min |
    +-------------------------------+
    | 1970-01-01T00:00:00.000000010 |
    +-------------------------------+
    ");
}
2452
/// With no required columns the batch still has one row per container.
#[test]
fn test_build_statistics_no_required_stats() {
    let statistics = OneContainerStats {
        min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
        max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
        num_containers: 1,
    };
    let no_columns = RequiredColumns::new();

    let batch = build_statistics_record_batch(&statistics, &no_columns).unwrap();

    assert_eq!(batch.num_rows(), 1);
}
2467
/// The statistics source returns a `Binary` minimum (the single byte 255)
/// while the required field is declared as `Utf8`. Building the batch still
/// succeeds, and the snapshot shows an empty cell (presumably null; confirm
/// against `build_statistics_record_batch`).
#[test]
fn test_build_statistics_inconsistent_types() {
    let required_columns = RequiredColumns::from(vec![(
        phys_expr::Column::new("s3", 3),
        StatisticsType::Min,
        Field::new("s1_min", DataType::Utf8, true),
    )]);

    // Only a minimum is provided, and its type does not match the field.
    let statistics = OneContainerStats {
        min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
        max_values: None,
        num_containers: 1,
    };

    let batch =
        build_statistics_record_batch(&statistics, &required_columns).unwrap();
    assert_snapshot!(batches_to_string(&[batch]), @r"
    +--------+
    | s1_min |
    +--------+
    | |
    +--------+
    ");
}
2496
/// Reporting 3 containers while supplying a 1-element statistics array must
/// fail with a length-mismatch error.
#[test]
fn test_build_statistics_inconsistent_length() {
    let statistics = OneContainerStats {
        min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
        max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
        num_containers: 3,
    };

    let required_columns = RequiredColumns::from(vec![(
        phys_expr::Column::new("s1", 3),
        StatisticsType::Min,
        Field::new("s1_min", DataType::Int64, true),
    )]);

    let error =
        build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
    let message = error.to_string();
    assert!(
        message.contains("mismatched statistics length. Expected 3, got 1"),
        "{}",
        error
    );
}
2523
/// `c1 = 1` and `1 = c1` must produce the same pruning expression.
#[test]
fn row_group_predicate_eq() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expected_expr =
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";

    // column on the left, then column on the right
    for expr in [col("c1").eq(lit(1)), lit(1).eq(col("c1"))] {
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
    }

    Ok(())
}
2544
/// `c1 != 1` and `1 != c1` must produce the same pruning expression.
#[test]
fn row_group_predicate_not_eq() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expected_expr =
        "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";

    // column on the left, then column on the right
    for expr in [col("c1").not_eq(lit(1)), lit(1).not_eq(col("c1"))] {
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
    }

    Ok(())
}
2565
/// `c1 > 1` and its mirrored form `1 < c1` must both be rewritten in terms
/// of the column maximum.
#[test]
fn row_group_predicate_gt() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";

    for expr in [col("c1").gt(lit(1)), lit(1).lt(col("c1"))] {
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
    }

    Ok(())
}
2585
/// `c1 >= 1` and its mirrored form `1 <= c1` must both be rewritten in
/// terms of the column maximum.
#[test]
fn row_group_predicate_gt_eq() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";

    for expr in [col("c1").gt_eq(lit(1)), lit(1).lt_eq(col("c1"))] {
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
    }

    Ok(())
}
2604
/// `c1 < 1` and its mirrored form `1 > c1` must both be rewritten in terms
/// of the column minimum.
#[test]
fn row_group_predicate_lt() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";

    for expr in [col("c1").lt(lit(1)), lit(1).gt(col("c1"))] {
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
    }

    Ok(())
}
2624
/// `c1 <= 1` and its mirrored form `1 >= c1` must both be rewritten in
/// terms of the column minimum.
#[test]
fn row_group_predicate_lt_eq() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";

    for expr in [col("c1").lt_eq(lit(1)), lit(1).gt_eq(col("c1"))] {
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
    }

    Ok(())
}
2643
/// In `c1 < 1 AND c2 < c3` only the first conjunct shows up in the
/// resulting pruning expression.
#[test]
fn row_group_predicate_and() -> Result<()> {
    let fields = vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
        Field::new("c3", DataType::Int32, false),
    ];
    let schema = Schema::new(fields);

    let prunable = col("c1").lt(lit(1));
    let column_vs_column = col("c2").lt(col("c3"));
    let expr = prunable.and(column_vs_column);

    let mut required_columns = RequiredColumns::new();
    let predicate_expr =
        test_build_predicate_expression(&expr, &schema, &mut required_columns);
    assert_eq!(
        predicate_expr.to_string(),
        "c1_null_count@1 != row_count@2 AND c1_min@0 < 1"
    );

    Ok(())
}
2660
/// `c1 < 1 OR c2 % 2 = 0` collapses to the constant `true`.
#[test]
fn row_group_predicate_or() -> Result<()> {
    let fields = vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ];
    let schema = Schema::new(fields);

    let prunable = col("c1").lt(lit(1));
    let modulo_check = col("c2").rem(lit(2)).eq(lit(0));
    let expr = prunable.or(modulo_check);

    let mut required_columns = RequiredColumns::new();
    let predicate_expr =
        test_build_predicate_expression(&expr, &schema, &mut required_columns);
    assert_eq!(predicate_expr.to_string(), "true");

    Ok(())
}
2676
/// `NOT c1` on an `Int32` column yields the constant `true`.
#[test]
fn row_group_predicate_not() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let negated = col("c1").not();

    let mut required_columns = RequiredColumns::new();
    let predicate_expr =
        test_build_predicate_expression(&negated, &schema, &mut required_columns);
    assert_eq!(predicate_expr.to_string(), "true");

    Ok(())
}
2689
/// `NOT c1` on a `Boolean` column is rewritten over the column's min and max.
#[test]
fn row_group_predicate_not_bool() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
    let negated = col("c1").not();

    let mut required_columns = RequiredColumns::new();
    let predicate_expr =
        test_build_predicate_expression(&negated, &schema, &mut required_columns);
    assert_eq!(predicate_expr.to_string(), "NOT c1_min@0 AND c1_max@1");

    Ok(())
}
2702
/// A bare `Boolean` column used as a predicate is rewritten over the
/// column's min and max.
#[test]
fn row_group_predicate_bool() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
    let bare_column = col("c1");

    let mut required_columns = RequiredColumns::new();
    let predicate_expr =
        test_build_predicate_expression(&bare_column, &schema, &mut required_columns);
    assert_eq!(predicate_expr.to_string(), "c1_min@0 OR c1_max@1");

    Ok(())
}
2715
/// `c1 < true` on a `Boolean` column is handled like any other `<`
/// comparison against the column minimum.
#[test]
fn row_group_predicate_lt_bool() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
    let comparison = col("c1").lt(lit(true));

    let mut required_columns = RequiredColumns::new();
    let predicate_expr =
        test_build_predicate_expression(&comparison, &schema, &mut required_columns);
    assert_eq!(
        predicate_expr.to_string(),
        "c1_null_count@1 != row_count@2 AND c1_min@0 < true"
    );

    Ok(())
}
2730
/// Checks both the rewritten expression and the exact list of statistics
/// columns recorded while building it.
#[test]
fn row_group_predicate_required_columns() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    let mut required_columns = RequiredColumns::new();

    // c1 < 1 AND (c2 = 2 OR c2 = 3)
    let c2_is_2_or_3 = col("c2").eq(lit(2)).or(col("c2").eq(lit(3)));
    let expr = col("c1").lt(lit(1)).and(c2_is_2_or_3);

    let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
    let predicate_expr =
        test_build_predicate_expression(&expr, &schema, &mut required_columns);
    assert_eq!(predicate_expr.to_string(), expected_expr);
    println!("required_columns: {required_columns:#?}");

    // (position, source column, source index, statistic, field name, field type)
    // The shared row count column at position 2 is checked a second time
    // after the c2 columns, as in the sequence of checks this table encodes.
    let expectations = [
        (0, "c1", 0, StatisticsType::Min, "c1_min", DataType::Int32),
        (1, "c1", 0, StatisticsType::NullCount, "c1_null_count", DataType::UInt64),
        (2, "c1", 0, StatisticsType::RowCount, "row_count", DataType::UInt64),
        (3, "c2", 1, StatisticsType::Min, "c2_min", DataType::Int32),
        (4, "c2", 1, StatisticsType::Max, "c2_max", DataType::Int32),
        (5, "c2", 1, StatisticsType::NullCount, "c2_null_count", DataType::UInt64),
        (2, "c1", 0, StatisticsType::RowCount, "row_count", DataType::UInt64),
    ];
    for (position, column, index, statistics_type, field_name, data_type) in expectations {
        // Statistics fields are always nullable, whatever the source column says.
        let field = Field::new(field_name, data_type, false).with_nullable(true);
        assert_eq!(
            required_columns.columns[position],
            (phys_expr::Column::new(column, index), statistics_type, field)
        );
    }
    assert_eq!(required_columns.columns.len(), 6);

    Ok(())
}
2821
/// `c1 IN (1, 2, 3)` expands to an OR of three equality checks.
#[test]
fn row_group_predicate_in_list() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);

    let candidates = vec![lit(1), lit(2), lit(3)];
    let negated = false;
    let expr = Expr::InList(InList::new(Box::new(col("c1")), candidates, negated));

    let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
    let mut required_columns = RequiredColumns::new();
    let predicate_expr =
        test_build_predicate_expression(&expr, &schema, &mut required_columns);
    assert_eq!(predicate_expr.to_string(), expected_expr);

    Ok(())
}
2841
/// `c1 IN ()` with an empty list yields the constant `true`.
#[test]
fn row_group_predicate_in_list_empty() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);

    let no_candidates = vec![];
    let negated = false;
    let expr = Expr::InList(InList::new(Box::new(col("c1")), no_candidates, negated));

    let mut required_columns = RequiredColumns::new();
    let predicate_expr =
        test_build_predicate_expression(&expr, &schema, &mut required_columns);
    assert_eq!(predicate_expr.to_string(), "true");

    Ok(())
}
2857
/// `c1 NOT IN (1, 2, 3)` expands to an AND of three inequality checks.
#[test]
fn row_group_predicate_in_list_negated() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);

    let candidates = vec![lit(1), lit(2), lit(3)];
    let negated = true;
    let expr = Expr::InList(InList::new(Box::new(col("c1")), candidates, negated));

    let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
    let mut required_columns = RequiredColumns::new();
    let predicate_expr =
        test_build_predicate_expression(&expr, &schema, &mut required_columns);
    assert_eq!(predicate_expr.to_string(), expected_expr);

    Ok(())
}
2877
/// `c1 BETWEEN 1 AND 5` must yield the same pruning expression as the
/// spelled-out `c1 >= 1 AND c1 <= 5`.
#[test]
fn row_group_predicate_between() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);

    let between = col("c1").between(lit(1), lit(5));
    let spelled_out = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));

    let from_between =
        test_build_predicate_expression(&between, &schema, &mut RequiredColumns::new());
    let from_spelled_out = test_build_predicate_expression(
        &spelled_out,
        &schema,
        &mut RequiredColumns::new(),
    );
    assert_eq!(from_between.to_string(), from_spelled_out.to_string());

    Ok(())
}
2900
/// Combines `c1 IN (1, 2)` with `c2 BETWEEN 4 AND 5` and checks the full
/// rewritten expression.
#[test]
fn row_group_predicate_between_with_in_list() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);

    let in_list = col("c1").in_list(vec![lit(1), lit(2)], false);
    let between = col("c2").between(lit(4), lit(5));
    let combined = in_list.and(between);

    let expected_expr = "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5";
    let mut required_columns = RequiredColumns::new();
    let predicate_expr =
        test_build_predicate_expression(&combined, &schema, &mut required_columns);
    assert_eq!(predicate_expr.to_string(), expected_expr);

    Ok(())
}
2923
/// An IN list with 21 values is not expanded; the result is the constant
/// `true`.
#[test]
fn row_group_predicate_in_list_to_many_values() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);

    let values: Vec<Expr> = (1..=21).map(lit).collect();
    let expr = col("c1").in_list(values, false);

    let mut required_columns = RequiredColumns::new();
    let predicate_expr =
        test_build_predicate_expression(&expr, &schema, &mut required_columns);
    assert_eq!(predicate_expr.to_string(), "true");

    Ok(())
}
2939
#[test]
fn row_group_predicate_cast_int_int() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);

    // CAST(c1 AS Int64) = 1, with the column on the left and then on the right;
    // both orientations must render identically.
    let cast_expected = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
    let cast_exprs = [
        cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1)))),
        lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64)),
    ];
    for expr in &cast_exprs {
        let rewritten =
            test_build_predicate_expression(expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), cast_expected);
    }

    // TRY_CAST(c1 AS Int64) > 1 and its mirrored form 1 < TRY_CAST(c1 AS Int64).
    let try_cast_expected =
        "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
    let try_cast_exprs = [
        try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1)))),
        lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64)),
    ];
    for expr in &try_cast_exprs {
        let rewritten =
            test_build_predicate_expression(expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), try_cast_expected);
    }

    Ok(())
}
2977
#[test]
fn row_group_predicate_cast_string_string() -> Result<()> {
    // A Utf8View column cast to Utf8 and compared with the string "1":
    // the cast is applied to the min/max statistics columns.
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let one = || lit(ScalarValue::Utf8(Some("1".to_string())));

    // Column on the left, then column on the right.
    let exprs = [
        cast(col("c1"), DataType::Utf8).eq(one()),
        one().eq(cast(col("c1"), DataType::Utf8)),
    ];
    for expr in &exprs {
        let rewritten =
            test_build_predicate_expression(expr, &schema, &mut RequiredColumns::new());
        assert_eq!(
            rewritten.to_string(),
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Utf8) <= 1 AND 1 <= CAST(c1_max@1 AS Utf8)"
        );
    }

    Ok(())
}
2999
#[test]
fn row_group_predicate_cast_string_int() -> Result<()> {
    // Casting a Utf8View column to Int32 is expected to yield the constant
    // `true` predicate, whichever side the column is on.
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);

    let exprs = [
        cast(col("c1"), DataType::Int32).eq(lit(ScalarValue::Int32(Some(1)))),
        lit(ScalarValue::Int32(Some(1))).eq(cast(col("c1"), DataType::Int32)),
    ];
    for expr in &exprs {
        let rewritten =
            test_build_predicate_expression(expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3019
#[test]
fn row_group_predicate_cast_int_string() -> Result<()> {
    // Casting an Int32 column to Utf8 is expected to yield the constant
    // `true` predicate, whichever side the column is on.
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let one = || lit(ScalarValue::Utf8(Some("1".to_string())));

    let exprs = [
        cast(col("c1"), DataType::Utf8).eq(one()),
        one().eq(cast(col("c1"), DataType::Utf8)),
    ];
    for expr in &exprs {
        let rewritten =
            test_build_predicate_expression(expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3041
#[test]
fn row_group_predicate_date_date() -> Result<()> {
    // A Date32 column cast to Date64 and compared with a Date64 literal:
    // the cast is applied to the min/max statistics columns.
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
    let date_literal = || lit(ScalarValue::Date64(Some(123)));

    // Column on the left, then column on the right.
    let exprs = [
        cast(col("c1"), DataType::Date64).eq(date_literal()),
        date_literal().eq(cast(col("c1"), DataType::Date64)),
    ];
    for expr in &exprs {
        let rewritten =
            test_build_predicate_expression(expr, &schema, &mut RequiredColumns::new());
        assert_eq!(
            rewritten.to_string(),
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Date64) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Date64)"
        );
    }

    Ok(())
}
3063
#[test]
fn row_group_predicate_dict_string_date() -> Result<()> {
    // A Date32 column cast to Dictionary(UInt8, Utf8) and compared with a
    // string literal is expected to yield the constant `true` predicate.
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);

    let dict_cast = || {
        cast(
            col("c1"),
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
        )
    };
    let date_string = || lit(ScalarValue::Utf8(Some("2024-01-01".to_string())));

    // Column on the left, then column on the right.
    let exprs = [dict_cast().eq(date_string()), date_string().eq(dict_cast())];
    for expr in &exprs {
        let rewritten =
            test_build_predicate_expression(expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3091
#[test]
fn row_group_predicate_date_dict_string() -> Result<()> {
    // A Dictionary(UInt8, Utf8) column cast to Date32 is expected to yield the
    // constant `true` predicate, whichever side the column is on.
    let dict_utf8 =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    let schema = Schema::new(vec![Field::new("c1", dict_utf8, false)]);

    let exprs = [
        cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123)))),
        lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32)),
    ];
    for expr in &exprs {
        let rewritten =
            test_build_predicate_expression(expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3118
#[test]
fn row_group_predicate_dict_dict_same_value_type() -> Result<()> {
    // The column is Dictionary(UInt8, Utf8); both cases compare it with the
    // Utf8 literal "test".
    let schema = Schema::new(vec![Field::new(
        "c1",
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
        false,
    )]);
    let test_literal = || lit(ScalarValue::Utf8(Some("test".to_string())));

    let cases = [
        // Direct comparison, no cast involved.
        (
            col("c1").eq(test_literal()),
            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1",
        ),
        // Column cast to a dictionary with a different key type (UInt16).
        (
            cast(
                col("c1"),
                DataType::Dictionary(
                    Box::new(DataType::UInt16),
                    Box::new(DataType::Utf8),
                ),
            )
            .eq(test_literal()),
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Utf8)) <= test AND test <= CAST(c1_max@1 AS Dictionary(UInt16, Utf8))",
        ),
    ];
    for (expr, expected_expr) in &cases {
        let rewritten =
            test_build_predicate_expression(expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), *expected_expr);
    }

    Ok(())
}
3149
#[test]
fn row_group_predicate_dict_dict_different_value_type() -> Result<()> {
    // A Dictionary(UInt8, Int32) column cast to Int64 and compared with an
    // Int64 literal: the cast is applied to the min/max statistics columns.
    let dict_int32 =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Int32));
    let schema = Schema::new(vec![Field::new("c1", dict_int32, false)]);

    let comparison =
        cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(123))));
    let rendered = test_build_predicate_expression(
        &comparison,
        &schema,
        &mut RequiredColumns::new(),
    )
    .to_string();
    assert_eq!(
        rendered,
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 123 AND 123 <= CAST(c1_max@1 AS Int64)"
    );

    Ok(())
}
3169
#[test]
fn row_group_predicate_nested_dict() -> Result<()> {
    // The column type is a dictionary whose value type is itself a
    // dictionary: Dictionary(UInt8, Dictionary(UInt16, Utf8)).
    let inner_dict =
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
    let outer_dict =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(inner_dict));
    let schema = Schema::new(vec![Field::new("c1", outer_dict, false)]);

    // Direct comparison with a Utf8 literal.
    let comparison = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
    let rendered = test_build_predicate_expression(
        &comparison,
        &schema,
        &mut RequiredColumns::new(),
    )
    .to_string();
    assert_eq!(
        rendered,
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1"
    );

    Ok(())
}
3195
#[test]
fn row_group_predicate_dict_date_dict_date() -> Result<()> {
    // A Dictionary(UInt8, Date32) column cast to Dictionary(UInt16, Date64)
    // and compared with a Date64 literal.
    let source_type =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Date32));
    let target_type =
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Date64));
    let schema = Schema::new(vec![Field::new("c1", source_type, false)]);

    let comparison = cast(col("c1"), target_type).eq(lit(ScalarValue::Date64(Some(123))));
    let rendered = test_build_predicate_expression(
        &comparison,
        &schema,
        &mut RequiredColumns::new(),
    )
    .to_string();
    assert_eq!(
        rendered,
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Date64)) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Dictionary(UInt16, Date64))"
    );

    Ok(())
}
3218
#[test]
fn row_group_predicate_date_string() -> Result<()> {
    // Casting a Utf8 column to Date32 is expected to yield the constant
    // `true` predicate, whichever side the column is on.
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);

    let exprs = [
        cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123)))),
        lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32)),
    ];
    for expr in &exprs {
        let rewritten =
            test_build_predicate_expression(expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3240
#[test]
fn row_group_predicate_string_date() -> Result<()> {
    // Casting a Date32 column to Utf8 is expected to yield the constant
    // `true` predicate, whichever side the column is on.
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
    let date_string = || lit(ScalarValue::Utf8(Some("2024-01-01".to_string())));

    let exprs = [
        cast(col("c1"), DataType::Utf8).eq(date_string()),
        date_string().eq(cast(col("c1"), DataType::Utf8)),
    ];
    for expr in &exprs {
        let rewritten =
            test_build_predicate_expression(expr, &schema, &mut RequiredColumns::new());
        assert_eq!(rewritten.to_string(), "true");
    }

    Ok(())
}
3262
#[test]
fn row_group_predicate_cast_list() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);

    // Builds `CAST(c1 AS Int64) [NOT] IN (1, 2, 3)` with Int64 literals.
    let cast_in_list = |negated| {
        Expr::InList(InList::new(
            Box::new(cast(col("c1"), DataType::Int64)),
            (1..=3)
                .map(|value| lit(ScalarValue::Int64(Some(value))))
                .collect(),
            negated,
        ))
    };

    // IN: one range check per literal, joined with OR.
    let in_rendered = test_build_predicate_expression(
        &cast_in_list(false),
        &schema,
        &mut RequiredColumns::new(),
    )
    .to_string();
    assert_eq!(
        in_rendered,
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)"
    );

    // NOT IN: one inequality check per literal, joined with AND.
    let not_in_rendered = test_build_predicate_expression(
        &cast_in_list(true),
        &schema,
        &mut RequiredColumns::new(),
    )
    .to_string();
    assert_eq!(
        not_in_rendered,
        "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))"
    );

    Ok(())
}
3297
#[test]
fn prune_decimal_data() {
    // Every scenario in this test expects the same per-container result.
    let expected = &[false, true, false, true];

    // decimal(9, 2) column whose statistics are supplied as i32 values.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "s1",
        DataType::Decimal128(9, 2),
        true,
    )]));
    let int32_stats = TestStatistics::new().with(
        "s1",
        ContainerStats::new_i32(
            vec![Some(0), Some(4), None, Some(3)], // min
            vec![Some(5), Some(6), Some(4), None], // max
        ),
    );
    let decimal9_exprs = [
        // s1 > 5
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
        // same comparison with the column cast to decimal(14, 3)
        cast(col("s1"), DataType::Decimal128(14, 3))
            .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
        // same comparison using try_cast
        try_cast(col("s1"), DataType::Decimal128(14, 3))
            .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
    ];
    for expr in decimal9_exprs {
        prune_with_expr(expr, &schema, &int32_stats, expected);
    }

    // decimal(18, 2) column whose statistics are supplied as i64 values.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "s1",
        DataType::Decimal128(18, 2),
        true,
    )]));
    let int64_stats = TestStatistics::new().with(
        "s1",
        ContainerStats::new_i64(
            vec![Some(0), Some(4), None, Some(3)], // min
            vec![Some(5), Some(6), Some(4), None], // max
        ),
    );
    prune_with_expr(
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
        &schema,
        &int64_stats,
        expected,
    );

    // decimal(23, 2) column whose statistics are supplied as decimal128 values.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "s1",
        DataType::Decimal128(23, 2),
        true,
    )]));
    let decimal_stats = TestStatistics::new().with(
        "s1",
        ContainerStats::new_decimal128(
            vec![Some(0), Some(400), None, Some(300)], // min
            vec![Some(500), Some(600), Some(400), None], // max
            23,
            2,
        ),
    );
    prune_with_expr(
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
        &schema,
        &decimal_stats,
        expected,
    );
}
3398
#[test]
fn prune_api() {
    let fields = vec![
        Field::new("s1", DataType::Utf8, true),
        Field::new("s2", DataType::Int32, true),
    ];
    let schema = Arc::new(Schema::new(fields));

    // Only `s2` has statistics; `s1` has none.
    let s2_stats = ContainerStats::new_i32(
        vec![Some(0), Some(4), None, Some(3)], // min
        vec![Some(5), Some(6), None, None],    // max
    );
    let statistics = TestStatistics::new().with("s2", s2_stats);

    // `s2 > 5`, directly and with the column cast to Int64. Only the first
    // container (min 0, max 5) is expected to come back as `false`.
    let exprs = [
        col("s2").gt(lit(5)),
        cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
    ];
    for expr in exprs {
        prune_with_expr(expr, &schema, &statistics, &[false, true, true, true]);
    }
}
3433
#[test]
fn prune_not_eq_data() {
    let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

    let mins = vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")];
    let maxes = vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None];
    let statistics = TestStatistics::new().with("s1", ContainerStats::new_utf8(mins, maxes));

    // s1 != 'M': only the container whose min and max are both "M" is
    // expected to come back as `false`.
    let not_m = col("s1").not_eq(lit("M"));
    prune_with_expr(
        not_m,
        &schema,
        &statistics,
        &[true, true, true, false, true, true],
    );
}
3458
3459 fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3475 let schema =
3476 Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3477
3478 let statistics = TestStatistics::new().with(
3479 "b1",
3480 ContainerStats::new_bool(
3481 vec![Some(false), Some(false), Some(true), None, Some(false)], vec![Some(false), Some(true), Some(true), None, None], ),
3484 );
3485 let expected_true = vec![false, true, true, true, true];
3486 let expected_false = vec![true, true, false, true, true];
3487
3488 (schema, statistics, expected_true, expected_false)
3489 }
3490
#[test]
fn prune_bool_const_expr() {
    // A constant predicate gives the same answer for every container,
    // regardless of the statistics.
    let (schema, statistics, _, _) = bool_setup();

    for (constant, expected) in [(true, [true; 5]), (false, [false; 5])] {
        prune_with_expr(lit(constant), &schema, &statistics, &expected);
    }
}
3511
#[test]
fn prune_bool_column() {
    // The bare column `b1` used as the predicate.
    let (schema, statistics, expected_true, _) = bool_setup();

    let predicate = col("b1");
    prune_with_expr(predicate, &schema, &statistics, &expected_true);
}
3524
#[test]
fn prune_bool_not_column() {
    // `NOT b1` used as the predicate.
    let (schema, statistics, _, expected_false) = bool_setup();

    let predicate = col("b1").not();
    prune_with_expr(predicate, &schema, &statistics, &expected_false);
}
3537
#[test]
fn prune_bool_column_eq_true() {
    // `b1 = true` is expected to prune exactly like the bare column `b1`.
    let (schema, statistics, expected_true, _) = bool_setup();

    let predicate = col("b1").eq(lit(true));
    prune_with_expr(predicate, &schema, &statistics, &expected_true);
}
3550
#[test]
fn prune_bool_not_column_eq_true() {
    // `(NOT b1) = true` is expected to prune exactly like `NOT b1`.
    let (schema, statistics, _, expected_false) = bool_setup();

    let predicate = col("b1").not().eq(lit(true));
    prune_with_expr(predicate, &schema, &statistics, &expected_false);
}
3563
3564 fn int32_setup() -> (SchemaRef, TestStatistics) {
3574 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3575
3576 let statistics = TestStatistics::new().with(
3577 "i",
3578 ContainerStats::new_i32(
3579 vec![Some(-5), Some(1), Some(-11), None, Some(1)], vec![Some(5), Some(11), Some(-1), None, None], ),
3582 );
3583 (schema, statistics)
3584 }
3585
#[test]
fn prune_int32_col_gt_zero() {
    let (schema, statistics) = int32_setup();

    // `i > 0` and the equivalent `-i < 0` must prune identically.
    let exprs = [
        col("i").gt(lit(0)),
        Expr::Negative(Box::new(col("i"))).lt(lit(0)),
    ];
    for expr in exprs {
        prune_with_expr(
            expr,
            &schema,
            &statistics,
            &[true, true, false, true, true],
        );
    }
}
3609
#[test]
fn prune_int32_col_lte_zero() {
    let (schema, statistics) = int32_setup();

    // `i <= 0` and the equivalent `-i >= 0` must prune identically.
    let exprs = [
        col("i").lt_eq(lit(0)),
        Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
    ];
    for expr in exprs {
        prune_with_expr(
            expr,
            &schema,
            &statistics,
            &[true, false, true, true, false],
        );
    }
}
3638
#[test]
fn prune_int32_col_lte_zero_cast() {
    let (schema, statistics) = int32_setup();

    // Comparisons on the column cast (or try_cast) to Utf8: every container
    // is expected to come back as `true`.
    let negated = || Expr::Negative(Box::new(col("i")));
    let exprs = [
        cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
        try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
        cast(negated(), DataType::Utf8).gt_eq(lit("0")),
        try_cast(negated(), DataType::Utf8).gt_eq(lit("0")),
    ];
    for expr in exprs {
        prune_with_expr(expr, &schema, &statistics, &[true; 5]);
    }
}
3683
#[test]
fn prune_int32_col_eq_zero() {
    let (schema, statistics) = int32_setup();

    // i = 0
    let predicate = col("i").eq(lit(0));
    prune_with_expr(
        predicate,
        &schema,
        &statistics,
        &[true, false, false, true, false],
    );
}
3704
#[test]
fn prune_int32_col_eq_zero_cast() {
    let (schema, statistics) = int32_setup();

    // `CAST(i AS Int64) = 0` and the try_cast variant must prune identically.
    let exprs = [
        cast(col("i"), DataType::Int64).eq(lit(0i64)),
        try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
    ];
    for expr in exprs {
        prune_with_expr(
            expr,
            &schema,
            &statistics,
            &[true, false, false, true, false],
        );
    }
}
3731
#[test]
fn prune_int32_col_eq_zero_cast_as_str() {
    let (schema, statistics) = int32_setup();

    // `CAST(i AS Utf8) = '0'`: every container is expected to come back as
    // `true`.
    let predicate = cast(col("i"), DataType::Utf8).eq(lit("0"));
    prune_with_expr(predicate, &schema, &statistics, &[true; 5]);
}
3754
#[test]
fn prune_int32_col_lt_neg_one() {
    let (schema, statistics) = int32_setup();

    // `i > -1` and the equivalent `-i < 1` must prune identically.
    let exprs = [
        col("i").gt(lit(-1)),
        Expr::Negative(Box::new(col("i"))).lt(lit(1)),
    ];
    for expr in exprs {
        prune_with_expr(
            expr,
            &schema,
            &statistics,
            &[true, true, false, true, true],
        );
    }
}
3783
#[test]
fn prune_int32_is_null() {
    let (schema, statistics) = int32_setup();
    let is_null = || col("i").is_null();

    // Before any null counts are attached, `i IS NULL` is expected to return
    // `true` for every container.
    prune_with_expr(is_null(), &schema, &statistics, &[true; 5]);

    // Attach per-container null counts (None = count not provided).
    let null_counts = vec![Some(0), Some(1), None, None, Some(0)];
    let with_null_counts = statistics.with_null_counts("i", null_counts);

    // Containers reporting zero nulls are now expected to return `false`.
    prune_with_expr(
        is_null(),
        &schema,
        &with_null_counts,
        &[false, true, true, true, false],
    );
}
3822
#[test]
fn prune_int32_column_is_known_all_null() {
    let (schema, statistics) = int32_setup();
    let negative = || col("i").lt(lit(0));

    // `i < 0` with min/max statistics only.
    let min_max_only = &[true, false, true, true, false];
    prune_with_expr(negative(), &schema, &statistics, min_max_only);

    // Adding row counts alone must not change the result.
    let row_counts = vec![Some(10), Some(9), None, Some(4), Some(10)];
    let with_rows = statistics.with_row_counts("i", row_counts);
    prune_with_expr(negative(), &schema, &with_rows, min_max_only);

    // Adding null counts as well: the fourth container reports 4 nulls out of
    // 4 rows, and its expected result flips to `false`.
    let null_counts = vec![Some(0), Some(1), None, Some(4), Some(0)];
    let with_rows_and_nulls = with_rows.with_null_counts("i", null_counts);
    prune_with_expr(
        negative(),
        &schema,
        &with_rows_and_nulls,
        &[true, false, true, false, false],
    );
}
3894
#[test]
fn prune_cast_column_scalar() {
    let (schema, statistics) = int32_setup();

    // Four spellings of "i is greater than zero" that mix casts on the
    // literal and on the column; all must prune identically.
    let zero_i64 = || lit(ScalarValue::Int64(Some(0)));
    let exprs = [
        // i > CAST(0_i64 AS Int32)
        col("i").gt(cast(zero_i64(), DataType::Int32)),
        // CAST(i AS Int64) > 0_i64
        cast(col("i"), DataType::Int64).gt(zero_i64()),
        // TRY_CAST(i AS Int64) > 0_i64
        try_cast(col("i"), DataType::Int64).gt(zero_i64()),
        // -CAST(i AS Int64) < 0_i64
        Expr::Negative(Box::new(cast(col("i"), DataType::Int64))).lt(zero_i64()),
    ];
    for expr in exprs {
        prune_with_expr(
            expr,
            &schema,
            &statistics,
            &[true, true, false, true, true],
        );
    }
}
3934
#[test]
fn test_increment_utf8() {
    // Inputs for which an incremented string is produced, paired with the
    // expected output.
    let incrementable = [
        // plain ASCII
        ("abc", "abd"),
        ("abz", "ab{"),
        // around U+007F
        ("~", "\u{7f}"),
        ("\u{7f}", "\u{80}"),
        // multi-byte characters
        ("ß", "à"),
        ("℣", "ℤ"),
        // crossing encoded-length boundaries
        ("\u{7FF}", "\u{800}"),
        ("\u{FFFF}", "\u{10000}"),
        // last character cannot be incremented: the previous character is
        // incremented instead and the tail is dropped
        ("a\u{10FFFF}", "b"),
        ("a\u{D7FF}", "b"),
        ("a\u{FDCF}", "b"),
        ("a\u{10FFFF}", "b"),
    ];
    for (input, expected) in incrementable {
        assert_eq!(increment_utf8(input).unwrap(), expected);
    }

    // Inputs for which no incremented string exists.
    let not_incrementable = ["", "\u{10FFFF}", "\u{D7FF}", "\u{FDCF}", "\u{10FFFF}"];
    for input in not_incrementable {
        assert!(increment_utf8(input).is_none());
    }
}
3974
3975 fn utf8_setup() -> (SchemaRef, TestStatistics) {
3988 let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
3989
3990 let statistics = TestStatistics::new().with(
3991 "s1",
3992 ContainerStats::new_utf8(
3993 vec![
3994 Some("A"),
3995 Some("A"),
3996 Some("N"),
3997 Some("M"),
3998 None,
3999 Some("A"),
4000 Some(""),
4001 Some(""),
4002 Some("AB"),
4003 Some("A\u{10ffff}\u{10ffff}"),
4004 ], vec![
4006 Some("Z"),
4007 Some("L"),
4008 Some("Z"),
4009 Some("M"),
4010 None,
4011 None,
4012 Some("A"),
4013 Some(""),
4014 Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
4015 Some("A\u{10ffff}\u{10ffff}"),
4016 ], ),
4018 );
4019 (schema, statistics)
4020 }
4021
#[test]
fn prune_utf8_eq() {
    let (schema, statistics) = utf8_setup();

    // Each case is (literal compared with `s1` via `=`, expected result for
    // the ten containers of `utf8_setup`, in order).
    #[rustfmt::skip]
    let cases = [
        ("A", [true, true, false, false, true, true, true, false, false, false]),
        ("", [false, false, false, false, true, false, true, true, false, false]),
    ];
    for (literal, expected_ret) in cases {
        prune_with_expr(
            col("s1").eq(lit(literal)),
            &schema,
            &statistics,
            &expected_ret,
        );
    }
}
4078
#[test]
fn prune_utf8_not_eq() {
    let (schema, statistics) = utf8_setup();

    // Each case is (literal compared with `s1` via `!=`, expected result for
    // the ten containers of `utf8_setup`, in order). The only `false` is the
    // container whose min and max are both "" when comparing against "".
    #[rustfmt::skip]
    let cases = [
        ("A", [true; 10]),
        ("", [true, true, true, true, true, true, true, false, true, true]),
    ];
    for (literal, expected_ret) in cases {
        prune_with_expr(
            col("s1").not_eq(lit(literal)),
            &schema,
            &statistics,
            &expected_ret,
        );
    }
}
4135
#[test]
fn prune_utf8_like_one() {
    let (schema, statistics) = utf8_setup();

    // Each case is (LIKE pattern applied to `s1`, expected result for the ten
    // containers of `utf8_setup`, in order). These patterns use `_` or no
    // wildcard at all.
    #[rustfmt::skip]
    let cases = [
        ("A_", [true, true, false, false, true, true, true, false, true, true]),
        ("_A_", [true; 10]),
        ("_", [true; 10]),
        ("", [false, false, false, false, true, false, true, true, false, false]),
    ];
    for (pattern, expected_ret) in cases {
        prune_with_expr(
            col("s1").like(lit(pattern)),
            &schema,
            &statistics,
            &expected_ret,
        );
    }
}
4244
#[test]
fn prune_utf8_like_many() {
    let (schema, statistics) = utf8_setup();

    // Each case is (LIKE pattern applied to `s1`, expected result for the ten
    // containers of `utf8_setup`, in order). These patterns use `%` or no
    // wildcard at all.
    #[rustfmt::skip]
    let cases = [
        ("A%", [true, true, false, false, true, true, true, false, true, true]),
        ("%A%", [true; 10]),
        ("%", [true; 10]),
        ("", [false, false, false, false, true, false, true, true, false, false]),
    ];
    for (pattern, expected_ret) in cases {
        prune_with_expr(
            col("s1").like(lit(pattern)),
            &schema,
            &statistics,
            &expected_ret,
        );
    }
}
4353
#[test]
fn prune_utf8_not_like_one() {
    let (schema, statistics) = utf8_setup();

    // NOT LIKE with a `_` wildcard: every container of `utf8_setup` is
    // expected to come back as `true`.
    let predicate = col("s1").not_like(lit("A\u{10ffff}_"));
    prune_with_expr(predicate, &schema, &statistics, &[true; 10]);
}
4385
#[test]
fn prune_utf8_not_like_many() {
    let (schema, statistics) = utf8_setup();

    // Each case is (NOT LIKE pattern applied to `s1`, expected result for the
    // ten containers of `utf8_setup`, in order). Only the plain prefix pattern
    // yields a `false`, for the last container, whose min and max are both
    // "A\u{10ffff}\u{10ffff}".
    #[rustfmt::skip]
    let cases = [
        ("A\u{10ffff}%", [true, true, true, true, true, true, true, true, true, false]),
        ("A\u{10ffff}%\u{10ffff}", [true; 10]),
        ("A\u{10ffff}%\u{10ffff}_", [true; 10]),
    ];
    for (pattern, expected_ret) in cases {
        prune_with_expr(
            col("s1").not_like(lit(pattern)),
            &schema,
            &statistics,
            &expected_ret,
        );
    }

    // Pattern containing a backslash-escaped `%`, checked against its own
    // two-container statistics.
    let escaped_stats = TestStatistics::new().with(
        "s1",
        ContainerStats::new_utf8(
            vec![Some("A%a"), Some("A")],
            vec![Some("A%c"), Some("A")],
        ),
    );
    prune_with_expr(
        col("s1").not_like(lit("A\\%%")),
        &schema,
        &escaped_stats,
        &[false, true],
    );
}
4479
#[test]
fn test_rewrite_expr_to_prunable() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    // Each case is (left operand, operator, right literal). In every case the
    // rewrite is expected to return both operands unchanged.
    let cases = [
        // column op lit
        (col("a"), Operator::Eq, lit(ScalarValue::Int32(Some(12)))),
        // cast op lit
        (
            cast(col("a"), DataType::Decimal128(20, 3)),
            Operator::Gt,
            lit(ScalarValue::Decimal128(Some(12), 20, 3)),
        ),
        // try_cast op lit
        (
            try_cast(col("a"), DataType::Int64),
            Operator::Gt,
            lit(ScalarValue::Int64(Some(12))),
        ),
    ];

    for (logical_left, op, logical_right) in cases {
        let left_input = logical2physical(&logical_left, &schema);
        let right_input = logical2physical(&logical_right, &schema);

        let (result_left, _, result_right) =
            rewrite_expr_to_prunable(&left_input, op, &right_input, df_schema.clone())
                .unwrap();

        assert_eq!(result_left.to_string(), left_input.to_string());
        assert_eq!(result_right.to_string(), right_input.to_string());
    }
}
4528
/// Checks that a `PredicateRewriter` configured with a custom
/// `UnhandledPredicateHook` substitutes the hook's output for predicates it
/// does not rewrite itself, both when such a predicate stands alone and when
/// it is AND-ed with a predicate the rewriter does handle.
#[test]
fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
    struct CustomUnhandledHook;

    impl UnhandledPredicateHook for CustomUnhandledHook {
        /// Ignores the input and returns the easily recognizable literal `42`.
        fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
            Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
        }
    }

    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let schema_with_b = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]);

    let rewriter =
        PredicateRewriter::new().with_unhandled_hook(Arc::new(CustomUnhandledHook));

    // Expressions are planned against `schema_with_b` (so `b` resolves) but
    // rewritten against `schema`, which has no column `b`. The closure only
    // needs to borrow the logical expression, so callers never have to clone.
    let transform_expr = |expr: &Expr| {
        let physical = logical2physical(expr, &schema_with_b);
        rewriter.rewrite_predicate_to_statistics_predicate(&physical, &schema)
    };

    // A predicate rewritten by a default `PredicateRewriter`; used as the
    // "handled" half of the conjunctions below.
    let known_expression = col("a").eq(lit(12));
    let known_expression_transformed = PredicateRewriter::new()
        .rewrite_predicate_to_statistics_predicate(
            &logical2physical(&known_expression, &schema),
            &schema,
        );

    // What the hook produces, and what a handled-AND-unhandled conjunction is
    // expected to look like after rewriting.
    let hook_literal = || logical2physical(&lit(42), &schema);
    let expected_conjunction = || {
        phys_expr::BinaryExpr::new(
            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
            Operator::And,
            hook_literal(),
        )
    };

    // Predicate on `b`, which is absent from `schema`: replaced by the hook.
    let unknown_column = col("b").eq(lit(12));
    assert_eq!(
        transform_expr(&unknown_column).to_string(),
        hook_literal().to_string()
    );

    // Handled predicate AND the predicate on `b`.
    let mixed = known_expression.clone().and(unknown_column);
    assert_eq!(
        transform_expr(&mixed).to_string(),
        expected_conjunction().to_string()
    );

    // An `array_has` call over column `a`: also replaced by the hook.
    let unknown_expr = array_has(make_array(vec![lit(1)]), col("a"));
    assert_eq!(
        transform_expr(&unknown_expr).to_string(),
        hook_literal().to_string()
    );

    // Handled predicate AND the `array_has` call.
    let mixed = known_expression.and(unknown_expr);
    assert_eq!(
        transform_expr(&mixed).to_string(),
        expected_conjunction().to_string()
    );
}
4596
/// Verifies that `rewrite_expr_to_prunable` returns an error when the
/// left-hand side is either a cast of the `Utf8` column `a` to `Int64` or an
/// `IS NULL` expression, each compared (`>`) against an `Int64` literal.
#[test]
fn test_rewrite_expr_to_prunable_error() {
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    let rejected_left_sides = [cast(col("a"), DataType::Int64), is_null(col("a"))];
    for logical_lhs in rejected_left_sides {
        let lhs = logical2physical(&logical_lhs, &schema);
        let rhs = logical2physical(&lit(ScalarValue::Int64(Some(12))), &schema);
        let outcome =
            rewrite_expr_to_prunable(&lhs, Operator::Gt, &rhs, df_schema.clone());
        assert!(outcome.is_err());
    }
}
4625
/// Exercises pruning driven purely by `contained` information on a single
/// column `s1`, for nine containers and the literal sets {"foo"}, {"bar"}
/// and {"foo", "bar"}.
///
/// Each case pairs a predicate with the exact per-container result that
/// `prune_with_expr` must observe.
#[test]
fn prune_with_contained_one_column() {
    const KNOWN_TRUE: Option<bool> = Some(true);
    const KNOWN_FALSE: Option<bool> = Some(false);
    const UNKNOWN: Option<bool> = None;

    let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

    // Answers registered for {"foo"}: cycles true / false / unknown.
    let about_foo = [
        KNOWN_TRUE, KNOWN_FALSE, UNKNOWN,
        KNOWN_TRUE, KNOWN_FALSE, UNKNOWN,
        KNOWN_TRUE, KNOWN_FALSE, UNKNOWN,
    ];
    // Answers registered for {"bar"}: three true, three false, three unknown.
    let about_bar = [
        KNOWN_TRUE, KNOWN_TRUE, KNOWN_TRUE,
        KNOWN_FALSE, KNOWN_FALSE, KNOWN_FALSE,
        UNKNOWN, UNKNOWN, UNKNOWN,
    ];
    // Answers registered for the two-value set {"foo", "bar"}.
    let about_foo_and_bar = [
        UNKNOWN, UNKNOWN, UNKNOWN,
        KNOWN_TRUE, KNOWN_TRUE, KNOWN_TRUE,
        KNOWN_FALSE, KNOWN_FALSE, KNOWN_FALSE,
    ];

    let statistics = TestStatistics::new()
        .with_contained("s1", [ScalarValue::from("foo")], about_foo)
        .with_contained("s1", [ScalarValue::from("bar")], about_bar)
        .with_contained(
            "s1",
            [ScalarValue::from("foo"), ScalarValue::from("bar")],
            about_foo_and_bar,
        );

    let s1 = || col("s1");
    let all_true = [true; 9];

    let cases: Vec<(Expr, [bool; 9])> = vec![
        // s1 = 'foo'
        (
            s1().eq(lit("foo")),
            [true, false, true, true, false, true, true, false, true],
        ),
        // s1 = 'bar'
        (
            s1().eq(lit("bar")),
            [true, true, true, false, false, false, true, true, true],
        ),
        // s1 = 'baz' (no `contained` entry was registered for "baz")
        (s1().eq(lit("baz")), all_true),
        // s1 = 'foo' AND s1 = 'bar'
        (s1().eq(lit("foo")).and(s1().eq(lit("bar"))), all_true),
        // s1 = 'foo' OR s1 = 'bar'
        (
            s1().eq(lit("foo")).or(s1().eq(lit("bar"))),
            [true, true, true, true, true, true, false, false, false],
        ),
        // s1 = 'foo' OR s1 = 'baz'
        (s1().eq(lit("foo")).or(s1().eq(lit("baz"))), all_true),
        // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
        (
            s1()
                .eq(lit("foo"))
                .or(s1().eq(lit("bar")))
                .or(s1().eq(lit("baz"))),
            all_true,
        ),
        // s1 != 'foo'
        (
            s1().not_eq(lit("foo")),
            [false, true, true, false, true, true, false, true, true],
        ),
        // s1 != 'bar'
        (
            s1().not_eq(lit("bar")),
            [false, false, false, true, true, true, true, true, true],
        ),
        // s1 != 'foo' AND s1 != 'bar'
        (
            s1().not_eq(lit("foo")).and(s1().not_eq(lit("bar"))),
            [true, true, true, false, false, false, true, true, true],
        ),
        // s1 != 'foo' AND s1 != 'bar' AND s1 != 'baz'
        (
            s1()
                .not_eq(lit("foo"))
                .and(s1().not_eq(lit("bar")))
                .and(s1().not_eq(lit("baz"))),
            all_true,
        ),
        // s1 != 'foo' OR s1 != 'bar'
        (s1().not_eq(lit("foo")).or(s1().not_eq(lit("bar"))), all_true),
        // s1 != 'foo' OR s1 != 'bar' OR s1 != 'baz'
        (
            s1()
                .not_eq(lit("foo"))
                .or(s1().not_eq(lit("bar")))
                .or(s1().not_eq(lit("baz"))),
            all_true,
        ),
    ];

    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4828
/// Exercises pruning driven by `contained` information spread over two
/// columns: `s1` has answers for {"foo"} and `s2` has answers for {"bar"},
/// each across nine containers.
#[test]
fn prune_with_contained_two_columns() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("s1", DataType::Utf8, true),
        Field::new("s2", DataType::Utf8, true),
    ]));

    let (yes, no, unknown) = (Some(true), Some(false), None::<bool>);

    let statistics = TestStatistics::new()
        // s1 / "foo": cycles true, false, unknown.
        .with_contained(
            "s1",
            [ScalarValue::from("foo")],
            [yes, no, unknown, yes, no, unknown, yes, no, unknown],
        )
        // s2 / "bar": three true, three false, three unknown.
        .with_contained(
            "s2",
            [ScalarValue::from("bar")],
            [yes, yes, yes, no, no, no, unknown, unknown, unknown],
        );

    // Runs one predicate against the shared schema and statistics.
    let check = |predicate: Expr, expected: [bool; 9]| {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    };
    let all_true = [true; 9];

    // s1 = 'foo'
    check(
        col("s1").eq(lit("foo")),
        [true, false, true, true, false, true, true, false, true],
    );

    // s1 = 'foo' OR s2 = 'bar'
    check(
        col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar"))),
        all_true,
    );

    // s1 = 'foo' AND s2 != 'bar'
    check(
        col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
        [false, false, false, true, false, true, true, false, true],
    );

    // s1 != 'foo' AND s2 != 'bar'
    check(
        col("s1")
            .not_eq(lit("foo"))
            .and(col("s2").not_eq(lit("bar"))),
        [false, false, false, false, true, true, false, true, true],
    );

    // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
    check(
        col("s1")
            .not_eq(lit("foo"))
            .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
        [false, true, true, false, true, true, false, true, true],
    );

    // s1 LIKE 'foo%bar%'
    check(col("s1").like(lit("foo%bar%")), all_true);

    // s1 LIKE 'foo%bar%' AND s2 = 'bar'
    check(
        col("s1")
            .like(lit("foo%bar%"))
            .and(col("s2").eq(lit("bar"))),
        [true, true, true, false, false, false, true, true, true],
    );

    // s1 LIKE 'foo%bar%' OR s2 = 'bar'
    check(
        col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
        all_true,
    );
}
4966
/// Exercises pruning that combines min/max statistics on the `Int32` column
/// `i` with `contained` information on the `Utf8` column `s`, across nine
/// containers.
#[test]
fn prune_with_range_and_contained() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("i", DataType::Int32, true),
        Field::new("s", DataType::Utf8, true),
    ]));

    // Min/max for `i` repeat a three-container pattern three times:
    // [-5, 5], [10, 20], then no statistics at all.
    let min_pattern: [Option<i32>; 3] = [Some(-5), Some(10), None];
    let max_pattern: [Option<i32>; 3] = [Some(5), Some(20), None];

    // `contained` answers for s / "foo": three true, three false, three unknown.
    let about_foo = [
        Some(true),
        Some(true),
        Some(true),
        Some(false),
        Some(false),
        Some(false),
        None,
        None,
        None,
    ];

    let statistics = TestStatistics::new()
        .with(
            "i",
            ContainerStats::new_i32(min_pattern.repeat(3), max_pattern.repeat(3)),
        )
        .with_contained("s", [ScalarValue::from("foo")], about_foo);

    let i_is_zero = || col("i").eq(lit(0));

    // i = 0 AND s = 'foo'
    prune_with_expr(
        i_is_zero().and(col("s").eq(lit("foo"))),
        &schema,
        &statistics,
        &[true, false, true, false, false, false, true, false, true],
    );

    // i = 0 AND s != 'foo'
    prune_with_expr(
        i_is_zero().and(col("s").not_eq(lit("foo"))),
        &schema,
        &statistics,
        &[false, false, false, true, false, true, true, false, true],
    );

    // i = 0 OR s = 'foo'
    prune_with_expr(
        i_is_zero().or(col("s").eq(lit("foo"))),
        &schema,
        &statistics,
        &[true; 9],
    );
}
5059
5060 fn prune_with_expr(
5068 expr: Expr,
5069 schema: &SchemaRef,
5070 statistics: &TestStatistics,
5071 expected: &[bool],
5072 ) {
5073 println!("Pruning with expr: {expr}");
5074 let expr = logical2physical(&expr, schema);
5075 let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5076 let result = p.prune(statistics).unwrap();
5077 assert_eq!(result, expected);
5078 }
5079
5080 fn test_build_predicate_expression(
5081 expr: &Expr,
5082 schema: &Schema,
5083 required_columns: &mut RequiredColumns,
5084 ) -> Arc<dyn PhysicalExpr> {
5085 let expr = logical2physical(expr, schema);
5086 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
5087 build_predicate_expression(&expr, schema, required_columns, &unhandled_hook)
5088 }
5089
/// A bare `false` literal must come out of `build_predicate_expression`
/// equal to the same literal planned directly.
#[test]
fn test_build_predicate_expression_with_false() {
    let schema = Schema::empty();
    let always_false = lit(ScalarValue::Boolean(Some(false)));

    let mut required_columns = RequiredColumns::new();
    let actual =
        test_build_predicate_expression(&always_false, &schema, &mut required_columns);

    let expected = logical2physical(&always_false, &schema);
    assert_eq!(&actual, &expected);
}
5099
/// `<predicate> AND false` must collapse to the plain `false` literal.
#[test]
fn test_build_predicate_expression_with_and_false() {
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);

    let always_false = lit(ScalarValue::Boolean(Some(false)));
    let conjunction = and(col("c1").eq(lit("a")), always_false);

    let mut required_columns = RequiredColumns::new();
    let actual =
        test_build_predicate_expression(&conjunction, &schema, &mut required_columns);

    let expected = logical2physical(&lit(ScalarValue::Boolean(Some(false))), &schema);
    assert_eq!(&actual, &expected);
}
5112
/// `<predicate> OR false` must reduce to the statistics predicate of
/// `<predicate>` alone; the result is compared through its `Display` output.
#[test]
fn test_build_predicate_expression_with_or_false() {
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);

    // Both operands are used exactly once, so they are moved into `or`
    // instead of being cloned.
    let disjunction = or(
        col("c1").eq(lit("a")),
        lit(ScalarValue::Boolean(Some(false))),
    );

    let res = test_build_predicate_expression(
        &disjunction,
        &schema,
        &mut RequiredColumns::new(),
    );

    let expected =
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1";
    assert_eq!(res.to_string(), expected);
}
5127}