use std::sync::Arc;
use crate::metrics::ExpressionEvaluatorMetrics;
use crate::physical_expr::PhysicalExpr;
use crate::tree_node::ExprContext;
use arrow::array::{Array, ArrayRef, BooleanArray, MutableArrayData, make_array};
use arrow::compute::{SlicesIterator, and_kleene, is_not_null};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr_common::sort_properties::ExprProperties;
/// Tree node that pairs a physical expression with an [`ExprProperties`]
/// payload, with one child node per child expression.
pub type ExprPropertiesNode = ExprContext<ExprProperties>;

impl ExprPropertiesNode {
    /// Builds a node for `expr` whose properties are
    /// `ExprProperties::new_unknown()`.
    ///
    /// Every child expression of `expr` is converted the same way
    /// (recursively), so each node in the resulting subtree carries
    /// unknown properties.
    pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
        let child_exprs = expr.children();
        let mut children = Vec::with_capacity(child_exprs.len());
        for child in child_exprs {
            children.push(Self::new_unknown(Arc::clone(child)));
        }
        Self {
            expr,
            data: ExprProperties::new_unknown(),
            children,
        }
    }
}
/// Spreads the values of `truthy` out to the positions selected by `mask`.
///
/// The result has `mask.len()` rows. For each row `i` in order:
/// - if `mask[i]` is `true`, the row takes the next not-yet-used value from
///   `truthy` (values are consumed front to back);
/// - if `mask[i]` is `false` or null, the row is null.
///
/// Only the first `k` values of `truthy` are read, where `k` is the number of
/// `true` entries in `mask`; any further values are ignored. `truthy` is
/// expected to hold at least `k` values.
///
/// # Errors
/// Propagates any error returned by the `is_not_null` / `and_kleene` kernels
/// applied to `mask`.
pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
    let source = truthy.to_data();

    // `SlicesIterator` looks only at the value bits and ignores validity, so
    // turn null mask entries into `false` first (null AND false == false
    // under Kleene logic).
    let cleaned_mask = and_kleene(mask, &is_not_null(mask)?)?;
    let total_rows = cleaned_mask.len();

    let mut builder = MutableArrayData::new(vec![&source], true, total_rows);
    // Number of output rows produced so far.
    let mut rows_written = 0;
    // Index of the next value to copy out of `truthy`.
    let mut next_source_row = 0;

    // Each item is a half-open run `[run_start, run_end)` of `true` entries.
    for (run_start, run_end) in SlicesIterator::new(&cleaned_mask) {
        // Rows between the previous run and this one are not selected.
        if run_start > rows_written {
            builder.extend_nulls(run_start - rows_written);
        }
        let run_len = run_end - run_start;
        builder.extend(0, next_source_row, next_source_row + run_len);
        next_source_row += run_len;
        rows_written = run_end;
    }

    // Rows after the last run of `true` entries are not selected either.
    if rows_written < total_rows {
        builder.extend_nulls(total_rows - rows_written);
    }

    Ok(make_array(builder.freeze()))
}
/// Evaluates every expression in `exprs` against `batch`, returning one array
/// per expression, each expanded to `batch.num_rows()` rows.
///
/// This is `evaluate_expressions_to_arrays_with_metrics` with no metrics
/// collection.
///
/// # Errors
/// Returns the first error produced while evaluating or expanding an
/// expression.
#[inline]
pub fn evaluate_expressions_to_arrays<'a>(
    exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    // No per-expression timing is recorded on this path.
    let metrics = None;
    evaluate_expressions_to_arrays_with_metrics(exprs, batch, metrics)
}
/// Evaluates every expression in `exprs` against `batch`, optionally timing
/// each one, and returns one array per expression.
///
/// Each result is expanded to `batch.num_rows()` rows via
/// `into_array_of_size`. When `metrics` is `Some`, `scoped_timer` is asked
/// for a timer at the expression's position in `exprs`. If it returns one,
/// that timer is kept alive while the expression is evaluated and expanded.
///
/// # Errors
/// Stops at, and returns, the first error from `evaluate` or
/// `into_array_of_size`. Later expressions are not evaluated.
#[inline]
pub fn evaluate_expressions_to_arrays_with_metrics<'a>(
    exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
    batch: &RecordBatch,
    metrics: Option<&ExpressionEvaluatorMetrics>,
) -> Result<Vec<ArrayRef>> {
    let row_count = batch.num_rows();
    let expr_iter = exprs.into_iter();
    let mut arrays = Vec::with_capacity(expr_iter.size_hint().0);

    for (position, expr) in expr_iter.enumerate() {
        // Bound (not `_`) so the guard lives until the end of this iteration,
        // covering both evaluation and expansion.
        let _timer_guard = metrics.and_then(|m| m.scoped_timer(position));
        let value = expr.evaluate(batch)?;
        arrays.push(value.into_array_of_size(row_count)?);
    }

    Ok(arrays)
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use datafusion_common::cast::{as_boolean_array, as_int32_array};

    use super::*;

    #[test]
    fn scatter_int() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let expected =
            Int32Array::from_iter(vec![Some(1), Some(10), None, None, Some(11)]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_int_end_with_false() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);
        let expected =
            Int32Array::from_iter(vec![Some(1), None, Some(10), None, None, None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_with_null_mask() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
        let mask: BooleanArray = vec![Some(false), None, Some(true), Some(true), None]
            .into_iter()
            .collect();
        let expected = Int32Array::from_iter(vec![None, None, Some(1), Some(10), None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_boolean() -> Result<()> {
        let truthy = Arc::new(BooleanArray::from(vec![false, false, false, true]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let expected = BooleanArray::from_iter(vec![
            Some(false),
            Some(false),
            None,
            None,
            Some(false),
        ]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_boolean_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    /// An empty mask yields an empty array of the truthy type.
    #[test]
    fn scatter_empty_mask() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let mask = BooleanArray::from(Vec::<bool>::new());
        let expected = Int32Array::from(Vec::<i32>::new());
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    /// A mask that is entirely null selects nothing, so no value is read from
    /// `truthy` (empty here) and every output row is null.
    #[test]
    fn scatter_all_null_mask() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let mask = BooleanArray::from(vec![None::<bool>, None, None]);
        let expected = Int32Array::from(vec![None::<i32>, None, None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    /// An all-true mask copies `truthy` through unchanged (no gaps to fill).
    #[test]
    fn scatter_all_true_mask() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![7, 8, 9]));
        let mask = BooleanArray::from(vec![true, true, true]);
        let expected = Int32Array::from(vec![7, 8, 9]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }

    /// Nulls inside `truthy` are carried to the selected positions unchanged.
    #[test]
    fn scatter_preserves_truthy_nulls() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![Some(1), None]));
        let mask = BooleanArray::from(vec![false, true, true]);
        let expected = Int32Array::from(vec![None, Some(1), None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&expected, result);
        Ok(())
    }
}