datafusion_physical_plan/filter_pushdown.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Filter Pushdown Optimization Process
//!
//! The filter pushdown mechanism involves four key steps:
//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`]
//!    on the parent node, passing in the parent predicates and the pushdown phase. The parent node creates a [`FilterDescription`]
//!    by inspecting its logic and its children's schemas, determining which filters can be pushed to each child.
//! 2. **Optimizer Executes Pushdown**: The optimizer recursively pushes down filters for each child,
//!    passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child.
//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children,
//!    containing information about which filters were successfully pushed down vs. unsupported.
//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent,
//!    passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides
//!    how to handle filters that couldn't be pushed down (e.g. keep them as `FilterExec` nodes).
//!
//! [`ExecutionPlan::gather_filters_for_pushdown`]: crate::ExecutionPlan::gather_filters_for_pushdown
//! [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
//!
//! See also `datafusion/physical-optimizer/src/filter_pushdown.rs`, which drives this process.
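//!
//! The following is a rough, hedged sketch of how an operator might implement the two
//! hooks involved. The node type `MyExec` is hypothetical and the method signatures are
//! paraphrased from the description above rather than copied from the trait definition,
//! so this block is illustrative only:
//!
//! ```rust,ignore
//! impl ExecutionPlan for MyExec {
//!     fn gather_filters_for_pushdown(
//!         &self,
//!         _phase: FilterPushdownPhase,
//!         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
//!         _config: &ConfigOptions,
//!     ) -> Result<FilterDescription> {
//!         // Route each parent filter to every child whose schema can evaluate it.
//!         FilterDescription::from_children(parent_filters, &self.children())
//!     }
//!
//!     fn handle_child_pushdown_result(
//!         &self,
//!         _phase: FilterPushdownPhase,
//!         child_pushdown_result: ChildPushdownResult,
//!         _config: &ConfigOptions,
//!     ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
//!         // Report a parent filter as handled only if every child absorbed it;
//!         // anything else stays with the parent (e.g. as a `FilterExec`).
//!         Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
//!     }
//!
//!     // ...remaining `ExecutionPlan` methods...
//! }
//! ```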

use std::collections::HashSet;
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use itertools::Itertools;

/// The phase of the optimization process during which filter pushdown is attempted.
///
/// See the documentation of each variant for the kinds of filters that may be pushed
/// down during that phase.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushdownPhase {
    /// Pushdown that happens before most other optimizations.
    /// This pushdown allows static filters that do not reference any [`ExecutionPlan`]s to be pushed down.
    /// Filters that reference an [`ExecutionPlan`] cannot be pushed down at this stage since the whole plan tree may be rewritten
    /// by other optimizations.
    /// Implementers are, however, allowed to modify the execution plan themselves during this phase, for example by returning a completely
    /// different [`ExecutionPlan`] from [`ExecutionPlan::handle_child_pushdown_result`].
    ///
    /// Pushdown of [`FilterExec`] into `DataSourceExec` is an example of a `Pre` phase pushdown.
    /// Unlike filter pushdown in the logical phase, which operates on the logical plan to push filters into the logical table scan,
    /// the `Pre` phase in the physical plan targets the actual physical scan, pushing filters down to specific data source implementations.
    /// For example, Parquet supports filter pushdown to reduce data read during scanning, while CSV typically does not.
    ///
    /// [`ExecutionPlan`]: crate::ExecutionPlan
    /// [`FilterExec`]: crate::filter::FilterExec
    /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
    Pre,
    /// Pushdown that happens after most other optimizations.
    /// This stage of filter pushdown allows filters that reference an [`ExecutionPlan`] to be pushed down.
    /// Since subsequent optimizations should not change the structure of the plan tree except for calling [`ExecutionPlan::with_new_children`]
    /// (which generally preserves internal references), it is safe for references between [`ExecutionPlan`]s to be established at this stage.
    ///
    /// This phase is used to link a [`SortExec`] (with a TopK operator) or a [`HashJoinExec`] to a `DataSourceExec`.
    ///
    /// [`ExecutionPlan`]: crate::ExecutionPlan
    /// [`ExecutionPlan::with_new_children`]: crate::ExecutionPlan::with_new_children
    /// [`SortExec`]: crate::sorts::sort::SortExec
    /// [`HashJoinExec`]: crate::joins::HashJoinExec
    /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
    Post,
}

impl std::fmt::Display for FilterPushdownPhase {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            FilterPushdownPhase::Pre => write!(f, "Pre"),
            FilterPushdownPhase::Post => write!(f, "Post"),
        }
    }
}

/// A predicate paired with a [`PushedDown`] marker indicating whether pushdown into a
/// child node succeeded.
/// Nodes can mutate the wrapped filter before pushing it down to a child node
/// (e.g. to adjust a projection) or can directly take ownership of filters that their
/// children could not handle.
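///
/// A minimal, hedged sketch of constructing and unwrapping a [`PushedDownPredicate`]
/// (the `lit(true)` placeholder predicate is only for illustration):
///
/// ```rust
/// use datafusion_physical_expr::expressions::lit;
/// use datafusion_physical_plan::filter_pushdown::{PushedDown, PushedDownPredicate};
///
/// // Mark a trivial predicate as supported by a child.
/// let supported = PushedDownPredicate::supported(lit(true));
/// assert!(matches!(supported.discriminant, PushedDown::Yes));
///
/// // Mark the same predicate as unsupported and recover the bare expression.
/// let unsupported = PushedDownPredicate::unsupported(lit(true));
/// assert!(matches!(unsupported.discriminant, PushedDown::No));
/// let _expr = unsupported.into_inner();
/// ```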
#[derive(Debug, Clone)]
pub struct PushedDownPredicate {
    /// Whether the predicate could be pushed down into the child node.
    pub discriminant: PushedDown,
    /// The predicate expression.
    pub predicate: Arc<dyn PhysicalExpr>,
}

impl PushedDownPredicate {
    /// Return the wrapped [`PhysicalExpr`], discarding whether it is supported or unsupported.
    pub fn into_inner(self) -> Arc<dyn PhysicalExpr> {
        self.predicate
    }

    /// Create a new [`PushedDownPredicate`] with supported pushdown.
    pub fn supported(predicate: Arc<dyn PhysicalExpr>) -> Self {
        Self {
            discriminant: PushedDown::Yes,
            predicate,
        }
    }

    /// Create a new [`PushedDownPredicate`] with unsupported pushdown.
    pub fn unsupported(predicate: Arc<dyn PhysicalExpr>) -> Self {
        Self {
            discriminant: PushedDown::No,
            predicate,
        }
    }
}

/// Discriminant for the result of pushing down a filter into a child node.
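///
/// A small illustrative sketch of combining pushdown results with [`PushedDown::and`]
/// and [`PushedDown::or`]:
///
/// ```rust
/// use datafusion_physical_plan::filter_pushdown::PushedDown;
///
/// // `and` requires both sides to be `Yes`; `or` requires only one.
/// assert!(matches!(PushedDown::Yes.and(PushedDown::No), PushedDown::No));
/// assert!(matches!(PushedDown::Yes.or(PushedDown::No), PushedDown::Yes));
/// ```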
#[derive(Debug, Clone, Copy)]
pub enum PushedDown {
    /// The predicate was successfully pushed down into the child node.
    Yes,
    /// The predicate could not be pushed down into the child node.
    No,
}

impl PushedDown {
    /// Logical AND operation: returns `Yes` only if both operands are `Yes`.
    pub fn and(self, other: PushedDown) -> PushedDown {
        match (self, other) {
            (PushedDown::Yes, PushedDown::Yes) => PushedDown::Yes,
            _ => PushedDown::No,
        }
    }

    /// Logical OR operation: returns `Yes` if either operand is `Yes`.
    pub fn or(self, other: PushedDown) -> PushedDown {
        match (self, other) {
            (PushedDown::Yes, _) | (_, PushedDown::Yes) => PushedDown::Yes,
            (PushedDown::No, PushedDown::No) => PushedDown::No,
        }
    }

    /// Wrap a [`PhysicalExpr`] with this pushdown result.
    pub fn wrap_expression(self, expr: Arc<dyn PhysicalExpr>) -> PushedDownPredicate {
        PushedDownPredicate {
            discriminant: self,
            predicate: expr,
        }
    }
}

/// The result of pushing down a single parent filter into all children.
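///
/// A hedged sketch of how a parent might combine per-child results (using a trivial
/// `lit(true)` predicate as a stand-in filter):
///
/// ```rust
/// use datafusion_physical_expr::expressions::lit;
/// use datafusion_physical_plan::filter_pushdown::{ChildFilterPushdownResult, PushedDown};
///
/// // One child absorbed the filter, the other did not.
/// let result = ChildFilterPushdownResult {
///     filter: lit(true),
///     child_results: vec![PushedDown::Yes, PushedDown::No],
/// };
/// assert!(matches!(result.any(), PushedDown::Yes)); // at least one child supports it
/// assert!(matches!(result.all(), PushedDown::No)); // but not all of them
/// ```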
#[derive(Debug, Clone)]
pub struct ChildFilterPushdownResult {
    /// The parent filter that was pushed down.
    pub filter: Arc<dyn PhysicalExpr>,
    /// The pushdown result for each child.
    pub child_results: Vec<PushedDown>,
}

impl ChildFilterPushdownResult {
    /// Combine all child results using OR logic.
    /// Returns `Yes` if **any** child supports the filter.
    /// Returns `No` if **all** children reject the filter or if there are no children.
    pub fn any(&self) -> PushedDown {
        if self.child_results.is_empty() {
            // If there are no children, filters cannot be supported
            PushedDown::No
        } else {
            self.child_results
                .iter()
                .fold(PushedDown::No, |acc, result| acc.or(*result))
        }
    }

    /// Combine all child results using AND logic.
    /// Returns `Yes` if **all** children support the filter.
    /// Returns `No` if **any** child rejects the filter or if there are no children.
    pub fn all(&self) -> PushedDown {
        if self.child_results.is_empty() {
            // If there are no children, filters cannot be supported
            PushedDown::No
        } else {
            self.child_results
                .iter()
                .fold(PushedDown::Yes, |acc, result| acc.and(*result))
        }
    }
}

/// The combined result of pushing down filters into a node's children.
///
/// This is the result provided to nodes in [`ExecutionPlan::handle_child_pushdown_result`].
/// Nodes process this result and convert it into a [`FilterPushdownPropagation`]
/// that is returned to their parent.
///
/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
#[derive(Debug, Clone)]
pub struct ChildPushdownResult {
    /// The parent filters that were pushed down, as received by the current node when
    /// [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown) was called.
    /// Note that these may *not* be the same as the filters that were passed to the children, since the current node may have modified them
    /// (e.g. by reassigning column indices) when it returned them from [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown) in a [`FilterDescription`].
    /// Attached to each filter is a [`PushedDown`] *per child* that indicates whether the filter was supported or unsupported by that child.
    /// To get combined results see [`ChildFilterPushdownResult::any`] and [`ChildFilterPushdownResult::all`].
    pub parent_filters: Vec<ChildFilterPushdownResult>,
    /// The result of pushing down each filter this node provided into each of its children.
    /// The outer vector corresponds to each child, and the inner vector corresponds to each filter.
    /// Since this node may have generated a different filter for each child, the inner vectors may have different lengths or the expressions may not match at all.
    /// It is up to each node to interpret this result based on the filters it provided for each child in [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown).
    pub self_filters: Vec<Vec<PushedDownPredicate>>,
}

/// The result of pushing down filters into a node.
///
/// Returned from [`ExecutionPlan::handle_child_pushdown_result`] to communicate
/// to the optimizer:
///
/// 1. What to do with any parent filters that could not be pushed down into the children.
/// 2. Whether the node needs to be replaced in the execution plan with a new node.
///
/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
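///
/// A hedged sketch of the constructors, built from a hand-assembled [`ChildPushdownResult`]
/// (real nodes receive this from the optimizer; `lit(true)` is a placeholder predicate):
///
/// ```rust
/// use datafusion_physical_expr::expressions::lit;
/// use datafusion_physical_plan::filter_pushdown::{
///     ChildFilterPushdownResult, ChildPushdownResult, FilterPushdownPropagation, PushedDown,
/// };
///
/// // One parent filter that only the first of two children could absorb.
/// let child_result = ChildPushdownResult {
///     parent_filters: vec![ChildFilterPushdownResult {
///         filter: lit(true),
///         child_results: vec![PushedDown::Yes, PushedDown::No],
///     }],
///     self_filters: vec![],
/// };
///
/// // `if_all` reports the filter as handled only if *every* child absorbed it.
/// let strict: FilterPushdownPropagation<()> =
///     FilterPushdownPropagation::if_all(child_result.clone());
/// assert!(matches!(strict.filters[0], PushedDown::No));
///
/// // `if_any` reports it as handled if *any* child absorbed it.
/// let lenient: FilterPushdownPropagation<()> =
///     FilterPushdownPropagation::if_any(child_result);
/// assert!(matches!(lenient.filters[0], PushedDown::Yes));
/// ```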
#[derive(Debug, Clone)]
pub struct FilterPushdownPropagation<T> {
    /// For each parent filter, whether this node (and its subtree) was able to absorb it
    /// or whether the parent must continue to apply it.
    pub filters: Vec<PushedDown>,
    /// The updated node, if it was updated during pushdown.
    pub updated_node: Option<T>,
}

impl<T> FilterPushdownPropagation<T> {
    /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter
    /// is supported if it was supported by *all* children.
    pub fn if_all(child_pushdown_result: ChildPushdownResult) -> Self {
        let filters = child_pushdown_result
            .parent_filters
            .into_iter()
            .map(|result| result.all())
            .collect();
        Self {
            filters,
            updated_node: None,
        }
    }

    /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter
    /// is supported if it was supported by *any* child.
    pub fn if_any(child_pushdown_result: ChildPushdownResult) -> Self {
        let filters = child_pushdown_result
            .parent_filters
            .into_iter()
            .map(|result| result.any())
            .collect();
        Self {
            filters,
            updated_node: None,
        }
    }

    /// Create a new [`FilterPushdownPropagation`] that tells the parent node that no filters were
    /// pushed down, regardless of the child results.
    pub fn all_unsupported(child_pushdown_result: ChildPushdownResult) -> Self {
        let filters = child_pushdown_result
            .parent_filters
            .into_iter()
            .map(|_| PushedDown::No)
            .collect();
        Self {
            filters,
            updated_node: None,
        }
    }

    /// Create a new [`FilterPushdownPropagation`] with the specified filter support.
    /// This communicates to the parent node the result of pushing the filters into this node
    /// and, possibly, its subtree.
    pub fn with_parent_pushdown_result(filters: Vec<PushedDown>) -> Self {
        Self {
            filters,
            updated_node: None,
        }
    }

    /// Bind an updated node to the [`FilterPushdownPropagation`].
    /// Use this when the current node wants to update itself in the tree or replace itself with a new node (e.g. one of its children).
    /// You do not need to call this when one of the current node's children has updated itself; the optimizer handles that.
    pub fn with_updated_node(mut self, updated_node: T) -> Self {
        self.updated_node = Some(updated_node);
        self
    }
}

/// Describes filter pushdown for a single child node.
///
/// This structure contains two types of filters:
/// - **Parent filters**: Filters received from the parent node, marked as supported or unsupported
/// - **Self filters**: Filters generated by the current node to be pushed down to this child
#[derive(Debug, Clone)]
pub struct ChildFilterDescription {
    /// Description of which parent filters can be pushed down into this node.
    /// Since we need to transmit filter pushdown results back to this node's parent,
    /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down.
    pub(crate) parent_filters: Vec<PushedDownPredicate>,
    /// Description of which filters this node is pushing down to its children.
    /// Since this is not transmitted back to the parents, we can have variable-sized inner arrays
    /// instead of having to track supported/unsupported.
    pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
}

impl ChildFilterDescription {
    /// Build a child filter description by analyzing which parent filters can be pushed to a specific child.
    ///
    /// This method performs column analysis to determine which filters can be pushed down:
    /// - If all columns referenced by a filter exist in the child's schema, it can be pushed down
    /// - Otherwise, it cannot be pushed down to that child
    ///
    /// See [`FilterDescription::from_children`] for more details.
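    ///
    /// A hedged sketch of building a description for one child and attaching a self filter.
    /// `EmptyExec` stands in for a real child plan and `lit(true)` for a real predicate;
    /// the description is wrapped in a [`FilterDescription`] only so that its crate-private
    /// contents can be inspected through the public getters:
    ///
    /// ```rust
    /// use std::sync::Arc;
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_physical_expr::expressions::lit;
    /// use datafusion_physical_plan::empty::EmptyExec;
    /// use datafusion_physical_plan::filter_pushdown::{ChildFilterDescription, FilterDescription};
    /// use datafusion_physical_plan::ExecutionPlan;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    /// let child: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
    ///
    /// // Route one (trivial) parent filter to the child and add one self filter.
    /// let child_desc = ChildFilterDescription::from_child(&[lit(true)], &child)?
    ///     .with_self_filter(lit(true));
    /// let desc = FilterDescription::new().with_child(child_desc);
    /// assert_eq!(desc.parent_filters()[0].len(), 1);
    /// assert_eq!(desc.self_filters()[0].len(), 1);
    /// # Ok::<(), datafusion_common::DataFusionError>(())
    /// ```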
    pub fn from_child(
        parent_filters: &[Arc<dyn PhysicalExpr>],
        child: &Arc<dyn crate::ExecutionPlan>,
    ) -> Result<Self> {
        let child_schema = child.schema();

        // Get column names from child schema for quick lookup
        let child_column_names: HashSet<&str> = child_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();

        // Analyze each parent filter
        let mut child_parent_filters = Vec::with_capacity(parent_filters.len());

        for filter in parent_filters {
            // Check which columns the filter references
            let referenced_columns = collect_columns(filter);

            // Check if all referenced columns exist in the child schema
            let all_columns_exist = referenced_columns
                .iter()
                .all(|col| child_column_names.contains(col.name()));

            if all_columns_exist {
                // All columns exist in the child - we can push down.
                // Need to reassign column indices to match the child schema.
                let reassigned_filter =
                    reassign_expr_columns(Arc::clone(filter), &child_schema)?;
                child_parent_filters
                    .push(PushedDownPredicate::supported(reassigned_filter));
            } else {
                // Some columns don't exist in the child - cannot push down
                child_parent_filters
                    .push(PushedDownPredicate::unsupported(Arc::clone(filter)));
            }
        }

        Ok(Self {
            parent_filters: child_parent_filters,
            self_filters: vec![],
        })
    }

    /// Add a self filter (from the current node) to be pushed down to this child.
    pub fn with_self_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
        self.self_filters.push(filter);
        self
    }

    /// Add multiple self filters.
    pub fn with_self_filters(mut self, filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
        self.self_filters.extend(filters);
        self
    }
}

/// Describes how filters should be pushed down to children.
///
/// This structure contains filter descriptions for each child node, specifying:
/// - Which parent filters can be pushed down to each child
/// - Which self-generated filters should be pushed down to each child
///
/// The filter routing is determined by column analysis - filters can only be pushed
/// to children whose schemas contain all the referenced columns.
#[derive(Debug, Clone)]
pub struct FilterDescription {
    /// A filter description for each child.
    /// This includes which parent filters and which self filters (from the node in question)
    /// will get pushed down to each child.
    child_filter_descriptions: Vec<ChildFilterDescription>,
}

impl Default for FilterDescription {
    fn default() -> Self {
        Self::new()
    }
}

impl FilterDescription {
    /// Create a new, empty [`FilterDescription`].
    pub fn new() -> Self {
        Self {
            child_filter_descriptions: vec![],
        }
    }

    /// Add a child filter description.
    pub fn with_child(mut self, child: ChildFilterDescription) -> Self {
        self.child_filter_descriptions.push(child);
        self
    }

    /// Build a filter description by analyzing which parent filters can be pushed to each child.
    /// This method automatically determines filter routing based on column analysis:
    /// - If all columns referenced by a filter exist in a child's schema, it can be pushed down
    /// - Otherwise, it cannot be pushed down to that child
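    ///
    /// A hedged, minimal sketch of the routing, using `EmptyExec` as a stand-in child plan
    /// whose schema only contains column `a`:
    ///
    /// ```rust
    /// use std::sync::Arc;
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_physical_expr::expressions::col;
    /// use datafusion_physical_plan::empty::EmptyExec;
    /// use datafusion_physical_plan::filter_pushdown::{FilterDescription, PushedDown};
    /// use datafusion_physical_plan::ExecutionPlan;
    ///
    /// // The child only produces column "a"; the parent filters reference "a" and "b".
    /// let parent_schema = Schema::new(vec![
    ///     Field::new("a", DataType::Int32, false),
    ///     Field::new("b", DataType::Int32, false),
    /// ]);
    /// let child_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    /// let child: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(child_schema));
    ///
    /// let on_a = col("a", &parent_schema)?;
    /// let on_b = col("b", &parent_schema)?;
    /// let desc = FilterDescription::from_children(vec![on_a, on_b], &[&child])?;
    ///
    /// // Filters on "a" can be routed to the child; filters on "b" cannot.
    /// let per_child = desc.parent_filters();
    /// assert!(matches!(per_child[0][0].discriminant, PushedDown::Yes));
    /// assert!(matches!(per_child[0][1].discriminant, PushedDown::No));
    /// # Ok::<(), datafusion_common::DataFusionError>(())
    /// ```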
    pub fn from_children(
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        children: &[&Arc<dyn crate::ExecutionPlan>],
    ) -> Result<Self> {
        let mut desc = Self::new();

        // For each child, create a ChildFilterDescription
        for child in children {
            desc = desc
                .with_child(ChildFilterDescription::from_child(&parent_filters, child)?);
        }

        Ok(desc)
    }

    /// Mark all parent filters as unsupported for all children.
    pub fn all_unsupported(
        parent_filters: &[Arc<dyn PhysicalExpr>],
        children: &[&Arc<dyn crate::ExecutionPlan>],
    ) -> Self {
        let mut desc = Self::new();
        let child_filters = parent_filters
            .iter()
            .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
            .collect_vec();
        for _ in 0..children.len() {
            desc = desc.with_child(ChildFilterDescription {
                parent_filters: child_filters.clone(),
                self_filters: vec![],
            });
        }
        desc
    }

    /// Return, for each child, the parent filters (each marked as supported or unsupported)
    /// that will be pushed down to it.
    pub fn parent_filters(&self) -> Vec<Vec<PushedDownPredicate>> {
        self.child_filter_descriptions
            .iter()
            .map(|d| &d.parent_filters)
            .cloned()
            .collect()
    }

    /// Return, for each child, the self filters (generated by the current node)
    /// that will be pushed down to it.
    pub fn self_filters(&self) -> Vec<Vec<Arc<dyn PhysicalExpr>>> {
        self.child_filter_descriptions
            .iter()
            .map(|d| &d.self_filters)
            .cloned()
            .collect()
    }
}