datafusion_physical_expr_common/
utils.rs1use std::sync::Arc;
19
20use crate::physical_expr::PhysicalExpr;
21use crate::tree_node::ExprContext;
22
23use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
24use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
25use datafusion_common::Result;
26use datafusion_expr_common::sort_properties::ExprProperties;
27
28pub type ExprPropertiesNode = ExprContext<ExprProperties>;
31
32impl ExprPropertiesNode {
33 pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
37 let children = expr
38 .children()
39 .into_iter()
40 .cloned()
41 .map(Self::new_unknown)
42 .collect();
43 Self {
44 expr,
45 data: ExprProperties::new_unknown(),
46 children,
47 }
48 }
49}
50
51pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
58 let truthy = truthy.to_data();
59
60 let mask = and_kleene(mask, &is_not_null(mask)?)?;
63
64 let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
65
66 let mut filled = 0;
71 let mut true_pos = 0;
73
74 SlicesIterator::new(&mask).for_each(|(start, end)| {
75 if start > filled {
77 mutable.extend_nulls(start - filled);
78 }
79 let len = end - start;
81 mutable.extend(0, true_pos, true_pos + len);
82 true_pos += len;
83 filled = end;
84 });
85 if filled < mask.len() {
87 mutable.extend_nulls(mask.len() - filled);
88 }
89
90 let data = mutable.freeze();
91 Ok(make_array(data))
92}
93
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Int32Array;

    use datafusion_common::cast::{as_boolean_array, as_int32_array};

    use super::*;

    /// A gap of `false` entries in the middle becomes nulls; the fourth
    /// source value is never used because the mask has only three `true`s.
    #[test]
    fn scatter_int() -> Result<()> {
        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let values = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));

        let scattered = scatter(&mask, values.as_ref())?;
        let actual = as_int32_array(&scattered)?;

        let expected = Int32Array::from(vec![Some(1), Some(10), None, None, Some(11)]);
        assert_eq!(&expected, actual);
        Ok(())
    }

    /// Trailing `false` entries after the last `true` are filled with nulls.
    #[test]
    fn scatter_int_end_with_false() -> Result<()> {
        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);
        let values = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));

        let scattered = scatter(&mask, values.as_ref())?;
        let actual = as_int32_array(&scattered)?;

        let expected = Int32Array::from(vec![Some(1), None, Some(10), None, None, None]);
        assert_eq!(&expected, actual);
        Ok(())
    }

    /// Null mask entries behave like `false`: they yield null and do not
    /// consume a source value.
    #[test]
    fn scatter_with_null_mask() -> Result<()> {
        let mask = BooleanArray::from(vec![Some(false), None, Some(true), Some(true), None]);
        let values = Arc::new(Int32Array::from(vec![1, 10, 11]));

        let scattered = scatter(&mask, values.as_ref())?;
        let actual = as_int32_array(&scattered)?;

        let expected = Int32Array::from(vec![None, None, Some(1), Some(10), None]);
        assert_eq!(&expected, actual);
        Ok(())
    }

    /// Same mask shape as `scatter_int`, but scattering boolean values.
    #[test]
    fn scatter_boolean() -> Result<()> {
        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let values = Arc::new(BooleanArray::from(vec![false, false, false, true]));

        let scattered = scatter(&mask, values.as_ref())?;
        let actual = as_boolean_array(&scattered)?;

        let expected =
            BooleanArray::from(vec![Some(false), Some(false), None, None, Some(false)]);
        assert_eq!(&expected, actual);
        Ok(())
    }
}