use std::sync::Arc;
use arrow::array::new_null_array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::{Result, ScalarValue, internal_datafusion_err};
use datafusion_expr_common::columnar_value::ColumnarValue;
use crate::PhysicalExpr;
use crate::expressions::{Column, Literal};
/// Attempts to fold `expr` into a single [`Literal`] by evaluating it once
/// against the shared one-row dummy batch.
///
/// The expression is returned unchanged (`Transformed::no`) when:
/// - it is already a [`Literal`],
/// - its tree contains a [`Column`] or a volatile node,
/// - evaluation fails, or
/// - evaluation yields an array whose length is not exactly one.
///
/// # Errors
///
/// Returns an error if the dummy batch cannot be obtained or if a one-element
/// array result cannot be converted into a [`ScalarValue`]. Errors raised by
/// the evaluation itself are deliberately swallowed and reported as
/// "not transformed".
#[deprecated(
    since = "53.0.0",
    note = "This function will be removed in a future release in favor of a private implementation that depends on other implementation details. Please open an issue if you have a use case for keeping it."
)]
pub fn simplify_const_expr(
    expr: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
    // Fetched up front, so a failure here surfaces even for inputs that would
    // otherwise be returned untouched.
    let dummy = create_dummy_batch()?;

    // Short-circuits: the tree walk is skipped entirely for literals.
    let foldable = !expr.is::<Literal>() && can_evaluate_as_constant(&expr);
    if !foldable {
        return Ok(Transformed::no(expr));
    }

    let folded = match expr.evaluate(dummy) {
        Ok(ColumnarValue::Scalar(value)) => value,
        Ok(ColumnarValue::Array(array)) if array.len() == 1 => {
            ScalarValue::try_from_array(&array, 0)?
        }
        // Any other array length, or an evaluation failure: keep the
        // expression as it was.
        Ok(_) | Err(_) => return Ok(Transformed::no(expr)),
    };

    Ok(Transformed::yes(Arc::new(Literal::new(folded))))
}
/// Folds a single node into a [`Literal`] when every one of its direct
/// children is already a [`Literal`].
///
/// Only the immediate children are inspected; the subtree below them is not
/// walked. NOTE(review): presumably this is driven by a bottom-up rewrite so
/// that children have been folded before their parent is visited — confirm
/// against the caller.
///
/// The node is returned unchanged (`Transformed::no`) when:
/// - it is a [`Literal`] or a [`Column`],
/// - it is a volatile node,
/// - it has no children, or at least one child is not a [`Literal`],
/// - evaluation against `batch` fails, or
/// - evaluation yields an array whose length is not exactly one.
///
/// # Errors
///
/// Returns an error only if a one-element array result cannot be converted
/// into a [`ScalarValue`]; evaluation errors are swallowed.
pub(crate) fn simplify_const_expr_immediate(
    expr: Arc<dyn PhysicalExpr>,
    batch: &RecordBatch,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
    // Nodes that are never folded, checked in order with short-circuiting.
    let never_folded =
        expr.is::<Literal>() || expr.is::<Column>() || expr.is_volatile_node();
    if never_folded {
        return Ok(Transformed::no(expr));
    }

    // Scoped so the borrow of `expr` held by the child list ends before
    // `expr` is moved into the result below.
    let only_literal_children = {
        let kids = expr.children();
        !kids.is_empty() && kids.iter().all(|kid| kid.is::<Literal>())
    };
    if !only_literal_children {
        return Ok(Transformed::no(expr));
    }

    let folded = match expr.evaluate(batch) {
        Ok(ColumnarValue::Scalar(value)) => value,
        Ok(ColumnarValue::Array(array)) if array.len() == 1 => {
            ScalarValue::try_from_array(&array, 0)?
        }
        // Any other array length, or an evaluation failure: leave the node
        // untouched.
        Ok(_) | Err(_) => return Ok(Transformed::no(expr)),
    };

    Ok(Transformed::yes(Arc::new(Literal::new(folded))))
}
/// Returns a reference to a process-wide, lazily built [`RecordBatch`] used
/// as the input when evaluating constant expressions.
///
/// The batch has a single nullable column named `"_"` of type
/// [`DataType::Null`] containing one row. The construction result (success or
/// failure) is stored in a [`std::sync::OnceLock`], so the batch is built at
/// most once.
///
/// # Errors
///
/// Returns an internal error wrapping the stored failure if the batch could
/// not be constructed.
pub(crate) fn create_dummy_batch() -> Result<&'static RecordBatch> {
    static DUMMY_BATCH: std::sync::OnceLock<Result<RecordBatch>> =
        std::sync::OnceLock::new();

    let cached = DUMMY_BATCH.get_or_init(|| {
        let placeholder = Field::new("_", DataType::Null, true);
        let schema = Arc::new(Schema::new(vec![placeholder]));
        let null_column = new_null_array(&DataType::Null, 1);
        Ok(RecordBatch::try_new(schema, vec![null_column])?)
    });

    match cached {
        Ok(batch) => Ok(batch),
        // The stored error is only borrowed from the static, so a fresh error
        // is produced from its display text.
        Err(e) => Err(internal_datafusion_err!(
            "Failed to create dummy batch for constant expression evaluation: {e}"
        )),
    }
}
/// Reports whether `expr` can be evaluated without real input data.
///
/// Walks the expression tree and returns `false` as soon as a [`Column`] or a
/// volatile node is encountered; returns `true` if the walk finishes without
/// finding either.
///
/// # Panics
///
/// Panics if the tree traversal itself returns an error; the visitor closure
/// used here never produces one.
fn can_evaluate_as_constant(expr: &Arc<dyn PhysicalExpr>) -> bool {
    let mut blocker_found = false;

    expr.apply(|node| {
        let blocks_folding = node.is::<Column>() || node.is_volatile_node();
        if !blocks_folding {
            return Ok(TreeNodeRecursion::Continue);
        }
        // One blocker is enough; no need to look at the rest of the tree.
        blocker_found = true;
        Ok(TreeNodeRecursion::Stop)
    })
    .expect("apply should not fail");

    !blocker_found
}
/// Reports whether any node in the tree rooted at `expr` is a [`Column`].
///
/// The traversal stops at the first [`Column`] found.
///
/// # Panics
///
/// Panics if the tree traversal itself returns an error; the visitor closure
/// used here never produces one.
#[deprecated(
    since = "53.0.0",
    note = "This function isn't used internally and is trivial to implement, therefore it will be removed in a future release."
)]
pub fn has_column_references(expr: &Arc<dyn PhysicalExpr>) -> bool {
    let mut column_seen = false;

    expr.apply(|node| {
        let next_step = match node.downcast_ref::<Column>() {
            Some(_) => {
                // First column reference settles the answer.
                column_seen = true;
                TreeNodeRecursion::Stop
            }
            None => TreeNodeRecursion::Continue,
        };
        Ok(next_step)
    })
    .expect("apply should not fail");

    column_seen
}