use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, TableScan};
/// Optimizer rule that replaces a `TableScan` whose source exposes its own
/// logical plan (via `TableSource::get_logical_plan`) with that plan, wrapped
/// in a projection and aliased to the scanned table's name.
#[derive(Default)]
pub struct InlineTableScan;

impl InlineTableScan {
    /// Creates the rule. Equivalent to `InlineTableScan::default()`.
    pub fn new() -> Self {
        Self
    }
}
impl OptimizerRule for InlineTableScan {
    /// Rewrites a single plan node.
    ///
    /// A `TableScan` is inlined only when it has no pushed-down filters and its
    /// source returns a logical plan from `get_logical_plan()`. The result is:
    ///
    /// ```text
    /// SubqueryAlias: <table_name>
    ///   Projection: <projected columns, or * when the scan has no projection>
    ///     <source's logical plan>
    /// ```
    ///
    /// Every other node, and scans that do not meet those conditions, yield
    /// `Ok(None)`, meaning "unchanged".
    ///
    /// # Errors
    ///
    /// Propagates errors from building the projection expressions or from the
    /// `LogicalPlanBuilder` calls.
    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Option<LogicalPlan>> {
        // Scans carrying filters are skipped; the inlined plan would not apply them.
        let scan = match plan {
            LogicalPlan::TableScan(scan) if scan.filters.is_empty() => scan,
            _ => return Ok(None),
        };

        // Only sources that can hand back their own logical plan are inlined.
        let inner = match scan.source.get_logical_plan() {
            Some(inner) => inner,
            None => return Ok(None),
        };

        let columns = generate_projection_expr(&scan.projection, inner)?;

        // Alias the inlined plan so references qualified by the scanned table's
        // name keep resolving against it.
        LogicalPlanBuilder::from(inner.clone())
            .project(columns)?
            .alias(&scan.table_name)?
            .build()
            .map(Some)
    }

    /// Name under which this rule is registered.
    fn name(&self) -> &str {
        "inline_table_scan"
    }

    /// Requests bottom-up application order.
    fn apply_order(&self) -> Option<ApplyOrder> {
        Some(ApplyOrder::BottomUp)
    }
}
fn generate_projection_expr(
projection: &Option<Vec<usize>>,
sub_plan: &LogicalPlan,
) -> Result<Vec<Expr>> {
let mut exprs = vec![];
if let Some(projection) = projection {
for i in projection {
exprs.push(Expr::Column(
sub_plan.schema().fields()[*i].qualified_column(),
));
}
} else {
exprs.push(Expr::Wildcard);
}
Ok(exprs)
}
#[cfg(test)]
mod tests {
    use std::{sync::Arc, vec};

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource};

    use crate::inline_table_scan::InlineTableScan;
    use crate::test::assert_optimized_plan_eq;

    /// Plain table source with two non-nullable `Int64` columns `a` and `b`.
    /// It does not override `get_logical_plan`, so the rule never inlines it.
    pub struct RawTableSource {}

    impl TableSource for RawTableSource {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn schema(&self) -> arrow::datatypes::SchemaRef {
            Arc::new(Schema::new(vec![
                Field::new("a", DataType::Int64, false),
                Field::new("b", DataType::Int64, false),
            ]))
        }

        fn supports_filter_pushdown(
            &self,
            _filter: &datafusion_expr::Expr,
        ) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
        {
            Ok(datafusion_expr::TableProviderFilterPushDown::Inexact)
        }
    }

    /// Table source backed by a logical plan, a scan of `y` over
    /// `RawTableSource`. Its `get_logical_plan` returns that plan, which makes
    /// it a candidate for inlining.
    pub struct CustomSource {
        plan: LogicalPlan,
    }

    impl CustomSource {
        fn new() -> Self {
            Self {
                plan: LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None)
                    .unwrap()
                    .build()
                    .unwrap(),
            }
        }
    }

    impl TableSource for CustomSource {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn supports_filter_pushdown(
            &self,
            _filter: &datafusion_expr::Expr,
        ) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
        {
            Ok(datafusion_expr::TableProviderFilterPushDown::Exact)
        }

        // NOTE(review): declares only column `a`, while the wrapped plan yields
        // `a` and `b`. The expected outputs below rely on the wildcard expanding
        // against the wrapped plan, not against this schema.
        fn schema(&self) -> arrow::datatypes::SchemaRef {
            Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
        }

        fn get_logical_plan(&self) -> Option<&LogicalPlan> {
            Some(&self.plan)
        }
    }

    /// With no projection on the scan, the inlined plan projects every column
    /// of the wrapped plan. Expected strings use two-space indentation per
    /// plan level.
    #[test]
    fn inline_table_scan() -> datafusion_common::Result<()> {
        let scan = LogicalPlanBuilder::scan(
            "x".to_string(),
            Arc::new(CustomSource::new()),
            None,
        )?;
        let plan = scan.filter(col("x.a").eq(lit(1)))?.build()?;
        let expected = "Filter: x.a = Int32(1)\
        \n  SubqueryAlias: x\
        \n    Projection: y.a, y.b\
        \n      TableScan: y";

        assert_optimized_plan_eq(Arc::new(InlineTableScan::new()), &plan, expected)
    }

    /// A scan projection is mapped onto the wrapped plan's columns by index.
    #[test]
    fn inline_table_scan_with_projection() -> datafusion_common::Result<()> {
        let scan = LogicalPlanBuilder::scan(
            "x".to_string(),
            Arc::new(CustomSource::new()),
            Some(vec![0]),
        )?;
        let plan = scan.build()?;
        let expected = "SubqueryAlias: x\
        \n  Projection: y.a\
        \n    TableScan: y";

        assert_optimized_plan_eq(Arc::new(InlineTableScan::new()), &plan, expected)
    }
}