// datafusion_physical_plan/filter_pushdown.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Filter Pushdown Optimization Process
//!
//! The filter pushdown mechanism involves four key steps:
//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`]
//!    on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`]
//!    by inspecting its logic and children's schemas, determining which filters can be pushed to each child.
//! 2. **Optimizer Executes Pushdown**: The optimizer recursively pushes down filters for each child,
//!    passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child.
//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children,
//!    containing information about which filters were successfully pushed down vs. unsupported.
//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent,
//!    passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides
//!    how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes).
//!
//! [`ExecutionPlan::gather_filters_for_pushdown`]: crate::ExecutionPlan::gather_filters_for_pushdown
//! [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
//!
//! See also `datafusion/physical-optimizer/src/filter_pushdown.rs`.

use std::collections::HashSet;
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion_common::{
    Result,
    tree_node::{Transformed, TreeNode},
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
47
/// The optimization phase during which a filter pushdown pass runs.
///
/// See the module-level documentation for an overview of the pushdown protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushdownPhase {
    /// Pushdown that happens before most other optimizations.
    /// This pushdown allows static filters that do not reference any [`ExecutionPlan`]s to be pushed down.
    /// Filters that reference an [`ExecutionPlan`] cannot be pushed down at this stage since the whole plan tree may be rewritten
    /// by other optimizations.
    /// Implementers are however allowed to modify the execution plan themselves during this phase, for example by returning a completely
    /// different [`ExecutionPlan`] from [`ExecutionPlan::handle_child_pushdown_result`].
    ///
    /// Pushdown of [`FilterExec`] into `DataSourceExec` is an example of a pre-pushdown.
    /// Unlike filter pushdown in the logical phase, which operates on the logical plan to push filters into the logical table scan,
    /// the `Pre` phase in the physical plan targets the actual physical scan, pushing filters down to specific data source implementations.
    /// For example, Parquet supports filter pushdown to reduce data read during scanning, while CSV typically does not.
    ///
    /// [`ExecutionPlan`]: crate::ExecutionPlan
    /// [`FilterExec`]: crate::filter::FilterExec
    /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
    Pre,
    /// Pushdown that happens after most other optimizations.
    /// This stage of filter pushdown allows filters that reference an [`ExecutionPlan`] to be pushed down.
    /// Since subsequent optimizations should not change the structure of the plan tree except for calling [`ExecutionPlan::with_new_children`]
    /// (which generally preserves internal references) it is safe for references between [`ExecutionPlan`]s to be established at this stage.
    ///
    /// This phase is used to link a [`SortExec`] (with a TopK operator) or a [`HashJoinExec`] to a `DataSourceExec`.
    ///
    /// [`ExecutionPlan`]: crate::ExecutionPlan
    /// [`ExecutionPlan::with_new_children`]: crate::ExecutionPlan::with_new_children
    /// [`SortExec`]: crate::sorts::sort::SortExec
    /// [`HashJoinExec`]: crate::joins::HashJoinExec
    /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
    Post,
}

impl std::fmt::Display for FilterPushdownPhase {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Display matches the variant name exactly ("Pre" / "Post").
        let name = match self {
            FilterPushdownPhase::Pre => "Pre",
            FilterPushdownPhase::Post => "Post",
        };
        f.write_str(name)
    }
}
89
90/// The result of a plan for pushing down a filter into a child node.
91/// This contains references to filters so that nodes can mutate a filter
92/// before pushing it down to a child node (e.g. to adjust a projection)
93/// or can directly take ownership of filters that their children
94/// could not handle.
95#[derive(Debug, Clone)]
96pub struct PushedDownPredicate {
97    pub discriminant: PushedDown,
98    pub predicate: Arc<dyn PhysicalExpr>,
99}
100
101impl PushedDownPredicate {
102    /// Return the wrapped [`PhysicalExpr`], discarding whether it is supported or unsupported.
103    pub fn into_inner(self) -> Arc<dyn PhysicalExpr> {
104        self.predicate
105    }
106
107    /// Create a new [`PushedDownPredicate`] with supported pushdown.
108    pub fn supported(predicate: Arc<dyn PhysicalExpr>) -> Self {
109        Self {
110            discriminant: PushedDown::Yes,
111            predicate,
112        }
113    }
114
115    /// Create a new [`PushedDownPredicate`] with unsupported pushdown.
116    pub fn unsupported(predicate: Arc<dyn PhysicalExpr>) -> Self {
117        Self {
118            discriminant: PushedDown::No,
119            predicate,
120        }
121    }
122}
123
/// Discriminant for the result of pushing down a filter into a child node.
// `PartialEq`/`Eq` are derived so callers and tests can compare outcomes
// directly instead of matching; this is a backward-compatible addition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushedDown {
    /// The predicate was successfully pushed down into the child node.
    Yes,
    /// The predicate could not be pushed down into the child node.
    No,
}
132
133impl PushedDown {
134    /// Logical AND operation: returns `Yes` only if both operands are `Yes`.
135    pub fn and(self, other: PushedDown) -> PushedDown {
136        match (self, other) {
137            (PushedDown::Yes, PushedDown::Yes) => PushedDown::Yes,
138            _ => PushedDown::No,
139        }
140    }
141
142    /// Logical OR operation: returns `Yes` if either operand is `Yes`.
143    pub fn or(self, other: PushedDown) -> PushedDown {
144        match (self, other) {
145            (PushedDown::Yes, _) | (_, PushedDown::Yes) => PushedDown::Yes,
146            (PushedDown::No, PushedDown::No) => PushedDown::No,
147        }
148    }
149
150    /// Wrap a [`PhysicalExpr`] with this pushdown result.
151    pub fn wrap_expression(self, expr: Arc<dyn PhysicalExpr>) -> PushedDownPredicate {
152        PushedDownPredicate {
153            discriminant: self,
154            predicate: expr,
155        }
156    }
157}
158
159/// The result of pushing down a single parent filter into all children.
160#[derive(Debug, Clone)]
161pub struct ChildFilterPushdownResult {
162    pub filter: Arc<dyn PhysicalExpr>,
163    pub child_results: Vec<PushedDown>,
164}
165
166impl ChildFilterPushdownResult {
167    /// Combine all child results using OR logic.
168    /// Returns `Yes` if **any** child supports the filter.
169    /// Returns `No` if **all** children reject the filter or if there are no children.
170    pub fn any(&self) -> PushedDown {
171        if self.child_results.is_empty() {
172            // If there are no children, filters cannot be supported
173            PushedDown::No
174        } else {
175            self.child_results
176                .iter()
177                .fold(PushedDown::No, |acc, result| acc.or(*result))
178        }
179    }
180
181    /// Combine all child results using AND logic.
182    /// Returns `Yes` if **all** children support the filter.
183    /// Returns `No` if **any** child rejects the filter or if there are no children.
184    pub fn all(&self) -> PushedDown {
185        if self.child_results.is_empty() {
186            // If there are no children, filters cannot be supported
187            PushedDown::No
188        } else {
189            self.child_results
190                .iter()
191                .fold(PushedDown::Yes, |acc, result| acc.and(*result))
192        }
193    }
194}
195
196/// The result of pushing down filters into a child node.
197///
198/// This is the result provided to nodes in [`ExecutionPlan::handle_child_pushdown_result`].
199/// Nodes process this result and convert it into a [`FilterPushdownPropagation`]
200/// that is returned to their parent.
201///
202/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
203#[derive(Debug, Clone)]
204pub struct ChildPushdownResult {
205    /// The parent filters that were pushed down as received by the current node when [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::handle_child_pushdown_result) was called.
206    /// Note that this may *not* be the same as the filters that were passed to the children as the current node may have modified them
207    /// (e.g. by reassigning column indices) when it returned them from [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::handle_child_pushdown_result) in a [`FilterDescription`].
208    /// Attached to each filter is a [`PushedDown`] *per child* that indicates whether the filter was supported or unsupported by each child.
209    /// To get combined results see [`ChildFilterPushdownResult::any`] and [`ChildFilterPushdownResult::all`].
210    pub parent_filters: Vec<ChildFilterPushdownResult>,
211    /// The result of pushing down each filter this node provided into each of it's children.
212    /// The outer vector corresponds to each child, and the inner vector corresponds to each filter.
213    /// Since this node may have generated a different filter for each child the inner vector may have different lengths or the expressions may not match at all.
214    /// It is up to each node to interpret this result based on the filters it provided for each child in [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::handle_child_pushdown_result).
215    pub self_filters: Vec<Vec<PushedDownPredicate>>,
216}
217
218/// The result of pushing down filters into a node.
219///
220/// Returned from [`ExecutionPlan::handle_child_pushdown_result`] to communicate
221/// to the optimizer:
222///
223/// 1. What to do with any parent filters that could not be pushed down into the children.
224/// 2. If the node needs to be replaced in the execution plan with a new node or not.
225///
226/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
227#[derive(Debug, Clone)]
228pub struct FilterPushdownPropagation<T> {
229    /// Which parent filters were pushed down into this node's children.
230    pub filters: Vec<PushedDown>,
231    /// The updated node, if it was updated during pushdown
232    pub updated_node: Option<T>,
233}
234
235impl<T> FilterPushdownPropagation<T> {
236    /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter
237    /// is supported if it was supported by *all* children.
238    pub fn if_all(child_pushdown_result: ChildPushdownResult) -> Self {
239        let filters = child_pushdown_result
240            .parent_filters
241            .into_iter()
242            .map(|result| result.all())
243            .collect();
244        Self {
245            filters,
246            updated_node: None,
247        }
248    }
249
250    /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter
251    /// is supported if it was supported by *any* child.
252    pub fn if_any(child_pushdown_result: ChildPushdownResult) -> Self {
253        let filters = child_pushdown_result
254            .parent_filters
255            .into_iter()
256            .map(|result| result.any())
257            .collect();
258        Self {
259            filters,
260            updated_node: None,
261        }
262    }
263
264    /// Create a new [`FilterPushdownPropagation`] that tells the parent node that no filters were pushed down regardless of the child results.
265    pub fn all_unsupported(child_pushdown_result: ChildPushdownResult) -> Self {
266        let filters = child_pushdown_result
267            .parent_filters
268            .into_iter()
269            .map(|_| PushedDown::No)
270            .collect();
271        Self {
272            filters,
273            updated_node: None,
274        }
275    }
276
277    /// Create a new [`FilterPushdownPropagation`] with the specified filter support.
278    /// This transmits up to our parent node what the result of pushing down the filters into our node and possibly our subtree was.
279    pub fn with_parent_pushdown_result(filters: Vec<PushedDown>) -> Self {
280        Self {
281            filters,
282            updated_node: None,
283        }
284    }
285
286    /// Bind an updated node to the [`FilterPushdownPropagation`].
287    /// Use this when the current node wants to update itself in the tree or replace itself with a new node (e.g. one of it's children).
288    /// You do not need to call this if one of the children of the current node may have updated itself, that is handled by the optimizer.
289    pub fn with_updated_node(mut self, updated_node: T) -> Self {
290        self.updated_node = Some(updated_node);
291        self
292    }
293}
294
295/// Describes filter pushdown for a single child node.
296///
297/// This structure contains two types of filters:
298/// - **Parent filters**: Filters received from the parent node, marked as supported or unsupported
299/// - **Self filters**: Filters generated by the current node to be pushed down to this child
300#[derive(Debug, Clone)]
301pub struct ChildFilterDescription {
302    /// Description of which parent filters can be pushed down into this node.
303    /// Since we need to transmit filter pushdown results back to this node's parent
304    /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down.
305    pub(crate) parent_filters: Vec<PushedDownPredicate>,
306    /// Description of which filters this node is pushing down to its children.
307    /// Since this is not transmitted back to the parents we can have variable sized inner arrays
308    /// instead of having to track supported/unsupported.
309    pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
310}
311
312/// Validates and remaps filter column references to a target schema in one step.
313///
314/// When pushing filters from a parent to a child node, we need to:
315/// 1. Verify that all columns referenced by the filter exist in the target
316/// 2. Remap column indices to match the target schema
317///
318/// `allowed_indices` controls which column indices (in the parent schema) are
319/// considered valid. For single-input nodes this defaults to
320/// `0..child_schema.len()` (all columns are reachable). For join nodes it is
321/// restricted to the subset of output columns that map to the target child,
322/// which is critical when different sides have same-named columns.
323pub(crate) struct FilterRemapper {
324    /// The target schema to remap column indices into.
325    child_schema: SchemaRef,
326    /// Only columns at these indices (in the *parent* schema) are considered
327    /// valid. For non-join nodes this defaults to `0..child_schema.len()`.
328    allowed_indices: HashSet<usize>,
329}
330
331impl FilterRemapper {
332    /// Create a remapper that accepts any column whose index falls within
333    /// `0..child_schema.len()` and whose name exists in the target schema.
334    pub(crate) fn new(child_schema: SchemaRef) -> Self {
335        let allowed_indices = (0..child_schema.fields().len()).collect();
336        Self {
337            child_schema,
338            allowed_indices,
339        }
340    }
341
342    /// Create a remapper that only accepts columns at the given indices.
343    /// This is used by join nodes to restrict pushdown to one side of the
344    /// join when both sides have same-named columns.
345    fn with_allowed_indices(
346        child_schema: SchemaRef,
347        allowed_indices: HashSet<usize>,
348    ) -> Self {
349        Self {
350            child_schema,
351            allowed_indices,
352        }
353    }
354
355    /// Try to remap a filter's column references to the target schema.
356    ///
357    /// Validates and remaps in a single tree traversal: for each column,
358    /// checks that its index is in the allowed set and that
359    /// its name exists in the target schema, then remaps the index.
360    /// Returns `Some(remapped)` if all columns are valid, or `None` if any
361    /// column fails validation.
362    pub(crate) fn try_remap(
363        &self,
364        filter: &Arc<dyn PhysicalExpr>,
365    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
366        let mut all_valid = true;
367        let transformed = Arc::clone(filter).transform_down(|expr| {
368            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
369                if self.allowed_indices.contains(&col.index())
370                    && let Ok(new_index) = self.child_schema.index_of(col.name())
371                {
372                    Ok(Transformed::yes(Arc::new(Column::new(
373                        col.name(),
374                        new_index,
375                    ))))
376                } else {
377                    all_valid = false;
378                    Ok(Transformed::complete(expr))
379                }
380            } else {
381                Ok(Transformed::no(expr))
382            }
383        })?;
384
385        Ok(all_valid.then_some(transformed.data))
386    }
387}
388
389impl ChildFilterDescription {
390    /// Build a child filter description by analyzing which parent filters can be pushed to a specific child.
391    ///
392    /// This method performs column analysis to determine which filters can be pushed down:
393    /// - If all columns referenced by a filter exist in the child's schema, it can be pushed down
394    /// - Otherwise, it cannot be pushed down to that child
395    ///
396    /// See [`FilterDescription::from_children`] for more details
397    pub fn from_child(
398        parent_filters: &[Arc<dyn PhysicalExpr>],
399        child: &Arc<dyn crate::ExecutionPlan>,
400    ) -> Result<Self> {
401        let remapper = FilterRemapper::new(child.schema());
402        Self::remap_filters(parent_filters, &remapper)
403    }
404
405    /// Like [`Self::from_child`], but restricts which parent-level columns are
406    /// considered reachable through this child.
407    ///
408    /// `allowed_indices` is the set of column indices (in the *parent*
409    /// schema) that map to this child's side of a join. A filter is only
410    /// eligible for pushdown when **every** column index it references
411    /// appears in `allowed_indices`.
412    ///
413    /// This prevents incorrect pushdown when different join sides have
414    /// columns with the same name: matching on index ensures a filter
415    /// referencing the right side's `k@2` is not pushed to the left side
416    /// which also has a column named `k` but at a different index.
417    pub fn from_child_with_allowed_indices(
418        parent_filters: &[Arc<dyn PhysicalExpr>],
419        allowed_indices: HashSet<usize>,
420        child: &Arc<dyn crate::ExecutionPlan>,
421    ) -> Result<Self> {
422        let remapper =
423            FilterRemapper::with_allowed_indices(child.schema(), allowed_indices);
424        Self::remap_filters(parent_filters, &remapper)
425    }
426
427    fn remap_filters(
428        parent_filters: &[Arc<dyn PhysicalExpr>],
429        remapper: &FilterRemapper,
430    ) -> Result<Self> {
431        let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
432        for filter in parent_filters {
433            if let Some(remapped) = remapper.try_remap(filter)? {
434                child_parent_filters.push(PushedDownPredicate::supported(remapped));
435            } else {
436                child_parent_filters
437                    .push(PushedDownPredicate::unsupported(Arc::clone(filter)));
438            }
439        }
440
441        Ok(Self {
442            parent_filters: child_parent_filters,
443            self_filters: vec![],
444        })
445    }
446
447    /// Mark all parent filters as unsupported for this child.
448    pub fn all_unsupported(parent_filters: &[Arc<dyn PhysicalExpr>]) -> Self {
449        Self {
450            parent_filters: parent_filters
451                .iter()
452                .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
453                .collect(),
454            self_filters: vec![],
455        }
456    }
457
458    /// Add a self filter (from the current node) to be pushed down to this child.
459    pub fn with_self_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
460        self.self_filters.push(filter);
461        self
462    }
463
464    /// Add multiple self filters.
465    pub fn with_self_filters(mut self, filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
466        self.self_filters.extend(filters);
467        self
468    }
469}
470
471/// Describes how filters should be pushed down to children.
472///
473/// This structure contains filter descriptions for each child node, specifying:
474/// - Which parent filters can be pushed down to each child
475/// - Which self-generated filters should be pushed down to each child
476///
477/// The filter routing is determined by column analysis - filters can only be pushed
478/// to children whose schemas contain all the referenced columns.
479#[derive(Debug, Clone)]
480pub struct FilterDescription {
481    /// A filter description for each child.
482    /// This includes which parent filters and which self filters (from the node in question)
483    /// will get pushed down to each child.
484    child_filter_descriptions: Vec<ChildFilterDescription>,
485}
486
487impl Default for FilterDescription {
488    fn default() -> Self {
489        Self::new()
490    }
491}
492
493impl FilterDescription {
494    /// Create a new empty FilterDescription
495    pub fn new() -> Self {
496        Self {
497            child_filter_descriptions: vec![],
498        }
499    }
500
501    /// Add a child filter description
502    pub fn with_child(mut self, child: ChildFilterDescription) -> Self {
503        self.child_filter_descriptions.push(child);
504        self
505    }
506
507    /// Build a filter description by analyzing which parent filters can be pushed to each child.
508    /// This method automatically determines filter routing based on column analysis:
509    /// - If all columns referenced by a filter exist in a child's schema, it can be pushed down
510    /// - Otherwise, it cannot be pushed down to that child
511    #[expect(clippy::needless_pass_by_value)]
512    pub fn from_children(
513        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
514        children: &[&Arc<dyn crate::ExecutionPlan>],
515    ) -> Result<Self> {
516        let mut desc = Self::new();
517
518        // For each child, create a ChildFilterDescription
519        for child in children {
520            desc = desc
521                .with_child(ChildFilterDescription::from_child(&parent_filters, child)?);
522        }
523
524        Ok(desc)
525    }
526
527    /// Mark all parent filters as unsupported for all children.
528    pub fn all_unsupported(
529        parent_filters: &[Arc<dyn PhysicalExpr>],
530        children: &[&Arc<dyn crate::ExecutionPlan>],
531    ) -> Self {
532        let mut desc = Self::new();
533        for _ in 0..children.len() {
534            desc =
535                desc.with_child(ChildFilterDescription::all_unsupported(parent_filters));
536        }
537        desc
538    }
539
540    pub fn parent_filters(&self) -> Vec<Vec<PushedDownPredicate>> {
541        self.child_filter_descriptions
542            .iter()
543            .map(|d| &d.parent_filters)
544            .cloned()
545            .collect()
546    }
547
548    pub fn self_filters(&self) -> Vec<Vec<Arc<dyn PhysicalExpr>>> {
549        self.child_filter_descriptions
550            .iter()
551            .map(|d| &d.self_filters)
552            .cloned()
553            .collect()
554    }
555}