use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::{
logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan,
};
/// Optimizer rule that replaces eligible `TableScan`s of sources exposing
/// their own `LogicalPlan` (via `TableSource::get_logical_plan`) with that
/// plan, aliased back to the scanned table's name.
#[derive(Default)]
pub struct InlineTableScan;

impl InlineTableScan {
    /// Creates a new `InlineTableScan` rule. Equivalent to
    /// `InlineTableScan::default()`; the rule carries no state.
    pub fn new() -> Self {
        Self
    }
}
/// Recursively replaces every eligible `TableScan` whose source provides a
/// `LogicalPlan` (via `TableSource::get_logical_plan`) with that plan, wrapped
/// in a wildcard `Projection` aliased to the scan's table name.
///
/// A scan is only inlined when it has no pushed-down filters, no projection
/// and no fetch limit. The replacement projects *every* column of the
/// sub-plan and applies neither filters nor a limit, so inlining a narrowed,
/// filtered or limited scan would change its schema or its results. Such
/// scans, and scans of sources without a logical plan, are returned
/// unchanged.
///
/// Every other node is rebuilt with its inputs rewritten by this function.
///
/// # Errors
///
/// Propagates errors from building the aliased projection and from
/// `from_plan` when reconstructing a node with rewritten inputs.
fn inline_table_scan(plan: &LogicalPlan) -> Result<LogicalPlan> {
    match plan {
        // Only scans that would return the source's complete, unfiltered,
        // unlimited output are equivalent to `SELECT * FROM <sub-plan>`.
        LogicalPlan::TableScan(TableScan {
            source,
            table_name,
            filters,
            projection: None,
            fetch: None,
            ..
        }) if filters.is_empty() => match source.get_logical_plan() {
            Some(sub_plan) => {
                // The sub-plan may itself scan sources that can be inlined.
                let sub_plan = inline_table_scan(sub_plan)?;
                LogicalPlanBuilder::from(sub_plan)
                    .project_with_alias(
                        vec![Expr::Wildcard],
                        Some(table_name.to_string()),
                    )?
                    .build()
            }
            // The source has no defining plan: nothing to inline.
            None => Ok(plan.clone()),
        },
        _ => {
            let new_inputs = plan
                .inputs()
                .into_iter()
                .map(inline_table_scan)
                .collect::<Result<Vec<_>>>()?;
            from_plan(plan, &plan.expressions(), &new_inputs)
        }
    }
}
// Hooks the `inline_table_scan` rewrite into the optimizer framework.
impl OptimizerRule for InlineTableScan {
    /// Identifier under which this rule is reported by the optimizer.
    fn name(&self) -> &str {
        "inline_table_scan"
    }

    /// Returns `plan` rewritten by `inline_table_scan`.
    ///
    /// The optimizer configuration is accepted for the trait's sake but is
    /// not consulted by this rule.
    fn optimize(
        &self,
        plan: &LogicalPlan,
        _optimizer_config: &mut OptimizerConfig,
    ) -> Result<LogicalPlan> {
        inline_table_scan(plan)
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource};

    use crate::{inline_table_scan::InlineTableScan, OptimizerConfig, OptimizerRule};

    /// Table source with a single non-nullable `Int64` column `a`.
    ///
    /// It does not override `get_logical_plan`, so it stands in for an
    /// ordinary base table.
    pub struct RawTableSource {}

    impl TableSource for RawTableSource {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn schema(&self) -> arrow::datatypes::SchemaRef {
            Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
        }

        fn supports_filter_pushdown(
            &self,
            _filter: &datafusion_expr::Expr,
        ) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
        {
            Ok(datafusion_expr::TableProviderFilterPushDown::Inexact)
        }
    }

    /// Table source backed by a logical plan (a scan of table `y` over
    /// [`RawTableSource`]), standing in for a view the rule should inline.
    pub struct CustomSource {
        plan: LogicalPlan,
    }

    impl CustomSource {
        fn new() -> Self {
            Self {
                plan: LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None)
                    .unwrap()
                    .build()
                    .unwrap(),
            }
        }
    }

    impl TableSource for CustomSource {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn supports_filter_pushdown(
            &self,
            _filter: &datafusion_expr::Expr,
        ) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
        {
            Ok(datafusion_expr::TableProviderFilterPushDown::Exact)
        }

        fn schema(&self) -> arrow::datatypes::SchemaRef {
            Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
        }

        fn get_logical_plan(&self) -> Option<&LogicalPlan> {
            Some(&self.plan)
        }
    }

    #[test]
    fn inline_table_scan() {
        let rule = InlineTableScan::new();
        let source = Arc::new(CustomSource::new());
        let scan = LogicalPlanBuilder::scan("x".to_string(), source, None).unwrap();
        let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap();

        let optimized_plan = rule
            .optimize(&plan, &mut OptimizerConfig::new())
            .expect("failed to optimize plan");

        // The scan of `x` is replaced by its defining plan, aliased back to
        // `x`. The plan's `Debug` output indents each child by two spaces per
        // level.
        let formatted_plan = format!("{:?}", optimized_plan);
        let expected = "\
        Filter: x.a = Int32(1)\
        \n  Projection: y.a, alias=x\
        \n    TableScan: y";
        assert_eq!(formatted_plan, expected);

        // Inlining must not change the plan's output schema.
        assert_eq!(plan.schema(), optimized_plan.schema());
    }
}