1use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use itertools::Itertools;
24
25use super::{
26 ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27 RecordBatchStream, SendableRecordBatchStream, Statistics,
28};
29use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
30use crate::common::can_project;
31use crate::execution_plan::CardinalityEffect;
32use crate::filter_pushdown::{
33 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
34 FilterPushdownPropagation, PushedDown,
35};
36use crate::metrics::{MetricBuilder, MetricType};
37use crate::projection::{
38 EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
39 try_embed_projection, update_expr,
40};
41use crate::{
42 DisplayFormatType, ExecutionPlan,
43 metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
44};
45
46use arrow::compute::filter_record_batch;
47use arrow::datatypes::{DataType, SchemaRef};
48use arrow::record_batch::RecordBatch;
49use datafusion_common::cast::as_boolean_array;
50use datafusion_common::config::ConfigOptions;
51use datafusion_common::stats::Precision;
52use datafusion_common::{
53 DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
54};
55use datafusion_execution::TaskContext;
56use datafusion_expr::Operator;
57use datafusion_physical_expr::equivalence::ProjectionMapping;
58use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
59use datafusion_physical_expr::intervals::utils::check_support;
60use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
61use datafusion_physical_expr::{
62 AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
63 conjunction, split_conjunction,
64};
65
66use datafusion_physical_expr_common::physical_expr::fmt_sql;
67use futures::stream::{Stream, StreamExt};
68use log::trace;
69
/// Default assumed selectivity (percentage of input rows kept, 0..=100) used
/// when the predicate cannot be analyzed against the input statistics.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Default target row count for coalesced output batches.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
72
/// Execution plan node that evaluates a boolean predicate against every input
/// row and emits only the rows for which it evaluates to `true`, optionally
/// applying a projection and a row limit (`fetch`) to the output.
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// Boolean expression evaluated against each input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// Child plan producing the rows to be filtered.
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics (baseline + selectivity ratio), shared across partitions.
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity (0..=100, percent of rows assumed kept) used when the
    /// predicate cannot be analyzed against statistics.
    default_selectivity: u8,
    /// Cached plan properties (equivalences, partitioning, statistics, etc.).
    cache: PlanProperties,
    /// Optional output projection, as indices into the input schema.
    projection: Option<Vec<usize>>,
    /// Target row count for output batches produced by the coalescer.
    batch_size: usize,
    /// Optional limit on the total number of rows emitted.
    fetch: Option<usize>,
}
94
95impl FilterExec {
96 #[expect(clippy::needless_pass_by_value)]
98 pub fn try_new(
99 predicate: Arc<dyn PhysicalExpr>,
100 input: Arc<dyn ExecutionPlan>,
101 ) -> Result<Self> {
102 match predicate.data_type(input.schema().as_ref())? {
103 DataType::Boolean => {
104 let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
105 let cache = Self::compute_properties(
106 &input,
107 &predicate,
108 default_selectivity,
109 None,
110 )?;
111 Ok(Self {
112 predicate,
113 input: Arc::clone(&input),
114 metrics: ExecutionPlanMetricsSet::new(),
115 default_selectivity,
116 cache,
117 projection: None,
118 batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
119 fetch: None,
120 })
121 }
122 other => {
123 plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
124 }
125 }
126 }
127
128 pub fn with_default_selectivity(
129 mut self,
130 default_selectivity: u8,
131 ) -> Result<Self, DataFusionError> {
132 if default_selectivity > 100 {
133 return plan_err!(
134 "Default filter selectivity value needs to be less than or equal to 100"
135 );
136 }
137 self.default_selectivity = default_selectivity;
138 Ok(self)
139 }
140
141 pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
143 can_project(&self.schema(), projection.as_ref())?;
145
146 let projection = match projection {
147 Some(projection) => match &self.projection {
148 Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
149 None => Some(projection),
150 },
151 None => None,
152 };
153
154 let cache = Self::compute_properties(
155 &self.input,
156 &self.predicate,
157 self.default_selectivity,
158 projection.as_ref(),
159 )?;
160 Ok(Self {
161 predicate: Arc::clone(&self.predicate),
162 input: Arc::clone(&self.input),
163 metrics: self.metrics.clone(),
164 default_selectivity: self.default_selectivity,
165 cache,
166 projection,
167 batch_size: self.batch_size,
168 fetch: self.fetch,
169 })
170 }
171
172 pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
173 Ok(Self {
174 predicate: Arc::clone(&self.predicate),
175 input: Arc::clone(&self.input),
176 metrics: self.metrics.clone(),
177 default_selectivity: self.default_selectivity,
178 cache: self.cache.clone(),
179 projection: self.projection.clone(),
180 batch_size,
181 fetch: self.fetch,
182 })
183 }
184
    /// The boolean expression rows must satisfy to be emitted.
    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate
    }

    /// The child execution plan being filtered.
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// The selectivity (0..=100) assumed when statistics-based analysis of
    /// the predicate is not possible.
    pub fn default_selectivity(&self) -> u8 {
        self.default_selectivity
    }

    /// The optional output projection, as indices into the input schema.
    pub fn projection(&self) -> Option<&Vec<usize>> {
        self.projection.as_ref()
    }
204
    /// Estimate output statistics by applying `predicate` to `input_stats`.
    ///
    /// If interval analysis does not support the predicate, row count and
    /// byte size are scaled by `default_selectivity` and all statistics are
    /// downgraded to inexact. Otherwise the predicate is analyzed against the
    /// per-column boundaries to derive both the selectivity and tightened
    /// column ranges.
    fn statistics_helper(
        schema: &SchemaRef,
        input_stats: Statistics,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
    ) -> Result<Statistics> {
        if !check_support(predicate, schema) {
            // Fallback path: predicate not analyzable — scale by the default
            // selectivity and mark everything inexact.
            let selectivity = default_selectivity as f64 / 100.0;
            let mut stats = input_stats.to_inexact();
            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
            stats.total_byte_size = stats
                .total_byte_size
                .with_estimated_selectivity(selectivity);
            return Ok(stats);
        }

        let num_rows = input_stats.num_rows;
        let total_byte_size = input_stats.total_byte_size;
        let input_analysis_ctx =
            AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?;

        // Interval analysis of the predicate over the input column bounds.
        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;

        // No selectivity estimate means we conservatively assume all rows pass.
        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
        let num_rows = num_rows.with_estimated_selectivity(selectivity);
        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);

        // Tighten per-column min/max from the analyzed boundaries.
        let column_statistics = collect_new_statistics(
            &input_stats.column_statistics,
            analysis_ctx.boundaries,
        );
        Ok(Statistics {
            num_rows,
            total_byte_size,
            column_statistics,
        })
    }
244
    /// Derive additional constant expressions implied by equality conjuncts.
    ///
    /// For each top-level `a = b` conjunct of `predicate`: if one side is
    /// already known constant in the input's equivalence properties, the
    /// other side must be constant in this node's output as well, so it is
    /// returned as a new [`ConstExpr`].
    fn extend_constants(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Vec<ConstExpr> {
        let mut res_constants = Vec::new();
        let input_eqs = input.equivalence_properties();

        let conjunctions = split_conjunction(predicate);
        for conjunction in conjunctions {
            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
                && binary.op() == &Operator::Eq
            {
                // Left side constant ⇒ right side becomes constant (and vice
                // versa). The cross-partition behavior of the *new* constant
                // is whatever is already known about it (default if unknown).
                if input_eqs.is_expr_constant(binary.left()).is_some() {
                    let across = input_eqs
                        .is_expr_constant(binary.right())
                        .unwrap_or_default();
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.right()), across));
                } else if input_eqs.is_expr_constant(binary.right()).is_some() {
                    let across = input_eqs
                        .is_expr_constant(binary.left())
                        .unwrap_or_default();
                    res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across));
                }
            }
        }
        res_constants
    }
    /// Compute the [`PlanProperties`] of a filter over `input` with the given
    /// predicate, default selectivity, and optional output projection.
    ///
    /// The filter preserves the input's equivalence properties and extends
    /// them with facts implied by the predicate (equalities, columns proven
    /// constant by statistics, constants propagated through `=` conjuncts).
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
        projection: Option<&Vec<usize>>,
    ) -> Result<PlanProperties> {
        let schema = input.schema();
        let stats = Self::statistics_helper(
            &schema,
            input.partition_statistics(None)?,
            predicate,
            default_selectivity,
        )?;
        let mut eq_properties = input.equivalence_properties().clone();
        // `a = b` conjuncts make the two sides equivalent in the output.
        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
        }
        // Columns whose post-filter statistics collapse to a single value are
        // constants; the singleton min equals that value.
        let constants = collect_columns(predicate)
            .into_iter()
            .filter(|column| stats.column_statistics[column.index()].is_singleton())
            .map(|column| {
                let value = stats.column_statistics[column.index()]
                    .min_value
                    .get_value();
                let expr = Arc::new(column) as _;
                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
            });
        eq_properties.add_constants(constants)?;
        // Constants implied transitively through `=` conjuncts.
        eq_properties.add_constants(Self::extend_constants(input, predicate))?;

        let mut output_partitioning = input.output_partitioning().clone();
        // An output projection re-maps both the partitioning expressions and
        // the equivalence properties onto the projected schema.
        if let Some(projection) = projection {
            let schema = eq_properties.schema();
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }
331}
332
impl DisplayAs for FilterExec {
    /// Render this node for `EXPLAIN` output.
    ///
    /// Default/Verbose: `FilterExec: <predicate>[, projection=[..]][, fetch=N]`.
    /// TreeRender: only the predicate, formatted as SQL.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                // Projection columns shown as `name@index` pairs.
                let display_projections = if let Some(projection) =
                    self.projection.as_ref()
                {
                    format!(
                        ", projection=[{}]",
                        projection
                            .iter()
                            .map(|index| format!(
                                "{}@{}",
                                self.input.schema().fields().get(*index).unwrap().name(),
                                index
                            ))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                } else {
                    "".to_string()
                };
                let fetch = self
                    .fetch
                    .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
                write!(
                    f,
                    "FilterExec: {}{}{}",
                    self.predicate, display_projections, fetch
                )
            }
            DisplayFormatType::TreeRender => {
                write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
            }
        }
    }
}
374
375impl ExecutionPlan for FilterExec {
    fn name(&self) -> &'static str {
        "FilterExec"
    }

    /// Allow downcasting from `dyn ExecutionPlan` back to `FilterExec`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Filtering removes rows but never reorders them.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }
397
398 fn with_new_children(
399 self: Arc<Self>,
400 mut children: Vec<Arc<dyn ExecutionPlan>>,
401 ) -> Result<Arc<dyn ExecutionPlan>> {
402 FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
403 .and_then(|e| {
404 let selectivity = e.default_selectivity();
405 e.with_default_selectivity(selectivity)
406 })
407 .and_then(|e| e.with_projection(self.projection().cloned()))
408 .map(|e| e.with_fetch(self.fetch).unwrap())
409 }
410
    /// Start execution for one partition, returning a stream that filters
    /// (and optionally projects and limits) the child's batches.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );
        let metrics = FilterExecMetrics::new(&self.metrics, partition);
        Ok(Box::pin(FilterExecStream {
            schema: self.schema(),
            predicate: Arc::clone(&self.predicate),
            input: self.input.execute(partition, context)?,
            metrics,
            projection: self.projection.clone(),
            // The coalescer merges small post-filter batches up to
            // `batch_size` rows and enforces the optional `fetch` limit.
            batch_coalescer: LimitedBatchCoalescer::new(
                self.schema(),
                self.batch_size,
                self.fetch,
            ),
        }))
    }
436
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// Plan-wide statistics; delegates to the per-partition path with `None`.
    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    /// Estimate output statistics for one partition (or the whole plan when
    /// `partition` is `None`), then apply the output projection if any.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        let input_stats = self.input.partition_statistics(partition)?;
        let schema = self.schema();
        let stats = Self::statistics_helper(
            &schema,
            input_stats,
            self.predicate(),
            self.default_selectivity,
        )?;
        Ok(stats.project(self.projection.as_ref()))
    }

    /// A filter can only keep or remove rows, never add them.
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::LowerEqual
    }
462
    /// Try to push a parent [`ProjectionExec`] below this filter.
    ///
    /// Only attempted when the projection narrows the schema; the predicate
    /// is rewritten in terms of the projected columns. Falls back to
    /// embedding the projection into the filter itself.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        if projection.expr().len() < projection.input().schema().fields().len() {
            // The predicate must be expressible over the projection's output;
            // `update_expr` returns None when it is not.
            if let Some(new_predicate) =
                update_expr(self.predicate(), projection.expr(), false)?
            {
                return FilterExec::try_new(
                    new_predicate,
                    make_with_child(projection, self.input())?,
                )
                .and_then(|e| {
                    let selectivity = self.default_selectivity();
                    e.with_default_selectivity(selectivity)
                })
                .map(|e| Some(Arc::new(e) as _));
            }
        }
        try_embed_projection(projection, self)
    }
488
489 fn gather_filters_for_pushdown(
490 &self,
491 phase: FilterPushdownPhase,
492 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
493 _config: &ConfigOptions,
494 ) -> Result<FilterDescription> {
495 if !matches!(phase, FilterPushdownPhase::Pre) {
496 let child =
498 ChildFilterDescription::from_child(&parent_filters, self.input())?;
499 return Ok(FilterDescription::new().with_child(child));
500 }
501
502 let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
503 .with_self_filters(
504 split_conjunction(&self.predicate)
505 .into_iter()
506 .cloned()
507 .collect(),
508 );
509
510 Ok(FilterDescription::new().with_child(child))
511 }
512
    /// React to the child's pushdown verdicts: rebuild this node keeping only
    /// the filters the child did NOT absorb.
    ///
    /// Outcomes:
    /// * all filters absorbed + projection → replaced by a `ProjectionExec`;
    /// * all filters absorbed, no projection → this node is removed entirely;
    /// * predicate unchanged → no update;
    /// * otherwise → a new `FilterExec` over the remaining conjuncts.
    fn handle_child_pushdown_result(
        &self,
        phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        // Outside the `Pre` phase we offered no self filters; just relay.
        if !matches!(phase, FilterPushdownPhase::Pre) {
            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
        }
        // Parent filters the child rejected — we must keep enforcing them.
        let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
            child_pushdown_result
                .parent_filters
                .iter()
                .filter_map(|f| {
                    matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
                })
                .collect();

        // Parent filters reference our projected output schema; rebase their
        // column indices onto the input schema we actually filter on.
        if self.projection.is_some() {
            let input_schema = self.input().schema();
            unsupported_parent_filters = unsupported_parent_filters
                .into_iter()
                .map(|expr| reassign_expr_columns(expr, &input_schema))
                .collect::<Result<Vec<_>>>()?;
        }

        // Our own conjuncts the child rejected.
        let unsupported_self_filters = child_pushdown_result
            .self_filters
            .first()
            .expect("we have exactly one child")
            .iter()
            .filter_map(|f| match f.discriminant {
                PushedDown::Yes => None,
                PushedDown::No => Some(&f.predicate),
            })
            .cloned();

        let unhandled_filters = unsupported_parent_filters
            .into_iter()
            .chain(unsupported_self_filters)
            .collect_vec();

        let filter_input = Arc::clone(self.input());
        let new_predicate = conjunction(unhandled_filters);
        let updated_node = if new_predicate.eq(&lit(true)) {
            // Nothing left to filter.
            match self.projection() {
                Some(projection_indices) => {
                    // Keep the projection semantics with a standalone
                    // ProjectionExec over the (now unfiltered) child.
                    let filter_child_schema = filter_input.schema();
                    let proj_exprs = projection_indices
                        .iter()
                        .map(|p| {
                            let field = filter_child_schema.field(*p).clone();
                            ProjectionExpr {
                                expr: Arc::new(Column::new(field.name(), *p))
                                    as Arc<dyn PhysicalExpr>,
                                alias: field.name().to_string(),
                            }
                        })
                        .collect::<Vec<_>>();
                    Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
                        as Arc<dyn ExecutionPlan>)
                }
                None => {
                    Some(filter_input)
                }
            }
        } else if new_predicate.eq(&self.predicate) {
            // Unchanged predicate — no rewrite needed.
            None
        } else {
            let new = FilterExec {
                predicate: Arc::clone(&new_predicate),
                input: Arc::clone(&filter_input),
                metrics: self.metrics.clone(),
                default_selectivity: self.default_selectivity,
                cache: Self::compute_properties(
                    &filter_input,
                    &new_predicate,
                    self.default_selectivity,
                    self.projection.as_ref(),
                )?,
                projection: self.projection.clone(),
                batch_size: self.batch_size,
                fetch: self.fetch,
            };
            Some(Arc::new(new) as _)
        };

        // All parent filters are reported as handled: either the child
        // absorbed them or this (possibly rebuilt) filter enforces them.
        Ok(FilterPushdownPropagation {
            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
            updated_node,
        })
    }
614
615 fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
616 Some(Arc::new(Self {
617 predicate: Arc::clone(&self.predicate),
618 input: Arc::clone(&self.input),
619 metrics: self.metrics.clone(),
620 default_selectivity: self.default_selectivity,
621 cache: self.cache.clone(),
622 projection: self.projection.clone(),
623 batch_size: self.batch_size,
624 fetch,
625 }))
626 }
627}
628
/// Lets the projection-pushdown machinery embed a projection directly into
/// this filter; simply forwards to the inherent `with_projection`.
impl EmbeddedProjection for FilterExec {
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        self.with_projection(projection)
    }
}
634
/// Convert the post-analysis per-column boundaries into output
/// [`ColumnStatistics`], falling back to the input statistics for fields the
/// analysis could not tighten.
fn collect_new_statistics(
    input_column_stats: &[ColumnStatistics],
    analysis_boundaries: Vec<ExprBoundaries>,
) -> Vec<ColumnStatistics> {
    analysis_boundaries
        .into_iter()
        .enumerate()
        .map(
            |(
                idx,
                ExprBoundaries {
                    interval,
                    distinct_count,
                    ..
                },
            )| {
                // No interval means the predicate proved the column empty
                // (no rows can satisfy it): zero rows, null bounds.
                let Some(interval) = interval else {
                    return ColumnStatistics {
                        null_count: Precision::Exact(0),
                        max_value: Precision::Exact(ScalarValue::Null),
                        min_value: Precision::Exact(ScalarValue::Null),
                        sum_value: Precision::Exact(ScalarValue::Null),
                        distinct_count: Precision::Exact(0),
                        byte_size: input_column_stats[idx].byte_size,
                    };
                };
                let (lower, upper) = interval.into_bounds();
                // A collapsed interval pins the value exactly; otherwise the
                // bounds are estimates.
                let (min_value, max_value) = if lower.eq(&upper) {
                    (Precision::Exact(lower), Precision::Exact(upper))
                } else {
                    (Precision::Inexact(lower), Precision::Inexact(upper))
                };
                ColumnStatistics {
                    null_count: input_column_stats[idx].null_count.to_inexact(),
                    max_value,
                    min_value,
                    sum_value: Precision::Absent,
                    distinct_count: distinct_count.to_inexact(),
                    byte_size: input_column_stats[idx].byte_size,
                }
            },
        )
        .collect()
}
684
/// Per-partition stream that applies the filter (and optional projection)
/// to each input batch and coalesces the results.
struct FilterExecStream {
    /// Output schema (after projection, if any).
    schema: SchemaRef,
    /// Boolean expression evaluated against each input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// Upstream batch stream being filtered.
    input: SendableRecordBatchStream,
    /// Baseline + selectivity metrics for this partition.
    metrics: FilterExecMetrics,
    /// Optional projection (input-schema indices) applied before filtering.
    projection: Option<Vec<usize>>,
    /// Merges small filtered batches and enforces the optional row limit.
    batch_coalescer: LimitedBatchCoalescer,
}
701
/// Metrics recorded by [`FilterExecStream`].
struct FilterExecMetrics {
    // Standard row-count / timing metrics.
    baseline_metrics: BaselineMetrics,
    // Ratio of rows kept to rows seen (observed selectivity).
    selectivity: RatioMetrics,
}
711
impl FilterExecMetrics {
    /// Register this partition's metrics in the shared metrics set.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            // Selectivity is reported as a summary-type ratio metric.
            selectivity: MetricBuilder::new(metrics)
                .with_type(MetricType::SUMMARY)
                .ratio_metrics("selectivity", partition),
        }
    }
}
722
/// Filter `batch` down to the rows for which `predicate` evaluates to `true`,
/// without applying any projection.
pub fn batch_filter(
    batch: &RecordBatch,
    predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
    filter_and_project(batch, predicate, None)
}
729
730fn filter_and_project(
731 batch: &RecordBatch,
732 predicate: &Arc<dyn PhysicalExpr>,
733 projection: Option<&Vec<usize>>,
734) -> Result<RecordBatch> {
735 predicate
736 .evaluate(batch)
737 .and_then(|v| v.into_array(batch.num_rows()))
738 .and_then(|array| {
739 Ok(match (as_boolean_array(&array), projection) {
740 (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
742 (Ok(filter_array), Some(projection)) => {
743 let projected_batch = batch.project(projection)?;
744 filter_record_batch(&projected_batch, filter_array)?
745 }
746 (Err(_), _) => {
747 return internal_err!(
748 "Cannot create filter_array from non-boolean predicates"
749 );
750 }
751 })
752 })
753}
754
impl Stream for FilterExecStream {
    type Item = Result<RecordBatch>;

    /// Drive the filter: drain completed batches from the coalescer first,
    /// then pull from the input, filter, and push into the coalescer.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
        loop {
            // 1) Emit any batch the coalescer has already completed.
            if let Some(batch) = self.batch_coalescer.next_completed_batch() {
                // Rows actually emitted count toward the selectivity numerator.
                self.metrics.selectivity.add_part(batch.num_rows());
                let poll = Poll::Ready(Some(Ok(batch)));
                return self.metrics.baseline_metrics.record_poll(poll);
            }

            // 2) Finished (input exhausted or fetch limit hit) and drained.
            if self.batch_coalescer.is_finished() {
                return Poll::Ready(None);
            }

            // 3) Pull the next input batch.
            match ready!(self.input.poll_next_unpin(cx)) {
                None => {
                    // Input exhausted: flush any partial batch, then loop to
                    // drain it above.
                    self.batch_coalescer.finish()?;
                }
                Some(Ok(batch)) => {
                    let timer = elapsed_compute.timer();
                    // Evaluate the predicate on the FULL input batch, but
                    // apply the filter to the projected columns only.
                    let status = self.predicate.as_ref()
                        .evaluate(&batch)
                        .and_then(|v| v.into_array(batch.num_rows()))
                        .and_then(|array| {
                            Ok(match self.projection {
                                Some(ref projection) => {
                                    let projected_batch = batch.project(projection)?;
                                    (array, projected_batch)
                                },
                                None => (array, batch)
                            })
                        }).and_then(|(array, batch)| {
                            match as_boolean_array(&array) {
                                Ok(filter_array) => {
                                    // Rows seen count toward the selectivity denominator.
                                    self.metrics.selectivity.add_total(batch.num_rows());
                                    let batch = filter_record_batch(&batch, filter_array)?;
                                    let state = self.batch_coalescer.push_batch(batch)?;
                                    Ok(state)
                                }
                                Err(_) => {
                                    internal_err!(
                                        "Cannot create filter_array from non-boolean predicates"
                                    )
                                }
                            }
                        })?;
                    timer.done();

                    match status {
                        PushBatchStatus::Continue => {
                        }
                        PushBatchStatus::LimitReached => {
                            // Fetch limit reached: stop early and flush.
                            self.batch_coalescer.finish()?;
                        }
                    }
                }

                // Propagate input errors unchanged.
                other => return Poll::Ready(other),
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Filtering can only remove rows, so the input's hint is an upper bound.
        self.input.size_hint()
    }
}
impl RecordBatchStream for FilterExecStream {
    /// The (possibly projected) schema of every batch this stream yields.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
841
/// Split `predicate` into its top-level `=` and `!=` conjunct pairs.
#[deprecated(
    since = "51.0.0",
    note = "This function will be internal in the future"
)]
pub fn collect_columns_from_predicate(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    collect_columns_from_predicate_inner(predicate)
}
852
853fn collect_columns_from_predicate_inner(
854 predicate: &'_ Arc<dyn PhysicalExpr>,
855) -> EqualAndNonEqual<'_> {
856 let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
857 let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
858
859 let predicates = split_conjunction(predicate);
860 predicates.into_iter().for_each(|p| {
861 if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
862 match binary.op() {
863 Operator::Eq => {
864 eq_predicate_columns.push((binary.left(), binary.right()))
865 }
866 Operator::NotEq => {
867 ne_predicate_columns.push((binary.left(), binary.right()))
868 }
869 _ => {}
870 }
871 }
872 });
873
874 (eq_predicate_columns, ne_predicate_columns)
875}
876
/// A borrowed `(left, right)` pair of physical expressions from a binary comparison.
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);

/// The `=` pairs and the `!=` pairs extracted from a predicate's conjuncts.
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
883
884#[cfg(test)]
885mod tests {
886 use super::*;
887 use crate::empty::EmptyExec;
888 use crate::expressions::*;
889 use crate::test;
890 use crate::test::exec::StatisticsExec;
891 use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
892 use datafusion_common::ScalarValue;
893
    // Verifies eq/ne conjunct extraction from a nested AND predicate.
    #[tokio::test]
    async fn collect_columns_predicates() -> Result<()> {
        let schema = test::aggr_test_schema();
        // (c2 >= 1 AND c2 = 4) AND (c2 = c9 AND c1 != c13)
        let predicate: Arc<dyn PhysicalExpr> = binary(
            binary(
                binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
                Operator::And,
                binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
                &schema,
            )?,
            Operator::And,
            binary(
                binary(
                    col("c2", &schema)?,
                    Operator::Eq,
                    col("c9", &schema)?,
                    &schema,
                )?,
                Operator::And,
                binary(
                    col("c1", &schema)?,
                    Operator::NotEq,
                    col("c13", &schema)?,
                    &schema,
                )?,
                &schema,
            )?,
            &schema,
        )?;

        let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
        assert_eq!(2, equal_pairs.len());
        assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
        assert!(equal_pairs[0].1.eq(&lit(4u32)));

        assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
        assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));

        assert_eq!(1, ne_pairs.len());
        assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
        assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));

        Ok(())
    }

    // `a` in [1, 100], predicate `a <= 25` → ~25% selectivity and a
    // tightened max for `a`.
    #[tokio::test]
    async fn test_filter_statistics_basic_expr() -> Result<()> {
        let bytes_per_row = 4;
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                total_byte_size: Precision::Inexact(100 * bytes_per_row),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
            },
            schema.clone(),
        ));

        let predicate: Arc<dyn PhysicalExpr> =
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;

        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(25));
        assert_eq!(
            statistics.total_byte_size,
            Precision::Inexact(25 * bytes_per_row)
        );
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }

    // Two stacked filters (`a <= 25`, then `a >= 10`) should compose their
    // selectivities and intersect the column range.
    #[tokio::test]
    async fn test_filter_statistics_column_level_nested() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            sub_filter,
        )?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(16));
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }

    // Three stacked filters over two columns; both column ranges tighten and
    // the row estimate compounds across all three predicates.
    #[tokio::test]
    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                        ..Default::default()
                    },
                ],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
            a_lte_25,
        )?);

        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            b_gt_5,
        )?);
        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(2));
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                    ..Default::default()
                }
            ]
        );

        Ok(())
    }

    // Unknown input statistics should propagate as unknown output statistics.
    #[tokio::test]
    async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&schema),
            schema.clone(),
        ));

        let predicate: Arc<dyn PhysicalExpr> =
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;

        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Absent);

        Ok(())
    }
1126
1127 #[tokio::test]
1128 async fn test_filter_statistics_multiple_columns() -> Result<()> {
1129 let schema = Schema::new(vec![
1134 Field::new("a", DataType::Int32, false),
1135 Field::new("b", DataType::Int32, false),
1136 Field::new("c", DataType::Float32, false),
1137 ]);
1138 let input = Arc::new(StatisticsExec::new(
1139 Statistics {
1140 num_rows: Precision::Inexact(1000),
1141 total_byte_size: Precision::Inexact(4000),
1142 column_statistics: vec![
1143 ColumnStatistics {
1144 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1145 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1146 ..Default::default()
1147 },
1148 ColumnStatistics {
1149 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1150 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1151 ..Default::default()
1152 },
1153 ColumnStatistics {
1154 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1155 max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1156 ..Default::default()
1157 },
1158 ],
1159 },
1160 schema,
1161 ));
1162 let predicate = Arc::new(BinaryExpr::new(
1164 Arc::new(BinaryExpr::new(
1165 Arc::new(Column::new("a", 0)),
1166 Operator::LtEq,
1167 Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1168 )),
1169 Operator::And,
1170 Arc::new(BinaryExpr::new(
1171 Arc::new(BinaryExpr::new(
1172 Arc::new(Column::new("b", 1)),
1173 Operator::Eq,
1174 Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1175 )),
1176 Operator::And,
1177 Arc::new(BinaryExpr::new(
1178 Arc::new(BinaryExpr::new(
1179 Arc::new(Column::new("c", 2)),
1180 Operator::LtEq,
1181 Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1182 )),
1183 Operator::And,
1184 Arc::new(BinaryExpr::new(
1185 Arc::new(Column::new("a", 0)),
1186 Operator::Gt,
1187 Arc::new(Column::new("b", 1)),
1188 )),
1189 )),
1190 )),
1191 ));
1192 let filter: Arc<dyn ExecutionPlan> =
1193 Arc::new(FilterExec::try_new(predicate, input)?);
1194 let statistics = filter.partition_statistics(None)?;
1195 assert_eq!(statistics.num_rows, Precision::Inexact(134));
1199 assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1200 let exp_col_stats = vec![
1201 ColumnStatistics {
1202 min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1203 max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1204 ..Default::default()
1205 },
1206 ColumnStatistics {
1207 min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1208 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1209 ..Default::default()
1210 },
1211 ColumnStatistics {
1212 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1213 max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1214 ..Default::default()
1215 },
1216 ];
1217 let _ = exp_col_stats
1218 .into_iter()
1219 .zip(statistics.column_statistics)
1220 .map(|(expected, actual)| {
1221 if let Some(val) = actual.min_value.get_value() {
1222 if val.data_type().is_floating() {
1223 let actual_min = actual.min_value.get_value().unwrap();
1226 let actual_max = actual.max_value.get_value().unwrap();
1227 let expected_min = expected.min_value.get_value().unwrap();
1228 let expected_max = expected.max_value.get_value().unwrap();
1229 let eps = ScalarValue::Float32(Some(1e-6));
1230
1231 assert!(actual_min.sub(expected_min).unwrap() < eps);
1232 assert!(actual_min.sub(expected_min).unwrap() < eps);
1233
1234 assert!(actual_max.sub(expected_max).unwrap() < eps);
1235 assert!(actual_max.sub(expected_max).unwrap() < eps);
1236 } else {
1237 assert_eq!(actual, expected);
1238 }
1239 } else {
1240 assert_eq!(actual, expected);
1241 }
1242 });
1243
1244 Ok(())
1245 }
1246
1247 #[tokio::test]
1248 async fn test_filter_statistics_full_selective() -> Result<()> {
1249 let schema = Schema::new(vec![
1253 Field::new("a", DataType::Int32, false),
1254 Field::new("b", DataType::Int32, false),
1255 ]);
1256 let input = Arc::new(StatisticsExec::new(
1257 Statistics {
1258 num_rows: Precision::Inexact(1000),
1259 total_byte_size: Precision::Inexact(4000),
1260 column_statistics: vec![
1261 ColumnStatistics {
1262 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1263 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1264 ..Default::default()
1265 },
1266 ColumnStatistics {
1267 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1268 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1269 ..Default::default()
1270 },
1271 ],
1272 },
1273 schema,
1274 ));
1275 let predicate = Arc::new(BinaryExpr::new(
1277 Arc::new(BinaryExpr::new(
1278 Arc::new(Column::new("a", 0)),
1279 Operator::Lt,
1280 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1281 )),
1282 Operator::And,
1283 Arc::new(BinaryExpr::new(
1284 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1285 Operator::LtEq,
1286 Arc::new(Column::new("b", 1)),
1287 )),
1288 ));
1289 let expected = input.partition_statistics(None)?.column_statistics;
1291 let filter: Arc<dyn ExecutionPlan> =
1292 Arc::new(FilterExec::try_new(predicate, input)?);
1293 let statistics = filter.partition_statistics(None)?;
1294
1295 assert_eq!(statistics.num_rows, Precision::Inexact(1000));
1296 assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
1297 assert_eq!(statistics.column_statistics, expected);
1298
1299 Ok(())
1300 }
1301
1302 #[tokio::test]
1303 async fn test_filter_statistics_zero_selective() -> Result<()> {
1304 let schema = Schema::new(vec![
1308 Field::new("a", DataType::Int32, false),
1309 Field::new("b", DataType::Int32, false),
1310 ]);
1311 let input = Arc::new(StatisticsExec::new(
1312 Statistics {
1313 num_rows: Precision::Inexact(1000),
1314 total_byte_size: Precision::Inexact(4000),
1315 column_statistics: vec![
1316 ColumnStatistics {
1317 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1318 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1319 ..Default::default()
1320 },
1321 ColumnStatistics {
1322 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1323 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1324 ..Default::default()
1325 },
1326 ],
1327 },
1328 schema,
1329 ));
1330 let predicate = Arc::new(BinaryExpr::new(
1332 Arc::new(BinaryExpr::new(
1333 Arc::new(Column::new("a", 0)),
1334 Operator::Gt,
1335 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1336 )),
1337 Operator::And,
1338 Arc::new(BinaryExpr::new(
1339 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1340 Operator::LtEq,
1341 Arc::new(Column::new("b", 1)),
1342 )),
1343 ));
1344 let filter: Arc<dyn ExecutionPlan> =
1345 Arc::new(FilterExec::try_new(predicate, input)?);
1346 let statistics = filter.partition_statistics(None)?;
1347
1348 assert_eq!(statistics.num_rows, Precision::Inexact(0));
1349 assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
1350 assert_eq!(
1351 statistics.column_statistics,
1352 vec![
1353 ColumnStatistics {
1354 min_value: Precision::Exact(ScalarValue::Null),
1355 max_value: Precision::Exact(ScalarValue::Null),
1356 sum_value: Precision::Exact(ScalarValue::Null),
1357 distinct_count: Precision::Exact(0),
1358 null_count: Precision::Exact(0),
1359 byte_size: Precision::Absent,
1360 },
1361 ColumnStatistics {
1362 min_value: Precision::Exact(ScalarValue::Null),
1363 max_value: Precision::Exact(ScalarValue::Null),
1364 sum_value: Precision::Exact(ScalarValue::Null),
1365 distinct_count: Precision::Exact(0),
1366 null_count: Precision::Exact(0),
1367 byte_size: Precision::Absent,
1368 },
1369 ]
1370 );
1371
1372 Ok(())
1373 }
1374
1375 #[tokio::test]
1376 async fn test_filter_statistics_more_inputs() -> Result<()> {
1377 let schema = Schema::new(vec![
1378 Field::new("a", DataType::Int32, false),
1379 Field::new("b", DataType::Int32, false),
1380 ]);
1381 let input = Arc::new(StatisticsExec::new(
1382 Statistics {
1383 num_rows: Precision::Inexact(1000),
1384 total_byte_size: Precision::Inexact(4000),
1385 column_statistics: vec![
1386 ColumnStatistics {
1387 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1388 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1389 ..Default::default()
1390 },
1391 ColumnStatistics {
1392 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1393 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1394 ..Default::default()
1395 },
1396 ],
1397 },
1398 schema,
1399 ));
1400 let predicate = Arc::new(BinaryExpr::new(
1402 Arc::new(Column::new("a", 0)),
1403 Operator::Lt,
1404 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1405 ));
1406 let filter: Arc<dyn ExecutionPlan> =
1407 Arc::new(FilterExec::try_new(predicate, input)?);
1408 let statistics = filter.partition_statistics(None)?;
1409
1410 assert_eq!(statistics.num_rows, Precision::Inexact(490));
1411 assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
1412 assert_eq!(
1413 statistics.column_statistics,
1414 vec![
1415 ColumnStatistics {
1416 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1417 max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
1418 ..Default::default()
1419 },
1420 ColumnStatistics {
1421 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1422 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1423 ..Default::default()
1424 },
1425 ]
1426 );
1427
1428 Ok(())
1429 }
1430
1431 #[tokio::test]
1432 async fn test_empty_input_statistics() -> Result<()> {
1433 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1434 let input = Arc::new(StatisticsExec::new(
1435 Statistics::new_unknown(&schema),
1436 schema,
1437 ));
1438 let predicate = Arc::new(BinaryExpr::new(
1440 Arc::new(BinaryExpr::new(
1441 Arc::new(Column::new("a", 0)),
1442 Operator::LtEq,
1443 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1444 )),
1445 Operator::And,
1446 Arc::new(BinaryExpr::new(
1447 Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1448 Operator::LtEq,
1449 Arc::new(BinaryExpr::new(
1450 Arc::new(Column::new("a", 0)),
1451 Operator::Minus,
1452 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1453 )),
1454 )),
1455 ));
1456 let filter: Arc<dyn ExecutionPlan> =
1457 Arc::new(FilterExec::try_new(predicate, input)?);
1458 let filter_statistics = filter.partition_statistics(None)?;
1459
1460 let expected_filter_statistics = Statistics {
1461 num_rows: Precision::Absent,
1462 total_byte_size: Precision::Absent,
1463 column_statistics: vec![ColumnStatistics {
1464 null_count: Precision::Absent,
1465 min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1466 max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1467 sum_value: Precision::Absent,
1468 distinct_count: Precision::Absent,
1469 byte_size: Precision::Absent,
1470 }],
1471 };
1472
1473 assert_eq!(filter_statistics, expected_filter_statistics);
1474
1475 Ok(())
1476 }
1477
1478 #[tokio::test]
1479 async fn test_statistics_with_constant_column() -> Result<()> {
1480 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1481 let input = Arc::new(StatisticsExec::new(
1482 Statistics::new_unknown(&schema),
1483 schema,
1484 ));
1485 let predicate = Arc::new(BinaryExpr::new(
1487 Arc::new(Column::new("a", 0)),
1488 Operator::Eq,
1489 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1490 ));
1491 let filter: Arc<dyn ExecutionPlan> =
1492 Arc::new(FilterExec::try_new(predicate, input)?);
1493 let filter_statistics = filter.partition_statistics(None)?;
1494 assert!(filter_statistics.column_statistics[0].is_singleton());
1496
1497 Ok(())
1498 }
1499
1500 #[tokio::test]
1501 async fn test_validation_filter_selectivity() -> Result<()> {
1502 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1503 let input = Arc::new(StatisticsExec::new(
1504 Statistics::new_unknown(&schema),
1505 schema,
1506 ));
1507 let predicate = Arc::new(BinaryExpr::new(
1509 Arc::new(Column::new("a", 0)),
1510 Operator::Eq,
1511 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1512 ));
1513 let filter = FilterExec::try_new(predicate, input)?;
1514 assert!(filter.with_default_selectivity(120).is_err());
1515 Ok(())
1516 }
1517
1518 #[tokio::test]
1519 async fn test_custom_filter_selectivity() -> Result<()> {
1520 let schema =
1522 Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1523 let input = Arc::new(StatisticsExec::new(
1524 Statistics {
1525 num_rows: Precision::Inexact(1000),
1526 total_byte_size: Precision::Inexact(4000),
1527 column_statistics: vec![ColumnStatistics {
1528 ..Default::default()
1529 }],
1530 },
1531 schema,
1532 ));
1533 let predicate = Arc::new(BinaryExpr::new(
1535 Arc::new(Column::new("a", 0)),
1536 Operator::Eq,
1537 Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1538 ));
1539 let filter = FilterExec::try_new(predicate, input)?;
1540 let statistics = filter.partition_statistics(None)?;
1541 assert_eq!(statistics.num_rows, Precision::Inexact(200));
1542 assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1543 let filter = filter.with_default_selectivity(40)?;
1544 let statistics = filter.partition_statistics(None)?;
1545 assert_eq!(statistics.num_rows, Precision::Inexact(400));
1546 assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1547 Ok(())
1548 }
1549
1550 #[test]
1551 fn test_equivalence_properties_union_type() -> Result<()> {
1552 let union_type = DataType::Union(
1553 UnionFields::new(
1554 vec![0, 1],
1555 vec![
1556 Field::new("f1", DataType::Int32, true),
1557 Field::new("f2", DataType::Utf8, true),
1558 ],
1559 ),
1560 UnionMode::Sparse,
1561 );
1562
1563 let schema = Arc::new(Schema::new(vec![
1564 Field::new("c1", DataType::Int32, true),
1565 Field::new("c2", union_type, true),
1566 ]));
1567
1568 let exec = FilterExec::try_new(
1569 binary(
1570 binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
1571 Operator::And,
1572 binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
1573 &schema,
1574 )?,
1575 Arc::new(EmptyExec::new(Arc::clone(&schema))),
1576 )?;
1577
1578 exec.partition_statistics(None).unwrap();
1579
1580 Ok(())
1581 }
1582
1583 #[test]
1584 fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
1585 let input_schema = Arc::new(Schema::new(vec![
1589 Field::new("a", DataType::Int32, false),
1590 Field::new("b", DataType::Utf8, false),
1591 Field::new("c", DataType::Float64, false),
1592 ]));
1593 let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));
1594
1595 let predicate = Arc::new(BinaryExpr::new(
1597 Arc::new(Column::new("a", 0)),
1598 Operator::Gt,
1599 Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1600 ));
1601 let filter =
1602 FilterExec::try_new(predicate, input)?.with_projection(Some(vec![2]))?;
1603
1604 let output_schema = filter.schema();
1606 assert_eq!(output_schema.fields().len(), 1);
1607 assert_eq!(output_schema.field(0).name(), "c");
1608
1609 let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));
1611
1612 let config = ConfigOptions::new();
1613 let desc = filter.gather_filters_for_pushdown(
1614 FilterPushdownPhase::Post,
1615 vec![parent_filter],
1616 &config,
1617 )?;
1618
1619 let parent_filters = desc.parent_filters();
1622 assert_eq!(parent_filters.len(), 1); assert_eq!(parent_filters[0].len(), 1); let remapped = &parent_filters[0][0].predicate;
1625 let display = format!("{remapped}");
1626 assert_eq!(
1627 display, "c@2",
1628 "Post-phase parent filter column index must be remapped \
1629 from output schema (c@0) to input schema (c@2)"
1630 );
1631
1632 Ok(())
1633 }
1634}