1use std::borrow::Cow;
19use std::sync::Arc;
20
21use crate::metrics::ExpressionEvaluatorMetrics;
22use crate::physical_expr::PhysicalExpr;
23use crate::tree_node::ExprContext;
24
25use arrow::array::cast::AsArray;
26use arrow::array::{
27 Array, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder,
28 DictionaryArray, FixedSizeBinaryArray, GenericByteArray, GenericByteViewArray,
29 MutableArrayData, PrimitiveArray, make_array, new_null_array,
30};
31use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer};
32use arrow::compute::{SlicesIterator, prep_null_mask_filter};
33use arrow::datatypes::{
34 ArrowDictionaryKeyType, ArrowNativeType, ArrowPrimitiveType, ByteArrayType,
35 ByteViewType, DataType,
36};
37use arrow::record_batch::RecordBatch;
38use arrow::{downcast_dictionary_array, downcast_primitive_array};
39use datafusion_common::Result;
40use datafusion_expr_common::sort_properties::ExprProperties;
41
/// An [`ExprContext`] node whose per-node payload is an [`ExprProperties`]
/// value.
pub type ExprPropertiesNode = ExprContext<ExprProperties>;
45
46impl ExprPropertiesNode {
47 pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
51 let children = expr
52 .children()
53 .into_iter()
54 .cloned()
55 .map(Self::new_unknown)
56 .collect();
57 Self {
58 expr,
59 data: ExprProperties::new_unknown(),
60 children,
61 }
62 }
63}
64
/// Selectivity (fraction of `true` entries in the mask) above which the
/// scatter kernels walk the mask as contiguous runs of set bits
/// (`set_slices`) instead of one set bit at a time (`set_indices`).
const SCATTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
69
70pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
77 let mask = match mask.null_count() {
78 0 => Cow::Borrowed(mask),
79 n if n == mask.len() => {
80 return Ok(new_null_array(truthy.data_type(), mask.len()));
81 }
82 _ => Cow::Owned(prep_null_mask_filter(mask)),
83 };
84
85 let output_len = mask.len();
86
87 if !mask.has_true() {
89 return Ok(new_null_array(truthy.data_type(), output_len));
90 }
91
92 if mask.null_count() == 0 && !mask.has_false() {
94 return Ok(truthy.slice(0, truthy.len()));
95 }
96
97 let count = mask.true_count();
98 let selectivity = count as f64 / output_len as f64;
99 let mask_buffer = mask.values();
100
101 scatter_array(truthy, mask_buffer, output_len, selectivity)
102}
103
/// Dispatches to the scatter kernel matching the data type of `truthy`.
///
/// Primitive, boolean, string/binary (offset-based and view-based),
/// fixed-size binary and dictionary arrays have dedicated kernels; every
/// other type goes through [`scatter_fallback`].
///
/// `mask` holds the selection bits (no null information), `output_len` is the
/// length of the array to produce and `selectivity` is forwarded to the
/// kernels, which compare it against
/// [`SCATTER_SLICES_SELECTIVITY_THRESHOLD`].
fn scatter_array(
    truthy: &dyn Array,
    mask: &BooleanBuffer,
    output_len: usize,
    selectivity: f64,
) -> Result<ArrayRef> {
    downcast_primitive_array! {
        // Inside this arm the macro rebinds `truthy` to the concrete
        // primitive array type.
        truthy => Ok(Arc::new(scatter_primitive(truthy, mask, output_len, selectivity))),
        DataType::Boolean => {
            Ok(Arc::new(scatter_boolean(truthy.as_boolean(), mask, output_len, selectivity)))
        }
        DataType::Utf8 => {
            Ok(Arc::new(scatter_bytes(truthy.as_string::<i32>(), mask, output_len, selectivity)))
        }
        DataType::LargeUtf8 => {
            Ok(Arc::new(scatter_bytes(truthy.as_string::<i64>(), mask, output_len, selectivity)))
        }
        DataType::Utf8View => {
            Ok(Arc::new(scatter_byte_view(truthy.as_string_view(), mask, output_len, selectivity)))
        }
        DataType::Binary => {
            Ok(Arc::new(scatter_bytes(truthy.as_binary::<i32>(), mask, output_len, selectivity)))
        }
        DataType::LargeBinary => {
            Ok(Arc::new(scatter_bytes(truthy.as_binary::<i64>(), mask, output_len, selectivity)))
        }
        DataType::BinaryView => {
            Ok(Arc::new(scatter_byte_view(truthy.as_binary_view(), mask, output_len, selectivity)))
        }
        DataType::FixedSizeBinary(_) => {
            Ok(Arc::new(scatter_fixed_size_binary(
                truthy.as_fixed_size_binary(), mask, output_len, selectivity,
            )))
        }
        DataType::Dictionary(_, _) => {
            // Only the keys are scattered; dictionaries the macro cannot
            // downcast use the generic fallback.
            downcast_dictionary_array! {
                truthy => Ok(Arc::new(scatter_dict(truthy, mask, output_len, selectivity))),
                _t => scatter_fallback(truthy, mask, output_len)
            }
        }
        // No dedicated kernel for this type.
        _ => scatter_fallback(truthy, mask, output_len)
    }
}
147
148#[inline(never)]
149fn scatter_native<T: ArrowNativeType>(
150 src: &[T],
151 mask: &BooleanBuffer,
152 output_len: usize,
153 selectivity: f64,
154) -> ScalarBuffer<T> {
155 let mut output = vec![T::default(); output_len];
156 let mut src_offset = 0;
157
158 if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
159 for (start, end) in mask.set_slices() {
160 let len = end - start;
161 output[start..end].copy_from_slice(&src[src_offset..src_offset + len]);
162 src_offset += len;
163 }
164 } else {
165 for dst_idx in mask.set_indices() {
166 output[dst_idx] = src[src_offset];
167 src_offset += 1;
168 }
169 }
170
171 ScalarBuffer::from(output)
172}
173
174fn scatter_bits(
175 src: &BooleanBuffer,
176 mask: &BooleanBuffer,
177 output_len: usize,
178 selectivity: f64,
179) -> BooleanBuffer {
180 let mut builder = BooleanBufferBuilder::new(output_len);
181 builder.advance(output_len);
182 let mut src_offset = 0;
183
184 if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
185 for (start, end) in mask.set_slices() {
186 for i in start..end {
187 if src.value(src_offset) {
188 builder.set_bit(i, true);
189 }
190 src_offset += 1;
191 }
192 }
193 } else {
194 for dst_idx in mask.set_indices() {
195 if src.value(src_offset) {
196 builder.set_bit(dst_idx, true);
197 }
198 src_offset += 1;
199 }
200 }
201
202 builder.finish()
203}
204
205fn scatter_null_mask(
206 src_nulls: Option<&NullBuffer>,
207 mask: &BooleanBuffer,
208 output_len: usize,
209 selectivity: f64,
210) -> Option<NullBuffer> {
211 let false_count = output_len - mask.count_set_bits();
212 let src_null_count = src_nulls.map(|n| n.null_count()).unwrap_or(0);
213
214 if src_null_count == 0 {
215 if false_count == 0 {
216 None
217 } else {
218 Some(NullBuffer::new(mask.clone()))
219 }
220 } else {
221 let src_nulls = src_nulls.unwrap();
222 let scattered = scatter_bits(src_nulls.inner(), mask, output_len, selectivity);
223 Some(NullBuffer::new(scattered)).filter(|n| n.null_count() > 0)
224 }
225}
226
227fn scatter_primitive<T: ArrowPrimitiveType>(
228 truthy: &PrimitiveArray<T>,
229 mask: &BooleanBuffer,
230 output_len: usize,
231 selectivity: f64,
232) -> PrimitiveArray<T> {
233 let values = scatter_native(truthy.values(), mask, output_len, selectivity);
234 let nulls = scatter_null_mask(truthy.nulls(), mask, output_len, selectivity);
235
236 PrimitiveArray::new(values, nulls).with_data_type(truthy.data_type().clone())
237}
238
239fn scatter_boolean(
240 truthy: &BooleanArray,
241 mask: &BooleanBuffer,
242 output_len: usize,
243 selectivity: f64,
244) -> BooleanArray {
245 let values = scatter_bits(truthy.values(), mask, output_len, selectivity);
246 let nulls = scatter_null_mask(truthy.nulls(), mask, output_len, selectivity);
247
248 BooleanArray::new(values, nulls)
249}
250
251fn scatter_bytes<T: ByteArrayType>(
252 truthy: &GenericByteArray<T>,
253 mask: &BooleanBuffer,
254 output_len: usize,
255 selectivity: f64,
256) -> GenericByteArray<T> {
257 let src_offsets = truthy.value_offsets();
258 let src_data = truthy.value_data();
259
260 let mut dst_offsets: Vec<T::Offset> = Vec::with_capacity(output_len + 1);
262 let mut cur_offset = T::Offset::default();
263 dst_offsets.push(cur_offset);
264
265 let mut src_idx = 0;
266 for i in 0..output_len {
267 if mask.value(i) {
268 let len =
269 src_offsets[src_idx + 1].as_usize() - src_offsets[src_idx].as_usize();
270 cur_offset += T::Offset::from_usize(len).unwrap();
271 src_idx += 1;
272 }
273 dst_offsets.push(cur_offset);
274 }
275
276 let byte_start = src_offsets[0].as_usize();
277 let byte_end = src_offsets[src_idx].as_usize();
278 let dst_data: Buffer = src_data[byte_start..byte_end].into();
279
280 let nulls = scatter_null_mask(truthy.nulls(), mask, output_len, selectivity);
281
282 let offsets_buffer: Buffer = dst_offsets.into();
283 let data = unsafe {
285 ArrayDataBuilder::new(truthy.data_type().clone())
286 .len(output_len)
287 .add_buffer(offsets_buffer)
288 .add_buffer(dst_data)
289 .nulls(nulls)
290 .build_unchecked()
291 };
292 GenericByteArray::from(data)
293}
294
295fn scatter_byte_view<T: ByteViewType>(
296 truthy: &GenericByteViewArray<T>,
297 mask: &BooleanBuffer,
298 output_len: usize,
299 selectivity: f64,
300) -> GenericByteViewArray<T> {
301 let new_views = scatter_native(truthy.views(), mask, output_len, selectivity);
302 let nulls = scatter_null_mask(truthy.nulls(), mask, output_len, selectivity);
303
304 unsafe {
306 GenericByteViewArray::new_unchecked(
307 new_views,
308 truthy.data_buffers().to_vec(),
309 nulls,
310 )
311 }
312}
313
314fn scatter_fixed_size_binary(
315 truthy: &FixedSizeBinaryArray,
316 mask: &BooleanBuffer,
317 output_len: usize,
318 selectivity: f64,
319) -> FixedSizeBinaryArray {
320 let value_length = truthy.value_length() as usize;
321 let mut output = vec![0u8; output_len * value_length];
322 let mut src_idx = 0;
323
324 if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
325 for (start, end) in mask.set_slices() {
326 for dst_idx in start..end {
327 let src_bytes = truthy.value(src_idx);
328 let dst_start = dst_idx * value_length;
329 output[dst_start..dst_start + value_length].copy_from_slice(src_bytes);
330 src_idx += 1;
331 }
332 }
333 } else {
334 for dst_idx in mask.set_indices() {
335 let src_bytes = truthy.value(src_idx);
336 let dst_start = dst_idx * value_length;
337 output[dst_start..dst_start + value_length].copy_from_slice(src_bytes);
338 src_idx += 1;
339 }
340 }
341 let nulls = scatter_null_mask(truthy.nulls(), mask, output_len, selectivity);
342
343 FixedSizeBinaryArray::new(truthy.value_length(), Buffer::from(output), nulls)
344}
345
346fn scatter_dict<K: ArrowDictionaryKeyType>(
347 truthy: &DictionaryArray<K>,
348 mask: &BooleanBuffer,
349 output_len: usize,
350 selectivity: f64,
351) -> DictionaryArray<K> {
352 let scattered_keys = scatter_primitive(truthy.keys(), mask, output_len, selectivity);
353 DictionaryArray::new(scattered_keys, Arc::clone(truthy.values()))
354}
355
356fn scatter_fallback(
357 truthy: &dyn Array,
358 mask: &BooleanBuffer,
359 output_len: usize,
360) -> Result<ArrayRef> {
361 let truthy_data = truthy.to_data();
362 let mut mutable = MutableArrayData::new(vec![&truthy_data], true, output_len);
363
364 let mut filled = 0;
369 let mut true_pos = 0;
371
372 let mask_array = BooleanArray::new(mask.clone(), None);
373 SlicesIterator::new(&mask_array).for_each(|(start, end)| {
374 if start > filled {
376 mutable.extend_nulls(start - filled);
377 }
378 let len = end - start;
380 mutable.extend(0, true_pos, true_pos + len);
381 true_pos += len;
382 filled = end;
383 });
384
385 if filled < output_len {
387 mutable.extend_nulls(output_len - filled);
388 }
389
390 let data = mutable.freeze();
391 Ok(make_array(data))
392}
393
/// Evaluates every expression in `exprs` against `batch` and returns the
/// resulting arrays in the same order.
///
/// Equivalent to [`evaluate_expressions_to_arrays_with_metrics`] called with
/// no metrics.
///
/// # Errors
/// Returns the first error produced while evaluating an expression or
/// converting its result into an array.
#[inline]
pub fn evaluate_expressions_to_arrays<'a>(
    exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    evaluate_expressions_to_arrays_with_metrics(exprs, batch, None)
}
406
407#[inline]
411pub fn evaluate_expressions_to_arrays_with_metrics<'a>(
412 exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
413 batch: &RecordBatch,
414 metrics: Option<&ExpressionEvaluatorMetrics>,
415) -> Result<Vec<ArrayRef>> {
416 let num_rows = batch.num_rows();
417 exprs
418 .into_iter()
419 .enumerate()
420 .map(|(idx, e)| {
421 let _timer = metrics.and_then(|m| m.scoped_timer(idx));
422 e.evaluate(batch)
423 .and_then(|col| col.into_array_of_size(num_rows))
424 })
425 .collect::<Result<Vec<ArrayRef>>>()
426}
427
/// Unit tests for [`scatter`], covering the fast paths and the per-type
/// kernels.
#[cfg(test)]
mod tests {

    use arrow::array::{Int32Array, StringArray, StringViewArray, as_string_array};
    use arrow::compute::filter;
    use datafusion_common::cast::{as_boolean_array, as_int32_array};

    use super::*;

    /// Primitive values land on the `true` positions in order; `truthy` may
    /// hold more values than the mask selects.
    #[test]
    fn scatter_int() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);

        // the output array is expected to be the same length as the mask array
        let expected =
            Int32Array::from_iter(vec![Some(1), Some(10), None, None, Some(11)]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    /// Trailing `false` entries of the mask produce trailing nulls.
    #[test]
    fn scatter_int_end_with_false() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);

        // output should be same length as mask
        let expected =
            Int32Array::from_iter(vec![Some(1), None, Some(10), None, None, None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    /// Null mask entries behave like `false`: the output is null there and no
    /// value of `truthy` is consumed.
    #[test]
    fn scatter_with_null_mask() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
        let mask: BooleanArray = vec![Some(false), None, Some(true), Some(true), None]
            .into_iter()
            .collect();

        // output should treat nulls as though they are false
        let expected = Int32Array::from_iter(vec![None, None, Some(1), Some(10), None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    /// Boolean values go through the bit-level kernel.
    #[test]
    fn scatter_boolean() -> Result<()> {
        let truthy = Arc::new(BooleanArray::from(vec![false, false, false, true]));
        let mask = BooleanArray::from(vec![true, true, false, false, true]);

        // the output array is expected to be the same length as the mask array
        let expected = BooleanArray::from_iter(vec![
            Some(false),
            Some(false),
            None,
            None,
            Some(false),
        ]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_boolean_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    /// An all-`true` mask returns the values unchanged.
    #[test]
    fn scatter_all_true() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let mask = BooleanArray::from(vec![true, true, true]);

        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        assert_eq!(&Int32Array::from(vec![1, 2, 3]), result);
        Ok(())
    }

    /// An all-`false` mask yields an all-null array of the mask's length.
    #[test]
    fn scatter_all_false() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let mask = BooleanArray::from(vec![false, false, false]);

        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;
        let expected = Int32Array::from(vec![None, None, None]);
        assert_eq!(&expected, result);
        Ok(())
    }

    /// An empty mask yields an empty array.
    #[test]
    fn scatter_empty() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let mask = BooleanArray::from(Vec::<bool>::new());

        let result = scatter(&mask, truthy.as_ref())?;
        assert_eq!(result.len(), 0);
        Ok(())
    }

    /// Nulls already present in `truthy` are carried to their scattered
    /// positions, in addition to the nulls introduced by the mask.
    #[test]
    fn scatter_primitive_with_source_nulls() -> Result<()> {
        let truthy = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
        let mask = BooleanArray::from(vec![true, false, true, true, false]);

        // Index 1 and 4 are null because of the mask, index 2 because the
        // source value itself is null.
        let expected = Int32Array::from_iter(vec![Some(1), None, None, Some(3), None]);
        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_int32_array(&result)?;

        assert_eq!(&expected, result);
        Ok(())
    }

    /// Offset-based string arrays.
    #[test]
    fn scatter_string_test() -> Result<()> {
        let truthy = Arc::new(StringArray::from(vec!["hello", "world"]));
        let mask = BooleanArray::from(vec![true, false, false, true]);

        let result = scatter(&mask, truthy.as_ref())?;
        let result = as_string_array(&result);

        assert_eq!(result.len(), 4);
        assert!(result.is_valid(0));
        assert_eq!(result.value(0), "hello");
        assert!(result.is_null(1));
        assert!(result.is_null(2));
        assert!(result.is_valid(3));
        assert_eq!(result.value(3), "world");
        Ok(())
    }

    /// View-based string arrays, with one short and one longer value.
    #[test]
    fn scatter_string_view_test() -> Result<()> {
        let truthy = Arc::new(StringViewArray::from(vec![
            "short",
            "a longer string that exceeds inline",
        ]));
        let mask = BooleanArray::from(vec![false, true, true, false]);

        let result = scatter(&mask, truthy.as_ref())?;
        let result = result.as_any().downcast_ref::<StringViewArray>().unwrap();

        assert_eq!(result.len(), 4);
        assert!(result.is_null(0));
        assert_eq!(result.value(1), "short");
        assert_eq!(result.value(2), "a longer string that exceeds inline");
        assert!(result.is_null(3));
        Ok(())
    }

    /// Dictionary arrays keep their type; only validity is checked here.
    #[test]
    fn scatter_dictionary_test() -> Result<()> {
        use arrow::datatypes::Int8Type;

        let values = StringArray::from(vec!["a", "b"]);
        let truthy = Arc::new(
            DictionaryArray::<Int8Type>::try_new(
                arrow::array::Int8Array::from(vec![0, 1, 0]),
                Arc::new(values),
            )
            .unwrap(),
        );
        let mask = BooleanArray::from(vec![true, false, true, true, false]);

        let result = scatter(&mask, truthy.as_ref())?;
        let result = result
            .as_any()
            .downcast_ref::<DictionaryArray<Int8Type>>()
            .unwrap();

        assert_eq!(result.len(), 5);
        assert!(result.is_valid(0));
        assert!(result.is_null(1));
        assert!(result.is_valid(2));
        assert!(result.is_valid(3));
        assert!(result.is_null(4));
        Ok(())
    }

    /// Fixed-size binary arrays.
    #[test]
    fn scatter_fixed_size_binary_test() -> Result<()> {
        let truthy = Arc::new(FixedSizeBinaryArray::from(vec![
            &[1u8, 2][..],
            &[3, 4][..],
            &[5, 6][..],
        ]));
        let mask = BooleanArray::from(vec![true, false, true, false, true]);

        let result = scatter(&mask, truthy.as_ref())?;
        let result = result
            .as_any()
            .downcast_ref::<FixedSizeBinaryArray>()
            .unwrap();

        assert_eq!(result.len(), 5);
        assert!(result.is_valid(0));
        assert_eq!(result.value(0), &[1, 2]);
        assert!(result.is_null(1));
        assert!(result.is_valid(2));
        assert_eq!(result.value(2), &[3, 4]);
        assert!(result.is_null(3));
        assert!(result.is_valid(4));
        assert_eq!(result.value(4), &[5, 6]);
        Ok(())
    }

    /// Filtering with a mask and scattering the result with the same mask
    /// restores the selected values at their original positions.
    #[test]
    fn scatter_filter_roundtrip() -> Result<()> {
        let original = Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50]));
        let mask = BooleanArray::from(vec![true, false, true, false, true]);

        // `filtered` holds the values at the `true` positions: 10, 30, 50.
        let filtered = filter(original.as_ref(), &mask).unwrap();
        // Scattering puts them back where they came from.
        let scattered = scatter(&mask, filtered.as_ref())?;
        let scattered = as_int32_array(&scattered)?;

        assert_eq!(scattered.len(), 5);
        assert_eq!(scattered.value(0), 10);
        assert!(scattered.is_null(1));
        assert_eq!(scattered.value(2), 30);
        assert!(scattered.is_null(3));
        assert_eq!(scattered.value(4), 50);
        Ok(())
    }
}