use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
};
use datafusion_expr::ColumnarValue;
use std::cmp::Ordering;
use std::fmt::{Debug, Display};
use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
use arrow::compute::{and_kleene, filter_record_batch, is_not_null, SlicesIterator};
use std::any::Any;
use std::sync::Arc;
/// A physical expression that can be evaluated against a [`RecordBatch`].
pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
    /// Returns `self` as [`Any`] so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Returns the data type this expression produces for `input_schema`.
    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;

    /// Returns whether this expression may produce nulls for `input_schema`.
    fn nullable(&self, input_schema: &Schema) -> Result<bool>;

    /// Evaluates the expression over every row of `batch`.
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;

    /// Evaluates the expression only on the rows of `batch` picked by `selection`.
    ///
    /// The default implementation filters `batch` by `selection` and evaluates the
    /// remaining rows. If that yields an array and some rows were dropped, the array
    /// is scattered back to `batch`'s length with nulls in the unselected positions.
    /// Scalar results are returned unchanged.
    fn evaluate_selection(
        &self,
        batch: &RecordBatch,
        selection: &BooleanArray,
    ) -> Result<ColumnarValue> {
        let filtered = filter_record_batch(batch, selection)?;
        let value = self.evaluate(&filtered)?;
        // When nothing was filtered out, the result already lines up with `batch`.
        let all_selected = batch.num_rows() == filtered.num_rows();
        match value {
            ColumnarValue::Array(array) if !all_selected => {
                Ok(ColumnarValue::Array(scatter(selection, array.as_ref())?))
            }
            other => Ok(other),
        }
    }

    /// Returns the child expressions of this expression.
    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>>;

    /// Returns a copy of this expression that uses `children` as its child expressions.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>>;

    /// Refines `context` with knowledge about this expression.
    ///
    /// The default implementation returns `context` unchanged.
    fn analyze(&self, context: AnalysisContext) -> AnalysisContext {
        context
    }
}
/// State threaded through [`PhysicalExpr::analyze`].
#[derive(Clone, Debug, PartialEq)]
pub struct AnalysisContext {
    /// One entry per field of the input schema; `None` where no boundaries are known.
    pub column_boundaries: Vec<Option<ExprBoundaries>>,
    /// Boundaries computed for the expression being analyzed, if any.
    pub boundaries: Option<ExprBoundaries>,
}

impl AnalysisContext {
    /// Builds a context from per-column boundaries, with no expression boundaries yet.
    ///
    /// # Panics
    /// Panics if `column_boundaries` does not have exactly one entry per field of
    /// `input_schema`.
    pub fn new(
        input_schema: &Schema,
        column_boundaries: Vec<Option<ExprBoundaries>>,
    ) -> Self {
        assert_eq!(input_schema.fields().len(), column_boundaries.len());
        Self {
            boundaries: None,
            column_boundaries,
        }
    }

    /// Builds a context from table `statistics`.
    ///
    /// Column statistics are converted with [`ExprBoundaries::from_column`]. When
    /// `statistics` has no column statistics, every column gets `None`.
    ///
    /// # Panics
    /// Panics (via [`AnalysisContext::new`]) if the number of column statistics
    /// differs from the number of fields in `input_schema`.
    pub fn from_statistics(input_schema: &Schema, statistics: &Statistics) -> Self {
        let column_boundaries: Vec<Option<ExprBoundaries>> = statistics
            .column_statistics
            .as_ref()
            .map(|columns| columns.iter().map(ExprBoundaries::from_column).collect())
            .unwrap_or_else(|| vec![None; input_schema.fields().len()]);
        Self::new(input_schema, column_boundaries)
    }

    /// Returns the boundaries of the analyzed expression, if known.
    pub fn boundaries(&self) -> Option<&ExprBoundaries> {
        self.boundaries.as_ref()
    }

    /// Replaces the expression boundaries with `result`.
    pub fn with_boundaries(self, result: Option<ExprBoundaries>) -> Self {
        Self {
            boundaries: result,
            ..self
        }
    }

    /// Sets the boundaries of the column at index `column`.
    ///
    /// # Panics
    /// Panics if `column` is out of range for `column_boundaries`.
    pub fn with_column_update(
        mut self,
        column: usize,
        boundaries: ExprBoundaries,
    ) -> Self {
        let slot = &mut self.column_boundaries[column];
        *slot = Some(boundaries);
        self
    }
}
/// Value boundaries and related statistics for an expression.
#[derive(Clone, Debug, PartialEq)]
pub struct ExprBoundaries {
    /// Lower boundary of the expression's value.
    pub min_value: ScalarValue,
    /// Upper boundary of the expression's value.
    pub max_value: ScalarValue,
    /// Number of distinct values, if known.
    pub distinct_count: Option<usize>,
    /// Selectivity estimate, if known.
    pub selectivity: Option<f64>,
}

impl ExprBoundaries {
    /// Creates boundaries with no selectivity estimate.
    ///
    /// # Panics
    /// Panics if `min_value` compares greater than `max_value`.
    pub fn new(
        min_value: ScalarValue,
        max_value: ScalarValue,
        distinct_count: Option<usize>,
    ) -> Self {
        Self::new_with_selectivity(min_value, max_value, distinct_count, None)
    }

    /// Creates boundaries with an optional selectivity estimate.
    ///
    /// # Panics
    /// Panics if `min_value` compares greater than `max_value`. Values that cannot
    /// be ordered (`partial_cmp` returns `None`) are accepted.
    pub fn new_with_selectivity(
        min_value: ScalarValue,
        max_value: ScalarValue,
        distinct_count: Option<usize>,
        selectivity: Option<f64>,
    ) -> Self {
        assert!(!matches!(
            min_value.partial_cmp(&max_value),
            Some(Ordering::Greater)
        ));
        Self {
            selectivity,
            distinct_count,
            max_value,
            min_value,
        }
    }

    /// Builds boundaries from column statistics.
    ///
    /// Returns `None` unless both the min and the max value are present. Unlike
    /// [`ExprBoundaries::new`], this does not check that min <= max.
    pub fn from_column(column: &ColumnStatistics) -> Option<Self> {
        let min_value = column.min_value.as_ref()?.clone();
        let max_value = column.max_value.as_ref()?.clone();
        Some(Self {
            min_value,
            max_value,
            distinct_count: column.distinct_count,
            selectivity: None,
        })
    }

    /// Returns the single value covered by the boundaries, when min equals max.
    pub fn reduce(&self) -> Option<ScalarValue> {
        (self.min_value == self.max_value).then(|| self.min_value.clone())
    }
}
/// Rebuilds `expr` with `children` unless every child is already the same `Arc`.
///
/// Identity is checked with [`Arc::ptr_eq`] pairwise against `expr.children()`. An
/// expression with no children is always passed to `with_new_children`.
///
/// # Errors
/// Returns `DataFusionError::Internal` when `children` has a different length than
/// the current children of `expr`.
// `Arc::ptr_eq` on `dyn` pointers also compares vtable addresses, which clippy's
// `vtable_address_comparisons` lint flags.
#[allow(clippy::vtable_address_comparisons)]
pub fn with_new_children_if_necessary(
    expr: Arc<dyn PhysicalExpr>,
    children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
    let previous = expr.children();
    if previous.len() != children.len() {
        return Err(DataFusionError::Internal(
            "PhysicalExpr: Wrong number of children".to_string(),
        ));
    }

    let unchanged = !children.is_empty()
        && children
            .iter()
            .zip(previous.iter())
            .all(|(fresh, old)| Arc::ptr_eq(fresh, old));

    if unchanged {
        Ok(expr)
    } else {
        expr.with_new_children(children)
    }
}
/// Unwraps a type-erased `Arc<dyn PhysicalExpr>` or `Box<dyn PhysicalExpr>`.
///
/// If `any` holds one of those wrappers, returns the wrapped expression's own
/// `as_any()`. Otherwise returns `any` unchanged. This lets equality code compare the
/// concrete expression types instead of their smart-pointer wrappers.
pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
    // `downcast_ref` checks the type and casts in one step, which avoids the
    // `is::<T>()` check followed by a second, unwrapped `downcast_ref::<T>()`.
    if let Some(expr) = any.downcast_ref::<Arc<dyn PhysicalExpr>>() {
        expr.as_any()
    } else if let Some(expr) = any.downcast_ref::<Box<dyn PhysicalExpr>>() {
        expr.as_any()
    } else {
        any
    }
}
/// Spreads the values of `truthy` out to the positions where `mask` is true.
///
/// The output has `mask.len()` rows. The k-th true position in `mask` receives the
/// k-th value of `truthy`; every false or null position of `mask` becomes null.
/// Presumably `truthy` holds exactly as many rows as `mask` has true entries — the
/// caller (`evaluate_selection`) produces it by filtering with the same mask.
fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
    let source = truthy.data();
    // Under Kleene logic, `mask AND is_not_null(mask)` maps every null to false.
    // This is presumably because SlicesIterator does not consult the null bitmap
    // (confirm against the arrow docs).
    let mask = and_kleene(mask, &is_not_null(mask)?)?;
    let total = mask.len();
    let mut builder = MutableArrayData::new(vec![source], true, total);

    let mut written = 0; // output rows emitted so far
    let mut consumed = 0; // rows of `truthy` copied so far
    for (start, end) in SlicesIterator::new(&mask) {
        // Fill the run of unselected rows before this selected run with nulls.
        if written < start {
            builder.extend_nulls(start - written);
        }
        let run = end - start;
        builder.extend(0, consumed, consumed + run);
        consumed += run;
        written = end;
    }
    // Unselected rows after the last selected run also become null.
    if written < total {
        builder.extend_nulls(total - written);
    }

    Ok(make_array(builder.freeze()))
}
/// Unwraps an `Option` inside an `analyze` implementation, or bails out.
///
/// When `$expr` is `Some(value)`, the macro evaluates to `value`. When it is
/// `None`, the enclosing function returns `$context.with_boundaries(None)`.
#[macro_export]
macro_rules! analysis_expect {
    ($context: ident, $expr: expr) => {
        if let Some(value) = $expr {
            value
        } else {
            return $context.with_boundaries(None);
        }
    };
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use arrow::array::Int32Array;
    use datafusion_common::{
        cast::{as_boolean_array, as_int32_array},
        Result,
    };

    /// Selected rows take consecutive `truthy` values; unselected rows become null.
    #[test]
    fn scatter_int() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let expected =
            Int32Array::from_iter(vec![Some(1), Some(10), None, None, Some(11)]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    /// Trailing false entries in the mask are padded with nulls.
    #[test]
    fn scatter_int_end_with_false() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);
        let expected =
            Int32Array::from_iter(vec![Some(1), None, Some(10), None, None, None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    /// Null mask entries behave like false: they consume no value and yield null.
    #[test]
    fn scatter_with_null_mask() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
        let mask: BooleanArray = vec![Some(false), None, Some(true), Some(true), None]
            .into_iter()
            .collect();
        let expected = Int32Array::from_iter(vec![None, None, Some(1), Some(10), None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    /// Scattering also works for boolean arrays.
    #[test]
    fn scatter_boolean() -> Result<()> {
        let truthy = Arc::new(BooleanArray::from(vec![false, false, false, true]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let expected = BooleanArray::from_iter(vec![
            Some(false),
            Some(false),
            None,
            None,
            Some(false),
        ]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_boolean_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    /// Edge case: nothing selected, so the output is all nulls and takes no values.
    #[test]
    fn scatter_all_false() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let mask = BooleanArray::from(vec![false, false, false]);
        let expected = Int32Array::from_iter(vec![None::<i32>, None, None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    /// Edge case: everything selected, so the output reproduces `truthy`.
    #[test]
    fn scatter_all_true() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let mask = BooleanArray::from(vec![true, true, true]);
        let expected = Int32Array::from_iter(vec![Some(1), Some(2), Some(3)]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    /// `reduce` yields a value only when min and max are equal (including both null).
    #[test]
    fn reduce_boundaries() -> Result<()> {
        let different_boundaries = ExprBoundaries::new(
            ScalarValue::Int32(Some(1)),
            ScalarValue::Int32(Some(10)),
            None,
        );
        assert_eq!(different_boundaries.reduce(), None);
        let scalar_boundaries = ExprBoundaries::new(
            ScalarValue::Int32(Some(1)),
            ScalarValue::Int32(Some(1)),
            None,
        );
        assert_eq!(
            scalar_boundaries.reduce(),
            Some(ScalarValue::Int32(Some(1)))
        );
        let no_boundaries =
            ExprBoundaries::new(ScalarValue::Int32(None), ScalarValue::Int32(None), None);
        assert_eq!(no_boundaries.reduce(), Some(ScalarValue::Int32(None)));
        Ok(())
    }
}