// datafusion_physical_plan/filter_pushdown.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Filter Pushdown Optimization Process
19//!
20//! The filter pushdown mechanism involves four key steps:
21//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`]
22//!    on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`]
23//!    by inspecting its logic and children's schemas, determining which filters can be pushed to each child.
24//! 2. **Optimizer Executes Pushdown**: The optimizer recursively pushes down filters for each child,
25//!    passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child.
26//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children,
27//!    containing information about which filters were successfully pushed down vs. unsupported.
28//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent,
29//!    passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides
30//!    how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes).
31//!
32//! [`ExecutionPlan::gather_filters_for_pushdown`]: crate::ExecutionPlan::gather_filters_for_pushdown
33//! [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
34//!
35//! See also datafusion/physical-optimizer/src/filter_pushdown.rs.
36
37use std::collections::HashSet;
38use std::sync::Arc;
39
40use datafusion_common::Result;
41use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
42use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
43use itertools::Itertools;
44
/// Identifies at which point of the physical optimization process a filter
/// pushdown pass is running. See the module-level documentation for the
/// overall pushdown protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushdownPhase {
    /// Pushdown that happens before most other optimizations.
    /// This pushdown allows static filters that do not reference any [`ExecutionPlan`]s to be pushed down.
    /// Filters that reference an [`ExecutionPlan`] cannot be pushed down at this stage since the whole plan tree may be rewritten
    /// by other optimizations.
    /// Implementers are however allowed to modify the execution plan themselves during this phase, for example by returning a completely
    /// different [`ExecutionPlan`] from [`ExecutionPlan::handle_child_pushdown_result`].
    ///
    /// Pushdown of [`FilterExec`] into `DataSourceExec` is an example of a pre-pushdown.
    /// Unlike filter pushdown in the logical phase, which operates on the logical plan to push filters into the logical table scan,
    /// the `Pre` phase in the physical plan targets the actual physical scan, pushing filters down to specific data source implementations.
    /// For example, Parquet supports filter pushdown to reduce data read during scanning, while CSV typically does not.
    ///
    /// [`ExecutionPlan`]: crate::ExecutionPlan
    /// [`FilterExec`]: crate::filter::FilterExec
    /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
    Pre,
    /// Pushdown that happens after most other optimizations.
    /// This stage of filter pushdown allows filters that reference an [`ExecutionPlan`] to be pushed down.
    /// Since subsequent optimizations should not change the structure of the plan tree except for calling [`ExecutionPlan::with_new_children`]
    /// (which generally preserves internal references) it is safe for references between [`ExecutionPlan`]s to be established at this stage.
    ///
    /// This phase is used to link a [`SortExec`] (with a TopK operator) or a [`HashJoinExec`] to a `DataSourceExec`.
    ///
    /// [`ExecutionPlan`]: crate::ExecutionPlan
    /// [`ExecutionPlan::with_new_children`]: crate::ExecutionPlan::with_new_children
    /// [`SortExec`]: crate::sorts::sort::SortExec
    /// [`HashJoinExec`]: crate::joins::HashJoinExec
    /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
    Post,
}
77
78impl std::fmt::Display for FilterPushdownPhase {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        match self {
81            FilterPushdownPhase::Pre => write!(f, "Pre"),
82            FilterPushdownPhase::Post => write!(f, "Post"),
83        }
84    }
85}
86
/// The result of a plan for pushing down a filter into a child node.
/// Pairs a predicate with a [`PushedDown`] marker recording whether the child
/// could handle it, so that a parent node can adjust a filter before pushing it
/// down (e.g. to fix up a projection) or take ownership of predicates its
/// children could not handle.
#[derive(Debug, Clone)]
pub struct PushedDownPredicate {
    /// Whether `predicate` was / can be pushed down into the child.
    pub discriminant: PushedDown,
    /// The predicate expression in question.
    pub predicate: Arc<dyn PhysicalExpr>,
}
97
98impl PushedDownPredicate {
99    /// Return the wrapped [`PhysicalExpr`], discarding whether it is supported or unsupported.
100    pub fn into_inner(self) -> Arc<dyn PhysicalExpr> {
101        self.predicate
102    }
103
104    /// Create a new [`PushedDownPredicate`] with supported pushdown.
105    pub fn supported(predicate: Arc<dyn PhysicalExpr>) -> Self {
106        Self {
107            discriminant: PushedDown::Yes,
108            predicate,
109        }
110    }
111
112    /// Create a new [`PushedDownPredicate`] with unsupported pushdown.
113    pub fn unsupported(predicate: Arc<dyn PhysicalExpr>) -> Self {
114        Self {
115            discriminant: PushedDown::No,
116            predicate,
117        }
118    }
119}
120
/// Discriminant for the result of pushing down a filter into a child node.
///
/// Derives `PartialEq`/`Eq` (consistent with [`FilterPushdownPhase`]) so
/// outcomes can be compared directly rather than via `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushedDown {
    /// The predicate was successfully pushed down into the child node.
    Yes,
    /// The predicate could not be pushed down into the child node.
    No,
}
129
130impl PushedDown {
131    /// Logical AND operation: returns `Yes` only if both operands are `Yes`.
132    pub fn and(self, other: PushedDown) -> PushedDown {
133        match (self, other) {
134            (PushedDown::Yes, PushedDown::Yes) => PushedDown::Yes,
135            _ => PushedDown::No,
136        }
137    }
138
139    /// Logical OR operation: returns `Yes` if either operand is `Yes`.
140    pub fn or(self, other: PushedDown) -> PushedDown {
141        match (self, other) {
142            (PushedDown::Yes, _) | (_, PushedDown::Yes) => PushedDown::Yes,
143            (PushedDown::No, PushedDown::No) => PushedDown::No,
144        }
145    }
146
147    /// Wrap a [`PhysicalExpr`] with this pushdown result.
148    pub fn wrap_expression(self, expr: Arc<dyn PhysicalExpr>) -> PushedDownPredicate {
149        PushedDownPredicate {
150            discriminant: self,
151            predicate: expr,
152        }
153    }
154}
155
/// The result of pushing down a single parent filter into all children.
#[derive(Debug, Clone)]
pub struct ChildFilterPushdownResult {
    /// The parent filter that was pushed down.
    pub filter: Arc<dyn PhysicalExpr>,
    /// The pushdown outcome for each child (one entry per child).
    pub child_results: Vec<PushedDown>,
}
162
163impl ChildFilterPushdownResult {
164    /// Combine all child results using OR logic.
165    /// Returns `Yes` if **any** child supports the filter.
166    /// Returns `No` if **all** children reject the filter or if there are no children.
167    pub fn any(&self) -> PushedDown {
168        if self.child_results.is_empty() {
169            // If there are no children, filters cannot be supported
170            PushedDown::No
171        } else {
172            self.child_results
173                .iter()
174                .fold(PushedDown::No, |acc, result| acc.or(*result))
175        }
176    }
177
178    /// Combine all child results using AND logic.
179    /// Returns `Yes` if **all** children support the filter.
180    /// Returns `No` if **any** child rejects the filter or if there are no children.
181    pub fn all(&self) -> PushedDown {
182        if self.child_results.is_empty() {
183            // If there are no children, filters cannot be supported
184            PushedDown::No
185        } else {
186            self.child_results
187                .iter()
188                .fold(PushedDown::Yes, |acc, result| acc.and(*result))
189        }
190    }
191}
192
/// The result of pushing down filters into a child node.
///
/// This is the result provided to nodes in [`ExecutionPlan::handle_child_pushdown_result`].
/// Nodes process this result and convert it into a [`FilterPushdownPropagation`]
/// that is returned to their parent.
///
/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
#[derive(Debug, Clone)]
pub struct ChildPushdownResult {
    /// The parent filters that were pushed down as received by the current node when [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown) was called.
    /// Note that this may *not* be the same as the filters that were passed to the children as the current node may have modified them
    /// (e.g. by reassigning column indices) when it returned them from [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown) in a [`FilterDescription`].
    /// Attached to each filter is a [`PushedDown`] *per child* that indicates whether the filter was supported or unsupported by each child.
    /// To get combined results see [`ChildFilterPushdownResult::any`] and [`ChildFilterPushdownResult::all`].
    pub parent_filters: Vec<ChildFilterPushdownResult>,
    /// The result of pushing down each filter this node provided into each of its children.
    /// The outer vector corresponds to each child, and the inner vector corresponds to each filter.
    /// Since this node may have generated a different filter for each child the inner vector may have different lengths or the expressions may not match at all.
    /// It is up to each node to interpret this result based on the filters it provided for each child in [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown).
    pub self_filters: Vec<Vec<PushedDownPredicate>>,
}
214
/// The result of pushing down filters into a node.
///
/// Returned from [`ExecutionPlan::handle_child_pushdown_result`] to communicate
/// to the optimizer:
///
/// 1. What to do with any parent filters that could not be pushed down into the children.
/// 2. If the node needs to be replaced in the execution plan with a new node or not.
///
/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
#[derive(Debug, Clone)]
pub struct FilterPushdownPropagation<T> {
    /// The pushdown outcome reported to the parent node: one [`PushedDown`]
    /// per parent filter, in the order the filters were received.
    pub filters: Vec<PushedDown>,
    /// The updated node, if it was updated during pushdown
    pub updated_node: Option<T>,
}
231
232impl<T> FilterPushdownPropagation<T> {
233    /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter
234    /// is supported if it was supported by *all* children.
235    pub fn if_all(child_pushdown_result: ChildPushdownResult) -> Self {
236        let filters = child_pushdown_result
237            .parent_filters
238            .into_iter()
239            .map(|result| result.all())
240            .collect();
241        Self {
242            filters,
243            updated_node: None,
244        }
245    }
246
247    /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter
248    /// is supported if it was supported by *any* child.
249    pub fn if_any(child_pushdown_result: ChildPushdownResult) -> Self {
250        let filters = child_pushdown_result
251            .parent_filters
252            .into_iter()
253            .map(|result| result.any())
254            .collect();
255        Self {
256            filters,
257            updated_node: None,
258        }
259    }
260
261    /// Create a new [`FilterPushdownPropagation`] that tells the parent node that no filters were pushed down regardless of the child results.
262    pub fn all_unsupported(child_pushdown_result: ChildPushdownResult) -> Self {
263        let filters = child_pushdown_result
264            .parent_filters
265            .into_iter()
266            .map(|_| PushedDown::No)
267            .collect();
268        Self {
269            filters,
270            updated_node: None,
271        }
272    }
273
274    /// Create a new [`FilterPushdownPropagation`] with the specified filter support.
275    /// This transmits up to our parent node what the result of pushing down the filters into our node and possibly our subtree was.
276    pub fn with_parent_pushdown_result(filters: Vec<PushedDown>) -> Self {
277        Self {
278            filters,
279            updated_node: None,
280        }
281    }
282
283    /// Bind an updated node to the [`FilterPushdownPropagation`].
284    /// Use this when the current node wants to update itself in the tree or replace itself with a new node (e.g. one of it's children).
285    /// You do not need to call this if one of the children of the current node may have updated itself, that is handled by the optimizer.
286    pub fn with_updated_node(mut self, updated_node: T) -> Self {
287        self.updated_node = Some(updated_node);
288        self
289    }
290}
291
/// Describes filter pushdown for a single child node.
///
/// This structure contains two types of filters:
/// - **Parent filters**: Filters received from the parent node, each marked as supported or unsupported
/// - **Self filters**: Filters generated by the current node to be pushed down to this child
#[derive(Debug, Clone)]
pub struct ChildFilterDescription {
    /// Description of which parent filters can be pushed down into this node.
    /// Since we need to transmit filter pushdown results back to this node's parent
    /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down.
    pub(crate) parent_filters: Vec<PushedDownPredicate>,
    /// Description of which filters this node is pushing down to its children.
    /// Since this is not transmitted back to the parents we can have variable sized inner arrays
    /// instead of having to track supported/unsupported.
    pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
}
308
309impl ChildFilterDescription {
310    /// Build a child filter description by analyzing which parent filters can be pushed to a specific child.
311    ///
312    /// This method performs column analysis to determine which filters can be pushed down:
313    /// - If all columns referenced by a filter exist in the child's schema, it can be pushed down
314    /// - Otherwise, it cannot be pushed down to that child
315    ///
316    /// See [`FilterDescription::from_children`] for more details
317    pub fn from_child(
318        parent_filters: &[Arc<dyn PhysicalExpr>],
319        child: &Arc<dyn crate::ExecutionPlan>,
320    ) -> Result<Self> {
321        let child_schema = child.schema();
322
323        // Get column names from child schema for quick lookup
324        let child_column_names: HashSet<&str> = child_schema
325            .fields()
326            .iter()
327            .map(|f| f.name().as_str())
328            .collect();
329
330        // Analyze each parent filter
331        let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
332
333        for filter in parent_filters {
334            // Check which columns the filter references
335            let referenced_columns = collect_columns(filter);
336
337            // Check if all referenced columns exist in the child schema
338            let all_columns_exist = referenced_columns
339                .iter()
340                .all(|col| child_column_names.contains(col.name()));
341
342            if all_columns_exist {
343                // All columns exist in child - we can push down
344                // Need to reassign column indices to match child schema
345                let reassigned_filter =
346                    reassign_expr_columns(Arc::clone(filter), &child_schema)?;
347                child_parent_filters
348                    .push(PushedDownPredicate::supported(reassigned_filter));
349            } else {
350                // Some columns don't exist in child - cannot push down
351                child_parent_filters
352                    .push(PushedDownPredicate::unsupported(Arc::clone(filter)));
353            }
354        }
355
356        Ok(Self {
357            parent_filters: child_parent_filters,
358            self_filters: vec![],
359        })
360    }
361
362    /// Add a self filter (from the current node) to be pushed down to this child.
363    pub fn with_self_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
364        self.self_filters.push(filter);
365        self
366    }
367
368    /// Add multiple self filters.
369    pub fn with_self_filters(mut self, filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
370        self.self_filters.extend(filters);
371        self
372    }
373}
374
/// Describes how filters should be pushed down to children.
///
/// This structure contains filter descriptions for each child node, specifying:
/// - Which parent filters can be pushed down to each child
/// - Which self-generated filters should be pushed down to each child
///
/// The filter routing is determined by column analysis - filters can only be pushed
/// to children whose schemas contain all the referenced columns.
#[derive(Debug, Clone)]
pub struct FilterDescription {
    /// A filter description for each child, in child order.
    /// This includes which parent filters and which self filters (from the node in question)
    /// will get pushed down to each child.
    child_filter_descriptions: Vec<ChildFilterDescription>,
}
390
391impl Default for FilterDescription {
392    fn default() -> Self {
393        Self::new()
394    }
395}
396
397impl FilterDescription {
398    /// Create a new empty FilterDescription
399    pub fn new() -> Self {
400        Self {
401            child_filter_descriptions: vec![],
402        }
403    }
404
405    /// Add a child filter description
406    pub fn with_child(mut self, child: ChildFilterDescription) -> Self {
407        self.child_filter_descriptions.push(child);
408        self
409    }
410
411    /// Build a filter description by analyzing which parent filters can be pushed to each child.
412    /// This method automatically determines filter routing based on column analysis:
413    /// - If all columns referenced by a filter exist in a child's schema, it can be pushed down
414    /// - Otherwise, it cannot be pushed down to that child
415    pub fn from_children(
416        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
417        children: &[&Arc<dyn crate::ExecutionPlan>],
418    ) -> Result<Self> {
419        let mut desc = Self::new();
420
421        // For each child, create a ChildFilterDescription
422        for child in children {
423            desc = desc
424                .with_child(ChildFilterDescription::from_child(&parent_filters, child)?);
425        }
426
427        Ok(desc)
428    }
429
430    /// Mark all parent filters as unsupported for all children.
431    pub fn all_unsupported(
432        parent_filters: &[Arc<dyn PhysicalExpr>],
433        children: &[&Arc<dyn crate::ExecutionPlan>],
434    ) -> Self {
435        let mut desc = Self::new();
436        let child_filters = parent_filters
437            .iter()
438            .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
439            .collect_vec();
440        for _ in 0..children.len() {
441            desc = desc.with_child(ChildFilterDescription {
442                parent_filters: child_filters.clone(),
443                self_filters: vec![],
444            });
445        }
446        desc
447    }
448
449    pub fn parent_filters(&self) -> Vec<Vec<PushedDownPredicate>> {
450        self.child_filter_descriptions
451            .iter()
452            .map(|d| &d.parent_filters)
453            .cloned()
454            .collect()
455    }
456
457    pub fn self_filters(&self) -> Vec<Vec<Arc<dyn PhysicalExpr>>> {
458        self.child_filter_descriptions
459            .iter()
460            .map(|d| &d.self_filters)
461            .cloned()
462            .collect()
463    }
464}