datafusion_physical_expr_common/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::physical_expr::PhysicalExpr;
21use crate::tree_node::ExprContext;
22
23use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
24use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
25use datafusion_common::Result;
26use datafusion_expr_common::sort_properties::ExprProperties;
27
28/// Represents a [`PhysicalExpr`] node with associated properties (order and
29/// range) in a context where properties are tracked.
30pub type ExprPropertiesNode = ExprContext<ExprProperties>;
31
32impl ExprPropertiesNode {
33    /// Constructs a new `ExprPropertiesNode` with unknown properties for a
34    /// given physical expression. This node initializes with default properties
35    /// and recursively applies this to all child expressions.
36    pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
37        let children = expr
38            .children()
39            .into_iter()
40            .cloned()
41            .map(Self::new_unknown)
42            .collect();
43        Self {
44            expr,
45            data: ExprProperties::new_unknown(),
46            children,
47        }
48    }
49}
50
51/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
52/// are taken, when the mask evaluates `false` values null values are filled.
53///
54/// # Arguments
55/// * `mask` - Boolean values used to determine where to put the `truthy` values
56/// * `truthy` - All values of this array are to scatter according to `mask` into final result.
57pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
58    let truthy = truthy.to_data();
59
60    // update the mask so that any null values become false
61    // (SlicesIterator doesn't respect nulls)
62    let mask = and_kleene(mask, &is_not_null(mask)?)?;
63
64    let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
65
66    // the SlicesIterator slices only the true values. So the gaps left by this iterator we need to
67    // fill with falsy values
68
69    // keep track of how much is filled
70    let mut filled = 0;
71    // keep track of current position we have in truthy array
72    let mut true_pos = 0;
73
74    SlicesIterator::new(&mask).for_each(|(start, end)| {
75        // the gap needs to be filled with nulls
76        if start > filled {
77            mutable.extend_nulls(start - filled);
78        }
79        // fill with truthy values
80        let len = end - start;
81        mutable.extend(0, true_pos, true_pos + len);
82        true_pos += len;
83        filled = end;
84    });
85    // the remaining part is falsy
86    if filled < mask.len() {
87        mutable.extend_nulls(mask.len() - filled);
88    }
89
90    let data = mutable.freeze();
91    Ok(make_array(data))
92}
93
#[cfg(test)]
mod tests {
    use arrow::array::Int32Array;

    use datafusion_common::cast::{as_boolean_array, as_int32_array};

    use super::*;

    /// `true` slots take values in order; `false` slots become null.
    #[test]
    fn scatter_int() -> Result<()> {
        let values = Int32Array::from(vec![1, 10, 11, 100]);
        let mask = BooleanArray::from(vec![true, true, false, false, true]);

        let scattered = scatter(&mask, &values)?;
        let actual = as_int32_array(&scattered)?;

        // the output has one slot per mask element
        let expected = Int32Array::from(vec![Some(1), Some(10), None, None, Some(11)]);
        assert_eq!(&expected, actual);
        Ok(())
    }

    /// A trailing run of `false` slots is padded with nulls.
    #[test]
    fn scatter_int_end_with_false() -> Result<()> {
        let values = Int32Array::from(vec![1, 10, 11, 100]);
        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);

        let scattered = scatter(&mask, &values)?;
        let actual = as_int32_array(&scattered)?;

        // the output has one slot per mask element
        let expected =
            Int32Array::from(vec![Some(1), None, Some(10), None, None, None]);
        assert_eq!(&expected, actual);
        Ok(())
    }

    /// Null mask slots behave exactly like `false` slots.
    #[test]
    fn scatter_with_null_mask() -> Result<()> {
        let values = Int32Array::from(vec![1, 10, 11]);
        let mask =
            BooleanArray::from(vec![Some(false), None, Some(true), Some(true), None]);

        let scattered = scatter(&mask, &values)?;
        let actual = as_int32_array(&scattered)?;

        let expected = Int32Array::from(vec![None, None, Some(1), Some(10), None]);
        assert_eq!(&expected, actual);
        Ok(())
    }

    /// Scattering works for boolean value arrays as well.
    #[test]
    fn scatter_boolean() -> Result<()> {
        let values = BooleanArray::from(vec![false, false, false, true]);
        let mask = BooleanArray::from(vec![true, true, false, false, true]);

        let scattered = scatter(&mask, &values)?;
        let actual = as_boolean_array(&scattered)?;

        // the output has one slot per mask element
        let expected = BooleanArray::from(vec![
            Some(false),
            Some(false),
            None,
            None,
            Some(false),
        ]);
        assert_eq!(&expected, actual);
        Ok(())
    }
}
169}