datafusion_physical_optimizer/
filter_pushdown.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Filter Pushdown Optimization Process
19//!
20//! The filter pushdown mechanism involves four key steps:
21//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`]
22//!    on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`]
23//!    by inspecting its logic and children's schemas, determining which filters can be pushed to each child.
24//! 2. **Optimizer Executes Pushdown**: The optimizer recursively calls `push_down_filters` in this module on each child,
25//!    passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child.
26//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children,
27//!    containing information about which filters were successfully pushed down vs. unsupported.
28//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent,
29//!    passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides
30//!    how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes).
31//!
32//! [`FilterDescription`]: datafusion_physical_plan::filter_pushdown::FilterDescription
33
34use std::sync::Arc;
35
36use crate::PhysicalOptimizerRule;
37
38use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
39use datafusion_common::{Result, assert_eq_or_internal_err, config::ConfigOptions};
40use datafusion_physical_expr::PhysicalExpr;
41use datafusion_physical_expr_common::physical_expr::is_volatile;
42use datafusion_physical_plan::filter_pushdown::{
43    ChildFilterPushdownResult, ChildPushdownResult, FilterPushdownPhase,
44    FilterPushdownPropagation, PushedDown,
45};
46use datafusion_physical_plan::{ExecutionPlan, with_new_children_if_necessary};
47
48use itertools::{Itertools, izip};
49
50/// Attempts to recursively push given filters from the top of the tree into leaves.
51///
52/// # Default Implementation
53///
54/// The default implementation in [`ExecutionPlan::gather_filters_for_pushdown`]
55/// and [`ExecutionPlan::handle_child_pushdown_result`] assumes that:
56///
57/// * Parent filters can't be passed onto children (determined by [`ExecutionPlan::gather_filters_for_pushdown`])
58/// * This node has no filters to contribute (determined by [`ExecutionPlan::gather_filters_for_pushdown`]).
59/// * Any filters that could not be pushed down to the children are marked as unsupported (determined by [`ExecutionPlan::handle_child_pushdown_result`]).
60///
61/// # Example: Push filter into a `DataSourceExec`
62///
63/// For example, consider the following plan:
64///
65/// ```text
66/// ┌──────────────────────┐
67/// │ CoalesceBatchesExec  │
68/// └──────────────────────┘
69///             │
70///             ▼
71/// ┌──────────────────────┐
72/// │      FilterExec      │
73/// │  filters = [ id=1]   │
74/// └──────────────────────┘
75///             │
76///             ▼
77/// ┌──────────────────────┐
78/// │    DataSourceExec    │
79/// │    projection = *    │
80/// └──────────────────────┘
81/// ```
82///
83/// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to the `DataSourceExec` node.
84///
/// If this filter is selective, pushing it into the scan can avoid massive
/// amounts of data being read from the source (the projection is `*` so all
/// matching columns are read).
88///
89/// The new plan looks like:
90///
91/// ```text
92/// ┌──────────────────────┐
93/// │ CoalesceBatchesExec  │
94/// └──────────────────────┘
95///           │
96///           ▼
97/// ┌──────────────────────┐
98/// │    DataSourceExec    │
99/// │    projection = *    │
100/// │   filters = [ id=1]  │
101/// └──────────────────────┘
102/// ```
103///
104/// # Example: Push filters with `ProjectionExec`
105///
106/// Let's consider a more complex example involving a [`ProjectionExec`]
107/// node in between the [`FilterExec`] and `DataSourceExec` nodes that
108/// creates a new column that the filter depends on.
109///
110/// ```text
111/// ┌──────────────────────┐
112/// │ CoalesceBatchesExec  │
113/// └──────────────────────┘
114///             │
115///             ▼
116/// ┌──────────────────────┐
117/// │      FilterExec      │
118/// │    filters =         │
119/// │     [cost>50,id=1]   │
120/// └──────────────────────┘
121///             │
122///             ▼
123/// ┌──────────────────────┐
124/// │    ProjectionExec    │
125/// │ cost = price * 1.2   │
126/// └──────────────────────┘
127///             │
128///             ▼
129/// ┌──────────────────────┐
130/// │    DataSourceExec    │
131/// │    projection = *    │
132/// └──────────────────────┘
133/// ```
134///
135/// We want to push down the filters `[id=1]` to the `DataSourceExec` node,
136/// but can't push down `cost>50` because it requires the [`ProjectionExec`]
137/// node to be executed first. A simple thing to do would be to split up the
138/// filter into two separate filters and push down the first one:
139///
140/// ```text
141/// ┌──────────────────────┐
142/// │ CoalesceBatchesExec  │
143/// └──────────────────────┘
144///             │
145///             ▼
146/// ┌──────────────────────┐
147/// │      FilterExec      │
148/// │    filters =         │
149/// │     [cost>50]        │
150/// └──────────────────────┘
151///             │
152///             ▼
153/// ┌──────────────────────┐
154/// │    ProjectionExec    │
155/// │ cost = price * 1.2   │
156/// └──────────────────────┘
157///             │
158///             ▼
159/// ┌──────────────────────┐
160/// │    DataSourceExec    │
161/// │    projection = *    │
162/// │   filters = [ id=1]  │
163/// └──────────────────────┘
164/// ```
165///
/// However, we can actually do better by pushing down `price * 1.2 > 50`
/// instead of `cost > 50`:
168///
169/// ```text
170/// ┌──────────────────────┐
171/// │ CoalesceBatchesExec  │
172/// └──────────────────────┘
173///            │
174///            ▼
175/// ┌──────────────────────┐
176/// │    ProjectionExec    │
177/// │ cost = price * 1.2   │
178/// └──────────────────────┘
179///            │
180///            ▼
181/// ┌──────────────────────┐
182/// │    DataSourceExec    │
183/// │    projection = *    │
184/// │   filters = [id=1,   │
185/// │   price * 1.2 > 50]  │
186/// └──────────────────────┘
187/// ```
188///
189/// # Example: Push filters within a subtree
190///
191/// There are also cases where we may be able to push down filters within a
192/// subtree but not the entire tree. A good example of this is aggregation
193/// nodes:
194///
195/// ```text
196/// ┌──────────────────────┐
197/// │ ProjectionExec       │
198/// │ projection = *       │
199/// └──────────────────────┘
200///           │
201///           ▼
202/// ┌──────────────────────┐
203/// │ FilterExec           │
204/// │ filters = [sum > 10] │
205/// └──────────────────────┘
206///           │
207///           ▼
208/// ┌───────────────────────┐
209/// │     AggregateExec     │
210/// │    group by = [id]    │
211/// │    aggregate =        │
212/// │      [sum(price)]     │
213/// └───────────────────────┘
214///           │
215///           ▼
216/// ┌──────────────────────┐
217/// │ FilterExec           │
218/// │ filters = [id=1]     │
219/// └──────────────────────┘
220///          │
221///          ▼
222/// ┌──────────────────────┐
223/// │ DataSourceExec       │
224/// │ projection = *       │
225/// └──────────────────────┘
226/// ```
227///
228/// The transformation here is to push down the `id=1` filter to the
229/// `DataSourceExec` node:
230///
231/// ```text
232/// ┌──────────────────────┐
233/// │ ProjectionExec       │
234/// │ projection = *       │
235/// └──────────────────────┘
236///           │
237///           ▼
238/// ┌──────────────────────┐
239/// │ FilterExec           │
240/// │ filters = [sum > 10] │
241/// └──────────────────────┘
242///           │
243///           ▼
244/// ┌───────────────────────┐
245/// │     AggregateExec     │
246/// │    group by = [id]    │
247/// │    aggregate =        │
248/// │      [sum(price)]     │
249/// └───────────────────────┘
250///           │
251///           ▼
252/// ┌──────────────────────┐
253/// │ DataSourceExec       │
254/// │ projection = *       │
255/// │ filters = [id=1]     │
256/// └──────────────────────┘
257/// ```
258///
259/// The point here is that:
260/// 1. We cannot push down `sum > 10` through the [`AggregateExec`] node into the `DataSourceExec` node.
261///    Any filters above the [`AggregateExec`] node are not pushed down.
262///    This is determined by calling [`ExecutionPlan::gather_filters_for_pushdown`] on the [`AggregateExec`] node.
263/// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push
264///    down the `id=1` filter.
265///
266/// # Example: Push filters through Joins
267///
268/// It is also possible to push down filters through joins and filters that
269/// originate from joins. For example, a hash join where we build a hash
270/// table of the left side and probe the right side (ignoring why we would
271/// choose this order, typically it depends on the size of each table,
272/// etc.).
273///
274/// ```text
275///              ┌─────────────────────┐
276///              │     FilterExec      │
277///              │ filters =           │
278///              │  [d.size > 100]     │
279///              └─────────────────────┘
280///                         │
281///                         │
282///              ┌──────────▼──────────┐
283///              │                     │
284///              │    HashJoinExec     │
285///              │ [u.dept@hash(d.id)] │
286///              │                     │
287///              └─────────────────────┘
288///                         │
289///            ┌────────────┴────────────┐
290/// ┌──────────▼──────────┐   ┌──────────▼──────────┐
291/// │   DataSourceExec    │   │   DataSourceExec    │
292/// │  alias [users as u] │   │  alias [dept as d]  │
293/// │                     │   │                     │
294/// └─────────────────────┘   └─────────────────────┘
295/// ```
296///
297/// There are two pushdowns we can do here:
298/// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec`
299///    node for the `departments` table.
300/// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading
301///    rows from the `users` table that will be eliminated by the join.
302///    This can be done via a bloom filter or similar and is not (yet) supported
303///    in DataFusion. See <https://github.com/apache/datafusion/issues/7955>.
304///
305/// ```text
306///              ┌─────────────────────┐
307///              │                     │
308///              │    HashJoinExec     │
309///              │ [u.dept@hash(d.id)] │
310///              │                     │
311///              └─────────────────────┘
312///                         │
313///            ┌────────────┴────────────┐
314/// ┌──────────▼──────────┐   ┌──────────▼──────────┐
315/// │   DataSourceExec    │   │   DataSourceExec    │
316/// │  alias [users as u] │   │  alias [dept as d]  │
317/// │ filters =           │   │  filters =          │
/// │   [dept@hash(d.id)] │   │    [ d.size > 100]  │
319/// └─────────────────────┘   └─────────────────────┘
320/// ```
321///
322/// You may notice in this case that the filter is *dynamic*: the hash table
323/// is built _after_ the `departments` table is read and at runtime. We
324/// don't have a concrete `InList` filter or similar to push down at
325/// optimization time. These sorts of dynamic filters are handled by
326/// building a specialized [`PhysicalExpr`] that can be evaluated at runtime
327/// and internally maintains a reference to the hash table or other state.
328///
329/// To make working with these sorts of dynamic filters more tractable we have the method [`PhysicalExpr::snapshot`]
330/// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter.
331/// For a join this could mean converting it to an `InList` filter or a min/max filter for example.
332/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details.
333///
334/// # Example: Push TopK filters into Scans
335///
336/// Another form of dynamic filter is pushing down the state of a `TopK`
337/// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`:
338///
339/// ```text
340/// ┌──────────────────────┐
341/// │       TopK           │
342/// │     limit = 10       │
343/// │   order by = [id]    │
344/// └──────────────────────┘
345///            │
346///            ▼
347/// ┌──────────────────────┐
348/// │    DataSourceExec    │
349/// │    projection = *    │
350/// └──────────────────────┘
351/// ```
352///
353/// We can avoid large amounts of data processing by transforming this into:
354///
355/// ```text
356/// ┌──────────────────────┐
357/// │       TopK           │
358/// │     limit = 10       │
359/// │   order by = [id]    │
360/// └──────────────────────┘
361///            │
362///            ▼
363/// ┌──────────────────────┐
364/// │    DataSourceExec    │
365/// │    projection = *    │
366/// │ filters =            │
367/// │    [id < @ TopKHeap] │
368/// └──────────────────────┘
369/// ```
370///
371/// Now as we fill our `TopK` heap we can push down the state of the heap to
372/// the `DataSourceExec` node to avoid reading files / row groups / pages /
373/// rows that could not possibly be in the top 10.
374///
375/// This is not yet implemented in DataFusion. See
376/// <https://github.com/apache/datafusion/issues/15037>
377///
378/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
379/// [`PhysicalExpr::snapshot`]: datafusion_physical_plan::PhysicalExpr::snapshot
380/// [`FilterExec`]: datafusion_physical_plan::filter::FilterExec
381/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
382/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
383#[derive(Debug)]
384pub struct FilterPushdown {
385    phase: FilterPushdownPhase,
386    name: String,
387}
388
389impl FilterPushdown {
390    fn new_with_phase(phase: FilterPushdownPhase) -> Self {
391        let name = match phase {
392            FilterPushdownPhase::Pre => "FilterPushdown",
393            FilterPushdownPhase::Post => "FilterPushdown(Post)",
394        }
395        .to_string();
396        Self { phase, name }
397    }
398
399    /// Create a new [`FilterPushdown`] optimizer rule that runs in the pre-optimization phase.
400    /// See [`FilterPushdownPhase`] for more details.
401    pub fn new() -> Self {
402        Self::new_with_phase(FilterPushdownPhase::Pre)
403    }
404
405    /// Create a new [`FilterPushdown`] optimizer rule that runs in the post-optimization phase.
406    /// See [`FilterPushdownPhase`] for more details.
407    pub fn new_post_optimization() -> Self {
408        Self::new_with_phase(FilterPushdownPhase::Post)
409    }
410}
411
412impl Default for FilterPushdown {
413    fn default() -> Self {
414        Self::new()
415    }
416}
417
418impl PhysicalOptimizerRule for FilterPushdown {
419    fn optimize(
420        &self,
421        plan: Arc<dyn ExecutionPlan>,
422        config: &ConfigOptions,
423    ) -> Result<Arc<dyn ExecutionPlan>> {
424        Ok(
425            push_down_filters(&Arc::clone(&plan), vec![], config, self.phase)?
426                .updated_node
427                .unwrap_or(plan),
428        )
429    }
430
431    fn name(&self) -> &str {
432        &self.name
433    }
434
435    fn schema_check(&self) -> bool {
436        true // Filter pushdown does not change the schema of the plan
437    }
438}
439
440fn push_down_filters(
441    node: &Arc<dyn ExecutionPlan>,
442    parent_predicates: Vec<Arc<dyn PhysicalExpr>>,
443    config: &ConfigOptions,
444    phase: FilterPushdownPhase,
445) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
446    let mut parent_filter_pushdown_supports: Vec<Vec<PushedDown>> =
447        vec![vec![]; parent_predicates.len()];
448    let mut self_filters_pushdown_supports = vec![];
449    let mut new_children = Vec::with_capacity(node.children().len());
450
451    let children = node.children();
452
453    // Filter out expressions that are not allowed for pushdown
454    let parent_filtered = FilteredVec::new(&parent_predicates, allow_pushdown_for_expr);
455
456    let filter_description = node.gather_filters_for_pushdown(
457        phase,
458        parent_filtered.items().to_vec(),
459        config,
460    )?;
461
462    let filter_description_parent_filters = filter_description.parent_filters();
463    let filter_description_self_filters = filter_description.self_filters();
464    assert_eq_or_internal_err!(
465        filter_description_parent_filters.len(),
466        children.len(),
467        "Filter pushdown expected parent filters count to match number of children for node {}",
468        node.name()
469    );
470    assert_eq_or_internal_err!(
471        filter_description_self_filters.len(),
472        children.len(),
473        "Filter pushdown expected self filters count to match number of children for node {}",
474        node.name()
475    );
476
477    for (child_idx, (child, parent_filters, self_filters)) in izip!(
478        children,
479        filter_description.parent_filters(),
480        filter_description.self_filters()
481    )
482    .enumerate()
483    {
484        // Here, `parent_filters` are the predicates which are provided by the parent node of
485        // the current node, and tried to be pushed down over the child which the loop points
486        // currently. `self_filters` are the predicates which are provided by the current node,
487        // and tried to be pushed down over the child similarly.
488
489        // Filter out self_filters that contain volatile expressions and track indices
490        let self_filtered = FilteredVec::new(&self_filters, allow_pushdown_for_expr);
491
492        let num_self_filters = self_filtered.len();
493        let mut all_predicates = self_filtered.items().to_vec();
494
495        // Apply second filter pass: collect indices of parent filters that can be pushed down
496        let parent_filters_for_child = parent_filtered
497            .chain_filter_slice(&parent_filters, |filter| {
498                matches!(filter.discriminant, PushedDown::Yes)
499            });
500
501        // Add the filtered parent predicates to all_predicates
502        for filter in parent_filters_for_child.items() {
503            all_predicates.push(Arc::clone(&filter.predicate));
504        }
505
506        let num_parent_filters = all_predicates.len() - num_self_filters;
507
508        // Any filters that could not be pushed down to a child are marked as not-supported to our parents
509        let result =
510            push_down_filters(&Arc::clone(child), all_predicates, config, phase)?;
511
512        if let Some(new_child) = result.updated_node {
513            // If we have a filter pushdown result, we need to update our children
514            new_children.push(new_child);
515        } else {
516            // If we don't have a filter pushdown result, we need to update our children
517            new_children.push(Arc::clone(child));
518        }
519
520        // Our child doesn't know the difference between filters that were passed down
521        // from our parents and filters that the current node injected. We need to de-entangle
522        // this since we do need to distinguish between them.
523        let mut all_filters = result.filters.into_iter().collect_vec();
524        assert_eq_or_internal_err!(
525            all_filters.len(),
526            num_self_filters + num_parent_filters,
527            "Filter pushdown did not return the expected number of filters from {}",
528            child.name()
529        );
530        let parent_filters = all_filters
531            .split_off(num_self_filters)
532            .into_iter()
533            .collect_vec();
534        // Map the results from filtered self filters back to their original positions using FilteredVec
535        let mapped_self_results =
536            self_filtered.map_results_to_original(all_filters, PushedDown::No);
537
538        // Wrap each result with its corresponding expression
539        let self_filter_results: Vec<_> = mapped_self_results
540            .into_iter()
541            .zip(self_filters)
542            .map(|(support, filter)| support.wrap_expression(filter))
543            .collect();
544
545        self_filters_pushdown_supports.push(self_filter_results);
546
547        // Start by marking all parent filters as unsupported for this child
548        for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() {
549            parent_filter_pushdown_support.push(PushedDown::No);
550            assert_eq!(
551                parent_filter_pushdown_support.len(),
552                child_idx + 1,
553                "Parent filter pushdown supports should have the same length as the number of children"
554            );
555        }
556        // Map results from pushed-down filters back to original parent filter indices
557        let mapped_parent_results = parent_filters_for_child
558            .map_results_to_original(parent_filters, PushedDown::No);
559
560        // Update parent_filter_pushdown_supports with the mapped results
561        // mapped_parent_results already has the results at their original indices
562        for (idx, support) in parent_filter_pushdown_supports.iter_mut().enumerate() {
563            support[child_idx] = mapped_parent_results[idx];
564        }
565    }
566
567    // Re-create this node with new children
568    let updated_node = with_new_children_if_necessary(Arc::clone(node), new_children)?;
569
570    // TODO: by calling `handle_child_pushdown_result` we are assuming that the
571    // `ExecutionPlan` implementation will not change the plan itself.
572    // Should we have a separate method for dynamic pushdown that does not allow modifying the plan?
573    let mut res = updated_node.handle_child_pushdown_result(
574        phase,
575        ChildPushdownResult {
576            parent_filters: parent_predicates
577                .into_iter()
578                .enumerate()
579                .map(
580                    |(parent_filter_idx, parent_filter)| ChildFilterPushdownResult {
581                        filter: parent_filter,
582                        child_results: parent_filter_pushdown_supports[parent_filter_idx]
583                            .clone(),
584                    },
585                )
586                .collect(),
587            self_filters: self_filters_pushdown_supports,
588        },
589        config,
590    )?;
591    // Compare pointers for new_node and node, if they are different we must replace
592    // ourselves because of changes in our children.
593    if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, node) {
594        res.updated_node = Some(updated_node)
595    }
596    Ok(res)
597}
598
/// Holds the items that survived one or more filtering passes, together with the
/// bookkeeping needed to translate a position in the surviving list back to a
/// position in the very first, unfiltered input.
struct FilteredVec<T> {
    /// Items that passed every filter pass applied so far.
    items: Vec<T>,
    /// One entry per filter pass. `index_mappings[0]` translates positions after the
    /// first pass into original positions; every later entry translates positions
    /// after its pass into positions after the previous pass.
    index_mappings: Vec<Vec<usize>>,
    /// Length of the original, unfiltered input.
    original_len: usize,
}

impl<T: Clone> FilteredVec<T> {
    /// Runs the first filter pass over `items`, keeping clones of the elements
    /// accepted by `predicate`.
    fn new<F>(items: &[T], predicate: F) -> Self
    where
        F: Fn(&T) -> bool,
    {
        let (kept, positions) = select_with_positions(items, predicate);
        Self {
            items: kept,
            index_mappings: vec![positions],
            original_len: items.len(),
        }
    }

    /// The items that are still present after all passes.
    fn items(&self) -> &[T] {
        &self.items
    }

    /// How many items are still present after all passes.
    fn len(&self) -> usize {
        self.items.len()
    }

    /// Scatters `results` (one per surviving item, in order) into a vector as long
    /// as the original input. Positions without a result hold `default_value`.
    fn map_results_to_original<R: Clone>(
        &self,
        results: Vec<R>,
        default_value: R,
    ) -> Vec<R> {
        let mut slots = vec![default_value; self.original_len];
        results.into_iter().enumerate().for_each(|(pos, value)| {
            slots[self.trace_to_original_index(pos)] = value;
        });
        slots
    }

    /// Resolves a position in the surviving list to its position in the original
    /// input by walking the pass mappings from the latest pass to the first.
    fn trace_to_original_index(&self, current_idx: usize) -> usize {
        self.index_mappings
            .iter()
            .rev()
            .fold(current_idx, |idx, mapping| mapping[idx])
    }

    /// Runs a further filter pass over `items`, a slice that lines up position by
    /// position with this instance's surviving items (e.g. a transformed version
    /// of them), and returns a new `FilteredVec` that still maps back to the
    /// original input of `self`.
    fn chain_filter_slice<U: Clone, F>(&self, items: &[U], predicate: F) -> FilteredVec<U>
    where
        F: Fn(&U) -> bool,
    {
        let (kept, positions) = select_with_positions(items, predicate);

        // Earlier passes first, the new pass last.
        let index_mappings = self
            .index_mappings
            .iter()
            .cloned()
            .chain(std::iter::once(positions))
            .collect();

        FilteredVec {
            items: kept,
            index_mappings,
            original_len: self.original_len,
        }
    }
}

/// Clones the elements of `items` accepted by `keep` and returns them alongside
/// the positions they had in `items`, both in input order.
fn select_with_positions<V, P>(items: &[V], keep: P) -> (Vec<V>, Vec<usize>)
where
    V: Clone,
    P: Fn(&V) -> bool,
{
    items
        .iter()
        .enumerate()
        .filter(|&(_, item)| keep(item))
        .map(|(pos, item)| (item.clone(), pos))
        .unzip()
}
698
699fn allow_pushdown_for_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
700    let mut allow_pushdown = true;
701    expr.apply(|e| {
702        allow_pushdown = allow_pushdown && !is_volatile(e);
703        if allow_pushdown {
704            Ok(TreeNodeRecursion::Continue)
705        } else {
706            Ok(TreeNodeRecursion::Stop)
707        }
708    })
709    .expect("Infallible traversal of PhysicalExpr tree failed");
710    allow_pushdown
711}
712
#[cfg(test)]
mod tests {
    use super::*;

    /// A single pass keeps the matching items and scatters results to their
    /// original positions.
    #[test]
    fn test_filtered_vec_single_pass() {
        let items = vec![1, 2, 3, 4, 5, 6];
        let filtered = FilteredVec::new(&items, |&x| x % 2 == 0);

        // Check filtered items
        assert_eq!(filtered.items(), &[2, 4, 6]);
        assert_eq!(filtered.len(), 3);

        // Check index mapping
        let results = vec!["a", "b", "c"];
        let mapped = filtered.map_results_to_original(results, "default");
        assert_eq!(mapped, vec!["default", "a", "default", "b", "default", "c"]);
    }

    /// When nothing passes the filter, every original position gets the default.
    #[test]
    fn test_filtered_vec_empty_filter() {
        let items = vec![1, 3, 5];
        let filtered = FilteredVec::new(&items, |&x| x % 2 == 0);

        assert_eq!(filtered.items(), &[] as &[i32]);
        assert_eq!(filtered.len(), 0);

        let results: Vec<&str> = vec![];
        let mapped = filtered.map_results_to_original(results, "default");
        assert_eq!(mapped, vec!["default", "default", "default"]);
    }

    /// When everything passes the filter, results map one to one.
    #[test]
    fn test_filtered_vec_all_pass() {
        let items = vec![2, 4, 6];
        let filtered = FilteredVec::new(&items, |&x| x % 2 == 0);

        assert_eq!(filtered.items(), &[2, 4, 6]);
        assert_eq!(filtered.len(), 3);

        let results = vec!["a", "b", "c"];
        let mapped = filtered.map_results_to_original(results, "default");
        assert_eq!(mapped, vec!["a", "b", "c"]);
    }

    /// The second pass may operate on a different element type than the first.
    #[test]
    fn test_chain_filter_slice_different_types() {
        // First pass: filter numbers
        let numbers = vec![1, 2, 3, 4, 5, 6];
        let first_pass = FilteredVec::new(&numbers, |&x| x > 3);
        assert_eq!(first_pass.items(), &[4, 5, 6]);

        // Transform to strings (simulating gather_filters_for_pushdown transformation)
        let strings = vec!["four", "five", "six"];

        // Second pass: filter strings that contain 'i'
        let second_pass = first_pass.chain_filter_slice(&strings, |s| s.contains('i'));
        assert_eq!(second_pass.items(), &["five", "six"]);

        // Map results back to original indices
        let results = vec![100, 200];
        let mapped = second_pass.map_results_to_original(results, 0);
        // "five" was at index 4 (1-based: 5), "six" was at index 5 (1-based: 6)
        assert_eq!(mapped, vec![0, 0, 0, 0, 100, 200]);
    }

    /// Mirrors how `push_down_filters` uses two passes over the parent predicates.
    #[test]
    fn test_chain_filter_slice_complex_scenario() {
        // Simulating the filter pushdown scenario
        // Parent predicates: [A, B, C, D, E]
        let parent_predicates = vec!["A", "B", "C", "D", "E"];

        // First pass: filter out some predicates (simulating allow_pushdown_for_expr)
        let first_pass = FilteredVec::new(&parent_predicates, |s| *s != "B" && *s != "D");
        assert_eq!(first_pass.items(), &["A", "C", "E"]);

        // After gather_filters_for_pushdown, we get transformed results for a specific child
        // Let's say child gets [A_transformed, C_transformed, E_transformed]
        // but only C and E can be pushed down
        #[derive(Clone, Debug, PartialEq)]
        struct TransformedPredicate {
            name: String,
            can_push: bool,
        }

        let child_predicates = vec![
            TransformedPredicate {
                name: "A_transformed".to_string(),
                can_push: false,
            },
            TransformedPredicate {
                name: "C_transformed".to_string(),
                can_push: true,
            },
            TransformedPredicate {
                name: "E_transformed".to_string(),
                can_push: true,
            },
        ];

        // Second pass: filter based on can_push
        let second_pass =
            first_pass.chain_filter_slice(&child_predicates, |p| p.can_push);
        assert_eq!(second_pass.len(), 2);
        assert_eq!(second_pass.items()[0].name, "C_transformed");
        assert_eq!(second_pass.items()[1].name, "E_transformed");

        // Simulate getting results back from child
        let child_results = vec!["C_result", "E_result"];
        let mapped = second_pass.map_results_to_original(child_results, "no_result");

        // Results should be at original positions: C was at index 2, E was at index 4
        assert_eq!(
            mapped,
            vec![
                "no_result",
                "no_result",
                "C_result",
                "no_result",
                "E_result"
            ]
        );
    }

    /// Tracing through a single pass returns the original index of each survivor.
    #[test]
    fn test_trace_to_original_index() {
        let items = vec![10, 20, 30, 40, 50];
        let filtered = FilteredVec::new(&items, |&x| x != 20 && x != 40);

        // filtered items are [10, 30, 50] at original indices [0, 2, 4]
        assert_eq!(filtered.trace_to_original_index(0), 0); // 10 was at index 0
        assert_eq!(filtered.trace_to_original_index(1), 2); // 30 was at index 2
        assert_eq!(filtered.trace_to_original_index(2), 4); // 50 was at index 4
    }

    /// Tracing through two passes composes both index mappings.
    #[test]
    fn test_trace_to_original_index_chained() {
        let items = vec![10, 20, 30, 40, 50];
        // First pass keeps [20, 30, 40, 50] at original indices [1, 2, 3, 4]
        let first = FilteredVec::new(&items, |&x| x >= 20);

        // Second pass keeps "b", "d", "e" at first-pass indices [0, 2, 3]
        let labels = vec!["b", "c", "d", "e"];
        let second = first.chain_filter_slice(&labels, |s| *s != "c");
        assert_eq!(second.items(), &["b", "d", "e"]);

        assert_eq!(second.trace_to_original_index(0), 1); // "b" <- 20
        assert_eq!(second.trace_to_original_index(1), 3); // "d" <- 40
        assert_eq!(second.trace_to_original_index(2), 4); // "e" <- 50
    }

    /// Chaining keeps the original length and places results at original indices.
    #[test]
    fn test_chain_filter_preserves_original_len() {
        let items = vec![1, 2, 3, 4, 5];
        // First pass keeps [3, 4, 5] at original indices [2, 3, 4]
        let first = FilteredVec::new(&items, |&x| x > 2);

        // Second pass keeps "four" and "five" at first-pass indices [1, 2]
        let strings = vec!["three", "four", "five"];
        let second = first.chain_filter_slice(&strings, |s| s.len() == 4);

        // Original length should still be 5
        let results = vec!["x", "y"];
        let mapped = second.map_results_to_original(results, "-");
        assert_eq!(mapped.len(), 5);
        // "four" came from original index 3, "five" from original index 4
        assert_eq!(mapped, vec!["-", "-", "-", "x", "y"]);
    }
}