datafusion_physical_expr_common/
utils.rs1use std::sync::Arc;
19
20use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
21use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
22
23use datafusion_common::Result;
24use datafusion_expr_common::sort_properties::ExprProperties;
25
26use crate::physical_expr::PhysicalExpr;
27use crate::sort_expr::{LexOrdering, PhysicalSortExpr};
28use crate::tree_node::ExprContext;
29
30pub type ExprPropertiesNode = ExprContext<ExprProperties>;
33
34impl ExprPropertiesNode {
35 pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
39 let children = expr
40 .children()
41 .into_iter()
42 .cloned()
43 .map(Self::new_unknown)
44 .collect();
45 Self {
46 expr,
47 data: ExprProperties::new_unknown(),
48 children,
49 }
50 }
51}
52
53pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
60 let truthy = truthy.to_data();
61
62 let mask = and_kleene(mask, &is_not_null(mask)?)?;
65
66 let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
67
68 let mut filled = 0;
73 let mut true_pos = 0;
75
76 SlicesIterator::new(&mask).for_each(|(start, end)| {
77 if start > filled {
79 mutable.extend_nulls(start - filled);
80 }
81 let len = end - start;
83 mutable.extend(0, true_pos, true_pos + len);
84 true_pos += len;
85 filled = end;
86 });
87 if filled < mask.len() {
89 mutable.extend_nulls(mask.len() - filled);
90 }
91
92 let data = mutable.freeze();
93 Ok(make_array(data))
94}
95
96pub fn reverse_order_bys(order_bys: &LexOrdering) -> LexOrdering {
100 order_bys
101 .iter()
102 .map(|e| PhysicalSortExpr::new(Arc::clone(&e.expr), !e.options))
103 .collect()
104}
105
106#[cfg(test)]
107mod tests {
108 use std::sync::Arc;
109
110 use arrow::array::Int32Array;
111
112 use datafusion_common::cast::{as_boolean_array, as_int32_array};
113
114 use super::*;
115
116 #[test]
117 fn scatter_int() -> Result<()> {
118 let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
119 let mask = BooleanArray::from(vec![true, true, false, false, true]);
120
121 let expected =
123 Int32Array::from_iter(vec![Some(1), Some(10), None, None, Some(11)]);
124 let result = scatter(&mask, truthy.as_ref())?;
125 let result = as_int32_array(&result)?;
126
127 assert_eq!(&expected, result);
128 Ok(())
129 }
130
131 #[test]
132 fn scatter_int_end_with_false() -> Result<()> {
133 let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
134 let mask = BooleanArray::from(vec![true, false, true, false, false, false]);
135
136 let expected =
138 Int32Array::from_iter(vec![Some(1), None, Some(10), None, None, None]);
139 let result = scatter(&mask, truthy.as_ref())?;
140 let result = as_int32_array(&result)?;
141
142 assert_eq!(&expected, result);
143 Ok(())
144 }
145
146 #[test]
147 fn scatter_with_null_mask() -> Result<()> {
148 let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
149 let mask: BooleanArray = vec![Some(false), None, Some(true), Some(true), None]
150 .into_iter()
151 .collect();
152
153 let expected = Int32Array::from_iter(vec![None, None, Some(1), Some(10), None]);
155 let result = scatter(&mask, truthy.as_ref())?;
156 let result = as_int32_array(&result)?;
157
158 assert_eq!(&expected, result);
159 Ok(())
160 }
161
162 #[test]
163 fn scatter_boolean() -> Result<()> {
164 let truthy = Arc::new(BooleanArray::from(vec![false, false, false, true]));
165 let mask = BooleanArray::from(vec![true, true, false, false, true]);
166
167 let expected = BooleanArray::from_iter(vec![
169 Some(false),
170 Some(false),
171 None,
172 None,
173 Some(false),
174 ]);
175 let result = scatter(&mask, truthy.as_ref())?;
176 let result = as_boolean_array(&result)?;
177
178 assert_eq!(&expected, result);
179 Ok(())
180 }
181}