datafusion_physical_optimizer/filter_pushdown.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Filter Pushdown Optimization Process
19//!
20//! The filter pushdown mechanism involves four key steps:
21//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`]
22//! on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`]
23//! by inspecting its logic and children's schemas, determining which filters can be pushed to each child.
24//! 2. **Optimizer Executes Pushdown**: The optimizer recursively calls `push_down_filters` in this module on each child,
25//! passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child.
26//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children,
27//! containing information about which filters were successfully pushed down vs. unsupported.
28//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent,
29//! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides
30//! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes).
31//!
32//! [`FilterDescription`]: datafusion_physical_plan::filter_pushdown::FilterDescription
33
34use std::sync::Arc;
35
36use crate::PhysicalOptimizerRule;
37
38use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
39use datafusion_common::{config::ConfigOptions, Result};
40use datafusion_physical_expr::PhysicalExpr;
41use datafusion_physical_expr_common::physical_expr::is_volatile;
42use datafusion_physical_plan::filter_pushdown::{
43 ChildFilterPushdownResult, ChildPushdownResult, FilterPushdownPhase,
44 FilterPushdownPropagation, PushedDown,
45};
46use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan};
47
48use itertools::{izip, Itertools};
49
50/// Attempts to recursively push given filters from the top of the tree into leaves.
51///
52/// # Default Implementation
53///
54/// The default implementation in [`ExecutionPlan::gather_filters_for_pushdown`]
55/// and [`ExecutionPlan::handle_child_pushdown_result`] assumes that:
56///
57/// * Parent filters can't be passed onto children (determined by [`ExecutionPlan::gather_filters_for_pushdown`])
58/// * This node has no filters to contribute (determined by [`ExecutionPlan::gather_filters_for_pushdown`]).
59/// * Any filters that could not be pushed down to the children are marked as unsupported (determined by [`ExecutionPlan::handle_child_pushdown_result`]).
60///
61/// # Example: Push filter into a `DataSourceExec`
62///
63/// For example, consider the following plan:
64///
65/// ```text
66/// ┌──────────────────────┐
67/// │ CoalesceBatchesExec │
68/// └──────────────────────┘
69/// │
70/// ▼
71/// ┌──────────────────────┐
72/// │ FilterExec │
73/// │ filters = [ id=1] │
74/// └──────────────────────┘
75/// │
76/// ▼
77/// ┌──────────────────────┐
78/// │ DataSourceExec │
79/// │ projection = * │
80/// └──────────────────────┘
81/// ```
82///
83/// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to the `DataSourceExec` node.
84///
/// If this filter is selective, pushing it into the scan can avoid massive
/// amounts of data being read from the source (the projection is `*` so all
/// matching columns are read).
88///
89/// The new plan looks like:
90///
91/// ```text
92/// ┌──────────────────────┐
93/// │ CoalesceBatchesExec │
94/// └──────────────────────┘
95/// │
96/// ▼
97/// ┌──────────────────────┐
98/// │ DataSourceExec │
99/// │ projection = * │
100/// │ filters = [ id=1] │
101/// └──────────────────────┘
102/// ```
103///
104/// # Example: Push filters with `ProjectionExec`
105///
106/// Let's consider a more complex example involving a [`ProjectionExec`]
107/// node in between the [`FilterExec`] and `DataSourceExec` nodes that
108/// creates a new column that the filter depends on.
109///
110/// ```text
111/// ┌──────────────────────┐
112/// │ CoalesceBatchesExec │
113/// └──────────────────────┘
114/// │
115/// ▼
116/// ┌──────────────────────┐
117/// │ FilterExec │
118/// │ filters = │
119/// │ [cost>50,id=1] │
120/// └──────────────────────┘
121/// │
122/// ▼
123/// ┌──────────────────────┐
124/// │ ProjectionExec │
125/// │ cost = price * 1.2 │
126/// └──────────────────────┘
127/// │
128/// ▼
129/// ┌──────────────────────┐
130/// │ DataSourceExec │
131/// │ projection = * │
132/// └──────────────────────┘
133/// ```
134///
135/// We want to push down the filters `[id=1]` to the `DataSourceExec` node,
136/// but can't push down `cost>50` because it requires the [`ProjectionExec`]
137/// node to be executed first. A simple thing to do would be to split up the
138/// filter into two separate filters and push down the first one:
139///
140/// ```text
141/// ┌──────────────────────┐
142/// │ CoalesceBatchesExec │
143/// └──────────────────────┘
144/// │
145/// ▼
146/// ┌──────────────────────┐
147/// │ FilterExec │
148/// │ filters = │
149/// │ [cost>50] │
150/// └──────────────────────┘
151/// │
152/// ▼
153/// ┌──────────────────────┐
154/// │ ProjectionExec │
155/// │ cost = price * 1.2 │
156/// └──────────────────────┘
157/// │
158/// ▼
159/// ┌──────────────────────┐
160/// │ DataSourceExec │
161/// │ projection = * │
162/// │ filters = [ id=1] │
163/// └──────────────────────┘
164/// ```
165///
/// However, we can actually do better by pushing down `price * 1.2 > 50`
/// instead of `cost > 50`:
168///
169/// ```text
170/// ┌──────────────────────┐
171/// │ CoalesceBatchesExec │
172/// └──────────────────────┘
173/// │
174/// ▼
175/// ┌──────────────────────┐
176/// │ ProjectionExec │
177/// │ cost = price * 1.2 │
178/// └──────────────────────┘
179/// │
180/// ▼
181/// ┌──────────────────────┐
182/// │ DataSourceExec │
183/// │ projection = * │
184/// │ filters = [id=1, │
185/// │ price * 1.2 > 50] │
186/// └──────────────────────┘
187/// ```
188///
189/// # Example: Push filters within a subtree
190///
191/// There are also cases where we may be able to push down filters within a
192/// subtree but not the entire tree. A good example of this is aggregation
193/// nodes:
194///
195/// ```text
196/// ┌──────────────────────┐
197/// │ ProjectionExec │
198/// │ projection = * │
199/// └──────────────────────┘
200/// │
201/// ▼
202/// ┌──────────────────────┐
203/// │ FilterExec │
204/// │ filters = [sum > 10] │
205/// └──────────────────────┘
206/// │
207/// ▼
208/// ┌───────────────────────┐
209/// │ AggregateExec │
210/// │ group by = [id] │
211/// │ aggregate = │
212/// │ [sum(price)] │
213/// └───────────────────────┘
214/// │
215/// ▼
216/// ┌──────────────────────┐
217/// │ FilterExec │
218/// │ filters = [id=1] │
219/// └──────────────────────┘
220/// │
221/// ▼
222/// ┌──────────────────────┐
223/// │ DataSourceExec │
224/// │ projection = * │
225/// └──────────────────────┘
226/// ```
227///
228/// The transformation here is to push down the `id=1` filter to the
229/// `DataSourceExec` node:
230///
231/// ```text
232/// ┌──────────────────────┐
233/// │ ProjectionExec │
234/// │ projection = * │
235/// └──────────────────────┘
236/// │
237/// ▼
238/// ┌──────────────────────┐
239/// │ FilterExec │
240/// │ filters = [sum > 10] │
241/// └──────────────────────┘
242/// │
243/// ▼
244/// ┌───────────────────────┐
245/// │ AggregateExec │
246/// │ group by = [id] │
247/// │ aggregate = │
248/// │ [sum(price)] │
249/// └───────────────────────┘
250/// │
251/// ▼
252/// ┌──────────────────────┐
253/// │ DataSourceExec │
254/// │ projection = * │
255/// │ filters = [id=1] │
256/// └──────────────────────┘
257/// ```
258///
259/// The point here is that:
260/// 1. We cannot push down `sum > 10` through the [`AggregateExec`] node into the `DataSourceExec` node.
261/// Any filters above the [`AggregateExec`] node are not pushed down.
262/// This is determined by calling [`ExecutionPlan::gather_filters_for_pushdown`] on the [`AggregateExec`] node.
263/// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push
264/// down the `id=1` filter.
265///
266/// # Example: Push filters through Joins
267///
268/// It is also possible to push down filters through joins and filters that
269/// originate from joins. For example, a hash join where we build a hash
270/// table of the left side and probe the right side (ignoring why we would
271/// choose this order, typically it depends on the size of each table,
272/// etc.).
273///
274/// ```text
275/// ┌─────────────────────┐
276/// │ FilterExec │
277/// │ filters = │
278/// │ [d.size > 100] │
279/// └─────────────────────┘
280/// │
281/// │
282/// ┌──────────▼──────────┐
283/// │ │
284/// │ HashJoinExec │
285/// │ [u.dept@hash(d.id)] │
286/// │ │
287/// └─────────────────────┘
288/// │
289/// ┌────────────┴────────────┐
290/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
291/// │ DataSourceExec │ │ DataSourceExec │
292/// │ alias [users as u] │ │ alias [dept as d] │
293/// │ │ │ │
294/// └─────────────────────┘ └─────────────────────┘
295/// ```
296///
297/// There are two pushdowns we can do here:
298/// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec`
299/// node for the `departments` table.
300/// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading
301/// rows from the `users` table that will be eliminated by the join.
302/// This can be done via a bloom filter or similar and is not (yet) supported
303/// in DataFusion. See <https://github.com/apache/datafusion/issues/7955>.
304///
305/// ```text
306/// ┌─────────────────────┐
307/// │ │
308/// │ HashJoinExec │
309/// │ [u.dept@hash(d.id)] │
310/// │ │
311/// └─────────────────────┘
312/// │
313/// ┌────────────┴────────────┐
314/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
315/// │ DataSourceExec │ │ DataSourceExec │
316/// │ alias [users as u] │ │ alias [dept as d] │
317/// │ filters = │ │ filters = │
/// │ [dept@hash(d.id)] │ │ [ d.size > 100] │
319/// └─────────────────────┘ └─────────────────────┘
320/// ```
321///
322/// You may notice in this case that the filter is *dynamic*: the hash table
323/// is built _after_ the `departments` table is read and at runtime. We
324/// don't have a concrete `InList` filter or similar to push down at
325/// optimization time. These sorts of dynamic filters are handled by
326/// building a specialized [`PhysicalExpr`] that can be evaluated at runtime
327/// and internally maintains a reference to the hash table or other state.
328///
329/// To make working with these sorts of dynamic filters more tractable we have the method [`PhysicalExpr::snapshot`]
330/// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter.
331/// For a join this could mean converting it to an `InList` filter or a min/max filter for example.
332/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details.
333///
334/// # Example: Push TopK filters into Scans
335///
336/// Another form of dynamic filter is pushing down the state of a `TopK`
337/// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`:
338///
339/// ```text
340/// ┌──────────────────────┐
341/// │ TopK │
342/// │ limit = 10 │
343/// │ order by = [id] │
344/// └──────────────────────┘
345/// │
346/// ▼
347/// ┌──────────────────────┐
348/// │ DataSourceExec │
349/// │ projection = * │
350/// └──────────────────────┘
351/// ```
352///
353/// We can avoid large amounts of data processing by transforming this into:
354///
355/// ```text
356/// ┌──────────────────────┐
357/// │ TopK │
358/// │ limit = 10 │
359/// │ order by = [id] │
360/// └──────────────────────┘
361/// │
362/// ▼
363/// ┌──────────────────────┐
364/// │ DataSourceExec │
365/// │ projection = * │
366/// │ filters = │
367/// │ [id < @ TopKHeap] │
368/// └──────────────────────┘
369/// ```
370///
371/// Now as we fill our `TopK` heap we can push down the state of the heap to
372/// the `DataSourceExec` node to avoid reading files / row groups / pages /
373/// rows that could not possibly be in the top 10.
374///
375/// This is not yet implemented in DataFusion. See
376/// <https://github.com/apache/datafusion/issues/15037>
377///
378/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
379/// [`PhysicalExpr::snapshot`]: datafusion_physical_plan::PhysicalExpr::snapshot
380/// [`FilterExec`]: datafusion_physical_plan::filter::FilterExec
381/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
382/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
#[derive(Debug)]
pub struct FilterPushdown {
    /// Phase this rule instance runs in; `optimize` forwards it unchanged to
    /// `push_down_filters`.
    phase: FilterPushdownPhase,
    /// Rule name returned by `PhysicalOptimizerRule::name`; chosen from `phase`
    /// when the rule is constructed.
    name: String,
}
388
389impl FilterPushdown {
390 fn new_with_phase(phase: FilterPushdownPhase) -> Self {
391 let name = match phase {
392 FilterPushdownPhase::Pre => "FilterPushdown",
393 FilterPushdownPhase::Post => "FilterPushdown(Post)",
394 }
395 .to_string();
396 Self { phase, name }
397 }
398
399 /// Create a new [`FilterPushdown`] optimizer rule that runs in the pre-optimization phase.
400 /// See [`FilterPushdownPhase`] for more details.
401 pub fn new() -> Self {
402 Self::new_with_phase(FilterPushdownPhase::Pre)
403 }
404
405 /// Create a new [`FilterPushdown`] optimizer rule that runs in the post-optimization phase.
406 /// See [`FilterPushdownPhase`] for more details.
407 pub fn new_post_optimization() -> Self {
408 Self::new_with_phase(FilterPushdownPhase::Post)
409 }
410}
411
412impl Default for FilterPushdown {
413 fn default() -> Self {
414 Self::new()
415 }
416}
417
418impl PhysicalOptimizerRule for FilterPushdown {
419 fn optimize(
420 &self,
421 plan: Arc<dyn ExecutionPlan>,
422 config: &ConfigOptions,
423 ) -> Result<Arc<dyn ExecutionPlan>> {
424 Ok(
425 push_down_filters(Arc::clone(&plan), vec![], config, self.phase)?
426 .updated_node
427 .unwrap_or(plan),
428 )
429 }
430
431 fn name(&self) -> &str {
432 &self.name
433 }
434
435 fn schema_check(&self) -> bool {
436 true // Filter pushdown does not change the schema of the plan
437 }
438}
439
440fn push_down_filters(
441 node: Arc<dyn ExecutionPlan>,
442 parent_predicates: Vec<Arc<dyn PhysicalExpr>>,
443 config: &ConfigOptions,
444 phase: FilterPushdownPhase,
445) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
446 let mut parent_filter_pushdown_supports: Vec<Vec<PushedDown>> =
447 vec![vec![]; parent_predicates.len()];
448 let mut self_filters_pushdown_supports = vec![];
449 let mut new_children = Vec::with_capacity(node.children().len());
450
451 let children = node.children();
452
453 // Filter out expressions that are not allowed for pushdown
454 let parent_filtered = FilteredVec::new(&parent_predicates, allow_pushdown_for_expr);
455
456 let filter_description = node.gather_filters_for_pushdown(
457 phase,
458 parent_filtered.items().to_vec(),
459 config,
460 )?;
461
462 let filter_description_parent_filters = filter_description.parent_filters();
463 let filter_description_self_filters = filter_description.self_filters();
464 if filter_description_parent_filters.len() != children.len() {
465 return Err(datafusion_common::DataFusionError::Internal(
466 format!(
467 "Filter pushdown expected FilterDescription to have parent filters for {expected_num_children}, but got {actual_num_children} for node {node_name}",
468 expected_num_children = children.len(),
469 actual_num_children = filter_description_parent_filters.len(),
470 node_name = node.name(),
471 ),
472 ));
473 }
474 if filter_description_self_filters.len() != children.len() {
475 return Err(datafusion_common::DataFusionError::Internal(
476 format!(
477 "Filter pushdown expected FilterDescription to have self filters for {expected_num_children}, but got {actual_num_children} for node {node_name}",
478 expected_num_children = children.len(),
479 actual_num_children = filter_description_self_filters.len(),
480 node_name = node.name(),
481 ),
482 ));
483 }
484
485 for (child_idx, (child, parent_filters, self_filters)) in izip!(
486 children,
487 filter_description.parent_filters(),
488 filter_description.self_filters()
489 )
490 .enumerate()
491 {
492 // Here, `parent_filters` are the predicates which are provided by the parent node of
493 // the current node, and tried to be pushed down over the child which the loop points
494 // currently. `self_filters` are the predicates which are provided by the current node,
495 // and tried to be pushed down over the child similarly.
496
497 // Filter out self_filters that contain volatile expressions and track indices
498 let self_filtered = FilteredVec::new(&self_filters, allow_pushdown_for_expr);
499
500 let num_self_filters = self_filtered.len();
501 let mut all_predicates = self_filtered.items().to_vec();
502
503 // Apply second filter pass: collect indices of parent filters that can be pushed down
504 let parent_filters_for_child = parent_filtered
505 .chain_filter_slice(&parent_filters, |filter| {
506 matches!(filter.discriminant, PushedDown::Yes)
507 });
508
509 // Add the filtered parent predicates to all_predicates
510 for filter in parent_filters_for_child.items() {
511 all_predicates.push(Arc::clone(&filter.predicate));
512 }
513
514 let num_parent_filters = all_predicates.len() - num_self_filters;
515
516 // Any filters that could not be pushed down to a child are marked as not-supported to our parents
517 let result = push_down_filters(Arc::clone(child), all_predicates, config, phase)?;
518
519 if let Some(new_child) = result.updated_node {
520 // If we have a filter pushdown result, we need to update our children
521 new_children.push(new_child);
522 } else {
523 // If we don't have a filter pushdown result, we need to update our children
524 new_children.push(Arc::clone(child));
525 }
526
527 // Our child doesn't know the difference between filters that were passed down
528 // from our parents and filters that the current node injected. We need to de-entangle
529 // this since we do need to distinguish between them.
530 let mut all_filters = result.filters.into_iter().collect_vec();
531 if all_filters.len() != num_self_filters + num_parent_filters {
532 return Err(datafusion_common::DataFusionError::Internal(
533 format!(
534 "Filter pushdown did not return the expected number of filters: expected {num_self_filters} self filters and {num_parent_filters} parent filters, but got {num_filters_from_child}. Likely culprit is {child}",
535 num_self_filters = num_self_filters,
536 num_parent_filters = num_parent_filters,
537 num_filters_from_child = all_filters.len(),
538 child = child.name(),
539 ),
540 ));
541 }
542 let parent_filters = all_filters
543 .split_off(num_self_filters)
544 .into_iter()
545 .collect_vec();
546 // Map the results from filtered self filters back to their original positions using FilteredVec
547 let mapped_self_results =
548 self_filtered.map_results_to_original(all_filters, PushedDown::No);
549
550 // Wrap each result with its corresponding expression
551 let self_filter_results: Vec<_> = mapped_self_results
552 .into_iter()
553 .zip(self_filters)
554 .map(|(support, filter)| support.wrap_expression(filter))
555 .collect();
556
557 self_filters_pushdown_supports.push(self_filter_results);
558
559 // Start by marking all parent filters as unsupported for this child
560 for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() {
561 parent_filter_pushdown_support.push(PushedDown::No);
562 assert_eq!(
563 parent_filter_pushdown_support.len(),
564 child_idx + 1,
565 "Parent filter pushdown supports should have the same length as the number of children"
566 );
567 }
568 // Map results from pushed-down filters back to original parent filter indices
569 let mapped_parent_results = parent_filters_for_child
570 .map_results_to_original(parent_filters, PushedDown::No);
571
572 // Update parent_filter_pushdown_supports with the mapped results
573 // mapped_parent_results already has the results at their original indices
574 for (idx, support) in parent_filter_pushdown_supports.iter_mut().enumerate() {
575 support[child_idx] = mapped_parent_results[idx];
576 }
577 }
578
579 // Re-create this node with new children
580 let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?;
581
582 // TODO: by calling `handle_child_pushdown_result` we are assuming that the
583 // `ExecutionPlan` implementation will not change the plan itself.
584 // Should we have a separate method for dynamic pushdown that does not allow modifying the plan?
585 let mut res = updated_node.handle_child_pushdown_result(
586 phase,
587 ChildPushdownResult {
588 parent_filters: parent_predicates
589 .into_iter()
590 .enumerate()
591 .map(
592 |(parent_filter_idx, parent_filter)| ChildFilterPushdownResult {
593 filter: parent_filter,
594 child_results: parent_filter_pushdown_supports[parent_filter_idx]
595 .clone(),
596 },
597 )
598 .collect(),
599 self_filters: self_filters_pushdown_supports,
600 },
601 config,
602 )?;
603 // Compare pointers for new_node and node, if they are different we must replace
604 // ourselves because of changes in our children.
605 if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, &node) {
606 res.updated_node = Some(updated_node)
607 }
608 Ok(res)
609}
610
/// Holds the survivors of one or more filtering passes over a list, together with
/// the bookkeeping needed to scatter per-survivor results back onto the positions
/// of the original, unfiltered list.
struct FilteredVec<T> {
    items: Vec<T>,
    // One entry per filtering pass, oldest first. Entry `k` translates a position
    // in the output of pass `k` into a position in that pass's input; for `k == 0`
    // the input is the original list.
    index_mappings: Vec<Vec<usize>>,
    original_len: usize,
}

impl<T: Clone> FilteredVec<T> {
    /// Runs the first filtering pass: keeps clones of the elements of `items`
    /// accepted by `predicate` and records where each one came from.
    fn new<F>(items: &[T], predicate: F) -> Self
    where
        F: Fn(&T) -> bool,
    {
        let (original_indices, kept): (Vec<usize>, Vec<T>) = items
            .iter()
            .enumerate()
            .filter_map(|(position, item)| {
                predicate(item).then(|| (position, item.clone()))
            })
            .unzip();

        Self {
            items: kept,
            index_mappings: vec![original_indices],
            original_len: items.len(),
        }
    }

    /// The elements that survived every pass so far.
    fn items(&self) -> &[T] {
        self.items.as_slice()
    }

    /// How many elements survived every pass so far.
    fn len(&self) -> usize {
        self.items.len()
    }

    /// Scatters `results` (one per surviving element, in order) into a vector as
    /// long as the original input. Positions with no surviving element hold
    /// `default_value`.
    fn map_results_to_original<R: Clone>(
        &self,
        results: Vec<R>,
        default_value: R,
    ) -> Vec<R> {
        let mut scattered = vec![default_value; self.original_len];
        results
            .into_iter()
            .enumerate()
            .for_each(|(filtered_idx, value)| {
                scattered[self.trace_to_original_index(filtered_idx)] = value;
            });
        scattered
    }

    /// Resolves an index into the current survivors to the index that element
    /// had in the original input, by walking the passes from newest to oldest.
    fn trace_to_original_index(&self, current_idx: usize) -> usize {
        self.index_mappings
            .iter()
            .rev()
            .fold(current_idx, |idx, mapping| mapping[idx])
    }

    /// Runs a further filtering pass over `items`, a slice that lines up
    /// position-by-position with the current survivors (for instance a transformed
    /// view of them such as the output of gather_filters_for_pushdown). The returned
    /// value still maps results back to the positions of the very first input.
    fn chain_filter_slice<U: Clone, F>(&self, items: &[U], predicate: F) -> FilteredVec<U>
    where
        F: Fn(&U) -> bool,
    {
        let (pass_indices, kept): (Vec<usize>, Vec<U>) = items
            .iter()
            .enumerate()
            .filter_map(|(position, item)| {
                predicate(item).then(|| (position, item.clone()))
            })
            .unzip();

        // Extend the chain inherited from `self` with the mapping for this pass.
        let index_mappings = self
            .index_mappings
            .iter()
            .cloned()
            .chain(std::iter::once(pass_indices))
            .collect();

        FilteredVec {
            items: kept,
            index_mappings,
            original_len: self.original_len,
        }
    }
}
710
711fn allow_pushdown_for_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
712 let mut allow_pushdown = true;
713 expr.apply(|e| {
714 allow_pushdown = allow_pushdown && !is_volatile(e);
715 if allow_pushdown {
716 Ok(TreeNodeRecursion::Continue)
717 } else {
718 Ok(TreeNodeRecursion::Stop)
719 }
720 })
721 .expect("Infallible traversal of PhysicalExpr tree failed");
722 allow_pushdown
723}
724
#[cfg(test)]
mod tests {
    use super::*;

    /// Predicate shared by the single-pass tests: keeps even numbers only.
    fn is_even(value: &i32) -> bool {
        value % 2 == 0
    }

    #[test]
    fn test_filtered_vec_single_pass() {
        let input = vec![1, 2, 3, 4, 5, 6];
        let evens = FilteredVec::new(&input, is_even);

        // Only the even values survive
        assert_eq!(evens.len(), 3);
        assert_eq!(evens.items(), &[2, 4, 6]);

        // Each result lands on the slot of the value it belongs to
        let mapped = evens.map_results_to_original(vec!["a", "b", "c"], "default");
        assert_eq!(mapped, vec!["default", "a", "default", "b", "default", "c"]);
    }

    #[test]
    fn test_filtered_vec_empty_filter() {
        let input = vec![1, 3, 5];
        let evens = FilteredVec::new(&input, is_even);

        assert_eq!(evens.len(), 0);
        assert_eq!(evens.items(), &[] as &[i32]);

        // With no survivors every slot keeps the default
        let no_results: Vec<&str> = Vec::new();
        let mapped = evens.map_results_to_original(no_results, "default");
        assert_eq!(mapped, vec!["default", "default", "default"]);
    }

    #[test]
    fn test_filtered_vec_all_pass() {
        let input = vec![2, 4, 6];
        let evens = FilteredVec::new(&input, is_even);

        assert_eq!(evens.len(), 3);
        assert_eq!(evens.items(), &[2, 4, 6]);

        // Nothing was dropped, so the results come back unchanged
        let mapped = evens.map_results_to_original(vec!["a", "b", "c"], "default");
        assert_eq!(mapped, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_chain_filter_slice_different_types() {
        // Pass 1 works on numbers
        let numbers = vec![1, 2, 3, 4, 5, 6];
        let above_three = FilteredVec::new(&numbers, |&n| n > 3);
        assert_eq!(above_three.items(), &[4, 5, 6]);

        // The survivors are re-expressed as strings (stand-in for the
        // gather_filters_for_pushdown transformation)
        let spelled = vec!["four", "five", "six"];

        // Pass 2 works on those strings and keeps the ones containing 'i'
        let with_i = above_three.chain_filter_slice(&spelled, |word| word.contains('i'));
        assert_eq!(with_i.items(), &["five", "six"]);

        // "five" was at index 4 (1-based: 5), "six" was at index 5 (1-based: 6)
        let mapped = with_i.map_results_to_original(vec![100, 200], 0);
        assert_eq!(mapped, vec![0, 0, 0, 0, 100, 200]);
    }

    #[test]
    fn test_chain_filter_slice_complex_scenario() {
        // Mirrors the filter pushdown flow with parent predicates [A, B, C, D, E]
        let parent_predicates = vec!["A", "B", "C", "D", "E"];

        // Pass 1 plays the role of allow_pushdown_for_expr and rejects B and D
        let allowed =
            FilteredVec::new(&parent_predicates, |name| !matches!(*name, "B" | "D"));
        assert_eq!(allowed.items(), &["A", "C", "E"]);

        // gather_filters_for_pushdown then hands back one transformed predicate per
        // allowed predicate for a given child; here only C and E may be pushed.
        #[derive(Clone, Debug, PartialEq)]
        struct TransformedPredicate {
            name: String,
            can_push: bool,
        }

        let child_predicates: Vec<TransformedPredicate> = [
            ("A_transformed", false),
            ("C_transformed", true),
            ("E_transformed", true),
        ]
        .into_iter()
        .map(|(name, can_push)| TransformedPredicate {
            name: name.to_string(),
            can_push,
        })
        .collect();

        // Pass 2 keeps the pushable predicates
        let pushable = allowed.chain_filter_slice(&child_predicates, |p| p.can_push);
        assert_eq!(pushable.len(), 2);
        assert_eq!(pushable.items()[0].name, "C_transformed");
        assert_eq!(pushable.items()[1].name, "E_transformed");

        // The child's answers must land on the original slots of C (2) and E (4)
        let mapped =
            pushable.map_results_to_original(vec!["C_result", "E_result"], "no_result");
        assert_eq!(
            mapped,
            vec![
                "no_result",
                "no_result",
                "C_result",
                "no_result",
                "E_result"
            ]
        );
    }

    #[test]
    fn test_trace_to_original_index() {
        let input = vec![10, 20, 30, 40, 50];
        let kept = FilteredVec::new(&input, |&v| v != 20 && v != 40);

        // Survivors [10, 30, 50] came from original indices [0, 2, 4]
        let traced: Vec<usize> = (0..3).map(|i| kept.trace_to_original_index(i)).collect();
        assert_eq!(traced[0], 0); // 10 was at index 0
        assert_eq!(traced[1], 2); // 30 was at index 2
        assert_eq!(traced[2], 4); // 50 was at index 4
    }

    #[test]
    fn test_chain_filter_preserves_original_len() {
        let input = vec![1, 2, 3, 4, 5];
        let above_two = FilteredVec::new(&input, |&n| n > 2);

        let spelled = vec!["three", "four", "five"];
        let four_letters = above_two.chain_filter_slice(&spelled, |word| word.len() == 4);

        // The mapped output is still sized after the very first input (5 entries)
        let mapped = four_letters.map_results_to_original(vec!["x", "y"], "-");
        assert_eq!(mapped.len(), 5);
    }
}
873}