1use std::collections::hash_map::Entry;
19use std::collections::{HashMap, HashSet};
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll, ready};
23
24use datafusion_physical_expr::projection::{ProjectionRef, combine_projections};
25use itertools::Itertools;
26
27use super::{
28 ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
29 RecordBatchStream, SendableRecordBatchStream, Statistics,
30};
31use crate::check_if_same_properties;
32use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
33use crate::common::can_project;
34use crate::execution_plan::CardinalityEffect;
35use crate::filter_pushdown::{
36 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
37 FilterPushdownPropagation, PushedDown,
38};
39use crate::limit::LocalLimitExec;
40use crate::metrics::{MetricBuilder, MetricType};
41use crate::projection::{
42 EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
43 try_embed_projection, update_expr,
44};
45use crate::stream::EmptyRecordBatchStream;
46use crate::{
47 DisplayFormatType, ExecutionPlan,
48 metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
49};
50
51use arrow::compute::filter_record_batch;
52use arrow::datatypes::{DataType, SchemaRef};
53use arrow::record_batch::RecordBatch;
54use datafusion_common::cast::as_boolean_array;
55use datafusion_common::config::ConfigOptions;
56use datafusion_common::stats::Precision;
57use datafusion_common::{
58 DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
59};
60use datafusion_execution::TaskContext;
61use datafusion_expr::Operator;
62use datafusion_physical_expr::equivalence::ProjectionMapping;
63use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
64use datafusion_physical_expr::intervals::utils::check_support;
65use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
66use datafusion_physical_expr::{
67 AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
68 conjunction, split_conjunction,
69};
70
71use datafusion_physical_expr_common::physical_expr::fmt_sql;
72use futures::stream::{Stream, StreamExt};
73use log::trace;
74
/// Default selectivity, as a percentage (0..=100), that `FilterExec` assumes
/// when the predicate cannot be analyzed from input statistics.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Default batch size handed to the output batch coalescer of `FilterExec`.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
77
/// Execution plan that evaluates a boolean predicate against every input
/// batch and keeps only the rows selected by it, optionally projecting the
/// output columns and limiting the number of output rows (`fetch`).
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// Boolean predicate evaluated against each input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// The plan whose output is filtered.
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics (baseline metrics and observed selectivity).
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity percentage (0..=100) assumed when the predicate cannot be
    /// analyzed from statistics.
    default_selectivity: u8,
    /// Plan properties computed at construction time.
    cache: Arc<PlanProperties>,
    /// Optional column indices, into the input schema, kept in the output.
    projection: Option<ProjectionRef>,
    /// Batch size handed to the output batch coalescer.
    batch_size: usize,
    /// Optional maximum number of output rows.
    fetch: Option<usize>,
}
99
/// Builder for [`FilterExec`]. Settings are validated when
/// [`FilterExecBuilder::build`] is called.
pub struct FilterExecBuilder {
    /// Boolean predicate to evaluate.
    predicate: Arc<dyn PhysicalExpr>,
    /// Input plan to filter.
    input: Arc<dyn ExecutionPlan>,
    /// Optional output projection (indices into the input schema).
    projection: Option<ProjectionRef>,
    /// Fallback selectivity percentage; `build` rejects values above 100.
    default_selectivity: u8,
    /// Batch size for the output batch coalescer.
    batch_size: usize,
    /// Optional output row limit.
    fetch: Option<usize>,
}
109
110impl FilterExecBuilder {
111 pub fn new(predicate: Arc<dyn PhysicalExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
113 Self {
114 predicate,
115 input,
116 projection: None,
117 default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY,
118 batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
119 fetch: None,
120 }
121 }
122
123 pub fn with_input(mut self, input: Arc<dyn ExecutionPlan>) -> Self {
125 self.input = input;
126 self
127 }
128
129 pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
131 self.predicate = predicate;
132 self
133 }
134
135 pub fn apply_projection(self, projection: Option<Vec<usize>>) -> Result<Self> {
145 let projection = projection.map(Into::into);
146 self.apply_projection_by_ref(projection.as_ref())
147 }
148
149 pub fn apply_projection_by_ref(
151 mut self,
152 projection: Option<&ProjectionRef>,
153 ) -> Result<Self> {
154 can_project(&self.input.schema(), projection.map(AsRef::as_ref))?;
156 self.projection = combine_projections(projection, self.projection.as_ref())?;
157 Ok(self)
158 }
159
160 pub fn with_default_selectivity(mut self, default_selectivity: u8) -> Self {
162 self.default_selectivity = default_selectivity;
163 self
164 }
165
166 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
168 self.batch_size = batch_size;
169 self
170 }
171
172 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
174 self.fetch = fetch;
175 self
176 }
177
178 pub fn build(self) -> Result<FilterExec> {
180 match self.predicate.data_type(self.input.schema().as_ref())? {
182 DataType::Boolean => {}
183 other => {
184 return plan_err!(
185 "Filter predicate must return BOOLEAN values, got {other:?}"
186 );
187 }
188 }
189
190 if self.default_selectivity > 100 {
192 return plan_err!(
193 "Default filter selectivity value needs to be less than or equal to 100"
194 );
195 }
196
197 can_project(&self.input.schema(), self.projection.as_deref())?;
199
200 let cache = FilterExec::compute_properties(
202 &self.input,
203 &self.predicate,
204 self.default_selectivity,
205 self.projection.as_deref(),
206 )?;
207
208 Ok(FilterExec {
209 predicate: self.predicate,
210 input: self.input,
211 metrics: ExecutionPlanMetricsSet::new(),
212 default_selectivity: self.default_selectivity,
213 cache: Arc::new(cache),
214 projection: self.projection,
215 batch_size: self.batch_size,
216 fetch: self.fetch,
217 })
218 }
219}
220
221impl From<&FilterExec> for FilterExecBuilder {
222 fn from(exec: &FilterExec) -> Self {
223 Self {
224 predicate: Arc::clone(&exec.predicate),
225 input: Arc::clone(&exec.input),
226 projection: exec.projection.clone(),
227 default_selectivity: exec.default_selectivity,
228 batch_size: exec.batch_size,
229 fetch: exec.fetch,
230 }
235 }
236}
237
impl FilterExec {
    /// Creates a `FilterExec` with default settings; equivalent to
    /// `FilterExecBuilder::new(predicate, input).build()`.
    pub fn try_new(
        predicate: Arc<dyn PhysicalExpr>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        FilterExecBuilder::new(predicate, input).build()
    }

    /// Returns the batch size handed to the output batch coalescer.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Sets the selectivity percentage assumed when the predicate cannot be
    /// analyzed from statistics.
    ///
    /// The cached plan properties are not recomputed.
    ///
    /// # Errors
    /// Returns a plan error if `default_selectivity` is greater than 100.
    pub fn with_default_selectivity(
        mut self,
        default_selectivity: u8,
    ) -> Result<Self, DataFusionError> {
        if default_selectivity > 100 {
            return plan_err!(
                "Default filter selectivity value needs to be less than or equal to 100"
            );
        }
        self.default_selectivity = default_selectivity;
        Ok(self)
    }

    /// Returns a copy of this plan with `projection` applied through
    /// [`FilterExecBuilder::apply_projection`].
    #[deprecated(
        since = "52.0.0",
        note = "Use FilterExecBuilder::apply_projection instead"
    )]
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        let builder = FilterExecBuilder::from(self);
        builder.apply_projection(projection)?.build()
    }

    /// Returns a copy of this plan that uses `batch_size` for output
    /// coalescing. The metrics set and cached properties are cloned from
    /// `self`. This currently never fails.
    pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
        Ok(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: Arc::clone(&self.cache),
            projection: self.projection.clone(),
            batch_size,
            fetch: self.fetch,
        })
    }

    /// The filter predicate.
    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate
    }

    /// The input plan.
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// The fallback selectivity percentage.
    pub fn default_selectivity(&self) -> u8 {
        self.default_selectivity
    }

    /// The optional output projection (indices into the input schema).
    pub fn projection(&self) -> &Option<ProjectionRef> {
        &self.projection
    }

    /// Estimates the statistics of the filter output from `input_stats`.
    ///
    /// One of three strategies applies:
    /// 1. If some column must equal two different non-null literals (e.g.
    ///    `a = 1 AND a = 2`), the output is known to be empty.
    /// 2. If interval analysis does not support the predicate, the row count
    ///    is scaled by `default_selectivity` percent. Columns constrained by a
    ///    `column = literal` conjunct get a distinct count of one, unless that
    ///    count is already known to be zero.
    /// 3. Otherwise the predicate is analyzed against the input column
    ///    statistics to derive the selectivity and narrowed column bounds.
    ///
    /// Column statistics are returned in input-schema order, without any
    /// projection applied.
    pub(crate) fn statistics_helper(
        schema: &SchemaRef,
        input_stats: Statistics,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
    ) -> Result<Statistics> {
        let (eq_columns, is_infeasible) = collect_equality_columns(predicate);

        let input_num_rows = input_stats.num_rows;
        let input_total_byte_size = input_stats.total_byte_size;

        let (selectivity, num_rows, column_statistics) = if is_infeasible {
            // Contradictory equalities: no row can pass, so every column is
            // empty.
            let mut cs = input_stats.to_inexact().column_statistics;
            for col_stat in &mut cs {
                col_stat.distinct_count = Precision::Exact(0);
                col_stat.null_count = Precision::Exact(0);
                col_stat.min_value = Precision::Absent;
                col_stat.max_value = Precision::Absent;
                col_stat.sum_value = Precision::Absent;
                col_stat.byte_size = Precision::Exact(0);
            }
            (0.0, Precision::Exact(0), cs)
        } else if !check_support(predicate, schema) {
            // Interval analysis cannot handle this predicate: fall back to
            // the configured default selectivity.
            let selectivity = default_selectivity as f64 / 100.0;
            let mut cs = input_stats.to_inexact().column_statistics;
            // A column pinned to a single literal has one distinct value.
            for &idx in &eq_columns {
                if idx < cs.len() && cs[idx].distinct_count != Precision::Exact(0) {
                    cs[idx].distinct_count = Precision::Exact(1);
                }
            }
            (
                selectivity,
                input_num_rows.with_estimated_selectivity(selectivity),
                cs,
            )
        } else {
            let input_analysis_ctx = AnalysisContext::try_from_statistics(
                schema,
                &input_stats.column_statistics,
            )?;
            let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
            // No estimate from the analysis: assume every row passes.
            let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
            let filtered_num_rows =
                input_num_rows.with_estimated_selectivity(selectivity);
            let cs = collect_new_statistics(
                schema,
                &input_stats.column_statistics,
                analysis_ctx.boundaries,
                match &filtered_num_rows {
                    Precision::Absent => None,
                    p => Some(*p),
                },
            );
            (selectivity, filtered_num_rows, cs)
        };

        let total_byte_size =
            input_total_byte_size.with_estimated_selectivity(selectivity);

        Ok(Statistics {
            num_rows,
            total_byte_size,
            column_statistics,
        })
    }

    /// Computes the plan properties of a filter over `input`.
    ///
    /// The computation starts from the input's equivalence properties. It
    /// adds the equalities from `=` conjuncts of the predicate. It then adds
    /// constants implied either by singleton column statistics of the
    /// filtered output or by the predicate itself. Finally, it projects the
    /// equivalences and output partitioning through `projection`, when one
    /// is given.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
        projection: Option<&[usize]>,
    ) -> Result<PlanProperties> {
        let schema = input.schema();
        let stats = Self::statistics_helper(
            &schema,
            Arc::unwrap_or_clone(input.partition_statistics(None)?),
            predicate,
            default_selectivity,
        )?;
        let mut eq_properties = input.equivalence_properties().clone();
        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
        }
        // Predicate columns whose filtered statistics collapse to a single
        // value are constant in the output.
        let constants = collect_columns(predicate)
            .into_iter()
            .filter(|column| stats.column_statistics[column.index()].is_singleton())
            .map(|column| {
                let value = stats.column_statistics[column.index()]
                    .min_value
                    .get_value();
                let expr = Arc::new(column) as _;
                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
            });
        eq_properties.add_constants(constants)?;
        // Constants derivable directly from the predicate and the input's
        // equivalences.
        eq_properties.add_constants(ConstExpr::collect_predicate_constants(
            input.equivalence_properties(),
            predicate,
        ))?;

        let mut output_partitioning = input.output_partitioning().clone();
        if let Some(projection) = projection {
            let schema = eq_properties.schema();
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(&projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }

    /// Returns a copy of this plan on top of `children[0]`, reusing the
    /// cached properties and starting with a fresh metrics set.
    ///
    /// NOTE(review): presumably used by `check_if_same_properties!` when the
    /// new child has unchanged properties — confirm against the macro.
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        Self {
            input: children.swap_remove(0),
            metrics: ExecutionPlanMetricsSet::new(),
            ..Self::clone(self)
        }
    }
}
463
464impl DisplayAs for FilterExec {
465 fn fmt_as(
466 &self,
467 t: DisplayFormatType,
468 f: &mut std::fmt::Formatter,
469 ) -> std::fmt::Result {
470 match t {
471 DisplayFormatType::Default | DisplayFormatType::Verbose => {
472 let display_projections = if let Some(projection) =
473 self.projection.as_ref()
474 {
475 format!(
476 ", projection=[{}]",
477 projection
478 .iter()
479 .map(|index| format!(
480 "{}@{}",
481 self.input.schema().fields().get(*index).unwrap().name(),
482 index
483 ))
484 .collect::<Vec<_>>()
485 .join(", ")
486 )
487 } else {
488 "".to_string()
489 };
490 let fetch = self
491 .fetch
492 .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
493 write!(
494 f,
495 "FilterExec: {}{}{}",
496 self.predicate, display_projections, fetch
497 )
498 }
499 DisplayFormatType::TreeRender => {
500 if let Some(fetch) = self.fetch {
501 writeln!(f, "fetch={fetch}")?;
502 }
503 write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
504 }
505 }
506 }
507}
508
impl ExecutionPlan for FilterExec {
    fn name(&self) -> &'static str {
        "FilterExec"
    }

    /// Returns the plan properties computed at construction time.
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Filtering only drops rows; the surviving rows keep their input order.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    /// Replaces the input plan.
    ///
    /// NOTE(review): `check_if_same_properties!` presumably returns early,
    /// reusing the cached properties when the new child's properties are
    /// unchanged — confirm against the macro. Otherwise the plan is rebuilt,
    /// and revalidated, through the builder.
    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        check_if_same_properties!(self, children);
        let new_input = children.swap_remove(0);
        FilterExecBuilder::from(&*self)
            .with_input(new_input)
            .build()
            .map(|e| Arc::new(e) as _)
    }

    /// Executes `partition` of the input and returns a stream that evaluates
    /// the predicate per batch and applies the projection. The surviving
    /// rows are fed through a `LimitedBatchCoalescer` configured with
    /// `batch_size` and `fetch`.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );
        let metrics = FilterExecMetrics::new(&self.metrics, partition);
        Ok(Box::pin(FilterExecStream {
            schema: self.schema(),
            predicate: Arc::clone(&self.predicate),
            input: self.input.execute(partition, context)?,
            metrics,
            projection: self.projection.clone(),
            batch_coalescer: LimitedBatchCoalescer::new(
                self.schema(),
                self.batch_size,
                self.fetch,
            ),
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// Estimates output statistics with `statistics_helper` against the input
    /// schema, then projects them through the output projection.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
        let input_stats =
            Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
        let stats = Self::statistics_helper(
            &self.input.schema(),
            input_stats,
            self.predicate(),
            self.default_selectivity,
        )?;
        Ok(Arc::new(stats.project(self.projection.as_ref())))
    }

    /// A filter can only keep or drop rows.
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::LowerEqual
    }

    /// Tries to move `projection`, which sits above this filter, below it.
    ///
    /// The swap is attempted only when the projection narrows the schema and
    /// `update_expr` can rewrite the predicate against the projection's
    /// output. Otherwise the projection is embedded via
    /// `try_embed_projection`.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Only a narrowing projection is worth pushing down.
        if projection.expr().len() < projection.input().schema().fields().len() {
            if let Some(new_predicate) =
                update_expr(self.predicate(), projection.expr(), false)?
            {
                // NOTE(review): `apply_projection(None)` is meant to drop this
                // filter's own projection, whose indices referred to the old
                // input. Confirm that `combine_projections(None, existing)`
                // really discards `existing`.
                return FilterExecBuilder::from(self)
                    .with_input(make_with_child(projection, self.input())?)
                    .with_predicate(new_predicate)
                    .apply_projection(None)?
                    .build()
                    .map(|e| Some(Arc::new(e) as _));
            }
        }
        try_embed_projection(projection, self)
    }

    /// Parent filters are always offered to the input. In the `Pre` phase
    /// this filter's own conjuncts are offered as well, so they can be
    /// pushed further down.
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        if phase != FilterPushdownPhase::Pre {
            let child =
                ChildFilterDescription::from_child(&parent_filters, self.input())?;
            return Ok(FilterDescription::new().with_child(child));
        }

        let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
            .with_self_filters(
                split_conjunction(&self.predicate)
                    .into_iter()
                    .cloned()
                    .collect(),
            );

        Ok(FilterDescription::new().with_child(child))
    }

    /// Absorbs the result of pushing filters into the input.
    ///
    /// Outside the `Pre` phase the child's result is propagated via `if_all`.
    /// In the `Pre` phase, parent filters the child rejected and this
    /// filter's own conjuncts that were not pushed down are combined into a
    /// new predicate. Then:
    /// * if that predicate is the literal `true`, the filter is replaced by
    ///   its input, limited to `fetch` rows and projected when configured;
    /// * if it equals the current predicate, no update is reported;
    /// * otherwise a new `FilterExec` with the new predicate is returned.
    ///
    /// Every parent filter is reported as handled (`PushedDown::Yes`).
    fn handle_child_pushdown_result(
        &self,
        phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        if phase != FilterPushdownPhase::Pre {
            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
        }
        // Parent filters that the child could not accept stay in this node.
        let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
            child_pushdown_result
                .parent_filters
                .iter()
                .filter_map(|f| {
                    matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
                })
                .collect();

        if self.projection.is_some() {
            // Parent filters reference this node's projected output. Rebind
            // their columns against the input schema so they can be merged
            // into the predicate.
            let input_schema = self.input().schema();
            unsupported_parent_filters = unsupported_parent_filters
                .into_iter()
                .map(|expr| reassign_expr_columns(expr, &input_schema))
                .collect::<Result<Vec<_>>>()?;
        }

        // This filter's own conjuncts that the child did not absorb.
        let unsupported_self_filters = child_pushdown_result
            .self_filters
            .first()
            .expect("we have exactly one child")
            .iter()
            .filter_map(|f| match f.discriminant {
                PushedDown::Yes => None,
                PushedDown::No => Some(&f.predicate),
            })
            .cloned();

        let unhandled_filters = unsupported_parent_filters
            .into_iter()
            .chain(unsupported_self_filters)
            .collect_vec();

        let filter_input = Arc::clone(self.input());
        let new_predicate = conjunction(unhandled_filters);
        let updated_node = if new_predicate.eq(&lit(true)) {
            // The filter became a no-op; still honour `fetch`.
            let filter_input = if let Some(outer_fetch) = self.fetch {
                // Keep the tighter limit if the input already has one.
                let effective_fetch = match filter_input.fetch() {
                    Some(inner_fetch) => outer_fetch.min(inner_fetch),
                    None => outer_fetch,
                };
                // Input cannot absorb the limit: enforce it explicitly.
                match filter_input.with_fetch(Some(effective_fetch)) {
                    Some(node) => node,
                    None => Arc::new(LocalLimitExec::new(filter_input, effective_fetch)),
                }
            } else {
                filter_input
            };
            // Preserve the output schema by replacing the embedded projection
            // with an explicit ProjectionExec.
            match self.projection().as_ref() {
                Some(projection_indices) => {
                    let filter_child_schema = filter_input.schema();
                    let proj_exprs = projection_indices
                        .iter()
                        .map(|p| {
                            let field = filter_child_schema.field(*p).clone();
                            ProjectionExpr {
                                expr: Arc::new(Column::new(field.name(), *p))
                                    as Arc<dyn PhysicalExpr>,
                                alias: field.name().to_string(),
                            }
                        })
                        .collect::<Vec<_>>();
                    Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
                        as Arc<dyn ExecutionPlan>)
                }
                None => {
                    Some(filter_input)
                }
            }
        } else if new_predicate.eq(&self.predicate) {
            // Nothing changed.
            None
        } else {
            let new = FilterExec {
                predicate: Arc::clone(&new_predicate),
                input: Arc::clone(&filter_input),
                metrics: self.metrics.clone(),
                default_selectivity: self.default_selectivity,
                cache: Arc::new(Self::compute_properties(
                    &filter_input,
                    &new_predicate,
                    self.default_selectivity,
                    self.projection.as_deref(),
                )?),
                projection: self.projection.clone(),
                batch_size: self.batch_size,
                fetch: self.fetch,
            };
            Some(Arc::new(new) as _)
        };

        Ok(FilterPushdownPropagation {
            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
            updated_node,
        })
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Returns a copy of this plan with a new row limit. The cached
    /// properties are reused.
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: Arc::clone(&self.cache),
            projection: self.projection.clone(),
            batch_size: self.batch_size,
            fetch,
        }))
    }

    /// Forwards the request to the input. If the input supports it, this
    /// filter is rebuilt on top of the returned plan.
    fn with_preserve_order(
        &self,
        preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        self.input
            .with_preserve_order(preserve_order)
            .and_then(|new_input| {
                Arc::new(self.clone())
                    .with_new_children(vec![new_input])
                    .ok()
            })
    }
}
784
785impl EmbeddedProjection for FilterExec {
786 fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
787 FilterExecBuilder::from(self)
788 .apply_projection(projection)?
789 .build()
790 }
791}
792
793fn collect_equality_columns(predicate: &Arc<dyn PhysicalExpr>) -> (HashSet<usize>, bool) {
805 let mut eq_values: HashMap<usize, ScalarValue> = HashMap::new();
806 let mut infeasible = false;
807
808 for expr in split_conjunction(predicate) {
809 let Some(binary) = expr.downcast_ref::<BinaryExpr>() else {
810 continue;
811 };
812 if *binary.op() != Operator::Eq {
813 continue;
814 }
815 let left = binary.left();
816 let right = binary.right();
817 let pair = if let Some(col) = left.downcast_ref::<Column>()
818 && let Some(lit) = right.downcast_ref::<Literal>()
819 && !lit.value().is_null()
820 {
821 Some((col.index(), lit.value().clone()))
822 } else if let Some(col) = right.downcast_ref::<Column>()
823 && let Some(lit) = left.downcast_ref::<Literal>()
824 && !lit.value().is_null()
825 {
826 Some((col.index(), lit.value().clone()))
827 } else {
828 None
829 };
830
831 if let Some((idx, value)) = pair {
832 match eq_values.entry(idx) {
833 Entry::Occupied(prev) => {
834 if *prev.get() != value {
835 infeasible = true;
836 break;
837 }
838 }
839 Entry::Vacant(slot) => {
840 slot.insert(value);
841 }
842 }
843 }
844 }
845
846 (eq_values.into_keys().collect(), infeasible)
847}
848
849fn interval_bound_to_precision(
852 bound: ScalarValue,
853 is_exact: bool,
854) -> Precision<ScalarValue> {
855 if bound.is_null() {
856 Precision::Absent
857 } else if is_exact {
858 Precision::Exact(bound)
859 } else {
860 Precision::Inexact(bound)
861 }
862}
863
864fn collect_new_statistics(
869 schema: &SchemaRef,
870 input_column_stats: &[ColumnStatistics],
871 analysis_boundaries: Vec<ExprBoundaries>,
872 filtered_num_rows: Option<Precision<usize>>,
873) -> Vec<ColumnStatistics> {
874 analysis_boundaries
875 .into_iter()
876 .enumerate()
877 .map(
878 |(
879 idx,
880 ExprBoundaries {
881 interval,
882 distinct_count,
883 ..
884 },
885 )| {
886 let Some(interval) = interval else {
887 let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
892 .unwrap_or(ScalarValue::Null);
893 return ColumnStatistics {
894 null_count: Precision::Exact(0),
895 max_value: Precision::Exact(typed_null.clone()),
896 min_value: Precision::Exact(typed_null.clone()),
897 sum_value: Precision::Exact(typed_null),
898 distinct_count: Precision::Exact(0),
899 byte_size: Precision::Exact(0),
900 };
901 };
902 let (lower, upper) = interval.into_bounds();
903 let is_single_value =
904 !lower.is_null() && !upper.is_null() && lower == upper;
905 let min_value = interval_bound_to_precision(lower, is_single_value);
906 let max_value = interval_bound_to_precision(upper, is_single_value);
907 let capped_distinct_count = if is_single_value {
911 Precision::Exact(1)
912 } else {
913 match filtered_num_rows {
914 Some(rows) => distinct_count.to_inexact().min(&rows),
915 None => distinct_count.to_inexact(),
916 }
917 };
918 ColumnStatistics {
919 null_count: input_column_stats[idx].null_count.to_inexact(),
920 max_value,
921 min_value,
922 sum_value: Precision::Absent,
923 distinct_count: capped_distinct_count,
924 byte_size: input_column_stats[idx].byte_size,
925 }
926 },
927 )
928 .collect()
929}
930
/// Stream returned by `FilterExec::execute` for a single partition.
struct FilterExecStream {
    /// Output schema of the plan (after any projection).
    schema: SchemaRef,
    /// Predicate evaluated against each input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// Input stream. It is replaced by an empty stream once exhausted or
    /// once the fetch limit is reached.
    input: SendableRecordBatchStream,
    /// Runtime metrics for this partition.
    metrics: FilterExecMetrics,
    /// Optional projection applied to each input batch before filtering.
    projection: Option<ProjectionRef>,
    /// Collects filtered rows into output batches and tracks the fetch limit.
    batch_coalescer: LimitedBatchCoalescer,
}
947
/// Per-partition metrics recorded by `FilterExecStream`.
struct FilterExecMetrics {
    /// Common operator metrics (elapsed compute time, poll recording).
    baseline_metrics: BaselineMetrics,
    /// Observed selectivity: rows emitted (parts) over rows read (total).
    selectivity: RatioMetrics,
}
957
958impl FilterExecMetrics {
959 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
960 Self {
961 baseline_metrics: BaselineMetrics::new(metrics, partition),
962 selectivity: MetricBuilder::new(metrics)
963 .with_type(MetricType::Summary)
964 .ratio_metrics("selectivity", partition),
965 }
966 }
967}
968
/// Filters `batch` with `predicate`, keeping only the rows selected by the
/// resulting boolean mask.
///
/// # Errors
/// Returns an error if the predicate fails to evaluate or does not produce a
/// boolean array.
pub fn batch_filter(
    batch: &RecordBatch,
    predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
    filter_and_project(batch, predicate, None)
}
975
976fn filter_and_project(
977 batch: &RecordBatch,
978 predicate: &Arc<dyn PhysicalExpr>,
979 projection: Option<&Vec<usize>>,
980) -> Result<RecordBatch> {
981 predicate
982 .evaluate(batch)
983 .and_then(|v| v.into_array(batch.num_rows()))
984 .and_then(|array| {
985 Ok(match (as_boolean_array(&array), projection) {
986 (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
988 (Ok(filter_array), Some(projection)) => {
989 let projected_batch = batch.project(projection)?;
990 filter_record_batch(&projected_batch, filter_array)?
991 }
992 (Err(_), _) => {
993 return internal_err!(
994 "Cannot create filter_array from non-boolean predicates"
995 );
996 }
997 })
998 })
999}
1000
1001impl Stream for FilterExecStream {
1002 type Item = Result<RecordBatch>;
1003
1004 fn poll_next(
1005 mut self: Pin<&mut Self>,
1006 cx: &mut Context<'_>,
1007 ) -> Poll<Option<Self::Item>> {
1008 let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
1009 loop {
1010 if let Some(batch) = self.batch_coalescer.next_completed_batch() {
1012 self.metrics.selectivity.add_part(batch.num_rows());
1013 let poll = Poll::Ready(Some(Ok(batch)));
1014 return self.metrics.baseline_metrics.record_poll(poll);
1015 }
1016
1017 if self.batch_coalescer.is_finished() {
1018 return Poll::Ready(None);
1020 }
1021
1022 match ready!(self.input.poll_next_unpin(cx)) {
1024 None => {
1025 self.batch_coalescer.finish()?;
1026 let input_schema = self.input.schema();
1028 self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
1029 }
1031 Some(Ok(batch)) => {
1032 let timer = elapsed_compute.timer();
1033 let status = self.predicate.as_ref()
1034 .evaluate(&batch)
1035 .and_then(|v| v.into_array(batch.num_rows()))
1036 .and_then(|array| {
1037 Ok(match self.projection.as_ref() {
1038 Some(projection) => {
1039 let projected_batch = batch.project(projection)?;
1040 (array, projected_batch)
1041 },
1042 None => (array, batch)
1043 })
1044 }).and_then(|(array, batch)| {
1045 match as_boolean_array(&array) {
1046 Ok(filter_array) => {
1047 self.metrics.selectivity.add_total(batch.num_rows());
1048 let batch = filter_record_batch(&batch, filter_array)?;
1050 let state = self.batch_coalescer.push_batch(batch)?;
1051 Ok(state)
1052 }
1053 Err(_) => {
1054 internal_err!(
1055 "Cannot create filter_array from non-boolean predicates"
1056 )
1057 }
1058 }
1059 })?;
1060 timer.done();
1061
1062 match status {
1063 PushBatchStatus::Continue => {
1064 }
1066 PushBatchStatus::LimitReached => {
1067 self.batch_coalescer.finish()?;
1069 let input_schema = self.input.schema();
1071 self.input =
1072 Box::pin(EmptyRecordBatchStream::new(input_schema));
1073 }
1075 }
1076 }
1077
1078 other => return Poll::Ready(other),
1080 }
1081 }
1082 }
1083
1084 fn size_hint(&self) -> (usize, Option<usize>) {
1085 self.input.size_hint()
1087 }
1088}
1089impl RecordBatchStream for FilterExecStream {
1090 fn schema(&self) -> SchemaRef {
1091 Arc::clone(&self.schema)
1092 }
1093}
1094
/// Splits `predicate` into its top-level `AND` conjuncts. Returns the operand
/// pairs of `=` and `!=` comparisons that have a bare column on at least one
/// side, as `(equal_pairs, not_equal_pairs)`.
#[deprecated(
    since = "51.0.0",
    note = "This function will be internal in the future"
)]
pub fn collect_columns_from_predicate(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    collect_columns_from_predicate_inner(predicate)
}
1105
1106fn collect_columns_from_predicate_inner(
1107 predicate: &'_ Arc<dyn PhysicalExpr>,
1108) -> EqualAndNonEqual<'_> {
1109 let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
1110 let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
1111
1112 let predicates = split_conjunction(predicate);
1113 predicates.into_iter().for_each(|p| {
1114 if let Some(binary) = p.downcast_ref::<BinaryExpr>() {
1115 let has_direct_column_operand =
1123 binary.left().downcast_ref::<Column>().is_some()
1124 || binary.right().downcast_ref::<Column>().is_some();
1125 if !has_direct_column_operand {
1126 return;
1127 }
1128 match binary.op() {
1129 Operator::Eq => {
1130 eq_predicate_columns.push((binary.left(), binary.right()))
1131 }
1132 Operator::NotEq => {
1133 ne_predicate_columns.push((binary.left(), binary.right()))
1134 }
1135 _ => {}
1136 }
1137 }
1138 });
1139
1140 (eq_predicate_columns, ne_predicate_columns)
1141}
1142
/// A borrowed pair of physical expressions, such as the two operands of a
/// binary comparison.
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);

/// `(equal_pairs, not_equal_pairs)`, as returned by
/// [`collect_columns_from_predicate`].
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
1149
1150#[cfg(test)]
1151mod tests {
1152 use super::*;
1153 use crate::empty::EmptyExec;
1154 use crate::expressions::*;
1155 use crate::test;
1156 use crate::test::exec::StatisticsExec;
1157 use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
1158
1159 #[tokio::test]
1160 async fn collect_columns_predicates() -> Result<()> {
1161 let schema = test::aggr_test_schema();
1162 let predicate: Arc<dyn PhysicalExpr> = binary(
1163 binary(
1164 binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
1165 Operator::And,
1166 binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
1167 &schema,
1168 )?,
1169 Operator::And,
1170 binary(
1171 binary(
1172 col("c2", &schema)?,
1173 Operator::Eq,
1174 col("c9", &schema)?,
1175 &schema,
1176 )?,
1177 Operator::And,
1178 binary(
1179 col("c1", &schema)?,
1180 Operator::NotEq,
1181 col("c13", &schema)?,
1182 &schema,
1183 )?,
1184 &schema,
1185 )?,
1186 &schema,
1187 )?;
1188
1189 let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
1190 assert_eq!(2, equal_pairs.len());
1191 assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
1192 assert!(equal_pairs[0].1.eq(&lit(4u32)));
1193
1194 assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
1195 assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
1196
1197 assert_eq!(1, ne_pairs.len());
1198 assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
1199 assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
1200
1201 Ok(())
1202 }
1203
1204 #[tokio::test]
1205 async fn test_filter_statistics_basic_expr() -> Result<()> {
1206 let bytes_per_row = 4;
1209 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1210 let input = Arc::new(StatisticsExec::new(
1211 Statistics {
1212 num_rows: Precision::Inexact(100),
1213 total_byte_size: Precision::Inexact(100 * bytes_per_row),
1214 column_statistics: vec![ColumnStatistics {
1215 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1216 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1217 ..Default::default()
1218 }],
1219 },
1220 schema.clone(),
1221 ));
1222
1223 let predicate: Arc<dyn PhysicalExpr> =
1225 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1226
1227 let filter: Arc<dyn ExecutionPlan> =
1229 Arc::new(FilterExec::try_new(predicate, input)?);
1230
1231 let statistics = filter.partition_statistics(None)?;
1232 assert_eq!(statistics.num_rows, Precision::Inexact(25));
1233 assert_eq!(
1234 statistics.total_byte_size,
1235 Precision::Inexact(25 * bytes_per_row)
1236 );
1237 assert_eq!(
1238 statistics.column_statistics,
1239 vec![ColumnStatistics {
1240 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1241 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1242 ..Default::default()
1243 }]
1244 );
1245
1246 Ok(())
1247 }
1248
1249 #[tokio::test]
1250 async fn test_filter_statistics_column_level_nested() -> Result<()> {
1251 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1254 let input = Arc::new(StatisticsExec::new(
1255 Statistics {
1256 num_rows: Precision::Inexact(100),
1257 column_statistics: vec![ColumnStatistics {
1258 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1259 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1260 ..Default::default()
1261 }],
1262 total_byte_size: Precision::Absent,
1263 },
1264 schema.clone(),
1265 ));
1266
1267 let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1269 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1270 input,
1271 )?);
1272
1273 let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1277 binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1278 sub_filter,
1279 )?);
1280
1281 let statistics = filter.partition_statistics(None)?;
1282 assert_eq!(statistics.num_rows, Precision::Inexact(16));
1283 assert_eq!(
1284 statistics.column_statistics,
1285 vec![ColumnStatistics {
1286 min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1287 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1288 ..Default::default()
1289 }]
1290 );
1291
1292 Ok(())
1293 }
1294
1295 #[tokio::test]
1296 async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
1297 let schema = Schema::new(vec![
1301 Field::new("a", DataType::Int32, false),
1302 Field::new("b", DataType::Int32, false),
1303 ]);
1304 let input = Arc::new(StatisticsExec::new(
1305 Statistics {
1306 num_rows: Precision::Inexact(100),
1307 column_statistics: vec![
1308 ColumnStatistics {
1309 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1310 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1311 ..Default::default()
1312 },
1313 ColumnStatistics {
1314 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1315 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1316 ..Default::default()
1317 },
1318 ],
1319 total_byte_size: Precision::Absent,
1320 },
1321 schema.clone(),
1322 ));
1323
1324 let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1326 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1327 input,
1328 )?);
1329
1330 let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1332 binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
1333 a_lte_25,
1334 )?);
1335
1336 let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1338 binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1339 b_gt_5,
1340 )?);
1341 let statistics = filter.partition_statistics(None)?;
1342 assert_eq!(statistics.num_rows, Precision::Inexact(2));
1349 assert_eq!(
1350 statistics.column_statistics,
1351 vec![
1352 ColumnStatistics {
1353 min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1354 max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1355 ..Default::default()
1356 },
1357 ColumnStatistics {
1358 min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
1359 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1360 ..Default::default()
1361 }
1362 ]
1363 );
1364
1365 Ok(())
1366 }
1367
1368 #[tokio::test]
1369 async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
1370 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1373 let input = Arc::new(StatisticsExec::new(
1374 Statistics::new_unknown(&schema),
1375 schema.clone(),
1376 ));
1377
1378 let predicate: Arc<dyn PhysicalExpr> =
1380 binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1381
1382 let filter: Arc<dyn ExecutionPlan> =
1384 Arc::new(FilterExec::try_new(predicate, input)?);
1385
1386 let statistics = filter.partition_statistics(None)?;
1387 assert_eq!(statistics.num_rows, Precision::Absent);
1388
1389 Ok(())
1390 }
1391
1392 #[tokio::test]
1393 async fn test_filter_statistics_multiple_columns() -> Result<()> {
1394 let schema = Schema::new(vec![
1399 Field::new("a", DataType::Int32, false),
1400 Field::new("b", DataType::Int32, false),
1401 Field::new("c", DataType::Float32, false),
1402 ]);
1403 let input = Arc::new(StatisticsExec::new(
1404 Statistics {
1405 num_rows: Precision::Inexact(1000),
1406 total_byte_size: Precision::Inexact(4000),
1407 column_statistics: vec![
1408 ColumnStatistics {
1409 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1410 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1411 ..Default::default()
1412 },
1413 ColumnStatistics {
1414 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1415 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1416 ..Default::default()
1417 },
1418 ColumnStatistics {
1419 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1420 max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1421 ..Default::default()
1422 },
1423 ],
1424 },
1425 schema,
1426 ));
1427 let predicate = Arc::new(BinaryExpr::new(
1429 Arc::new(BinaryExpr::new(
1430 Arc::new(Column::new("a", 0)),
1431 Operator::LtEq,
1432 Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1433 )),
1434 Operator::And,
1435 Arc::new(BinaryExpr::new(
1436 Arc::new(BinaryExpr::new(
1437 Arc::new(Column::new("b", 1)),
1438 Operator::Eq,
1439 Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1440 )),
1441 Operator::And,
1442 Arc::new(BinaryExpr::new(
1443 Arc::new(BinaryExpr::new(
1444 Arc::new(Column::new("c", 2)),
1445 Operator::LtEq,
1446 Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1447 )),
1448 Operator::And,
1449 Arc::new(BinaryExpr::new(
1450 Arc::new(Column::new("a", 0)),
1451 Operator::Gt,
1452 Arc::new(Column::new("b", 1)),
1453 )),
1454 )),
1455 )),
1456 ));
1457 let filter: Arc<dyn ExecutionPlan> =
1458 Arc::new(FilterExec::try_new(predicate, input)?);
1459 let statistics = filter.partition_statistics(None)?;
1460 assert_eq!(statistics.num_rows, Precision::Inexact(134));
1464 assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1465 let exp_col_stats = vec![
1466 ColumnStatistics {
1467 min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1468 max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1469 ..Default::default()
1470 },
1471 ColumnStatistics {
1472 min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1473 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1474 ..Default::default()
1475 },
1476 ColumnStatistics {
1477 min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1478 max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1479 ..Default::default()
1480 },
1481 ];
1482 let _ = exp_col_stats
1483 .into_iter()
1484 .zip(statistics.column_statistics.clone())
1485 .map(|(expected, actual)| {
1486 if let Some(val) = actual.min_value.get_value() {
1487 if val.data_type().is_floating() {
1488 let actual_min = actual.min_value.get_value().unwrap();
1491 let actual_max = actual.max_value.get_value().unwrap();
1492 let expected_min = expected.min_value.get_value().unwrap();
1493 let expected_max = expected.max_value.get_value().unwrap();
1494 let eps = ScalarValue::Float32(Some(1e-6));
1495
1496 assert!(actual_min.sub(expected_min).unwrap() < eps);
1497 assert!(actual_min.sub(expected_min).unwrap() < eps);
1498
1499 assert!(actual_max.sub(expected_max).unwrap() < eps);
1500 assert!(actual_max.sub(expected_max).unwrap() < eps);
1501 } else {
1502 assert_eq!(actual, expected);
1503 }
1504 } else {
1505 assert_eq!(actual, expected);
1506 }
1507 });
1508
1509 Ok(())
1510 }
1511
1512 #[tokio::test]
1513 async fn test_filter_statistics_full_selective() -> Result<()> {
1514 let schema = Schema::new(vec![
1518 Field::new("a", DataType::Int32, false),
1519 Field::new("b", DataType::Int32, false),
1520 ]);
1521 let input = Arc::new(StatisticsExec::new(
1522 Statistics {
1523 num_rows: Precision::Inexact(1000),
1524 total_byte_size: Precision::Inexact(4000),
1525 column_statistics: vec![
1526 ColumnStatistics {
1527 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1528 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1529 ..Default::default()
1530 },
1531 ColumnStatistics {
1532 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1533 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1534 ..Default::default()
1535 },
1536 ],
1537 },
1538 schema,
1539 ));
1540 let predicate = Arc::new(BinaryExpr::new(
1542 Arc::new(BinaryExpr::new(
1543 Arc::new(Column::new("a", 0)),
1544 Operator::Lt,
1545 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1546 )),
1547 Operator::And,
1548 Arc::new(BinaryExpr::new(
1549 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1550 Operator::LtEq,
1551 Arc::new(Column::new("b", 1)),
1552 )),
1553 ));
1554 let expected = input.partition_statistics(None)?.column_statistics.clone();
1556 let filter: Arc<dyn ExecutionPlan> =
1557 Arc::new(FilterExec::try_new(predicate, input)?);
1558 let statistics = filter.partition_statistics(None)?;
1559
1560 assert_eq!(statistics.num_rows, Precision::Inexact(1000));
1561 assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
1562 assert_eq!(statistics.column_statistics, expected);
1563
1564 Ok(())
1565 }
1566
1567 #[tokio::test]
1568 async fn test_filter_statistics_zero_selective() -> Result<()> {
1569 let schema = Schema::new(vec![
1573 Field::new("a", DataType::Int32, false),
1574 Field::new("b", DataType::Int32, false),
1575 ]);
1576 let input = Arc::new(StatisticsExec::new(
1577 Statistics {
1578 num_rows: Precision::Inexact(1000),
1579 total_byte_size: Precision::Inexact(4000),
1580 column_statistics: vec![
1581 ColumnStatistics {
1582 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1583 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1584 ..Default::default()
1585 },
1586 ColumnStatistics {
1587 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1588 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1589 ..Default::default()
1590 },
1591 ],
1592 },
1593 schema,
1594 ));
1595 let predicate = Arc::new(BinaryExpr::new(
1597 Arc::new(BinaryExpr::new(
1598 Arc::new(Column::new("a", 0)),
1599 Operator::Gt,
1600 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1601 )),
1602 Operator::And,
1603 Arc::new(BinaryExpr::new(
1604 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1605 Operator::LtEq,
1606 Arc::new(Column::new("b", 1)),
1607 )),
1608 ));
1609 let filter: Arc<dyn ExecutionPlan> =
1610 Arc::new(FilterExec::try_new(predicate, input)?);
1611 let statistics = filter.partition_statistics(None)?;
1612
1613 assert_eq!(statistics.num_rows, Precision::Inexact(0));
1614 assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
1615 assert_eq!(
1616 statistics.column_statistics,
1617 vec![
1618 ColumnStatistics {
1619 min_value: Precision::Exact(ScalarValue::Int32(None)),
1620 max_value: Precision::Exact(ScalarValue::Int32(None)),
1621 sum_value: Precision::Exact(ScalarValue::Int32(None)),
1622 distinct_count: Precision::Exact(0),
1623 null_count: Precision::Exact(0),
1624 byte_size: Precision::Exact(0),
1625 },
1626 ColumnStatistics {
1627 min_value: Precision::Exact(ScalarValue::Int32(None)),
1628 max_value: Precision::Exact(ScalarValue::Int32(None)),
1629 sum_value: Precision::Exact(ScalarValue::Int32(None)),
1630 distinct_count: Precision::Exact(0),
1631 null_count: Precision::Exact(0),
1632 byte_size: Precision::Exact(0),
1633 },
1634 ]
1635 );
1636
1637 Ok(())
1638 }
1639
1640 #[tokio::test]
1650 async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
1651 let schema = Schema::new(vec![
1653 Field::new("a", DataType::Int32, false),
1654 Field::new("b", DataType::Int32, false),
1655 ]);
1656 let input = Arc::new(StatisticsExec::new(
1657 Statistics {
1658 num_rows: Precision::Inexact(1000),
1659 total_byte_size: Precision::Inexact(4000),
1660 column_statistics: vec![
1661 ColumnStatistics {
1662 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1663 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1664 ..Default::default()
1665 },
1666 ColumnStatistics {
1667 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1668 max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1669 ..Default::default()
1670 },
1671 ],
1672 },
1673 schema,
1674 ));
1675
1676 let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1678 Arc::new(Column::new("a", 0)),
1679 Operator::Gt,
1680 Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1681 ));
1682 let inner_filter: Arc<dyn ExecutionPlan> =
1683 Arc::new(FilterExec::try_new(inner_predicate, input)?);
1684
1685 let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1690 Arc::new(Column::new("a", 0)),
1691 Operator::Eq,
1692 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1693 ));
1694 let outer_filter: Arc<dyn ExecutionPlan> =
1695 Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
1696
1697 let statistics = outer_filter.partition_statistics(None)?;
1699 assert_eq!(statistics.num_rows, Precision::Inexact(0));
1700
1701 Ok(())
1702 }
1703
1704 #[tokio::test]
1705 async fn test_filter_statistics_more_inputs() -> Result<()> {
1706 let schema = Schema::new(vec![
1707 Field::new("a", DataType::Int32, false),
1708 Field::new("b", DataType::Int32, false),
1709 ]);
1710 let input = Arc::new(StatisticsExec::new(
1711 Statistics {
1712 num_rows: Precision::Inexact(1000),
1713 total_byte_size: Precision::Inexact(4000),
1714 column_statistics: vec![
1715 ColumnStatistics {
1716 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1717 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1718 ..Default::default()
1719 },
1720 ColumnStatistics {
1721 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1722 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1723 ..Default::default()
1724 },
1725 ],
1726 },
1727 schema,
1728 ));
1729 let predicate = Arc::new(BinaryExpr::new(
1731 Arc::new(Column::new("a", 0)),
1732 Operator::Lt,
1733 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1734 ));
1735 let filter: Arc<dyn ExecutionPlan> =
1736 Arc::new(FilterExec::try_new(predicate, input)?);
1737 let statistics = filter.partition_statistics(None)?;
1738
1739 assert_eq!(statistics.num_rows, Precision::Inexact(490));
1740 assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
1741 assert_eq!(
1742 statistics.column_statistics,
1743 vec![
1744 ColumnStatistics {
1745 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1746 max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
1747 ..Default::default()
1748 },
1749 ColumnStatistics {
1750 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1751 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1752 ..Default::default()
1753 },
1754 ]
1755 );
1756
1757 Ok(())
1758 }
1759
1760 #[tokio::test]
1761 async fn test_empty_input_statistics() -> Result<()> {
1762 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1763 let input = Arc::new(StatisticsExec::new(
1764 Statistics::new_unknown(&schema),
1765 schema,
1766 ));
1767 let predicate = Arc::new(BinaryExpr::new(
1769 Arc::new(BinaryExpr::new(
1770 Arc::new(Column::new("a", 0)),
1771 Operator::LtEq,
1772 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1773 )),
1774 Operator::And,
1775 Arc::new(BinaryExpr::new(
1776 Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1777 Operator::LtEq,
1778 Arc::new(BinaryExpr::new(
1779 Arc::new(Column::new("a", 0)),
1780 Operator::Minus,
1781 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1782 )),
1783 )),
1784 ));
1785 let filter: Arc<dyn ExecutionPlan> =
1786 Arc::new(FilterExec::try_new(predicate, input)?);
1787 let filter_statistics = filter.partition_statistics(None)?;
1788
1789 let expected_filter_statistics = Statistics {
1790 num_rows: Precision::Absent,
1791 total_byte_size: Precision::Absent,
1792 column_statistics: vec![ColumnStatistics {
1793 null_count: Precision::Absent,
1794 min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1795 max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1796 sum_value: Precision::Absent,
1797 distinct_count: Precision::Absent,
1798 byte_size: Precision::Absent,
1799 }],
1800 };
1801
1802 assert_eq!(*filter_statistics, expected_filter_statistics);
1803
1804 Ok(())
1805 }
1806
1807 #[tokio::test]
1808 async fn test_statistics_with_constant_column() -> Result<()> {
1809 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1810 let input = Arc::new(StatisticsExec::new(
1811 Statistics::new_unknown(&schema),
1812 schema,
1813 ));
1814 let predicate = Arc::new(BinaryExpr::new(
1816 Arc::new(Column::new("a", 0)),
1817 Operator::Eq,
1818 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1819 ));
1820 let filter: Arc<dyn ExecutionPlan> =
1821 Arc::new(FilterExec::try_new(predicate, input)?);
1822 let filter_statistics = filter.partition_statistics(None)?;
1823 assert!(filter_statistics.column_statistics[0].is_singleton());
1825
1826 Ok(())
1827 }
1828
1829 #[tokio::test]
1830 async fn test_validation_filter_selectivity() -> Result<()> {
1831 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1832 let input = Arc::new(StatisticsExec::new(
1833 Statistics::new_unknown(&schema),
1834 schema,
1835 ));
1836 let predicate = Arc::new(BinaryExpr::new(
1838 Arc::new(Column::new("a", 0)),
1839 Operator::Eq,
1840 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1841 ));
1842 let filter = FilterExec::try_new(predicate, input)?;
1843 assert!(filter.with_default_selectivity(120).is_err());
1844 Ok(())
1845 }
1846
1847 #[tokio::test]
1848 async fn test_custom_filter_selectivity() -> Result<()> {
1849 let schema =
1851 Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1852 let input = Arc::new(StatisticsExec::new(
1853 Statistics {
1854 num_rows: Precision::Inexact(1000),
1855 total_byte_size: Precision::Inexact(4000),
1856 column_statistics: vec![ColumnStatistics {
1857 ..Default::default()
1858 }],
1859 },
1860 schema,
1861 ));
1862 let predicate = Arc::new(BinaryExpr::new(
1864 Arc::new(Column::new("a", 0)),
1865 Operator::Eq,
1866 Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1867 ));
1868 let filter = FilterExec::try_new(predicate, input)?;
1869 let statistics = filter.partition_statistics(None)?;
1870 assert_eq!(statistics.num_rows, Precision::Inexact(200));
1871 assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1872 let filter = filter.with_default_selectivity(40)?;
1873 let statistics = filter.partition_statistics(None)?;
1874 assert_eq!(statistics.num_rows, Precision::Inexact(400));
1875 assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1876 Ok(())
1877 }
1878
#[test]
fn test_equivalence_properties_union_type() -> Result<()> {
    // `c2` is a sparse union column. Computing statistics for a filter over a
    // schema containing it must succeed; failures propagate via `?` instead
    // of panicking through `unwrap()`.
    let union_type = DataType::Union(
        UnionFields::try_new(
            vec![0, 1],
            vec![
                Field::new("f1", DataType::Int32, true),
                Field::new("f2", DataType::Utf8, true),
            ],
        )?,
        UnionMode::Sparse,
    );

    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", union_type, true),
    ]));

    // The predicate only touches `c1`: c1 >= 1 AND c1 <= 4.
    let exec = FilterExec::try_new(
        binary(
            binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
            Operator::And,
            binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
            &schema,
        )?,
        Arc::new(EmptyExec::new(Arc::clone(&schema))),
    )?;

    exec.partition_statistics(None)?;

    Ok(())
}
1912
1913 #[tokio::test]
1914 async fn test_builder_with_projection() -> Result<()> {
1915 let schema = Arc::new(Schema::new(vec![
1917 Field::new("a", DataType::Int32, false),
1918 Field::new("b", DataType::Int32, false),
1919 Field::new("c", DataType::Int32, false),
1920 ]));
1921
1922 let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1923
1924 let predicate = Arc::new(BinaryExpr::new(
1926 Arc::new(Column::new("a", 0)),
1927 Operator::Gt,
1928 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1929 ));
1930
1931 let projection = Some(vec![0, 2]);
1933 let filter = FilterExecBuilder::new(predicate, input)
1934 .apply_projection(projection.clone())
1935 .unwrap()
1936 .build()?;
1937
1938 assert_eq!(filter.projection(), &Some([0, 2].into()));
1940
1941 let output_schema = filter.schema();
1943 assert_eq!(output_schema.fields().len(), 2);
1944 assert_eq!(output_schema.field(0).name(), "a");
1945 assert_eq!(output_schema.field(1).name(), "c");
1946
1947 Ok(())
1948 }
1949
1950 #[tokio::test]
1951 async fn test_builder_without_projection() -> Result<()> {
1952 let schema = Arc::new(Schema::new(vec![
1953 Field::new("a", DataType::Int32, false),
1954 Field::new("b", DataType::Int32, false),
1955 ]));
1956
1957 let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1958
1959 let predicate = Arc::new(BinaryExpr::new(
1960 Arc::new(Column::new("a", 0)),
1961 Operator::Gt,
1962 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1963 ));
1964
1965 let filter = FilterExecBuilder::new(predicate, input).build()?;
1967
1968 assert!(filter.projection().is_none());
1970
1971 let output_schema = filter.schema();
1973 assert_eq!(output_schema.fields().len(), 2);
1974
1975 Ok(())
1976 }
1977
1978 #[tokio::test]
1979 async fn test_builder_invalid_projection() -> Result<()> {
1980 let schema = Arc::new(Schema::new(vec![
1981 Field::new("a", DataType::Int32, false),
1982 Field::new("b", DataType::Int32, false),
1983 ]));
1984
1985 let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1986
1987 let predicate = Arc::new(BinaryExpr::new(
1988 Arc::new(Column::new("a", 0)),
1989 Operator::Gt,
1990 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1991 ));
1992
1993 let result =
1995 FilterExecBuilder::new(predicate, input).apply_projection(Some(vec![0, 5])); assert!(result.is_err());
1999
2000 Ok(())
2001 }
2002
2003 #[tokio::test]
2004 async fn test_builder_vs_with_projection() -> Result<()> {
2005 let schema = Schema::new(vec![
2008 Field::new("a", DataType::Int32, false),
2009 Field::new("b", DataType::Int32, false),
2010 Field::new("c", DataType::Int32, false),
2011 Field::new("d", DataType::Int32, false),
2012 ]);
2013
2014 let input = Arc::new(StatisticsExec::new(
2015 Statistics {
2016 num_rows: Precision::Inexact(1000),
2017 total_byte_size: Precision::Inexact(4000),
2018 column_statistics: vec![
2019 ColumnStatistics {
2020 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2021 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2022 ..Default::default()
2023 },
2024 ColumnStatistics {
2025 ..Default::default()
2026 },
2027 ColumnStatistics {
2028 ..Default::default()
2029 },
2030 ColumnStatistics {
2031 ..Default::default()
2032 },
2033 ],
2034 },
2035 schema,
2036 ));
2037 let input: Arc<dyn ExecutionPlan> = input;
2038
2039 let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
2040 Arc::new(Column::new("a", 0)),
2041 Operator::Lt,
2042 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
2043 ));
2044
2045 let projection = Some(vec![0, 2]);
2046
2047 let filter1 = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
2049 .apply_projection(projection.clone())
2050 .unwrap()
2051 .build()?;
2052
2053 let filter2 = FilterExecBuilder::new(predicate, input)
2055 .apply_projection(projection)
2056 .unwrap()
2057 .build()?;
2058
2059 assert_eq!(filter1.schema(), filter2.schema());
2061 assert_eq!(filter1.projection(), filter2.projection());
2062
2063 let stats1 = filter1.partition_statistics(None)?;
2065 let stats2 = filter2.partition_statistics(None)?;
2066 assert_eq!(stats1.num_rows, stats2.num_rows);
2067 assert_eq!(stats1.total_byte_size, stats2.total_byte_size);
2068
2069 Ok(())
2070 }
2071
2072 #[tokio::test]
2073 async fn test_builder_statistics_with_projection() -> Result<()> {
2074 let schema = Schema::new(vec![
2076 Field::new("a", DataType::Int32, false),
2077 Field::new("b", DataType::Int32, false),
2078 Field::new("c", DataType::Int32, false),
2079 ]);
2080
2081 let input = Arc::new(StatisticsExec::new(
2082 Statistics {
2083 num_rows: Precision::Inexact(1000),
2084 total_byte_size: Precision::Inexact(12000),
2085 column_statistics: vec![
2086 ColumnStatistics {
2087 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2088 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2089 ..Default::default()
2090 },
2091 ColumnStatistics {
2092 min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
2093 max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
2094 ..Default::default()
2095 },
2096 ColumnStatistics {
2097 min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
2098 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
2099 ..Default::default()
2100 },
2101 ],
2102 },
2103 schema,
2104 ));
2105
2106 let predicate = Arc::new(BinaryExpr::new(
2108 Arc::new(Column::new("a", 0)),
2109 Operator::Lt,
2110 Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
2111 ));
2112
2113 let filter = FilterExecBuilder::new(predicate, input)
2114 .apply_projection(Some(vec![0, 2]))
2115 .unwrap()
2116 .build()?;
2117
2118 let statistics = filter.partition_statistics(None)?;
2119
2120 assert!(matches!(statistics.num_rows, Precision::Inexact(_)));
2122
2123 assert_eq!(filter.schema().fields().len(), 2);
2125
2126 Ok(())
2127 }
2128
#[test]
fn test_builder_predicate_validation() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));

    // A bare Int32 column is not a boolean predicate. The projection itself is
    // valid (it is unwrapped below), so the failure must come from `build()`.
    let not_boolean = Arc::new(Column::new("a", 0));
    let builder = FilterExecBuilder::new(not_boolean, input)
        .apply_projection(Some(vec![0]))
        .unwrap();

    assert!(builder.build().is_err());

    Ok(())
}
2152
2153 #[tokio::test]
2154 async fn test_builder_projection_composition() -> Result<()> {
2155 let schema = Arc::new(Schema::new(vec![
2159 Field::new("a", DataType::Int32, false),
2160 Field::new("b", DataType::Int32, false),
2161 Field::new("c", DataType::Int32, false),
2162 Field::new("d", DataType::Int32, false),
2163 ]));
2164
2165 let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2166
2167 let predicate = Arc::new(BinaryExpr::new(
2169 Arc::new(Column::new("a", 0)),
2170 Operator::Gt,
2171 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2172 ));
2173
2174 let filter = FilterExecBuilder::new(predicate, input)
2178 .apply_projection(Some(vec![0, 2, 3]))?
2179 .apply_projection(Some(vec![0, 2]))?
2180 .build()?;
2181
2182 assert_eq!(filter.projection(), &Some([0, 3].into()));
2184
2185 let output_schema = filter.schema();
2187 assert_eq!(output_schema.fields().len(), 2);
2188 assert_eq!(output_schema.field(0).name(), "a");
2189 assert_eq!(output_schema.field(1).name(), "d");
2190
2191 Ok(())
2192 }
2193
2194 #[tokio::test]
2195 async fn test_builder_projection_composition_none_clears() -> Result<()> {
2196 let schema = Arc::new(Schema::new(vec![
2198 Field::new("a", DataType::Int32, false),
2199 Field::new("b", DataType::Int32, false),
2200 ]));
2201
2202 let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2203
2204 let predicate = Arc::new(BinaryExpr::new(
2205 Arc::new(Column::new("a", 0)),
2206 Operator::Gt,
2207 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2208 ));
2209
2210 let filter = FilterExecBuilder::new(predicate, input)
2212 .apply_projection(Some(vec![0]))?
2213 .apply_projection(None)?
2214 .build()?;
2215
2216 assert_eq!(filter.projection(), &None);
2218
2219 let output_schema = filter.schema();
2221 assert_eq!(output_schema.fields().len(), 2);
2222
2223 Ok(())
2224 }
2225
#[test]
fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
    // Input is (a, b, c); the filter projects only `c`, so `c` sits at index 0
    // in the filter's output but at index 2 in its input.
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));

    // a > 0
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
    ));
    let filter = FilterExecBuilder::new(predicate, input)
        .apply_projection(Some(vec![2]))?
        .build()?;

    let output_schema = filter.schema();
    assert_eq!(output_schema.fields().len(), 1);
    assert_eq!(output_schema.field(0).name(), "c");

    // A parent filter expressed against the filter's output schema (c@0).
    let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));

    let config = ConfigOptions::new();
    let desc = filter.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![parent_filter],
        &config,
    )?;

    // Exactly one entry carrying exactly one filter is expected.
    let parent_filters = desc.parent_filters();
    assert_eq!(parent_filters.len(), 1);
    assert_eq!(parent_filters[0].len(), 1);

    let remapped = &parent_filters[0][0].predicate;
    let display = format!("{remapped}");
    assert_eq!(
        display, "c@2",
        "Post-phase parent filter column index must be remapped \
         from output schema (c@0) to input schema (c@2)"
    );

    Ok(())
}
2278
#[test]
fn test_collect_columns_skips_non_column_pairs() -> Result<()> {
    let schema = test::aggr_test_schema();

    // (c2 IS DISTINCT FROM 0) = 0: neither side of `=` is a bare column.
    let distinct_from_zero: Arc<dyn PhysicalExpr> = binary(
        col("c2", &schema)?,
        Operator::IsDistinctFrom,
        lit(0u32),
        &schema,
    )?;
    let nested: Arc<dyn PhysicalExpr> =
        binary(distinct_from_zero, Operator::Eq, lit(0u32), &schema)?;
    let (pairs, _) = collect_columns_from_predicate_inner(&nested);
    assert_eq!(
        0,
        pairs.len(),
        "Should not extract equality pairs where neither side is a Column"
    );

    // c2 = 0: the left side is a column, so one pair is expected.
    let direct: Arc<dyn PhysicalExpr> =
        binary(col("c2", &schema)?, Operator::Eq, lit(0u32), &schema)?;
    let (pairs, _) = collect_columns_from_predicate_inner(&direct);
    assert_eq!(
        1,
        pairs.len(),
        "Should extract equality pairs where one side is a Column"
    );

    Ok(())
}
2320
2321 #[tokio::test]
2324 async fn test_filter_statistics_absent_columns_stay_absent() -> Result<()> {
2325 let schema = Schema::new(vec![
2326 Field::new("a", DataType::Int32, false),
2327 Field::new("b", DataType::Int32, false),
2328 ]);
2329 let input = Arc::new(StatisticsExec::new(
2330 Statistics {
2331 num_rows: Precision::Inexact(1000),
2332 total_byte_size: Precision::Absent,
2333 column_statistics: vec![
2334 ColumnStatistics::default(),
2335 ColumnStatistics::default(),
2336 ],
2337 },
2338 schema.clone(),
2339 ));
2340
2341 let predicate = Arc::new(BinaryExpr::new(
2342 Arc::new(Column::new("a", 0)),
2343 Operator::Eq,
2344 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2345 ));
2346 let filter: Arc<dyn ExecutionPlan> =
2347 Arc::new(FilterExec::try_new(predicate, input)?);
2348
2349 let statistics = filter.partition_statistics(None)?;
2350 let col_b_stats = &statistics.column_statistics[1];
2351 assert_eq!(col_b_stats.min_value, Precision::Absent);
2352 assert_eq!(col_b_stats.max_value, Precision::Absent);
2353
2354 Ok(())
2355 }
2356
2357 #[tokio::test]
2358 async fn test_filter_statistics_equality_ndv() -> Result<()> {
2359 #[expect(clippy::type_complexity)]
2360 let cases: Vec<(
2361 &str,
2362 Vec<Field>,
2363 Vec<ColumnStatistics>,
2364 Arc<dyn PhysicalExpr>,
2365 Vec<Precision<usize>>,
2366 )> = vec![
2367 (
2368 "utf8 equality",
2369 vec![Field::new("name", DataType::Utf8, false)],
2370 vec![ColumnStatistics {
2371 distinct_count: Precision::Inexact(50),
2372 ..Default::default()
2373 }],
2374 Arc::new(BinaryExpr::new(
2375 Arc::new(Column::new("name", 0)),
2376 Operator::Eq,
2377 Arc::new(Literal::new(ScalarValue::Utf8(Some("hello".to_string())))),
2378 )),
2379 vec![Precision::Exact(1)],
2380 ),
2381 (
2382 "utf8view equality",
2383 vec![Field::new("name", DataType::Utf8View, false)],
2384 vec![ColumnStatistics {
2385 distinct_count: Precision::Inexact(50),
2386 ..Default::default()
2387 }],
2388 Arc::new(BinaryExpr::new(
2389 Arc::new(Column::new("name", 0)),
2390 Operator::Eq,
2391 Arc::new(Literal::new(ScalarValue::Utf8View(Some(
2392 "hello".to_string(),
2393 )))),
2394 )),
2395 vec![Precision::Exact(1)],
2396 ),
2397 (
2398 "largeutf8 equality",
2399 vec![Field::new("name", DataType::LargeUtf8, false)],
2400 vec![ColumnStatistics {
2401 distinct_count: Precision::Inexact(50),
2402 ..Default::default()
2403 }],
2404 Arc::new(BinaryExpr::new(
2405 Arc::new(Column::new("name", 0)),
2406 Operator::Eq,
2407 Arc::new(Literal::new(ScalarValue::LargeUtf8(Some(
2408 "hello".to_string(),
2409 )))),
2410 )),
2411 vec![Precision::Exact(1)],
2412 ),
2413 (
2414 "utf8 reversed (literal = column)",
2415 vec![Field::new("name", DataType::Utf8, false)],
2416 vec![ColumnStatistics {
2417 distinct_count: Precision::Inexact(50),
2418 ..Default::default()
2419 }],
2420 Arc::new(BinaryExpr::new(
2421 Arc::new(Literal::new(ScalarValue::Utf8(Some("hello".to_string())))),
2422 Operator::Eq,
2423 Arc::new(Column::new("name", 0)),
2424 )),
2425 vec![Precision::Exact(1)],
2426 ),
2427 (
2428 "OR preserves original NDV",
2429 vec![Field::new("name", DataType::Utf8, false)],
2430 vec![ColumnStatistics {
2431 distinct_count: Precision::Inexact(50),
2432 ..Default::default()
2433 }],
2434 Arc::new(BinaryExpr::new(
2435 Arc::new(BinaryExpr::new(
2436 Arc::new(Column::new("name", 0)),
2437 Operator::Eq,
2438 Arc::new(Literal::new(ScalarValue::Utf8(Some("a".to_string())))),
2439 )),
2440 Operator::Or,
2441 Arc::new(BinaryExpr::new(
2442 Arc::new(Column::new("name", 0)),
2443 Operator::Eq,
2444 Arc::new(Literal::new(ScalarValue::Utf8(Some("b".to_string())))),
2445 )),
2446 )),
2447 vec![Precision::Inexact(50)],
2448 ),
2449 (
2450 "AND with mixed types (Utf8 + Int32)",
2451 vec![
2452 Field::new("name", DataType::Utf8, false),
2453 Field::new("age", DataType::Int32, false),
2454 ],
2455 vec![
2456 ColumnStatistics {
2457 distinct_count: Precision::Inexact(50),
2458 ..Default::default()
2459 },
2460 ColumnStatistics {
2461 distinct_count: Precision::Inexact(80),
2462 ..Default::default()
2463 },
2464 ],
2465 Arc::new(BinaryExpr::new(
2466 Arc::new(BinaryExpr::new(
2467 Arc::new(Column::new("name", 0)),
2468 Operator::Eq,
2469 Arc::new(Literal::new(ScalarValue::Utf8(Some(
2470 "hello".to_string(),
2471 )))),
2472 )),
2473 Operator::And,
2474 Arc::new(BinaryExpr::new(
2475 Arc::new(Column::new("age", 1)),
2476 Operator::Eq,
2477 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2478 )),
2479 )),
2480 vec![Precision::Exact(1), Precision::Exact(1)],
2481 ),
2482 (
2483 "numeric equality with min/max bounds (interval analysis path)",
2484 vec![Field::new("a", DataType::Int32, false)],
2485 vec![ColumnStatistics {
2486 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2487 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2488 distinct_count: Precision::Inexact(80),
2489 ..Default::default()
2490 }],
2491 Arc::new(BinaryExpr::new(
2492 Arc::new(Column::new("a", 0)),
2493 Operator::Eq,
2494 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2495 )),
2496 vec![Precision::Exact(1)],
2497 ),
2498 (
2499 "timestamp equality",
2500 vec![Field::new(
2501 "ts",
2502 DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None),
2503 false,
2504 )],
2505 vec![ColumnStatistics {
2506 distinct_count: Precision::Inexact(500),
2507 ..Default::default()
2508 }],
2509 Arc::new(BinaryExpr::new(
2510 Arc::new(Column::new("ts", 0)),
2511 Operator::Eq,
2512 Arc::new(Literal::new(ScalarValue::TimestampNanosecond(
2513 Some(1_609_459_200_000_000_000),
2514 None,
2515 ))),
2516 )),
2517 vec![Precision::Exact(1)],
2518 ),
2519 (
2520 "contradictory numeric equality (infeasible)",
2521 vec![Field::new("a", DataType::Int32, false)],
2522 vec![ColumnStatistics {
2523 distinct_count: Precision::Inexact(50),
2524 ..Default::default()
2525 }],
2526 Arc::new(BinaryExpr::new(
2527 Arc::new(BinaryExpr::new(
2528 Arc::new(Column::new("a", 0)),
2529 Operator::Eq,
2530 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2531 )),
2532 Operator::And,
2533 Arc::new(BinaryExpr::new(
2534 Arc::new(Column::new("a", 0)),
2535 Operator::Eq,
2536 Arc::new(Literal::new(ScalarValue::Int32(Some(99)))),
2537 )),
2538 )),
2539 vec![Precision::Exact(0)],
2540 ),
2541 (
2542 "utf8 equality with absent input NDV",
2543 vec![Field::new("name", DataType::Utf8, false)],
2544 vec![ColumnStatistics {
2545 distinct_count: Precision::Absent,
2546 ..Default::default()
2547 }],
2548 Arc::new(BinaryExpr::new(
2549 Arc::new(Column::new("name", 0)),
2550 Operator::Eq,
2551 Arc::new(Literal::new(ScalarValue::Utf8(Some("hello".to_string())))),
2552 )),
2553 vec![Precision::Exact(1)],
2554 ),
2555 (
2556 "contradictory utf8 equality (infeasible)",
2557 vec![Field::new("name", DataType::Utf8, false)],
2558 vec![ColumnStatistics {
2559 distinct_count: Precision::Inexact(100),
2560 ..Default::default()
2561 }],
2562 Arc::new(BinaryExpr::new(
2563 Arc::new(BinaryExpr::new(
2564 Arc::new(Column::new("name", 0)),
2565 Operator::Eq,
2566 Arc::new(Literal::new(ScalarValue::Utf8(Some(
2567 "alice".to_string(),
2568 )))),
2569 )),
2570 Operator::And,
2571 Arc::new(BinaryExpr::new(
2572 Arc::new(Column::new("name", 0)),
2573 Operator::Eq,
2574 Arc::new(Literal::new(ScalarValue::Utf8(Some(
2575 "bob".to_string(),
2576 )))),
2577 )),
2578 )),
2579 vec![Precision::Exact(0)],
2580 ),
2581 (
2582 "redundant same-value equality combined with another column",
2583 vec![
2584 Field::new("a", DataType::Int32, false),
2585 Field::new("b", DataType::Int32, false),
2586 ],
2587 vec![
2588 ColumnStatistics {
2589 distinct_count: Precision::Inexact(80),
2590 ..Default::default()
2591 },
2592 ColumnStatistics {
2593 distinct_count: Precision::Inexact(40),
2594 ..Default::default()
2595 },
2596 ],
2597 Arc::new(BinaryExpr::new(
2598 Arc::new(BinaryExpr::new(
2599 Arc::new(BinaryExpr::new(
2600 Arc::new(Column::new("a", 0)),
2601 Operator::Eq,
2602 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
2603 )),
2604 Operator::And,
2605 Arc::new(BinaryExpr::new(
2606 Arc::new(Column::new("a", 0)),
2607 Operator::Eq,
2608 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
2609 )),
2610 )),
2611 Operator::And,
2612 Arc::new(BinaryExpr::new(
2613 Arc::new(Column::new("b", 1)),
2614 Operator::Eq,
2615 Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
2616 )),
2617 )),
2618 vec![Precision::Exact(1), Precision::Exact(1)],
2619 ),
2620 ];
2621
2622 for (desc, fields, col_stats, predicate, expected_ndvs) in cases {
2623 let schema = Schema::new(fields);
2624 let input = Arc::new(StatisticsExec::new(
2625 Statistics {
2626 num_rows: Precision::Inexact(100),
2627 total_byte_size: Precision::Inexact(1000),
2628 column_statistics: col_stats,
2629 },
2630 schema.clone(),
2631 ));
2632 let filter: Arc<dyn ExecutionPlan> =
2633 Arc::new(FilterExec::try_new(predicate, input)?);
2634 let statistics = filter.partition_statistics(None)?;
2635
2636 for (i, expected) in expected_ndvs.iter().enumerate() {
2637 assert_eq!(
2638 statistics.column_statistics[i].distinct_count, *expected,
2639 "case '{desc}': column {i} NDV mismatch"
2640 );
2641 }
2642 }
2643 Ok(())
2644 }
2645
2646 #[tokio::test]
2647 async fn test_filter_statistics_and_equality_ndv() -> Result<()> {
2648 let schema = Schema::new(vec![
2652 Field::new("a", DataType::Int32, false),
2653 Field::new("b", DataType::Int32, false),
2654 Field::new("c", DataType::Int32, false),
2655 ]);
2656 let input = Arc::new(StatisticsExec::new(
2657 Statistics {
2658 num_rows: Precision::Inexact(100),
2659 total_byte_size: Precision::Inexact(1200),
2660 column_statistics: vec![
2661 ColumnStatistics {
2662 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2663 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2664 distinct_count: Precision::Inexact(80),
2665 ..Default::default()
2666 },
2667 ColumnStatistics {
2668 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2669 max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
2670 distinct_count: Precision::Inexact(40),
2671 ..Default::default()
2672 },
2673 ColumnStatistics {
2674 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2675 max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
2676 distinct_count: Precision::Inexact(150),
2677 ..Default::default()
2678 },
2679 ],
2680 },
2681 schema.clone(),
2682 ));
2683
2684 let predicate = Arc::new(BinaryExpr::new(
2686 Arc::new(BinaryExpr::new(
2687 Arc::new(BinaryExpr::new(
2688 Arc::new(Column::new("a", 0)),
2689 Operator::Eq,
2690 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2691 )),
2692 Operator::And,
2693 Arc::new(BinaryExpr::new(
2694 Arc::new(Column::new("b", 1)),
2695 Operator::Gt,
2696 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2697 )),
2698 )),
2699 Operator::And,
2700 Arc::new(BinaryExpr::new(
2701 Arc::new(Column::new("c", 2)),
2702 Operator::Eq,
2703 Arc::new(Literal::new(ScalarValue::Int32(Some(7)))),
2704 )),
2705 ));
2706 let filter: Arc<dyn ExecutionPlan> =
2707 Arc::new(FilterExec::try_new(predicate, input)?);
2708 let statistics = filter.partition_statistics(None)?;
2709 assert_eq!(
2711 statistics.column_statistics[0].distinct_count,
2712 Precision::Exact(1)
2713 );
2714 assert_eq!(
2718 statistics.column_statistics[1].distinct_count,
2719 Precision::Inexact(1)
2720 );
2721 assert_eq!(
2723 statistics.column_statistics[2].distinct_count,
2724 Precision::Exact(1)
2725 );
2726 Ok(())
2727 }
2728
2729 #[tokio::test]
2730 async fn test_filter_statistics_equality_absent_bounds_ndv() -> Result<()> {
2731 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2733 let input = Arc::new(StatisticsExec::new(
2734 Statistics {
2735 num_rows: Precision::Inexact(100),
2736 total_byte_size: Precision::Inexact(400),
2737 column_statistics: vec![ColumnStatistics {
2738 distinct_count: Precision::Inexact(80),
2739 ..Default::default()
2740 }],
2741 },
2742 schema.clone(),
2743 ));
2744
2745 let predicate = Arc::new(BinaryExpr::new(
2748 Arc::new(Column::new("a", 0)),
2749 Operator::Eq,
2750 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2751 ));
2752 let filter: Arc<dyn ExecutionPlan> =
2753 Arc::new(FilterExec::try_new(predicate, input)?);
2754 let statistics = filter.partition_statistics(None)?;
2755 assert_eq!(
2756 statistics.column_statistics[0].distinct_count,
2757 Precision::Exact(1)
2758 );
2759 Ok(())
2760 }
2761
2762 #[tokio::test]
2763 async fn test_filter_statistics_equality_int8_ndv() -> Result<()> {
2764 let schema = Schema::new(vec![Field::new("a", DataType::Int8, false)]);
2766 let input = Arc::new(StatisticsExec::new(
2767 Statistics {
2768 num_rows: Precision::Inexact(100),
2769 total_byte_size: Precision::Inexact(100),
2770 column_statistics: vec![ColumnStatistics {
2771 min_value: Precision::Inexact(ScalarValue::Int8(Some(-100))),
2772 max_value: Precision::Inexact(ScalarValue::Int8(Some(100))),
2773 distinct_count: Precision::Inexact(50),
2774 ..Default::default()
2775 }],
2776 },
2777 schema.clone(),
2778 ));
2779
2780 let predicate = Arc::new(BinaryExpr::new(
2781 Arc::new(Column::new("a", 0)),
2782 Operator::Eq,
2783 Arc::new(Literal::new(ScalarValue::Int8(Some(42)))),
2784 ));
2785 let filter: Arc<dyn ExecutionPlan> =
2786 Arc::new(FilterExec::try_new(predicate, input)?);
2787 let statistics = filter.partition_statistics(None)?;
2788 assert_eq!(
2789 statistics.column_statistics[0].distinct_count,
2790 Precision::Exact(1)
2791 );
2792 Ok(())
2793 }
2794
2795 #[tokio::test]
2796 async fn test_filter_statistics_equality_int64_ndv() -> Result<()> {
2797 let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
2799 let input = Arc::new(StatisticsExec::new(
2800 Statistics {
2801 num_rows: Precision::Inexact(100_000),
2802 total_byte_size: Precision::Inexact(800_000),
2803 column_statistics: vec![ColumnStatistics {
2804 min_value: Precision::Inexact(ScalarValue::Int64(Some(0))),
2805 max_value: Precision::Inexact(ScalarValue::Int64(Some(1_000_000))),
2806 distinct_count: Precision::Inexact(100_000),
2807 ..Default::default()
2808 }],
2809 },
2810 schema.clone(),
2811 ));
2812
2813 let predicate = Arc::new(BinaryExpr::new(
2814 Arc::new(Column::new("a", 0)),
2815 Operator::Eq,
2816 Arc::new(Literal::new(ScalarValue::Int64(Some(42)))),
2817 ));
2818 let filter: Arc<dyn ExecutionPlan> =
2819 Arc::new(FilterExec::try_new(predicate, input)?);
2820 let statistics = filter.partition_statistics(None)?;
2821 assert_eq!(
2822 statistics.column_statistics[0].distinct_count,
2823 Precision::Exact(1)
2824 );
2825 Ok(())
2826 }
2827
2828 #[tokio::test]
2829 async fn test_filter_statistics_equality_float32_ndv() -> Result<()> {
2830 let schema = Schema::new(vec![Field::new("a", DataType::Float32, false)]);
2832 let input = Arc::new(StatisticsExec::new(
2833 Statistics {
2834 num_rows: Precision::Inexact(100),
2835 total_byte_size: Precision::Inexact(400),
2836 column_statistics: vec![ColumnStatistics {
2837 min_value: Precision::Inexact(ScalarValue::Float32(Some(0.0))),
2838 max_value: Precision::Inexact(ScalarValue::Float32(Some(100.0))),
2839 distinct_count: Precision::Inexact(50),
2840 ..Default::default()
2841 }],
2842 },
2843 schema.clone(),
2844 ));
2845
2846 let predicate = Arc::new(BinaryExpr::new(
2847 Arc::new(Column::new("a", 0)),
2848 Operator::Eq,
2849 Arc::new(Literal::new(ScalarValue::Float32(Some(42.5)))),
2850 ));
2851 let filter: Arc<dyn ExecutionPlan> =
2852 Arc::new(FilterExec::try_new(predicate, input)?);
2853 let statistics = filter.partition_statistics(None)?;
2854 assert_eq!(
2855 statistics.column_statistics[0].distinct_count,
2856 Precision::Exact(1)
2857 );
2858 Ok(())
2859 }
2860
2861 #[tokio::test]
2862 async fn test_filter_statistics_equality_reversed_ndv() -> Result<()> {
2863 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2865 let input = Arc::new(StatisticsExec::new(
2866 Statistics {
2867 num_rows: Precision::Inexact(100),
2868 total_byte_size: Precision::Inexact(400),
2869 column_statistics: vec![ColumnStatistics {
2870 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2871 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2872 distinct_count: Precision::Inexact(80),
2873 ..Default::default()
2874 }],
2875 },
2876 schema.clone(),
2877 ));
2878
2879 let predicate = Arc::new(BinaryExpr::new(
2881 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2882 Operator::Eq,
2883 Arc::new(Column::new("a", 0)),
2884 ));
2885 let filter: Arc<dyn ExecutionPlan> =
2886 Arc::new(FilterExec::try_new(predicate, input)?);
2887 let statistics = filter.partition_statistics(None)?;
2888 assert_eq!(
2889 statistics.column_statistics[0].distinct_count,
2890 Precision::Exact(1)
2891 );
2892 Ok(())
2893 }
2894
2895 #[tokio::test]
2896 async fn test_filter_statistics_equality_timestamp_ndv() -> Result<()> {
2897 let schema = Schema::new(vec![Field::new(
2899 "ts",
2900 DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None),
2901 false,
2902 )]);
2903 let input = Arc::new(StatisticsExec::new(
2904 Statistics {
2905 num_rows: Precision::Inexact(1000),
2906 total_byte_size: Precision::Inexact(8000),
2907 column_statistics: vec![ColumnStatistics {
2908 min_value: Precision::Inexact(ScalarValue::TimestampNanosecond(
2909 Some(1_000_000_000),
2910 None,
2911 )),
2912 max_value: Precision::Inexact(ScalarValue::TimestampNanosecond(
2913 Some(2_000_000_000),
2914 None,
2915 )),
2916 distinct_count: Precision::Inexact(500),
2917 ..Default::default()
2918 }],
2919 },
2920 schema.clone(),
2921 ));
2922
2923 let predicate = Arc::new(BinaryExpr::new(
2924 Arc::new(Column::new("ts", 0)),
2925 Operator::Eq,
2926 Arc::new(Literal::new(ScalarValue::TimestampNanosecond(
2927 Some(1_500_000_000),
2928 None,
2929 ))),
2930 ));
2931 let filter: Arc<dyn ExecutionPlan> =
2932 Arc::new(FilterExec::try_new(predicate, input)?);
2933 let statistics = filter.partition_statistics(None)?;
2934 assert_eq!(
2935 statistics.column_statistics[0].distinct_count,
2936 Precision::Exact(1)
2937 );
2938 Ok(())
2939 }
2940
2941 #[test]
2942 fn test_collect_equality_columns() {
2943 use std::collections::HashSet;
2944 #[expect(clippy::type_complexity)]
2946 let cases: Vec<(&str, Arc<dyn PhysicalExpr>, Vec<usize>, bool)> = vec![
2947 (
2948 "simple col = literal",
2949 Arc::new(BinaryExpr::new(
2950 Arc::new(Column::new("a", 0)),
2951 Operator::Eq,
2952 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2953 )),
2954 vec![0],
2955 false,
2956 ),
2957 (
2958 "reversed literal = col",
2959 Arc::new(BinaryExpr::new(
2960 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2961 Operator::Eq,
2962 Arc::new(Column::new("a", 0)),
2963 )),
2964 vec![0],
2965 false,
2966 ),
2967 (
2968 "AND with two equalities",
2969 Arc::new(BinaryExpr::new(
2970 Arc::new(BinaryExpr::new(
2971 Arc::new(Column::new("a", 0)),
2972 Operator::Eq,
2973 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2974 )),
2975 Operator::And,
2976 Arc::new(BinaryExpr::new(
2977 Arc::new(Column::new("b", 1)),
2978 Operator::Eq,
2979 Arc::new(Literal::new(ScalarValue::Utf8(Some(
2980 "hello".to_string(),
2981 )))),
2982 )),
2983 )),
2984 vec![0, 1],
2985 false,
2986 ),
2987 (
2988 "OR produces empty set",
2989 Arc::new(BinaryExpr::new(
2990 Arc::new(BinaryExpr::new(
2991 Arc::new(Column::new("a", 0)),
2992 Operator::Eq,
2993 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2994 )),
2995 Operator::Or,
2996 Arc::new(BinaryExpr::new(
2997 Arc::new(Column::new("a", 0)),
2998 Operator::Eq,
2999 Arc::new(Literal::new(ScalarValue::Int32(Some(99)))),
3000 )),
3001 )),
3002 vec![],
3003 false,
3004 ),
3005 (
3006 "greater-than produces empty set",
3007 Arc::new(BinaryExpr::new(
3008 Arc::new(Column::new("a", 0)),
3009 Operator::Gt,
3010 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
3011 )),
3012 vec![],
3013 false,
3014 ),
3015 (
3016 "col = col produces empty set",
3017 Arc::new(BinaryExpr::new(
3018 Arc::new(Column::new("a", 0)),
3019 Operator::Eq,
3020 Arc::new(Column::new("b", 1)),
3021 )),
3022 vec![],
3023 false,
3024 ),
3025 (
3026 "nested AND with three equalities",
3027 Arc::new(BinaryExpr::new(
3028 Arc::new(BinaryExpr::new(
3029 Arc::new(BinaryExpr::new(
3030 Arc::new(Column::new("a", 0)),
3031 Operator::Eq,
3032 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
3033 )),
3034 Operator::And,
3035 Arc::new(BinaryExpr::new(
3036 Arc::new(Column::new("b", 1)),
3037 Operator::Eq,
3038 Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
3039 )),
3040 )),
3041 Operator::And,
3042 Arc::new(BinaryExpr::new(
3043 Arc::new(Column::new("c", 2)),
3044 Operator::Eq,
3045 Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
3046 )),
3047 )),
3048 vec![0, 1, 2],
3049 false,
3050 ),
3051 (
3052 "AND with mixed equality and non-equality",
3053 Arc::new(BinaryExpr::new(
3054 Arc::new(BinaryExpr::new(
3055 Arc::new(Column::new("a", 0)),
3056 Operator::Eq,
3057 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
3058 )),
3059 Operator::And,
3060 Arc::new(BinaryExpr::new(
3061 Arc::new(Column::new("b", 1)),
3062 Operator::Gt,
3063 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
3064 )),
3065 )),
3066 vec![0],
3067 false,
3068 ),
3069 (
3070 "col = NULL is excluded",
3071 Arc::new(BinaryExpr::new(
3072 Arc::new(Column::new("a", 0)),
3073 Operator::Eq,
3074 Arc::new(Literal::new(ScalarValue::Int32(None))),
3075 )),
3076 vec![],
3077 false,
3078 ),
3079 (
3080 "NULL = col is excluded",
3081 Arc::new(BinaryExpr::new(
3082 Arc::new(Literal::new(ScalarValue::Utf8(None))),
3083 Operator::Eq,
3084 Arc::new(Column::new("a", 0)),
3085 )),
3086 vec![],
3087 false,
3088 ),
3089 (
3090 "contradictory: same col, different literals",
3091 Arc::new(BinaryExpr::new(
3092 Arc::new(BinaryExpr::new(
3093 Arc::new(Column::new("a", 0)),
3094 Operator::Eq,
3095 Arc::new(Literal::new(ScalarValue::Utf8(Some(
3096 "alice".to_string(),
3097 )))),
3098 )),
3099 Operator::And,
3100 Arc::new(BinaryExpr::new(
3101 Arc::new(Column::new("a", 0)),
3102 Operator::Eq,
3103 Arc::new(Literal::new(ScalarValue::Utf8(Some(
3104 "bob".to_string(),
3105 )))),
3106 )),
3107 )),
3108 vec![0],
3109 true,
3110 ),
3111 (
3112 "same col, same literal is not contradictory",
3113 Arc::new(BinaryExpr::new(
3114 Arc::new(BinaryExpr::new(
3115 Arc::new(Column::new("a", 0)),
3116 Operator::Eq,
3117 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
3118 )),
3119 Operator::And,
3120 Arc::new(BinaryExpr::new(
3121 Arc::new(Column::new("a", 0)),
3122 Operator::Eq,
3123 Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
3124 )),
3125 )),
3126 vec![0],
3127 false,
3128 ),
3129 ];
3130
3131 for (desc, expr, expected_cols, expected_infeasible) in cases {
3132 let (result, infeasible) = collect_equality_columns(&expr);
3133 let expected: HashSet<usize> = expected_cols.into_iter().collect();
3134 if expected_infeasible {
3135 assert!(infeasible, "case '{desc}': expected infeasible");
3139 } else {
3140 assert_eq!(result, expected, "case '{desc}': columns mismatch");
3141 assert!(!infeasible, "case '{desc}': expected feasible");
3142 }
3143 }
3144 }
3145
3146 #[test]
3156 fn test_filter_with_projection_swap_does_not_panic() -> Result<()> {
3157 use crate::projection::ProjectionExpr;
3158 use datafusion_physical_expr::expressions::col;
3159
3160 let schema = Arc::new(Schema::new(vec![
3162 Field::new("ts", DataType::Int64, false),
3163 Field::new("tokens", DataType::Int64, false),
3164 Field::new("svc", DataType::Utf8, false),
3165 ]));
3166 let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
3167
3168 let predicate = Arc::new(BinaryExpr::new(
3170 Arc::new(Column::new("ts", 0)),
3171 Operator::Gt,
3172 Arc::new(Literal::new(ScalarValue::Int64(Some(0)))),
3173 ));
3174 let filter = Arc::new(
3175 FilterExecBuilder::new(predicate, input)
3176 .apply_projection(Some(vec![0, 1, 2]))?
3177 .build()?,
3178 );
3179
3180 let proj_exprs = vec![
3182 ProjectionExpr {
3183 expr: col("ts", &filter.schema())?,
3184 alias: "ts".to_string(),
3185 },
3186 ProjectionExpr {
3187 expr: col("tokens", &filter.schema())?,
3188 alias: "tokens".to_string(),
3189 },
3190 ];
3191 let projection = Arc::new(ProjectionExec::try_new(
3192 proj_exprs,
3193 Arc::clone(&filter) as _,
3194 )?);
3195
3196 let result = filter.try_swapping_with_projection(&projection)?;
3198 assert!(result.is_some(), "swap should succeed");
3199
3200 let new_plan = result.unwrap();
3201 let out_schema = new_plan.schema();
3203 assert_eq!(out_schema.fields().len(), 2);
3204 assert_eq!(out_schema.field(0).name(), "ts");
3205 assert_eq!(out_schema.field(1).name(), "tokens");
3206 Ok(())
3207 }
3208
3209 #[tokio::test]
3210 async fn test_filter_statistics_ndv_capped_at_row_count() -> Result<()> {
3211 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3213 let input = Arc::new(StatisticsExec::new(
3214 Statistics {
3215 num_rows: Precision::Inexact(100),
3216 total_byte_size: Precision::Inexact(400),
3217 column_statistics: vec![ColumnStatistics {
3218 min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
3219 max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
3220 distinct_count: Precision::Inexact(80),
3221 ..Default::default()
3222 }],
3223 },
3224 schema.clone(),
3225 ));
3226
3227 let predicate: Arc<dyn PhysicalExpr> =
3229 binary(col("a", &schema)?, Operator::LtEq, lit(10i32), &schema)?;
3230
3231 let filter: Arc<dyn ExecutionPlan> =
3232 Arc::new(FilterExec::try_new(predicate, input)?);
3233
3234 let statistics = filter.partition_statistics(None)?;
3235 assert_eq!(statistics.num_rows, Precision::Inexact(10));
3237 let ndv = &statistics.column_statistics[0].distinct_count;
3239 assert!(
3240 ndv.get_value().copied() <= Some(10),
3241 "Expected NDV <= 10 (filtered row count), got {ndv:?}"
3242 );
3243 Ok(())
3244 }
3245}