// datafusion_physical_plan/filter_pushdown.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Filter Pushdown Optimization Process
19//!
20//! The filter pushdown mechanism involves four key steps:
21//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`]
22//! on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`]
23//! by inspecting its logic and children's schemas, determining which filters can be pushed to each child.
24//! 2. **Optimizer Executes Pushdown**: The optimizer recursively pushes down filters for each child,
25//! passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child.
26//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children,
27//! containing information about which filters were successfully pushed down vs. unsupported.
28//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent,
29//! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides
30//! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes).
31//!
32//! [`ExecutionPlan::gather_filters_for_pushdown`]: crate::ExecutionPlan::gather_filters_for_pushdown
33//! [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
34//!
35//! See also datafusion/physical-optimizer/src/filter_pushdown.rs.
36
37use std::collections::HashSet;
38use std::sync::Arc;
39
40use arrow_schema::SchemaRef;
41use datafusion_common::{
42 Result,
43 tree_node::{Transformed, TreeNode},
44};
45use datafusion_physical_expr::expressions::Column;
46use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
47
/// The phase of the physical-plan filter pushdown optimization in which a
/// pushdown is taking place.
///
/// Filter pushdown runs twice over the physical plan: once early ([`Pre`])
/// before most other physical optimizations, and once late ([`Post`]) after
/// the plan structure has stabilized. See the variant docs for what is and
/// is not allowed in each phase.
///
/// [`Pre`]: FilterPushdownPhase::Pre
/// [`Post`]: FilterPushdownPhase::Post
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushdownPhase {
    /// Pushdown that happens before most other optimizations.
    /// This pushdown allows static filters that do not reference any [`ExecutionPlan`]s to be pushed down.
    /// Filters that reference an [`ExecutionPlan`] cannot be pushed down at this stage since the whole plan tree may be rewritten
    /// by other optimizations.
    /// Implementers are however allowed to modify the execution plan themselves during this phase, for example by returning a completely
    /// different [`ExecutionPlan`] from [`ExecutionPlan::handle_child_pushdown_result`].
    ///
    /// Pushdown of [`FilterExec`] into `DataSourceExec` is an example of a pre-pushdown.
    /// Unlike filter pushdown in the logical phase, which operates on the logical plan to push filters into the logical table scan,
    /// the `Pre` phase in the physical plan targets the actual physical scan, pushing filters down to specific data source implementations.
    /// For example, Parquet supports filter pushdown to reduce data read during scanning, while CSV typically does not.
    ///
    /// [`ExecutionPlan`]: crate::ExecutionPlan
    /// [`FilterExec`]: crate::filter::FilterExec
    /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
    Pre,
    /// Pushdown that happens after most other optimizations.
    /// This stage of filter pushdown allows filters that reference an [`ExecutionPlan`] to be pushed down.
    /// Since subsequent optimizations should not change the structure of the plan tree except for calling [`ExecutionPlan::with_new_children`]
    /// (which generally preserves internal references) it is safe for references between [`ExecutionPlan`]s to be established at this stage.
    ///
    /// This phase is used to link a [`SortExec`] (with a TopK operator) or a [`HashJoinExec`] to a `DataSourceExec`.
    ///
    /// [`ExecutionPlan`]: crate::ExecutionPlan
    /// [`ExecutionPlan::with_new_children`]: crate::ExecutionPlan::with_new_children
    /// [`SortExec`]: crate::sorts::sort::SortExec
    /// [`HashJoinExec`]: crate::joins::HashJoinExec
    /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
    Post,
}
80
81impl std::fmt::Display for FilterPushdownPhase {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 match self {
84 FilterPushdownPhase::Pre => write!(f, "Pre"),
85 FilterPushdownPhase::Post => write!(f, "Post"),
86 }
87 }
88}
89
/// The result of a plan for pushing down a filter into a child node.
/// This contains references to filters so that nodes can mutate a filter
/// before pushing it down to a child node (e.g. to adjust a projection)
/// or can directly take ownership of filters that their children
/// could not handle.
#[derive(Debug, Clone)]
pub struct PushedDownPredicate {
    /// Whether the predicate was accepted ([`PushedDown::Yes`]) or
    /// rejected ([`PushedDown::No`]) for pushdown.
    pub discriminant: PushedDown,
    /// The predicate that was offered for pushdown.
    pub predicate: Arc<dyn PhysicalExpr>,
}
100
101impl PushedDownPredicate {
102 /// Return the wrapped [`PhysicalExpr`], discarding whether it is supported or unsupported.
103 pub fn into_inner(self) -> Arc<dyn PhysicalExpr> {
104 self.predicate
105 }
106
107 /// Create a new [`PushedDownPredicate`] with supported pushdown.
108 pub fn supported(predicate: Arc<dyn PhysicalExpr>) -> Self {
109 Self {
110 discriminant: PushedDown::Yes,
111 predicate,
112 }
113 }
114
115 /// Create a new [`PushedDownPredicate`] with unsupported pushdown.
116 pub fn unsupported(predicate: Arc<dyn PhysicalExpr>) -> Self {
117 Self {
118 discriminant: PushedDown::No,
119 predicate,
120 }
121 }
122}
123
/// Discriminant for the result of pushing down a filter into a child node.
/// Values can be combined with [`PushedDown::and`] and [`PushedDown::or`].
#[derive(Debug, Clone, Copy)]
pub enum PushedDown {
    /// The predicate was successfully pushed down into the child node.
    Yes,
    /// The predicate could not be pushed down into the child node.
    No,
}
132
133impl PushedDown {
134 /// Logical AND operation: returns `Yes` only if both operands are `Yes`.
135 pub fn and(self, other: PushedDown) -> PushedDown {
136 match (self, other) {
137 (PushedDown::Yes, PushedDown::Yes) => PushedDown::Yes,
138 _ => PushedDown::No,
139 }
140 }
141
142 /// Logical OR operation: returns `Yes` if either operand is `Yes`.
143 pub fn or(self, other: PushedDown) -> PushedDown {
144 match (self, other) {
145 (PushedDown::Yes, _) | (_, PushedDown::Yes) => PushedDown::Yes,
146 (PushedDown::No, PushedDown::No) => PushedDown::No,
147 }
148 }
149
150 /// Wrap a [`PhysicalExpr`] with this pushdown result.
151 pub fn wrap_expression(self, expr: Arc<dyn PhysicalExpr>) -> PushedDownPredicate {
152 PushedDownPredicate {
153 discriminant: self,
154 predicate: expr,
155 }
156 }
157}
158
/// The result of pushing down a single parent filter into all children.
#[derive(Debug, Clone)]
pub struct ChildFilterPushdownResult {
    /// The parent filter that was pushed down.
    pub filter: Arc<dyn PhysicalExpr>,
    /// One pushdown outcome per child, in child order.
    pub child_results: Vec<PushedDown>,
}
165
166impl ChildFilterPushdownResult {
167 /// Combine all child results using OR logic.
168 /// Returns `Yes` if **any** child supports the filter.
169 /// Returns `No` if **all** children reject the filter or if there are no children.
170 pub fn any(&self) -> PushedDown {
171 if self.child_results.is_empty() {
172 // If there are no children, filters cannot be supported
173 PushedDown::No
174 } else {
175 self.child_results
176 .iter()
177 .fold(PushedDown::No, |acc, result| acc.or(*result))
178 }
179 }
180
181 /// Combine all child results using AND logic.
182 /// Returns `Yes` if **all** children support the filter.
183 /// Returns `No` if **any** child rejects the filter or if there are no children.
184 pub fn all(&self) -> PushedDown {
185 if self.child_results.is_empty() {
186 // If there are no children, filters cannot be supported
187 PushedDown::No
188 } else {
189 self.child_results
190 .iter()
191 .fold(PushedDown::Yes, |acc, result| acc.and(*result))
192 }
193 }
194}
195
/// The result of pushing down filters into a child node.
///
/// This is the result provided to nodes in [`ExecutionPlan::handle_child_pushdown_result`].
/// Nodes process this result and convert it into a [`FilterPushdownPropagation`]
/// that is returned to their parent.
///
/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
#[derive(Debug, Clone)]
pub struct ChildPushdownResult {
    /// The parent filters that were pushed down as received by the current node when [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown) was called.
    /// Note that this may *not* be the same as the filters that were passed to the children as the current node may have modified them
    /// (e.g. by reassigning column indices) when it returned them from [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown) in a [`FilterDescription`].
    /// Attached to each filter is a [`PushedDown`] *per child* that indicates whether the filter was supported or unsupported by each child.
    /// To get combined results see [`ChildFilterPushdownResult::any`] and [`ChildFilterPushdownResult::all`].
    pub parent_filters: Vec<ChildFilterPushdownResult>,
    /// The result of pushing down each filter this node provided into each of its children.
    /// The outer vector corresponds to each child, and the inner vector corresponds to each filter.
    /// Since this node may have generated a different filter for each child the inner vector may have different lengths or the expressions may not match at all.
    /// It is up to each node to interpret this result based on the filters it provided for each child in [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown).
    pub self_filters: Vec<Vec<PushedDownPredicate>>,
}
217
/// The result of pushing down filters into a node.
///
/// Returned from [`ExecutionPlan::handle_child_pushdown_result`] to communicate
/// to the optimizer:
///
/// 1. What to do with any parent filters that could not be pushed down into the children.
/// 2. If the node needs to be replaced in the execution plan with a new node or not.
///
/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
#[derive(Debug, Clone)]
pub struct FilterPushdownPropagation<T> {
    /// Which parent filters were pushed down into this node's children.
    /// One entry per parent filter, in the order the filters were received.
    pub filters: Vec<PushedDown>,
    /// The updated node, if it was updated during pushdown.
    /// `None` means the node is unchanged and the optimizer keeps the existing one.
    pub updated_node: Option<T>,
}
234
235impl<T> FilterPushdownPropagation<T> {
236 /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter
237 /// is supported if it was supported by *all* children.
238 pub fn if_all(child_pushdown_result: ChildPushdownResult) -> Self {
239 let filters = child_pushdown_result
240 .parent_filters
241 .into_iter()
242 .map(|result| result.all())
243 .collect();
244 Self {
245 filters,
246 updated_node: None,
247 }
248 }
249
250 /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter
251 /// is supported if it was supported by *any* child.
252 pub fn if_any(child_pushdown_result: ChildPushdownResult) -> Self {
253 let filters = child_pushdown_result
254 .parent_filters
255 .into_iter()
256 .map(|result| result.any())
257 .collect();
258 Self {
259 filters,
260 updated_node: None,
261 }
262 }
263
264 /// Create a new [`FilterPushdownPropagation`] that tells the parent node that no filters were pushed down regardless of the child results.
265 pub fn all_unsupported(child_pushdown_result: ChildPushdownResult) -> Self {
266 let filters = child_pushdown_result
267 .parent_filters
268 .into_iter()
269 .map(|_| PushedDown::No)
270 .collect();
271 Self {
272 filters,
273 updated_node: None,
274 }
275 }
276
277 /// Create a new [`FilterPushdownPropagation`] with the specified filter support.
278 /// This transmits up to our parent node what the result of pushing down the filters into our node and possibly our subtree was.
279 pub fn with_parent_pushdown_result(filters: Vec<PushedDown>) -> Self {
280 Self {
281 filters,
282 updated_node: None,
283 }
284 }
285
286 /// Bind an updated node to the [`FilterPushdownPropagation`].
287 /// Use this when the current node wants to update itself in the tree or replace itself with a new node (e.g. one of it's children).
288 /// You do not need to call this if one of the children of the current node may have updated itself, that is handled by the optimizer.
289 pub fn with_updated_node(mut self, updated_node: T) -> Self {
290 self.updated_node = Some(updated_node);
291 self
292 }
293}
294
/// Describes filter pushdown for a single child node.
///
/// This structure contains two types of filters:
/// - **Parent filters**: Filters received from the parent node, marked as supported or unsupported
/// - **Self filters**: Filters generated by the current node to be pushed down to this child
#[derive(Debug, Clone)]
pub struct ChildFilterDescription {
    /// Description of which parent filters can be pushed down into this node.
    /// Since we need to transmit filter pushdown results back to this node's parent
    /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down.
    /// One entry per parent filter, in the order the parent filters were received.
    pub(crate) parent_filters: Vec<PushedDownPredicate>,
    /// Description of which filters this node is pushing down to its children.
    /// Since this is not transmitted back to the parents we can have variable sized inner arrays
    /// instead of having to track supported/unsupported.
    pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
}
311
/// Validates and remaps filter column references to a target schema in one step.
///
/// When pushing filters from a parent to a child node, we need to:
/// 1. Verify that all columns referenced by the filter exist in the target
/// 2. Remap column indices to match the target schema
///
/// `allowed_indices` controls which column indices (in the parent schema) are
/// considered valid. For single-input nodes this defaults to
/// `0..child_schema.len()` (all columns are reachable). For join nodes it is
/// restricted to the subset of output columns that map to the target child,
/// which is critical when different sides have same-named columns.
pub(crate) struct FilterRemapper {
    /// The target schema to remap column indices into.
    child_schema: SchemaRef,
    /// Only columns at these indices (in the *parent* schema) are considered
    /// valid. For non-join nodes this defaults to `0..child_schema.len()`.
    allowed_indices: HashSet<usize>,
}
330
331impl FilterRemapper {
332 /// Create a remapper that accepts any column whose index falls within
333 /// `0..child_schema.len()` and whose name exists in the target schema.
334 pub(crate) fn new(child_schema: SchemaRef) -> Self {
335 let allowed_indices = (0..child_schema.fields().len()).collect();
336 Self {
337 child_schema,
338 allowed_indices,
339 }
340 }
341
342 /// Create a remapper that only accepts columns at the given indices.
343 /// This is used by join nodes to restrict pushdown to one side of the
344 /// join when both sides have same-named columns.
345 fn with_allowed_indices(
346 child_schema: SchemaRef,
347 allowed_indices: HashSet<usize>,
348 ) -> Self {
349 Self {
350 child_schema,
351 allowed_indices,
352 }
353 }
354
355 /// Try to remap a filter's column references to the target schema.
356 ///
357 /// Validates and remaps in a single tree traversal: for each column,
358 /// checks that its index is in the allowed set and that
359 /// its name exists in the target schema, then remaps the index.
360 /// Returns `Some(remapped)` if all columns are valid, or `None` if any
361 /// column fails validation.
362 pub(crate) fn try_remap(
363 &self,
364 filter: &Arc<dyn PhysicalExpr>,
365 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
366 let mut all_valid = true;
367 let transformed = Arc::clone(filter).transform_down(|expr| {
368 if let Some(col) = expr.as_any().downcast_ref::<Column>() {
369 if self.allowed_indices.contains(&col.index())
370 && let Ok(new_index) = self.child_schema.index_of(col.name())
371 {
372 Ok(Transformed::yes(Arc::new(Column::new(
373 col.name(),
374 new_index,
375 ))))
376 } else {
377 all_valid = false;
378 Ok(Transformed::complete(expr))
379 }
380 } else {
381 Ok(Transformed::no(expr))
382 }
383 })?;
384
385 Ok(all_valid.then_some(transformed.data))
386 }
387}
388
389impl ChildFilterDescription {
390 /// Build a child filter description by analyzing which parent filters can be pushed to a specific child.
391 ///
392 /// This method performs column analysis to determine which filters can be pushed down:
393 /// - If all columns referenced by a filter exist in the child's schema, it can be pushed down
394 /// - Otherwise, it cannot be pushed down to that child
395 ///
396 /// See [`FilterDescription::from_children`] for more details
397 pub fn from_child(
398 parent_filters: &[Arc<dyn PhysicalExpr>],
399 child: &Arc<dyn crate::ExecutionPlan>,
400 ) -> Result<Self> {
401 let remapper = FilterRemapper::new(child.schema());
402 Self::remap_filters(parent_filters, &remapper)
403 }
404
405 /// Like [`Self::from_child`], but restricts which parent-level columns are
406 /// considered reachable through this child.
407 ///
408 /// `allowed_indices` is the set of column indices (in the *parent*
409 /// schema) that map to this child's side of a join. A filter is only
410 /// eligible for pushdown when **every** column index it references
411 /// appears in `allowed_indices`.
412 ///
413 /// This prevents incorrect pushdown when different join sides have
414 /// columns with the same name: matching on index ensures a filter
415 /// referencing the right side's `k@2` is not pushed to the left side
416 /// which also has a column named `k` but at a different index.
417 pub fn from_child_with_allowed_indices(
418 parent_filters: &[Arc<dyn PhysicalExpr>],
419 allowed_indices: HashSet<usize>,
420 child: &Arc<dyn crate::ExecutionPlan>,
421 ) -> Result<Self> {
422 let remapper =
423 FilterRemapper::with_allowed_indices(child.schema(), allowed_indices);
424 Self::remap_filters(parent_filters, &remapper)
425 }
426
427 fn remap_filters(
428 parent_filters: &[Arc<dyn PhysicalExpr>],
429 remapper: &FilterRemapper,
430 ) -> Result<Self> {
431 let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
432 for filter in parent_filters {
433 if let Some(remapped) = remapper.try_remap(filter)? {
434 child_parent_filters.push(PushedDownPredicate::supported(remapped));
435 } else {
436 child_parent_filters
437 .push(PushedDownPredicate::unsupported(Arc::clone(filter)));
438 }
439 }
440
441 Ok(Self {
442 parent_filters: child_parent_filters,
443 self_filters: vec![],
444 })
445 }
446
447 /// Mark all parent filters as unsupported for this child.
448 pub fn all_unsupported(parent_filters: &[Arc<dyn PhysicalExpr>]) -> Self {
449 Self {
450 parent_filters: parent_filters
451 .iter()
452 .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
453 .collect(),
454 self_filters: vec![],
455 }
456 }
457
458 /// Add a self filter (from the current node) to be pushed down to this child.
459 pub fn with_self_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
460 self.self_filters.push(filter);
461 self
462 }
463
464 /// Add multiple self filters.
465 pub fn with_self_filters(mut self, filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
466 self.self_filters.extend(filters);
467 self
468 }
469}
470
/// Describes how filters should be pushed down to children.
///
/// This structure contains filter descriptions for each child node, specifying:
/// - Which parent filters can be pushed down to each child
/// - Which self-generated filters should be pushed down to each child
///
/// The filter routing is determined by column analysis - filters can only be pushed
/// to children whose schemas contain all the referenced columns.
#[derive(Debug, Clone)]
pub struct FilterDescription {
    /// A filter description for each child.
    /// This includes which parent filters and which self filters (from the node in question)
    /// will get pushed down to each child.
    /// One entry per child, in child order.
    child_filter_descriptions: Vec<ChildFilterDescription>,
}
486
impl Default for FilterDescription {
    /// Equivalent to [`FilterDescription::new`]: an empty description with no children.
    fn default() -> Self {
        Self::new()
    }
}
492
493impl FilterDescription {
494 /// Create a new empty FilterDescription
495 pub fn new() -> Self {
496 Self {
497 child_filter_descriptions: vec![],
498 }
499 }
500
501 /// Add a child filter description
502 pub fn with_child(mut self, child: ChildFilterDescription) -> Self {
503 self.child_filter_descriptions.push(child);
504 self
505 }
506
507 /// Build a filter description by analyzing which parent filters can be pushed to each child.
508 /// This method automatically determines filter routing based on column analysis:
509 /// - If all columns referenced by a filter exist in a child's schema, it can be pushed down
510 /// - Otherwise, it cannot be pushed down to that child
511 #[expect(clippy::needless_pass_by_value)]
512 pub fn from_children(
513 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
514 children: &[&Arc<dyn crate::ExecutionPlan>],
515 ) -> Result<Self> {
516 let mut desc = Self::new();
517
518 // For each child, create a ChildFilterDescription
519 for child in children {
520 desc = desc
521 .with_child(ChildFilterDescription::from_child(&parent_filters, child)?);
522 }
523
524 Ok(desc)
525 }
526
527 /// Mark all parent filters as unsupported for all children.
528 pub fn all_unsupported(
529 parent_filters: &[Arc<dyn PhysicalExpr>],
530 children: &[&Arc<dyn crate::ExecutionPlan>],
531 ) -> Self {
532 let mut desc = Self::new();
533 for _ in 0..children.len() {
534 desc =
535 desc.with_child(ChildFilterDescription::all_unsupported(parent_filters));
536 }
537 desc
538 }
539
540 pub fn parent_filters(&self) -> Vec<Vec<PushedDownPredicate>> {
541 self.child_filter_descriptions
542 .iter()
543 .map(|d| &d.parent_filters)
544 .cloned()
545 .collect()
546 }
547
548 pub fn self_filters(&self) -> Vec<Vec<Arc<dyn PhysicalExpr>>> {
549 self.child_filter_descriptions
550 .iter()
551 .map(|d| &d.self_filters)
552 .cloned()
553 .collect()
554 }
555}