1use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use datafusion_physical_expr::projection::{ProjectionRef, combine_projections};
24use itertools::Itertools;
25
26use super::{
27 ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
28 RecordBatchStream, SendableRecordBatchStream, Statistics,
29};
30use crate::check_if_same_properties;
31use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
32use crate::common::can_project;
33use crate::execution_plan::CardinalityEffect;
34use crate::filter_pushdown::{
35 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
36 FilterPushdownPropagation, PushedDown,
37};
38use crate::metrics::{MetricBuilder, MetricType};
39use crate::projection::{
40 EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
41 try_embed_projection, update_expr,
42};
43use crate::{
44 DisplayFormatType, ExecutionPlan,
45 metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
46};
47
48use arrow::compute::filter_record_batch;
49use arrow::datatypes::{DataType, SchemaRef};
50use arrow::record_batch::RecordBatch;
51use datafusion_common::cast::as_boolean_array;
52use datafusion_common::config::ConfigOptions;
53use datafusion_common::stats::Precision;
54use datafusion_common::{
55 DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
56};
57use datafusion_execution::TaskContext;
58use datafusion_expr::Operator;
59use datafusion_physical_expr::equivalence::ProjectionMapping;
60use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
61use datafusion_physical_expr::intervals::utils::check_support;
62use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
63use datafusion_physical_expr::{
64 AcrossPartitions, AnalysisContext, ConstExpr, EquivalenceProperties, ExprBoundaries,
65 PhysicalExpr, analyze, conjunction, split_conjunction,
66};
67
68use datafusion_physical_expr_common::physical_expr::fmt_sql;
69use futures::stream::{Stream, StreamExt};
70use log::trace;
71
/// Selectivity (in percent, 0..=100) assumed when a predicate cannot be
/// analyzed; used to scale input statistics.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Default target number of rows per coalesced output batch.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
74
/// Execution plan that evaluates a boolean `predicate` against every row of
/// its input and emits only the rows for which it evaluates to `true`,
/// optionally projecting and limiting the output.
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// Boolean expression evaluated against each input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// The child plan whose output is filtered.
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics (baseline timings plus filter selectivity).
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity (percent, 0..=100) assumed when the predicate cannot be
    /// analyzed; validated to be <= 100 on construction.
    default_selectivity: u8,
    /// Cached plan properties (equivalences, partitioning, schema).
    cache: Arc<PlanProperties>,
    /// Optional output projection — column indices into the input schema.
    projection: Option<ProjectionRef>,
    /// Target row count per output batch, used by the batch coalescer.
    batch_size: usize,
    /// Optional limit on the total number of rows produced.
    fetch: Option<usize>,
}
96
/// Builder for [`FilterExec`]; collects optional settings before validating
/// them in `build`.
pub struct FilterExecBuilder {
    /// Boolean expression rows must satisfy.
    predicate: Arc<dyn PhysicalExpr>,
    /// Child plan to filter.
    input: Arc<dyn ExecutionPlan>,
    /// Optional output projection (indices into the input schema).
    projection: Option<ProjectionRef>,
    /// Selectivity (percent) to assume when the predicate is not analyzable.
    default_selectivity: u8,
    /// Target row count per output batch.
    batch_size: usize,
    /// Optional limit on the number of output rows.
    fetch: Option<usize>,
}
106
107impl FilterExecBuilder {
108 pub fn new(predicate: Arc<dyn PhysicalExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
110 Self {
111 predicate,
112 input,
113 projection: None,
114 default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY,
115 batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
116 fetch: None,
117 }
118 }
119
120 pub fn with_input(mut self, input: Arc<dyn ExecutionPlan>) -> Self {
122 self.input = input;
123 self
124 }
125
126 pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
128 self.predicate = predicate;
129 self
130 }
131
132 pub fn apply_projection(self, projection: Option<Vec<usize>>) -> Result<Self> {
142 let projection = projection.map(Into::into);
143 self.apply_projection_by_ref(projection.as_ref())
144 }
145
146 pub fn apply_projection_by_ref(
148 mut self,
149 projection: Option<&ProjectionRef>,
150 ) -> Result<Self> {
151 can_project(&self.input.schema(), projection.map(AsRef::as_ref))?;
153 self.projection = combine_projections(projection, self.projection.as_ref())?;
154 Ok(self)
155 }
156
157 pub fn with_default_selectivity(mut self, default_selectivity: u8) -> Self {
159 self.default_selectivity = default_selectivity;
160 self
161 }
162
163 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
165 self.batch_size = batch_size;
166 self
167 }
168
169 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
171 self.fetch = fetch;
172 self
173 }
174
175 pub fn build(self) -> Result<FilterExec> {
177 match self.predicate.data_type(self.input.schema().as_ref())? {
179 DataType::Boolean => {}
180 other => {
181 return plan_err!(
182 "Filter predicate must return BOOLEAN values, got {other:?}"
183 );
184 }
185 }
186
187 if self.default_selectivity > 100 {
189 return plan_err!(
190 "Default filter selectivity value needs to be less than or equal to 100"
191 );
192 }
193
194 can_project(&self.input.schema(), self.projection.as_deref())?;
196
197 let cache = FilterExec::compute_properties(
199 &self.input,
200 &self.predicate,
201 self.default_selectivity,
202 self.projection.as_deref(),
203 )?;
204
205 Ok(FilterExec {
206 predicate: self.predicate,
207 input: self.input,
208 metrics: ExecutionPlanMetricsSet::new(),
209 default_selectivity: self.default_selectivity,
210 cache: Arc::new(cache),
211 projection: self.projection,
212 batch_size: self.batch_size,
213 fetch: self.fetch,
214 })
215 }
216}
217
218impl From<&FilterExec> for FilterExecBuilder {
219 fn from(exec: &FilterExec) -> Self {
220 Self {
221 predicate: Arc::clone(&exec.predicate),
222 input: Arc::clone(&exec.input),
223 projection: exec.projection.clone(),
224 default_selectivity: exec.default_selectivity,
225 batch_size: exec.batch_size,
226 fetch: exec.fetch,
227 }
232 }
233}
234
impl FilterExec {
    /// Creates a `FilterExec` with default selectivity and batch size, no
    /// projection, and no fetch limit. Use [`FilterExecBuilder`] for more
    /// control.
    pub fn try_new(
        predicate: Arc<dyn PhysicalExpr>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        FilterExecBuilder::new(predicate, input).build()
    }

    /// Target row count per coalesced output batch.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Overrides the selectivity (percent, 0..=100) assumed when the
    /// predicate cannot be analyzed.
    ///
    /// # Errors
    /// Returns a plan error if `default_selectivity` is greater than 100.
    ///
    /// NOTE(review): unlike the builder path, this does not recompute the
    /// cached plan properties — confirm callers rebuild when that matters.
    pub fn with_default_selectivity(
        mut self,
        default_selectivity: u8,
    ) -> Result<Self, DataFusionError> {
        if default_selectivity > 100 {
            return plan_err!(
                "Default filter selectivity value needs to be less than or equal to 100"
            );
        }
        self.default_selectivity = default_selectivity;
        Ok(self)
    }

    /// Rebuilds this filter with the given output projection applied.
    #[deprecated(
        since = "52.0.0",
        note = "Use FilterExecBuilder::apply_projection instead"
    )]
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        let builder = FilterExecBuilder::from(self);
        builder.apply_projection(projection)?.build()
    }

    /// Returns a copy of this filter with a new target batch size; all other
    /// settings and the cached properties are reused.
    pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
        Ok(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: Arc::clone(&self.cache),
            projection: self.projection.clone(),
            batch_size,
            fetch: self.fetch,
        })
    }

    /// The boolean expression rows must satisfy.
    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate
    }

    /// The child plan being filtered.
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Selectivity (percent) assumed when the predicate is not analyzable.
    pub fn default_selectivity(&self) -> u8 {
        self.default_selectivity
    }

    /// Optional output projection (column indices into the input schema).
    pub fn projection(&self) -> &Option<ProjectionRef> {
        &self.projection
    }

    /// Estimates output statistics by applying `predicate` to `input_stats`.
    ///
    /// If interval analysis does not support the predicate, all statistics
    /// are scaled by `default_selectivity` and marked inexact; otherwise the
    /// analysis-derived selectivity and per-column bounds are used.
    fn statistics_helper(
        schema: &SchemaRef,
        input_stats: Statistics,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
    ) -> Result<Statistics> {
        if !check_support(predicate, schema) {
            // Fall back to the configured blanket selectivity.
            let selectivity = default_selectivity as f64 / 100.0;
            let mut stats = input_stats.to_inexact();
            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
            stats.total_byte_size = stats
                .total_byte_size
                .with_estimated_selectivity(selectivity);
            return Ok(stats);
        }

        let num_rows = input_stats.num_rows;
        let total_byte_size = input_stats.total_byte_size;
        let input_analysis_ctx =
            AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?;

        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;

        // No selectivity estimate is treated as "keeps every row".
        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
        let num_rows = num_rows.with_estimated_selectivity(selectivity);
        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);

        let column_statistics = collect_new_statistics(
            schema,
            &input_stats.column_statistics,
            analysis_ctx.boundaries,
        );
        Ok(Statistics {
            num_rows,
            total_byte_size,
            column_statistics,
        })
    }

    /// Returns the across-partition constant value of `expr` if the input's
    /// equivalence properties prove it constant, or if it is a literal.
    fn expr_constant_or_literal(
        expr: &Arc<dyn PhysicalExpr>,
        input_eqs: &EquivalenceProperties,
    ) -> Option<AcrossPartitions> {
        input_eqs.is_expr_constant(expr).or_else(|| {
            expr.as_any()
                .downcast_ref::<Literal>()
                .map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
        })
    }

    /// Derives extra constant expressions implied by equality conjuncts of
    /// the predicate: in `a = b`, if one side is constant the other side is
    /// constant for all rows that pass the filter.
    fn extend_constants(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Vec<ConstExpr> {
        let mut res_constants = Vec::new();
        let input_eqs = input.equivalence_properties();

        let conjunctions = split_conjunction(predicate);
        for conjunction in conjunctions {
            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
                && binary.op() == &Operator::Eq
            {
                let left_const = Self::expr_constant_or_literal(binary.left(), input_eqs);
                let right_const =
                    Self::expr_constant_or_literal(binary.right(), input_eqs);

                // When the left side is constant, register the right side as
                // constant (using the right side's own value if it has one);
                // otherwise, a constant right side makes the left side constant.
                if let Some(left_across) = left_const {
                    let across = right_const.unwrap_or(left_across);
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.right()), across));
                } else if let Some(right_across) = right_const {
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.left()), right_across));
                }
            }
        }
        res_constants
    }

    /// Computes the cached plan properties: the input's equivalences enriched
    /// with predicate-implied equalities and constants, plus partitioning and
    /// schema adjusted for the optional output projection.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
        projection: Option<&[usize]>,
    ) -> Result<PlanProperties> {
        let schema = input.schema();
        let stats = Self::statistics_helper(
            &schema,
            input.partition_statistics(None)?,
            predicate,
            default_selectivity,
        )?;
        let mut eq_properties = input.equivalence_properties().clone();
        // `a = b` conjuncts make both sides equivalent in the output.
        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
        }
        // Columns whose post-filter statistics collapse to a single value
        // become constants for downstream operators.
        let constants = collect_columns(predicate)
            .into_iter()
            .filter(|column| stats.column_statistics[column.index()].is_singleton())
            .map(|column| {
                let value = stats.column_statistics[column.index()]
                    .min_value
                    .get_value();
                let expr = Arc::new(column) as _;
                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
            });
        eq_properties.add_constants(constants)?;
        eq_properties.add_constants(Self::extend_constants(input, predicate))?;

        let mut output_partitioning = input.output_partitioning().clone();
        // Map equivalences and partitioning onto the projected output columns.
        if let Some(projection) = projection {
            let schema = eq_properties.schema();
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(&projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }

    /// Replaces the child while reusing the cached properties (the caller
    /// guarantees the new child has equivalent properties); metrics are reset.
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        Self {
            input: children.swap_remove(0),
            metrics: ExecutionPlanMetricsSet::new(),
            ..Self::clone(self)
        }
    }
}
469
470impl DisplayAs for FilterExec {
471 fn fmt_as(
472 &self,
473 t: DisplayFormatType,
474 f: &mut std::fmt::Formatter,
475 ) -> std::fmt::Result {
476 match t {
477 DisplayFormatType::Default | DisplayFormatType::Verbose => {
478 let display_projections = if let Some(projection) =
479 self.projection.as_ref()
480 {
481 format!(
482 ", projection=[{}]",
483 projection
484 .iter()
485 .map(|index| format!(
486 "{}@{}",
487 self.input.schema().fields().get(*index).unwrap().name(),
488 index
489 ))
490 .collect::<Vec<_>>()
491 .join(", ")
492 )
493 } else {
494 "".to_string()
495 };
496 let fetch = self
497 .fetch
498 .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
499 write!(
500 f,
501 "FilterExec: {}{}{}",
502 self.predicate, display_projections, fetch
503 )
504 }
505 DisplayFormatType::TreeRender => {
506 write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
507 }
508 }
509 }
510}
511
impl ExecutionPlan for FilterExec {
    fn name(&self) -> &'static str {
        "FilterExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    // Filtering drops rows but never reorders the survivors.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // NOTE(review): presumably short-circuits with a cheap rebuild when
        // the new child's properties match the cached ones — confirm the
        // macro's semantics.
        check_if_same_properties!(self, children);
        let new_input = children.swap_remove(0);
        FilterExecBuilder::from(&*self)
            .with_input(new_input)
            .build()
            .map(|e| Arc::new(e) as _)
    }

    /// Starts filtering one partition, returning the output stream.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );
        let metrics = FilterExecMetrics::new(&self.metrics, partition);
        Ok(Box::pin(FilterExecStream {
            schema: self.schema(),
            predicate: Arc::clone(&self.predicate),
            input: self.input.execute(partition, context)?,
            metrics,
            projection: self.projection.clone(),
            // Re-batches the (often small) filtered outputs up to
            // `batch_size` rows and enforces the optional `fetch` limit.
            batch_coalescer: LimitedBatchCoalescer::new(
                self.schema(),
                self.batch_size,
                self.fetch,
            ),
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// Estimates statistics for `partition`: input statistics scaled by the
    /// predicate's estimated selectivity, then projected.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        let input_stats = self.input.partition_statistics(partition)?;
        let stats = Self::statistics_helper(
            &self.input.schema(),
            input_stats,
            self.predicate(),
            self.default_selectivity,
        )?;
        Ok(stats.project(self.projection.as_ref()))
    }

    // A filter can only keep or drop rows, never add any.
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::LowerEqual
    }

    /// Tries to push a parent `ProjectionExec` below this filter (when the
    /// projection narrows the schema), or to embed the projection into the
    /// filter itself.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Only worthwhile when the projection actually narrows the schema.
        if projection.expr().len() < projection.input().schema().fields().len() {
            // The predicate must be expressible over the projection's output.
            if let Some(new_predicate) =
                update_expr(self.predicate(), projection.expr(), false)?
            {
                return FilterExecBuilder::from(self)
                    .with_input(make_with_child(projection, self.input())?)
                    .with_predicate(new_predicate)
                    .build()
                    .map(|e| Some(Arc::new(e) as _));
            }
        }
        try_embed_projection(projection, self)
    }

    /// Describes which filters may be pushed toward the child: the parent's
    /// filters and, in the `Pre` phase only, this filter's own conjuncts.
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        if phase != FilterPushdownPhase::Pre {
            let child =
                ChildFilterDescription::from_child(&parent_filters, self.input())?;
            return Ok(FilterDescription::new().with_child(child));
        }

        // In the `Pre` phase, additionally offer each conjunct of our own
        // predicate for pushdown.
        let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
            .with_self_filters(
                split_conjunction(&self.predicate)
                    .into_iter()
                    .cloned()
                    .collect(),
            );

        Ok(FilterDescription::new().with_child(child))
    }

    /// Rebuilds this node after the child reported which filters it absorbed:
    /// keeps only the conjuncts the child could not handle, possibly removing
    /// the filter entirely or replacing it with a bare projection.
    fn handle_child_pushdown_result(
        &self,
        phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        if phase != FilterPushdownPhase::Pre {
            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
        }
        // Parent filters the child could not absorb; we must apply them here.
        let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
            child_pushdown_result
                .parent_filters
                .iter()
                .filter_map(|f| {
                    matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
                })
                .collect();

        // With an embedded projection our output schema differs from the
        // input's, so re-map the parent filters' column indices to the input
        // schema before combining them with our own predicate.
        if self.projection.is_some() {
            let input_schema = self.input().schema();
            unsupported_parent_filters = unsupported_parent_filters
                .into_iter()
                .map(|expr| reassign_expr_columns(expr, &input_schema))
                .collect::<Result<Vec<_>>>()?;
        }

        // Conjuncts of our own predicate that the child declined.
        let unsupported_self_filters = child_pushdown_result
            .self_filters
            .first()
            .expect("we have exactly one child")
            .iter()
            .filter_map(|f| match f.discriminant {
                PushedDown::Yes => None,
                PushedDown::No => Some(&f.predicate),
            })
            .cloned();

        let unhandled_filters = unsupported_parent_filters
            .into_iter()
            .chain(unsupported_self_filters)
            .collect_vec();

        let filter_input = Arc::clone(self.input());
        let new_predicate = conjunction(unhandled_filters);
        let updated_node = if new_predicate.eq(&lit(true)) {
            // Everything was pushed down: the filter itself is redundant.
            match self.projection().as_ref() {
                Some(projection_indices) => {
                    // Keep the column selection with a plain ProjectionExec.
                    let filter_child_schema = filter_input.schema();
                    let proj_exprs = projection_indices
                        .iter()
                        .map(|p| {
                            let field = filter_child_schema.field(*p).clone();
                            ProjectionExpr {
                                expr: Arc::new(Column::new(field.name(), *p))
                                    as Arc<dyn PhysicalExpr>,
                                alias: field.name().to_string(),
                            }
                        })
                        .collect::<Vec<_>>();
                    Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
                        as Arc<dyn ExecutionPlan>)
                }
                None => {
                    // No projection either: replace this node with its child.
                    Some(filter_input)
                }
            }
        } else if new_predicate.eq(&self.predicate) {
            // Nothing changed; keep the existing node.
            None
        } else {
            // Rebuild with the reduced predicate, recomputing properties.
            let new = FilterExec {
                predicate: Arc::clone(&new_predicate),
                input: Arc::clone(&filter_input),
                metrics: self.metrics.clone(),
                default_selectivity: self.default_selectivity,
                cache: Arc::new(Self::compute_properties(
                    &filter_input,
                    &new_predicate,
                    self.default_selectivity,
                    self.projection.as_deref(),
                )?),
                projection: self.projection.clone(),
                batch_size: self.batch_size,
                fetch: self.fetch,
            };
            Some(Arc::new(new) as _)
        };

        // All parent filters are reported as handled: they were either pushed
        // to the child or folded into this filter's predicate.
        Ok(FilterPushdownPropagation {
            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
            updated_node,
        })
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Returns a copy with a new fetch limit; the cached properties are
    /// reused since only the row limit changes.
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: Arc::clone(&self.cache),
            projection: self.projection.clone(),
            batch_size: self.batch_size,
            fetch,
        }))
    }

    /// Propagates an order-preservation request to the child and rebuilds
    /// this filter on top of the result if the child supports it.
    fn with_preserve_order(
        &self,
        preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        self.input
            .with_preserve_order(preserve_order)
            .and_then(|new_input| {
                Arc::new(self.clone())
                    .with_new_children(vec![new_input])
                    .ok()
            })
    }
}
771
772impl EmbeddedProjection for FilterExec {
773 fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
774 FilterExecBuilder::from(self)
775 .apply_projection(projection)?
776 .build()
777 }
778}
779
780fn interval_bound_to_precision(
783 bound: ScalarValue,
784 is_exact: bool,
785) -> Precision<ScalarValue> {
786 if bound.is_null() {
787 Precision::Absent
788 } else if is_exact {
789 Precision::Exact(bound)
790 } else {
791 Precision::Inexact(bound)
792 }
793}
794
/// Rewrites per-column statistics using the bounds produced by predicate
/// interval analysis.
///
/// For each column, the analyzed interval (if any) provides new min/max
/// bounds; a missing interval means analysis proved the predicate
/// unsatisfiable for that column, so empty-result statistics are emitted.
fn collect_new_statistics(
    schema: &SchemaRef,
    input_column_stats: &[ColumnStatistics],
    analysis_boundaries: Vec<ExprBoundaries>,
) -> Vec<ColumnStatistics> {
    analysis_boundaries
        .into_iter()
        .enumerate()
        .map(
            |(
                idx,
                ExprBoundaries {
                    interval,
                    distinct_count,
                    ..
                },
            )| {
                let Some(interval) = interval else {
                    // No satisfying interval: report zero-row statistics with
                    // a typed null (fall back to `ScalarValue::Null` if the
                    // column's type has no null representation).
                    let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
                        .unwrap_or(ScalarValue::Null);
                    return ColumnStatistics {
                        null_count: Precision::Exact(0),
                        max_value: Precision::Exact(typed_null.clone()),
                        min_value: Precision::Exact(typed_null.clone()),
                        sum_value: Precision::Exact(typed_null),
                        distinct_count: Precision::Exact(0),
                        byte_size: input_column_stats[idx].byte_size,
                    };
                };
                let (lower, upper) = interval.into_bounds();
                // A non-null degenerate interval (lower == upper) pins the
                // column to a single value, so the bounds are exact.
                let is_exact = !lower.is_null() && !upper.is_null() && lower == upper;
                let min_value = interval_bound_to_precision(lower, is_exact);
                let max_value = interval_bound_to_precision(upper, is_exact);
                ColumnStatistics {
                    // Filtering may remove rows, so input counts become inexact.
                    null_count: input_column_stats[idx].null_count.to_inexact(),
                    max_value,
                    min_value,
                    // Sums cannot be derived from interval bounds.
                    sum_value: Precision::Absent,
                    distinct_count: distinct_count.to_inexact(),
                    byte_size: input_column_stats[idx].byte_size,
                }
            },
        )
        .collect()
}
848
/// Stream driving `FilterExec`: evaluates the predicate on each input batch,
/// optionally projects it, and coalesces the surviving rows into output
/// batches.
struct FilterExecStream {
    /// Schema of the batches this stream produces.
    schema: SchemaRef,
    /// Boolean expression evaluated against each input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// The child stream being filtered.
    input: SendableRecordBatchStream,
    /// Baseline and selectivity metrics for this partition.
    metrics: FilterExecMetrics,
    /// Optional projection applied to the batch before filtering it.
    projection: Option<ProjectionRef>,
    /// Re-batches small filtered outputs and enforces the fetch limit.
    batch_coalescer: LimitedBatchCoalescer,
}
865
/// Metrics recorded by `FilterExecStream`.
struct FilterExecMetrics {
    /// Common per-operator metrics (e.g. elapsed compute time, output polls).
    baseline_metrics: BaselineMetrics,
    /// Ratio of rows that pass the filter to rows examined.
    selectivity: RatioMetrics,
}
875
876impl FilterExecMetrics {
877 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
878 Self {
879 baseline_metrics: BaselineMetrics::new(metrics, partition),
880 selectivity: MetricBuilder::new(metrics)
881 .with_type(MetricType::SUMMARY)
882 .ratio_metrics("selectivity", partition),
883 }
884 }
885}
886
/// Filters `batch` with `predicate`, keeping only the rows where it
/// evaluates to `true`; no projection is applied.
///
/// # Errors
/// Returns an error if predicate evaluation fails or if the predicate does
/// not produce a boolean array.
pub fn batch_filter(
    batch: &RecordBatch,
    predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
    filter_and_project(batch, predicate, None)
}
893
894fn filter_and_project(
895 batch: &RecordBatch,
896 predicate: &Arc<dyn PhysicalExpr>,
897 projection: Option<&Vec<usize>>,
898) -> Result<RecordBatch> {
899 predicate
900 .evaluate(batch)
901 .and_then(|v| v.into_array(batch.num_rows()))
902 .and_then(|array| {
903 Ok(match (as_boolean_array(&array), projection) {
904 (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
906 (Ok(filter_array), Some(projection)) => {
907 let projected_batch = batch.project(projection)?;
908 filter_record_batch(&projected_batch, filter_array)?
909 }
910 (Err(_), _) => {
911 return internal_err!(
912 "Cannot create filter_array from non-boolean predicates"
913 );
914 }
915 })
916 })
917}
918
impl Stream for FilterExecStream {
    type Item = Result<RecordBatch>;

    /// Pull loop: drain completed batches from the coalescer first, then
    /// filter more input until the input ends or the fetch limit is hit.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
        loop {
            // Emit any batch the coalescer has already completed.
            if let Some(batch) = self.batch_coalescer.next_completed_batch() {
                // Selectivity numerator: rows that survived the filter.
                self.metrics.selectivity.add_part(batch.num_rows());
                let poll = Poll::Ready(Some(Ok(batch)));
                return self.metrics.baseline_metrics.record_poll(poll);
            }

            // Finished (input exhausted or limit reached) and fully drained.
            if self.batch_coalescer.is_finished() {
                return Poll::Ready(None);
            }

            match ready!(self.input.poll_next_unpin(cx)) {
                None => {
                    // Input exhausted: flush any partially-filled batch.
                    self.batch_coalescer.finish()?;
                }
                Some(Ok(batch)) => {
                    let timer = elapsed_compute.timer();
                    // Evaluate the predicate, apply the optional projection,
                    // then push the surviving rows into the coalescer.
                    let status = self.predicate.as_ref()
                        .evaluate(&batch)
                        .and_then(|v| v.into_array(batch.num_rows()))
                        .and_then(|array| {
                            Ok(match self.projection.as_ref() {
                                Some(projection) => {
                                    let projected_batch = batch.project(projection)?;
                                    (array, projected_batch)
                                },
                                None => (array, batch)
                            })
                        }).and_then(|(array, batch)| {
                            match as_boolean_array(&array) {
                                Ok(filter_array) => {
                                    // Selectivity denominator: rows examined.
                                    self.metrics.selectivity.add_total(batch.num_rows());
                                    let batch = filter_record_batch(&batch, filter_array)?;
                                    let state = self.batch_coalescer.push_batch(batch)?;
                                    Ok(state)
                                }
                                Err(_) => {
                                    internal_err!(
                                        "Cannot create filter_array from non-boolean predicates"
                                    )
                                }
                            }
                        })?;
                    timer.done();

                    match status {
                        PushBatchStatus::Continue => {
                        }
                        PushBatchStatus::LimitReached => {
                            // `fetch` rows reached: stop consuming input and
                            // drain whatever the coalescer still holds.
                            self.batch_coalescer.finish()?;
                        }
                    }
                }

                // Forward input errors unchanged.
                other => return Poll::Ready(other),
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Delegate to the input stream's hint.
        self.input.size_hint()
    }
}
impl RecordBatchStream for FilterExecStream {
    /// Schema of the batches produced by this stream.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
1005
/// Collects the `=` and `!=` expression pairs appearing in `predicate`'s
/// conjuncts; returns `(eq_pairs, ne_pairs)`.
#[deprecated(
    since = "51.0.0",
    note = "This function will be internal in the future"
)]
pub fn collect_columns_from_predicate(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    collect_columns_from_predicate_inner(predicate)
}
1016
1017fn collect_columns_from_predicate_inner(
1018 predicate: &'_ Arc<dyn PhysicalExpr>,
1019) -> EqualAndNonEqual<'_> {
1020 let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
1021 let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
1022
1023 let predicates = split_conjunction(predicate);
1024 predicates.into_iter().for_each(|p| {
1025 if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
1026 let has_direct_column_operand =
1034 binary.left().as_any().downcast_ref::<Column>().is_some()
1035 || binary.right().as_any().downcast_ref::<Column>().is_some();
1036 if !has_direct_column_operand {
1037 return;
1038 }
1039 match binary.op() {
1040 Operator::Eq => {
1041 eq_predicate_columns.push((binary.left(), binary.right()))
1042 }
1043 Operator::NotEq => {
1044 ne_predicate_columns.push((binary.left(), binary.right()))
1045 }
1046 _ => {}
1047 }
1048 }
1049 });
1050
1051 (eq_predicate_columns, ne_predicate_columns)
1052}
1053
/// A pair of borrowed physical expressions, taken from the two sides of a
/// binary comparison.
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);

/// The equality and inequality pairs collected from a predicate, as
/// `(eq_pairs, ne_pairs)`.
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
1060
1061#[cfg(test)]
1062mod tests {
1063 use super::*;
1064 use crate::empty::EmptyExec;
1065 use crate::expressions::*;
1066 use crate::test;
1067 use crate::test::exec::StatisticsExec;
1068 use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
1069 use datafusion_common::ScalarValue;
1070
1071 #[tokio::test]
1072 async fn collect_columns_predicates() -> Result<()> {
1073 let schema = test::aggr_test_schema();
1074 let predicate: Arc<dyn PhysicalExpr> = binary(
1075 binary(
1076 binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
1077 Operator::And,
1078 binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
1079 &schema,
1080 )?,
1081 Operator::And,
1082 binary(
1083 binary(
1084 col("c2", &schema)?,
1085 Operator::Eq,
1086 col("c9", &schema)?,
1087 &schema,
1088 )?,
1089 Operator::And,
1090 binary(
1091 col("c1", &schema)?,
1092 Operator::NotEq,
1093 col("c13", &schema)?,
1094 &schema,
1095 )?,
1096 &schema,
1097 )?,
1098 &schema,
1099 )?;
1100
1101 let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
1102 assert_eq!(2, equal_pairs.len());
1103 assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
1104 assert!(equal_pairs[0].1.eq(&lit(4u32)));
1105
1106 assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
1107 assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
1108
1109 assert_eq!(1, ne_pairs.len());
1110 assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
1111 assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
1112
1113 Ok(())
1114 }
1115
1116 #[tokio::test]
1117 async fn test_filter_statistics_basic_expr() -> Result<()> {
1118 let bytes_per_row = 4;
1121 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1122 let input = Arc::new(StatisticsExec::new(
1123 Statistics {
1124 num_rows: Precision::Inexact(100),
1125 total_byte_size: Precision::Inexact(100 * bytes_per_row),
1126 column_statistics: vec![ColumnStatistics {
1127 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1128 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1129 ..Default::default()
1130 }],
1131 },
1132 schema.clone(),
1133 ));
1134
1135 let predicate: Arc<dyn PhysicalExpr> =
1137 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1138
1139 let filter: Arc<dyn ExecutionPlan> =
1141 Arc::new(FilterExec::try_new(predicate, input)?);
1142
1143 let statistics = filter.partition_statistics(None)?;
1144 assert_eq!(statistics.num_rows, Precision::Inexact(25));
1145 assert_eq!(
1146 statistics.total_byte_size,
1147 Precision::Inexact(25 * bytes_per_row)
1148 );
1149 assert_eq!(
1150 statistics.column_statistics,
1151 vec![ColumnStatistics {
1152 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1153 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1154 ..Default::default()
1155 }]
1156 );
1157
1158 Ok(())
1159 }
1160
1161 #[tokio::test]
1162 async fn test_filter_statistics_column_level_nested() -> Result<()> {
1163 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1166 let input = Arc::new(StatisticsExec::new(
1167 Statistics {
1168 num_rows: Precision::Inexact(100),
1169 column_statistics: vec![ColumnStatistics {
1170 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1171 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1172 ..Default::default()
1173 }],
1174 total_byte_size: Precision::Absent,
1175 },
1176 schema.clone(),
1177 ));
1178
1179 let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1181 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1182 input,
1183 )?);
1184
1185 let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1189 binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1190 sub_filter,
1191 )?);
1192
1193 let statistics = filter.partition_statistics(None)?;
1194 assert_eq!(statistics.num_rows, Precision::Inexact(16));
1195 assert_eq!(
1196 statistics.column_statistics,
1197 vec![ColumnStatistics {
1198 min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1199 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1200 ..Default::default()
1201 }]
1202 );
1203
1204 Ok(())
1205 }
1206
1207 #[tokio::test]
1208 async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
1209 let schema = Schema::new(vec![
1213 Field::new("a", DataType::Int32, false),
1214 Field::new("b", DataType::Int32, false),
1215 ]);
1216 let input = Arc::new(StatisticsExec::new(
1217 Statistics {
1218 num_rows: Precision::Inexact(100),
1219 column_statistics: vec![
1220 ColumnStatistics {
1221 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1222 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1223 ..Default::default()
1224 },
1225 ColumnStatistics {
1226 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1227 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1228 ..Default::default()
1229 },
1230 ],
1231 total_byte_size: Precision::Absent,
1232 },
1233 schema.clone(),
1234 ));
1235
1236 let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1238 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1239 input,
1240 )?);
1241
1242 let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1244 binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
1245 a_lte_25,
1246 )?);
1247
1248 let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1250 binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1251 b_gt_5,
1252 )?);
1253 let statistics = filter.partition_statistics(None)?;
1254 assert_eq!(statistics.num_rows, Precision::Inexact(2));
1261 assert_eq!(
1262 statistics.column_statistics,
1263 vec![
1264 ColumnStatistics {
1265 min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1266 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1267 ..Default::default()
1268 },
1269 ColumnStatistics {
1270 min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
1271 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1272 ..Default::default()
1273 }
1274 ]
1275 );
1276
1277 Ok(())
1278 }
1279
1280 #[tokio::test]
1281 async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
1282 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1285 let input = Arc::new(StatisticsExec::new(
1286 Statistics::new_unknown(&schema),
1287 schema.clone(),
1288 ));
1289
1290 let predicate: Arc<dyn PhysicalExpr> =
1292 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1293
1294 let filter: Arc<dyn ExecutionPlan> =
1296 Arc::new(FilterExec::try_new(predicate, input)?);
1297
1298 let statistics = filter.partition_statistics(None)?;
1299 assert_eq!(statistics.num_rows, Precision::Absent);
1300
1301 Ok(())
1302 }
1303
1304 #[tokio::test]
1305 async fn test_filter_statistics_multiple_columns() -> Result<()> {
1306 let schema = Schema::new(vec![
1311 Field::new("a", DataType::Int32, false),
1312 Field::new("b", DataType::Int32, false),
1313 Field::new("c", DataType::Float32, false),
1314 ]);
1315 let input = Arc::new(StatisticsExec::new(
1316 Statistics {
1317 num_rows: Precision::Inexact(1000),
1318 total_byte_size: Precision::Inexact(4000),
1319 column_statistics: vec![
1320 ColumnStatistics {
1321 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1322 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1323 ..Default::default()
1324 },
1325 ColumnStatistics {
1326 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1327 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1328 ..Default::default()
1329 },
1330 ColumnStatistics {
1331 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1332 max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1333 ..Default::default()
1334 },
1335 ],
1336 },
1337 schema,
1338 ));
1339 let predicate = Arc::new(BinaryExpr::new(
1341 Arc::new(BinaryExpr::new(
1342 Arc::new(Column::new("a", 0)),
1343 Operator::LtEq,
1344 Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1345 )),
1346 Operator::And,
1347 Arc::new(BinaryExpr::new(
1348 Arc::new(BinaryExpr::new(
1349 Arc::new(Column::new("b", 1)),
1350 Operator::Eq,
1351 Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1352 )),
1353 Operator::And,
1354 Arc::new(BinaryExpr::new(
1355 Arc::new(BinaryExpr::new(
1356 Arc::new(Column::new("c", 2)),
1357 Operator::LtEq,
1358 Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1359 )),
1360 Operator::And,
1361 Arc::new(BinaryExpr::new(
1362 Arc::new(Column::new("a", 0)),
1363 Operator::Gt,
1364 Arc::new(Column::new("b", 1)),
1365 )),
1366 )),
1367 )),
1368 ));
1369 let filter: Arc<dyn ExecutionPlan> =
1370 Arc::new(FilterExec::try_new(predicate, input)?);
1371 let statistics = filter.partition_statistics(None)?;
1372 assert_eq!(statistics.num_rows, Precision::Inexact(134));
1376 assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1377 let exp_col_stats = vec![
1378 ColumnStatistics {
1379 min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1380 max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1381 ..Default::default()
1382 },
1383 ColumnStatistics {
1384 min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1385 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1386 ..Default::default()
1387 },
1388 ColumnStatistics {
1389 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1390 max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1391 ..Default::default()
1392 },
1393 ];
1394 let _ = exp_col_stats
1395 .into_iter()
1396 .zip(statistics.column_statistics)
1397 .map(|(expected, actual)| {
1398 if let Some(val) = actual.min_value.get_value() {
1399 if val.data_type().is_floating() {
1400 let actual_min = actual.min_value.get_value().unwrap();
1403 let actual_max = actual.max_value.get_value().unwrap();
1404 let expected_min = expected.min_value.get_value().unwrap();
1405 let expected_max = expected.max_value.get_value().unwrap();
1406 let eps = ScalarValue::Float32(Some(1e-6));
1407
1408 assert!(actual_min.sub(expected_min).unwrap() < eps);
1409 assert!(actual_min.sub(expected_min).unwrap() < eps);
1410
1411 assert!(actual_max.sub(expected_max).unwrap() < eps);
1412 assert!(actual_max.sub(expected_max).unwrap() < eps);
1413 } else {
1414 assert_eq!(actual, expected);
1415 }
1416 } else {
1417 assert_eq!(actual, expected);
1418 }
1419 });
1420
1421 Ok(())
1422 }
1423
1424 #[tokio::test]
1425 async fn test_filter_statistics_full_selective() -> Result<()> {
1426 let schema = Schema::new(vec![
1430 Field::new("a", DataType::Int32, false),
1431 Field::new("b", DataType::Int32, false),
1432 ]);
1433 let input = Arc::new(StatisticsExec::new(
1434 Statistics {
1435 num_rows: Precision::Inexact(1000),
1436 total_byte_size: Precision::Inexact(4000),
1437 column_statistics: vec![
1438 ColumnStatistics {
1439 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1440 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1441 ..Default::default()
1442 },
1443 ColumnStatistics {
1444 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1445 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1446 ..Default::default()
1447 },
1448 ],
1449 },
1450 schema,
1451 ));
1452 let predicate = Arc::new(BinaryExpr::new(
1454 Arc::new(BinaryExpr::new(
1455 Arc::new(Column::new("a", 0)),
1456 Operator::Lt,
1457 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1458 )),
1459 Operator::And,
1460 Arc::new(BinaryExpr::new(
1461 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1462 Operator::LtEq,
1463 Arc::new(Column::new("b", 1)),
1464 )),
1465 ));
1466 let expected = input.partition_statistics(None)?.column_statistics;
1468 let filter: Arc<dyn ExecutionPlan> =
1469 Arc::new(FilterExec::try_new(predicate, input)?);
1470 let statistics = filter.partition_statistics(None)?;
1471
1472 assert_eq!(statistics.num_rows, Precision::Inexact(1000));
1473 assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
1474 assert_eq!(statistics.column_statistics, expected);
1475
1476 Ok(())
1477 }
1478
1479 #[tokio::test]
1480 async fn test_filter_statistics_zero_selective() -> Result<()> {
1481 let schema = Schema::new(vec![
1485 Field::new("a", DataType::Int32, false),
1486 Field::new("b", DataType::Int32, false),
1487 ]);
1488 let input = Arc::new(StatisticsExec::new(
1489 Statistics {
1490 num_rows: Precision::Inexact(1000),
1491 total_byte_size: Precision::Inexact(4000),
1492 column_statistics: vec![
1493 ColumnStatistics {
1494 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1495 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1496 ..Default::default()
1497 },
1498 ColumnStatistics {
1499 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1500 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1501 ..Default::default()
1502 },
1503 ],
1504 },
1505 schema,
1506 ));
1507 let predicate = Arc::new(BinaryExpr::new(
1509 Arc::new(BinaryExpr::new(
1510 Arc::new(Column::new("a", 0)),
1511 Operator::Gt,
1512 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1513 )),
1514 Operator::And,
1515 Arc::new(BinaryExpr::new(
1516 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1517 Operator::LtEq,
1518 Arc::new(Column::new("b", 1)),
1519 )),
1520 ));
1521 let filter: Arc<dyn ExecutionPlan> =
1522 Arc::new(FilterExec::try_new(predicate, input)?);
1523 let statistics = filter.partition_statistics(None)?;
1524
1525 assert_eq!(statistics.num_rows, Precision::Inexact(0));
1526 assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
1527 assert_eq!(
1528 statistics.column_statistics,
1529 vec![
1530 ColumnStatistics {
1531 min_value: Precision::Exact(ScalarValue::Int32(None)),
1532 max_value: Precision::Exact(ScalarValue::Int32(None)),
1533 sum_value: Precision::Exact(ScalarValue::Int32(None)),
1534 distinct_count: Precision::Exact(0),
1535 null_count: Precision::Exact(0),
1536 byte_size: Precision::Absent,
1537 },
1538 ColumnStatistics {
1539 min_value: Precision::Exact(ScalarValue::Int32(None)),
1540 max_value: Precision::Exact(ScalarValue::Int32(None)),
1541 sum_value: Precision::Exact(ScalarValue::Int32(None)),
1542 distinct_count: Precision::Exact(0),
1543 null_count: Precision::Exact(0),
1544 byte_size: Precision::Absent,
1545 },
1546 ]
1547 );
1548
1549 Ok(())
1550 }
1551
1552 #[tokio::test]
1562 async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
1563 let schema = Schema::new(vec![
1565 Field::new("a", DataType::Int32, false),
1566 Field::new("b", DataType::Int32, false),
1567 ]);
1568 let input = Arc::new(StatisticsExec::new(
1569 Statistics {
1570 num_rows: Precision::Inexact(1000),
1571 total_byte_size: Precision::Inexact(4000),
1572 column_statistics: vec![
1573 ColumnStatistics {
1574 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1575 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1576 ..Default::default()
1577 },
1578 ColumnStatistics {
1579 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1580 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1581 ..Default::default()
1582 },
1583 ],
1584 },
1585 schema,
1586 ));
1587
1588 let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1590 Arc::new(Column::new("a", 0)),
1591 Operator::Gt,
1592 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1593 ));
1594 let inner_filter: Arc<dyn ExecutionPlan> =
1595 Arc::new(FilterExec::try_new(inner_predicate, input)?);
1596
1597 let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1602 Arc::new(Column::new("a", 0)),
1603 Operator::Eq,
1604 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1605 ));
1606 let outer_filter: Arc<dyn ExecutionPlan> =
1607 Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
1608
1609 let statistics = outer_filter.partition_statistics(None)?;
1611 assert_eq!(statistics.num_rows, Precision::Inexact(0));
1612
1613 Ok(())
1614 }
1615
1616 #[tokio::test]
1617 async fn test_filter_statistics_more_inputs() -> Result<()> {
1618 let schema = Schema::new(vec![
1619 Field::new("a", DataType::Int32, false),
1620 Field::new("b", DataType::Int32, false),
1621 ]);
1622 let input = Arc::new(StatisticsExec::new(
1623 Statistics {
1624 num_rows: Precision::Inexact(1000),
1625 total_byte_size: Precision::Inexact(4000),
1626 column_statistics: vec![
1627 ColumnStatistics {
1628 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1629 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1630 ..Default::default()
1631 },
1632 ColumnStatistics {
1633 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1634 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1635 ..Default::default()
1636 },
1637 ],
1638 },
1639 schema,
1640 ));
1641 let predicate = Arc::new(BinaryExpr::new(
1643 Arc::new(Column::new("a", 0)),
1644 Operator::Lt,
1645 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1646 ));
1647 let filter: Arc<dyn ExecutionPlan> =
1648 Arc::new(FilterExec::try_new(predicate, input)?);
1649 let statistics = filter.partition_statistics(None)?;
1650
1651 assert_eq!(statistics.num_rows, Precision::Inexact(490));
1652 assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
1653 assert_eq!(
1654 statistics.column_statistics,
1655 vec![
1656 ColumnStatistics {
1657 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1658 max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
1659 ..Default::default()
1660 },
1661 ColumnStatistics {
1662 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1663 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1664 ..Default::default()
1665 },
1666 ]
1667 );
1668
1669 Ok(())
1670 }
1671
1672 #[tokio::test]
1673 async fn test_empty_input_statistics() -> Result<()> {
1674 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1675 let input = Arc::new(StatisticsExec::new(
1676 Statistics::new_unknown(&schema),
1677 schema,
1678 ));
1679 let predicate = Arc::new(BinaryExpr::new(
1681 Arc::new(BinaryExpr::new(
1682 Arc::new(Column::new("a", 0)),
1683 Operator::LtEq,
1684 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1685 )),
1686 Operator::And,
1687 Arc::new(BinaryExpr::new(
1688 Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1689 Operator::LtEq,
1690 Arc::new(BinaryExpr::new(
1691 Arc::new(Column::new("a", 0)),
1692 Operator::Minus,
1693 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1694 )),
1695 )),
1696 ));
1697 let filter: Arc<dyn ExecutionPlan> =
1698 Arc::new(FilterExec::try_new(predicate, input)?);
1699 let filter_statistics = filter.partition_statistics(None)?;
1700
1701 let expected_filter_statistics = Statistics {
1702 num_rows: Precision::Absent,
1703 total_byte_size: Precision::Absent,
1704 column_statistics: vec![ColumnStatistics {
1705 null_count: Precision::Absent,
1706 min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1707 max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1708 sum_value: Precision::Absent,
1709 distinct_count: Precision::Absent,
1710 byte_size: Precision::Absent,
1711 }],
1712 };
1713
1714 assert_eq!(filter_statistics, expected_filter_statistics);
1715
1716 Ok(())
1717 }
1718
1719 #[tokio::test]
1720 async fn test_statistics_with_constant_column() -> Result<()> {
1721 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1722 let input = Arc::new(StatisticsExec::new(
1723 Statistics::new_unknown(&schema),
1724 schema,
1725 ));
1726 let predicate = Arc::new(BinaryExpr::new(
1728 Arc::new(Column::new("a", 0)),
1729 Operator::Eq,
1730 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1731 ));
1732 let filter: Arc<dyn ExecutionPlan> =
1733 Arc::new(FilterExec::try_new(predicate, input)?);
1734 let filter_statistics = filter.partition_statistics(None)?;
1735 assert!(filter_statistics.column_statistics[0].is_singleton());
1737
1738 Ok(())
1739 }
1740
1741 #[tokio::test]
1742 async fn test_validation_filter_selectivity() -> Result<()> {
1743 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1744 let input = Arc::new(StatisticsExec::new(
1745 Statistics::new_unknown(&schema),
1746 schema,
1747 ));
1748 let predicate = Arc::new(BinaryExpr::new(
1750 Arc::new(Column::new("a", 0)),
1751 Operator::Eq,
1752 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1753 ));
1754 let filter = FilterExec::try_new(predicate, input)?;
1755 assert!(filter.with_default_selectivity(120).is_err());
1756 Ok(())
1757 }
1758
1759 #[tokio::test]
1760 async fn test_custom_filter_selectivity() -> Result<()> {
1761 let schema =
1763 Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1764 let input = Arc::new(StatisticsExec::new(
1765 Statistics {
1766 num_rows: Precision::Inexact(1000),
1767 total_byte_size: Precision::Inexact(4000),
1768 column_statistics: vec![ColumnStatistics {
1769 ..Default::default()
1770 }],
1771 },
1772 schema,
1773 ));
1774 let predicate = Arc::new(BinaryExpr::new(
1776 Arc::new(Column::new("a", 0)),
1777 Operator::Eq,
1778 Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1779 ));
1780 let filter = FilterExec::try_new(predicate, input)?;
1781 let statistics = filter.partition_statistics(None)?;
1782 assert_eq!(statistics.num_rows, Precision::Inexact(200));
1783 assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1784 let filter = filter.with_default_selectivity(40)?;
1785 let statistics = filter.partition_statistics(None)?;
1786 assert_eq!(statistics.num_rows, Precision::Inexact(400));
1787 assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1788 Ok(())
1789 }
1790
1791 #[test]
1792 fn test_equivalence_properties_union_type() -> Result<()> {
1793 let union_type = DataType::Union(
1794 UnionFields::try_new(
1795 vec![0, 1],
1796 vec![
1797 Field::new("f1", DataType::Int32, true),
1798 Field::new("f2", DataType::Utf8, true),
1799 ],
1800 )
1801 .unwrap(),
1802 UnionMode::Sparse,
1803 );
1804
1805 let schema = Arc::new(Schema::new(vec![
1806 Field::new("c1", DataType::Int32, true),
1807 Field::new("c2", union_type, true),
1808 ]));
1809
1810 let exec = FilterExec::try_new(
1811 binary(
1812 binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
1813 Operator::And,
1814 binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
1815 &schema,
1816 )?,
1817 Arc::new(EmptyExec::new(Arc::clone(&schema))),
1818 )?;
1819
1820 exec.partition_statistics(None).unwrap();
1821
1822 Ok(())
1823 }
1824
1825 #[tokio::test]
1826 async fn test_builder_with_projection() -> Result<()> {
1827 let schema = Arc::new(Schema::new(vec![
1829 Field::new("a", DataType::Int32, false),
1830 Field::new("b", DataType::Int32, false),
1831 Field::new("c", DataType::Int32, false),
1832 ]));
1833
1834 let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1835
1836 let predicate = Arc::new(BinaryExpr::new(
1838 Arc::new(Column::new("a", 0)),
1839 Operator::Gt,
1840 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1841 ));
1842
1843 let projection = Some(vec![0, 2]);
1845 let filter = FilterExecBuilder::new(predicate, input)
1846 .apply_projection(projection.clone())
1847 .unwrap()
1848 .build()?;
1849
1850 assert_eq!(filter.projection(), &Some([0, 2].into()));
1852
1853 let output_schema = filter.schema();
1855 assert_eq!(output_schema.fields().len(), 2);
1856 assert_eq!(output_schema.field(0).name(), "a");
1857 assert_eq!(output_schema.field(1).name(), "c");
1858
1859 Ok(())
1860 }
1861
1862 #[tokio::test]
1863 async fn test_builder_without_projection() -> Result<()> {
1864 let schema = Arc::new(Schema::new(vec![
1865 Field::new("a", DataType::Int32, false),
1866 Field::new("b", DataType::Int32, false),
1867 ]));
1868
1869 let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1870
1871 let predicate = Arc::new(BinaryExpr::new(
1872 Arc::new(Column::new("a", 0)),
1873 Operator::Gt,
1874 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1875 ));
1876
1877 let filter = FilterExecBuilder::new(predicate, input).build()?;
1879
1880 assert!(filter.projection().is_none());
1882
1883 let output_schema = filter.schema();
1885 assert_eq!(output_schema.fields().len(), 2);
1886
1887 Ok(())
1888 }
1889
1890 #[tokio::test]
1891 async fn test_builder_invalid_projection() -> Result<()> {
1892 let schema = Arc::new(Schema::new(vec![
1893 Field::new("a", DataType::Int32, false),
1894 Field::new("b", DataType::Int32, false),
1895 ]));
1896
1897 let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1898
1899 let predicate = Arc::new(BinaryExpr::new(
1900 Arc::new(Column::new("a", 0)),
1901 Operator::Gt,
1902 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1903 ));
1904
1905 let result =
1907 FilterExecBuilder::new(predicate, input).apply_projection(Some(vec![0, 5])); assert!(result.is_err());
1911
1912 Ok(())
1913 }
1914
1915 #[tokio::test]
1916 async fn test_builder_vs_with_projection() -> Result<()> {
1917 let schema = Schema::new(vec![
1920 Field::new("a", DataType::Int32, false),
1921 Field::new("b", DataType::Int32, false),
1922 Field::new("c", DataType::Int32, false),
1923 Field::new("d", DataType::Int32, false),
1924 ]);
1925
1926 let input = Arc::new(StatisticsExec::new(
1927 Statistics {
1928 num_rows: Precision::Inexact(1000),
1929 total_byte_size: Precision::Inexact(4000),
1930 column_statistics: vec![
1931 ColumnStatistics {
1932 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1933 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1934 ..Default::default()
1935 },
1936 ColumnStatistics {
1937 ..Default::default()
1938 },
1939 ColumnStatistics {
1940 ..Default::default()
1941 },
1942 ColumnStatistics {
1943 ..Default::default()
1944 },
1945 ],
1946 },
1947 schema,
1948 ));
1949 let input: Arc<dyn ExecutionPlan> = input;
1950
1951 let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1952 Arc::new(Column::new("a", 0)),
1953 Operator::Lt,
1954 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1955 ));
1956
1957 let projection = Some(vec![0, 2]);
1958
1959 let filter1 = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
1961 .apply_projection(projection.clone())
1962 .unwrap()
1963 .build()?;
1964
1965 let filter2 = FilterExecBuilder::new(predicate, input)
1967 .apply_projection(projection)
1968 .unwrap()
1969 .build()?;
1970
1971 assert_eq!(filter1.schema(), filter2.schema());
1973 assert_eq!(filter1.projection(), filter2.projection());
1974
1975 let stats1 = filter1.partition_statistics(None)?;
1977 let stats2 = filter2.partition_statistics(None)?;
1978 assert_eq!(stats1.num_rows, stats2.num_rows);
1979 assert_eq!(stats1.total_byte_size, stats2.total_byte_size);
1980
1981 Ok(())
1982 }
1983
1984 #[tokio::test]
1985 async fn test_builder_statistics_with_projection() -> Result<()> {
1986 let schema = Schema::new(vec![
1988 Field::new("a", DataType::Int32, false),
1989 Field::new("b", DataType::Int32, false),
1990 Field::new("c", DataType::Int32, false),
1991 ]);
1992
1993 let input = Arc::new(StatisticsExec::new(
1994 Statistics {
1995 num_rows: Precision::Inexact(1000),
1996 total_byte_size: Precision::Inexact(12000),
1997 column_statistics: vec![
1998 ColumnStatistics {
1999 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2000 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2001 ..Default::default()
2002 },
2003 ColumnStatistics {
2004 min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
2005 max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
2006 ..Default::default()
2007 },
2008 ColumnStatistics {
2009 min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
2010 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
2011 ..Default::default()
2012 },
2013 ],
2014 },
2015 schema,
2016 ));
2017
2018 let predicate = Arc::new(BinaryExpr::new(
2020 Arc::new(Column::new("a", 0)),
2021 Operator::Lt,
2022 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
2023 ));
2024
2025 let filter = FilterExecBuilder::new(predicate, input)
2026 .apply_projection(Some(vec![0, 2]))
2027 .unwrap()
2028 .build()?;
2029
2030 let statistics = filter.partition_statistics(None)?;
2031
2032 assert!(matches!(statistics.num_rows, Precision::Inexact(_)));
2034
2035 assert_eq!(filter.schema().fields().len(), 2);
2037
2038 Ok(())
2039 }
2040
2041 #[test]
2042 fn test_builder_predicate_validation() -> Result<()> {
2043 let schema = Arc::new(Schema::new(vec![
2045 Field::new("a", DataType::Int32, false),
2046 Field::new("b", DataType::Int32, false),
2047 ]));
2048
2049 let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2050
2051 let invalid_predicate = Arc::new(Column::new("a", 0));
2053
2054 let result = FilterExecBuilder::new(invalid_predicate, input)
2056 .apply_projection(Some(vec![0]))
2057 .unwrap()
2058 .build();
2059
2060 assert!(result.is_err());
2061
2062 Ok(())
2063 }
2064
2065 #[tokio::test]
2066 async fn test_builder_projection_composition() -> Result<()> {
2067 let schema = Arc::new(Schema::new(vec![
2071 Field::new("a", DataType::Int32, false),
2072 Field::new("b", DataType::Int32, false),
2073 Field::new("c", DataType::Int32, false),
2074 Field::new("d", DataType::Int32, false),
2075 ]));
2076
2077 let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2078
2079 let predicate = Arc::new(BinaryExpr::new(
2081 Arc::new(Column::new("a", 0)),
2082 Operator::Gt,
2083 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2084 ));
2085
2086 let filter = FilterExecBuilder::new(predicate, input)
2090 .apply_projection(Some(vec![0, 2, 3]))?
2091 .apply_projection(Some(vec![0, 2]))?
2092 .build()?;
2093
2094 assert_eq!(filter.projection(), &Some([0, 3].into()));
2096
2097 let output_schema = filter.schema();
2099 assert_eq!(output_schema.fields().len(), 2);
2100 assert_eq!(output_schema.field(0).name(), "a");
2101 assert_eq!(output_schema.field(1).name(), "d");
2102
2103 Ok(())
2104 }
2105
2106 #[tokio::test]
2107 async fn test_builder_projection_composition_none_clears() -> Result<()> {
2108 let schema = Arc::new(Schema::new(vec![
2110 Field::new("a", DataType::Int32, false),
2111 Field::new("b", DataType::Int32, false),
2112 ]));
2113
2114 let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2115
2116 let predicate = Arc::new(BinaryExpr::new(
2117 Arc::new(Column::new("a", 0)),
2118 Operator::Gt,
2119 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2120 ));
2121
2122 let filter = FilterExecBuilder::new(predicate, input)
2124 .apply_projection(Some(vec![0]))?
2125 .apply_projection(None)?
2126 .build()?;
2127
2128 assert_eq!(filter.projection(), &None);
2130
2131 let output_schema = filter.schema();
2133 assert_eq!(output_schema.fields().len(), 2);
2134
2135 Ok(())
2136 }
2137
2138 #[test]
2139 fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
2140 let input_schema = Arc::new(Schema::new(vec![
2144 Field::new("a", DataType::Int32, false),
2145 Field::new("b", DataType::Utf8, false),
2146 Field::new("c", DataType::Float64, false),
2147 ]));
2148 let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));
2149
2150 let predicate = Arc::new(BinaryExpr::new(
2152 Arc::new(Column::new("a", 0)),
2153 Operator::Gt,
2154 Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
2155 ));
2156 let filter = FilterExecBuilder::new(predicate, input)
2157 .apply_projection(Some(vec![2]))?
2158 .build()?;
2159
2160 let output_schema = filter.schema();
2162 assert_eq!(output_schema.fields().len(), 1);
2163 assert_eq!(output_schema.field(0).name(), "c");
2164
2165 let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));
2167
2168 let config = ConfigOptions::new();
2169 let desc = filter.gather_filters_for_pushdown(
2170 FilterPushdownPhase::Post,
2171 vec![parent_filter],
2172 &config,
2173 )?;
2174
2175 let parent_filters = desc.parent_filters();
2178 assert_eq!(parent_filters.len(), 1); assert_eq!(parent_filters[0].len(), 1); let remapped = &parent_filters[0][0].predicate;
2181 let display = format!("{remapped}");
2182 assert_eq!(
2183 display, "c@2",
2184 "Post-phase parent filter column index must be remapped \
2185 from output schema (c@0) to input schema (c@2)"
2186 );
2187
2188 Ok(())
2189 }
2190
2191 #[test]
2199 fn test_collect_columns_skips_non_column_pairs() -> Result<()> {
2200 let schema = test::aggr_test_schema();
2201
2202 let complex_expr: Arc<dyn PhysicalExpr> = binary(
2205 col("c2", &schema)?,
2206 Operator::IsDistinctFrom,
2207 lit(0u32),
2208 &schema,
2209 )?;
2210 let predicate: Arc<dyn PhysicalExpr> =
2211 binary(complex_expr, Operator::Eq, lit(0u32), &schema)?;
2212
2213 let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
2214 assert_eq!(
2215 0,
2216 equal_pairs.len(),
2217 "Should not extract equality pairs where neither side is a Column"
2218 );
2219
2220 let predicate: Arc<dyn PhysicalExpr> =
2222 binary(col("c2", &schema)?, Operator::Eq, lit(0u32), &schema)?;
2223 let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
2224 assert_eq!(
2225 1,
2226 equal_pairs.len(),
2227 "Should extract equality pairs where one side is a Column"
2228 );
2229
2230 Ok(())
2231 }
2232
2233 #[tokio::test]
2236 async fn test_filter_statistics_absent_columns_stay_absent() -> Result<()> {
2237 let schema = Schema::new(vec![
2238 Field::new("a", DataType::Int32, false),
2239 Field::new("b", DataType::Int32, false),
2240 ]);
2241 let input = Arc::new(StatisticsExec::new(
2242 Statistics {
2243 num_rows: Precision::Inexact(1000),
2244 total_byte_size: Precision::Absent,
2245 column_statistics: vec![
2246 ColumnStatistics::default(),
2247 ColumnStatistics::default(),
2248 ],
2249 },
2250 schema.clone(),
2251 ));
2252
2253 let predicate = Arc::new(BinaryExpr::new(
2254 Arc::new(Column::new("a", 0)),
2255 Operator::Eq,
2256 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2257 ));
2258 let filter: Arc<dyn ExecutionPlan> =
2259 Arc::new(FilterExec::try_new(predicate, input)?);
2260
2261 let statistics = filter.partition_statistics(None)?;
2262 let col_b_stats = &statistics.column_statistics[1];
2263 assert_eq!(col_b_stats.min_value, Precision::Absent);
2264 assert_eq!(col_b_stats.max_value, Precision::Absent);
2265
2266 Ok(())
2267 }
2268}