1use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use itertools::Itertools;
24
25use super::{
26 ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27 RecordBatchStream, SendableRecordBatchStream, Statistics,
28};
29use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
30use crate::common::can_project;
31use crate::execution_plan::CardinalityEffect;
32use crate::filter_pushdown::{
33 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
34 FilterPushdownPropagation, PushedDown,
35};
36use crate::metrics::{MetricBuilder, MetricType};
37use crate::projection::{
38 EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
39 try_embed_projection, update_expr,
40};
41use crate::{
42 DisplayFormatType, ExecutionPlan,
43 metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
44};
45
46use arrow::compute::filter_record_batch;
47use arrow::datatypes::{DataType, SchemaRef};
48use arrow::record_batch::RecordBatch;
49use datafusion_common::cast::as_boolean_array;
50use datafusion_common::config::ConfigOptions;
51use datafusion_common::stats::Precision;
52use datafusion_common::{
53 DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
54};
55use datafusion_execution::TaskContext;
56use datafusion_expr::Operator;
57use datafusion_physical_expr::equivalence::ProjectionMapping;
58use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
59use datafusion_physical_expr::intervals::utils::check_support;
60use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
61use datafusion_physical_expr::{
62 AcrossPartitions, AnalysisContext, ConstExpr, EquivalenceProperties, ExprBoundaries,
63 PhysicalExpr, analyze, conjunction, split_conjunction,
64};
65
66use datafusion_physical_expr_common::physical_expr::fmt_sql;
67use futures::stream::{Stream, StreamExt};
68use log::trace;
69
/// Default selectivity, in percent (0..=100), assumed when the predicate
/// cannot be analyzed against the input statistics (see `statistics_helper`,
/// which divides this by 100).
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Default batch size handed to the output `LimitedBatchCoalescer`.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
72
/// Execution plan that evaluates a boolean [`PhysicalExpr`] against each row
/// of its input and emits only the rows for which it evaluates to `true`,
/// optionally projecting columns and limiting the number of output rows.
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// Boolean expression evaluated against every input row.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input plan whose rows are filtered.
    input: Arc<dyn ExecutionPlan>,
    /// Execution-time metrics (baseline + selectivity ratio).
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity percentage (0..=100) assumed for statistics estimation
    /// when the predicate cannot be analyzed.
    default_selectivity: u8,
    /// Cached plan properties (equivalences, partitioning, boundedness).
    cache: PlanProperties,
    /// Optional output projection: column indices into the input schema.
    projection: Option<Vec<usize>>,
    /// Target row count for output batches, fed to the coalescer.
    batch_size: usize,
    /// Optional maximum number of rows to produce.
    fetch: Option<usize>,
}
94
impl FilterExec {
    /// Create a `FilterExec` that keeps only the rows of `input` for which
    /// `predicate` evaluates to `true`.
    ///
    /// # Errors
    /// Returns a plan error if `predicate` does not evaluate to a `Boolean`
    /// type against the input schema.
    #[expect(clippy::needless_pass_by_value)]
    pub fn try_new(
        predicate: Arc<dyn PhysicalExpr>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        match predicate.data_type(input.schema().as_ref())? {
            DataType::Boolean => {
                let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
                // Plan properties are computed eagerly and cached.
                let cache = Self::compute_properties(
                    &input,
                    &predicate,
                    default_selectivity,
                    None,
                )?;
                Ok(Self {
                    predicate,
                    input: Arc::clone(&input),
                    metrics: ExecutionPlanMetricsSet::new(),
                    default_selectivity,
                    cache,
                    projection: None,
                    batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
                    fetch: None,
                })
            }
            other => {
                plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
            }
        }
    }

    /// Override the selectivity percentage used for statistics estimation
    /// when the predicate cannot be analyzed.
    ///
    /// # Errors
    /// Returns a plan error if `default_selectivity` is greater than 100.
    pub fn with_default_selectivity(
        mut self,
        default_selectivity: u8,
    ) -> Result<Self, DataFusionError> {
        if default_selectivity > 100 {
            return plan_err!(
                "Default filter selectivity value needs to be less than or equal to 100"
            );
        }
        self.default_selectivity = default_selectivity;
        Ok(self)
    }

    /// Return a new `FilterExec` that additionally applies `projection`
    /// (column indices into this node's current output schema) to its output.
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        // Validate the indices against the current output schema.
        can_project(&self.schema(), projection.as_ref())?;

        // If a projection is already set, compose the two: the incoming
        // indices refer to the already-projected schema, so map each through
        // the existing projection to obtain indices into the input schema.
        let projection = match projection {
            Some(projection) => match &self.projection {
                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
                None => Some(projection),
            },
            None => None,
        };

        let cache = Self::compute_properties(
            &self.input,
            &self.predicate,
            self.default_selectivity,
            projection.as_ref(),
        )?;
        Ok(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache,
            projection,
            batch_size: self.batch_size,
            fetch: self.fetch,
        })
    }

    /// Return a copy of this node with a different target `batch_size`.
    pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
        Ok(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: self.cache.clone(),
            projection: self.projection.clone(),
            batch_size,
            fetch: self.fetch,
        })
    }

    /// The boolean expression rows must satisfy to pass the filter.
    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate
    }

    /// The input execution plan.
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// The fallback selectivity percentage used for statistics estimation.
    pub fn default_selectivity(&self) -> u8 {
        self.default_selectivity
    }

    /// The optional output projection (indices into the input schema).
    pub fn projection(&self) -> Option<&Vec<usize>> {
        self.projection.as_ref()
    }

    /// Estimate the statistics of the filter's output given `input_stats`.
    ///
    /// If interval arithmetic cannot analyze the predicate, the input's row
    /// count and byte size are simply scaled by `default_selectivity` percent
    /// and marked inexact; otherwise the predicate is analyzed to derive a
    /// selectivity and per-column bounds.
    fn statistics_helper(
        schema: &SchemaRef,
        input_stats: Statistics,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
    ) -> Result<Statistics> {
        if !check_support(predicate, schema) {
            let selectivity = default_selectivity as f64 / 100.0;
            let mut stats = input_stats.to_inexact();
            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
            stats.total_byte_size = stats
                .total_byte_size
                .with_estimated_selectivity(selectivity);
            return Ok(stats);
        }

        let num_rows = input_stats.num_rows;
        let total_byte_size = input_stats.total_byte_size;
        let input_analysis_ctx =
            AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?;

        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;

        // Assume no row reduction when the analysis is inconclusive.
        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
        let num_rows = num_rows.with_estimated_selectivity(selectivity);
        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);

        let column_statistics = collect_new_statistics(
            schema,
            &input_stats.column_statistics,
            analysis_ctx.boundaries,
        );
        Ok(Statistics {
            num_rows,
            total_byte_size,
            column_statistics,
        })
    }

    /// Return how `expr` is constant across partitions: either the input's
    /// equivalence properties already prove it constant, or it is a literal
    /// (uniform with the literal's value). `None` if neither holds.
    fn expr_constant_or_literal(
        expr: &Arc<dyn PhysicalExpr>,
        input_eqs: &EquivalenceProperties,
    ) -> Option<AcrossPartitions> {
        input_eqs.is_expr_constant(expr).or_else(|| {
            expr.as_any()
                .downcast_ref::<Literal>()
                .map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
        })
    }

    /// Derive additional constant expressions from equality conjuncts of the
    /// predicate: given `a = b`, if one side is known constant then the other
    /// side is constant downstream of the filter as well.
    fn extend_constants(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Vec<ConstExpr> {
        let mut res_constants = Vec::new();
        let input_eqs = input.equivalence_properties();

        let conjunctions = split_conjunction(predicate);
        for conjunction in conjunctions {
            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
                && binary.op() == &Operator::Eq
            {
                let left_const = Self::expr_constant_or_literal(binary.left(), input_eqs);
                let right_const =
                    Self::expr_constant_or_literal(binary.right(), input_eqs);

                if let Some(left_across) = left_const {
                    // Prefer the right side's own constant value when known;
                    // otherwise equality lets us propagate the left's value.
                    let across = right_const.unwrap_or(left_across);
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.right()), across));
                } else if let Some(right_across) = right_const {
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.left()), right_across));
                }
            }
        }
        res_constants
    }

    /// Compute the cached [`PlanProperties`] (equivalence properties, output
    /// partitioning, pipeline behavior, boundedness) for a filter over
    /// `input` with the given `predicate` and optional output `projection`.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
        projection: Option<&Vec<usize>>,
    ) -> Result<PlanProperties> {
        let schema = input.schema();
        let stats = Self::statistics_helper(
            &schema,
            input.partition_statistics(None)?,
            predicate,
            default_selectivity,
        )?;
        let mut eq_properties = input.equivalence_properties().clone();
        // `a = b` conjuncts make both sides equivalent after the filter.
        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
        }
        // Columns whose post-filter value range collapses to a single value
        // become constants.
        let constants = collect_columns(predicate)
            .into_iter()
            .filter(|column| stats.column_statistics[column.index()].is_singleton())
            .map(|column| {
                let value = stats.column_statistics[column.index()]
                    .min_value
                    .get_value();
                let expr = Arc::new(column) as _;
                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
            });
        eq_properties.add_constants(constants)?;
        // Equality-with-constant conjuncts contribute further constants.
        eq_properties.add_constants(Self::extend_constants(input, predicate))?;

        let mut output_partitioning = input.output_partitioning().clone();
        // Apply the optional projection to both the output partitioning and
        // the equivalence properties.
        if let Some(projection) = projection {
            let schema = eq_properties.schema();
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }
}
353
354impl DisplayAs for FilterExec {
355 fn fmt_as(
356 &self,
357 t: DisplayFormatType,
358 f: &mut std::fmt::Formatter,
359 ) -> std::fmt::Result {
360 match t {
361 DisplayFormatType::Default | DisplayFormatType::Verbose => {
362 let display_projections = if let Some(projection) =
363 self.projection.as_ref()
364 {
365 format!(
366 ", projection=[{}]",
367 projection
368 .iter()
369 .map(|index| format!(
370 "{}@{}",
371 self.input.schema().fields().get(*index).unwrap().name(),
372 index
373 ))
374 .collect::<Vec<_>>()
375 .join(", ")
376 )
377 } else {
378 "".to_string()
379 };
380 let fetch = self
381 .fetch
382 .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
383 write!(
384 f,
385 "FilterExec: {}{}{}",
386 self.predicate, display_projections, fetch
387 )
388 }
389 DisplayFormatType::TreeRender => {
390 write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
391 }
392 }
393 }
394}
395
396impl ExecutionPlan for FilterExec {
397 fn name(&self) -> &'static str {
398 "FilterExec"
399 }
400
401 fn as_any(&self) -> &dyn Any {
403 self
404 }
405
406 fn properties(&self) -> &PlanProperties {
407 &self.cache
408 }
409
410 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
411 vec![&self.input]
412 }
413
414 fn maintains_input_order(&self) -> Vec<bool> {
415 vec![true]
417 }
418
419 fn with_new_children(
420 self: Arc<Self>,
421 mut children: Vec<Arc<dyn ExecutionPlan>>,
422 ) -> Result<Arc<dyn ExecutionPlan>> {
423 FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
424 .and_then(|e| {
425 let selectivity = e.default_selectivity();
426 e.with_default_selectivity(selectivity)
427 })
428 .and_then(|e| e.with_projection(self.projection().cloned()))
429 .map(|e| e.with_fetch(self.fetch).unwrap())
430 }
431
432 fn execute(
433 &self,
434 partition: usize,
435 context: Arc<TaskContext>,
436 ) -> Result<SendableRecordBatchStream> {
437 trace!(
438 "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
439 partition,
440 context.session_id(),
441 context.task_id()
442 );
443 let metrics = FilterExecMetrics::new(&self.metrics, partition);
444 Ok(Box::pin(FilterExecStream {
445 schema: self.schema(),
446 predicate: Arc::clone(&self.predicate),
447 input: self.input.execute(partition, context)?,
448 metrics,
449 projection: self.projection.clone(),
450 batch_coalescer: LimitedBatchCoalescer::new(
451 self.schema(),
452 self.batch_size,
453 self.fetch,
454 ),
455 }))
456 }
457
458 fn metrics(&self) -> Option<MetricsSet> {
459 Some(self.metrics.clone_inner())
460 }
461
462 fn statistics(&self) -> Result<Statistics> {
465 self.partition_statistics(None)
466 }
467
468 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
469 let input_stats = self.input.partition_statistics(partition)?;
470 let schema = self.schema();
471 let stats = Self::statistics_helper(
472 &schema,
473 input_stats,
474 self.predicate(),
475 self.default_selectivity,
476 )?;
477 Ok(stats.project(self.projection.as_ref()))
478 }
479
480 fn cardinality_effect(&self) -> CardinalityEffect {
481 CardinalityEffect::LowerEqual
482 }
483
484 fn try_swapping_with_projection(
487 &self,
488 projection: &ProjectionExec,
489 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
490 if projection.expr().len() < projection.input().schema().fields().len() {
492 if let Some(new_predicate) =
494 update_expr(self.predicate(), projection.expr(), false)?
495 {
496 return FilterExec::try_new(
497 new_predicate,
498 make_with_child(projection, self.input())?,
499 )
500 .and_then(|e| {
501 let selectivity = self.default_selectivity();
502 e.with_default_selectivity(selectivity)
503 })
504 .map(|e| Some(Arc::new(e) as _));
505 }
506 }
507 try_embed_projection(projection, self)
508 }
509
510 fn gather_filters_for_pushdown(
511 &self,
512 phase: FilterPushdownPhase,
513 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
514 _config: &ConfigOptions,
515 ) -> Result<FilterDescription> {
516 if !matches!(phase, FilterPushdownPhase::Pre) {
517 let child =
519 ChildFilterDescription::from_child(&parent_filters, self.input())?;
520 return Ok(FilterDescription::new().with_child(child));
521 }
522
523 let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
524 .with_self_filters(
525 split_conjunction(&self.predicate)
526 .into_iter()
527 .cloned()
528 .collect(),
529 );
530
531 Ok(FilterDescription::new().with_child(child))
532 }
533
534 fn handle_child_pushdown_result(
535 &self,
536 phase: FilterPushdownPhase,
537 child_pushdown_result: ChildPushdownResult,
538 _config: &ConfigOptions,
539 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
540 if !matches!(phase, FilterPushdownPhase::Pre) {
541 return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
542 }
543 let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
545 child_pushdown_result
546 .parent_filters
547 .iter()
548 .filter_map(|f| {
549 matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
550 })
551 .collect();
552
553 if self.projection.is_some() {
557 let input_schema = self.input().schema();
558 unsupported_parent_filters = unsupported_parent_filters
559 .into_iter()
560 .map(|expr| reassign_expr_columns(expr, &input_schema))
561 .collect::<Result<Vec<_>>>()?;
562 }
563
564 let unsupported_self_filters = child_pushdown_result
565 .self_filters
566 .first()
567 .expect("we have exactly one child")
568 .iter()
569 .filter_map(|f| match f.discriminant {
570 PushedDown::Yes => None,
571 PushedDown::No => Some(&f.predicate),
572 })
573 .cloned();
574
575 let unhandled_filters = unsupported_parent_filters
576 .into_iter()
577 .chain(unsupported_self_filters)
578 .collect_vec();
579
580 let filter_input = Arc::clone(self.input());
582 let new_predicate = conjunction(unhandled_filters);
583 let updated_node = if new_predicate.eq(&lit(true)) {
584 match self.projection() {
586 Some(projection_indices) => {
587 let filter_child_schema = filter_input.schema();
588 let proj_exprs = projection_indices
589 .iter()
590 .map(|p| {
591 let field = filter_child_schema.field(*p).clone();
592 ProjectionExpr {
593 expr: Arc::new(Column::new(field.name(), *p))
594 as Arc<dyn PhysicalExpr>,
595 alias: field.name().to_string(),
596 }
597 })
598 .collect::<Vec<_>>();
599 Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
600 as Arc<dyn ExecutionPlan>)
601 }
602 None => {
603 Some(filter_input)
605 }
606 }
607 } else if new_predicate.eq(&self.predicate) {
608 None
610 } else {
611 let new = FilterExec {
613 predicate: Arc::clone(&new_predicate),
614 input: Arc::clone(&filter_input),
615 metrics: self.metrics.clone(),
616 default_selectivity: self.default_selectivity,
617 cache: Self::compute_properties(
618 &filter_input,
619 &new_predicate,
620 self.default_selectivity,
621 self.projection.as_ref(),
622 )?,
623 projection: self.projection.clone(),
624 batch_size: self.batch_size,
625 fetch: self.fetch,
626 };
627 Some(Arc::new(new) as _)
628 };
629
630 Ok(FilterPushdownPropagation {
631 filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
632 updated_node,
633 })
634 }
635
636 fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
637 Some(Arc::new(Self {
638 predicate: Arc::clone(&self.predicate),
639 input: Arc::clone(&self.input),
640 metrics: self.metrics.clone(),
641 default_selectivity: self.default_selectivity,
642 cache: self.cache.clone(),
643 projection: self.projection.clone(),
644 batch_size: self.batch_size,
645 fetch,
646 }))
647 }
648}
649
650impl EmbeddedProjection for FilterExec {
651 fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
652 self.with_projection(projection)
653 }
654}
655
656fn collect_new_statistics(
661 schema: &SchemaRef,
662 input_column_stats: &[ColumnStatistics],
663 analysis_boundaries: Vec<ExprBoundaries>,
664) -> Vec<ColumnStatistics> {
665 analysis_boundaries
666 .into_iter()
667 .enumerate()
668 .map(
669 |(
670 idx,
671 ExprBoundaries {
672 interval,
673 distinct_count,
674 ..
675 },
676 )| {
677 let Some(interval) = interval else {
678 let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
683 .unwrap_or(ScalarValue::Null);
684 return ColumnStatistics {
685 null_count: Precision::Exact(0),
686 max_value: Precision::Exact(typed_null.clone()),
687 min_value: Precision::Exact(typed_null.clone()),
688 sum_value: Precision::Exact(typed_null),
689 distinct_count: Precision::Exact(0),
690 byte_size: input_column_stats[idx].byte_size,
691 };
692 };
693 let (lower, upper) = interval.into_bounds();
694 let (min_value, max_value) = if lower.eq(&upper) {
695 (Precision::Exact(lower), Precision::Exact(upper))
696 } else {
697 (Precision::Inexact(lower), Precision::Inexact(upper))
698 };
699 ColumnStatistics {
700 null_count: input_column_stats[idx].null_count.to_inexact(),
701 max_value,
702 min_value,
703 sum_value: Precision::Absent,
704 distinct_count: distinct_count.to_inexact(),
705 byte_size: input_column_stats[idx].byte_size,
706 }
707 },
708 )
709 .collect()
710}
711
/// The stream produced by [`FilterExec::execute`]: evaluates the predicate on
/// each input batch, optionally projects, and re-chunks output through a
/// limited coalescer.
struct FilterExecStream {
    /// Output schema (this node's schema, after any projection).
    schema: SchemaRef,
    /// Boolean predicate evaluated against every input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// The stream of input batches.
    input: SendableRecordBatchStream,
    /// Runtime metrics for this partition.
    metrics: FilterExecMetrics,
    /// Optional output projection (indices into the input schema).
    projection: Option<Vec<usize>>,
    /// Buffers filtered batches to the target batch size and enforces the
    /// optional `fetch` limit.
    batch_coalescer: LimitedBatchCoalescer,
}
728
/// Metrics recorded by [`FilterExecStream`].
struct FilterExecMetrics {
    /// Baseline metrics (elapsed compute time, output rows via `record_poll`).
    baseline_metrics: BaselineMetrics,
    /// Ratio of rows emitted (part) to rows examined (total) by the filter.
    selectivity: RatioMetrics,
}
738
739impl FilterExecMetrics {
740 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
741 Self {
742 baseline_metrics: BaselineMetrics::new(metrics, partition),
743 selectivity: MetricBuilder::new(metrics)
744 .with_type(MetricType::SUMMARY)
745 .ratio_metrics("selectivity", partition),
746 }
747 }
748}
749
/// Filter `batch` by evaluating the boolean `predicate` against it, keeping
/// all columns (no projection applied).
pub fn batch_filter(
    batch: &RecordBatch,
    predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
    filter_and_project(batch, predicate, None)
}
756
757fn filter_and_project(
758 batch: &RecordBatch,
759 predicate: &Arc<dyn PhysicalExpr>,
760 projection: Option<&Vec<usize>>,
761) -> Result<RecordBatch> {
762 predicate
763 .evaluate(batch)
764 .and_then(|v| v.into_array(batch.num_rows()))
765 .and_then(|array| {
766 Ok(match (as_boolean_array(&array), projection) {
767 (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
769 (Ok(filter_array), Some(projection)) => {
770 let projected_batch = batch.project(projection)?;
771 filter_record_batch(&projected_batch, filter_array)?
772 }
773 (Err(_), _) => {
774 return internal_err!(
775 "Cannot create filter_array from non-boolean predicates"
776 );
777 }
778 })
779 })
780}
781
impl Stream for FilterExecStream {
    type Item = Result<RecordBatch>;

    /// Drive the filter: drain completed batches from the coalescer first,
    /// then pull from the input, evaluate the predicate, project, filter and
    /// push into the coalescer until the input ends or the fetch limit hits.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
        loop {
            // Emit any batch the coalescer has already completed.
            if let Some(batch) = self.batch_coalescer.next_completed_batch() {
                // Selectivity numerator: rows that survived the filter.
                self.metrics.selectivity.add_part(batch.num_rows());
                let poll = Poll::Ready(Some(Ok(batch)));
                return self.metrics.baseline_metrics.record_poll(poll);
            }

            // Finished (input exhausted or limit reached) and fully drained.
            if self.batch_coalescer.is_finished() {
                return Poll::Ready(None);
            }

            match ready!(self.input.poll_next_unpin(cx)) {
                None => {
                    // Input exhausted: flush buffered rows, then loop to
                    // drain them above.
                    self.batch_coalescer.finish()?;
                }
                Some(Ok(batch)) => {
                    let timer = elapsed_compute.timer();
                    // Evaluate predicate -> optionally project -> filter ->
                    // push the filtered batch into the coalescer.
                    let status = self.predicate.as_ref()
                        .evaluate(&batch)
                        .and_then(|v| v.into_array(batch.num_rows()))
                        .and_then(|array| {
                            Ok(match self.projection {
                                Some(ref projection) => {
                                    let projected_batch = batch.project(projection)?;
                                    (array, projected_batch)
                                },
                                None => (array, batch)
                            })
                        }).and_then(|(array, batch)| {
                            match as_boolean_array(&array) {
                                Ok(filter_array) => {
                                    // Selectivity denominator: rows examined
                                    // (projection does not change row count).
                                    self.metrics.selectivity.add_total(batch.num_rows());
                                    let batch = filter_record_batch(&batch, filter_array)?;
                                    let state = self.batch_coalescer.push_batch(batch)?;
                                    Ok(state)
                                }
                                Err(_) => {
                                    internal_err!(
                                        "Cannot create filter_array from non-boolean predicates"
                                    )
                                }
                            }
                        })?;
                    timer.done();

                    match status {
                        PushBatchStatus::Continue => {
                        }
                        PushBatchStatus::LimitReached => {
                            // Fetch limit satisfied: stop consuming input and
                            // flush what is buffered.
                            self.batch_coalescer.finish()?;
                        }
                    }
                }

                // Forward input errors unchanged.
                other => return Poll::Ready(other),
            }
        }
    }

    // NOTE(review): delegates bounds to the input stream, but coalescing and
    // the `fetch` limit can change the number of emitted batches — confirm
    // these bounds are intended to be advisory only.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.input.size_hint()
    }
}
863impl RecordBatchStream for FilterExecStream {
864 fn schema(&self) -> SchemaRef {
865 Arc::clone(&self.schema)
866 }
867}
868
/// Split `predicate` into conjuncts and collect the `=` and `!=` pairs that
/// involve at least one direct column reference.
///
/// Deprecated public wrapper over the internal implementation.
#[deprecated(
    since = "51.0.0",
    note = "This function will be internal in the future"
)]
pub fn collect_columns_from_predicate(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    collect_columns_from_predicate_inner(predicate)
}
879
880fn collect_columns_from_predicate_inner(
881 predicate: &'_ Arc<dyn PhysicalExpr>,
882) -> EqualAndNonEqual<'_> {
883 let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
884 let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
885
886 let predicates = split_conjunction(predicate);
887 predicates.into_iter().for_each(|p| {
888 if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
889 let has_direct_column_operand =
897 binary.left().as_any().downcast_ref::<Column>().is_some()
898 || binary.right().as_any().downcast_ref::<Column>().is_some();
899 if !has_direct_column_operand {
900 return;
901 }
902 match binary.op() {
903 Operator::Eq => {
904 eq_predicate_columns.push((binary.left(), binary.right()))
905 }
906 Operator::NotEq => {
907 ne_predicate_columns.push((binary.left(), binary.right()))
908 }
909 _ => {}
910 }
911 }
912 });
913
914 (eq_predicate_columns, ne_predicate_columns)
915}
916
/// A pair of borrowed expressions related by a binary comparison (e.g. `a = b`).
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);

/// The `(equality_pairs, non_equality_pairs)` collected from a predicate's
/// conjuncts.
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
923
924#[cfg(test)]
925mod tests {
926 use super::*;
927 use crate::empty::EmptyExec;
928 use crate::expressions::*;
929 use crate::test;
930 use crate::test::exec::StatisticsExec;
931 use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
932 use datafusion_common::ScalarValue;
933
    /// `collect_columns_from_predicate_inner` should split a conjunction and
    /// collect only `=` / `!=` comparisons with a direct column operand.
    #[tokio::test]
    async fn collect_columns_predicates() -> Result<()> {
        let schema = test::aggr_test_schema();
        // (c2 >= 1 AND c2 = 4) AND (c2 = c9 AND c1 != c13)
        let predicate: Arc<dyn PhysicalExpr> = binary(
            binary(
                binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
                Operator::And,
                binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
                &schema,
            )?,
            Operator::And,
            binary(
                binary(
                    col("c2", &schema)?,
                    Operator::Eq,
                    col("c9", &schema)?,
                    &schema,
                )?,
                Operator::And,
                binary(
                    col("c1", &schema)?,
                    Operator::NotEq,
                    col("c13", &schema)?,
                    &schema,
                )?,
                &schema,
            )?,
            &schema,
        )?;

        let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
        // `c2 >= 1` is neither `=` nor `!=`, so only two equality pairs remain.
        assert_eq!(2, equal_pairs.len());
        assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
        assert!(equal_pairs[0].1.eq(&lit(4u32)));

        assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
        assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));

        assert_eq!(1, ne_pairs.len());
        assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
        assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));

        Ok(())
    }
978
    /// `a <= 25` over `a` in `[1, 100]` should shrink the row/byte estimates
    /// to ~25% and clamp the column's max bound to 25.
    #[tokio::test]
    async fn test_filter_statistics_basic_expr() -> Result<()> {
        let bytes_per_row = 4;
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                total_byte_size: Precision::Inexact(100 * bytes_per_row),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
            },
            schema.clone(),
        ));

        let predicate: Arc<dyn PhysicalExpr> =
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;

        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(25));
        assert_eq!(
            statistics.total_byte_size,
            Precision::Inexact(25 * bytes_per_row)
        );
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }
1023
    /// Statistics should compose through nested filters: `a >= 10` stacked on
    /// `a <= 25` should narrow the column range to `[10, 25]`.
    #[tokio::test]
    async fn test_filter_statistics_column_level_nested() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            sub_filter,
        )?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(16));
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }
1069
    /// Three stacked filters on two columns should compose: `a` narrows to
    /// `[10, 25]`, `b` to `[46, 50]`, with the row estimate multiplied
    /// through each filter's selectivity.
    #[tokio::test]
    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                        ..Default::default()
                    },
                ],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        // NOTE(review): the name suggests `b > 5` but the predicate is
        // `b > 45` — consider renaming.
        let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
            a_lte_25,
        )?);

        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            b_gt_5,
        )?);
        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(2));
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                    ..Default::default()
                }
            ]
        );

        Ok(())
    }
1142
    /// With completely unknown input statistics the filter cannot estimate a
    /// row count and must report `Absent`.
    #[tokio::test]
    async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&schema),
            schema.clone(),
        ));

        let predicate: Arc<dyn PhysicalExpr> =
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;

        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Absent);

        Ok(())
    }
1166
1167 #[tokio::test]
1168 async fn test_filter_statistics_multiple_columns() -> Result<()> {
1169 let schema = Schema::new(vec![
1174 Field::new("a", DataType::Int32, false),
1175 Field::new("b", DataType::Int32, false),
1176 Field::new("c", DataType::Float32, false),
1177 ]);
1178 let input = Arc::new(StatisticsExec::new(
1179 Statistics {
1180 num_rows: Precision::Inexact(1000),
1181 total_byte_size: Precision::Inexact(4000),
1182 column_statistics: vec![
1183 ColumnStatistics {
1184 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1185 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1186 ..Default::default()
1187 },
1188 ColumnStatistics {
1189 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1190 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1191 ..Default::default()
1192 },
1193 ColumnStatistics {
1194 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1195 max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1196 ..Default::default()
1197 },
1198 ],
1199 },
1200 schema,
1201 ));
1202 let predicate = Arc::new(BinaryExpr::new(
1204 Arc::new(BinaryExpr::new(
1205 Arc::new(Column::new("a", 0)),
1206 Operator::LtEq,
1207 Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1208 )),
1209 Operator::And,
1210 Arc::new(BinaryExpr::new(
1211 Arc::new(BinaryExpr::new(
1212 Arc::new(Column::new("b", 1)),
1213 Operator::Eq,
1214 Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1215 )),
1216 Operator::And,
1217 Arc::new(BinaryExpr::new(
1218 Arc::new(BinaryExpr::new(
1219 Arc::new(Column::new("c", 2)),
1220 Operator::LtEq,
1221 Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1222 )),
1223 Operator::And,
1224 Arc::new(BinaryExpr::new(
1225 Arc::new(Column::new("a", 0)),
1226 Operator::Gt,
1227 Arc::new(Column::new("b", 1)),
1228 )),
1229 )),
1230 )),
1231 ));
1232 let filter: Arc<dyn ExecutionPlan> =
1233 Arc::new(FilterExec::try_new(predicate, input)?);
1234 let statistics = filter.partition_statistics(None)?;
1235 assert_eq!(statistics.num_rows, Precision::Inexact(134));
1239 assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1240 let exp_col_stats = vec![
1241 ColumnStatistics {
1242 min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1243 max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1244 ..Default::default()
1245 },
1246 ColumnStatistics {
1247 min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1248 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1249 ..Default::default()
1250 },
1251 ColumnStatistics {
1252 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1253 max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1254 ..Default::default()
1255 },
1256 ];
1257 let _ = exp_col_stats
1258 .into_iter()
1259 .zip(statistics.column_statistics)
1260 .map(|(expected, actual)| {
1261 if let Some(val) = actual.min_value.get_value() {
1262 if val.data_type().is_floating() {
1263 let actual_min = actual.min_value.get_value().unwrap();
1266 let actual_max = actual.max_value.get_value().unwrap();
1267 let expected_min = expected.min_value.get_value().unwrap();
1268 let expected_max = expected.max_value.get_value().unwrap();
1269 let eps = ScalarValue::Float32(Some(1e-6));
1270
1271 assert!(actual_min.sub(expected_min).unwrap() < eps);
1272 assert!(actual_min.sub(expected_min).unwrap() < eps);
1273
1274 assert!(actual_max.sub(expected_max).unwrap() < eps);
1275 assert!(actual_max.sub(expected_max).unwrap() < eps);
1276 } else {
1277 assert_eq!(actual, expected);
1278 }
1279 } else {
1280 assert_eq!(actual, expected);
1281 }
1282 });
1283
1284 Ok(())
1285 }
1286
1287 #[tokio::test]
1288 async fn test_filter_statistics_full_selective() -> Result<()> {
1289 let schema = Schema::new(vec![
1293 Field::new("a", DataType::Int32, false),
1294 Field::new("b", DataType::Int32, false),
1295 ]);
1296 let input = Arc::new(StatisticsExec::new(
1297 Statistics {
1298 num_rows: Precision::Inexact(1000),
1299 total_byte_size: Precision::Inexact(4000),
1300 column_statistics: vec![
1301 ColumnStatistics {
1302 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1303 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1304 ..Default::default()
1305 },
1306 ColumnStatistics {
1307 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1308 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1309 ..Default::default()
1310 },
1311 ],
1312 },
1313 schema,
1314 ));
1315 let predicate = Arc::new(BinaryExpr::new(
1317 Arc::new(BinaryExpr::new(
1318 Arc::new(Column::new("a", 0)),
1319 Operator::Lt,
1320 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1321 )),
1322 Operator::And,
1323 Arc::new(BinaryExpr::new(
1324 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1325 Operator::LtEq,
1326 Arc::new(Column::new("b", 1)),
1327 )),
1328 ));
1329 let expected = input.partition_statistics(None)?.column_statistics;
1331 let filter: Arc<dyn ExecutionPlan> =
1332 Arc::new(FilterExec::try_new(predicate, input)?);
1333 let statistics = filter.partition_statistics(None)?;
1334
1335 assert_eq!(statistics.num_rows, Precision::Inexact(1000));
1336 assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
1337 assert_eq!(statistics.column_statistics, expected);
1338
1339 Ok(())
1340 }
1341
1342 #[tokio::test]
1343 async fn test_filter_statistics_zero_selective() -> Result<()> {
1344 let schema = Schema::new(vec![
1348 Field::new("a", DataType::Int32, false),
1349 Field::new("b", DataType::Int32, false),
1350 ]);
1351 let input = Arc::new(StatisticsExec::new(
1352 Statistics {
1353 num_rows: Precision::Inexact(1000),
1354 total_byte_size: Precision::Inexact(4000),
1355 column_statistics: vec![
1356 ColumnStatistics {
1357 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1358 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1359 ..Default::default()
1360 },
1361 ColumnStatistics {
1362 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1363 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1364 ..Default::default()
1365 },
1366 ],
1367 },
1368 schema,
1369 ));
1370 let predicate = Arc::new(BinaryExpr::new(
1372 Arc::new(BinaryExpr::new(
1373 Arc::new(Column::new("a", 0)),
1374 Operator::Gt,
1375 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1376 )),
1377 Operator::And,
1378 Arc::new(BinaryExpr::new(
1379 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1380 Operator::LtEq,
1381 Arc::new(Column::new("b", 1)),
1382 )),
1383 ));
1384 let filter: Arc<dyn ExecutionPlan> =
1385 Arc::new(FilterExec::try_new(predicate, input)?);
1386 let statistics = filter.partition_statistics(None)?;
1387
1388 assert_eq!(statistics.num_rows, Precision::Inexact(0));
1389 assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
1390 assert_eq!(
1391 statistics.column_statistics,
1392 vec![
1393 ColumnStatistics {
1394 min_value: Precision::Exact(ScalarValue::Int32(None)),
1395 max_value: Precision::Exact(ScalarValue::Int32(None)),
1396 sum_value: Precision::Exact(ScalarValue::Int32(None)),
1397 distinct_count: Precision::Exact(0),
1398 null_count: Precision::Exact(0),
1399 byte_size: Precision::Absent,
1400 },
1401 ColumnStatistics {
1402 min_value: Precision::Exact(ScalarValue::Int32(None)),
1403 max_value: Precision::Exact(ScalarValue::Int32(None)),
1404 sum_value: Precision::Exact(ScalarValue::Int32(None)),
1405 distinct_count: Precision::Exact(0),
1406 null_count: Precision::Exact(0),
1407 byte_size: Precision::Absent,
1408 },
1409 ]
1410 );
1411
1412 Ok(())
1413 }
1414
1415 #[tokio::test]
1425 async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
1426 let schema = Schema::new(vec![
1428 Field::new("a", DataType::Int32, false),
1429 Field::new("b", DataType::Int32, false),
1430 ]);
1431 let input = Arc::new(StatisticsExec::new(
1432 Statistics {
1433 num_rows: Precision::Inexact(1000),
1434 total_byte_size: Precision::Inexact(4000),
1435 column_statistics: vec![
1436 ColumnStatistics {
1437 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1438 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1439 ..Default::default()
1440 },
1441 ColumnStatistics {
1442 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1443 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1444 ..Default::default()
1445 },
1446 ],
1447 },
1448 schema,
1449 ));
1450
1451 let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1453 Arc::new(Column::new("a", 0)),
1454 Operator::Gt,
1455 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1456 ));
1457 let inner_filter: Arc<dyn ExecutionPlan> =
1458 Arc::new(FilterExec::try_new(inner_predicate, input)?);
1459
1460 let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1465 Arc::new(Column::new("a", 0)),
1466 Operator::Eq,
1467 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1468 ));
1469 let outer_filter: Arc<dyn ExecutionPlan> =
1470 Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
1471
1472 let statistics = outer_filter.partition_statistics(None)?;
1474 assert_eq!(statistics.num_rows, Precision::Inexact(0));
1475
1476 Ok(())
1477 }
1478
1479 #[tokio::test]
1480 async fn test_filter_statistics_more_inputs() -> Result<()> {
1481 let schema = Schema::new(vec![
1482 Field::new("a", DataType::Int32, false),
1483 Field::new("b", DataType::Int32, false),
1484 ]);
1485 let input = Arc::new(StatisticsExec::new(
1486 Statistics {
1487 num_rows: Precision::Inexact(1000),
1488 total_byte_size: Precision::Inexact(4000),
1489 column_statistics: vec![
1490 ColumnStatistics {
1491 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1492 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1493 ..Default::default()
1494 },
1495 ColumnStatistics {
1496 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1497 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1498 ..Default::default()
1499 },
1500 ],
1501 },
1502 schema,
1503 ));
1504 let predicate = Arc::new(BinaryExpr::new(
1506 Arc::new(Column::new("a", 0)),
1507 Operator::Lt,
1508 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1509 ));
1510 let filter: Arc<dyn ExecutionPlan> =
1511 Arc::new(FilterExec::try_new(predicate, input)?);
1512 let statistics = filter.partition_statistics(None)?;
1513
1514 assert_eq!(statistics.num_rows, Precision::Inexact(490));
1515 assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
1516 assert_eq!(
1517 statistics.column_statistics,
1518 vec![
1519 ColumnStatistics {
1520 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1521 max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
1522 ..Default::default()
1523 },
1524 ColumnStatistics {
1525 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1526 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1527 ..Default::default()
1528 },
1529 ]
1530 );
1531
1532 Ok(())
1533 }
1534
1535 #[tokio::test]
1536 async fn test_empty_input_statistics() -> Result<()> {
1537 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1538 let input = Arc::new(StatisticsExec::new(
1539 Statistics::new_unknown(&schema),
1540 schema,
1541 ));
1542 let predicate = Arc::new(BinaryExpr::new(
1544 Arc::new(BinaryExpr::new(
1545 Arc::new(Column::new("a", 0)),
1546 Operator::LtEq,
1547 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1548 )),
1549 Operator::And,
1550 Arc::new(BinaryExpr::new(
1551 Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1552 Operator::LtEq,
1553 Arc::new(BinaryExpr::new(
1554 Arc::new(Column::new("a", 0)),
1555 Operator::Minus,
1556 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1557 )),
1558 )),
1559 ));
1560 let filter: Arc<dyn ExecutionPlan> =
1561 Arc::new(FilterExec::try_new(predicate, input)?);
1562 let filter_statistics = filter.partition_statistics(None)?;
1563
1564 let expected_filter_statistics = Statistics {
1565 num_rows: Precision::Absent,
1566 total_byte_size: Precision::Absent,
1567 column_statistics: vec![ColumnStatistics {
1568 null_count: Precision::Absent,
1569 min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1570 max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1571 sum_value: Precision::Absent,
1572 distinct_count: Precision::Absent,
1573 byte_size: Precision::Absent,
1574 }],
1575 };
1576
1577 assert_eq!(filter_statistics, expected_filter_statistics);
1578
1579 Ok(())
1580 }
1581
1582 #[tokio::test]
1583 async fn test_statistics_with_constant_column() -> Result<()> {
1584 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1585 let input = Arc::new(StatisticsExec::new(
1586 Statistics::new_unknown(&schema),
1587 schema,
1588 ));
1589 let predicate = Arc::new(BinaryExpr::new(
1591 Arc::new(Column::new("a", 0)),
1592 Operator::Eq,
1593 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1594 ));
1595 let filter: Arc<dyn ExecutionPlan> =
1596 Arc::new(FilterExec::try_new(predicate, input)?);
1597 let filter_statistics = filter.partition_statistics(None)?;
1598 assert!(filter_statistics.column_statistics[0].is_singleton());
1600
1601 Ok(())
1602 }
1603
1604 #[tokio::test]
1605 async fn test_validation_filter_selectivity() -> Result<()> {
1606 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1607 let input = Arc::new(StatisticsExec::new(
1608 Statistics::new_unknown(&schema),
1609 schema,
1610 ));
1611 let predicate = Arc::new(BinaryExpr::new(
1613 Arc::new(Column::new("a", 0)),
1614 Operator::Eq,
1615 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1616 ));
1617 let filter = FilterExec::try_new(predicate, input)?;
1618 assert!(filter.with_default_selectivity(120).is_err());
1619 Ok(())
1620 }
1621
1622 #[tokio::test]
1623 async fn test_custom_filter_selectivity() -> Result<()> {
1624 let schema =
1626 Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1627 let input = Arc::new(StatisticsExec::new(
1628 Statistics {
1629 num_rows: Precision::Inexact(1000),
1630 total_byte_size: Precision::Inexact(4000),
1631 column_statistics: vec![ColumnStatistics {
1632 ..Default::default()
1633 }],
1634 },
1635 schema,
1636 ));
1637 let predicate = Arc::new(BinaryExpr::new(
1639 Arc::new(Column::new("a", 0)),
1640 Operator::Eq,
1641 Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1642 ));
1643 let filter = FilterExec::try_new(predicate, input)?;
1644 let statistics = filter.partition_statistics(None)?;
1645 assert_eq!(statistics.num_rows, Precision::Inexact(200));
1646 assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1647 let filter = filter.with_default_selectivity(40)?;
1648 let statistics = filter.partition_statistics(None)?;
1649 assert_eq!(statistics.num_rows, Precision::Inexact(400));
1650 assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1651 Ok(())
1652 }
1653
1654 #[test]
1655 fn test_equivalence_properties_union_type() -> Result<()> {
1656 let union_type = DataType::Union(
1657 UnionFields::new(
1658 vec![0, 1],
1659 vec![
1660 Field::new("f1", DataType::Int32, true),
1661 Field::new("f2", DataType::Utf8, true),
1662 ],
1663 ),
1664 UnionMode::Sparse,
1665 );
1666
1667 let schema = Arc::new(Schema::new(vec![
1668 Field::new("c1", DataType::Int32, true),
1669 Field::new("c2", union_type, true),
1670 ]));
1671
1672 let exec = FilterExec::try_new(
1673 binary(
1674 binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
1675 Operator::And,
1676 binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
1677 &schema,
1678 )?,
1679 Arc::new(EmptyExec::new(Arc::clone(&schema))),
1680 )?;
1681
1682 exec.partition_statistics(None).unwrap();
1683
1684 Ok(())
1685 }
1686
1687 #[test]
1688 fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
1689 let input_schema = Arc::new(Schema::new(vec![
1693 Field::new("a", DataType::Int32, false),
1694 Field::new("b", DataType::Utf8, false),
1695 Field::new("c", DataType::Float64, false),
1696 ]));
1697 let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));
1698
1699 let predicate = Arc::new(BinaryExpr::new(
1701 Arc::new(Column::new("a", 0)),
1702 Operator::Gt,
1703 Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1704 ));
1705 let filter =
1706 FilterExec::try_new(predicate, input)?.with_projection(Some(vec![2]))?;
1707
1708 let output_schema = filter.schema();
1710 assert_eq!(output_schema.fields().len(), 1);
1711 assert_eq!(output_schema.field(0).name(), "c");
1712
1713 let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));
1715
1716 let config = ConfigOptions::new();
1717 let desc = filter.gather_filters_for_pushdown(
1718 FilterPushdownPhase::Post,
1719 vec![parent_filter],
1720 &config,
1721 )?;
1722
1723 let parent_filters = desc.parent_filters();
1726 assert_eq!(parent_filters.len(), 1); assert_eq!(parent_filters[0].len(), 1); let remapped = &parent_filters[0][0].predicate;
1729 let display = format!("{remapped}");
1730 assert_eq!(
1731 display, "c@2",
1732 "Post-phase parent filter column index must be remapped \
1733 from output schema (c@0) to input schema (c@2)"
1734 );
1735
1736 Ok(())
1737 }
1738 #[test]
1746 fn test_collect_columns_skips_non_column_pairs() -> Result<()> {
1747 let schema = test::aggr_test_schema();
1748
1749 let complex_expr: Arc<dyn PhysicalExpr> = binary(
1752 col("c2", &schema)?,
1753 Operator::IsDistinctFrom,
1754 lit(0u32),
1755 &schema,
1756 )?;
1757 let predicate: Arc<dyn PhysicalExpr> =
1758 binary(complex_expr, Operator::Eq, lit(0u32), &schema)?;
1759
1760 let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
1761 assert_eq!(
1762 0,
1763 equal_pairs.len(),
1764 "Should not extract equality pairs where neither side is a Column"
1765 );
1766
1767 let predicate: Arc<dyn PhysicalExpr> =
1769 binary(col("c2", &schema)?, Operator::Eq, lit(0u32), &schema)?;
1770 let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
1771 assert_eq!(
1772 1,
1773 equal_pairs.len(),
1774 "Should extract equality pairs where one side is a Column"
1775 );
1776
1777 Ok(())
1778 }
1779}