1use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{ready, Context, Poll};
22
23use itertools::Itertools;
24
25use super::{
26 ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27 RecordBatchStream, SendableRecordBatchStream, Statistics,
28};
29use crate::common::can_project;
30use crate::execution_plan::CardinalityEffect;
31use crate::filter_pushdown::{
32 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
33 FilterPushdownPropagation, PushedDown, PushedDownPredicate,
34};
35use crate::metrics::{MetricBuilder, MetricType};
36use crate::projection::{
37 make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
38 ProjectionExec, ProjectionExpr,
39};
40use crate::{
41 metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
42 DisplayFormatType, ExecutionPlan,
43};
44
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::record_batch::RecordBatchOptions;
48use datafusion_common::cast::as_boolean_array;
49use datafusion_common::config::ConfigOptions;
50use datafusion_common::stats::Precision;
51use datafusion_common::{
52 internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue,
53};
54use datafusion_execution::TaskContext;
55use datafusion_expr::Operator;
56use datafusion_physical_expr::equivalence::ProjectionMapping;
57use datafusion_physical_expr::expressions::{lit, BinaryExpr, Column};
58use datafusion_physical_expr::intervals::utils::check_support;
59use datafusion_physical_expr::utils::collect_columns;
60use datafusion_physical_expr::{
61 analyze, conjunction, split_conjunction, AcrossPartitions, AnalysisContext,
62 ConstExpr, ExprBoundaries, PhysicalExpr,
63};
64
65use datafusion_physical_expr_common::physical_expr::fmt_sql;
66use futures::stream::{Stream, StreamExt};
67use log::trace;
68
/// Selectivity percentage assumed for a predicate whose selectivity cannot be
/// derived from the input statistics (see `FilterExec::statistics_helper`).
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;

/// Execution plan node that evaluates a boolean `predicate` against every batch
/// produced by `input`, keeping only the rows for which it is `true`, and
/// optionally restricting the output to a subset of the input columns.
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// Boolean expression evaluated against each input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// Child plan whose rows are filtered.
    input: Arc<dyn ExecutionPlan>,
    /// Runtime metrics shared by all partitions of this node.
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity percentage (validated to be <= 100) used when the predicate
    /// cannot be analyzed against the input statistics.
    default_selectivity: u8,
    /// Cached plan properties (equivalences, partitioning, ...).
    cache: PlanProperties,
    /// Optional indices into the *input* schema selecting the output columns.
    projection: Option<Vec<usize>>,
}
88
89impl FilterExec {
90 pub fn try_new(
92 predicate: Arc<dyn PhysicalExpr>,
93 input: Arc<dyn ExecutionPlan>,
94 ) -> Result<Self> {
95 match predicate.data_type(input.schema().as_ref())? {
96 DataType::Boolean => {
97 let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
98 let cache = Self::compute_properties(
99 &input,
100 &predicate,
101 default_selectivity,
102 None,
103 )?;
104 Ok(Self {
105 predicate,
106 input: Arc::clone(&input),
107 metrics: ExecutionPlanMetricsSet::new(),
108 default_selectivity,
109 cache,
110 projection: None,
111 })
112 }
113 other => {
114 plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
115 }
116 }
117 }
118
119 pub fn with_default_selectivity(
120 mut self,
121 default_selectivity: u8,
122 ) -> Result<Self, DataFusionError> {
123 if default_selectivity > 100 {
124 return plan_err!(
125 "Default filter selectivity value needs to be less than or equal to 100"
126 );
127 }
128 self.default_selectivity = default_selectivity;
129 Ok(self)
130 }
131
132 pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
134 can_project(&self.schema(), projection.as_ref())?;
136
137 let projection = match projection {
138 Some(projection) => match &self.projection {
139 Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
140 None => Some(projection),
141 },
142 None => None,
143 };
144
145 let cache = Self::compute_properties(
146 &self.input,
147 &self.predicate,
148 self.default_selectivity,
149 projection.as_ref(),
150 )?;
151 Ok(Self {
152 predicate: Arc::clone(&self.predicate),
153 input: Arc::clone(&self.input),
154 metrics: self.metrics.clone(),
155 default_selectivity: self.default_selectivity,
156 cache,
157 projection,
158 })
159 }
160
161 pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
163 &self.predicate
164 }
165
166 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
168 &self.input
169 }
170
171 pub fn default_selectivity(&self) -> u8 {
173 self.default_selectivity
174 }
175
176 pub fn projection(&self) -> Option<&Vec<usize>> {
178 self.projection.as_ref()
179 }
180
181 fn statistics_helper(
183 schema: SchemaRef,
184 input_stats: Statistics,
185 predicate: &Arc<dyn PhysicalExpr>,
186 default_selectivity: u8,
187 ) -> Result<Statistics> {
188 if !check_support(predicate, &schema) {
189 let selectivity = default_selectivity as f64 / 100.0;
190 let mut stats = input_stats.to_inexact();
191 stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
192 stats.total_byte_size = stats
193 .total_byte_size
194 .with_estimated_selectivity(selectivity);
195 return Ok(stats);
196 }
197
198 let num_rows = input_stats.num_rows;
199 let total_byte_size = input_stats.total_byte_size;
200 let input_analysis_ctx = AnalysisContext::try_from_statistics(
201 &schema,
202 &input_stats.column_statistics,
203 )?;
204
205 let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?;
206
207 let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
209 let num_rows = num_rows.with_estimated_selectivity(selectivity);
210 let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
211
212 let column_statistics = collect_new_statistics(
213 &input_stats.column_statistics,
214 analysis_ctx.boundaries,
215 );
216 Ok(Statistics {
217 num_rows,
218 total_byte_size,
219 column_statistics,
220 })
221 }
222
223 fn extend_constants(
224 input: &Arc<dyn ExecutionPlan>,
225 predicate: &Arc<dyn PhysicalExpr>,
226 ) -> Vec<ConstExpr> {
227 let mut res_constants = Vec::new();
228 let input_eqs = input.equivalence_properties();
229
230 let conjunctions = split_conjunction(predicate);
231 for conjunction in conjunctions {
232 if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>() {
233 if binary.op() == &Operator::Eq {
234 if input_eqs.is_expr_constant(binary.left()).is_some() {
236 let across = input_eqs
237 .is_expr_constant(binary.right())
238 .unwrap_or_default();
239 res_constants
240 .push(ConstExpr::new(Arc::clone(binary.right()), across));
241 } else if input_eqs.is_expr_constant(binary.right()).is_some() {
242 let across = input_eqs
243 .is_expr_constant(binary.left())
244 .unwrap_or_default();
245 res_constants
246 .push(ConstExpr::new(Arc::clone(binary.left()), across));
247 }
248 }
249 }
250 }
251 res_constants
252 }
253 fn compute_properties(
255 input: &Arc<dyn ExecutionPlan>,
256 predicate: &Arc<dyn PhysicalExpr>,
257 default_selectivity: u8,
258 projection: Option<&Vec<usize>>,
259 ) -> Result<PlanProperties> {
260 let stats = Self::statistics_helper(
263 input.schema(),
264 input.partition_statistics(None)?,
265 predicate,
266 default_selectivity,
267 )?;
268 let mut eq_properties = input.equivalence_properties().clone();
269 let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
270 for (lhs, rhs) in equal_pairs {
271 eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
272 }
273 let constants = collect_columns(predicate)
276 .into_iter()
277 .filter(|column| stats.column_statistics[column.index()].is_singleton())
278 .map(|column| {
279 let value = stats.column_statistics[column.index()]
280 .min_value
281 .get_value();
282 let expr = Arc::new(column) as _;
283 ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
284 });
285 eq_properties.add_constants(constants)?;
287 eq_properties.add_constants(Self::extend_constants(input, predicate))?;
290
291 let mut output_partitioning = input.output_partitioning().clone();
292 if let Some(projection) = projection {
294 let schema = eq_properties.schema();
295 let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
296 let out_schema = project_schema(schema, Some(projection))?;
297 output_partitioning =
298 output_partitioning.project(&projection_mapping, &eq_properties);
299 eq_properties = eq_properties.project(&projection_mapping, out_schema);
300 }
301
302 Ok(PlanProperties::new(
303 eq_properties,
304 output_partitioning,
305 input.pipeline_behavior(),
306 input.boundedness(),
307 ))
308 }
309}
310
311impl DisplayAs for FilterExec {
312 fn fmt_as(
313 &self,
314 t: DisplayFormatType,
315 f: &mut std::fmt::Formatter,
316 ) -> std::fmt::Result {
317 match t {
318 DisplayFormatType::Default | DisplayFormatType::Verbose => {
319 let display_projections = if let Some(projection) =
320 self.projection.as_ref()
321 {
322 format!(
323 ", projection=[{}]",
324 projection
325 .iter()
326 .map(|index| format!(
327 "{}@{}",
328 self.input.schema().fields().get(*index).unwrap().name(),
329 index
330 ))
331 .collect::<Vec<_>>()
332 .join(", ")
333 )
334 } else {
335 "".to_string()
336 };
337 write!(f, "FilterExec: {}{}", self.predicate, display_projections)
338 }
339 DisplayFormatType::TreeRender => {
340 write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
341 }
342 }
343 }
344}
345
346impl ExecutionPlan for FilterExec {
347 fn name(&self) -> &'static str {
348 "FilterExec"
349 }
350
351 fn as_any(&self) -> &dyn Any {
353 self
354 }
355
356 fn properties(&self) -> &PlanProperties {
357 &self.cache
358 }
359
360 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
361 vec![&self.input]
362 }
363
364 fn maintains_input_order(&self) -> Vec<bool> {
365 vec![true]
367 }
368
369 fn with_new_children(
370 self: Arc<Self>,
371 mut children: Vec<Arc<dyn ExecutionPlan>>,
372 ) -> Result<Arc<dyn ExecutionPlan>> {
373 FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
374 .and_then(|e| {
375 let selectivity = e.default_selectivity();
376 e.with_default_selectivity(selectivity)
377 })
378 .and_then(|e| e.with_projection(self.projection().cloned()))
379 .map(|e| Arc::new(e) as _)
380 }
381
382 fn execute(
383 &self,
384 partition: usize,
385 context: Arc<TaskContext>,
386 ) -> Result<SendableRecordBatchStream> {
387 trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
388 let metrics = FilterExecMetrics::new(&self.metrics, partition);
389 Ok(Box::pin(FilterExecStream {
390 schema: self.schema(),
391 predicate: Arc::clone(&self.predicate),
392 input: self.input.execute(partition, context)?,
393 metrics,
394 projection: self.projection.clone(),
395 }))
396 }
397
398 fn metrics(&self) -> Option<MetricsSet> {
399 Some(self.metrics.clone_inner())
400 }
401
402 fn statistics(&self) -> Result<Statistics> {
405 self.partition_statistics(None)
406 }
407
408 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
409 let input_stats = self.input.partition_statistics(partition)?;
410 let stats = Self::statistics_helper(
411 self.schema(),
412 input_stats,
413 self.predicate(),
414 self.default_selectivity,
415 )?;
416 Ok(stats.project(self.projection.as_ref()))
417 }
418
419 fn cardinality_effect(&self) -> CardinalityEffect {
420 CardinalityEffect::LowerEqual
421 }
422
423 fn try_swapping_with_projection(
426 &self,
427 projection: &ProjectionExec,
428 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
429 if projection.expr().len() < projection.input().schema().fields().len() {
431 if let Some(new_predicate) =
433 update_expr(self.predicate(), projection.expr(), false)?
434 {
435 return FilterExec::try_new(
436 new_predicate,
437 make_with_child(projection, self.input())?,
438 )
439 .and_then(|e| {
440 let selectivity = self.default_selectivity();
441 e.with_default_selectivity(selectivity)
442 })
443 .map(|e| Some(Arc::new(e) as _));
444 }
445 }
446 try_embed_projection(projection, self)
447 }
448
449 fn gather_filters_for_pushdown(
450 &self,
451 phase: FilterPushdownPhase,
452 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
453 _config: &ConfigOptions,
454 ) -> Result<FilterDescription> {
455 if !matches!(phase, FilterPushdownPhase::Pre) {
456 let filter_supports = parent_filters
458 .into_iter()
459 .map(PushedDownPredicate::supported)
460 .collect();
461 return Ok(FilterDescription::new().with_child(ChildFilterDescription {
462 parent_filters: filter_supports,
463 self_filters: vec![],
464 }));
465 }
466
467 let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
468 .with_self_filters(
469 split_conjunction(&self.predicate)
470 .into_iter()
471 .cloned()
472 .collect(),
473 );
474
475 Ok(FilterDescription::new().with_child(child))
476 }
477
478 fn handle_child_pushdown_result(
479 &self,
480 phase: FilterPushdownPhase,
481 child_pushdown_result: ChildPushdownResult,
482 _config: &ConfigOptions,
483 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
484 if !matches!(phase, FilterPushdownPhase::Pre) {
485 return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
486 }
487 let unsupported_parent_filters =
489 child_pushdown_result.parent_filters.iter().filter_map(|f| {
490 matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
491 });
492 let unsupported_self_filters = child_pushdown_result
493 .self_filters
494 .first()
495 .expect("we have exactly one child")
496 .iter()
497 .filter_map(|f| match f.discriminant {
498 PushedDown::Yes => None,
499 PushedDown::No => Some(&f.predicate),
500 })
501 .cloned();
502
503 let unhandled_filters = unsupported_parent_filters
504 .into_iter()
505 .chain(unsupported_self_filters)
506 .collect_vec();
507
508 let filter_input = Arc::clone(self.input());
510 let new_predicate = conjunction(unhandled_filters);
511 let updated_node = if new_predicate.eq(&lit(true)) {
512 match self.projection() {
514 Some(projection_indices) => {
515 let filter_child_schema = filter_input.schema();
516 let proj_exprs = projection_indices
517 .iter()
518 .map(|p| {
519 let field = filter_child_schema.field(*p).clone();
520 ProjectionExpr {
521 expr: Arc::new(Column::new(field.name(), *p))
522 as Arc<dyn PhysicalExpr>,
523 alias: field.name().to_string(),
524 }
525 })
526 .collect::<Vec<_>>();
527 Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
528 as Arc<dyn ExecutionPlan>)
529 }
530 None => {
531 Some(filter_input)
533 }
534 }
535 } else if new_predicate.eq(&self.predicate) {
536 None
538 } else {
539 let new = FilterExec {
541 predicate: Arc::clone(&new_predicate),
542 input: Arc::clone(&filter_input),
543 metrics: self.metrics.clone(),
544 default_selectivity: self.default_selectivity,
545 cache: Self::compute_properties(
546 &filter_input,
547 &new_predicate,
548 self.default_selectivity,
549 self.projection.as_ref(),
550 )?,
551 projection: None,
552 };
553 Some(Arc::new(new) as _)
554 };
555
556 Ok(FilterPushdownPropagation {
557 filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
558 updated_node,
559 })
560 }
561}
562
563impl EmbeddedProjection for FilterExec {
564 fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
565 self.with_projection(projection)
566 }
567}
568
569fn collect_new_statistics(
574 input_column_stats: &[ColumnStatistics],
575 analysis_boundaries: Vec<ExprBoundaries>,
576) -> Vec<ColumnStatistics> {
577 analysis_boundaries
578 .into_iter()
579 .enumerate()
580 .map(
581 |(
582 idx,
583 ExprBoundaries {
584 interval,
585 distinct_count,
586 ..
587 },
588 )| {
589 let Some(interval) = interval else {
590 return ColumnStatistics {
592 null_count: Precision::Exact(0),
593 max_value: Precision::Exact(ScalarValue::Null),
594 min_value: Precision::Exact(ScalarValue::Null),
595 sum_value: Precision::Exact(ScalarValue::Null),
596 distinct_count: Precision::Exact(0),
597 };
598 };
599 let (lower, upper) = interval.into_bounds();
600 let (min_value, max_value) = if lower.eq(&upper) {
601 (Precision::Exact(lower), Precision::Exact(upper))
602 } else {
603 (Precision::Inexact(lower), Precision::Inexact(upper))
604 };
605 ColumnStatistics {
606 null_count: input_column_stats[idx].null_count.to_inexact(),
607 max_value,
608 min_value,
609 sum_value: Precision::Absent,
610 distinct_count: distinct_count.to_inexact(),
611 }
612 },
613 )
614 .collect()
615}
616
/// Stream returned by `FilterExec::execute` for one partition: it filters (and
/// optionally projects) every batch pulled from `input`.
struct FilterExecStream {
    /// Schema of the emitted batches (the projected schema when `projection` is set).
    schema: SchemaRef,
    /// Boolean predicate evaluated against each input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// Input stream being filtered.
    input: SendableRecordBatchStream,
    /// Runtime metrics for this partition.
    metrics: FilterExecMetrics,
    /// Optional indices into the input batch's columns selecting the output columns.
    projection: Option<Vec<usize>>,
}
631
/// Per-partition metrics recorded by `FilterExecStream`.
struct FilterExecMetrics {
    /// Standard operator metrics; the stream records elapsed compute time and
    /// each poll result here.
    baseline_metrics: BaselineMetrics,
    /// Ratio of rows that passed the filter (part) to rows read (total).
    selectivity: RatioMetrics,
}
639
640impl FilterExecMetrics {
641 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
642 Self {
643 baseline_metrics: BaselineMetrics::new(metrics, partition),
644 selectivity: MetricBuilder::new(metrics)
645 .with_type(MetricType::SUMMARY)
646 .ratio_metrics("selectivity", partition),
647 }
648 }
649}
650
651pub fn batch_filter(
652 batch: &RecordBatch,
653 predicate: &Arc<dyn PhysicalExpr>,
654) -> Result<RecordBatch> {
655 filter_and_project(batch, predicate, None, &batch.schema())
656}
657
658fn filter_and_project(
659 batch: &RecordBatch,
660 predicate: &Arc<dyn PhysicalExpr>,
661 projection: Option<&Vec<usize>>,
662 output_schema: &SchemaRef,
663) -> Result<RecordBatch> {
664 predicate
665 .evaluate(batch)
666 .and_then(|v| v.into_array(batch.num_rows()))
667 .and_then(|array| {
668 Ok(match (as_boolean_array(&array), projection) {
669 (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
671 (Ok(filter_array), Some(projection)) => {
672 let projected_columns = projection
673 .iter()
674 .map(|i| Arc::clone(batch.column(*i)))
675 .collect();
676 let projected_batch = RecordBatch::try_new(
677 Arc::clone(output_schema),
678 projected_columns,
679 )?;
680 filter_record_batch(&projected_batch, filter_array)?
681 }
682 (Err(_), _) => {
683 return internal_err!(
684 "Cannot create filter_array from non-boolean predicates"
685 );
686 }
687 })
688 })
689}
690
691impl Stream for FilterExecStream {
692 type Item = Result<RecordBatch>;
693
694 fn poll_next(
695 mut self: Pin<&mut Self>,
696 cx: &mut Context<'_>,
697 ) -> Poll<Option<Self::Item>> {
698 let poll;
699 loop {
700 match ready!(self.input.poll_next_unpin(cx)) {
701 Some(Ok(batch)) => {
702 let timer = self.metrics.baseline_metrics.elapsed_compute().timer();
703 let filtered_batch = filter_and_project(
704 &batch,
705 &self.predicate,
706 self.projection.as_ref(),
707 &self.schema,
708 )?;
709 timer.done();
710
711 self.metrics.selectivity.add_part(filtered_batch.num_rows());
712 self.metrics.selectivity.add_total(batch.num_rows());
713
714 if filtered_batch.num_rows() == 0 {
716 continue;
717 }
718 poll = Poll::Ready(Some(Ok(filtered_batch)));
719 break;
720 }
721 value => {
722 poll = Poll::Ready(value);
723 break;
724 }
725 }
726 }
727 self.metrics.baseline_metrics.record_poll(poll)
728 }
729
730 fn size_hint(&self) -> (usize, Option<usize>) {
731 self.input.size_hint()
733 }
734}
735
736impl RecordBatchStream for FilterExecStream {
737 fn schema(&self) -> SchemaRef {
738 Arc::clone(&self.schema)
739 }
740}
741
742#[deprecated(
744 since = "51.0.0",
745 note = "This function will be internal in the future"
746)]
747pub fn collect_columns_from_predicate(
748 predicate: &'_ Arc<dyn PhysicalExpr>,
749) -> EqualAndNonEqual<'_> {
750 collect_columns_from_predicate_inner(predicate)
751}
752
753fn collect_columns_from_predicate_inner(
754 predicate: &'_ Arc<dyn PhysicalExpr>,
755) -> EqualAndNonEqual<'_> {
756 let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
757 let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
758
759 let predicates = split_conjunction(predicate);
760 predicates.into_iter().for_each(|p| {
761 if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
762 match binary.op() {
763 Operator::Eq => {
764 eq_predicate_columns.push((binary.left(), binary.right()))
765 }
766 Operator::NotEq => {
767 ne_predicate_columns.push((binary.left(), binary.right()))
768 }
769 _ => {}
770 }
771 }
772 });
773
774 (eq_predicate_columns, ne_predicate_columns)
775}
776
777pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);
779
780pub type EqualAndNonEqual<'a> =
782 (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
783
784#[cfg(test)]
785mod tests {
786 use super::*;
787 use crate::empty::EmptyExec;
788 use crate::expressions::*;
789 use crate::test;
790 use crate::test::exec::StatisticsExec;
791 use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
792 use datafusion_common::ScalarValue;
793
794 #[tokio::test]
795 async fn collect_columns_predicates() -> Result<()> {
796 let schema = test::aggr_test_schema();
797 let predicate: Arc<dyn PhysicalExpr> = binary(
798 binary(
799 binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
800 Operator::And,
801 binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
802 &schema,
803 )?,
804 Operator::And,
805 binary(
806 binary(
807 col("c2", &schema)?,
808 Operator::Eq,
809 col("c9", &schema)?,
810 &schema,
811 )?,
812 Operator::And,
813 binary(
814 col("c1", &schema)?,
815 Operator::NotEq,
816 col("c13", &schema)?,
817 &schema,
818 )?,
819 &schema,
820 )?,
821 &schema,
822 )?;
823
824 let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
825 assert_eq!(2, equal_pairs.len());
826 assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
827 assert!(equal_pairs[0].1.eq(&lit(4u32)));
828
829 assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
830 assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
831
832 assert_eq!(1, ne_pairs.len());
833 assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
834 assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
835
836 Ok(())
837 }
838
839 #[tokio::test]
840 async fn test_filter_statistics_basic_expr() -> Result<()> {
841 let bytes_per_row = 4;
844 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
845 let input = Arc::new(StatisticsExec::new(
846 Statistics {
847 num_rows: Precision::Inexact(100),
848 total_byte_size: Precision::Inexact(100 * bytes_per_row),
849 column_statistics: vec![ColumnStatistics {
850 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
851 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
852 ..Default::default()
853 }],
854 },
855 schema.clone(),
856 ));
857
858 let predicate: Arc<dyn PhysicalExpr> =
860 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
861
862 let filter: Arc<dyn ExecutionPlan> =
864 Arc::new(FilterExec::try_new(predicate, input)?);
865
866 let statistics = filter.partition_statistics(None)?;
867 assert_eq!(statistics.num_rows, Precision::Inexact(25));
868 assert_eq!(
869 statistics.total_byte_size,
870 Precision::Inexact(25 * bytes_per_row)
871 );
872 assert_eq!(
873 statistics.column_statistics,
874 vec![ColumnStatistics {
875 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
876 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
877 ..Default::default()
878 }]
879 );
880
881 Ok(())
882 }
883
884 #[tokio::test]
885 async fn test_filter_statistics_column_level_nested() -> Result<()> {
886 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
889 let input = Arc::new(StatisticsExec::new(
890 Statistics {
891 num_rows: Precision::Inexact(100),
892 column_statistics: vec![ColumnStatistics {
893 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
894 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
895 ..Default::default()
896 }],
897 total_byte_size: Precision::Absent,
898 },
899 schema.clone(),
900 ));
901
902 let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
904 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
905 input,
906 )?);
907
908 let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
912 binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
913 sub_filter,
914 )?);
915
916 let statistics = filter.partition_statistics(None)?;
917 assert_eq!(statistics.num_rows, Precision::Inexact(16));
918 assert_eq!(
919 statistics.column_statistics,
920 vec![ColumnStatistics {
921 min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
922 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
923 ..Default::default()
924 }]
925 );
926
927 Ok(())
928 }
929
930 #[tokio::test]
931 async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
932 let schema = Schema::new(vec![
936 Field::new("a", DataType::Int32, false),
937 Field::new("b", DataType::Int32, false),
938 ]);
939 let input = Arc::new(StatisticsExec::new(
940 Statistics {
941 num_rows: Precision::Inexact(100),
942 column_statistics: vec![
943 ColumnStatistics {
944 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
945 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
946 ..Default::default()
947 },
948 ColumnStatistics {
949 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
950 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
951 ..Default::default()
952 },
953 ],
954 total_byte_size: Precision::Absent,
955 },
956 schema.clone(),
957 ));
958
959 let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
961 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
962 input,
963 )?);
964
965 let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
967 binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
968 a_lte_25,
969 )?);
970
971 let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
973 binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
974 b_gt_5,
975 )?);
976 let statistics = filter.partition_statistics(None)?;
977 assert_eq!(statistics.num_rows, Precision::Inexact(2));
984 assert_eq!(
985 statistics.column_statistics,
986 vec![
987 ColumnStatistics {
988 min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
989 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
990 ..Default::default()
991 },
992 ColumnStatistics {
993 min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
994 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
995 ..Default::default()
996 }
997 ]
998 );
999
1000 Ok(())
1001 }
1002
1003 #[tokio::test]
1004 async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
1005 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1008 let input = Arc::new(StatisticsExec::new(
1009 Statistics::new_unknown(&schema),
1010 schema.clone(),
1011 ));
1012
1013 let predicate: Arc<dyn PhysicalExpr> =
1015 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1016
1017 let filter: Arc<dyn ExecutionPlan> =
1019 Arc::new(FilterExec::try_new(predicate, input)?);
1020
1021 let statistics = filter.partition_statistics(None)?;
1022 assert_eq!(statistics.num_rows, Precision::Absent);
1023
1024 Ok(())
1025 }
1026
1027 #[tokio::test]
1028 async fn test_filter_statistics_multiple_columns() -> Result<()> {
1029 let schema = Schema::new(vec![
1034 Field::new("a", DataType::Int32, false),
1035 Field::new("b", DataType::Int32, false),
1036 Field::new("c", DataType::Float32, false),
1037 ]);
1038 let input = Arc::new(StatisticsExec::new(
1039 Statistics {
1040 num_rows: Precision::Inexact(1000),
1041 total_byte_size: Precision::Inexact(4000),
1042 column_statistics: vec![
1043 ColumnStatistics {
1044 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1045 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1046 ..Default::default()
1047 },
1048 ColumnStatistics {
1049 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1050 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1051 ..Default::default()
1052 },
1053 ColumnStatistics {
1054 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1055 max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1056 ..Default::default()
1057 },
1058 ],
1059 },
1060 schema,
1061 ));
1062 let predicate = Arc::new(BinaryExpr::new(
1064 Arc::new(BinaryExpr::new(
1065 Arc::new(Column::new("a", 0)),
1066 Operator::LtEq,
1067 Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1068 )),
1069 Operator::And,
1070 Arc::new(BinaryExpr::new(
1071 Arc::new(BinaryExpr::new(
1072 Arc::new(Column::new("b", 1)),
1073 Operator::Eq,
1074 Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1075 )),
1076 Operator::And,
1077 Arc::new(BinaryExpr::new(
1078 Arc::new(BinaryExpr::new(
1079 Arc::new(Column::new("c", 2)),
1080 Operator::LtEq,
1081 Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1082 )),
1083 Operator::And,
1084 Arc::new(BinaryExpr::new(
1085 Arc::new(Column::new("a", 0)),
1086 Operator::Gt,
1087 Arc::new(Column::new("b", 1)),
1088 )),
1089 )),
1090 )),
1091 ));
1092 let filter: Arc<dyn ExecutionPlan> =
1093 Arc::new(FilterExec::try_new(predicate, input)?);
1094 let statistics = filter.partition_statistics(None)?;
1095 assert_eq!(statistics.num_rows, Precision::Inexact(134));
1099 assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1100 let exp_col_stats = vec![
1101 ColumnStatistics {
1102 min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1103 max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1104 ..Default::default()
1105 },
1106 ColumnStatistics {
1107 min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1108 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1109 ..Default::default()
1110 },
1111 ColumnStatistics {
1112 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1113 max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1114 ..Default::default()
1115 },
1116 ];
1117 let _ = exp_col_stats
1118 .into_iter()
1119 .zip(statistics.column_statistics)
1120 .map(|(expected, actual)| {
1121 if let Some(val) = actual.min_value.get_value() {
1122 if val.data_type().is_floating() {
1123 let actual_min = actual.min_value.get_value().unwrap();
1126 let actual_max = actual.max_value.get_value().unwrap();
1127 let expected_min = expected.min_value.get_value().unwrap();
1128 let expected_max = expected.max_value.get_value().unwrap();
1129 let eps = ScalarValue::Float32(Some(1e-6));
1130
1131 assert!(actual_min.sub(expected_min).unwrap() < eps);
1132 assert!(actual_min.sub(expected_min).unwrap() < eps);
1133
1134 assert!(actual_max.sub(expected_max).unwrap() < eps);
1135 assert!(actual_max.sub(expected_max).unwrap() < eps);
1136 } else {
1137 assert_eq!(actual, expected);
1138 }
1139 } else {
1140 assert_eq!(actual, expected);
1141 }
1142 });
1143
1144 Ok(())
1145 }
1146
1147 #[tokio::test]
1148 async fn test_filter_statistics_full_selective() -> Result<()> {
1149 let schema = Schema::new(vec![
1153 Field::new("a", DataType::Int32, false),
1154 Field::new("b", DataType::Int32, false),
1155 ]);
1156 let input = Arc::new(StatisticsExec::new(
1157 Statistics {
1158 num_rows: Precision::Inexact(1000),
1159 total_byte_size: Precision::Inexact(4000),
1160 column_statistics: vec![
1161 ColumnStatistics {
1162 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1163 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1164 ..Default::default()
1165 },
1166 ColumnStatistics {
1167 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1168 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1169 ..Default::default()
1170 },
1171 ],
1172 },
1173 schema,
1174 ));
1175 let predicate = Arc::new(BinaryExpr::new(
1177 Arc::new(BinaryExpr::new(
1178 Arc::new(Column::new("a", 0)),
1179 Operator::Lt,
1180 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1181 )),
1182 Operator::And,
1183 Arc::new(BinaryExpr::new(
1184 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1185 Operator::LtEq,
1186 Arc::new(Column::new("b", 1)),
1187 )),
1188 ));
1189 let expected = input.partition_statistics(None)?.column_statistics;
1191 let filter: Arc<dyn ExecutionPlan> =
1192 Arc::new(FilterExec::try_new(predicate, input)?);
1193 let statistics = filter.partition_statistics(None)?;
1194
1195 assert_eq!(statistics.num_rows, Precision::Inexact(1000));
1196 assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
1197 assert_eq!(statistics.column_statistics, expected);
1198
1199 Ok(())
1200 }
1201
1202 #[tokio::test]
1203 async fn test_filter_statistics_zero_selective() -> Result<()> {
1204 let schema = Schema::new(vec![
1208 Field::new("a", DataType::Int32, false),
1209 Field::new("b", DataType::Int32, false),
1210 ]);
1211 let input = Arc::new(StatisticsExec::new(
1212 Statistics {
1213 num_rows: Precision::Inexact(1000),
1214 total_byte_size: Precision::Inexact(4000),
1215 column_statistics: vec![
1216 ColumnStatistics {
1217 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1218 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1219 ..Default::default()
1220 },
1221 ColumnStatistics {
1222 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1223 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1224 ..Default::default()
1225 },
1226 ],
1227 },
1228 schema,
1229 ));
1230 let predicate = Arc::new(BinaryExpr::new(
1232 Arc::new(BinaryExpr::new(
1233 Arc::new(Column::new("a", 0)),
1234 Operator::Gt,
1235 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1236 )),
1237 Operator::And,
1238 Arc::new(BinaryExpr::new(
1239 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1240 Operator::LtEq,
1241 Arc::new(Column::new("b", 1)),
1242 )),
1243 ));
1244 let filter: Arc<dyn ExecutionPlan> =
1245 Arc::new(FilterExec::try_new(predicate, input)?);
1246 let statistics = filter.partition_statistics(None)?;
1247
1248 assert_eq!(statistics.num_rows, Precision::Inexact(0));
1249 assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
1250 assert_eq!(
1251 statistics.column_statistics,
1252 vec![
1253 ColumnStatistics {
1254 min_value: Precision::Exact(ScalarValue::Null),
1255 max_value: Precision::Exact(ScalarValue::Null),
1256 sum_value: Precision::Exact(ScalarValue::Null),
1257 distinct_count: Precision::Exact(0),
1258 null_count: Precision::Exact(0),
1259 },
1260 ColumnStatistics {
1261 min_value: Precision::Exact(ScalarValue::Null),
1262 max_value: Precision::Exact(ScalarValue::Null),
1263 sum_value: Precision::Exact(ScalarValue::Null),
1264 distinct_count: Precision::Exact(0),
1265 null_count: Precision::Exact(0),
1266 },
1267 ]
1268 );
1269
1270 Ok(())
1271 }
1272
1273 #[tokio::test]
1274 async fn test_filter_statistics_more_inputs() -> Result<()> {
1275 let schema = Schema::new(vec![
1276 Field::new("a", DataType::Int32, false),
1277 Field::new("b", DataType::Int32, false),
1278 ]);
1279 let input = Arc::new(StatisticsExec::new(
1280 Statistics {
1281 num_rows: Precision::Inexact(1000),
1282 total_byte_size: Precision::Inexact(4000),
1283 column_statistics: vec![
1284 ColumnStatistics {
1285 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1286 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1287 ..Default::default()
1288 },
1289 ColumnStatistics {
1290 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1291 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1292 ..Default::default()
1293 },
1294 ],
1295 },
1296 schema,
1297 ));
1298 let predicate = Arc::new(BinaryExpr::new(
1300 Arc::new(Column::new("a", 0)),
1301 Operator::Lt,
1302 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1303 ));
1304 let filter: Arc<dyn ExecutionPlan> =
1305 Arc::new(FilterExec::try_new(predicate, input)?);
1306 let statistics = filter.partition_statistics(None)?;
1307
1308 assert_eq!(statistics.num_rows, Precision::Inexact(490));
1309 assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
1310 assert_eq!(
1311 statistics.column_statistics,
1312 vec![
1313 ColumnStatistics {
1314 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1315 max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
1316 ..Default::default()
1317 },
1318 ColumnStatistics {
1319 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1320 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1321 ..Default::default()
1322 },
1323 ]
1324 );
1325
1326 Ok(())
1327 }
1328
1329 #[tokio::test]
1330 async fn test_empty_input_statistics() -> Result<()> {
1331 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1332 let input = Arc::new(StatisticsExec::new(
1333 Statistics::new_unknown(&schema),
1334 schema,
1335 ));
1336 let predicate = Arc::new(BinaryExpr::new(
1338 Arc::new(BinaryExpr::new(
1339 Arc::new(Column::new("a", 0)),
1340 Operator::LtEq,
1341 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1342 )),
1343 Operator::And,
1344 Arc::new(BinaryExpr::new(
1345 Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1346 Operator::LtEq,
1347 Arc::new(BinaryExpr::new(
1348 Arc::new(Column::new("a", 0)),
1349 Operator::Minus,
1350 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1351 )),
1352 )),
1353 ));
1354 let filter: Arc<dyn ExecutionPlan> =
1355 Arc::new(FilterExec::try_new(predicate, input)?);
1356 let filter_statistics = filter.partition_statistics(None)?;
1357
1358 let expected_filter_statistics = Statistics {
1359 num_rows: Precision::Absent,
1360 total_byte_size: Precision::Absent,
1361 column_statistics: vec![ColumnStatistics {
1362 null_count: Precision::Absent,
1363 min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1364 max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1365 sum_value: Precision::Absent,
1366 distinct_count: Precision::Absent,
1367 }],
1368 };
1369
1370 assert_eq!(filter_statistics, expected_filter_statistics);
1371
1372 Ok(())
1373 }
1374
1375 #[tokio::test]
1376 async fn test_statistics_with_constant_column() -> Result<()> {
1377 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1378 let input = Arc::new(StatisticsExec::new(
1379 Statistics::new_unknown(&schema),
1380 schema,
1381 ));
1382 let predicate = Arc::new(BinaryExpr::new(
1384 Arc::new(Column::new("a", 0)),
1385 Operator::Eq,
1386 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1387 ));
1388 let filter: Arc<dyn ExecutionPlan> =
1389 Arc::new(FilterExec::try_new(predicate, input)?);
1390 let filter_statistics = filter.partition_statistics(None)?;
1391 assert!(filter_statistics.column_statistics[0].is_singleton());
1393
1394 Ok(())
1395 }
1396
1397 #[tokio::test]
1398 async fn test_validation_filter_selectivity() -> Result<()> {
1399 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1400 let input = Arc::new(StatisticsExec::new(
1401 Statistics::new_unknown(&schema),
1402 schema,
1403 ));
1404 let predicate = Arc::new(BinaryExpr::new(
1406 Arc::new(Column::new("a", 0)),
1407 Operator::Eq,
1408 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1409 ));
1410 let filter = FilterExec::try_new(predicate, input)?;
1411 assert!(filter.with_default_selectivity(120).is_err());
1412 Ok(())
1413 }
1414
1415 #[tokio::test]
1416 async fn test_custom_filter_selectivity() -> Result<()> {
1417 let schema =
1419 Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1420 let input = Arc::new(StatisticsExec::new(
1421 Statistics {
1422 num_rows: Precision::Inexact(1000),
1423 total_byte_size: Precision::Inexact(4000),
1424 column_statistics: vec![ColumnStatistics {
1425 ..Default::default()
1426 }],
1427 },
1428 schema,
1429 ));
1430 let predicate = Arc::new(BinaryExpr::new(
1432 Arc::new(Column::new("a", 0)),
1433 Operator::Eq,
1434 Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1435 ));
1436 let filter = FilterExec::try_new(predicate, input)?;
1437 let statistics = filter.partition_statistics(None)?;
1438 assert_eq!(statistics.num_rows, Precision::Inexact(200));
1439 assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1440 let filter = filter.with_default_selectivity(40)?;
1441 let statistics = filter.partition_statistics(None)?;
1442 assert_eq!(statistics.num_rows, Precision::Inexact(400));
1443 assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1444 Ok(())
1445 }
1446
#[test]
fn test_equivalence_properties_union_type() -> Result<()> {
    // A sparse union column alongside a plain Int32 column.
    let union_type = DataType::Union(
        UnionFields::new(
            vec![0, 1],
            vec![
                Field::new("f1", DataType::Int32, true),
                Field::new("f2", DataType::Utf8, true),
            ],
        ),
        UnionMode::Sparse,
    );

    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", union_type, true),
    ]));

    // 1 <= c1 AND c1 <= 4; the predicate only touches the Int32 column.
    let predicate = binary(
        binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
        Operator::And,
        binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
        &schema,
    )?;
    let exec = FilterExec::try_new(predicate, Arc::new(EmptyExec::new(Arc::clone(&schema))))?;

    // Computing statistics must succeed even though the schema contains a
    // union-typed column. The test already returns `Result`, so propagate
    // any error with `?` rather than panicking via `unwrap()`.
    exec.partition_statistics(None)?;

    Ok(())
}
1479}