datafusion_physical_expr_common/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::metrics::ExpressionEvaluatorMetrics;
21use crate::physical_expr::PhysicalExpr;
22use crate::tree_node::ExprContext;
23
24use arrow::array::{Array, ArrayRef, BooleanArray, MutableArrayData, make_array};
25use arrow::compute::{SlicesIterator, and_kleene, is_not_null};
26use arrow::record_batch::RecordBatch;
27use datafusion_common::Result;
28use datafusion_expr_common::sort_properties::ExprProperties;
29
30/// Represents a [`PhysicalExpr`] node with associated properties (order and
31/// range) in a context where properties are tracked.
32pub type ExprPropertiesNode = ExprContext<ExprProperties>;
33
34impl ExprPropertiesNode {
35    /// Constructs a new `ExprPropertiesNode` with unknown properties for a
36    /// given physical expression. This node initializes with default properties
37    /// and recursively applies this to all child expressions.
38    pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
39        let children = expr
40            .children()
41            .into_iter()
42            .cloned()
43            .map(Self::new_unknown)
44            .collect();
45        Self {
46            expr,
47            data: ExprProperties::new_unknown(),
48            children,
49        }
50    }
51}
52
53/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
54/// are taken, when the mask evaluates `false` values null values are filled.
55///
56/// # Arguments
57/// * `mask` - Boolean values used to determine where to put the `truthy` values
58/// * `truthy` - All values of this array are to scatter according to `mask` into final result.
59pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
60    let truthy = truthy.to_data();
61
62    // update the mask so that any null values become false
63    // (SlicesIterator doesn't respect nulls)
64    let mask = and_kleene(mask, &is_not_null(mask)?)?;
65
66    let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
67
68    // the SlicesIterator slices only the true values. So the gaps left by this iterator we need to
69    // fill with falsy values
70
71    // keep track of how much is filled
72    let mut filled = 0;
73    // keep track of current position we have in truthy array
74    let mut true_pos = 0;
75
76    SlicesIterator::new(&mask).for_each(|(start, end)| {
77        // the gap needs to be filled with nulls
78        if start > filled {
79            mutable.extend_nulls(start - filled);
80        }
81        // fill with truthy values
82        let len = end - start;
83        mutable.extend(0, true_pos, true_pos + len);
84        true_pos += len;
85        filled = end;
86    });
87    // the remaining part is falsy
88    if filled < mask.len() {
89        mutable.extend_nulls(mask.len() - filled);
90    }
91
92    let data = mutable.freeze();
93    Ok(make_array(data))
94}
95
96/// Evaluates expressions against a record batch.
97/// This will convert the resulting ColumnarValues to ArrayRefs,
98/// duplicating any ScalarValues that may have been returned,
99/// and validating that the returned arrays all have the same
100/// number of rows as the input batch.
101#[inline]
102pub fn evaluate_expressions_to_arrays<'a>(
103    exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
104    batch: &RecordBatch,
105) -> Result<Vec<ArrayRef>> {
106    evaluate_expressions_to_arrays_with_metrics(exprs, batch, None)
107}
108
109/// Same as [`evaluate_expressions_to_arrays`] but records optional per-expression metrics.
110///
111/// For metrics tracking, see [`ExpressionEvaluatorMetrics`] for details.
112#[inline]
113pub fn evaluate_expressions_to_arrays_with_metrics<'a>(
114    exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
115    batch: &RecordBatch,
116    metrics: Option<&ExpressionEvaluatorMetrics>,
117) -> Result<Vec<ArrayRef>> {
118    let num_rows = batch.num_rows();
119    exprs
120        .into_iter()
121        .enumerate()
122        .map(|(idx, e)| {
123            let _timer = metrics.and_then(|m| m.scoped_timer(idx));
124            e.evaluate(batch)
125                .and_then(|col| col.into_array_of_size(num_rows))
126        })
127        .collect::<Result<Vec<ArrayRef>>>()
128}
129
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Int32Array;

    use datafusion_common::cast::{as_boolean_array, as_int32_array};

    use super::*;

    #[test]
    fn scatter_int() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);

        // the output array is expected to be the same length as the mask array
        let expected =
            Int32Array::from_iter(vec![Some(1), Some(10), None, None, Some(11)]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_int_end_with_false() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);

        // output should be same length as mask
        let expected =
            Int32Array::from_iter(vec![Some(1), None, Some(10), None, None, None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_with_null_mask() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
        let mask: BooleanArray = vec![Some(false), None, Some(true), Some(true), None]
            .into_iter()
            .collect();

        // output should treat nulls as though they are false
        let expected = Int32Array::from_iter(vec![None, None, Some(1), Some(10), None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_boolean() -> Result<()> {
        let truthy = Arc::new(BooleanArray::from(vec![false, false, false, true]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);

        // the output array is expected to be the same length as the mask array
        let expected = BooleanArray::from_iter(vec![
            Some(false),
            Some(false),
            None,
            None,
            Some(false),
        ]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_boolean_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_empty_mask() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let mask = BooleanArray::from(Vec::<bool>::new());

        // an empty mask yields an empty array of the truthy type
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(result.len(), 0);
        Ok(())
    }

    #[test]
    fn scatter_all_null_mask() -> Result<()> {
        // no mask entry is true, so no truthy values are consumed
        let truthy = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let mask = BooleanArray::new_null(3);

        let expected = Int32Array::new_null(3);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }
}