datafusion_physical_expr_common/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
21use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
22
23use datafusion_common::Result;
24use datafusion_expr_common::sort_properties::ExprProperties;
25
26use crate::physical_expr::PhysicalExpr;
27use crate::sort_expr::{LexOrdering, PhysicalSortExpr};
28use crate::tree_node::ExprContext;
29
30/// Represents a [`PhysicalExpr`] node with associated properties (order and
31/// range) in a context where properties are tracked.
32pub type ExprPropertiesNode = ExprContext<ExprProperties>;
33
34impl ExprPropertiesNode {
35    /// Constructs a new `ExprPropertiesNode` with unknown properties for a
36    /// given physical expression. This node initializes with default properties
37    /// and recursively applies this to all child expressions.
38    pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
39        let children = expr
40            .children()
41            .into_iter()
42            .cloned()
43            .map(Self::new_unknown)
44            .collect();
45        Self {
46            expr,
47            data: ExprProperties::new_unknown(),
48            children,
49        }
50    }
51}
52
53/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
54/// are taken, when the mask evaluates `false` values null values are filled.
55///
56/// # Arguments
57/// * `mask` - Boolean values used to determine where to put the `truthy` values
58/// * `truthy` - All values of this array are to scatter according to `mask` into final result.
59pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
60    let truthy = truthy.to_data();
61
62    // update the mask so that any null values become false
63    // (SlicesIterator doesn't respect nulls)
64    let mask = and_kleene(mask, &is_not_null(mask)?)?;
65
66    let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
67
68    // the SlicesIterator slices only the true values. So the gaps left by this iterator we need to
69    // fill with falsy values
70
71    // keep track of how much is filled
72    let mut filled = 0;
73    // keep track of current position we have in truthy array
74    let mut true_pos = 0;
75
76    SlicesIterator::new(&mask).for_each(|(start, end)| {
77        // the gap needs to be filled with nulls
78        if start > filled {
79            mutable.extend_nulls(start - filled);
80        }
81        // fill with truthy values
82        let len = end - start;
83        mutable.extend(0, true_pos, true_pos + len);
84        true_pos += len;
85        filled = end;
86    });
87    // the remaining part is falsy
88    if filled < mask.len() {
89        mutable.extend_nulls(mask.len() - filled);
90    }
91
92    let data = mutable.freeze();
93    Ok(make_array(data))
94}
95
96/// Reverses the ORDER BY expression, which is useful during equivalent window
97/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into
98/// 'ORDER BY a DESC, NULLS FIRST'.
99pub fn reverse_order_bys(order_bys: &LexOrdering) -> LexOrdering {
100    order_bys
101        .iter()
102        .map(|e| PhysicalSortExpr::new(Arc::clone(&e.expr), !e.options))
103        .collect()
104}
105
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Int32Array;

    use datafusion_common::cast::{as_boolean_array, as_int32_array};

    use super::*;

    /// Scatters the Int32 `values` by `mask` and asserts that the outcome
    /// equals `expected`.
    fn check_int_scatter(
        mask: &BooleanArray,
        values: Vec<i32>,
        expected: Vec<Option<i32>>,
    ) -> Result<()> {
        let truthy = Arc::new(Int32Array::from(values));
        let expected = Int32Array::from_iter(expected);

        let scattered = scatter(mask, truthy.as_ref())?;
        let scattered = as_int32_array(&scattered)?;

        assert_eq!(&expected, scattered);
        Ok(())
    }

    #[test]
    fn scatter_int() -> Result<()> {
        let mask = BooleanArray::from(vec![true, true, false, false, true]);

        // the output array must be as long as the mask array
        check_int_scatter(
            &mask,
            vec![1, 10, 11, 100],
            vec![Some(1), Some(10), None, None, Some(11)],
        )
    }

    #[test]
    fn scatter_int_end_with_false() -> Result<()> {
        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);

        // trailing `false` entries still produce output slots (all null)
        check_int_scatter(
            &mask,
            vec![1, 10, 11, 100],
            vec![Some(1), None, Some(10), None, None, None],
        )
    }

    #[test]
    fn scatter_with_null_mask() -> Result<()> {
        let mask = BooleanArray::from(vec![
            Some(false),
            None,
            Some(true),
            Some(true),
            None,
        ]);

        // null mask entries are handled as though they were `false`
        check_int_scatter(
            &mask,
            vec![1, 10, 11],
            vec![None, None, Some(1), Some(10), None],
        )
    }

    #[test]
    fn scatter_boolean() -> Result<()> {
        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let truthy = Arc::new(BooleanArray::from(vec![false, false, false, true]));

        // the output array must be as long as the mask array
        let expected = BooleanArray::from_iter(vec![
            Some(false),
            Some(false),
            None,
            None,
            Some(false),
        ]);

        let scattered = scatter(&mask, truthy.as_ref())?;
        let scattered = as_boolean_array(&scattered)?;

        assert_eq!(&expected, scattered);
        Ok(())
    }
}