use std::sync::Arc;
use crate::expressions::{self, Column};
use crate::{create_physical_expr, LexOrdering, PhysicalSortExpr};
use arrow::compute::SortOptions;
use arrow::datatypes::Schema;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{plan_err, Result};
use datafusion_common::{DFSchema, HashMap};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, SortExpr};
use itertools::izip;
pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
/// Shifts the index of every [`Column`] found in `expr` by `offset`.
///
/// The expression tree is walked top-down. Each node that downcasts to
/// [`Column`] is replaced by a new column with the same name and the shifted
/// index; every other node is left untouched.
///
/// # Errors
///
/// Returns a plan error (`"Column index overflow"`) when the checked signed
/// addition of `offset` to a column index fails, i.e. the shifted index is not
/// representable.
pub fn add_offset_to_expr(
    expr: Arc<dyn PhysicalExpr>,
    offset: isize,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.transform_down(|node| {
        // Anything that is not a column reference passes through unchanged.
        let Some(column) = node.as_any().downcast_ref::<Column>() else {
            return Ok(Transformed::no(node));
        };
        match column.index().checked_add_signed(offset) {
            Some(shifted) => {
                let replacement =
                    Arc::new(Column::new(column.name(), shifted)) as Arc<dyn PhysicalExpr>;
                Ok(Transformed::yes(replacement))
            }
            None => plan_err!("Column index overflow"),
        }
    })
    .data()
}
/// Returns `true` when at least one element of `physical_exprs` compares equal
/// to `expr`; returns `false` for an empty slice.
pub fn physical_exprs_contains(
    physical_exprs: &[Arc<dyn PhysicalExpr>],
    expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    for candidate in physical_exprs {
        if candidate.eq(expr) {
            return true;
        }
    }
    false
}
/// Returns `true` when `lhs` and `rhs` have the same length and their elements
/// compare equal position by position (order matters).
pub fn physical_exprs_equal(
    lhs: &[Arc<dyn PhysicalExpr>],
    rhs: &[Arc<dyn PhysicalExpr>],
) -> bool {
    // Slices of different lengths can never be element-wise equal.
    if lhs.len() != rhs.len() {
        return false;
    }
    izip!(lhs, rhs).all(|(left, right)| left.eq(right))
}
/// Returns `true` when `lhs` and `rhs` contain the same expressions with the
/// same multiplicities, regardless of order (multiset / "bag" equality).
///
/// Each slice is folded into a map from expression to occurrence count, and
/// the two maps are then compared.
pub fn physical_exprs_bag_equal(
    lhs: &[Arc<dyn PhysicalExpr>],
    rhs: &[Arc<dyn PhysicalExpr>],
) -> bool {
    // The occurrence counts of a bag sum to the slice length, so slices of
    // different lengths can never be bag-equal. Bail out before hashing.
    if lhs.len() != rhs.len() {
        return false;
    }
    let mut multi_set_lhs: HashMap<_, usize> = HashMap::new();
    let mut multi_set_rhs: HashMap<_, usize> = HashMap::new();
    for expr in lhs {
        *multi_set_lhs.entry(expr).or_insert(0) += 1;
    }
    for expr in rhs {
        *multi_set_rhs.entry(expr).or_insert(0) += 1;
    }
    multi_set_lhs == multi_set_rhs
}
/// Converts groups of logical sort expressions into physical orderings.
///
/// Each inner `Vec<SortExpr>` in `sort_order` is processed independently:
///
/// * Every sort expression must be a plain column reference; anything else
///   aborts the whole call with a plan error naming the offending position.
/// * A column that cannot be resolved against `schema` ends processing of its
///   group; the expressions collected before it are kept and the rest of the
///   group is ignored.
///
/// The collected expressions of each group are handed to `LexOrdering::new`,
/// and whatever that yields is appended to the result.
/// NOTE(review): presumably an empty group yields nothing here and is therefore
/// skipped — confirm against `LexOrdering::new`.
///
/// # Errors
///
/// Returns a plan error when a sort expression is not a single column
/// reference.
pub fn create_ordering(
    schema: &Schema,
    sort_order: &[Vec<SortExpr>],
) -> Result<Vec<LexOrdering>> {
    let mut orderings = Vec::new();
    for (group_idx, group) in sort_order.iter().enumerate() {
        let mut physical_sorts = Vec::new();
        for (expr_idx, sort) in group.iter().enumerate() {
            let Expr::Column(column) = &sort.expr else {
                return plan_err!(
                    "Expected single column reference in sort_order[{}][{}], got {}",
                    group_idx,
                    expr_idx,
                    sort.expr
                );
            };
            // A column missing from the schema invalidates the remainder of
            // this group, so stop here and keep only the prefix gathered so far.
            let Ok(physical) = expressions::col(&column.name, schema) else {
                break;
            };
            let options = SortOptions::new(!sort.asc, sort.nulls_first);
            physical_sorts.push(PhysicalSortExpr::new(physical, options));
        }
        orderings.extend(LexOrdering::new(physical_sorts));
    }
    Ok(orderings)
}
/// Builds a [`PhysicalSortExpr`] from a logical [`SortExpr`].
///
/// The inner expression is lowered with `create_physical_expr` using
/// `input_dfschema` and `execution_props`. The sort options are derived from
/// the logical expression: `descending` is `!e.asc`, and `nulls_first` is
/// copied as-is.
///
/// # Errors
///
/// Propagates any error returned by `create_physical_expr`.
pub fn create_physical_sort_expr(
    e: &SortExpr,
    input_dfschema: &DFSchema,
    execution_props: &ExecutionProps,
) -> Result<PhysicalSortExpr> {
    let physical = create_physical_expr(&e.expr, input_dfschema, execution_props)?;
    let sort_options = SortOptions::new(!e.asc, e.nulls_first);
    Ok(PhysicalSortExpr::new(physical, sort_options))
}
/// Converts every logical [`SortExpr`] in `exprs` into a [`PhysicalSortExpr`],
/// preserving order.
///
/// # Errors
///
/// Stops at, and returns, the first error produced by
/// `create_physical_sort_expr`.
pub fn create_physical_sort_exprs(
    exprs: &[SortExpr],
    input_dfschema: &DFSchema,
    execution_props: &ExecutionProps,
) -> Result<Vec<PhysicalSortExpr>> {
    let mut converted = Vec::with_capacity(exprs.len());
    for sort in exprs {
        converted.push(create_physical_sort_expr(
            sort,
            input_dfschema,
            execution_props,
        )?);
    }
    Ok(converted)
}
/// Applies `add_offset_to_expr` to the expression of every sort expression in
/// `sort_exprs`, leaving the remaining fields of each item untouched.
///
/// # Errors
///
/// Stops at, and returns, the first error produced by `add_offset_to_expr`.
pub fn add_offset_to_physical_sort_exprs(
    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
    offset: isize,
) -> Result<Vec<PhysicalSortExpr>> {
    let mut shifted = Vec::new();
    for mut sort_expr in sort_exprs {
        sort_expr.expr = add_offset_to_expr(sort_expr.expr, offset)?;
        shifted.push(sort_expr);
    }
    Ok(shifted)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::{BinaryExpr, Column, Literal};
use crate::physical_expr::{
physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
};
use datafusion_physical_expr_common::physical_expr::is_volatile;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::ColumnarValue;
use datafusion_expr::Operator;
use std::any::Any;
use std::fmt;
// `physical_exprs_contains` finds literals and columns that are present in the
// list and rejects ones that are not.
#[test]
fn test_physical_exprs_contains() {
    fn bool_lit(value: bool) -> Arc<dyn PhysicalExpr> {
        Arc::new(Literal::new(ScalarValue::Boolean(Some(value))))
    }
    fn int_lit(value: i32) -> Arc<dyn PhysicalExpr> {
        Arc::new(Literal::new(ScalarValue::Int32(Some(value))))
    }
    fn column(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
        Arc::new(Column::new(name, index))
    }

    let lit_true = bool_lit(true);
    let lit_false = bool_lit(false);
    let lit4 = int_lit(4);
    let lit2 = int_lit(2);
    let lit1 = int_lit(1);
    let col_a_expr = column("a", 0);
    let col_b_expr = column("b", 1);
    let col_c_expr = column("c", 2);

    // lit1 and col_c are deliberately left out of the haystack.
    let haystack: Vec<Arc<dyn PhysicalExpr>> = vec![
        Arc::clone(&lit_true),
        Arc::clone(&lit_false),
        Arc::clone(&lit4),
        Arc::clone(&lit2),
        Arc::clone(&col_a_expr),
        Arc::clone(&col_b_expr),
    ];

    for present in [&lit_true, &lit2, &col_b_expr] {
        assert!(physical_exprs_contains(&haystack, present));
    }
    for absent in [&col_c_expr, &lit1] {
        assert!(!physical_exprs_contains(&haystack, absent));
    }
}
// Ordered and bag equality agree on identical lists and both reject lists
// whose contents differ.
#[test]
fn test_physical_exprs_equal() {
    fn bool_lit(value: bool) -> Arc<dyn PhysicalExpr> {
        Arc::new(Literal::new(ScalarValue::Boolean(Some(value))))
    }
    fn int_lit(value: i32) -> Arc<dyn PhysicalExpr> {
        Arc::new(Literal::new(ScalarValue::Int32(Some(value))))
    }

    let lit_true = bool_lit(true);
    let lit_false = bool_lit(false);
    let lit1 = int_lit(1);
    let lit2 = int_lit(2);
    let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;

    let vec1 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
    let vec2 = vec![Arc::clone(&lit_true), Arc::clone(&col_b_expr)];
    let vec3 = vec![Arc::clone(&lit2), Arc::clone(&lit1)];
    let vec4 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];

    // Same contents, same order: equal under both notions.
    for same in [&vec1, &vec4] {
        assert!(physical_exprs_equal(&vec1, same));
    }
    for same in [&vec1, &vec4] {
        assert!(physical_exprs_bag_equal(&vec1, same));
    }

    // Different contents: unequal under both notions.
    for different in [&vec2, &vec3] {
        assert!(!physical_exprs_equal(&vec1, different));
    }
    for different in [&vec2, &vec3] {
        assert!(!physical_exprs_bag_equal(&vec1, different));
    }
}
// Bag equality must take multiplicity into account but ignore order, whereas
// ordered equality must be sensitive to order.
#[test]
fn test_physical_exprs_set_equal() {
    fn columns(spec: &[(&str, usize)]) -> Vec<Arc<dyn PhysicalExpr>> {
        spec.iter()
            .map(|&(name, index)| Arc::new(Column::new(name, index)) as Arc<dyn PhysicalExpr>)
            .collect()
    }

    // Same distinct columns, different multiplicities: {a, a, b} vs {b, b, a}.
    let list1 = columns(&[("a", 0), ("a", 0), ("b", 1)]);
    let list2 = columns(&[("b", 1), ("b", 1), ("a", 0)]);
    assert!(!physical_exprs_bag_equal(
        list1.as_slice(),
        list2.as_slice()
    ));
    assert!(!physical_exprs_bag_equal(
        list2.as_slice(),
        list1.as_slice()
    ));
    assert!(!physical_exprs_equal(list1.as_slice(), list2.as_slice()));
    assert!(!physical_exprs_equal(list2.as_slice(), list1.as_slice()));

    // Same multiset {a, a, b, b, c} listed in two different orders.
    let list3 = columns(&[("a", 0), ("b", 1), ("c", 2), ("a", 0), ("b", 1)]);
    let list4 = columns(&[("b", 1), ("b", 1), ("a", 0), ("c", 2), ("a", 0)]);
    assert!(physical_exprs_bag_equal(list3.as_slice(), list4.as_slice()));
    assert!(physical_exprs_bag_equal(list4.as_slice(), list3.as_slice()));
    assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
    assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
    assert!(!physical_exprs_equal(list3.as_slice(), list4.as_slice()));
    assert!(!physical_exprs_equal(list4.as_slice(), list3.as_slice()));
    // Ordered equality is reflexive. These two assertions replace an exact
    // duplicate of the bag-equality self-comparisons above.
    assert!(physical_exprs_equal(list3.as_slice(), list3.as_slice()));
    assert!(physical_exprs_equal(list4.as_slice(), list4.as_slice()));
}
// Literals and columns report themselves as non-volatile, both at node level
// and through `is_volatile`.
#[test]
fn test_is_volatile_default_behavior() {
    let literal: Arc<dyn PhysicalExpr> =
        Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
    let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("test", 0));

    for expr in [&literal, &column] {
        assert!(!expr.is_volatile_node());
    }
    for expr in [&literal, &column] {
        assert!(!is_volatile(expr));
    }
}
/// Test-only expression whose node-level volatility is chosen at construction.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MockVolatileExpr {
    /// Value reported by `is_volatile_node`.
    volatile: bool,
}

impl MockVolatileExpr {
    /// Creates a mock expression that reports `volatile` as its volatility.
    fn new(volatile: bool) -> Self {
        MockVolatileExpr { volatile }
    }
}
impl fmt::Display for MockVolatileExpr {
    /// Renders the expression as `MockVolatile(<flag>)`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self { volatile } = self;
        write!(f, "MockVolatile({})", volatile)
    }
}
/// Minimal leaf `PhysicalExpr` used to exercise volatility detection.
impl PhysicalExpr for MockVolatileExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Always reports a boolean type, independent of the schema.
    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
        Ok(DataType::Boolean)
    }

    /// Never nullable, independent of the schema.
    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
        Ok(false)
    }

    /// Ignores the batch and yields the volatility flag as a boolean scalar.
    fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
        let flag = ScalarValue::Boolean(Some(self.volatile));
        Ok(ColumnarValue::Scalar(flag))
    }

    /// The mock has no child expressions.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        Vec::new()
    }

    /// Returns `self` unchanged; the supplied children are ignored.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let unchanged: Arc<dyn PhysicalExpr> = self;
        Ok(unchanged)
    }

    /// Reports the flag supplied at construction.
    fn is_volatile_node(&self) -> bool {
        self.volatile
    }

    /// Renders the expression as `mock_volatile(<flag>)`.
    fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let flag = self.volatile;
        write!(f, "mock_volatile({})", flag)
    }
}
// `is_volatile` must detect a volatile node anywhere in the tree, while
// `is_volatile_node` only describes the node it is called on.
#[test]
fn test_nested_expression_volatility() {
    // Leaf expressions.
    let volatile_expr: Arc<dyn PhysicalExpr> = Arc::new(MockVolatileExpr::new(true));
    assert!(volatile_expr.is_volatile_node());
    assert!(is_volatile(&volatile_expr));

    let stable_expr: Arc<dyn PhysicalExpr> = Arc::new(MockVolatileExpr::new(false));
    assert!(!stable_expr.is_volatile_node());
    assert!(!is_volatile(&stable_expr));

    let literal: Arc<dyn PhysicalExpr> =
        Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
    assert!(!literal.is_volatile_node());
    assert!(!is_volatile(&literal));

    // A binary node over a volatile child: the node itself is not volatile,
    // but the expression as a whole is.
    let composite_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::clone(&volatile_expr),
        Operator::And,
        Arc::clone(&literal),
    ));
    assert!(!composite_expr.is_volatile_node());
    assert!(is_volatile(&composite_expr));

    // A binary node over stable children is stable at every level.
    let stable_composite: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::clone(&stable_expr),
        Operator::And,
        Arc::clone(&literal),
    ));
    assert!(!stable_composite.is_volatile_node());
    assert!(!is_volatile(&stable_composite));
}
}