use std::collections::HashSet;
use std::sync::Arc;
use arrow_schema::SchemaRef;
use datafusion_common::{
Result,
tree_node::{Transformed, TreeNode},
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
/// Identifies which filter-pushdown pass is being run.
///
/// `Display` renders the bare variant name (`"Pre"` or `"Post"`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FilterPushdownPhase {
    /// The pass labelled `"Pre"`.
    Pre,
    /// The pass labelled `"Post"`.
    Post,
}

impl std::fmt::Display for FilterPushdownPhase {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Select the label first, then emit it verbatim (no padding applied).
        let label = match self {
            Self::Pre => "Pre",
            Self::Post => "Post",
        };
        f.write_str(label)
    }
}
/// A filter expression paired with a flag saying whether it was accepted.
#[derive(Clone, Debug)]
pub struct PushedDownPredicate {
    /// `PushedDown::Yes` when the predicate was accepted, `PushedDown::No` otherwise.
    pub discriminant: PushedDown,
    /// The filter expression itself.
    pub predicate: Arc<dyn PhysicalExpr>,
}

impl PushedDownPredicate {
    /// Consumes the wrapper and returns just the expression, discarding the flag.
    pub fn into_inner(self) -> Arc<dyn PhysicalExpr> {
        let Self { predicate, .. } = self;
        predicate
    }

    /// Wraps `predicate`, flagged as [`PushedDown::Yes`].
    pub fn supported(predicate: Arc<dyn PhysicalExpr>) -> Self {
        PushedDown::Yes.wrap_expression(predicate)
    }

    /// Wraps `predicate`, flagged as [`PushedDown::No`].
    pub fn unsupported(predicate: Arc<dyn PhysicalExpr>) -> Self {
        PushedDown::No.wrap_expression(predicate)
    }
}
/// Whether a filter was accepted (`Yes`) or not accepted (`No`) during pushdown.
///
/// `PartialEq`/`Eq` are derived so verdicts can be compared with `==` and
/// `assert_eq!`, rather than only through `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushedDown {
    /// The filter was accepted.
    Yes,
    /// The filter was not accepted.
    No,
}
impl PushedDown {
    /// Logical AND of two verdicts: `Yes` only when both are `Yes`.
    pub fn and(self, other: PushedDown) -> PushedDown {
        if matches!((self, other), (PushedDown::Yes, PushedDown::Yes)) {
            PushedDown::Yes
        } else {
            PushedDown::No
        }
    }

    /// Logical OR of two verdicts: `No` only when both are `No`.
    pub fn or(self, other: PushedDown) -> PushedDown {
        if matches!((self, other), (PushedDown::No, PushedDown::No)) {
            PushedDown::No
        } else {
            PushedDown::Yes
        }
    }

    /// Pairs `expr` with this verdict to form a [`PushedDownPredicate`].
    pub fn wrap_expression(self, expr: Arc<dyn PhysicalExpr>) -> PushedDownPredicate {
        PushedDownPredicate {
            predicate: expr,
            discriminant: self,
        }
    }
}
/// The outcomes that the children reported for a single parent filter.
#[derive(Clone, Debug)]
pub struct ChildFilterPushdownResult {
    /// The parent filter these outcomes refer to.
    pub filter: Arc<dyn PhysicalExpr>,
    /// The verdict reported by each child for `filter`.
    pub child_results: Vec<PushedDown>,
}

impl ChildFilterPushdownResult {
    /// Returns `Yes` if at least one child accepted the filter.
    ///
    /// Returns `No` when no child accepted it, including when there are no results.
    pub fn any(&self) -> PushedDown {
        let some_accepted = self
            .child_results
            .iter()
            .any(|verdict| matches!(verdict, PushedDown::Yes));
        if some_accepted {
            PushedDown::Yes
        } else {
            PushedDown::No
        }
    }

    /// Returns `Yes` if every child accepted the filter.
    ///
    /// An empty result list deliberately yields `No` rather than the vacuous `Yes`.
    pub fn all(&self) -> PushedDown {
        let verdicts = &self.child_results;
        let every_accepted = !verdicts.is_empty()
            && verdicts
                .iter()
                .all(|verdict| matches!(verdict, PushedDown::Yes));
        if every_accepted {
            PushedDown::Yes
        } else {
            PushedDown::No
        }
    }
}
/// Pushdown outcomes collected from a node's children.
#[derive(Clone, Debug)]
pub struct ChildPushdownResult {
    /// One entry per parent filter, carrying the children's verdicts for it.
    pub parent_filters: Vec<ChildFilterPushdownResult>,
    /// The node's own filters with their verdicts. The outer `Vec` is presumably
    /// indexed by child (verify against callers).
    pub self_filters: Vec<Vec<PushedDownPredicate>>,
}
#[derive(Debug, Clone)]
pub struct FilterPushdownPropagation<T> {
pub filters: Vec<PushedDown>,
pub updated_node: Option<T>,
}
impl<T> FilterPushdownPropagation<T> {
pub fn if_all(child_pushdown_result: ChildPushdownResult) -> Self {
let filters = child_pushdown_result
.parent_filters
.into_iter()
.map(|result| result.all())
.collect();
Self {
filters,
updated_node: None,
}
}
pub fn if_any(child_pushdown_result: ChildPushdownResult) -> Self {
let filters = child_pushdown_result
.parent_filters
.into_iter()
.map(|result| result.any())
.collect();
Self {
filters,
updated_node: None,
}
}
pub fn all_unsupported(child_pushdown_result: ChildPushdownResult) -> Self {
let filters = child_pushdown_result
.parent_filters
.into_iter()
.map(|_| PushedDown::No)
.collect();
Self {
filters,
updated_node: None,
}
}
pub fn with_parent_pushdown_result(filters: Vec<PushedDown>) -> Self {
Self {
filters,
updated_node: None,
}
}
pub fn with_updated_node(mut self, updated_node: T) -> Self {
self.updated_node = Some(updated_node);
self
}
}
/// The filters to offer to one child of a node.
#[derive(Clone, Debug)]
pub struct ChildFilterDescription {
    /// Parent filters, each flagged as supported (rewritten for the child's schema)
    /// or unsupported (left as-is).
    pub(crate) parent_filters: Vec<PushedDownPredicate>,
    /// Filters that the node itself adds for this child.
    pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
}
/// Rewrites the column references in a filter so that they index into `child_schema`.
///
/// A column can be rewritten only if its current index is in `allowed_indices`
/// AND its name resolves in `child_schema`.
pub(crate) struct FilterRemapper {
    /// Schema that rewritten columns index into.
    child_schema: SchemaRef,
    /// Column indices, as they appear in the incoming filter, that may be rewritten.
    allowed_indices: HashSet<usize>,
}

impl FilterRemapper {
    /// Creates a remapper whose allowed indices are `0..child_schema.fields().len()`.
    pub(crate) fn new(child_schema: SchemaRef) -> Self {
        let field_count = child_schema.fields().len();
        Self::with_allowed_indices(child_schema, (0..field_count).collect())
    }

    /// Creates a remapper that accepts only columns whose index is in `allowed_indices`.
    fn with_allowed_indices(
        child_schema: SchemaRef,
        allowed_indices: HashSet<usize>,
    ) -> Self {
        Self {
            child_schema,
            allowed_indices,
        }
    }

    /// Tries to rewrite every `Column` in `filter` to its index in `child_schema`.
    ///
    /// Returns `Ok(Some(rewritten))` when every column could be rewritten and
    /// `Ok(None)` as soon as one column cannot be. Any error from the tree
    /// traversal is propagated.
    pub(crate) fn try_remap(
        &self,
        filter: &Arc<dyn PhysicalExpr>,
    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
        let mut every_column_mapped = true;
        let rewritten = Arc::clone(filter).transform_down(|node| {
            let Some(column) = node.as_any().downcast_ref::<Column>() else {
                return Ok(Transformed::no(node));
            };
            // Only look the name up when the index is permitted.
            let target_index = if self.allowed_indices.contains(&column.index()) {
                self.child_schema.index_of(column.name()).ok()
            } else {
                None
            };
            match target_index {
                Some(index) => Ok(Transformed::yes(Arc::new(Column::new(
                    column.name(),
                    index,
                )))),
                None => {
                    every_column_mapped = false;
                    // Stop traversing: the partially rewritten tree is discarded.
                    Ok(Transformed::complete(node))
                }
            }
        })?;
        Ok(if every_column_mapped {
            Some(rewritten.data)
        } else {
            None
        })
    }
}
impl ChildFilterDescription {
    /// Describes `parent_filters` for `child`.
    ///
    /// A filter is supported when every column it references can be rewritten
    /// against the child's schema. Allowed indices are `0..` the child's field count.
    pub fn from_child(
        parent_filters: &[Arc<dyn PhysicalExpr>],
        child: &Arc<dyn crate::ExecutionPlan>,
    ) -> Result<Self> {
        Self::remap_filters(parent_filters, &FilterRemapper::new(child.schema()))
    }

    /// Like [`Self::from_child`], but only columns whose index is in
    /// `allowed_indices` may be rewritten.
    pub fn from_child_with_allowed_indices(
        parent_filters: &[Arc<dyn PhysicalExpr>],
        allowed_indices: HashSet<usize>,
        child: &Arc<dyn crate::ExecutionPlan>,
    ) -> Result<Self> {
        let child_schema = child.schema();
        let remapper = FilterRemapper::with_allowed_indices(child_schema, allowed_indices);
        Self::remap_filters(parent_filters, &remapper)
    }

    /// Flags each filter as supported (using its rewritten form) or unsupported
    /// (keeping the original expression). Order is preserved.
    fn remap_filters(
        parent_filters: &[Arc<dyn PhysicalExpr>],
        remapper: &FilterRemapper,
    ) -> Result<Self> {
        let mut flagged = Vec::with_capacity(parent_filters.len());
        for filter in parent_filters {
            let entry = match remapper.try_remap(filter)? {
                Some(remapped) => PushedDownPredicate::supported(remapped),
                None => PushedDownPredicate::unsupported(Arc::clone(filter)),
            };
            flagged.push(entry);
        }
        Ok(Self {
            parent_filters: flagged,
            self_filters: Vec::new(),
        })
    }

    /// Flags every filter in `parent_filters` as unsupported.
    pub fn all_unsupported(parent_filters: &[Arc<dyn PhysicalExpr>]) -> Self {
        let flagged = parent_filters
            .iter()
            .cloned()
            .map(PushedDownPredicate::unsupported)
            .collect();
        Self {
            parent_filters: flagged,
            self_filters: Vec::new(),
        }
    }

    /// Appends one node-originated filter for this child.
    pub fn with_self_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
        self.self_filters.push(filter);
        self
    }

    /// Appends several node-originated filters for this child, keeping their order.
    pub fn with_self_filters(mut self, mut filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
        self.self_filters.append(&mut filters);
        self
    }
}
/// Per-child filter descriptions for a node, kept in the order they were added.
#[derive(Clone, Debug)]
pub struct FilterDescription {
    /// One description per added child.
    child_filter_descriptions: Vec<ChildFilterDescription>,
}

impl Default for FilterDescription {
    /// Same as [`FilterDescription::new`]: no child descriptions.
    fn default() -> Self {
        FilterDescription::new()
    }
}
impl FilterDescription {
    /// Creates an empty description with no children.
    pub fn new() -> Self {
        Self {
            child_filter_descriptions: Vec::new(),
        }
    }

    /// Appends the description for the next child.
    pub fn with_child(mut self, child: ChildFilterDescription) -> Self {
        self.child_filter_descriptions.push(child);
        self
    }

    /// Builds one [`ChildFilterDescription::from_child`] per child, in order.
    ///
    /// # Errors
    /// Propagates the first error returned while describing a child.
    #[expect(clippy::needless_pass_by_value)]
    pub fn from_children(
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        children: &[&Arc<dyn crate::ExecutionPlan>],
    ) -> Result<Self> {
        children
            .iter()
            .try_fold(Self::new(), |acc, child| -> Result<Self> {
                let described = ChildFilterDescription::from_child(&parent_filters, child)?;
                Ok(acc.with_child(described))
            })
    }

    /// Builds one all-unsupported description per child.
    pub fn all_unsupported(
        parent_filters: &[Arc<dyn PhysicalExpr>],
        children: &[&Arc<dyn crate::ExecutionPlan>],
    ) -> Self {
        let child_filter_descriptions = children
            .iter()
            .map(|_| ChildFilterDescription::all_unsupported(parent_filters))
            .collect();
        Self {
            child_filter_descriptions,
        }
    }

    /// Returns a clone of each child's flagged parent filters, in child order.
    pub fn parent_filters(&self) -> Vec<Vec<PushedDownPredicate>> {
        self.child_filter_descriptions
            .iter()
            .map(|child| child.parent_filters.clone())
            .collect()
    }

    /// Returns a clone of each child's node-originated filters, in child order.
    pub fn self_filters(&self) -> Vec<Vec<Arc<dyn PhysicalExpr>>> {
        self.child_filter_descriptions
            .iter()
            .map(|child| child.self_filters.clone())
            .collect()
    }
}