datafusion_physical_expr_common/
utils.rs1use std::sync::Arc;
19
20use crate::metrics::ExpressionEvaluatorMetrics;
21use crate::physical_expr::PhysicalExpr;
22use crate::tree_node::ExprContext;
23
24use arrow::array::{Array, ArrayRef, BooleanArray, MutableArrayData, make_array};
25use arrow::compute::{SlicesIterator, and_kleene, is_not_null};
26use arrow::record_batch::RecordBatch;
27use datafusion_common::Result;
28use datafusion_expr_common::sort_properties::ExprProperties;
29
30pub type ExprPropertiesNode = ExprContext<ExprProperties>;
33
34impl ExprPropertiesNode {
35 pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
39 let children = expr
40 .children()
41 .into_iter()
42 .cloned()
43 .map(Self::new_unknown)
44 .collect();
45 Self {
46 expr,
47 data: ExprProperties::new_unknown(),
48 children,
49 }
50 }
51}
52
53pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
60 let truthy = truthy.to_data();
61
62 let mask = and_kleene(mask, &is_not_null(mask)?)?;
65
66 let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
67
68 let mut filled = 0;
73 let mut true_pos = 0;
75
76 SlicesIterator::new(&mask).for_each(|(start, end)| {
77 if start > filled {
79 mutable.extend_nulls(start - filled);
80 }
81 let len = end - start;
83 mutable.extend(0, true_pos, true_pos + len);
84 true_pos += len;
85 filled = end;
86 });
87 if filled < mask.len() {
89 mutable.extend_nulls(mask.len() - filled);
90 }
91
92 let data = mutable.freeze();
93 Ok(make_array(data))
94}
95
96#[inline]
102pub fn evaluate_expressions_to_arrays<'a>(
103 exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
104 batch: &RecordBatch,
105) -> Result<Vec<ArrayRef>> {
106 evaluate_expressions_to_arrays_with_metrics(exprs, batch, None)
107}
108
109#[inline]
113pub fn evaluate_expressions_to_arrays_with_metrics<'a>(
114 exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
115 batch: &RecordBatch,
116 metrics: Option<&ExpressionEvaluatorMetrics>,
117) -> Result<Vec<ArrayRef>> {
118 let num_rows = batch.num_rows();
119 exprs
120 .into_iter()
121 .enumerate()
122 .map(|(idx, e)| {
123 let _timer = metrics.and_then(|m| m.scoped_timer(idx));
124 e.evaluate(batch)
125 .and_then(|col| col.into_array_of_size(num_rows))
126 })
127 .collect::<Result<Vec<ArrayRef>>>()
128}
129
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Int32Array;

    use datafusion_common::cast::{as_boolean_array, as_int32_array};

    use super::*;

    #[test]
    fn scatter_int() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);

        // output has one row per mask entry; `false` rows become null and the
        // unused trailing truthy value (100) is ignored
        let expected =
            Int32Array::from_iter(vec![Some(1), Some(10), None, None, Some(11)]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_int_end_with_false() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);

        let expected =
            Int32Array::from_iter(vec![Some(1), None, Some(10), None, None, None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_with_null_mask() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
        let mask: BooleanArray = vec![Some(false), None, Some(true), Some(true), None]
            .into_iter()
            .collect();

        // null mask entries behave like `false`
        let expected = Int32Array::from_iter(vec![None, None, Some(1), Some(10), None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_boolean() -> Result<()> {
        let truthy = Arc::new(BooleanArray::from(vec![false, false, false, true]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);

        let expected = BooleanArray::from_iter(vec![
            Some(false),
            Some(false),
            None,
            None,
            Some(false),
        ]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_boolean_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    #[test]
    fn scatter_empty_mask() -> Result<()> {
        // an empty mask yields an empty output without touching `truthy`
        let truthy = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let mask = BooleanArray::from(Vec::<bool>::new());

        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert!(result.is_empty());
        Ok(())
    }

    #[test]
    fn scatter_all_null_mask() -> Result<()> {
        // every mask entry is null, so no truthy values are consumed and the
        // whole output is null
        let truthy = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let mask = BooleanArray::from(vec![None::<bool>; 3]);

        let expected = Int32Array::from(vec![None::<i32>; 3]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }
}