datafusion_physical_optimizer/filter_pushdown.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Filter Pushdown Optimization Process
19//!
20//! The filter pushdown mechanism involves four key steps:
21//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`]
22//! on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`]
23//! by inspecting its logic and children's schemas, determining which filters can be pushed to each child.
24//! 2. **Optimizer Executes Pushdown**: The optimizer recursively calls `push_down_filters` in this module on each child,
25//! passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child.
26//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children,
27//! containing information about which filters were successfully pushed down vs. unsupported.
28//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent,
29//! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides
30//! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes).
31//!
32//! [`FilterDescription`]: datafusion_physical_plan::filter_pushdown::FilterDescription
33
34use std::sync::Arc;
35
36use crate::PhysicalOptimizerRule;
37
38use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
39use datafusion_common::{Result, assert_eq_or_internal_err, config::ConfigOptions};
40use datafusion_physical_expr::PhysicalExpr;
41use datafusion_physical_expr_common::physical_expr::is_volatile;
42use datafusion_physical_plan::filter_pushdown::{
43 ChildFilterPushdownResult, ChildPushdownResult, FilterPushdownPhase,
44 FilterPushdownPropagation, PushedDown,
45};
46use datafusion_physical_plan::{ExecutionPlan, with_new_children_if_necessary};
47
48use itertools::{Itertools, izip};
49
50/// Attempts to recursively push given filters from the top of the tree into leaves.
51///
52/// # Default Implementation
53///
54/// The default implementation in [`ExecutionPlan::gather_filters_for_pushdown`]
55/// and [`ExecutionPlan::handle_child_pushdown_result`] assumes that:
56///
57/// * Parent filters can't be passed onto children (determined by [`ExecutionPlan::gather_filters_for_pushdown`])
58/// * This node has no filters to contribute (determined by [`ExecutionPlan::gather_filters_for_pushdown`]).
59/// * Any filters that could not be pushed down to the children are marked as unsupported (determined by [`ExecutionPlan::handle_child_pushdown_result`]).
60///
61/// # Example: Push filter into a `DataSourceExec`
62///
63/// For example, consider the following plan:
64///
65/// ```text
66/// ┌──────────────────────┐
67/// │ CoalesceBatchesExec │
68/// └──────────────────────┘
69/// │
70/// ▼
71/// ┌──────────────────────┐
72/// │ FilterExec │
73/// │ filters = [ id=1] │
74/// └──────────────────────┘
75/// │
76/// ▼
77/// ┌──────────────────────┐
78/// │ DataSourceExec │
79/// │ projection = * │
80/// └──────────────────────┘
81/// ```
82///
83/// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to the `DataSourceExec` node.
84///
/// If this filter is selective, pushing it into the scan can avoid reading
/// massive amounts of data from the source (the projection is `*` so all
/// matching columns are read).
88///
89/// The new plan looks like:
90///
91/// ```text
92/// ┌──────────────────────┐
93/// │ CoalesceBatchesExec │
94/// └──────────────────────┘
95/// │
96/// ▼
97/// ┌──────────────────────┐
98/// │ DataSourceExec │
99/// │ projection = * │
100/// │ filters = [ id=1] │
101/// └──────────────────────┘
102/// ```
103///
104/// # Example: Push filters with `ProjectionExec`
105///
106/// Let's consider a more complex example involving a [`ProjectionExec`]
107/// node in between the [`FilterExec`] and `DataSourceExec` nodes that
108/// creates a new column that the filter depends on.
109///
110/// ```text
111/// ┌──────────────────────┐
112/// │ CoalesceBatchesExec │
113/// └──────────────────────┘
114/// │
115/// ▼
116/// ┌──────────────────────┐
117/// │ FilterExec │
118/// │ filters = │
119/// │ [cost>50,id=1] │
120/// └──────────────────────┘
121/// │
122/// ▼
123/// ┌──────────────────────┐
124/// │ ProjectionExec │
125/// │ cost = price * 1.2 │
126/// └──────────────────────┘
127/// │
128/// ▼
129/// ┌──────────────────────┐
130/// │ DataSourceExec │
131/// │ projection = * │
132/// └──────────────────────┘
133/// ```
134///
135/// We want to push down the filters `[id=1]` to the `DataSourceExec` node,
136/// but can't push down `cost>50` because it requires the [`ProjectionExec`]
137/// node to be executed first. A simple thing to do would be to split up the
138/// filter into two separate filters and push down the first one:
139///
140/// ```text
141/// ┌──────────────────────┐
142/// │ CoalesceBatchesExec │
143/// └──────────────────────┘
144/// │
145/// ▼
146/// ┌──────────────────────┐
147/// │ FilterExec │
148/// │ filters = │
149/// │ [cost>50] │
150/// └──────────────────────┘
151/// │
152/// ▼
153/// ┌──────────────────────┐
154/// │ ProjectionExec │
155/// │ cost = price * 1.2 │
156/// └──────────────────────┘
157/// │
158/// ▼
159/// ┌──────────────────────┐
160/// │ DataSourceExec │
161/// │ projection = * │
162/// │ filters = [ id=1] │
163/// └──────────────────────┘
164/// ```
165///
/// However, we can actually do better by pushing down `price * 1.2 > 50`
/// instead of `cost > 50`:
168///
169/// ```text
170/// ┌──────────────────────┐
171/// │ CoalesceBatchesExec │
172/// └──────────────────────┘
173/// │
174/// ▼
175/// ┌──────────────────────┐
176/// │ ProjectionExec │
177/// │ cost = price * 1.2 │
178/// └──────────────────────┘
179/// │
180/// ▼
181/// ┌──────────────────────┐
182/// │ DataSourceExec │
183/// │ projection = * │
184/// │ filters = [id=1, │
185/// │ price * 1.2 > 50] │
186/// └──────────────────────┘
187/// ```
188///
189/// # Example: Push filters within a subtree
190///
191/// There are also cases where we may be able to push down filters within a
192/// subtree but not the entire tree. A good example of this is aggregation
193/// nodes:
194///
195/// ```text
196/// ┌──────────────────────┐
197/// │ ProjectionExec │
198/// │ projection = * │
199/// └──────────────────────┘
200/// │
201/// ▼
202/// ┌──────────────────────┐
203/// │ FilterExec │
204/// │ filters = [sum > 10] │
205/// └──────────────────────┘
206/// │
207/// ▼
208/// ┌───────────────────────┐
209/// │ AggregateExec │
210/// │ group by = [id] │
211/// │ aggregate = │
212/// │ [sum(price)] │
213/// └───────────────────────┘
214/// │
215/// ▼
216/// ┌──────────────────────┐
217/// │ FilterExec │
218/// │ filters = [id=1] │
219/// └──────────────────────┘
220/// │
221/// ▼
222/// ┌──────────────────────┐
223/// │ DataSourceExec │
224/// │ projection = * │
225/// └──────────────────────┘
226/// ```
227///
228/// The transformation here is to push down the `id=1` filter to the
229/// `DataSourceExec` node:
230///
231/// ```text
232/// ┌──────────────────────┐
233/// │ ProjectionExec │
234/// │ projection = * │
235/// └──────────────────────┘
236/// │
237/// ▼
238/// ┌──────────────────────┐
239/// │ FilterExec │
240/// │ filters = [sum > 10] │
241/// └──────────────────────┘
242/// │
243/// ▼
244/// ┌───────────────────────┐
245/// │ AggregateExec │
246/// │ group by = [id] │
247/// │ aggregate = │
248/// │ [sum(price)] │
249/// └───────────────────────┘
250/// │
251/// ▼
252/// ┌──────────────────────┐
253/// │ DataSourceExec │
254/// │ projection = * │
255/// │ filters = [id=1] │
256/// └──────────────────────┘
257/// ```
258///
259/// The point here is that:
260/// 1. We cannot push down `sum > 10` through the [`AggregateExec`] node into the `DataSourceExec` node.
261/// Any filters above the [`AggregateExec`] node are not pushed down.
262/// This is determined by calling [`ExecutionPlan::gather_filters_for_pushdown`] on the [`AggregateExec`] node.
263/// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push
264/// down the `id=1` filter.
265///
266/// # Example: Push filters through Joins
267///
268/// It is also possible to push down filters through joins and filters that
269/// originate from joins. For example, a hash join where we build a hash
270/// table of the left side and probe the right side (ignoring why we would
271/// choose this order, typically it depends on the size of each table,
272/// etc.).
273///
274/// ```text
275/// ┌─────────────────────┐
276/// │ FilterExec │
277/// │ filters = │
278/// │ [d.size > 100] │
279/// └─────────────────────┘
280/// │
281/// │
282/// ┌──────────▼──────────┐
283/// │ │
284/// │ HashJoinExec │
285/// │ [u.dept@hash(d.id)] │
286/// │ │
287/// └─────────────────────┘
288/// │
289/// ┌────────────┴────────────┐
290/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
291/// │ DataSourceExec │ │ DataSourceExec │
292/// │ alias [users as u] │ │ alias [dept as d] │
293/// │ │ │ │
294/// └─────────────────────┘ └─────────────────────┘
295/// ```
296///
297/// There are two pushdowns we can do here:
298/// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec`
299/// node for the `departments` table.
300/// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading
301/// rows from the `users` table that will be eliminated by the join.
302/// This can be done via a bloom filter or similar and is not (yet) supported
303/// in DataFusion. See <https://github.com/apache/datafusion/issues/7955>.
304///
305/// ```text
306/// ┌─────────────────────┐
307/// │ │
308/// │ HashJoinExec │
309/// │ [u.dept@hash(d.id)] │
310/// │ │
311/// └─────────────────────┘
312/// │
313/// ┌────────────┴────────────┐
314/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
315/// │ DataSourceExec │ │ DataSourceExec │
316/// │ alias [users as u] │ │ alias [dept as d] │
317/// │ filters = │ │ filters = │
/// │ [u.dept@hash(d.id)] │ │ [ d.size > 100] │
319/// └─────────────────────┘ └─────────────────────┘
320/// ```
321///
322/// You may notice in this case that the filter is *dynamic*: the hash table
323/// is built _after_ the `departments` table is read and at runtime. We
324/// don't have a concrete `InList` filter or similar to push down at
325/// optimization time. These sorts of dynamic filters are handled by
326/// building a specialized [`PhysicalExpr`] that can be evaluated at runtime
327/// and internally maintains a reference to the hash table or other state.
328///
329/// To make working with these sorts of dynamic filters more tractable we have the method [`PhysicalExpr::snapshot`]
330/// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter.
331/// For a join this could mean converting it to an `InList` filter or a min/max filter for example.
332/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details.
333///
334/// # Example: Push TopK filters into Scans
335///
336/// Another form of dynamic filter is pushing down the state of a `TopK`
337/// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`:
338///
339/// ```text
340/// ┌──────────────────────┐
341/// │ TopK │
342/// │ limit = 10 │
343/// │ order by = [id] │
344/// └──────────────────────┘
345/// │
346/// ▼
347/// ┌──────────────────────┐
348/// │ DataSourceExec │
349/// │ projection = * │
350/// └──────────────────────┘
351/// ```
352///
353/// We can avoid large amounts of data processing by transforming this into:
354///
355/// ```text
356/// ┌──────────────────────┐
357/// │ TopK │
358/// │ limit = 10 │
359/// │ order by = [id] │
360/// └──────────────────────┘
361/// │
362/// ▼
363/// ┌──────────────────────┐
364/// │ DataSourceExec │
365/// │ projection = * │
366/// │ filters = │
367/// │ [id < @ TopKHeap] │
368/// └──────────────────────┘
369/// ```
370///
371/// Now as we fill our `TopK` heap we can push down the state of the heap to
372/// the `DataSourceExec` node to avoid reading files / row groups / pages /
373/// rows that could not possibly be in the top 10.
374///
375/// This is not yet implemented in DataFusion. See
376/// <https://github.com/apache/datafusion/issues/15037>
377///
378/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
379/// [`PhysicalExpr::snapshot`]: datafusion_physical_plan::PhysicalExpr::snapshot
380/// [`FilterExec`]: datafusion_physical_plan::filter::FilterExec
381/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
382/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
383#[derive(Debug)]
384pub struct FilterPushdown {
385 phase: FilterPushdownPhase,
386 name: String,
387}
388
389impl FilterPushdown {
390 fn new_with_phase(phase: FilterPushdownPhase) -> Self {
391 let name = match phase {
392 FilterPushdownPhase::Pre => "FilterPushdown",
393 FilterPushdownPhase::Post => "FilterPushdown(Post)",
394 }
395 .to_string();
396 Self { phase, name }
397 }
398
399 /// Create a new [`FilterPushdown`] optimizer rule that runs in the pre-optimization phase.
400 /// See [`FilterPushdownPhase`] for more details.
401 pub fn new() -> Self {
402 Self::new_with_phase(FilterPushdownPhase::Pre)
403 }
404
405 /// Create a new [`FilterPushdown`] optimizer rule that runs in the post-optimization phase.
406 /// See [`FilterPushdownPhase`] for more details.
407 pub fn new_post_optimization() -> Self {
408 Self::new_with_phase(FilterPushdownPhase::Post)
409 }
410}
411
412impl Default for FilterPushdown {
413 fn default() -> Self {
414 Self::new()
415 }
416}
417
418impl PhysicalOptimizerRule for FilterPushdown {
419 fn optimize(
420 &self,
421 plan: Arc<dyn ExecutionPlan>,
422 config: &ConfigOptions,
423 ) -> Result<Arc<dyn ExecutionPlan>> {
424 Ok(
425 push_down_filters(&Arc::clone(&plan), vec![], config, self.phase)?
426 .updated_node
427 .unwrap_or(plan),
428 )
429 }
430
431 fn name(&self) -> &str {
432 &self.name
433 }
434
435 fn schema_check(&self) -> bool {
436 true // Filter pushdown does not change the schema of the plan
437 }
438}
439
440fn push_down_filters(
441 node: &Arc<dyn ExecutionPlan>,
442 parent_predicates: Vec<Arc<dyn PhysicalExpr>>,
443 config: &ConfigOptions,
444 phase: FilterPushdownPhase,
445) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
446 let mut parent_filter_pushdown_supports: Vec<Vec<PushedDown>> =
447 vec![vec![]; parent_predicates.len()];
448 let mut self_filters_pushdown_supports = vec![];
449 let mut new_children = Vec::with_capacity(node.children().len());
450
451 let children = node.children();
452
453 // Filter out expressions that are not allowed for pushdown
454 let parent_filtered = FilteredVec::new(&parent_predicates, allow_pushdown_for_expr);
455
456 let filter_description = node.gather_filters_for_pushdown(
457 phase,
458 parent_filtered.items().to_vec(),
459 config,
460 )?;
461
462 let filter_description_parent_filters = filter_description.parent_filters();
463 let filter_description_self_filters = filter_description.self_filters();
464 assert_eq_or_internal_err!(
465 filter_description_parent_filters.len(),
466 children.len(),
467 "Filter pushdown expected parent filters count to match number of children for node {}",
468 node.name()
469 );
470 assert_eq_or_internal_err!(
471 filter_description_self_filters.len(),
472 children.len(),
473 "Filter pushdown expected self filters count to match number of children for node {}",
474 node.name()
475 );
476
477 for (child_idx, (child, parent_filters, self_filters)) in izip!(
478 children,
479 filter_description.parent_filters(),
480 filter_description.self_filters()
481 )
482 .enumerate()
483 {
484 // Here, `parent_filters` are the predicates which are provided by the parent node of
485 // the current node, and tried to be pushed down over the child which the loop points
486 // currently. `self_filters` are the predicates which are provided by the current node,
487 // and tried to be pushed down over the child similarly.
488
489 // Filter out self_filters that contain volatile expressions and track indices
490 let self_filtered = FilteredVec::new(&self_filters, allow_pushdown_for_expr);
491
492 let num_self_filters = self_filtered.len();
493 let mut all_predicates = self_filtered.items().to_vec();
494
495 // Apply second filter pass: collect indices of parent filters that can be pushed down
496 let parent_filters_for_child = parent_filtered
497 .chain_filter_slice(&parent_filters, |filter| {
498 matches!(filter.discriminant, PushedDown::Yes)
499 });
500
501 // Add the filtered parent predicates to all_predicates
502 for filter in parent_filters_for_child.items() {
503 all_predicates.push(Arc::clone(&filter.predicate));
504 }
505
506 let num_parent_filters = all_predicates.len() - num_self_filters;
507
508 // Any filters that could not be pushed down to a child are marked as not-supported to our parents
509 let result =
510 push_down_filters(&Arc::clone(child), all_predicates, config, phase)?;
511
512 if let Some(new_child) = result.updated_node {
513 // If we have a filter pushdown result, we need to update our children
514 new_children.push(new_child);
515 } else {
516 // If we don't have a filter pushdown result, we need to update our children
517 new_children.push(Arc::clone(child));
518 }
519
520 // Our child doesn't know the difference between filters that were passed down
521 // from our parents and filters that the current node injected. We need to de-entangle
522 // this since we do need to distinguish between them.
523 let mut all_filters = result.filters.into_iter().collect_vec();
524 assert_eq_or_internal_err!(
525 all_filters.len(),
526 num_self_filters + num_parent_filters,
527 "Filter pushdown did not return the expected number of filters from {}",
528 child.name()
529 );
530 let parent_filters = all_filters
531 .split_off(num_self_filters)
532 .into_iter()
533 .collect_vec();
534 // Map the results from filtered self filters back to their original positions using FilteredVec
535 let mapped_self_results =
536 self_filtered.map_results_to_original(all_filters, PushedDown::No);
537
538 // Wrap each result with its corresponding expression
539 let self_filter_results: Vec<_> = mapped_self_results
540 .into_iter()
541 .zip(self_filters)
542 .map(|(support, filter)| support.wrap_expression(filter))
543 .collect();
544
545 self_filters_pushdown_supports.push(self_filter_results);
546
547 // Start by marking all parent filters as unsupported for this child
548 for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() {
549 parent_filter_pushdown_support.push(PushedDown::No);
550 assert_eq!(
551 parent_filter_pushdown_support.len(),
552 child_idx + 1,
553 "Parent filter pushdown supports should have the same length as the number of children"
554 );
555 }
556 // Map results from pushed-down filters back to original parent filter indices
557 let mapped_parent_results = parent_filters_for_child
558 .map_results_to_original(parent_filters, PushedDown::No);
559
560 // Update parent_filter_pushdown_supports with the mapped results
561 // mapped_parent_results already has the results at their original indices
562 for (idx, support) in parent_filter_pushdown_supports.iter_mut().enumerate() {
563 support[child_idx] = mapped_parent_results[idx];
564 }
565 }
566
567 // Re-create this node with new children
568 let updated_node = with_new_children_if_necessary(Arc::clone(node), new_children)?;
569
570 // TODO: by calling `handle_child_pushdown_result` we are assuming that the
571 // `ExecutionPlan` implementation will not change the plan itself.
572 // Should we have a separate method for dynamic pushdown that does not allow modifying the plan?
573 let mut res = updated_node.handle_child_pushdown_result(
574 phase,
575 ChildPushdownResult {
576 parent_filters: parent_predicates
577 .into_iter()
578 .enumerate()
579 .map(
580 |(parent_filter_idx, parent_filter)| ChildFilterPushdownResult {
581 filter: parent_filter,
582 child_results: parent_filter_pushdown_supports[parent_filter_idx]
583 .clone(),
584 },
585 )
586 .collect(),
587 self_filters: self_filters_pushdown_supports,
588 },
589 config,
590 )?;
591 // Compare pointers for new_node and node, if they are different we must replace
592 // ourselves because of changes in our children.
593 if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, node) {
594 res.updated_node = Some(updated_node)
595 }
596 Ok(res)
597}
598
/// Holds the survivors of one or more filtering passes together with the index
/// bookkeeping needed to scatter per-survivor results back to the positions of
/// the original, unfiltered input.
struct FilteredVec<T> {
    /// Items that passed every filtering pass applied so far.
    items: Vec<T>,
    // One index mapping per filtering pass, oldest first:
    // `index_mappings[0]` maps first-pass positions to original positions,
    // `index_mappings[1]` maps second-pass positions to first-pass positions, etc.
    index_mappings: Vec<Vec<usize>>,
    /// Length of the input handed to the very first pass.
    original_len: usize,
}

/// Returns the positions and clones of all `items` accepted by `predicate`,
/// in input order.
fn select_with_positions<V, F>(items: &[V], predicate: F) -> (Vec<usize>, Vec<V>)
where
    V: Clone,
    F: Fn(&V) -> bool,
{
    items
        .iter()
        .enumerate()
        .filter(|&(_, item)| predicate(item))
        .map(|(position, item)| (position, item.clone()))
        .unzip()
}

impl<T: Clone> FilteredVec<T> {
    /// Creates a `FilteredVec` holding clones of the `items` accepted by `predicate`.
    fn new<F>(items: &[T], predicate: F) -> Self
    where
        F: Fn(&T) -> bool,
    {
        let (kept_positions, kept_items) = select_with_positions(items, predicate);
        Self {
            items: kept_items,
            index_mappings: vec![kept_positions],
            original_len: items.len(),
        }
    }

    /// Borrows the items that survived filtering.
    fn items(&self) -> &[T] {
        self.items.as_slice()
    }

    /// Number of items that survived filtering.
    fn len(&self) -> usize {
        self.items.len()
    }

    /// Scatters `results` (one per surviving item, in order) back to the
    /// positions those items had in the original input.
    ///
    /// The returned vector has the length of the original input; positions
    /// without a result hold `default_value`.
    fn map_results_to_original<R: Clone>(
        &self,
        results: Vec<R>,
        default_value: R,
    ) -> Vec<R> {
        let mut scattered = vec![default_value; self.original_len];
        results
            .into_iter()
            .enumerate()
            .for_each(|(filtered_idx, value)| {
                let target = self.trace_to_original_index(filtered_idx);
                scattered[target] = value;
            });
        scattered
    }

    /// Translates an index into the filtered items to the matching index in
    /// the original input by walking the pass mappings from newest to oldest.
    fn trace_to_original_index(&self, current_idx: usize) -> usize {
        self.index_mappings
            .iter()
            .rev()
            .fold(current_idx, |idx, mapping| mapping[idx])
    }

    /// Runs a further filtering pass over `items`, a slice positionally aligned
    /// with this `FilteredVec`'s items (e.g. a transformed version of them),
    /// and returns a new `FilteredVec` whose results still map back to the
    /// original input of `self`.
    fn chain_filter_slice<U: Clone, F>(&self, items: &[U], predicate: F) -> FilteredVec<U>
    where
        F: Fn(&U) -> bool,
    {
        let (kept_positions, kept_items) = select_with_positions(items, predicate);

        // Extend the inherited chain with the mapping for this pass.
        let mut index_mappings = Vec::with_capacity(self.index_mappings.len() + 1);
        index_mappings.extend(self.index_mappings.iter().cloned());
        index_mappings.push(kept_positions);

        FilteredVec {
            items: kept_items,
            index_mappings,
            original_len: self.original_len,
        }
    }
}
698
699fn allow_pushdown_for_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
700 let mut allow_pushdown = true;
701 expr.apply(|e| {
702 allow_pushdown = allow_pushdown && !is_volatile(e);
703 if allow_pushdown {
704 Ok(TreeNodeRecursion::Continue)
705 } else {
706 Ok(TreeNodeRecursion::Stop)
707 }
708 })
709 .expect("Infallible traversal of PhysicalExpr tree failed");
710 allow_pushdown
711}
712
#[cfg(test)]
mod tests {
    use super::*;

    /// One pass keeps the matching items and scatters results to their source slots.
    #[test]
    fn test_filtered_vec_single_pass() {
        let input = vec![1, 2, 3, 4, 5, 6];
        let evens = FilteredVec::new(&input, |&value| value % 2 == 0);

        // Surviving items
        assert_eq!(evens.items(), &[2, 4, 6]);
        assert_eq!(evens.len(), 3);

        // Results land on the original positions of the survivors
        let scattered = evens.map_results_to_original(vec!["a", "b", "c"], "default");
        assert_eq!(
            scattered,
            vec!["default", "a", "default", "b", "default", "c"]
        );
    }

    /// When nothing passes, every original slot receives the default value.
    #[test]
    fn test_filtered_vec_empty_filter() {
        let input = vec![1, 3, 5];
        let evens = FilteredVec::new(&input, |&value| value % 2 == 0);

        assert_eq!(evens.items(), &[] as &[i32]);
        assert_eq!(evens.len(), 0);

        let scattered = evens.map_results_to_original(Vec::<&str>::new(), "default");
        assert_eq!(scattered, vec!["default", "default", "default"]);
    }

    /// When everything passes, results map one-to-one.
    #[test]
    fn test_filtered_vec_all_pass() {
        let input = vec![2, 4, 6];
        let evens = FilteredVec::new(&input, |&value| value % 2 == 0);

        assert_eq!(evens.items(), &[2, 4, 6]);
        assert_eq!(evens.len(), 3);

        let scattered = evens.map_results_to_original(vec!["a", "b", "c"], "default");
        assert_eq!(scattered, vec!["a", "b", "c"]);
    }

    /// A chained pass may work on a different item type than the first pass.
    #[test]
    fn test_chain_filter_slice_different_types() {
        // First pass: filter numbers
        let numbers = vec![1, 2, 3, 4, 5, 6];
        let above_three = FilteredVec::new(&numbers, |&n| n > 3);
        assert_eq!(above_three.items(), &[4, 5, 6]);

        // Transform to strings (simulating gather_filters_for_pushdown transformation)
        let spelled = vec!["four", "five", "six"];

        // Second pass: filter strings that contain 'i'
        let with_i = above_three.chain_filter_slice(&spelled, |word| word.contains('i'));
        assert_eq!(with_i.items(), &["five", "six"]);

        // Map results back to original indices:
        // "five" was at index 4 (1-based: 5), "six" was at index 5 (1-based: 6)
        let scattered = with_i.map_results_to_original(vec![100, 200], 0);
        assert_eq!(scattered, vec![0, 0, 0, 0, 100, 200]);
    }

    /// Mirrors the way `push_down_filters` uses two passes over parent predicates.
    #[test]
    fn test_chain_filter_slice_complex_scenario() {
        #[derive(Clone, Debug, PartialEq)]
        struct TransformedPredicate {
            name: String,
            can_push: bool,
        }

        // Parent predicates: [A, B, C, D, E]
        let parent_predicates = vec!["A", "B", "C", "D", "E"];

        // First pass: filter out some predicates (simulating allow_pushdown_for_expr)
        let allowed = FilteredVec::new(&parent_predicates, |name| *name != "B" && *name != "D");
        assert_eq!(allowed.items(), &["A", "C", "E"]);

        // After gather_filters_for_pushdown, a specific child gets
        // [A_transformed, C_transformed, E_transformed], but only C and E can be pushed down
        let child_predicates: Vec<TransformedPredicate> = [
            ("A_transformed", false),
            ("C_transformed", true),
            ("E_transformed", true),
        ]
        .iter()
        .map(|&(name, can_push)| TransformedPredicate {
            name: name.to_string(),
            can_push,
        })
        .collect();

        // Second pass: filter based on can_push
        let pushable = allowed.chain_filter_slice(&child_predicates, |pred| pred.can_push);
        assert_eq!(pushable.len(), 2);
        assert_eq!(pushable.items()[0].name, "C_transformed");
        assert_eq!(pushable.items()[1].name, "E_transformed");

        // Simulate getting results back from child
        let scattered =
            pushable.map_results_to_original(vec!["C_result", "E_result"], "no_result");

        // Results should be at original positions: C was at index 2, E was at index 4
        let expected = vec![
            "no_result",
            "no_result",
            "C_result",
            "no_result",
            "E_result",
        ];
        assert_eq!(scattered, expected);
    }

    /// Filtered positions translate back to positions in the original input.
    #[test]
    fn test_trace_to_original_index() {
        let input = vec![10, 20, 30, 40, 50];
        let kept = FilteredVec::new(&input, |&value| value != 20 && value != 40);

        // kept items are [10, 30, 50] at original indices [0, 2, 4]
        assert_eq!(kept.trace_to_original_index(0), 0); // 10 was at index 0
        assert_eq!(kept.trace_to_original_index(1), 2); // 30 was at index 2
        assert_eq!(kept.trace_to_original_index(2), 4); // 50 was at index 4
    }

    /// Chaining keeps the length of the very first input.
    #[test]
    fn test_chain_filter_preserves_original_len() {
        let input = vec![1, 2, 3, 4, 5];
        let above_two = FilteredVec::new(&input, |&value| value > 2);

        let spelled = vec!["three", "four", "five"];
        let four_letters = above_two.chain_filter_slice(&spelled, |word| word.len() == 4);

        // Original length should still be 5
        let scattered = four_letters.map_results_to_original(vec!["x", "y"], "-");
        assert_eq!(scattered.len(), 5);
    }
}