use crate::logical_plan::consumer::SubstraitConsumer;
use crate::logical_plan::consumer::utils::NameTracker;
use async_recursion::async_recursion;
use datafusion::common::{Column, not_impl_err};
use datafusion::logical_expr::builder::project;
use datafusion::logical_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
use std::collections::HashSet;
use std::sync::Arc;
use substrait::proto::ProjectRel;
#[async_recursion]
pub async fn from_project_rel(
consumer: &impl SubstraitConsumer,
p: &ProjectRel,
) -> datafusion::common::Result<LogicalPlan> {
if let Some(input) = p.input.as_ref() {
let input = consumer.consume_rel(input).await?;
let original_schema = Arc::clone(input.schema());
let mut name_tracker = NameTracker::new();
let mut explicit_exprs: Vec<Expr> = vec![];
let mut window_exprs: HashSet<Expr> = HashSet::new();
for expr in &p.expressions {
let e = consumer
.consume_expression(expr, input.clone().schema())
.await?;
if let Expr::WindowFunction(_) = &e {
window_exprs.insert(e.clone());
}
explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?);
}
let input = if !window_exprs.is_empty() {
LogicalPlanBuilder::window_plan(input, window_exprs)?
} else {
input
};
let mut final_exprs: Vec<Expr> = vec![];
for index in 0..original_schema.fields().len() {
let e = Expr::Column(Column::from(original_schema.qualified_field(index)));
final_exprs.push(name_tracker.get_uniquely_named_expr(e)?);
}
final_exprs.append(&mut explicit_exprs);
project(input, final_exprs)
} else {
not_impl_err!("Projection without an input is not supported")
}
}