use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::logical_plan::{and, LogicalPlan};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::{error::Result, logical_plan::Operator};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
pub struct FilterPushDown {}
#[derive(Debug, Clone, Default)]
struct State {
filters: Vec<(Expr, HashSet<String>)>,
}
type Predicates<'a> = (Vec<&'a Expr>, Vec<&'a HashSet<String>>);
fn get_predicates<'a>(
state: &'a State,
used_columns: &HashSet<String>,
) -> Predicates<'a> {
state
.filters
.iter()
.filter(|(_, columns)| {
!columns
.intersection(used_columns)
.collect::<HashSet<_>>()
.is_empty()
})
.map(|&(ref a, ref b)| (a, b))
.unzip()
}
fn get_join_predicates<'a>(
state: &'a State,
left: &DFSchema,
right: &DFSchema,
) -> (
Vec<&'a HashSet<String>>,
Vec<&'a HashSet<String>>,
Predicates<'a>,
) {
let left_columns = &left
.fields()
.iter()
.map(|f| f.name().clone())
.collect::<HashSet<_>>();
let right_columns = &right
.fields()
.iter()
.map(|f| f.name().clone())
.collect::<HashSet<_>>();
let filters = state
.filters
.iter()
.map(|(predicate, columns)| {
(
(predicate, columns),
(
columns,
left_columns.intersection(columns).collect::<HashSet<_>>(),
right_columns.intersection(columns).collect::<HashSet<_>>(),
),
)
})
.collect::<Vec<_>>();
let pushable_to_left = filters
.iter()
.filter(|(_, (columns, left, _))| left.len() == columns.len())
.map(|((_, b), _)| *b)
.collect();
let pushable_to_right = filters
.iter()
.filter(|(_, (columns, _, right))| right.len() == columns.len())
.map(|((_, b), _)| *b)
.collect();
let keep = filters
.iter()
.filter(|(_, (columns, left, right))| {
let all_in_left = left.len() == columns.len();
let all_in_right = right.len() == columns.len();
!all_in_left && !all_in_right
})
.map(|((ref a, ref b), _)| (a, b))
.unzip();
(pushable_to_left, pushable_to_right, keep)
}
fn push_down(state: &State, plan: &LogicalPlan) -> Result<LogicalPlan> {
let new_inputs = plan
.inputs()
.iter()
.map(|input| optimize(input, state.clone()))
.collect::<Result<Vec<_>>>()?;
let expr = plan.expressions();
utils::from_plan(&plan, &expr, &new_inputs)
}
fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
let predicate = predicates
.iter()
.skip(1)
.fold(predicates[0].clone(), |acc, predicate| {
and(acc, (*predicate).to_owned())
});
LogicalPlan::Filter {
predicate,
input: Arc::new(plan),
}
}
fn remove_filters(
filters: &[(Expr, HashSet<String>)],
predicate_columns: &[&HashSet<String>],
) -> Vec<(Expr, HashSet<String>)> {
filters
.iter()
.filter(|(_, columns)| !predicate_columns.contains(&columns))
.cloned()
.collect::<Vec<_>>()
}
fn keep_filters(
filters: &[(Expr, HashSet<String>)],
predicate_columns: &[&HashSet<String>],
) -> Vec<(Expr, HashSet<String>)> {
filters
.iter()
.filter(|(_, columns)| predicate_columns.contains(&columns))
.cloned()
.collect::<Vec<_>>()
}
fn issue_filters(
mut state: State,
used_columns: HashSet<String>,
plan: &LogicalPlan,
) -> Result<LogicalPlan> {
let (predicates, predicate_columns) = get_predicates(&state, &used_columns);
if predicates.is_empty() {
return push_down(&state, plan);
}
let plan = add_filter(plan.clone(), &predicates);
state.filters = remove_filters(&state.filters, &predicate_columns);
push_down(&state, &plan)
}
fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) {
match predicate {
Expr::BinaryExpr {
right,
op: Operator::And,
left,
} => {
split_members(&left, predicates);
split_members(&right, predicates);
}
other => predicates.push(other),
}
}
fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter { input, predicate } => {
let mut predicates = vec![];
split_members(predicate, &mut predicates);
predicates
.into_iter()
.try_for_each::<_, Result<()>>(|predicate| {
let mut columns: HashSet<String> = HashSet::new();
utils::expr_to_column_names(predicate, &mut columns)?;
state.filters.push((predicate.clone(), columns));
Ok(())
})?;
optimize(input, state)
}
LogicalPlan::Projection {
input,
expr,
schema,
} => {
let mut projection = HashMap::new();
schema.fields().iter().enumerate().for_each(|(i, field)| {
let expr = match &expr[i] {
Expr::Alias(expr, _) => expr.as_ref().clone(),
expr => expr.clone(),
};
projection.insert(field.name().clone(), expr);
});
for (predicate, columns) in state.filters.iter_mut() {
*predicate = rewrite(predicate, &projection)?;
columns.clear();
utils::expr_to_column_names(predicate, columns)?;
}
let new_input = optimize(input, state)?;
utils::from_plan(&plan, &expr, &[new_input])
}
LogicalPlan::Aggregate {
input, aggr_expr, ..
} => {
let mut used_columns = HashSet::new();
utils::exprlist_to_column_names(aggr_expr, &mut used_columns)?;
let agg_columns = aggr_expr
.iter()
.map(|x| x.name(input.schema()))
.collect::<Result<HashSet<_>>>()?;
used_columns.extend(agg_columns);
issue_filters(state, used_columns, plan)
}
LogicalPlan::Sort { .. } => {
push_down(&state, plan)
}
LogicalPlan::Limit { input, .. } => {
let used_columns = input
.schema()
.fields()
.iter()
.map(|f| f.name().clone())
.collect::<HashSet<_>>();
issue_filters(state, used_columns, plan)
}
LogicalPlan::Join { left, right, .. } => {
let (pushable_to_left, pushable_to_right, keep) =
get_join_predicates(&state, &left.schema(), &right.schema());
let mut left_state = state.clone();
left_state.filters = keep_filters(&left_state.filters, &pushable_to_left);
let left = optimize(left, left_state)?;
let mut right_state = state.clone();
right_state.filters = keep_filters(&right_state.filters, &pushable_to_right);
let right = optimize(right, right_state)?;
let expr = plan.expressions();
let plan = utils::from_plan(&plan, &expr, &[left, right])?;
if keep.0.is_empty() {
Ok(plan)
} else {
let plan = add_filter(plan, &keep.0);
state.filters = remove_filters(&state.filters, &keep.1);
Ok(plan)
}
}
LogicalPlan::TableScan {
source,
projected_schema,
filters,
projection,
table_name,
limit,
} => {
let mut used_columns = HashSet::new();
let mut new_filters = filters.clone();
for (filter_expr, cols) in &state.filters {
let (preserve_filter_node, add_to_provider) =
match source.supports_filter_pushdown(filter_expr)? {
TableProviderFilterPushDown::Unsupported => (true, false),
TableProviderFilterPushDown::Inexact => (true, true),
TableProviderFilterPushDown::Exact => (false, true),
};
if preserve_filter_node {
used_columns.extend(cols.clone());
}
if add_to_provider {
new_filters.push(filter_expr.clone());
}
}
issue_filters(
state,
used_columns,
&LogicalPlan::TableScan {
source: source.clone(),
projection: projection.clone(),
projected_schema: projected_schema.clone(),
table_name: table_name.clone(),
filters: new_filters,
limit: *limit,
},
)
}
_ => {
let used_columns = plan
.schema()
.fields()
.iter()
.map(|f| f.name().clone())
.collect::<HashSet<_>>();
issue_filters(state, used_columns, plan)
}
}
}
impl OptimizerRule for FilterPushDown {
fn name(&self) -> &str {
"filter_push_down"
}
fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
optimize(plan, State::default())
}
}
impl FilterPushDown {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}
fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
let expressions = utils::expr_sub_expressions(&expr)?;
let expressions = expressions
.iter()
.map(|e| rewrite(e, &projection))
.collect::<Result<Vec<_>>>()?;
if let Expr::Column(name) = expr {
if let Some(expr) = projection.get(name) {
return Ok(expr.clone());
}
}
utils::rewrite_expression(&expr, &expressions)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
use crate::logical_plan::{lit, sum, DFSchema, Expr, LogicalPlanBuilder, Operator};
use crate::physical_plan::ExecutionPlan;
use crate::test::*;
use crate::{logical_plan::col, prelude::JoinType};
use arrow::datatypes::SchemaRef;
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = FilterPushDown::new();
let optimized_plan = rule.optimize(plan).expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
}
#[test]
fn filter_before_projection() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a"), col("b")])?
.filter(col("a").eq(lit(1i64)))?
.build()?;
let expected = "\
Projection: #a, #b\
\n Filter: #a Eq Int64(1)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn filter_after_limit() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a"), col("b")])?
.limit(10)?
.filter(col("a").eq(lit(1i64)))?
.build()?;
let expected = "\
Filter: #a Eq Int64(1)\
\n Limit: 10\
\n Projection: #a, #b\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn filter_jump_2_plans() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a"), col("b"), col("c")])?
.project(vec![col("c"), col("b")])?
.filter(col("a").eq(lit(1i64)))?
.build()?;
let expected = "\
Projection: #c, #b\
\n Projection: #a, #b, #c\
\n Filter: #a Eq Int64(1)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn filter_move_agg() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.aggregate(vec![col("a")], vec![sum(col("b")).alias("total_salary")])?
.filter(col("a").gt(lit(10i64)))?
.build()?;
let expected = "\
Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS total_salary]]\
\n Filter: #a Gt Int64(10)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn filter_keep_agg() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.aggregate(vec![col("a")], vec![sum(col("b")).alias("b")])?
.filter(col("b").gt(lit(10i64)))?
.build()?;
let expected = "\
Filter: #b Gt Int64(10)\
\n Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS b]]\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn alias() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a").alias("b"), col("c")])?
.filter(col("b").eq(lit(1i64)))?
.build()?;
let expected = "\
Projection: #a AS b, #c\
\n Filter: #a Eq Int64(1)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
fn add(left: Expr, right: Expr) -> Expr {
Expr::BinaryExpr {
left: Box::new(left),
op: Operator::Plus,
right: Box::new(right),
}
}
fn multiply(left: Expr, right: Expr) -> Expr {
Expr::BinaryExpr {
left: Box::new(left),
op: Operator::Multiply,
right: Box::new(right),
}
}
#[test]
fn complex_expression() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.project(vec![
add(multiply(col("a"), lit(2)), col("c")).alias("b"),
col("c"),
])?
.filter(col("b").eq(lit(1i64)))?
.build()?;
assert_eq!(
format!("{:?}", plan),
"\
Filter: #b Eq Int64(1)\
\n Projection: #a Multiply Int32(2) Plus #c AS b, #c\
\n TableScan: test projection=None"
);
let expected = "\
Projection: #a Multiply Int32(2) Plus #c AS b, #c\
\n Filter: #a Multiply Int32(2) Plus #c Eq Int64(1)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn complex_plan() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.project(vec![
add(multiply(col("a"), lit(2)), col("c")).alias("b"),
col("c"),
])?
.project(vec![multiply(col("b"), lit(3)).alias("a"), col("c")])?
.filter(col("a").eq(lit(1i64)))?
.build()?;
assert_eq!(
format!("{:?}", plan),
"\
Filter: #a Eq Int64(1)\
\n Projection: #b Multiply Int32(3) AS a, #c\
\n Projection: #a Multiply Int32(2) Plus #c AS b, #c\
\n TableScan: test projection=None"
);
let expected = "\
Projection: #b Multiply Int32(3) AS a, #c\
\n Projection: #a Multiply Int32(2) Plus #c AS b, #c\
\n Filter: #a Multiply Int32(2) Plus #c Multiply Int32(3) Eq Int64(1)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn multi_filter() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a").alias("b"), col("c")])?
.aggregate(vec![col("b")], vec![sum(col("c"))])?
.filter(col("b").gt(lit(10i64)))?
.filter(col("SUM(c)").gt(lit(10i64)))?
.build()?;
assert_eq!(
format!("{:?}", plan),
"\
Filter: #SUM(c) Gt Int64(10)\
\n Filter: #b Gt Int64(10)\
\n Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
\n Projection: #a AS b, #c\
\n TableScan: test projection=None"
);
let expected = "\
Filter: #SUM(c) Gt Int64(10)\
\n Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
\n Projection: #a AS b, #c\
\n Filter: #a Gt Int64(10)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn split_filter() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a").alias("b"), col("c")])?
.aggregate(vec![col("b")], vec![sum(col("c"))])?
.filter(and(
col("SUM(c)").gt(lit(10i64)),
and(col("b").gt(lit(10i64)), col("SUM(c)").lt(lit(20i64))),
))?
.build()?;
assert_eq!(
format!("{:?}", plan),
"\
Filter: #SUM(c) Gt Int64(10) And #b Gt Int64(10) And #SUM(c) Lt Int64(20)\
\n Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
\n Projection: #a AS b, #c\
\n TableScan: test projection=None"
);
let expected = "\
Filter: #SUM(c) Gt Int64(10) And #SUM(c) Lt Int64(20)\
\n Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
\n Projection: #a AS b, #c\
\n Filter: #a Gt Int64(10)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn double_limit() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a"), col("b")])?
.limit(20)?
.limit(10)?
.project(vec![col("a"), col("b")])?
.filter(col("a").eq(lit(1i64)))?
.build()?;
let expected = "\
Projection: #a, #b\
\n Filter: #a Eq Int64(1)\
\n Limit: 10\
\n Limit: 20\
\n Projection: #a, #b\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn filter_2_breaks_limits() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a")])?
.filter(col("a").lt_eq(lit(1i64)))?
.limit(1)?
.project(vec![col("a")])?
.filter(col("a").gt_eq(lit(1i64)))?
.build()?;
assert_eq!(
format!("{:?}", plan),
"Filter: #a GtEq Int64(1)\
\n Projection: #a\
\n Limit: 1\
\n Filter: #a LtEq Int64(1)\
\n Projection: #a\
\n TableScan: test projection=None"
);
let expected = "\
Projection: #a\
\n Filter: #a GtEq Int64(1)\
\n Limit: 1\
\n Projection: #a\
\n Filter: #a LtEq Int64(1)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn two_filters_on_same_depth() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.limit(1)?
.filter(col("a").lt_eq(lit(1i64)))?
.filter(col("a").gt_eq(lit(1i64)))?
.project(vec![col("a")])?
.build()?;
assert_eq!(
format!("{:?}", plan),
"Projection: #a\
\n Filter: #a GtEq Int64(1)\
\n Filter: #a LtEq Int64(1)\
\n Limit: 1\
\n TableScan: test projection=None"
);
let expected = "\
Projection: #a\
\n Filter: #a GtEq Int64(1) And #a LtEq Int64(1)\
\n Limit: 1\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn filters_user_defined_node() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(&table_scan)
.filter(col("a").lt_eq(lit(1i64)))?
.build()?;
let plan = crate::test::user_defined::new(plan);
let expected = "\
TestUserDefined\
\n Filter: #a LtEq Int64(1)\
\n TableScan: test projection=None";
assert_eq!(format!("{:?}", plan), expected);
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn filter_join_on_common_independent() -> Result<()> {
let table_scan = test_table_scan()?;
let left = LogicalPlanBuilder::from(&table_scan).build()?;
let right = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a")])?
.build()?;
let plan = LogicalPlanBuilder::from(&left)
.join(&right, JoinType::Inner, &["a"], &["a"])?
.filter(col("a").lt_eq(lit(1i64)))?
.build()?;
assert_eq!(
format!("{:?}", plan),
"\
Filter: #a LtEq Int64(1)\
\n Join: a = a\
\n TableScan: test projection=None\
\n Projection: #a\
\n TableScan: test projection=None"
);
let expected = "\
Join: a = a\
\n Filter: #a LtEq Int64(1)\
\n TableScan: test projection=None\
\n Projection: #a\
\n Filter: #a LtEq Int64(1)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn filter_join_on_common_dependent() -> Result<()> {
let table_scan = test_table_scan()?;
let left = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a"), col("c")])?
.build()?;
let right = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a"), col("b")])?
.build()?;
let plan = LogicalPlanBuilder::from(&left)
.join(&right, JoinType::Inner, &["a"], &["a"])?
.filter(col("c").lt_eq(col("b")))?
.build()?;
assert_eq!(
format!("{:?}", plan),
"\
Filter: #c LtEq #b\
\n Join: a = a\
\n Projection: #a, #c\
\n TableScan: test projection=None\
\n Projection: #a, #b\
\n TableScan: test projection=None"
);
let expected = &format!("{:?}", plan);
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn filter_join_on_one_side() -> Result<()> {
let table_scan = test_table_scan()?;
let left = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a"), col("b")])?
.build()?;
let right = LogicalPlanBuilder::from(&table_scan)
.project(vec![col("a"), col("c")])?
.build()?;
let plan = LogicalPlanBuilder::from(&left)
.join(&right, JoinType::Inner, &["a"], &["a"])?
.filter(col("b").lt_eq(lit(1i64)))?
.build()?;
assert_eq!(
format!("{:?}", plan),
"\
Filter: #b LtEq Int64(1)\
\n Join: a = a\
\n Projection: #a, #b\
\n TableScan: test projection=None\
\n Projection: #a, #c\
\n TableScan: test projection=None"
);
let expected = "\
Join: a = a\
\n Projection: #a, #b\
\n Filter: #b LtEq Int64(1)\
\n TableScan: test projection=None\
\n Projection: #a, #c\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
struct PushDownProvider {
pub filter_support: TableProviderFilterPushDown,
}
impl TableProvider for PushDownProvider {
fn schema(&self) -> SchemaRef {
Arc::new(arrow::datatypes::Schema::new(vec![
arrow::datatypes::Field::new(
"a",
arrow::datatypes::DataType::Int32,
true,
),
]))
}
fn scan(
&self,
_: &Option<Vec<usize>>,
_: usize,
_: &[Expr],
_: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn supports_filter_pushdown(
&self,
_: &Expr,
) -> Result<TableProviderFilterPushDown> {
Ok(self.filter_support.clone())
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn statistics(&self) -> Statistics {
Statistics::default()
}
}
fn table_scan_with_pushdown_provider(
filter_support: TableProviderFilterPushDown,
) -> Result<LogicalPlan> {
let test_provider = PushDownProvider { filter_support };
let table_scan = LogicalPlan::TableScan {
table_name: "".into(),
filters: vec![],
projected_schema: Arc::new(DFSchema::try_from_qualified(
"",
&*test_provider.schema(),
)?),
projection: None,
source: Arc::new(test_provider),
limit: None,
};
LogicalPlanBuilder::from(&table_scan)
.filter(col("a").eq(lit(1i64)))?
.build()
}
#[test]
fn filter_with_table_provider_exact() -> Result<()> {
let plan = table_scan_with_pushdown_provider(TableProviderFilterPushDown::Exact)?;
let expected = "\
TableScan: projection=None, filters=[#a Eq Int64(1)]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn filter_with_table_provider_inexact() -> Result<()> {
let plan =
table_scan_with_pushdown_provider(TableProviderFilterPushDown::Inexact)?;
let expected = "\
Filter: #a Eq Int64(1)\
\n TableScan: projection=None, filters=[#a Eq Int64(1)]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn filter_with_table_provider_unsupported() -> Result<()> {
let plan =
table_scan_with_pushdown_provider(TableProviderFilterPushDown::Unsupported)?;
let expected = "\
Filter: #a Eq Int64(1)\
\n TableScan: projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
}