1use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use itertools::Itertools;
24
25use super::{
26 ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27 RecordBatchStream, SendableRecordBatchStream, Statistics,
28};
29use crate::coalesce::LimitedBatchCoalescer;
30use crate::coalesce::PushBatchStatus::LimitReached;
31use crate::common::can_project;
32use crate::execution_plan::CardinalityEffect;
33use crate::filter_pushdown::{
34 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
35 FilterPushdownPropagation, PushedDown, PushedDownPredicate,
36};
37use crate::metrics::{MetricBuilder, MetricType};
38use crate::projection::{
39 EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
40 try_embed_projection, update_expr,
41};
42use crate::{
43 DisplayFormatType, ExecutionPlan,
44 metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
45};
46
47use arrow::compute::filter_record_batch;
48use arrow::datatypes::{DataType, SchemaRef};
49use arrow::record_batch::RecordBatch;
50use datafusion_common::cast::as_boolean_array;
51use datafusion_common::config::ConfigOptions;
52use datafusion_common::stats::Precision;
53use datafusion_common::{
54 DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
55};
56use datafusion_execution::TaskContext;
57use datafusion_expr::Operator;
58use datafusion_physical_expr::equivalence::ProjectionMapping;
59use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
60use datafusion_physical_expr::intervals::utils::check_support;
61use datafusion_physical_expr::utils::collect_columns;
62use datafusion_physical_expr::{
63 AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
64 conjunction, split_conjunction,
65};
66
67use datafusion_physical_expr_common::physical_expr::fmt_sql;
68use futures::stream::{Stream, StreamExt};
69use log::trace;
70
/// Default selectivity (as a percentage, 0-100) assumed when the predicate
/// cannot be analyzed: 20% of input rows are estimated to pass the filter.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Default target number of rows per output batch for the batch coalescer.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
73
#[derive(Debug, Clone)]
/// Execution plan that evaluates a boolean predicate against every input row
/// and emits only rows for which the predicate evaluates to `true`, optionally
/// projecting a subset of columns and enforcing a row limit (`fetch`).
pub struct FilterExec {
    /// The boolean expression evaluated against each input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// The child plan providing input batches.
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics shared across partitions.
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity percentage (0-100) used for statistics estimation when the
    /// predicate cannot be analyzed.
    default_selectivity: u8,
    /// Cached plan properties (schema, equivalences, partitioning, ...).
    cache: PlanProperties,
    /// Optional projection: indices into the input schema to emit.
    projection: Option<Vec<usize>>,
    /// Target number of rows per output batch.
    batch_size: usize,
    /// Optional maximum total number of rows to emit.
    fetch: Option<usize>,
}
95
96impl FilterExec {
97 #[expect(clippy::needless_pass_by_value)]
99 pub fn try_new(
100 predicate: Arc<dyn PhysicalExpr>,
101 input: Arc<dyn ExecutionPlan>,
102 ) -> Result<Self> {
103 match predicate.data_type(input.schema().as_ref())? {
104 DataType::Boolean => {
105 let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
106 let cache = Self::compute_properties(
107 &input,
108 &predicate,
109 default_selectivity,
110 None,
111 )?;
112 Ok(Self {
113 predicate,
114 input: Arc::clone(&input),
115 metrics: ExecutionPlanMetricsSet::new(),
116 default_selectivity,
117 cache,
118 projection: None,
119 batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
120 fetch: None,
121 })
122 }
123 other => {
124 plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
125 }
126 }
127 }
128
129 pub fn with_default_selectivity(
130 mut self,
131 default_selectivity: u8,
132 ) -> Result<Self, DataFusionError> {
133 if default_selectivity > 100 {
134 return plan_err!(
135 "Default filter selectivity value needs to be less than or equal to 100"
136 );
137 }
138 self.default_selectivity = default_selectivity;
139 Ok(self)
140 }
141
142 pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
144 can_project(&self.schema(), projection.as_ref())?;
146
147 let projection = match projection {
148 Some(projection) => match &self.projection {
149 Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
150 None => Some(projection),
151 },
152 None => None,
153 };
154
155 let cache = Self::compute_properties(
156 &self.input,
157 &self.predicate,
158 self.default_selectivity,
159 projection.as_ref(),
160 )?;
161 Ok(Self {
162 predicate: Arc::clone(&self.predicate),
163 input: Arc::clone(&self.input),
164 metrics: self.metrics.clone(),
165 default_selectivity: self.default_selectivity,
166 cache,
167 projection,
168 batch_size: self.batch_size,
169 fetch: self.fetch,
170 })
171 }
172
173 pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
174 Ok(Self {
175 predicate: Arc::clone(&self.predicate),
176 input: Arc::clone(&self.input),
177 metrics: self.metrics.clone(),
178 default_selectivity: self.default_selectivity,
179 cache: self.cache.clone(),
180 projection: self.projection.clone(),
181 batch_size,
182 fetch: self.fetch,
183 })
184 }
185
    /// The boolean expression this filter evaluates.
    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate
    }

    /// The child execution plan.
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// The selectivity percentage used when the predicate is not analyzable.
    pub fn default_selectivity(&self) -> u8 {
        self.default_selectivity
    }

    /// The embedded output projection (indices into the input schema), if any.
    pub fn projection(&self) -> Option<&Vec<usize>> {
        self.projection.as_ref()
    }
205
    /// Estimates output statistics for filtering `input_stats` by `predicate`.
    ///
    /// When the predicate is not supported by interval analysis, falls back to
    /// scaling row count and byte size by `default_selectivity` (a percentage)
    /// and marking everything inexact. Otherwise, interval analysis provides
    /// both a selectivity estimate and tightened per-column bounds.
    fn statistics_helper(
        schema: &SchemaRef,
        input_stats: Statistics,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
    ) -> Result<Statistics> {
        if !check_support(predicate, schema) {
            // Predicate cannot be analyzed: apply the default selectivity.
            let selectivity = default_selectivity as f64 / 100.0;
            let mut stats = input_stats.to_inexact();
            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
            stats.total_byte_size = stats
                .total_byte_size
                .with_estimated_selectivity(selectivity);
            return Ok(stats);
        }

        let num_rows = input_stats.num_rows;
        let total_byte_size = input_stats.total_byte_size;
        let input_analysis_ctx =
            AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?;

        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;

        // Estimate (inexact) selectivity of the predicate; 1.0 when unknown.
        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
        let num_rows = num_rows.with_estimated_selectivity(selectivity);
        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);

        // Convert the analyzed per-column interval boundaries back into
        // column statistics.
        let column_statistics = collect_new_statistics(
            &input_stats.column_statistics,
            analysis_ctx.boundaries,
        );
        Ok(Statistics {
            num_rows,
            total_byte_size,
            column_statistics,
        })
    }
245
    /// Derives additional constant expressions implied by the predicate.
    ///
    /// For each `a = b` conjunct: if one side is already known constant in the
    /// input's equivalence properties, the other side is constant after
    /// filtering too, so it is returned (with the cross-partition uniformity
    /// of the side it equals, defaulting when unknown).
    fn extend_constants(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Vec<ConstExpr> {
        let mut res_constants = Vec::new();
        let input_eqs = input.equivalence_properties();

        let conjunctions = split_conjunction(predicate);
        for conjunction in conjunctions {
            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
                && binary.op() == &Operator::Eq
            {
                if input_eqs.is_expr_constant(binary.left()).is_some() {
                    // Left side constant => right side becomes constant.
                    let across = input_eqs
                        .is_expr_constant(binary.right())
                        .unwrap_or_default();
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.right()), across));
                } else if input_eqs.is_expr_constant(binary.right()).is_some() {
                    // Right side constant => left side becomes constant.
                    let across = input_eqs
                        .is_expr_constant(binary.left())
                        .unwrap_or_default();
                    res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across));
                }
            }
        }
        res_constants
    }
    /// Computes the cached `PlanProperties` for a filter over `input`.
    ///
    /// Starts from the input's equivalence properties and enriches them with
    /// facts implied by the predicate (equal pairs, columns proven constant by
    /// statistics, constants derived from `a = b` conjuncts), then applies the
    /// optional projection to both the partitioning and the equivalences.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
        projection: Option<&Vec<usize>>,
    ) -> Result<PlanProperties> {
        let schema = input.schema();
        let stats = Self::statistics_helper(
            &schema,
            input.partition_statistics(None)?,
            predicate,
            default_selectivity,
        )?;
        let mut eq_properties = input.equivalence_properties().clone();
        // `a = b` conjuncts make both sides equivalent in the output.
        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
        }
        // Columns whose post-filter statistics collapse to a single value are
        // constants, uniform across partitions at that value.
        let constants = collect_columns(predicate)
            .into_iter()
            .filter(|column| stats.column_statistics[column.index()].is_singleton())
            .map(|column| {
                let value = stats.column_statistics[column.index()]
                    .min_value
                    .get_value();
                let expr = Arc::new(column) as _;
                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
            });
        eq_properties.add_constants(constants)?;
        eq_properties.add_constants(Self::extend_constants(input, predicate))?;

        let mut output_partitioning = input.output_partitioning().clone();
        if let Some(projection) = projection {
            // Project partitioning and equivalences onto the output columns.
            let schema = eq_properties.schema();
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }
332}
333
334impl DisplayAs for FilterExec {
335 fn fmt_as(
336 &self,
337 t: DisplayFormatType,
338 f: &mut std::fmt::Formatter,
339 ) -> std::fmt::Result {
340 match t {
341 DisplayFormatType::Default | DisplayFormatType::Verbose => {
342 let display_projections = if let Some(projection) =
343 self.projection.as_ref()
344 {
345 format!(
346 ", projection=[{}]",
347 projection
348 .iter()
349 .map(|index| format!(
350 "{}@{}",
351 self.input.schema().fields().get(*index).unwrap().name(),
352 index
353 ))
354 .collect::<Vec<_>>()
355 .join(", ")
356 )
357 } else {
358 "".to_string()
359 };
360 let fetch = self
361 .fetch
362 .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
363 write!(
364 f,
365 "FilterExec: {}{}{}",
366 self.predicate, display_projections, fetch
367 )
368 }
369 DisplayFormatType::TreeRender => {
370 write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
371 }
372 }
373 }
374}
375
376impl ExecutionPlan for FilterExec {
    /// The operator's display name.
    fn name(&self) -> &'static str {
        "FilterExec"
    }

    /// Returns self as `Any` to support downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The plan properties computed at construction time.
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    /// Exactly one child: the input being filtered.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Filtering removes rows but never reorders the survivors.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }
398
399 fn with_new_children(
400 self: Arc<Self>,
401 mut children: Vec<Arc<dyn ExecutionPlan>>,
402 ) -> Result<Arc<dyn ExecutionPlan>> {
403 FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
404 .and_then(|e| {
405 let selectivity = e.default_selectivity();
406 e.with_default_selectivity(selectivity)
407 })
408 .and_then(|e| e.with_projection(self.projection().cloned()))
409 .map(|e| e.with_fetch(self.fetch).unwrap())
410 }
411
    /// Begins execution of one partition, returning a stream of filtered (and
    /// optionally projected) record batches.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );
        let metrics = FilterExecMetrics::new(&self.metrics, partition);
        Ok(Box::pin(FilterExecStream {
            // `self.schema()` already reflects the embedded projection.
            schema: self.schema(),
            predicate: Arc::clone(&self.predicate),
            input: self.input.execute(partition, context)?,
            metrics,
            projection: self.projection.clone(),
            // Merges small filtered batches up to `batch_size` rows and
            // enforces the optional `fetch` row limit.
            batch_coalescer: LimitedBatchCoalescer::new(
                self.schema(),
                self.batch_size,
                self.fetch,
            ),
        }))
    }
437
438 fn metrics(&self) -> Option<MetricsSet> {
439 Some(self.metrics.clone_inner())
440 }
441
442 fn statistics(&self) -> Result<Statistics> {
445 self.partition_statistics(None)
446 }
447
448 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
449 let input_stats = self.input.partition_statistics(partition)?;
450 let schema = self.schema();
451 let stats = Self::statistics_helper(
452 &schema,
453 input_stats,
454 self.predicate(),
455 self.default_selectivity,
456 )?;
457 Ok(stats.project(self.projection.as_ref()))
458 }
459
460 fn cardinality_effect(&self) -> CardinalityEffect {
461 CardinalityEffect::LowerEqual
462 }
463
    /// Tries to push a `ProjectionExec` sitting on top of this filter below
    /// it (or embed it into this node), reducing the columns carried through
    /// the filter.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Only swap when the projection actually narrows the schema.
        if projection.expr().len() < projection.input().schema().fields().len() {
            // The predicate must be expressible in terms of the projection's
            // output; `update_expr` yields `None` when it is not.
            if let Some(new_predicate) =
                update_expr(self.predicate(), projection.expr(), false)?
            {
                return FilterExec::try_new(
                    new_predicate,
                    make_with_child(projection, self.input())?,
                )
                .and_then(|e| {
                    // Preserve this node's configured selectivity.
                    let selectivity = self.default_selectivity();
                    e.with_default_selectivity(selectivity)
                })
                .map(|e| Some(Arc::new(e) as _));
            }
        }
        // Otherwise, attempt to embed the projection into this filter.
        try_embed_projection(projection, self)
    }
489
    /// Describes which filters may be pushed toward this node's child: in the
    /// `Pre` phase, all parent filters plus each conjunct of this filter's own
    /// predicate; in other phases the node is transparent.
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        if !matches!(phase, FilterPushdownPhase::Pre) {
            // Non-`Pre` phase: pass every parent filter through unchanged and
            // contribute no filters of our own.
            let filter_supports = parent_filters
                .into_iter()
                .map(PushedDownPredicate::supported)
                .collect();

            return Ok(FilterDescription::new().with_child(ChildFilterDescription {
                parent_filters: filter_supports,
                self_filters: vec![],
            }));
        }

        // `Pre` phase: offer parent filters plus our own predicate conjuncts.
        let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
            .with_self_filters(
                split_conjunction(&self.predicate)
                    .into_iter()
                    .cloned()
                    .collect(),
            );

        Ok(FilterDescription::new().with_child(child))
    }
519
520 fn handle_child_pushdown_result(
521 &self,
522 phase: FilterPushdownPhase,
523 child_pushdown_result: ChildPushdownResult,
524 _config: &ConfigOptions,
525 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
526 if !matches!(phase, FilterPushdownPhase::Pre) {
527 return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
528 }
529 let unsupported_parent_filters =
531 child_pushdown_result.parent_filters.iter().filter_map(|f| {
532 matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
533 });
534 let unsupported_self_filters = child_pushdown_result
535 .self_filters
536 .first()
537 .expect("we have exactly one child")
538 .iter()
539 .filter_map(|f| match f.discriminant {
540 PushedDown::Yes => None,
541 PushedDown::No => Some(&f.predicate),
542 })
543 .cloned();
544
545 let unhandled_filters = unsupported_parent_filters
546 .into_iter()
547 .chain(unsupported_self_filters)
548 .collect_vec();
549
550 let filter_input = Arc::clone(self.input());
552 let new_predicate = conjunction(unhandled_filters);
553 let updated_node = if new_predicate.eq(&lit(true)) {
554 match self.projection() {
556 Some(projection_indices) => {
557 let filter_child_schema = filter_input.schema();
558 let proj_exprs = projection_indices
559 .iter()
560 .map(|p| {
561 let field = filter_child_schema.field(*p).clone();
562 ProjectionExpr {
563 expr: Arc::new(Column::new(field.name(), *p))
564 as Arc<dyn PhysicalExpr>,
565 alias: field.name().to_string(),
566 }
567 })
568 .collect::<Vec<_>>();
569 Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
570 as Arc<dyn ExecutionPlan>)
571 }
572 None => {
573 Some(filter_input)
575 }
576 }
577 } else if new_predicate.eq(&self.predicate) {
578 None
580 } else {
581 let new = FilterExec {
583 predicate: Arc::clone(&new_predicate),
584 input: Arc::clone(&filter_input),
585 metrics: self.metrics.clone(),
586 default_selectivity: self.default_selectivity,
587 cache: Self::compute_properties(
588 &filter_input,
589 &new_predicate,
590 self.default_selectivity,
591 self.projection.as_ref(),
592 )?,
593 projection: None,
594 batch_size: self.batch_size,
595 fetch: self.fetch,
596 };
597 Some(Arc::new(new) as _)
598 };
599
600 Ok(FilterPushdownPropagation {
601 filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
602 updated_node,
603 })
604 }
605
606 fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
607 Some(Arc::new(Self {
608 predicate: Arc::clone(&self.predicate),
609 input: Arc::clone(&self.input),
610 metrics: self.metrics.clone(),
611 default_selectivity: self.default_selectivity,
612 cache: self.cache.clone(),
613 projection: self.projection.clone(),
614 batch_size: self.batch_size,
615 fetch,
616 }))
617 }
618}
619
impl EmbeddedProjection for FilterExec {
    /// Delegates to the inherent `FilterExec::with_projection`.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        self.with_projection(projection)
    }
}
625
/// Converts the per-column interval boundaries produced by predicate analysis
/// back into `ColumnStatistics`, one entry per input column.
///
/// Min/max become `Exact` when the interval collapses to a single value, and
/// `Inexact` otherwise; null and distinct counts are downgraded to inexact
/// because filtering changes them in ways the analysis does not track.
fn collect_new_statistics(
    input_column_stats: &[ColumnStatistics],
    analysis_boundaries: Vec<ExprBoundaries>,
) -> Vec<ColumnStatistics> {
    analysis_boundaries
        .into_iter()
        .enumerate()
        .map(
            |(
                idx,
                ExprBoundaries {
                    interval,
                    distinct_count,
                    ..
                },
            )| {
                let Some(interval) = interval else {
                    // No interval: analysis determined no rows can match, so
                    // report an exactly-empty column.
                    return ColumnStatistics {
                        null_count: Precision::Exact(0),
                        max_value: Precision::Exact(ScalarValue::Null),
                        min_value: Precision::Exact(ScalarValue::Null),
                        sum_value: Precision::Exact(ScalarValue::Null),
                        distinct_count: Precision::Exact(0),
                        byte_size: input_column_stats[idx].byte_size,
                    };
                };
                let (lower, upper) = interval.into_bounds();
                // A degenerate interval pins the column to a single value.
                let (min_value, max_value) = if lower.eq(&upper) {
                    (Precision::Exact(lower), Precision::Exact(upper))
                } else {
                    (Precision::Inexact(lower), Precision::Inexact(upper))
                };
                ColumnStatistics {
                    null_count: input_column_stats[idx].null_count.to_inexact(),
                    max_value,
                    min_value,
                    // Sums are not tracked through filtering.
                    sum_value: Precision::Absent,
                    distinct_count: distinct_count.to_inexact(),
                    byte_size: input_column_stats[idx].byte_size,
                }
            },
        )
        .collect()
}
675
/// The stream produced by `FilterExec::execute`: evaluates the predicate on
/// each input batch, applies the optional projection, and coalesces the
/// (usually smaller) filtered batches toward the target batch size.
struct FilterExecStream {
    /// Output schema (after projection, when one is embedded).
    schema: SchemaRef,
    /// The boolean expression evaluated against each input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// The upstream input stream.
    input: SendableRecordBatchStream,
    /// Baseline and selectivity metrics for this partition.
    metrics: FilterExecMetrics,
    /// Optional column indices to keep after filtering.
    projection: Option<Vec<usize>>,
    /// Buffers filtered batches up to the target size and enforces `fetch`.
    batch_coalescer: LimitedBatchCoalescer,
}
692
/// Per-partition metrics recorded by `FilterExecStream`.
struct FilterExecMetrics {
    /// Standard baseline metrics (elapsed compute, output rows, ...).
    baseline_metrics: BaselineMetrics,
    /// Ratio metric: emitted rows (part) over scanned rows (total).
    selectivity: RatioMetrics,
}
702
impl FilterExecMetrics {
    /// Registers the metric set for one partition.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            // Selectivity is exposed as a summary-type ratio metric.
            selectivity: MetricBuilder::new(metrics)
                .with_type(MetricType::SUMMARY)
                .ratio_metrics("selectivity", partition),
        }
    }
}
713
impl FilterExecStream {
    /// Finalizes the coalescer and emits its last buffered batch, if any,
    /// counting emitted rows toward the selectivity numerator.
    fn flush_remaining_batches(
        &mut self,
    ) -> Poll<Option<std::result::Result<RecordBatch, DataFusionError>>> {
        // `finish` flushes any partially-filled buffer inside the coalescer.
        match self.batch_coalescer.finish() {
            Ok(()) => {
                Poll::Ready(self.batch_coalescer.next_completed_batch().map(|batch| {
                    self.metrics.selectivity.add_part(batch.num_rows());
                    Ok(batch)
                }))
            }
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}
730
/// Filters a single batch with `predicate`, keeping all columns.
///
/// Convenience wrapper around [`filter_and_project`] with no projection.
pub fn batch_filter(
    batch: &RecordBatch,
    predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
    filter_and_project(batch, predicate, None)
}
737
738fn filter_and_project(
739 batch: &RecordBatch,
740 predicate: &Arc<dyn PhysicalExpr>,
741 projection: Option<&Vec<usize>>,
742) -> Result<RecordBatch> {
743 predicate
744 .evaluate(batch)
745 .and_then(|v| v.into_array(batch.num_rows()))
746 .and_then(|array| {
747 Ok(match (as_boolean_array(&array), projection) {
748 (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
750 (Ok(filter_array), Some(projection)) => {
751 let projected_batch = batch.project(projection)?;
752 filter_record_batch(&projected_batch, filter_array)?
753 }
754 (Err(_), _) => {
755 return internal_err!(
756 "Cannot create filter_array from non-boolean predicates"
757 );
758 }
759 })
760 })
761}
762
763impl Stream for FilterExecStream {
764 type Item = Result<RecordBatch>;
765
    /// Drives the filter: polls the input, evaluates the predicate, filters
    /// (and optionally projects) each batch, and feeds results through the
    /// coalescer, looping until a full output batch, the fetch limit, the end
    /// of input, or an error.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll;
        let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
        loop {
            match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    // Time only the compute work, not the input wait.
                    let timer = elapsed_compute.timer();
                    let status = self.predicate.as_ref()
                        .evaluate(&batch)
                        .and_then(|v| v.into_array(batch.num_rows()))
                        .and_then(|array| {
                            // Apply the projection before filtering so the
                            // filter kernel only copies the kept columns.
                            Ok(match self.projection {
                                Some(ref projection) => {
                                    let projected_batch = batch.project(projection)?;
                                    (array, projected_batch)
                                },
                                None => (array, batch)
                            })
                        }).and_then(|(array, batch)| {
                            match as_boolean_array(&array) {
                                Ok(filter_array) => {
                                    // Scanned rows feed the selectivity
                                    // denominator.
                                    self.metrics.selectivity.add_total(batch.num_rows());
                                    let batch = filter_record_batch(&batch, filter_array)?;
                                    // Push into the coalescer; it reports
                                    // whether the fetch limit was reached.
                                    let state = self.batch_coalescer.push_batch(batch)?;
                                    Ok(state)
                                }
                                Err(_) => {
                                    internal_err!(
                                        "Cannot create filter_array from non-boolean predicates"
                                    )
                                }
                            }
                        })?;
                    timer.done();

                    // Fetch limit hit: flush whatever is buffered and stop.
                    if let LimitReached = status {
                        poll = self.flush_remaining_batches();
                        break;
                    }

                    // A full batch is ready: emit it.
                    if let Some(batch) = self.batch_coalescer.next_completed_batch() {
                        self.metrics.selectivity.add_part(batch.num_rows());
                        poll = Poll::Ready(Some(Ok(batch)));
                        break;
                    }
                    // Otherwise keep pulling input to fill the buffer.
                    continue;
                }
                None => {
                    // Input exhausted: flush the final partial batch.
                    match self.batch_coalescer.finish() {
                        Ok(()) => {
                            poll = self.flush_remaining_batches();
                        }
                        Err(e) => {
                            poll = Poll::Ready(Some(Err(e)));
                        }
                    }
                    break;
                }
                // Propagate input errors unchanged.
                value => {
                    poll = Poll::Ready(value);
                    break;
                }
            }
        }
        self.metrics.baseline_metrics.record_poll(poll)
    }
837
838 fn size_hint(&self) -> (usize, Option<usize>) {
839 self.input.size_hint()
841 }
842}
impl RecordBatchStream for FilterExecStream {
    /// The schema of emitted batches (post-projection).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
848
/// Splits a predicate's conjuncts into `=` and `!=` expression pairs.
///
/// Deprecated public shim; the implementation lives in
/// `collect_columns_from_predicate_inner`.
#[deprecated(
    since = "51.0.0",
    note = "This function will be internal in the future"
)]
pub fn collect_columns_from_predicate(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    collect_columns_from_predicate_inner(predicate)
}
859
860fn collect_columns_from_predicate_inner(
861 predicate: &'_ Arc<dyn PhysicalExpr>,
862) -> EqualAndNonEqual<'_> {
863 let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
864 let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
865
866 let predicates = split_conjunction(predicate);
867 predicates.into_iter().for_each(|p| {
868 if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
869 match binary.op() {
870 Operator::Eq => {
871 eq_predicate_columns.push((binary.left(), binary.right()))
872 }
873 Operator::NotEq => {
874 ne_predicate_columns.push((binary.left(), binary.right()))
875 }
876 _ => {}
877 }
878 }
879 });
880
881 (eq_predicate_columns, ne_predicate_columns)
882}
883
/// A borrowed pair of operands from a binary comparison expression.
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);

/// The `=` pairs and `!=` pairs collected from a predicate, respectively.
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
890
891#[cfg(test)]
892mod tests {
893 use super::*;
894 use crate::empty::EmptyExec;
895 use crate::expressions::*;
896 use crate::test;
897 use crate::test::exec::StatisticsExec;
898 use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
899 use datafusion_common::ScalarValue;
900
    /// Checks that `=` and `!=` conjuncts are extracted as pairs, in order.
    #[tokio::test]
    async fn collect_columns_predicates() -> Result<()> {
        let schema = test::aggr_test_schema();
        // c2 >= 1 AND c2 = 4 AND c2 = c9 AND c1 != c13
        let predicate: Arc<dyn PhysicalExpr> = binary(
            binary(
                binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
                Operator::And,
                binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
                &schema,
            )?,
            Operator::And,
            binary(
                binary(
                    col("c2", &schema)?,
                    Operator::Eq,
                    col("c9", &schema)?,
                    &schema,
                )?,
                Operator::And,
                binary(
                    col("c1", &schema)?,
                    Operator::NotEq,
                    col("c13", &schema)?,
                    &schema,
                )?,
                &schema,
            )?,
            &schema,
        )?;

        let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
        // Two equality conjuncts: c2 = 4 and c2 = c9 (GtEq is ignored).
        assert_eq!(2, equal_pairs.len());
        assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
        assert!(equal_pairs[0].1.eq(&lit(4u32)));

        assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
        assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));

        // One inequality conjunct: c1 != c13.
        assert_eq!(1, ne_pairs.len());
        assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
        assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));

        Ok(())
    }

    /// a <= 25 over a in [1, 100] with 100 rows: expect ~25 rows and bounds
    /// tightened to [1, 25].
    #[tokio::test]
    async fn test_filter_statistics_basic_expr() -> Result<()> {
        let bytes_per_row = 4;
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                total_byte_size: Precision::Inexact(100 * bytes_per_row),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
            },
            schema.clone(),
        ));

        let predicate: Arc<dyn PhysicalExpr> =
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;

        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(25));
        assert_eq!(
            statistics.total_byte_size,
            Precision::Inexact(25 * bytes_per_row)
        );
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }

    /// Stacked filters (a <= 25 under a >= 10) compose: bounds become
    /// [10, 25] and the row estimate shrinks accordingly.
    #[tokio::test]
    async fn test_filter_statistics_column_level_nested() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            sub_filter,
        )?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(16));
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }

    /// Three stacked filters over two columns; each level tightens its own
    /// column's bounds and the row estimate multiplies through.
    #[tokio::test]
    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                        ..Default::default()
                    },
                ],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        // NOTE(review): the variable name says "b > 5" but the predicate is
        // b > 45 — presumably a stale name; the assertions match b > 45.
        let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
            a_lte_25,
        )?);

        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            b_gt_5,
        )?);
        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(2));
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                    ..Default::default()
                }
            ]
        );

        Ok(())
    }

    /// With unknown input statistics, the filter reports unknown row counts
    /// rather than fabricating an estimate.
    #[tokio::test]
    async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&schema),
            schema.clone(),
        ));

        let predicate: Arc<dyn PhysicalExpr> =
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;

        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Absent);

        Ok(())
    }
1133
1134 #[tokio::test]
1135 async fn test_filter_statistics_multiple_columns() -> Result<()> {
1136 let schema = Schema::new(vec![
1141 Field::new("a", DataType::Int32, false),
1142 Field::new("b", DataType::Int32, false),
1143 Field::new("c", DataType::Float32, false),
1144 ]);
1145 let input = Arc::new(StatisticsExec::new(
1146 Statistics {
1147 num_rows: Precision::Inexact(1000),
1148 total_byte_size: Precision::Inexact(4000),
1149 column_statistics: vec![
1150 ColumnStatistics {
1151 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1152 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1153 ..Default::default()
1154 },
1155 ColumnStatistics {
1156 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1157 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1158 ..Default::default()
1159 },
1160 ColumnStatistics {
1161 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1162 max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1163 ..Default::default()
1164 },
1165 ],
1166 },
1167 schema,
1168 ));
1169 let predicate = Arc::new(BinaryExpr::new(
1171 Arc::new(BinaryExpr::new(
1172 Arc::new(Column::new("a", 0)),
1173 Operator::LtEq,
1174 Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1175 )),
1176 Operator::And,
1177 Arc::new(BinaryExpr::new(
1178 Arc::new(BinaryExpr::new(
1179 Arc::new(Column::new("b", 1)),
1180 Operator::Eq,
1181 Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1182 )),
1183 Operator::And,
1184 Arc::new(BinaryExpr::new(
1185 Arc::new(BinaryExpr::new(
1186 Arc::new(Column::new("c", 2)),
1187 Operator::LtEq,
1188 Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1189 )),
1190 Operator::And,
1191 Arc::new(BinaryExpr::new(
1192 Arc::new(Column::new("a", 0)),
1193 Operator::Gt,
1194 Arc::new(Column::new("b", 1)),
1195 )),
1196 )),
1197 )),
1198 ));
1199 let filter: Arc<dyn ExecutionPlan> =
1200 Arc::new(FilterExec::try_new(predicate, input)?);
1201 let statistics = filter.partition_statistics(None)?;
1202 assert_eq!(statistics.num_rows, Precision::Inexact(134));
1206 assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1207 let exp_col_stats = vec![
1208 ColumnStatistics {
1209 min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1210 max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1211 ..Default::default()
1212 },
1213 ColumnStatistics {
1214 min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1215 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1216 ..Default::default()
1217 },
1218 ColumnStatistics {
1219 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1220 max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1221 ..Default::default()
1222 },
1223 ];
1224 let _ = exp_col_stats
1225 .into_iter()
1226 .zip(statistics.column_statistics)
1227 .map(|(expected, actual)| {
1228 if let Some(val) = actual.min_value.get_value() {
1229 if val.data_type().is_floating() {
1230 let actual_min = actual.min_value.get_value().unwrap();
1233 let actual_max = actual.max_value.get_value().unwrap();
1234 let expected_min = expected.min_value.get_value().unwrap();
1235 let expected_max = expected.max_value.get_value().unwrap();
1236 let eps = ScalarValue::Float32(Some(1e-6));
1237
1238 assert!(actual_min.sub(expected_min).unwrap() < eps);
1239 assert!(actual_min.sub(expected_min).unwrap() < eps);
1240
1241 assert!(actual_max.sub(expected_max).unwrap() < eps);
1242 assert!(actual_max.sub(expected_max).unwrap() < eps);
1243 } else {
1244 assert_eq!(actual, expected);
1245 }
1246 } else {
1247 assert_eq!(actual, expected);
1248 }
1249 });
1250
1251 Ok(())
1252 }
1253
1254 #[tokio::test]
1255 async fn test_filter_statistics_full_selective() -> Result<()> {
1256 let schema = Schema::new(vec![
1260 Field::new("a", DataType::Int32, false),
1261 Field::new("b", DataType::Int32, false),
1262 ]);
1263 let input = Arc::new(StatisticsExec::new(
1264 Statistics {
1265 num_rows: Precision::Inexact(1000),
1266 total_byte_size: Precision::Inexact(4000),
1267 column_statistics: vec![
1268 ColumnStatistics {
1269 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1270 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1271 ..Default::default()
1272 },
1273 ColumnStatistics {
1274 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1275 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1276 ..Default::default()
1277 },
1278 ],
1279 },
1280 schema,
1281 ));
1282 let predicate = Arc::new(BinaryExpr::new(
1284 Arc::new(BinaryExpr::new(
1285 Arc::new(Column::new("a", 0)),
1286 Operator::Lt,
1287 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1288 )),
1289 Operator::And,
1290 Arc::new(BinaryExpr::new(
1291 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1292 Operator::LtEq,
1293 Arc::new(Column::new("b", 1)),
1294 )),
1295 ));
1296 let expected = input.partition_statistics(None)?.column_statistics;
1298 let filter: Arc<dyn ExecutionPlan> =
1299 Arc::new(FilterExec::try_new(predicate, input)?);
1300 let statistics = filter.partition_statistics(None)?;
1301
1302 assert_eq!(statistics.num_rows, Precision::Inexact(1000));
1303 assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
1304 assert_eq!(statistics.column_statistics, expected);
1305
1306 Ok(())
1307 }
1308
1309 #[tokio::test]
1310 async fn test_filter_statistics_zero_selective() -> Result<()> {
1311 let schema = Schema::new(vec![
1315 Field::new("a", DataType::Int32, false),
1316 Field::new("b", DataType::Int32, false),
1317 ]);
1318 let input = Arc::new(StatisticsExec::new(
1319 Statistics {
1320 num_rows: Precision::Inexact(1000),
1321 total_byte_size: Precision::Inexact(4000),
1322 column_statistics: vec![
1323 ColumnStatistics {
1324 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1325 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1326 ..Default::default()
1327 },
1328 ColumnStatistics {
1329 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1330 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1331 ..Default::default()
1332 },
1333 ],
1334 },
1335 schema,
1336 ));
1337 let predicate = Arc::new(BinaryExpr::new(
1339 Arc::new(BinaryExpr::new(
1340 Arc::new(Column::new("a", 0)),
1341 Operator::Gt,
1342 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1343 )),
1344 Operator::And,
1345 Arc::new(BinaryExpr::new(
1346 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1347 Operator::LtEq,
1348 Arc::new(Column::new("b", 1)),
1349 )),
1350 ));
1351 let filter: Arc<dyn ExecutionPlan> =
1352 Arc::new(FilterExec::try_new(predicate, input)?);
1353 let statistics = filter.partition_statistics(None)?;
1354
1355 assert_eq!(statistics.num_rows, Precision::Inexact(0));
1356 assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
1357 assert_eq!(
1358 statistics.column_statistics,
1359 vec![
1360 ColumnStatistics {
1361 min_value: Precision::Exact(ScalarValue::Null),
1362 max_value: Precision::Exact(ScalarValue::Null),
1363 sum_value: Precision::Exact(ScalarValue::Null),
1364 distinct_count: Precision::Exact(0),
1365 null_count: Precision::Exact(0),
1366 byte_size: Precision::Absent,
1367 },
1368 ColumnStatistics {
1369 min_value: Precision::Exact(ScalarValue::Null),
1370 max_value: Precision::Exact(ScalarValue::Null),
1371 sum_value: Precision::Exact(ScalarValue::Null),
1372 distinct_count: Precision::Exact(0),
1373 null_count: Precision::Exact(0),
1374 byte_size: Precision::Absent,
1375 },
1376 ]
1377 );
1378
1379 Ok(())
1380 }
1381
1382 #[tokio::test]
1383 async fn test_filter_statistics_more_inputs() -> Result<()> {
1384 let schema = Schema::new(vec![
1385 Field::new("a", DataType::Int32, false),
1386 Field::new("b", DataType::Int32, false),
1387 ]);
1388 let input = Arc::new(StatisticsExec::new(
1389 Statistics {
1390 num_rows: Precision::Inexact(1000),
1391 total_byte_size: Precision::Inexact(4000),
1392 column_statistics: vec![
1393 ColumnStatistics {
1394 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1395 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1396 ..Default::default()
1397 },
1398 ColumnStatistics {
1399 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1400 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1401 ..Default::default()
1402 },
1403 ],
1404 },
1405 schema,
1406 ));
1407 let predicate = Arc::new(BinaryExpr::new(
1409 Arc::new(Column::new("a", 0)),
1410 Operator::Lt,
1411 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1412 ));
1413 let filter: Arc<dyn ExecutionPlan> =
1414 Arc::new(FilterExec::try_new(predicate, input)?);
1415 let statistics = filter.partition_statistics(None)?;
1416
1417 assert_eq!(statistics.num_rows, Precision::Inexact(490));
1418 assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
1419 assert_eq!(
1420 statistics.column_statistics,
1421 vec![
1422 ColumnStatistics {
1423 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1424 max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
1425 ..Default::default()
1426 },
1427 ColumnStatistics {
1428 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1429 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1430 ..Default::default()
1431 },
1432 ]
1433 );
1434
1435 Ok(())
1436 }
1437
1438 #[tokio::test]
1439 async fn test_empty_input_statistics() -> Result<()> {
1440 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1441 let input = Arc::new(StatisticsExec::new(
1442 Statistics::new_unknown(&schema),
1443 schema,
1444 ));
1445 let predicate = Arc::new(BinaryExpr::new(
1447 Arc::new(BinaryExpr::new(
1448 Arc::new(Column::new("a", 0)),
1449 Operator::LtEq,
1450 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1451 )),
1452 Operator::And,
1453 Arc::new(BinaryExpr::new(
1454 Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1455 Operator::LtEq,
1456 Arc::new(BinaryExpr::new(
1457 Arc::new(Column::new("a", 0)),
1458 Operator::Minus,
1459 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1460 )),
1461 )),
1462 ));
1463 let filter: Arc<dyn ExecutionPlan> =
1464 Arc::new(FilterExec::try_new(predicate, input)?);
1465 let filter_statistics = filter.partition_statistics(None)?;
1466
1467 let expected_filter_statistics = Statistics {
1468 num_rows: Precision::Absent,
1469 total_byte_size: Precision::Absent,
1470 column_statistics: vec![ColumnStatistics {
1471 null_count: Precision::Absent,
1472 min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1473 max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1474 sum_value: Precision::Absent,
1475 distinct_count: Precision::Absent,
1476 byte_size: Precision::Absent,
1477 }],
1478 };
1479
1480 assert_eq!(filter_statistics, expected_filter_statistics);
1481
1482 Ok(())
1483 }
1484
1485 #[tokio::test]
1486 async fn test_statistics_with_constant_column() -> Result<()> {
1487 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1488 let input = Arc::new(StatisticsExec::new(
1489 Statistics::new_unknown(&schema),
1490 schema,
1491 ));
1492 let predicate = Arc::new(BinaryExpr::new(
1494 Arc::new(Column::new("a", 0)),
1495 Operator::Eq,
1496 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1497 ));
1498 let filter: Arc<dyn ExecutionPlan> =
1499 Arc::new(FilterExec::try_new(predicate, input)?);
1500 let filter_statistics = filter.partition_statistics(None)?;
1501 assert!(filter_statistics.column_statistics[0].is_singleton());
1503
1504 Ok(())
1505 }
1506
1507 #[tokio::test]
1508 async fn test_validation_filter_selectivity() -> Result<()> {
1509 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1510 let input = Arc::new(StatisticsExec::new(
1511 Statistics::new_unknown(&schema),
1512 schema,
1513 ));
1514 let predicate = Arc::new(BinaryExpr::new(
1516 Arc::new(Column::new("a", 0)),
1517 Operator::Eq,
1518 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1519 ));
1520 let filter = FilterExec::try_new(predicate, input)?;
1521 assert!(filter.with_default_selectivity(120).is_err());
1522 Ok(())
1523 }
1524
1525 #[tokio::test]
1526 async fn test_custom_filter_selectivity() -> Result<()> {
1527 let schema =
1529 Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1530 let input = Arc::new(StatisticsExec::new(
1531 Statistics {
1532 num_rows: Precision::Inexact(1000),
1533 total_byte_size: Precision::Inexact(4000),
1534 column_statistics: vec![ColumnStatistics {
1535 ..Default::default()
1536 }],
1537 },
1538 schema,
1539 ));
1540 let predicate = Arc::new(BinaryExpr::new(
1542 Arc::new(Column::new("a", 0)),
1543 Operator::Eq,
1544 Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1545 ));
1546 let filter = FilterExec::try_new(predicate, input)?;
1547 let statistics = filter.partition_statistics(None)?;
1548 assert_eq!(statistics.num_rows, Precision::Inexact(200));
1549 assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1550 let filter = filter.with_default_selectivity(40)?;
1551 let statistics = filter.partition_statistics(None)?;
1552 assert_eq!(statistics.num_rows, Precision::Inexact(400));
1553 assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1554 Ok(())
1555 }
1556
1557 #[test]
1558 fn test_equivalence_properties_union_type() -> Result<()> {
1559 let union_type = DataType::Union(
1560 UnionFields::new(
1561 vec![0, 1],
1562 vec![
1563 Field::new("f1", DataType::Int32, true),
1564 Field::new("f2", DataType::Utf8, true),
1565 ],
1566 ),
1567 UnionMode::Sparse,
1568 );
1569
1570 let schema = Arc::new(Schema::new(vec![
1571 Field::new("c1", DataType::Int32, true),
1572 Field::new("c2", union_type, true),
1573 ]));
1574
1575 let exec = FilterExec::try_new(
1576 binary(
1577 binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
1578 Operator::And,
1579 binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
1580 &schema,
1581 )?,
1582 Arc::new(EmptyExec::new(Arc::clone(&schema))),
1583 )?;
1584
1585 exec.partition_statistics(None).unwrap();
1586
1587 Ok(())
1588 }
1589}