datafusion-physical-expr-common 53.1.0

Common functionality of physical expression for DataFusion query engine
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use crate::metrics::ExpressionEvaluatorMetrics;
use crate::physical_expr::PhysicalExpr;
use crate::tree_node::ExprContext;

use arrow::array::{Array, ArrayRef, BooleanArray, MutableArrayData, make_array};
use arrow::compute::{SlicesIterator, and_kleene, is_not_null};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr_common::sort_properties::ExprProperties;

/// Represents a [`PhysicalExpr`] node with associated properties (order and
/// range) in a context where properties are tracked.
pub type ExprPropertiesNode = ExprContext<ExprProperties>;

impl ExprPropertiesNode {
    /// Constructs a new `ExprPropertiesNode` with unknown properties for a
    /// given physical expression. This node initializes with default properties
    /// and recursively applies this to all child expressions.
    pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
        let children = expr
            .children()
            .into_iter()
            .cloned()
            .map(Self::new_unknown)
            .collect();
        Self {
            expr,
            data: ExprProperties::new_unknown(),
            children,
        }
    }
}

/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
/// are taken, when the mask evaluates `false` values null values are filled.
///
/// # Arguments
/// * `mask` - Boolean values used to determine where to put the `truthy` values
/// * `truthy` - All values of this array are to scatter according to `mask` into final result.
pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
    let truthy = truthy.to_data();

    // update the mask so that any null values become false
    // (SlicesIterator doesn't respect nulls)
    let mask = and_kleene(mask, &is_not_null(mask)?)?;

    let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());

    // the SlicesIterator slices only the true values. So the gaps left by this iterator we need to
    // fill with falsy values

    // keep track of how much is filled
    let mut filled = 0;
    // keep track of current position we have in truthy array
    let mut true_pos = 0;

    SlicesIterator::new(&mask).for_each(|(start, end)| {
        // the gap needs to be filled with nulls
        if start > filled {
            mutable.extend_nulls(start - filled);
        }
        // fill with truthy values
        let len = end - start;
        mutable.extend(0, true_pos, true_pos + len);
        true_pos += len;
        filled = end;
    });
    // the remaining part is falsy
    if filled < mask.len() {
        mutable.extend_nulls(mask.len() - filled);
    }

    let data = mutable.freeze();
    Ok(make_array(data))
}

/// Evaluates expressions against a record batch.
/// This will convert the resulting ColumnarValues to ArrayRefs,
/// duplicating any ScalarValues that may have been returned,
/// and validating that the returned arrays all have the same
/// number of rows as the input batch.
#[inline]
pub fn evaluate_expressions_to_arrays<'a>(
    exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    evaluate_expressions_to_arrays_with_metrics(exprs, batch, None)
}

/// Same as [`evaluate_expressions_to_arrays`] but records optional per-expression metrics.
///
/// For metrics tracking, see [`ExpressionEvaluatorMetrics`] for details.
#[inline]
pub fn evaluate_expressions_to_arrays_with_metrics<'a>(
    exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
    batch: &RecordBatch,
    metrics: Option<&ExpressionEvaluatorMetrics>,
) -> Result<Vec<ArrayRef>> {
    let num_rows = batch.num_rows();
    exprs
        .into_iter()
        .enumerate()
        .map(|(idx, e)| {
            let _timer = metrics.and_then(|m| m.scoped_timer(idx));
            e.evaluate(batch)
                .and_then(|col| col.into_array_of_size(num_rows))
        })
        .collect::<Result<Vec<ArrayRef>>>()
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Int32Array;

    use datafusion_common::cast::{as_boolean_array, as_int32_array};

    use super::*;

    #[test]
    fn scatter_int() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);

        // the output array is expected to be the same length as the mask array
        let expected =
            Int32Array::from_iter(vec![Some(1), Some(10), None, None, Some(11)]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_int_end_with_false() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);

        // output should be same length as mask
        let expected =
            Int32Array::from_iter(vec![Some(1), None, Some(10), None, None, None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_with_null_mask() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
        let mask: BooleanArray = vec![Some(false), None, Some(true), Some(true), None]
            .into_iter()
            .collect();

        // output should treat nulls as though they are false
        let expected = Int32Array::from_iter(vec![None, None, Some(1), Some(10), None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_boolean() -> Result<()> {
        let truthy = Arc::new(BooleanArray::from(vec![false, false, false, true]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);

        // the output array is expected to be the same length as the mask array
        let expected = BooleanArray::from_iter(vec![
            Some(false),
            Some(false),
            None,
            None,
            Some(false),
        ]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_boolean_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }
}