use std::fmt::Debug;
use std::sync::Arc;
use crate::expressions::Column;
use crate::intervals::cp_solver::PropagationResult;
use crate::intervals::{cardinality_ratio, ExprIntervalGraph, Interval, IntervalBound};
use crate::utils::collect_columns;
use crate::PhysicalExpr;
use arrow::datatypes::Schema;
use datafusion_common::stats::Precision;
use datafusion_common::{
internal_err, ColumnStatistics, DataFusionError, Result, ScalarValue,
};
/// State threaded through interval analysis of a physical expression: the
/// bounds known for each column plus an optional selectivity estimate.
#[derive(Clone, Debug, PartialEq)]
pub struct AnalysisContext {
    /// Per-column bounds. When built by
    /// [`AnalysisContext::try_from_statistics`], entry `i` describes column `i`
    /// of the input schema.
    pub boundaries: Vec<ExprBoundaries>,
    /// Selectivity estimate; `None` until set via
    /// [`AnalysisContext::with_selectivity`].
    pub selectivity: Option<f64>,
}

impl AnalysisContext {
    /// Creates a context over `boundaries` with no selectivity estimate.
    pub fn new(boundaries: Vec<ExprBoundaries>) -> Self {
        let selectivity = None;
        Self {
            boundaries,
            selectivity,
        }
    }

    /// Consumes the context and returns it with `selectivity` recorded.
    pub fn with_selectivity(self, selectivity: f64) -> Self {
        Self {
            selectivity: Some(selectivity),
            ..self
        }
    }

    /// Builds a context with one [`ExprBoundaries`] per entry of `statistics`,
    /// pairing entry `i` with field `i` of `input_schema`.
    ///
    /// # Errors
    /// Returns the first error produced by [`ExprBoundaries::try_from_column`].
    pub fn try_from_statistics(
        input_schema: &Schema,
        statistics: &[ColumnStatistics],
    ) -> Result<Self> {
        let mut boundaries = Vec::with_capacity(statistics.len());
        for (col_index, col_stats) in statistics.iter().enumerate() {
            let bound = ExprBoundaries::try_from_column(input_schema, col_stats, col_index)?;
            boundaries.push(bound);
        }
        Ok(Self::new(boundaries))
    }
}
#[derive(Clone, Debug, PartialEq)]
pub struct ExprBoundaries {
pub column: Column,
pub interval: Interval,
pub distinct_count: Precision<usize>,
}
impl ExprBoundaries {
pub fn try_from_column(
schema: &Schema,
col_stats: &ColumnStatistics,
col_index: usize,
) -> Result<Self> {
let field = &schema.fields()[col_index];
let empty_field = ScalarValue::try_from(field.data_type())?;
let interval = Interval::new(
IntervalBound::new_closed(
col_stats
.min_value
.get_value()
.cloned()
.unwrap_or(empty_field.clone()),
),
IntervalBound::new_closed(
col_stats
.max_value
.get_value()
.cloned()
.unwrap_or(empty_field),
),
);
let column = Column::new(field.name(), col_index);
Ok(ExprBoundaries {
column,
interval,
distinct_count: col_stats.distinct_count.clone(),
})
}
}
pub fn analyze(
expr: &Arc<dyn PhysicalExpr>,
context: AnalysisContext,
) -> Result<AnalysisContext> {
let target_boundaries = context.boundaries;
let mut graph = ExprIntervalGraph::try_new(expr.clone())?;
let columns: Vec<Arc<dyn PhysicalExpr>> = collect_columns(expr)
.into_iter()
.map(|c| Arc::new(c) as Arc<dyn PhysicalExpr>)
.collect();
let target_expr_and_indices: Vec<(Arc<dyn PhysicalExpr>, usize)> =
graph.gather_node_indices(columns.as_slice());
let mut target_indices_and_boundaries: Vec<(usize, Interval)> =
target_expr_and_indices
.iter()
.filter_map(|(expr, i)| {
target_boundaries.iter().find_map(|bound| {
expr.as_any()
.downcast_ref::<Column>()
.filter(|expr_column| bound.column.eq(*expr_column))
.map(|_| (*i, bound.interval.clone()))
})
})
.collect();
Ok(
match graph.update_ranges(&mut target_indices_and_boundaries)? {
PropagationResult::Success => shrink_boundaries(
expr,
graph,
target_boundaries,
target_expr_and_indices,
)?,
PropagationResult::Infeasible => {
AnalysisContext::new(target_boundaries).with_selectivity(0.0)
}
PropagationResult::CannotPropagate => {
AnalysisContext::new(target_boundaries).with_selectivity(1.0)
}
},
)
}
/// After successful propagation, copies each column's propagated interval
/// from `graph` into `target_boundaries` and computes the selectivity of
/// `expr`.
///
/// # Errors
/// - An internal error if the graph has no node for `expr`.
/// - An internal error if the computed selectivity is outside `[0, 1]`
///   (including NaN).
/// - Errors propagated from `calculate_selectivity`.
fn shrink_boundaries(
    expr: &Arc<dyn PhysicalExpr>,
    mut graph: ExprIntervalGraph,
    mut target_boundaries: Vec<ExprBoundaries>,
    target_expr_and_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
) -> Result<AnalysisContext> {
    // Keep the pre-propagation bounds: selectivity compares them with the
    // propagated ones.
    let initial_boundaries = target_boundaries.clone();
    for (node_expr, node_index) in &target_expr_and_indices {
        let Some(column) = node_expr.as_any().downcast_ref::<Column>() else {
            continue;
        };
        if let Some(bound) = target_boundaries
            .iter_mut()
            .find(|bound| bound.column.eq(column))
        {
            bound.interval = graph.get_interval(*node_index);
        }
    }

    let graph_nodes = graph.gather_node_indices(&[Arc::clone(expr)]);
    let Some((_, root_index)) = graph_nodes.first() else {
        return internal_err!(
            "The ExprIntervalGraph under investigation does not have any nodes."
        );
    };
    let final_result = graph.get_interval(*root_index);
    let selectivity = calculate_selectivity(
        &final_result.lower.value,
        &final_result.upper.value,
        &target_boundaries,
        &initial_boundaries,
    )?;
    // A selectivity outside [0, 1] (or NaN) is meaningless downstream.
    // Surface it as an invariant violation instead of returning it silently.
    if !(0.0..=1.0).contains(&selectivity) {
        return internal_err!("Selectivity is out of limit: {}", selectivity);
    }
    Ok(AnalysisContext::new(target_boundaries).with_selectivity(selectivity))
}
fn calculate_selectivity(
lower_value: &ScalarValue,
upper_value: &ScalarValue,
target_boundaries: &[ExprBoundaries],
initial_boundaries: &[ExprBoundaries],
) -> Result<f64> {
match (lower_value, upper_value) {
(ScalarValue::Boolean(Some(true)), ScalarValue::Boolean(Some(true))) => Ok(1.0),
(ScalarValue::Boolean(Some(false)), ScalarValue::Boolean(Some(false))) => Ok(0.0),
_ => {
target_boundaries.iter().enumerate().try_fold(
1.0,
|acc, (i, ExprBoundaries { interval, .. })| {
let temp =
cardinality_ratio(&initial_boundaries[i].interval, interval)?;
Ok(acc * temp)
},
)
}
}
}