use std::sync::Arc;
use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
use datafusion_common::{exec_err, DFSchema, Result};
use datafusion_expr::expr::Alias;
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::Expr;
use crate::expressions::column::Column;
use crate::expressions::literal::Literal;
use crate::expressions::CastExpr;
use crate::physical_expr::PhysicalExpr;
use crate::sort_expr::PhysicalSortExpr;
use crate::tree_node::ExprContext;
/// A physical-expression tree in which every node carries [`ExprProperties`].
pub type ExprPropertiesNode = ExprContext<ExprProperties>;
impl ExprPropertiesNode {
    /// Wraps `expr` in a node whose properties are
    /// `ExprProperties::new_unknown()`.
    ///
    /// The function recurses into every child returned by `expr.children()`,
    /// so the whole subtree is annotated with unknown properties.
    pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
        // Build the child nodes first; `expr` is moved into the result below.
        let children = expr
            .children()
            .into_iter()
            .map(|child| Self::new_unknown(Arc::clone(child)))
            .collect();
        let data = ExprProperties::new_unknown();
        Self {
            expr,
            data,
            children,
        }
    }
}
pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
let truthy = truthy.to_data();
let mask = and_kleene(mask, &is_not_null(mask)?)?;
let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
let mut filled = 0;
let mut true_pos = 0;
SlicesIterator::new(&mask).for_each(|(start, end)| {
if start > filled {
mutable.extend_nulls(start - filled);
}
let len = end - start;
mutable.extend(0, true_pos, true_pos + len);
true_pos += len;
filled = end;
});
if filled < mask.len() {
mutable.extend_nulls(mask.len() - filled);
}
let data = mutable.freeze();
Ok(make_array(data))
}
pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
order_bys
.iter()
.map(|e| PhysicalSortExpr::new(e.expr.clone(), !e.options))
.collect()
}
/// Converts a restricted subset of logical [`Expr`]s into a physical
/// expression, resolving column references against `dfschema`.
///
/// Supported variants:
/// - `Expr::Alias`: the alias is dropped and the inner expression converted.
/// - `Expr::Column`: becomes a [`Column`] at the column's index in `dfschema`.
/// - `Expr::Cast`: becomes a [`CastExpr`] over the converted inner expression
///   and the target data type, passing `None` as `CastExpr::new`'s last argument.
/// - `Expr::Literal`: becomes a [`Literal`] holding a clone of the value.
///
/// # Errors
///
/// Propagates errors from `DFSchema::index_of_column` and from recursive
/// conversion. Returns an execution error for any other expression variant.
pub fn limited_convert_logical_expr_to_physical_expr_with_dfschema(
    expr: &Expr,
    dfschema: &DFSchema,
) -> Result<Arc<dyn PhysicalExpr>> {
    match expr {
        // An alias only renames; the physical expression is the inner one.
        Expr::Alias(Alias { expr, .. }) => {
            limited_convert_logical_expr_to_physical_expr_with_dfschema(expr, dfschema)
        }
        Expr::Column(col) => {
            let idx = dfschema.index_of_column(col)?;
            Ok(Arc::new(Column::new(&col.name, idx)))
        }
        Expr::Cast(cast_expr) => {
            let inner = limited_convert_logical_expr_to_physical_expr_with_dfschema(
                cast_expr.expr.as_ref(),
                dfschema,
            )?;
            Ok(Arc::new(CastExpr::new(
                inner,
                cast_expr.data_type.clone(),
                None,
            )))
        }
        Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
        _ => exec_err!(
            "Unsupported expression: {expr} for conversion to Arc<dyn PhysicalExpr>"
        ),
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use datafusion_common::cast::{as_boolean_array, as_int32_array};

    use super::*;

    /// Scatters `values` (as an `Int32Array`) through `mask` and asserts that
    /// the output equals `expected`.
    fn assert_scatter_int32(
        mask: &BooleanArray,
        values: &[i32],
        expected: &[Option<i32>],
    ) -> Result<()> {
        let truthy = Arc::new(Int32Array::from(values.to_vec()));
        let scattered = scatter(mask, truthy.as_ref())?;
        let expected = Int32Array::from(expected.to_vec());
        assert_eq!(&expected, as_int32_array(&scattered)?);
        Ok(())
    }

    #[test]
    fn scatter_int() -> Result<()> {
        // Three selected slots consume the first three values; `100` is unused.
        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        assert_scatter_int32(
            &mask,
            &[1, 10, 11, 100],
            &[Some(1), Some(10), None, None, Some(11)],
        )
    }

    #[test]
    fn scatter_int_end_with_false() -> Result<()> {
        // Trailing unselected slots must still be padded with nulls.
        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);
        assert_scatter_int32(
            &mask,
            &[1, 10, 11, 100],
            &[Some(1), None, Some(10), None, None, None],
        )
    }

    #[test]
    fn scatter_with_null_mask() -> Result<()> {
        // Null mask slots behave like `false`.
        let mask: BooleanArray = vec![Some(false), None, Some(true), Some(true), None]
            .into_iter()
            .collect();
        assert_scatter_int32(
            &mask,
            &[1, 10, 11],
            &[None, None, Some(1), Some(10), None],
        )
    }

    #[test]
    fn scatter_boolean() -> Result<()> {
        let truthy = Arc::new(BooleanArray::from(vec![false, false, false, true]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let scattered = scatter(&mask, truthy.as_ref())?;
        let expected: BooleanArray =
            vec![Some(false), Some(false), None, None, Some(false)]
                .into_iter()
                .collect();
        assert_eq!(&expected, as_boolean_array(&scattered)?);
        Ok(())
    }
}