1use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{ready, Context, Poll};
22
23use itertools::Itertools;
24
25use super::{
26 ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27 RecordBatchStream, SendableRecordBatchStream, Statistics,
28};
29use crate::common::can_project;
30use crate::execution_plan::CardinalityEffect;
31use crate::filter_pushdown::{
32 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
33 FilterPushdownPropagation, PushedDown, PushedDownPredicate,
34};
35use crate::projection::{
36 make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
37 ProjectionExec, ProjectionExpr,
38};
39use crate::{
40 metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
41 DisplayFormatType, ExecutionPlan,
42};
43
44use arrow::compute::filter_record_batch;
45use arrow::datatypes::{DataType, SchemaRef};
46use arrow::record_batch::RecordBatch;
47use datafusion_common::cast::as_boolean_array;
48use datafusion_common::config::ConfigOptions;
49use datafusion_common::stats::Precision;
50use datafusion_common::{
51 internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue,
52};
53use datafusion_execution::TaskContext;
54use datafusion_expr::Operator;
55use datafusion_physical_expr::equivalence::ProjectionMapping;
56use datafusion_physical_expr::expressions::{lit, BinaryExpr, Column};
57use datafusion_physical_expr::intervals::utils::check_support;
58use datafusion_physical_expr::utils::collect_columns;
59use datafusion_physical_expr::{
60 analyze, conjunction, split_conjunction, AcrossPartitions, AnalysisContext,
61 ConstExpr, ExprBoundaries, PhysicalExpr,
62};
63
64use datafusion_physical_expr_common::physical_expr::fmt_sql;
65use futures::stream::{Stream, StreamExt};
66use log::trace;
67
/// Selectivity, expressed in percent, that [`FilterExec::try_new`] assigns to
/// a newly created filter. It is consulted when estimating statistics for a
/// predicate that the interval analysis does not support.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
69
70#[derive(Debug, Clone)]
73pub struct FilterExec {
74 predicate: Arc<dyn PhysicalExpr>,
76 input: Arc<dyn ExecutionPlan>,
78 metrics: ExecutionPlanMetricsSet,
80 default_selectivity: u8,
82 cache: PlanProperties,
84 projection: Option<Vec<usize>>,
86}
87
88impl FilterExec {
89 pub fn try_new(
91 predicate: Arc<dyn PhysicalExpr>,
92 input: Arc<dyn ExecutionPlan>,
93 ) -> Result<Self> {
94 match predicate.data_type(input.schema().as_ref())? {
95 DataType::Boolean => {
96 let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
97 let cache = Self::compute_properties(
98 &input,
99 &predicate,
100 default_selectivity,
101 None,
102 )?;
103 Ok(Self {
104 predicate,
105 input: Arc::clone(&input),
106 metrics: ExecutionPlanMetricsSet::new(),
107 default_selectivity,
108 cache,
109 projection: None,
110 })
111 }
112 other => {
113 plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
114 }
115 }
116 }
117
118 pub fn with_default_selectivity(
119 mut self,
120 default_selectivity: u8,
121 ) -> Result<Self, DataFusionError> {
122 if default_selectivity > 100 {
123 return plan_err!(
124 "Default filter selectivity value needs to be less than or equal to 100"
125 );
126 }
127 self.default_selectivity = default_selectivity;
128 Ok(self)
129 }
130
131 pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
133 can_project(&self.schema(), projection.as_ref())?;
135
136 let projection = match projection {
137 Some(projection) => match &self.projection {
138 Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
139 None => Some(projection),
140 },
141 None => None,
142 };
143
144 let cache = Self::compute_properties(
145 &self.input,
146 &self.predicate,
147 self.default_selectivity,
148 projection.as_ref(),
149 )?;
150 Ok(Self {
151 predicate: Arc::clone(&self.predicate),
152 input: Arc::clone(&self.input),
153 metrics: self.metrics.clone(),
154 default_selectivity: self.default_selectivity,
155 cache,
156 projection,
157 })
158 }
159
160 pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
162 &self.predicate
163 }
164
165 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
167 &self.input
168 }
169
170 pub fn default_selectivity(&self) -> u8 {
172 self.default_selectivity
173 }
174
175 pub fn projection(&self) -> Option<&Vec<usize>> {
177 self.projection.as_ref()
178 }
179
180 fn statistics_helper(
182 schema: SchemaRef,
183 input_stats: Statistics,
184 predicate: &Arc<dyn PhysicalExpr>,
185 default_selectivity: u8,
186 ) -> Result<Statistics> {
187 if !check_support(predicate, &schema) {
188 let selectivity = default_selectivity as f64 / 100.0;
189 let mut stats = input_stats.to_inexact();
190 stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
191 stats.total_byte_size = stats
192 .total_byte_size
193 .with_estimated_selectivity(selectivity);
194 return Ok(stats);
195 }
196
197 let num_rows = input_stats.num_rows;
198 let total_byte_size = input_stats.total_byte_size;
199 let input_analysis_ctx = AnalysisContext::try_from_statistics(
200 &schema,
201 &input_stats.column_statistics,
202 )?;
203
204 let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?;
205
206 let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
208 let num_rows = num_rows.with_estimated_selectivity(selectivity);
209 let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
210
211 let column_statistics = collect_new_statistics(
212 &input_stats.column_statistics,
213 analysis_ctx.boundaries,
214 );
215 Ok(Statistics {
216 num_rows,
217 total_byte_size,
218 column_statistics,
219 })
220 }
221
222 fn extend_constants(
223 input: &Arc<dyn ExecutionPlan>,
224 predicate: &Arc<dyn PhysicalExpr>,
225 ) -> Vec<ConstExpr> {
226 let mut res_constants = Vec::new();
227 let input_eqs = input.equivalence_properties();
228
229 let conjunctions = split_conjunction(predicate);
230 for conjunction in conjunctions {
231 if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>() {
232 if binary.op() == &Operator::Eq {
233 if input_eqs.is_expr_constant(binary.left()).is_some() {
235 let across = input_eqs
236 .is_expr_constant(binary.right())
237 .unwrap_or_default();
238 res_constants
239 .push(ConstExpr::new(Arc::clone(binary.right()), across));
240 } else if input_eqs.is_expr_constant(binary.right()).is_some() {
241 let across = input_eqs
242 .is_expr_constant(binary.left())
243 .unwrap_or_default();
244 res_constants
245 .push(ConstExpr::new(Arc::clone(binary.left()), across));
246 }
247 }
248 }
249 }
250 res_constants
251 }
252 fn compute_properties(
254 input: &Arc<dyn ExecutionPlan>,
255 predicate: &Arc<dyn PhysicalExpr>,
256 default_selectivity: u8,
257 projection: Option<&Vec<usize>>,
258 ) -> Result<PlanProperties> {
259 let stats = Self::statistics_helper(
262 input.schema(),
263 input.partition_statistics(None)?,
264 predicate,
265 default_selectivity,
266 )?;
267 let mut eq_properties = input.equivalence_properties().clone();
268 let (equal_pairs, _) = collect_columns_from_predicate(predicate);
269 for (lhs, rhs) in equal_pairs {
270 eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
271 }
272 let constants = collect_columns(predicate)
275 .into_iter()
276 .filter(|column| stats.column_statistics[column.index()].is_singleton())
277 .map(|column| {
278 let value = stats.column_statistics[column.index()]
279 .min_value
280 .get_value();
281 let expr = Arc::new(column) as _;
282 ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
283 });
284 eq_properties.add_constants(constants)?;
286 eq_properties.add_constants(Self::extend_constants(input, predicate))?;
289
290 let mut output_partitioning = input.output_partitioning().clone();
291 if let Some(projection) = projection {
293 let schema = eq_properties.schema();
294 let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
295 let out_schema = project_schema(schema, Some(projection))?;
296 output_partitioning =
297 output_partitioning.project(&projection_mapping, &eq_properties);
298 eq_properties = eq_properties.project(&projection_mapping, out_schema);
299 }
300
301 Ok(PlanProperties::new(
302 eq_properties,
303 output_partitioning,
304 input.pipeline_behavior(),
305 input.boundedness(),
306 ))
307 }
308}
309
310impl DisplayAs for FilterExec {
311 fn fmt_as(
312 &self,
313 t: DisplayFormatType,
314 f: &mut std::fmt::Formatter,
315 ) -> std::fmt::Result {
316 match t {
317 DisplayFormatType::Default | DisplayFormatType::Verbose => {
318 let display_projections = if let Some(projection) =
319 self.projection.as_ref()
320 {
321 format!(
322 ", projection=[{}]",
323 projection
324 .iter()
325 .map(|index| format!(
326 "{}@{}",
327 self.input.schema().fields().get(*index).unwrap().name(),
328 index
329 ))
330 .collect::<Vec<_>>()
331 .join(", ")
332 )
333 } else {
334 "".to_string()
335 };
336 write!(f, "FilterExec: {}{}", self.predicate, display_projections)
337 }
338 DisplayFormatType::TreeRender => {
339 write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
340 }
341 }
342 }
343}
344
345impl ExecutionPlan for FilterExec {
346 fn name(&self) -> &'static str {
347 "FilterExec"
348 }
349
350 fn as_any(&self) -> &dyn Any {
352 self
353 }
354
355 fn properties(&self) -> &PlanProperties {
356 &self.cache
357 }
358
359 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
360 vec![&self.input]
361 }
362
363 fn maintains_input_order(&self) -> Vec<bool> {
364 vec![true]
366 }
367
368 fn with_new_children(
369 self: Arc<Self>,
370 mut children: Vec<Arc<dyn ExecutionPlan>>,
371 ) -> Result<Arc<dyn ExecutionPlan>> {
372 FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
373 .and_then(|e| {
374 let selectivity = e.default_selectivity();
375 e.with_default_selectivity(selectivity)
376 })
377 .and_then(|e| e.with_projection(self.projection().cloned()))
378 .map(|e| Arc::new(e) as _)
379 }
380
381 fn execute(
382 &self,
383 partition: usize,
384 context: Arc<TaskContext>,
385 ) -> Result<SendableRecordBatchStream> {
386 trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
387 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
388 Ok(Box::pin(FilterExecStream {
389 schema: self.schema(),
390 predicate: Arc::clone(&self.predicate),
391 input: self.input.execute(partition, context)?,
392 baseline_metrics,
393 projection: self.projection.clone(),
394 }))
395 }
396
397 fn metrics(&self) -> Option<MetricsSet> {
398 Some(self.metrics.clone_inner())
399 }
400
401 fn statistics(&self) -> Result<Statistics> {
404 self.partition_statistics(None)
405 }
406
407 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
408 let input_stats = self.input.partition_statistics(partition)?;
409 let stats = Self::statistics_helper(
410 self.schema(),
411 input_stats,
412 self.predicate(),
413 self.default_selectivity,
414 )?;
415 Ok(stats.project(self.projection.as_ref()))
416 }
417
418 fn cardinality_effect(&self) -> CardinalityEffect {
419 CardinalityEffect::LowerEqual
420 }
421
422 fn try_swapping_with_projection(
425 &self,
426 projection: &ProjectionExec,
427 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
428 if projection.expr().len() < projection.input().schema().fields().len() {
430 if let Some(new_predicate) =
432 update_expr(self.predicate(), projection.expr(), false)?
433 {
434 return FilterExec::try_new(
435 new_predicate,
436 make_with_child(projection, self.input())?,
437 )
438 .and_then(|e| {
439 let selectivity = self.default_selectivity();
440 e.with_default_selectivity(selectivity)
441 })
442 .map(|e| Some(Arc::new(e) as _));
443 }
444 }
445 try_embed_projection(projection, self)
446 }
447
448 fn gather_filters_for_pushdown(
449 &self,
450 phase: FilterPushdownPhase,
451 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
452 _config: &ConfigOptions,
453 ) -> Result<FilterDescription> {
454 if !matches!(phase, FilterPushdownPhase::Pre) {
455 let filter_supports = parent_filters
457 .into_iter()
458 .map(PushedDownPredicate::supported)
459 .collect();
460 return Ok(FilterDescription::new().with_child(ChildFilterDescription {
461 parent_filters: filter_supports,
462 self_filters: vec![],
463 }));
464 }
465
466 let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
467 .with_self_filters(
468 split_conjunction(&self.predicate)
469 .into_iter()
470 .cloned()
471 .collect(),
472 );
473
474 Ok(FilterDescription::new().with_child(child))
475 }
476
477 fn handle_child_pushdown_result(
478 &self,
479 phase: FilterPushdownPhase,
480 child_pushdown_result: ChildPushdownResult,
481 _config: &ConfigOptions,
482 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
483 if !matches!(phase, FilterPushdownPhase::Pre) {
484 return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
485 }
486 let unsupported_parent_filters =
488 child_pushdown_result.parent_filters.iter().filter_map(|f| {
489 matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
490 });
491 let unsupported_self_filters = child_pushdown_result
492 .self_filters
493 .first()
494 .expect("we have exactly one child")
495 .iter()
496 .filter_map(|f| match f.discriminant {
497 PushedDown::Yes => None,
498 PushedDown::No => Some(&f.predicate),
499 })
500 .cloned();
501
502 let unhandled_filters = unsupported_parent_filters
503 .into_iter()
504 .chain(unsupported_self_filters)
505 .collect_vec();
506
507 let filter_input = Arc::clone(self.input());
509 let new_predicate = conjunction(unhandled_filters);
510 let updated_node = if new_predicate.eq(&lit(true)) {
511 match self.projection() {
513 Some(projection_indices) => {
514 let filter_child_schema = filter_input.schema();
515 let proj_exprs = projection_indices
516 .iter()
517 .map(|p| {
518 let field = filter_child_schema.field(*p).clone();
519 ProjectionExpr {
520 expr: Arc::new(Column::new(field.name(), *p))
521 as Arc<dyn PhysicalExpr>,
522 alias: field.name().to_string(),
523 }
524 })
525 .collect::<Vec<_>>();
526 Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
527 as Arc<dyn ExecutionPlan>)
528 }
529 None => {
530 Some(filter_input)
532 }
533 }
534 } else if new_predicate.eq(&self.predicate) {
535 None
537 } else {
538 let new = FilterExec {
540 predicate: Arc::clone(&new_predicate),
541 input: Arc::clone(&filter_input),
542 metrics: self.metrics.clone(),
543 default_selectivity: self.default_selectivity,
544 cache: Self::compute_properties(
545 &filter_input,
546 &new_predicate,
547 self.default_selectivity,
548 self.projection.as_ref(),
549 )?,
550 projection: None,
551 };
552 Some(Arc::new(new) as _)
553 };
554
555 Ok(FilterPushdownPropagation {
556 filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
557 updated_node,
558 })
559 }
560}
561
562impl EmbeddedProjection for FilterExec {
563 fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
564 self.with_projection(projection)
565 }
566}
567
568fn collect_new_statistics(
573 input_column_stats: &[ColumnStatistics],
574 analysis_boundaries: Vec<ExprBoundaries>,
575) -> Vec<ColumnStatistics> {
576 analysis_boundaries
577 .into_iter()
578 .enumerate()
579 .map(
580 |(
581 idx,
582 ExprBoundaries {
583 interval,
584 distinct_count,
585 ..
586 },
587 )| {
588 let Some(interval) = interval else {
589 return ColumnStatistics {
591 null_count: Precision::Exact(0),
592 max_value: Precision::Exact(ScalarValue::Null),
593 min_value: Precision::Exact(ScalarValue::Null),
594 sum_value: Precision::Exact(ScalarValue::Null),
595 distinct_count: Precision::Exact(0),
596 };
597 };
598 let (lower, upper) = interval.into_bounds();
599 let (min_value, max_value) = if lower.eq(&upper) {
600 (Precision::Exact(lower), Precision::Exact(upper))
601 } else {
602 (Precision::Inexact(lower), Precision::Inexact(upper))
603 };
604 ColumnStatistics {
605 null_count: input_column_stats[idx].null_count.to_inexact(),
606 max_value,
607 min_value,
608 sum_value: Precision::Absent,
609 distinct_count: distinct_count.to_inexact(),
610 }
611 },
612 )
613 .collect()
614}
615
616struct FilterExecStream {
619 schema: SchemaRef,
621 predicate: Arc<dyn PhysicalExpr>,
623 input: SendableRecordBatchStream,
625 baseline_metrics: BaselineMetrics,
627 projection: Option<Vec<usize>>,
629}
630
631pub fn batch_filter(
632 batch: &RecordBatch,
633 predicate: &Arc<dyn PhysicalExpr>,
634) -> Result<RecordBatch> {
635 filter_and_project(batch, predicate, None, &batch.schema())
636}
637
638fn filter_and_project(
639 batch: &RecordBatch,
640 predicate: &Arc<dyn PhysicalExpr>,
641 projection: Option<&Vec<usize>>,
642 output_schema: &SchemaRef,
643) -> Result<RecordBatch> {
644 predicate
645 .evaluate(batch)
646 .and_then(|v| v.into_array(batch.num_rows()))
647 .and_then(|array| {
648 Ok(match (as_boolean_array(&array), projection) {
649 (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
651 (Ok(filter_array), Some(projection)) => {
652 let projected_columns = projection
653 .iter()
654 .map(|i| Arc::clone(batch.column(*i)))
655 .collect();
656 let projected_batch = RecordBatch::try_new(
657 Arc::clone(output_schema),
658 projected_columns,
659 )?;
660 filter_record_batch(&projected_batch, filter_array)?
661 }
662 (Err(_), _) => {
663 return internal_err!(
664 "Cannot create filter_array from non-boolean predicates"
665 );
666 }
667 })
668 })
669}
670
671impl Stream for FilterExecStream {
672 type Item = Result<RecordBatch>;
673
674 fn poll_next(
675 mut self: Pin<&mut Self>,
676 cx: &mut Context<'_>,
677 ) -> Poll<Option<Self::Item>> {
678 let poll;
679 loop {
680 match ready!(self.input.poll_next_unpin(cx)) {
681 Some(Ok(batch)) => {
682 let timer = self.baseline_metrics.elapsed_compute().timer();
683 let filtered_batch = filter_and_project(
684 &batch,
685 &self.predicate,
686 self.projection.as_ref(),
687 &self.schema,
688 )?;
689 timer.done();
690 if filtered_batch.num_rows() == 0 {
692 continue;
693 }
694 poll = Poll::Ready(Some(Ok(filtered_batch)));
695 break;
696 }
697 value => {
698 poll = Poll::Ready(value);
699 break;
700 }
701 }
702 }
703 self.baseline_metrics.record_poll(poll)
704 }
705
706 fn size_hint(&self) -> (usize, Option<usize>) {
707 self.input.size_hint()
709 }
710}
711
712impl RecordBatchStream for FilterExecStream {
713 fn schema(&self) -> SchemaRef {
714 Arc::clone(&self.schema)
715 }
716}
717
718pub fn collect_columns_from_predicate(
720 predicate: &'_ Arc<dyn PhysicalExpr>,
721) -> EqualAndNonEqual<'_> {
722 let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
723 let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
724
725 let predicates = split_conjunction(predicate);
726 predicates.into_iter().for_each(|p| {
727 if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
728 match binary.op() {
729 Operator::Eq => {
730 eq_predicate_columns.push((binary.left(), binary.right()))
731 }
732 Operator::NotEq => {
733 ne_predicate_columns.push((binary.left(), binary.right()))
734 }
735 _ => {}
736 }
737 }
738 });
739
740 (eq_predicate_columns, ne_predicate_columns)
741}
742
743pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);
745
746pub type EqualAndNonEqual<'a> =
748 (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
749
750#[cfg(test)]
751mod tests {
752 use super::*;
753 use crate::empty::EmptyExec;
754 use crate::expressions::*;
755 use crate::test;
756 use crate::test::exec::StatisticsExec;
757 use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
758 use datafusion_common::ScalarValue;
759
760 #[tokio::test]
761 async fn collect_columns_predicates() -> Result<()> {
762 let schema = test::aggr_test_schema();
763 let predicate: Arc<dyn PhysicalExpr> = binary(
764 binary(
765 binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
766 Operator::And,
767 binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
768 &schema,
769 )?,
770 Operator::And,
771 binary(
772 binary(
773 col("c2", &schema)?,
774 Operator::Eq,
775 col("c9", &schema)?,
776 &schema,
777 )?,
778 Operator::And,
779 binary(
780 col("c1", &schema)?,
781 Operator::NotEq,
782 col("c13", &schema)?,
783 &schema,
784 )?,
785 &schema,
786 )?,
787 &schema,
788 )?;
789
790 let (equal_pairs, ne_pairs) = collect_columns_from_predicate(&predicate);
791 assert_eq!(2, equal_pairs.len());
792 assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
793 assert!(equal_pairs[0].1.eq(&lit(4u32)));
794
795 assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
796 assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
797
798 assert_eq!(1, ne_pairs.len());
799 assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
800 assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
801
802 Ok(())
803 }
804
805 #[tokio::test]
806 async fn test_filter_statistics_basic_expr() -> Result<()> {
807 let bytes_per_row = 4;
810 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
811 let input = Arc::new(StatisticsExec::new(
812 Statistics {
813 num_rows: Precision::Inexact(100),
814 total_byte_size: Precision::Inexact(100 * bytes_per_row),
815 column_statistics: vec![ColumnStatistics {
816 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
817 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
818 ..Default::default()
819 }],
820 },
821 schema.clone(),
822 ));
823
824 let predicate: Arc<dyn PhysicalExpr> =
826 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
827
828 let filter: Arc<dyn ExecutionPlan> =
830 Arc::new(FilterExec::try_new(predicate, input)?);
831
832 let statistics = filter.partition_statistics(None)?;
833 assert_eq!(statistics.num_rows, Precision::Inexact(25));
834 assert_eq!(
835 statistics.total_byte_size,
836 Precision::Inexact(25 * bytes_per_row)
837 );
838 assert_eq!(
839 statistics.column_statistics,
840 vec![ColumnStatistics {
841 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
842 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
843 ..Default::default()
844 }]
845 );
846
847 Ok(())
848 }
849
850 #[tokio::test]
851 async fn test_filter_statistics_column_level_nested() -> Result<()> {
852 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
855 let input = Arc::new(StatisticsExec::new(
856 Statistics {
857 num_rows: Precision::Inexact(100),
858 column_statistics: vec![ColumnStatistics {
859 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
860 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
861 ..Default::default()
862 }],
863 total_byte_size: Precision::Absent,
864 },
865 schema.clone(),
866 ));
867
868 let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
870 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
871 input,
872 )?);
873
874 let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
878 binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
879 sub_filter,
880 )?);
881
882 let statistics = filter.partition_statistics(None)?;
883 assert_eq!(statistics.num_rows, Precision::Inexact(16));
884 assert_eq!(
885 statistics.column_statistics,
886 vec![ColumnStatistics {
887 min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
888 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
889 ..Default::default()
890 }]
891 );
892
893 Ok(())
894 }
895
896 #[tokio::test]
897 async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
898 let schema = Schema::new(vec![
902 Field::new("a", DataType::Int32, false),
903 Field::new("b", DataType::Int32, false),
904 ]);
905 let input = Arc::new(StatisticsExec::new(
906 Statistics {
907 num_rows: Precision::Inexact(100),
908 column_statistics: vec![
909 ColumnStatistics {
910 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
911 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
912 ..Default::default()
913 },
914 ColumnStatistics {
915 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
916 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
917 ..Default::default()
918 },
919 ],
920 total_byte_size: Precision::Absent,
921 },
922 schema.clone(),
923 ));
924
925 let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
927 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
928 input,
929 )?);
930
931 let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
933 binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
934 a_lte_25,
935 )?);
936
937 let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
939 binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
940 b_gt_5,
941 )?);
942 let statistics = filter.partition_statistics(None)?;
943 assert_eq!(statistics.num_rows, Precision::Inexact(2));
950 assert_eq!(
951 statistics.column_statistics,
952 vec![
953 ColumnStatistics {
954 min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
955 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
956 ..Default::default()
957 },
958 ColumnStatistics {
959 min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
960 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
961 ..Default::default()
962 }
963 ]
964 );
965
966 Ok(())
967 }
968
969 #[tokio::test]
970 async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
971 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
974 let input = Arc::new(StatisticsExec::new(
975 Statistics::new_unknown(&schema),
976 schema.clone(),
977 ));
978
979 let predicate: Arc<dyn PhysicalExpr> =
981 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
982
983 let filter: Arc<dyn ExecutionPlan> =
985 Arc::new(FilterExec::try_new(predicate, input)?);
986
987 let statistics = filter.partition_statistics(None)?;
988 assert_eq!(statistics.num_rows, Precision::Absent);
989
990 Ok(())
991 }
992
993 #[tokio::test]
994 async fn test_filter_statistics_multiple_columns() -> Result<()> {
995 let schema = Schema::new(vec![
1000 Field::new("a", DataType::Int32, false),
1001 Field::new("b", DataType::Int32, false),
1002 Field::new("c", DataType::Float32, false),
1003 ]);
1004 let input = Arc::new(StatisticsExec::new(
1005 Statistics {
1006 num_rows: Precision::Inexact(1000),
1007 total_byte_size: Precision::Inexact(4000),
1008 column_statistics: vec![
1009 ColumnStatistics {
1010 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1011 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1012 ..Default::default()
1013 },
1014 ColumnStatistics {
1015 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1016 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1017 ..Default::default()
1018 },
1019 ColumnStatistics {
1020 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1021 max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1022 ..Default::default()
1023 },
1024 ],
1025 },
1026 schema,
1027 ));
1028 let predicate = Arc::new(BinaryExpr::new(
1030 Arc::new(BinaryExpr::new(
1031 Arc::new(Column::new("a", 0)),
1032 Operator::LtEq,
1033 Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1034 )),
1035 Operator::And,
1036 Arc::new(BinaryExpr::new(
1037 Arc::new(BinaryExpr::new(
1038 Arc::new(Column::new("b", 1)),
1039 Operator::Eq,
1040 Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1041 )),
1042 Operator::And,
1043 Arc::new(BinaryExpr::new(
1044 Arc::new(BinaryExpr::new(
1045 Arc::new(Column::new("c", 2)),
1046 Operator::LtEq,
1047 Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1048 )),
1049 Operator::And,
1050 Arc::new(BinaryExpr::new(
1051 Arc::new(Column::new("a", 0)),
1052 Operator::Gt,
1053 Arc::new(Column::new("b", 1)),
1054 )),
1055 )),
1056 )),
1057 ));
1058 let filter: Arc<dyn ExecutionPlan> =
1059 Arc::new(FilterExec::try_new(predicate, input)?);
1060 let statistics = filter.partition_statistics(None)?;
1061 assert_eq!(statistics.num_rows, Precision::Inexact(134));
1065 assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1066 let exp_col_stats = vec![
1067 ColumnStatistics {
1068 min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1069 max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1070 ..Default::default()
1071 },
1072 ColumnStatistics {
1073 min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1074 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1075 ..Default::default()
1076 },
1077 ColumnStatistics {
1078 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1079 max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1080 ..Default::default()
1081 },
1082 ];
1083 let _ = exp_col_stats
1084 .into_iter()
1085 .zip(statistics.column_statistics)
1086 .map(|(expected, actual)| {
1087 if let Some(val) = actual.min_value.get_value() {
1088 if val.data_type().is_floating() {
1089 let actual_min = actual.min_value.get_value().unwrap();
1092 let actual_max = actual.max_value.get_value().unwrap();
1093 let expected_min = expected.min_value.get_value().unwrap();
1094 let expected_max = expected.max_value.get_value().unwrap();
1095 let eps = ScalarValue::Float32(Some(1e-6));
1096
1097 assert!(actual_min.sub(expected_min).unwrap() < eps);
1098 assert!(actual_min.sub(expected_min).unwrap() < eps);
1099
1100 assert!(actual_max.sub(expected_max).unwrap() < eps);
1101 assert!(actual_max.sub(expected_max).unwrap() < eps);
1102 } else {
1103 assert_eq!(actual, expected);
1104 }
1105 } else {
1106 assert_eq!(actual, expected);
1107 }
1108 });
1109
1110 Ok(())
1111 }
1112
1113 #[tokio::test]
1114 async fn test_filter_statistics_full_selective() -> Result<()> {
1115 let schema = Schema::new(vec![
1119 Field::new("a", DataType::Int32, false),
1120 Field::new("b", DataType::Int32, false),
1121 ]);
1122 let input = Arc::new(StatisticsExec::new(
1123 Statistics {
1124 num_rows: Precision::Inexact(1000),
1125 total_byte_size: Precision::Inexact(4000),
1126 column_statistics: vec![
1127 ColumnStatistics {
1128 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1129 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1130 ..Default::default()
1131 },
1132 ColumnStatistics {
1133 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1134 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1135 ..Default::default()
1136 },
1137 ],
1138 },
1139 schema,
1140 ));
1141 let predicate = Arc::new(BinaryExpr::new(
1143 Arc::new(BinaryExpr::new(
1144 Arc::new(Column::new("a", 0)),
1145 Operator::Lt,
1146 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1147 )),
1148 Operator::And,
1149 Arc::new(BinaryExpr::new(
1150 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1151 Operator::LtEq,
1152 Arc::new(Column::new("b", 1)),
1153 )),
1154 ));
1155 let expected = input.partition_statistics(None)?.column_statistics;
1157 let filter: Arc<dyn ExecutionPlan> =
1158 Arc::new(FilterExec::try_new(predicate, input)?);
1159 let statistics = filter.partition_statistics(None)?;
1160
1161 assert_eq!(statistics.num_rows, Precision::Inexact(1000));
1162 assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
1163 assert_eq!(statistics.column_statistics, expected);
1164
1165 Ok(())
1166 }
1167
1168 #[tokio::test]
1169 async fn test_filter_statistics_zero_selective() -> Result<()> {
1170 let schema = Schema::new(vec![
1174 Field::new("a", DataType::Int32, false),
1175 Field::new("b", DataType::Int32, false),
1176 ]);
1177 let input = Arc::new(StatisticsExec::new(
1178 Statistics {
1179 num_rows: Precision::Inexact(1000),
1180 total_byte_size: Precision::Inexact(4000),
1181 column_statistics: vec![
1182 ColumnStatistics {
1183 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1184 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1185 ..Default::default()
1186 },
1187 ColumnStatistics {
1188 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1189 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1190 ..Default::default()
1191 },
1192 ],
1193 },
1194 schema,
1195 ));
1196 let predicate = Arc::new(BinaryExpr::new(
1198 Arc::new(BinaryExpr::new(
1199 Arc::new(Column::new("a", 0)),
1200 Operator::Gt,
1201 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1202 )),
1203 Operator::And,
1204 Arc::new(BinaryExpr::new(
1205 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1206 Operator::LtEq,
1207 Arc::new(Column::new("b", 1)),
1208 )),
1209 ));
1210 let filter: Arc<dyn ExecutionPlan> =
1211 Arc::new(FilterExec::try_new(predicate, input)?);
1212 let statistics = filter.partition_statistics(None)?;
1213
1214 assert_eq!(statistics.num_rows, Precision::Inexact(0));
1215 assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
1216 assert_eq!(
1217 statistics.column_statistics,
1218 vec![
1219 ColumnStatistics {
1220 min_value: Precision::Exact(ScalarValue::Null),
1221 max_value: Precision::Exact(ScalarValue::Null),
1222 sum_value: Precision::Exact(ScalarValue::Null),
1223 distinct_count: Precision::Exact(0),
1224 null_count: Precision::Exact(0),
1225 },
1226 ColumnStatistics {
1227 min_value: Precision::Exact(ScalarValue::Null),
1228 max_value: Precision::Exact(ScalarValue::Null),
1229 sum_value: Precision::Exact(ScalarValue::Null),
1230 distinct_count: Precision::Exact(0),
1231 null_count: Precision::Exact(0),
1232 },
1233 ]
1234 );
1235
1236 Ok(())
1237 }
1238
1239 #[tokio::test]
1240 async fn test_filter_statistics_more_inputs() -> Result<()> {
1241 let schema = Schema::new(vec![
1242 Field::new("a", DataType::Int32, false),
1243 Field::new("b", DataType::Int32, false),
1244 ]);
1245 let input = Arc::new(StatisticsExec::new(
1246 Statistics {
1247 num_rows: Precision::Inexact(1000),
1248 total_byte_size: Precision::Inexact(4000),
1249 column_statistics: vec![
1250 ColumnStatistics {
1251 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1252 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1253 ..Default::default()
1254 },
1255 ColumnStatistics {
1256 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1257 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1258 ..Default::default()
1259 },
1260 ],
1261 },
1262 schema,
1263 ));
1264 let predicate = Arc::new(BinaryExpr::new(
1266 Arc::new(Column::new("a", 0)),
1267 Operator::Lt,
1268 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1269 ));
1270 let filter: Arc<dyn ExecutionPlan> =
1271 Arc::new(FilterExec::try_new(predicate, input)?);
1272 let statistics = filter.partition_statistics(None)?;
1273
1274 assert_eq!(statistics.num_rows, Precision::Inexact(490));
1275 assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
1276 assert_eq!(
1277 statistics.column_statistics,
1278 vec![
1279 ColumnStatistics {
1280 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1281 max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
1282 ..Default::default()
1283 },
1284 ColumnStatistics {
1285 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1286 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1287 ..Default::default()
1288 },
1289 ]
1290 );
1291
1292 Ok(())
1293 }
1294
1295 #[tokio::test]
1296 async fn test_empty_input_statistics() -> Result<()> {
1297 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1298 let input = Arc::new(StatisticsExec::new(
1299 Statistics::new_unknown(&schema),
1300 schema,
1301 ));
1302 let predicate = Arc::new(BinaryExpr::new(
1304 Arc::new(BinaryExpr::new(
1305 Arc::new(Column::new("a", 0)),
1306 Operator::LtEq,
1307 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1308 )),
1309 Operator::And,
1310 Arc::new(BinaryExpr::new(
1311 Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1312 Operator::LtEq,
1313 Arc::new(BinaryExpr::new(
1314 Arc::new(Column::new("a", 0)),
1315 Operator::Minus,
1316 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1317 )),
1318 )),
1319 ));
1320 let filter: Arc<dyn ExecutionPlan> =
1321 Arc::new(FilterExec::try_new(predicate, input)?);
1322 let filter_statistics = filter.partition_statistics(None)?;
1323
1324 let expected_filter_statistics = Statistics {
1325 num_rows: Precision::Absent,
1326 total_byte_size: Precision::Absent,
1327 column_statistics: vec![ColumnStatistics {
1328 null_count: Precision::Absent,
1329 min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1330 max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1331 sum_value: Precision::Absent,
1332 distinct_count: Precision::Absent,
1333 }],
1334 };
1335
1336 assert_eq!(filter_statistics, expected_filter_statistics);
1337
1338 Ok(())
1339 }
1340
1341 #[tokio::test]
1342 async fn test_statistics_with_constant_column() -> Result<()> {
1343 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1344 let input = Arc::new(StatisticsExec::new(
1345 Statistics::new_unknown(&schema),
1346 schema,
1347 ));
1348 let predicate = Arc::new(BinaryExpr::new(
1350 Arc::new(Column::new("a", 0)),
1351 Operator::Eq,
1352 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1353 ));
1354 let filter: Arc<dyn ExecutionPlan> =
1355 Arc::new(FilterExec::try_new(predicate, input)?);
1356 let filter_statistics = filter.partition_statistics(None)?;
1357 assert!(filter_statistics.column_statistics[0].is_singleton());
1359
1360 Ok(())
1361 }
1362
1363 #[tokio::test]
1364 async fn test_validation_filter_selectivity() -> Result<()> {
1365 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1366 let input = Arc::new(StatisticsExec::new(
1367 Statistics::new_unknown(&schema),
1368 schema,
1369 ));
1370 let predicate = Arc::new(BinaryExpr::new(
1372 Arc::new(Column::new("a", 0)),
1373 Operator::Eq,
1374 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1375 ));
1376 let filter = FilterExec::try_new(predicate, input)?;
1377 assert!(filter.with_default_selectivity(120).is_err());
1378 Ok(())
1379 }
1380
1381 #[tokio::test]
1382 async fn test_custom_filter_selectivity() -> Result<()> {
1383 let schema =
1385 Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1386 let input = Arc::new(StatisticsExec::new(
1387 Statistics {
1388 num_rows: Precision::Inexact(1000),
1389 total_byte_size: Precision::Inexact(4000),
1390 column_statistics: vec![ColumnStatistics {
1391 ..Default::default()
1392 }],
1393 },
1394 schema,
1395 ));
1396 let predicate = Arc::new(BinaryExpr::new(
1398 Arc::new(Column::new("a", 0)),
1399 Operator::Eq,
1400 Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1401 ));
1402 let filter = FilterExec::try_new(predicate, input)?;
1403 let statistics = filter.partition_statistics(None)?;
1404 assert_eq!(statistics.num_rows, Precision::Inexact(200));
1405 assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1406 let filter = filter.with_default_selectivity(40)?;
1407 let statistics = filter.partition_statistics(None)?;
1408 assert_eq!(statistics.num_rows, Precision::Inexact(400));
1409 assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1410 Ok(())
1411 }
1412
/// Building a `FilterExec` over a schema that contains a `Union` column, and
/// asking it for statistics, must succeed.
///
/// The predicate only touches the Int32 column `c1` (`c1 >= 1 AND c1 <= 4`);
/// the union-typed column `c2` is simply present in the schema.
/// NOTE(review): the test name suggests this guards equivalence-property
/// computation for union types -- confirm against the history of this test.
#[test]
fn test_equivalence_properties_union_type() -> Result<()> {
    let union_type = DataType::Union(
        UnionFields::new(
            vec![0, 1],
            vec![
                Field::new("f1", DataType::Int32, true),
                Field::new("f2", DataType::Utf8, true),
            ],
        ),
        UnionMode::Sparse,
    );

    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", union_type, true),
    ]));

    // c1 >= 1 AND c1 <= 4, evaluated over an empty input with the schema above.
    let exec = FilterExec::try_new(
        binary(
            binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
            Operator::And,
            binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
            &schema,
        )?,
        Arc::new(EmptyExec::new(Arc::clone(&schema))),
    )?;

    // Only success matters here, so the statistics value itself is discarded.
    // Propagating with `?` (rather than unwrapping) keeps error handling
    // consistent with the rest of this test and reports the underlying error.
    exec.partition_statistics(None)?;

    Ok(())
}
1445}