use crate::optimize_projections::outer_columns;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::{Column, DFSchemaRef, Result};
use datafusion_expr::{Expr, LogicalPlan};
/// The set of column indices (into some input schema) that a parent plan
/// requires, plus a `projection_beneficial` flag carried alongside them.
///
/// Invariant: every public constructor and builder method leaves `indices`
/// sorted ascending with no duplicates. Each one either builds an
/// already-sorted range or finishes with `compact`. The order-preserving
/// `split_off` keeps the invariant as well.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(super) struct RequiredIndices {
    /// Required column indices; sorted and deduplicated (see type docs).
    indices: Vec<usize>,
    /// Set by `with_projection_beneficial` and copied unchanged by the other
    /// builder methods. Judging by the name, it records whether adding a
    /// projection would be worthwhile. NOTE(review): confirm at call sites.
    projection_beneficial: bool,
}

impl RequiredIndices {
    /// Creates an empty set with `projection_beneficial` set to `false`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a set requiring every field of `plan`'s output schema, i.e.
    /// the indices `0..plan.schema().fields().len()`.
    ///
    /// `projection_beneficial` starts as `false`.
    pub fn new_for_all_exprs(plan: &LogicalPlan) -> Self {
        Self {
            indices: (0..plan.schema().fields().len()).collect(),
            projection_beneficial: false,
        }
    }

    /// Creates a set from arbitrary `indices`, which are sorted and
    /// deduplicated. `projection_beneficial` starts as `false`.
    pub fn new_from_indices(indices: Vec<usize>) -> Self {
        Self {
            indices,
            projection_beneficial: false,
        }
        .compact()
    }

    /// Consumes `self` and returns the indices.
    pub fn into_inner(self) -> Vec<usize> {
        self.indices
    }

    /// Returns `self` with the `projection_beneficial` flag set to `true`.
    pub fn with_projection_beneficial(mut self) -> Self {
        self.projection_beneficial = true;
        self
    }

    /// Returns the `projection_beneficial` flag.
    pub fn projection_beneficial(&self) -> bool {
        self.projection_beneficial
    }

    /// Returns the required indices as a slice.
    pub fn indices(&self) -> &[usize] {
        &self.indices
    }

    /// Adds the `schema` index of every column referenced by the expressions
    /// that `plan.apply_expressions` visits, then re-sorts and deduplicates.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `plan.apply_expressions`.
    pub fn with_plan_exprs(
        mut self,
        plan: &LogicalPlan,
        schema: &DFSchemaRef,
    ) -> Result<Self> {
        plan.apply_expressions(|e| {
            self.add_expr(schema, e);
            // Never stop early: every expression may contribute columns.
            Ok(TreeNodeRecursion::Continue)
        })?;
        Ok(self.compact())
    }

    /// Appends the index (in `input_schema`) of each column referenced by
    /// `expr`, including the columns added by `outer_columns`.
    ///
    /// Columns that `input_schema.maybe_index_of_column` cannot resolve are
    /// skipped. The appended indices are neither sorted nor deduplicated
    /// here, so callers must `compact` afterwards.
    fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) {
        let mut cols = expr.column_refs();
        // Also gather outer-referenced columns (presumably columns a subquery
        // takes from an enclosing query; see `outer_columns`).
        outer_columns(expr, &mut cols);
        // `filter_map` drops the size hint, so reserve the upper bound here.
        self.indices.reserve(cols.len());
        self.indices.extend(
            cols.into_iter()
                .filter_map(|col| input_schema.maybe_index_of_column(col)),
        );
    }

    /// Adds the `schema` index of every column referenced by `exprs`, then
    /// re-sorts and deduplicates.
    pub fn with_exprs<'a>(
        mut self,
        schema: &DFSchemaRef,
        exprs: impl IntoIterator<Item = &'a Expr>,
    ) -> Self {
        for expr in exprs {
            self.add_expr(schema, expr);
        }
        self.compact()
    }

    /// Adds `indices` to the set, then re-sorts and deduplicates.
    pub fn append(mut self, indices: &[usize]) -> Self {
        self.indices.extend_from_slice(indices);
        self.compact()
    }

    /// Splits the set at position `n`.
    ///
    /// The first result holds the indices `< n`, unchanged. The second holds
    /// the indices `>= n` shifted down by `n`, i.e. made relative to position
    /// `n`. Both halves copy `projection_beneficial` and stay sorted.
    pub fn split_off(self, n: usize) -> (Self, Self) {
        let (l, r) = self.partition(|idx| idx < n);
        // Every index in `r` is >= n, so this subtraction cannot underflow.
        (l, r.map_indices(|idx| idx - n))
    }

    /// Splits the indices into those satisfying `f` and those that do not.
    /// The relative order within each half is preserved, and both halves
    /// copy `projection_beneficial`.
    fn partition<F>(&self, f: F) -> (Self, Self)
    where
        F: Fn(usize) -> bool,
    {
        let (l, r): (Vec<usize>, Vec<usize>) =
            self.indices.iter().partition(|&&idx| f(idx));
        let projection_beneficial = self.projection_beneficial;
        (
            Self {
                indices: l,
                projection_beneficial,
            },
            Self {
                indices: r,
                projection_beneficial,
            },
        )
    }

    /// Replaces every index with `f(index)` in place.
    ///
    /// Nothing is re-sorted or deduplicated. The sorted/unique invariant
    /// therefore only survives if `f` is strictly increasing.
    fn map_indices<F>(mut self, f: F) -> Self
    where
        F: Fn(usize) -> usize,
    {
        for idx in &mut self.indices {
            *idx = f(*idx);
        }
        self
    }

    /// Consumes `self` and returns the indices mapped through `f`.
    ///
    /// The result is not re-sorted or deduplicated.
    pub fn into_mapped_indices<F>(self, f: F) -> Vec<usize>
    where
        F: Fn(usize) -> usize,
    {
        self.map_indices(f).into_inner()
    }

    /// Returns clones of the entries of `exprs` at the required indices, in
    /// index order.
    ///
    /// # Panics
    ///
    /// Panics if any required index is out of bounds for `exprs`.
    pub fn get_at_indices(&self, exprs: &[Expr]) -> Vec<Expr> {
        self.indices.iter().map(|&idx| exprs[idx].clone()).collect()
    }

    /// Builds one column expression per required index, from the qualified
    /// field at that position of `input_schema`.
    pub fn get_required_exprs(&self, input_schema: &DFSchemaRef) -> Vec<Expr> {
        self.indices
            .iter()
            .map(|&idx| Expr::from(Column::from(input_schema.qualified_field(idx))))
            .collect()
    }

    /// Restores the invariant by sorting the indices and removing duplicates.
    fn compact(mut self) -> Self {
        self.indices.sort_unstable();
        self.indices.dedup();
        self
    }
}