use crate::optimizer::{ApplyOrder, ApplyOrder::BottomUp};
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::utils::expand_wildcard;
use datafusion_expr::{
aggregate_function::AggregateFunction as AggregateFunctionFunc, col,
expr::AggregateFunction, LogicalPlanBuilder,
};
use datafusion_expr::{Aggregate, Distinct, DistinctOn, Expr, LogicalPlan};
#[derive(Default)]
pub struct ReplaceDistinctWithAggregate {}
impl ReplaceDistinctWithAggregate {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}
impl OptimizerRule for ReplaceDistinctWithAggregate {
fn try_optimize(
&self,
plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Distinct(Distinct::All(input)) => {
let group_expr = expand_wildcard(input.schema(), input, None)?;
let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
input.clone(),
group_expr,
vec![],
)?);
Ok(Some(aggregate))
}
LogicalPlan::Distinct(Distinct::On(DistinctOn {
select_expr,
on_expr,
sort_expr,
input,
schema,
})) => {
let aggr_expr = select_expr
.iter()
.map(|e| {
Expr::AggregateFunction(AggregateFunction::new(
AggregateFunctionFunc::FirstValue,
vec![e.clone()],
false,
None,
sort_expr.clone(),
None,
))
})
.collect::<Vec<Expr>>();
let plan = LogicalPlanBuilder::from(input.as_ref().clone())
.aggregate(on_expr.clone(), aggr_expr.to_vec())?
.build()?;
let plan = if let Some(sort_expr) = sort_expr {
LogicalPlanBuilder::from(plan)
.sort(sort_expr[..on_expr.len()].to_vec())?
.build()?
} else {
plan
};
let project_exprs = plan
.schema()
.fields()
.iter()
.skip(on_expr.len())
.zip(schema.fields().iter())
.map(|(new_field, old_field)| {
Ok(col(new_field.qualified_column()).alias_qualified(
old_field.qualifier().cloned(),
old_field.name(),
))
})
.collect::<Result<Vec<Expr>>>()?;
let plan = LogicalPlanBuilder::from(plan)
.project(project_exprs)?
.build()?;
Ok(Some(plan))
}
_ => Ok(None),
}
}
fn name(&self) -> &str {
"replace_distinct_aggregate"
}
fn apply_order(&self) -> Option<ApplyOrder> {
Some(BottomUp)
}
}
#[cfg(test)]
mod tests {
use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
use crate::test::{assert_optimized_plan_eq, test_table_scan};
use datafusion_expr::{col, LogicalPlanBuilder};
use std::sync::Arc;
#[test]
fn replace_distinct() -> datafusion_common::Result<()> {
let table_scan = test_table_scan().unwrap();
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a"), col("b")])?
.distinct()?
.build()?;
let expected = "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\
\n Projection: test.a, test.b\
\n TableScan: test";
assert_optimized_plan_eq(
Arc::new(ReplaceDistinctWithAggregate::new()),
&plan,
expected,
)
}
#[test]
fn replace_distinct_on() -> datafusion_common::Result<()> {
let table_scan = test_table_scan().unwrap();
let plan = LogicalPlanBuilder::from(table_scan)
.distinct_on(
vec![col("a")],
vec![col("b")],
Some(vec![col("a").sort(false, true), col("c").sort(true, false)]),
)?
.build()?;
let expected = "Projection: FIRST_VALUE(test.b) ORDER BY [test.a DESC NULLS FIRST, test.c ASC NULLS LAST] AS b\
\n Sort: test.a DESC NULLS FIRST\
\n Aggregate: groupBy=[[test.a]], aggr=[[FIRST_VALUE(test.b) ORDER BY [test.a DESC NULLS FIRST, test.c ASC NULLS LAST]]]\
\n TableScan: test";
assert_optimized_plan_eq(
Arc::new(ReplaceDistinctWithAggregate::new()),
&plan,
expected,
)
}
}