use std::any::Any;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::utils::scatter;
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, not_impl_err, Result};
use datafusion_expr_common::columnar_value::ColumnarValue;
use datafusion_expr_common::interval_arithmetic::Interval;
use datafusion_expr_common::sort_properties::ExprProperties;
/// An executable expression node that is evaluated against a [`RecordBatch`]
/// to produce a [`ColumnarValue`].
///
/// Every implementor must also be [`DynEq`] and [`DynHash`], which is what lets
/// `dyn PhysicalExpr` trait objects be compared for equality and hashed.
pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
    /// Exposes the expression as [`Any`].
    ///
    /// `PartialEq for dyn PhysicalExpr` hands this value to [`DynEq::dyn_eq`],
    /// which downcasts it to the concrete type, so implementations should
    /// return `self`.
    fn as_any(&self) -> &dyn Any;

    /// Reports the output data type of this expression for `input_schema`.
    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;

    /// Reports whether the output of this expression is nullable for
    /// `input_schema`.
    fn nullable(&self, input_schema: &Schema) -> Result<bool>;

    /// Evaluates this expression over all rows of `batch`.
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;

    /// Evaluates this expression only on the rows of `batch` kept by
    /// `selection`.
    ///
    /// The default implementation filters `batch` by `selection` and then
    /// calls [`PhysicalExpr::evaluate`] on the filtered rows. When the filter
    /// removed rows and the result is an array, that array is passed through
    /// `scatter` together with `selection`. The intent is presumably to line
    /// results back up with the rows of the original `batch` (see
    /// `crate::utils::scatter`). In every other case the evaluation result is
    /// returned unchanged.
    ///
    /// # Errors
    ///
    /// Propagates errors from filtering, evaluation, and scattering.
    fn evaluate_selection(
        &self,
        batch: &RecordBatch,
        selection: &BooleanArray,
    ) -> Result<ColumnarValue> {
        let filtered = filter_record_batch(batch, selection)?;
        let all_rows_selected = filtered.num_rows() == batch.num_rows();

        match self.evaluate(&filtered)? {
            // Only an array result computed on a strictly smaller batch
            // needs to be mapped back onto the selection.
            ColumnarValue::Array(values) if !all_rows_selected => {
                scatter(selection, values.as_ref()).map(ColumnarValue::Array)
            }
            unchanged => Ok(unchanged),
        }
    }

    /// Returns references to the direct child expressions of this node.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;

    /// Consumes this `Arc` and returns an expression of the same kind whose
    /// children are replaced by `children`.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>>;

    /// Computes the output interval of this expression from the intervals of
    /// its children.
    ///
    /// The default implementation always returns a "not implemented" error.
    fn evaluate_bounds(&self, _children: &[&Interval]) -> Result<Interval> {
        not_impl_err!("Not implemented for {self}")
    }

    /// Given an `_interval` constraint on this expression's output and the
    /// current `_children` intervals, optionally returns refined child
    /// intervals.
    ///
    /// The default ignores both arguments and returns `Ok(Some(vec![]))`.
    /// NOTE(review): an empty vector is presumably read by callers as "no child
    /// interval was refined"; confirm against the propagation code.
    fn propagate_constraints(
        &self,
        _interval: &Interval,
        _children: &[&Interval],
    ) -> Result<Option<Vec<Interval>>> {
        let no_refinements: Vec<Interval> = Vec::new();
        Ok(Some(no_refinements))
    }

    /// Derives this expression's [`ExprProperties`] from those of its
    /// children.
    ///
    /// The default ignores the children and reports
    /// [`ExprProperties::new_unknown`].
    fn get_properties(&self, _children: &[ExprProperties]) -> Result<ExprProperties> {
        let unknown = ExprProperties::new_unknown();
        Ok(unknown)
    }
}
/// Object-safe equality: compares `self` against a type-erased value.
///
/// A blanket implementation exists for every `T: Eq + Any`, so concrete types
/// only need to derive/implement `Eq` to participate.
pub trait DynEq {
    /// Returns `true` only when `other` has exactly the same concrete type as
    /// `self` and compares equal to it.
    fn dyn_eq(&self, other: &dyn Any) -> bool;
}

impl<T: Eq + Any> DynEq for T {
    fn dyn_eq(&self, other: &dyn Any) -> bool {
        // A failed downcast means the concrete types differ, which is never
        // equal; otherwise defer to `T`'s own `Eq`. (`matches!` replaces the
        // `map_or(false, ..)` form flagged by `clippy::unnecessary_map_or`.)
        matches!(other.downcast_ref::<Self>(), Some(other) if other == self)
    }
}
impl PartialEq for dyn PhysicalExpr {
fn eq(&self, other: &Self) -> bool {
self.dyn_eq(other.as_any())
}
}
impl Eq for dyn PhysicalExpr {}
/// Object-safe hashing: feeds `self` into a type-erased [`Hasher`].
///
/// A blanket implementation exists for every `T: Hash + Any`.
pub trait DynHash {
    /// Writes this value's hash state into `_state`.
    fn dyn_hash(&self, _state: &mut dyn Hasher);
}

impl<T: Hash + Any> DynHash for T {
    fn dyn_hash(&self, state: &mut dyn Hasher) {
        let mut hasher = state;
        // The concrete `TypeId` goes in first so that values of different
        // types whose own `Hash` output coincides still hash differently.
        Hash::hash(&self.type_id(), &mut hasher);
        Hash::hash(self, &mut hasher);
    }
}
impl Hash for dyn PhysicalExpr {
fn hash<H: Hasher>(&self, state: &mut H) {
self.dyn_hash(state);
}
}
/// Returns `expr` with its children replaced by `children`, reusing `expr`
/// itself when the children are unchanged.
///
/// `children` must have exactly as many entries as `expr.children()`. The
/// expression is rebuilt via [`PhysicalExpr::with_new_children`] in two cases:
/// when it has no children at all, or when any new child is a different
/// allocation ([`Arc::ptr_eq`]) from the old child at the same position.
/// Otherwise the original `Arc` is returned as-is.
///
/// # Errors
///
/// Returns an internal error when the number of children differs. Also
/// propagates any error from [`PhysicalExpr::with_new_children`].
pub fn with_new_children_if_necessary(
    expr: Arc<dyn PhysicalExpr>,
    children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
    let old_children = expr.children();
    if children.len() != old_children.len() {
        return internal_err!("PhysicalExpr: Wrong number of children");
    }

    // Leaf expressions are always handed to `with_new_children`. For all
    // others, pointer identity is a cheap "nothing changed" check.
    let unchanged = !children.is_empty()
        && children
            .iter()
            .zip(old_children.iter())
            .all(|(new, old)| Arc::ptr_eq(new, old));

    if unchanged {
        Ok(expr)
    } else {
        // Already the right `Result` type; no need to unwrap and re-wrap it.
        expr.with_new_children(children)
    }
}
/// Looks through an `Arc<dyn PhysicalExpr>` or `Box<dyn PhysicalExpr>` hidden
/// behind `any` and returns the inner expression's [`PhysicalExpr::as_any`].
/// Any other value is returned unchanged.
#[deprecated(
    since = "44.0.0",
    note = "`dyn PhysicalExpr` implements `PartialEq` through `DynEq`; compare expressions directly or call `PhysicalExpr::as_any` instead"
)]
pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
    // A single `downcast_ref` per wrapper type replaces the previous
    // `is::<T>()` check followed by an `unwrap()`ed downcast.
    if let Some(expr) = any.downcast_ref::<Arc<dyn PhysicalExpr>>() {
        expr.as_any()
    } else if let Some(expr) = any.downcast_ref::<Box<dyn PhysicalExpr>>() {
        expr.as_any()
    } else {
        any
    }
}
/// Returns a value whose [`Display`] output renders `exprs` as a bracketed,
/// comma-separated list such as `[a, b, c]`, or `[]` when empty.
///
/// The underlying iterator is cloned on each formatting call, so the returned
/// value can be displayed any number of times. Width/alignment flags on the
/// outer format spec are not applied.
pub fn format_physical_expr_list<T>(exprs: T) -> impl Display
where
    T: IntoIterator,
    T::Item: Display,
    T::IntoIter: Clone,
{
    /// Lazily formats the items of a cloneable iterator as `[x, y, ...]`.
    struct ListDisplay<I>(I);

    impl<I> Display for ListDisplay<I>
    where
        I: Iterator + Clone,
        I::Item: Display,
    {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.write_str("[")?;
            for (position, item) in self.0.clone().enumerate() {
                if position != 0 {
                    f.write_str(", ")?;
                }
                write!(f, "{item}")?;
            }
            f.write_str("]")
        }
    }

    ListDisplay(exprs.into_iter())
}