1use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use itertools::Itertools;
24
25use super::{
26 ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27 RecordBatchStream, SendableRecordBatchStream, Statistics,
28};
29use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
30use crate::common::can_project;
31use crate::execution_plan::CardinalityEffect;
32use crate::filter_pushdown::{
33 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
34 FilterPushdownPropagation, PushedDown, PushedDownPredicate,
35};
36use crate::metrics::{MetricBuilder, MetricType};
37use crate::projection::{
38 EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
39 try_embed_projection, update_expr,
40};
41use crate::{
42 DisplayFormatType, ExecutionPlan,
43 metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
44};
45
46use arrow::compute::filter_record_batch;
47use arrow::datatypes::{DataType, SchemaRef};
48use arrow::record_batch::RecordBatch;
49use datafusion_common::cast::as_boolean_array;
50use datafusion_common::config::ConfigOptions;
51use datafusion_common::stats::Precision;
52use datafusion_common::{
53 DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
54};
55use datafusion_execution::TaskContext;
56use datafusion_expr::Operator;
57use datafusion_physical_expr::equivalence::ProjectionMapping;
58use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
59use datafusion_physical_expr::intervals::utils::check_support;
60use datafusion_physical_expr::utils::collect_columns;
61use datafusion_physical_expr::{
62 AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
63 conjunction, split_conjunction,
64};
65
66use datafusion_physical_expr_common::physical_expr::fmt_sql;
67use futures::stream::{Stream, StreamExt};
68use log::trace;
69
/// Default selectivity, as a percentage in `0..=100`, used to scale row count
/// and byte size when the predicate cannot be analyzed against the input
/// statistics.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Default batch size handed to the output `LimitedBatchCoalescer`.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
72
/// Physical operator that evaluates a boolean `predicate` against each input
/// batch and keeps the rows selected by it, optionally projecting the
/// surviving columns and limiting the number of emitted rows.
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// Boolean expression; its column references index into the input schema.
    predicate: Arc<dyn PhysicalExpr>,
    /// The single child plan whose rows are filtered.
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics shared by every partition stream of this node.
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity percentage (`0..=100`) assumed when the predicate cannot be
    /// analyzed for statistics estimation.
    default_selectivity: u8,
    /// Cached plan properties (equivalences, partitioning, boundedness, ...).
    cache: PlanProperties,
    /// Optional indices into the input schema selecting the output columns.
    projection: Option<Vec<usize>>,
    /// Batch size passed to the output coalescer.
    batch_size: usize,
    /// Optional limit on the number of rows emitted.
    fetch: Option<usize>,
}
94
95impl FilterExec {
96 #[expect(clippy::needless_pass_by_value)]
98 pub fn try_new(
99 predicate: Arc<dyn PhysicalExpr>,
100 input: Arc<dyn ExecutionPlan>,
101 ) -> Result<Self> {
102 match predicate.data_type(input.schema().as_ref())? {
103 DataType::Boolean => {
104 let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
105 let cache = Self::compute_properties(
106 &input,
107 &predicate,
108 default_selectivity,
109 None,
110 )?;
111 Ok(Self {
112 predicate,
113 input: Arc::clone(&input),
114 metrics: ExecutionPlanMetricsSet::new(),
115 default_selectivity,
116 cache,
117 projection: None,
118 batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
119 fetch: None,
120 })
121 }
122 other => {
123 plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
124 }
125 }
126 }
127
128 pub fn with_default_selectivity(
129 mut self,
130 default_selectivity: u8,
131 ) -> Result<Self, DataFusionError> {
132 if default_selectivity > 100 {
133 return plan_err!(
134 "Default filter selectivity value needs to be less than or equal to 100"
135 );
136 }
137 self.default_selectivity = default_selectivity;
138 Ok(self)
139 }
140
141 pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
143 can_project(&self.schema(), projection.as_ref())?;
145
146 let projection = match projection {
147 Some(projection) => match &self.projection {
148 Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
149 None => Some(projection),
150 },
151 None => None,
152 };
153
154 let cache = Self::compute_properties(
155 &self.input,
156 &self.predicate,
157 self.default_selectivity,
158 projection.as_ref(),
159 )?;
160 Ok(Self {
161 predicate: Arc::clone(&self.predicate),
162 input: Arc::clone(&self.input),
163 metrics: self.metrics.clone(),
164 default_selectivity: self.default_selectivity,
165 cache,
166 projection,
167 batch_size: self.batch_size,
168 fetch: self.fetch,
169 })
170 }
171
172 pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
173 Ok(Self {
174 predicate: Arc::clone(&self.predicate),
175 input: Arc::clone(&self.input),
176 metrics: self.metrics.clone(),
177 default_selectivity: self.default_selectivity,
178 cache: self.cache.clone(),
179 projection: self.projection.clone(),
180 batch_size,
181 fetch: self.fetch,
182 })
183 }
184
185 pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
187 &self.predicate
188 }
189
190 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
192 &self.input
193 }
194
195 pub fn default_selectivity(&self) -> u8 {
197 self.default_selectivity
198 }
199
200 pub fn projection(&self) -> Option<&Vec<usize>> {
202 self.projection.as_ref()
203 }
204
205 fn statistics_helper(
207 schema: &SchemaRef,
208 input_stats: Statistics,
209 predicate: &Arc<dyn PhysicalExpr>,
210 default_selectivity: u8,
211 ) -> Result<Statistics> {
212 if !check_support(predicate, schema) {
213 let selectivity = default_selectivity as f64 / 100.0;
214 let mut stats = input_stats.to_inexact();
215 stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
216 stats.total_byte_size = stats
217 .total_byte_size
218 .with_estimated_selectivity(selectivity);
219 return Ok(stats);
220 }
221
222 let num_rows = input_stats.num_rows;
223 let total_byte_size = input_stats.total_byte_size;
224 let input_analysis_ctx =
225 AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?;
226
227 let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
228
229 let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
231 let num_rows = num_rows.with_estimated_selectivity(selectivity);
232 let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
233
234 let column_statistics = collect_new_statistics(
235 &input_stats.column_statistics,
236 analysis_ctx.boundaries,
237 );
238 Ok(Statistics {
239 num_rows,
240 total_byte_size,
241 column_statistics,
242 })
243 }
244
245 fn extend_constants(
246 input: &Arc<dyn ExecutionPlan>,
247 predicate: &Arc<dyn PhysicalExpr>,
248 ) -> Vec<ConstExpr> {
249 let mut res_constants = Vec::new();
250 let input_eqs = input.equivalence_properties();
251
252 let conjunctions = split_conjunction(predicate);
253 for conjunction in conjunctions {
254 if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
255 && binary.op() == &Operator::Eq
256 {
257 if input_eqs.is_expr_constant(binary.left()).is_some() {
259 let across = input_eqs
260 .is_expr_constant(binary.right())
261 .unwrap_or_default();
262 res_constants
263 .push(ConstExpr::new(Arc::clone(binary.right()), across));
264 } else if input_eqs.is_expr_constant(binary.right()).is_some() {
265 let across = input_eqs
266 .is_expr_constant(binary.left())
267 .unwrap_or_default();
268 res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across));
269 }
270 }
271 }
272 res_constants
273 }
274 fn compute_properties(
276 input: &Arc<dyn ExecutionPlan>,
277 predicate: &Arc<dyn PhysicalExpr>,
278 default_selectivity: u8,
279 projection: Option<&Vec<usize>>,
280 ) -> Result<PlanProperties> {
281 let schema = input.schema();
284 let stats = Self::statistics_helper(
285 &schema,
286 input.partition_statistics(None)?,
287 predicate,
288 default_selectivity,
289 )?;
290 let mut eq_properties = input.equivalence_properties().clone();
291 let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
292 for (lhs, rhs) in equal_pairs {
293 eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
294 }
295 let constants = collect_columns(predicate)
298 .into_iter()
299 .filter(|column| stats.column_statistics[column.index()].is_singleton())
300 .map(|column| {
301 let value = stats.column_statistics[column.index()]
302 .min_value
303 .get_value();
304 let expr = Arc::new(column) as _;
305 ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
306 });
307 eq_properties.add_constants(constants)?;
309 eq_properties.add_constants(Self::extend_constants(input, predicate))?;
312
313 let mut output_partitioning = input.output_partitioning().clone();
314 if let Some(projection) = projection {
316 let schema = eq_properties.schema();
317 let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
318 let out_schema = project_schema(schema, Some(projection))?;
319 output_partitioning =
320 output_partitioning.project(&projection_mapping, &eq_properties);
321 eq_properties = eq_properties.project(&projection_mapping, out_schema);
322 }
323
324 Ok(PlanProperties::new(
325 eq_properties,
326 output_partitioning,
327 input.pipeline_behavior(),
328 input.boundedness(),
329 ))
330 }
331}
332
333impl DisplayAs for FilterExec {
334 fn fmt_as(
335 &self,
336 t: DisplayFormatType,
337 f: &mut std::fmt::Formatter,
338 ) -> std::fmt::Result {
339 match t {
340 DisplayFormatType::Default | DisplayFormatType::Verbose => {
341 let display_projections = if let Some(projection) =
342 self.projection.as_ref()
343 {
344 format!(
345 ", projection=[{}]",
346 projection
347 .iter()
348 .map(|index| format!(
349 "{}@{}",
350 self.input.schema().fields().get(*index).unwrap().name(),
351 index
352 ))
353 .collect::<Vec<_>>()
354 .join(", ")
355 )
356 } else {
357 "".to_string()
358 };
359 let fetch = self
360 .fetch
361 .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
362 write!(
363 f,
364 "FilterExec: {}{}{}",
365 self.predicate, display_projections, fetch
366 )
367 }
368 DisplayFormatType::TreeRender => {
369 write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
370 }
371 }
372 }
373}
374
375impl ExecutionPlan for FilterExec {
376 fn name(&self) -> &'static str {
377 "FilterExec"
378 }
379
380 fn as_any(&self) -> &dyn Any {
382 self
383 }
384
385 fn properties(&self) -> &PlanProperties {
386 &self.cache
387 }
388
389 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
390 vec![&self.input]
391 }
392
393 fn maintains_input_order(&self) -> Vec<bool> {
394 vec![true]
396 }
397
398 fn with_new_children(
399 self: Arc<Self>,
400 mut children: Vec<Arc<dyn ExecutionPlan>>,
401 ) -> Result<Arc<dyn ExecutionPlan>> {
402 FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
403 .and_then(|e| {
404 let selectivity = e.default_selectivity();
405 e.with_default_selectivity(selectivity)
406 })
407 .and_then(|e| e.with_projection(self.projection().cloned()))
408 .map(|e| e.with_fetch(self.fetch).unwrap())
409 }
410
411 fn execute(
412 &self,
413 partition: usize,
414 context: Arc<TaskContext>,
415 ) -> Result<SendableRecordBatchStream> {
416 trace!(
417 "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
418 partition,
419 context.session_id(),
420 context.task_id()
421 );
422 let metrics = FilterExecMetrics::new(&self.metrics, partition);
423 Ok(Box::pin(FilterExecStream {
424 schema: self.schema(),
425 predicate: Arc::clone(&self.predicate),
426 input: self.input.execute(partition, context)?,
427 metrics,
428 projection: self.projection.clone(),
429 batch_coalescer: LimitedBatchCoalescer::new(
430 self.schema(),
431 self.batch_size,
432 self.fetch,
433 ),
434 }))
435 }
436
437 fn metrics(&self) -> Option<MetricsSet> {
438 Some(self.metrics.clone_inner())
439 }
440
441 fn statistics(&self) -> Result<Statistics> {
444 self.partition_statistics(None)
445 }
446
447 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
448 let input_stats = self.input.partition_statistics(partition)?;
449 let schema = self.schema();
450 let stats = Self::statistics_helper(
451 &schema,
452 input_stats,
453 self.predicate(),
454 self.default_selectivity,
455 )?;
456 Ok(stats.project(self.projection.as_ref()))
457 }
458
459 fn cardinality_effect(&self) -> CardinalityEffect {
460 CardinalityEffect::LowerEqual
461 }
462
463 fn try_swapping_with_projection(
466 &self,
467 projection: &ProjectionExec,
468 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
469 if projection.expr().len() < projection.input().schema().fields().len() {
471 if let Some(new_predicate) =
473 update_expr(self.predicate(), projection.expr(), false)?
474 {
475 return FilterExec::try_new(
476 new_predicate,
477 make_with_child(projection, self.input())?,
478 )
479 .and_then(|e| {
480 let selectivity = self.default_selectivity();
481 e.with_default_selectivity(selectivity)
482 })
483 .map(|e| Some(Arc::new(e) as _));
484 }
485 }
486 try_embed_projection(projection, self)
487 }
488
489 fn gather_filters_for_pushdown(
490 &self,
491 phase: FilterPushdownPhase,
492 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
493 _config: &ConfigOptions,
494 ) -> Result<FilterDescription> {
495 if !matches!(phase, FilterPushdownPhase::Pre) {
496 let filter_supports = parent_filters
498 .into_iter()
499 .map(PushedDownPredicate::supported)
500 .collect();
501
502 return Ok(FilterDescription::new().with_child(ChildFilterDescription {
503 parent_filters: filter_supports,
504 self_filters: vec![],
505 }));
506 }
507
508 let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
509 .with_self_filters(
510 split_conjunction(&self.predicate)
511 .into_iter()
512 .cloned()
513 .collect(),
514 );
515
516 Ok(FilterDescription::new().with_child(child))
517 }
518
519 fn handle_child_pushdown_result(
520 &self,
521 phase: FilterPushdownPhase,
522 child_pushdown_result: ChildPushdownResult,
523 _config: &ConfigOptions,
524 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
525 if !matches!(phase, FilterPushdownPhase::Pre) {
526 return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
527 }
528 let unsupported_parent_filters =
530 child_pushdown_result.parent_filters.iter().filter_map(|f| {
531 matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
532 });
533 let unsupported_self_filters = child_pushdown_result
534 .self_filters
535 .first()
536 .expect("we have exactly one child")
537 .iter()
538 .filter_map(|f| match f.discriminant {
539 PushedDown::Yes => None,
540 PushedDown::No => Some(&f.predicate),
541 })
542 .cloned();
543
544 let unhandled_filters = unsupported_parent_filters
545 .into_iter()
546 .chain(unsupported_self_filters)
547 .collect_vec();
548
549 let filter_input = Arc::clone(self.input());
551 let new_predicate = conjunction(unhandled_filters);
552 let updated_node = if new_predicate.eq(&lit(true)) {
553 match self.projection() {
555 Some(projection_indices) => {
556 let filter_child_schema = filter_input.schema();
557 let proj_exprs = projection_indices
558 .iter()
559 .map(|p| {
560 let field = filter_child_schema.field(*p).clone();
561 ProjectionExpr {
562 expr: Arc::new(Column::new(field.name(), *p))
563 as Arc<dyn PhysicalExpr>,
564 alias: field.name().to_string(),
565 }
566 })
567 .collect::<Vec<_>>();
568 Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
569 as Arc<dyn ExecutionPlan>)
570 }
571 None => {
572 Some(filter_input)
574 }
575 }
576 } else if new_predicate.eq(&self.predicate) {
577 None
579 } else {
580 let new = FilterExec {
582 predicate: Arc::clone(&new_predicate),
583 input: Arc::clone(&filter_input),
584 metrics: self.metrics.clone(),
585 default_selectivity: self.default_selectivity,
586 cache: Self::compute_properties(
587 &filter_input,
588 &new_predicate,
589 self.default_selectivity,
590 self.projection.as_ref(),
591 )?,
592 projection: None,
593 batch_size: self.batch_size,
594 fetch: self.fetch,
595 };
596 Some(Arc::new(new) as _)
597 };
598
599 Ok(FilterPushdownPropagation {
600 filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
601 updated_node,
602 })
603 }
604
605 fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
606 Some(Arc::new(Self {
607 predicate: Arc::clone(&self.predicate),
608 input: Arc::clone(&self.input),
609 metrics: self.metrics.clone(),
610 default_selectivity: self.default_selectivity,
611 cache: self.cache.clone(),
612 projection: self.projection.clone(),
613 batch_size: self.batch_size,
614 fetch,
615 }))
616 }
617}
618
619impl EmbeddedProjection for FilterExec {
620 fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
621 self.with_projection(projection)
622 }
623}
624
625fn collect_new_statistics(
630 input_column_stats: &[ColumnStatistics],
631 analysis_boundaries: Vec<ExprBoundaries>,
632) -> Vec<ColumnStatistics> {
633 analysis_boundaries
634 .into_iter()
635 .enumerate()
636 .map(
637 |(
638 idx,
639 ExprBoundaries {
640 interval,
641 distinct_count,
642 ..
643 },
644 )| {
645 let Some(interval) = interval else {
646 return ColumnStatistics {
648 null_count: Precision::Exact(0),
649 max_value: Precision::Exact(ScalarValue::Null),
650 min_value: Precision::Exact(ScalarValue::Null),
651 sum_value: Precision::Exact(ScalarValue::Null),
652 distinct_count: Precision::Exact(0),
653 byte_size: input_column_stats[idx].byte_size,
654 };
655 };
656 let (lower, upper) = interval.into_bounds();
657 let (min_value, max_value) = if lower.eq(&upper) {
658 (Precision::Exact(lower), Precision::Exact(upper))
659 } else {
660 (Precision::Inexact(lower), Precision::Inexact(upper))
661 };
662 ColumnStatistics {
663 null_count: input_column_stats[idx].null_count.to_inexact(),
664 max_value,
665 min_value,
666 sum_value: Precision::Absent,
667 distinct_count: distinct_count.to_inexact(),
668 byte_size: input_column_stats[idx].byte_size,
669 }
670 },
671 )
672 .collect()
673}
674
/// Stream created by `FilterExec::execute`. It pulls batches from `input`,
/// filters and optionally projects them, and emits the results through a
/// batch coalescer.
struct FilterExecStream {
    /// Output schema of the stream (after any projection).
    schema: SchemaRef,
    /// Boolean predicate evaluated against every input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// Stream of input record batches for this partition.
    input: SendableRecordBatchStream,
    /// Runtime metrics for this partition.
    metrics: FilterExecMetrics,
    /// Optional input-column indices kept in the output.
    projection: Option<Vec<usize>>,
    /// Buffers filtered rows into output batches and reports when the
    /// optional fetch limit is reached.
    batch_coalescer: LimitedBatchCoalescer,
}
691
/// Per-partition metrics recorded by `FilterExecStream`.
struct FilterExecMetrics {
    /// Common operator metrics (elapsed compute time, recorded polls).
    baseline_metrics: BaselineMetrics,
    /// Ratio of rows emitted (part) to rows received from the input (total).
    selectivity: RatioMetrics,
}
701
702impl FilterExecMetrics {
703 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
704 Self {
705 baseline_metrics: BaselineMetrics::new(metrics, partition),
706 selectivity: MetricBuilder::new(metrics)
707 .with_type(MetricType::SUMMARY)
708 .ratio_metrics("selectivity", partition),
709 }
710 }
711}
712
713pub fn batch_filter(
714 batch: &RecordBatch,
715 predicate: &Arc<dyn PhysicalExpr>,
716) -> Result<RecordBatch> {
717 filter_and_project(batch, predicate, None)
718}
719
720fn filter_and_project(
721 batch: &RecordBatch,
722 predicate: &Arc<dyn PhysicalExpr>,
723 projection: Option<&Vec<usize>>,
724) -> Result<RecordBatch> {
725 predicate
726 .evaluate(batch)
727 .and_then(|v| v.into_array(batch.num_rows()))
728 .and_then(|array| {
729 Ok(match (as_boolean_array(&array), projection) {
730 (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
732 (Ok(filter_array), Some(projection)) => {
733 let projected_batch = batch.project(projection)?;
734 filter_record_batch(&projected_batch, filter_array)?
735 }
736 (Err(_), _) => {
737 return internal_err!(
738 "Cannot create filter_array from non-boolean predicates"
739 );
740 }
741 })
742 })
743}
744
745impl Stream for FilterExecStream {
746 type Item = Result<RecordBatch>;
747
748 fn poll_next(
749 mut self: Pin<&mut Self>,
750 cx: &mut Context<'_>,
751 ) -> Poll<Option<Self::Item>> {
752 let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
753 loop {
754 if let Some(batch) = self.batch_coalescer.next_completed_batch() {
756 self.metrics.selectivity.add_part(batch.num_rows());
757 let poll = Poll::Ready(Some(Ok(batch)));
758 return self.metrics.baseline_metrics.record_poll(poll);
759 }
760
761 if self.batch_coalescer.is_finished() {
762 return Poll::Ready(None);
764 }
765
766 match ready!(self.input.poll_next_unpin(cx)) {
768 None => {
769 self.batch_coalescer.finish()?;
770 }
772 Some(Ok(batch)) => {
773 let timer = elapsed_compute.timer();
774 let status = self.predicate.as_ref()
775 .evaluate(&batch)
776 .and_then(|v| v.into_array(batch.num_rows()))
777 .and_then(|array| {
778 Ok(match self.projection {
779 Some(ref projection) => {
780 let projected_batch = batch.project(projection)?;
781 (array, projected_batch)
782 },
783 None => (array, batch)
784 })
785 }).and_then(|(array, batch)| {
786 match as_boolean_array(&array) {
787 Ok(filter_array) => {
788 self.metrics.selectivity.add_total(batch.num_rows());
789 let batch = filter_record_batch(&batch, filter_array)?;
791 let state = self.batch_coalescer.push_batch(batch)?;
792 Ok(state)
793 }
794 Err(_) => {
795 internal_err!(
796 "Cannot create filter_array from non-boolean predicates"
797 )
798 }
799 }
800 })?;
801 timer.done();
802
803 match status {
804 PushBatchStatus::Continue => {
805 }
807 PushBatchStatus::LimitReached => {
808 self.batch_coalescer.finish()?;
810 }
812 }
813 }
814
815 other => return Poll::Ready(other),
817 }
818 }
819 }
820
821 fn size_hint(&self) -> (usize, Option<usize>) {
822 self.input.size_hint()
824 }
825}
826impl RecordBatchStream for FilterExecStream {
827 fn schema(&self) -> SchemaRef {
828 Arc::clone(&self.schema)
829 }
830}
831
/// Splits `predicate` on top-level `AND`s and returns the operand pairs of its
/// `=` conjuncts (first vector) and of its `!=` conjuncts (second vector).
#[deprecated(
    since = "51.0.0",
    note = "This function will be internal in the future"
)]
pub fn collect_columns_from_predicate(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    collect_columns_from_predicate_inner(predicate)
}
842
843fn collect_columns_from_predicate_inner(
844 predicate: &'_ Arc<dyn PhysicalExpr>,
845) -> EqualAndNonEqual<'_> {
846 let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
847 let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
848
849 let predicates = split_conjunction(predicate);
850 predicates.into_iter().for_each(|p| {
851 if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
852 match binary.op() {
853 Operator::Eq => {
854 eq_predicate_columns.push((binary.left(), binary.right()))
855 }
856 Operator::NotEq => {
857 ne_predicate_columns.push((binary.left(), binary.right()))
858 }
859 _ => {}
860 }
861 }
862 });
863
864 (eq_predicate_columns, ne_predicate_columns)
865}
866
/// A borrowed pair of physical expressions, such as the left and right
/// operands of a binary comparison.
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);

/// Operand pairs of a predicate's `=` conjuncts (first) and `!=` conjuncts
/// (second).
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
873
874#[cfg(test)]
875mod tests {
876 use super::*;
877 use crate::empty::EmptyExec;
878 use crate::expressions::*;
879 use crate::test;
880 use crate::test::exec::StatisticsExec;
881 use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
882 use datafusion_common::ScalarValue;
883
884 #[tokio::test]
885 async fn collect_columns_predicates() -> Result<()> {
886 let schema = test::aggr_test_schema();
887 let predicate: Arc<dyn PhysicalExpr> = binary(
888 binary(
889 binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
890 Operator::And,
891 binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
892 &schema,
893 )?,
894 Operator::And,
895 binary(
896 binary(
897 col("c2", &schema)?,
898 Operator::Eq,
899 col("c9", &schema)?,
900 &schema,
901 )?,
902 Operator::And,
903 binary(
904 col("c1", &schema)?,
905 Operator::NotEq,
906 col("c13", &schema)?,
907 &schema,
908 )?,
909 &schema,
910 )?,
911 &schema,
912 )?;
913
914 let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
915 assert_eq!(2, equal_pairs.len());
916 assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
917 assert!(equal_pairs[0].1.eq(&lit(4u32)));
918
919 assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
920 assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
921
922 assert_eq!(1, ne_pairs.len());
923 assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
924 assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
925
926 Ok(())
927 }
928
929 #[tokio::test]
930 async fn test_filter_statistics_basic_expr() -> Result<()> {
931 let bytes_per_row = 4;
934 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
935 let input = Arc::new(StatisticsExec::new(
936 Statistics {
937 num_rows: Precision::Inexact(100),
938 total_byte_size: Precision::Inexact(100 * bytes_per_row),
939 column_statistics: vec![ColumnStatistics {
940 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
941 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
942 ..Default::default()
943 }],
944 },
945 schema.clone(),
946 ));
947
948 let predicate: Arc<dyn PhysicalExpr> =
950 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
951
952 let filter: Arc<dyn ExecutionPlan> =
954 Arc::new(FilterExec::try_new(predicate, input)?);
955
956 let statistics = filter.partition_statistics(None)?;
957 assert_eq!(statistics.num_rows, Precision::Inexact(25));
958 assert_eq!(
959 statistics.total_byte_size,
960 Precision::Inexact(25 * bytes_per_row)
961 );
962 assert_eq!(
963 statistics.column_statistics,
964 vec![ColumnStatistics {
965 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
966 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
967 ..Default::default()
968 }]
969 );
970
971 Ok(())
972 }
973
974 #[tokio::test]
975 async fn test_filter_statistics_column_level_nested() -> Result<()> {
976 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
979 let input = Arc::new(StatisticsExec::new(
980 Statistics {
981 num_rows: Precision::Inexact(100),
982 column_statistics: vec![ColumnStatistics {
983 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
984 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
985 ..Default::default()
986 }],
987 total_byte_size: Precision::Absent,
988 },
989 schema.clone(),
990 ));
991
992 let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
994 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
995 input,
996 )?);
997
998 let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1002 binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1003 sub_filter,
1004 )?);
1005
1006 let statistics = filter.partition_statistics(None)?;
1007 assert_eq!(statistics.num_rows, Precision::Inexact(16));
1008 assert_eq!(
1009 statistics.column_statistics,
1010 vec![ColumnStatistics {
1011 min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1012 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1013 ..Default::default()
1014 }]
1015 );
1016
1017 Ok(())
1018 }
1019
1020 #[tokio::test]
1021 async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
1022 let schema = Schema::new(vec![
1026 Field::new("a", DataType::Int32, false),
1027 Field::new("b", DataType::Int32, false),
1028 ]);
1029 let input = Arc::new(StatisticsExec::new(
1030 Statistics {
1031 num_rows: Precision::Inexact(100),
1032 column_statistics: vec![
1033 ColumnStatistics {
1034 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1035 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1036 ..Default::default()
1037 },
1038 ColumnStatistics {
1039 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1040 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1041 ..Default::default()
1042 },
1043 ],
1044 total_byte_size: Precision::Absent,
1045 },
1046 schema.clone(),
1047 ));
1048
1049 let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1051 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1052 input,
1053 )?);
1054
1055 let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1057 binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
1058 a_lte_25,
1059 )?);
1060
1061 let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1063 binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1064 b_gt_5,
1065 )?);
1066 let statistics = filter.partition_statistics(None)?;
1067 assert_eq!(statistics.num_rows, Precision::Inexact(2));
1074 assert_eq!(
1075 statistics.column_statistics,
1076 vec![
1077 ColumnStatistics {
1078 min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1079 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1080 ..Default::default()
1081 },
1082 ColumnStatistics {
1083 min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
1084 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1085 ..Default::default()
1086 }
1087 ]
1088 );
1089
1090 Ok(())
1091 }
1092
1093 #[tokio::test]
1094 async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
1095 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1098 let input = Arc::new(StatisticsExec::new(
1099 Statistics::new_unknown(&schema),
1100 schema.clone(),
1101 ));
1102
1103 let predicate: Arc<dyn PhysicalExpr> =
1105 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1106
1107 let filter: Arc<dyn ExecutionPlan> =
1109 Arc::new(FilterExec::try_new(predicate, input)?);
1110
1111 let statistics = filter.partition_statistics(None)?;
1112 assert_eq!(statistics.num_rows, Precision::Absent);
1113
1114 Ok(())
1115 }
1116
1117 #[tokio::test]
1118 async fn test_filter_statistics_multiple_columns() -> Result<()> {
1119 let schema = Schema::new(vec![
1124 Field::new("a", DataType::Int32, false),
1125 Field::new("b", DataType::Int32, false),
1126 Field::new("c", DataType::Float32, false),
1127 ]);
1128 let input = Arc::new(StatisticsExec::new(
1129 Statistics {
1130 num_rows: Precision::Inexact(1000),
1131 total_byte_size: Precision::Inexact(4000),
1132 column_statistics: vec![
1133 ColumnStatistics {
1134 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1135 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1136 ..Default::default()
1137 },
1138 ColumnStatistics {
1139 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1140 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1141 ..Default::default()
1142 },
1143 ColumnStatistics {
1144 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1145 max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1146 ..Default::default()
1147 },
1148 ],
1149 },
1150 schema,
1151 ));
1152 let predicate = Arc::new(BinaryExpr::new(
1154 Arc::new(BinaryExpr::new(
1155 Arc::new(Column::new("a", 0)),
1156 Operator::LtEq,
1157 Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1158 )),
1159 Operator::And,
1160 Arc::new(BinaryExpr::new(
1161 Arc::new(BinaryExpr::new(
1162 Arc::new(Column::new("b", 1)),
1163 Operator::Eq,
1164 Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1165 )),
1166 Operator::And,
1167 Arc::new(BinaryExpr::new(
1168 Arc::new(BinaryExpr::new(
1169 Arc::new(Column::new("c", 2)),
1170 Operator::LtEq,
1171 Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1172 )),
1173 Operator::And,
1174 Arc::new(BinaryExpr::new(
1175 Arc::new(Column::new("a", 0)),
1176 Operator::Gt,
1177 Arc::new(Column::new("b", 1)),
1178 )),
1179 )),
1180 )),
1181 ));
1182 let filter: Arc<dyn ExecutionPlan> =
1183 Arc::new(FilterExec::try_new(predicate, input)?);
1184 let statistics = filter.partition_statistics(None)?;
1185 assert_eq!(statistics.num_rows, Precision::Inexact(134));
1189 assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1190 let exp_col_stats = vec![
1191 ColumnStatistics {
1192 min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1193 max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1194 ..Default::default()
1195 },
1196 ColumnStatistics {
1197 min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1198 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1199 ..Default::default()
1200 },
1201 ColumnStatistics {
1202 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1203 max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1204 ..Default::default()
1205 },
1206 ];
1207 let _ = exp_col_stats
1208 .into_iter()
1209 .zip(statistics.column_statistics)
1210 .map(|(expected, actual)| {
1211 if let Some(val) = actual.min_value.get_value() {
1212 if val.data_type().is_floating() {
1213 let actual_min = actual.min_value.get_value().unwrap();
1216 let actual_max = actual.max_value.get_value().unwrap();
1217 let expected_min = expected.min_value.get_value().unwrap();
1218 let expected_max = expected.max_value.get_value().unwrap();
1219 let eps = ScalarValue::Float32(Some(1e-6));
1220
1221 assert!(actual_min.sub(expected_min).unwrap() < eps);
1222 assert!(actual_min.sub(expected_min).unwrap() < eps);
1223
1224 assert!(actual_max.sub(expected_max).unwrap() < eps);
1225 assert!(actual_max.sub(expected_max).unwrap() < eps);
1226 } else {
1227 assert_eq!(actual, expected);
1228 }
1229 } else {
1230 assert_eq!(actual, expected);
1231 }
1232 });
1233
1234 Ok(())
1235 }
1236
1237 #[tokio::test]
1238 async fn test_filter_statistics_full_selective() -> Result<()> {
1239 let schema = Schema::new(vec![
1243 Field::new("a", DataType::Int32, false),
1244 Field::new("b", DataType::Int32, false),
1245 ]);
1246 let input = Arc::new(StatisticsExec::new(
1247 Statistics {
1248 num_rows: Precision::Inexact(1000),
1249 total_byte_size: Precision::Inexact(4000),
1250 column_statistics: vec![
1251 ColumnStatistics {
1252 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1253 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1254 ..Default::default()
1255 },
1256 ColumnStatistics {
1257 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1258 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1259 ..Default::default()
1260 },
1261 ],
1262 },
1263 schema,
1264 ));
1265 let predicate = Arc::new(BinaryExpr::new(
1267 Arc::new(BinaryExpr::new(
1268 Arc::new(Column::new("a", 0)),
1269 Operator::Lt,
1270 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1271 )),
1272 Operator::And,
1273 Arc::new(BinaryExpr::new(
1274 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1275 Operator::LtEq,
1276 Arc::new(Column::new("b", 1)),
1277 )),
1278 ));
1279 let expected = input.partition_statistics(None)?.column_statistics;
1281 let filter: Arc<dyn ExecutionPlan> =
1282 Arc::new(FilterExec::try_new(predicate, input)?);
1283 let statistics = filter.partition_statistics(None)?;
1284
1285 assert_eq!(statistics.num_rows, Precision::Inexact(1000));
1286 assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
1287 assert_eq!(statistics.column_statistics, expected);
1288
1289 Ok(())
1290 }
1291
1292 #[tokio::test]
1293 async fn test_filter_statistics_zero_selective() -> Result<()> {
1294 let schema = Schema::new(vec![
1298 Field::new("a", DataType::Int32, false),
1299 Field::new("b", DataType::Int32, false),
1300 ]);
1301 let input = Arc::new(StatisticsExec::new(
1302 Statistics {
1303 num_rows: Precision::Inexact(1000),
1304 total_byte_size: Precision::Inexact(4000),
1305 column_statistics: vec![
1306 ColumnStatistics {
1307 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1308 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1309 ..Default::default()
1310 },
1311 ColumnStatistics {
1312 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1313 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1314 ..Default::default()
1315 },
1316 ],
1317 },
1318 schema,
1319 ));
1320 let predicate = Arc::new(BinaryExpr::new(
1322 Arc::new(BinaryExpr::new(
1323 Arc::new(Column::new("a", 0)),
1324 Operator::Gt,
1325 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1326 )),
1327 Operator::And,
1328 Arc::new(BinaryExpr::new(
1329 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1330 Operator::LtEq,
1331 Arc::new(Column::new("b", 1)),
1332 )),
1333 ));
1334 let filter: Arc<dyn ExecutionPlan> =
1335 Arc::new(FilterExec::try_new(predicate, input)?);
1336 let statistics = filter.partition_statistics(None)?;
1337
1338 assert_eq!(statistics.num_rows, Precision::Inexact(0));
1339 assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
1340 assert_eq!(
1341 statistics.column_statistics,
1342 vec![
1343 ColumnStatistics {
1344 min_value: Precision::Exact(ScalarValue::Null),
1345 max_value: Precision::Exact(ScalarValue::Null),
1346 sum_value: Precision::Exact(ScalarValue::Null),
1347 distinct_count: Precision::Exact(0),
1348 null_count: Precision::Exact(0),
1349 byte_size: Precision::Absent,
1350 },
1351 ColumnStatistics {
1352 min_value: Precision::Exact(ScalarValue::Null),
1353 max_value: Precision::Exact(ScalarValue::Null),
1354 sum_value: Precision::Exact(ScalarValue::Null),
1355 distinct_count: Precision::Exact(0),
1356 null_count: Precision::Exact(0),
1357 byte_size: Precision::Absent,
1358 },
1359 ]
1360 );
1361
1362 Ok(())
1363 }
1364
1365 #[tokio::test]
1366 async fn test_filter_statistics_more_inputs() -> Result<()> {
1367 let schema = Schema::new(vec![
1368 Field::new("a", DataType::Int32, false),
1369 Field::new("b", DataType::Int32, false),
1370 ]);
1371 let input = Arc::new(StatisticsExec::new(
1372 Statistics {
1373 num_rows: Precision::Inexact(1000),
1374 total_byte_size: Precision::Inexact(4000),
1375 column_statistics: vec![
1376 ColumnStatistics {
1377 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1378 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1379 ..Default::default()
1380 },
1381 ColumnStatistics {
1382 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1383 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1384 ..Default::default()
1385 },
1386 ],
1387 },
1388 schema,
1389 ));
1390 let predicate = Arc::new(BinaryExpr::new(
1392 Arc::new(Column::new("a", 0)),
1393 Operator::Lt,
1394 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1395 ));
1396 let filter: Arc<dyn ExecutionPlan> =
1397 Arc::new(FilterExec::try_new(predicate, input)?);
1398 let statistics = filter.partition_statistics(None)?;
1399
1400 assert_eq!(statistics.num_rows, Precision::Inexact(490));
1401 assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
1402 assert_eq!(
1403 statistics.column_statistics,
1404 vec![
1405 ColumnStatistics {
1406 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1407 max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
1408 ..Default::default()
1409 },
1410 ColumnStatistics {
1411 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1412 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1413 ..Default::default()
1414 },
1415 ]
1416 );
1417
1418 Ok(())
1419 }
1420
1421 #[tokio::test]
1422 async fn test_empty_input_statistics() -> Result<()> {
1423 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1424 let input = Arc::new(StatisticsExec::new(
1425 Statistics::new_unknown(&schema),
1426 schema,
1427 ));
1428 let predicate = Arc::new(BinaryExpr::new(
1430 Arc::new(BinaryExpr::new(
1431 Arc::new(Column::new("a", 0)),
1432 Operator::LtEq,
1433 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1434 )),
1435 Operator::And,
1436 Arc::new(BinaryExpr::new(
1437 Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1438 Operator::LtEq,
1439 Arc::new(BinaryExpr::new(
1440 Arc::new(Column::new("a", 0)),
1441 Operator::Minus,
1442 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1443 )),
1444 )),
1445 ));
1446 let filter: Arc<dyn ExecutionPlan> =
1447 Arc::new(FilterExec::try_new(predicate, input)?);
1448 let filter_statistics = filter.partition_statistics(None)?;
1449
1450 let expected_filter_statistics = Statistics {
1451 num_rows: Precision::Absent,
1452 total_byte_size: Precision::Absent,
1453 column_statistics: vec![ColumnStatistics {
1454 null_count: Precision::Absent,
1455 min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1456 max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1457 sum_value: Precision::Absent,
1458 distinct_count: Precision::Absent,
1459 byte_size: Precision::Absent,
1460 }],
1461 };
1462
1463 assert_eq!(filter_statistics, expected_filter_statistics);
1464
1465 Ok(())
1466 }
1467
1468 #[tokio::test]
1469 async fn test_statistics_with_constant_column() -> Result<()> {
1470 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1471 let input = Arc::new(StatisticsExec::new(
1472 Statistics::new_unknown(&schema),
1473 schema,
1474 ));
1475 let predicate = Arc::new(BinaryExpr::new(
1477 Arc::new(Column::new("a", 0)),
1478 Operator::Eq,
1479 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1480 ));
1481 let filter: Arc<dyn ExecutionPlan> =
1482 Arc::new(FilterExec::try_new(predicate, input)?);
1483 let filter_statistics = filter.partition_statistics(None)?;
1484 assert!(filter_statistics.column_statistics[0].is_singleton());
1486
1487 Ok(())
1488 }
1489
1490 #[tokio::test]
1491 async fn test_validation_filter_selectivity() -> Result<()> {
1492 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1493 let input = Arc::new(StatisticsExec::new(
1494 Statistics::new_unknown(&schema),
1495 schema,
1496 ));
1497 let predicate = Arc::new(BinaryExpr::new(
1499 Arc::new(Column::new("a", 0)),
1500 Operator::Eq,
1501 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1502 ));
1503 let filter = FilterExec::try_new(predicate, input)?;
1504 assert!(filter.with_default_selectivity(120).is_err());
1505 Ok(())
1506 }
1507
1508 #[tokio::test]
1509 async fn test_custom_filter_selectivity() -> Result<()> {
1510 let schema =
1512 Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1513 let input = Arc::new(StatisticsExec::new(
1514 Statistics {
1515 num_rows: Precision::Inexact(1000),
1516 total_byte_size: Precision::Inexact(4000),
1517 column_statistics: vec![ColumnStatistics {
1518 ..Default::default()
1519 }],
1520 },
1521 schema,
1522 ));
1523 let predicate = Arc::new(BinaryExpr::new(
1525 Arc::new(Column::new("a", 0)),
1526 Operator::Eq,
1527 Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1528 ));
1529 let filter = FilterExec::try_new(predicate, input)?;
1530 let statistics = filter.partition_statistics(None)?;
1531 assert_eq!(statistics.num_rows, Precision::Inexact(200));
1532 assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1533 let filter = filter.with_default_selectivity(40)?;
1534 let statistics = filter.partition_statistics(None)?;
1535 assert_eq!(statistics.num_rows, Precision::Inexact(400));
1536 assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1537 Ok(())
1538 }
1539
/// Builds a `FilterExec` over an input whose schema contains a sparse
/// union-typed column (`c2`), then computes its partition statistics. Both
/// steps must succeed. The predicate `c1 >= 1 AND c1 <= 4` only references
/// the `Int32` column `c1`.
#[test]
fn test_equivalence_properties_union_type() -> Result<()> {
    let union_type = DataType::Union(
        UnionFields::new(
            vec![0, 1],
            vec![
                Field::new("f1", DataType::Int32, true),
                Field::new("f2", DataType::Utf8, true),
            ],
        ),
        UnionMode::Sparse,
    );

    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", union_type, true),
    ]));

    // c1 >= 1 AND c1 <= 4
    let predicate = binary(
        binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
        Operator::And,
        binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
        &schema,
    )?;
    let exec = FilterExec::try_new(predicate, Arc::new(EmptyExec::new(Arc::clone(&schema))))?;

    // Propagate any error with `?` rather than panicking via `unwrap()`. The
    // test only checks that statistics can be computed at all.
    exec.partition_statistics(None)?;

    Ok(())
}
1572}